diff --git a/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java b/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java index 457313b71..316eab952 100644 --- a/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java +++ b/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java @@ -20,8 +20,13 @@ package org.eehouse.android.xw4; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.UnresolvedAddressException; +import java.nio.ByteBuffer; import java.net.InetSocketAddress; -import java.net.Socket; import java.util.Vector; import java.util.Iterator; import junit.framework.Assert; @@ -32,11 +37,6 @@ import android.content.Context; import android.os.Build; import android.os.Handler; import android.os.Message; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.io.DataInputStream; -import java.io.DataOutputStream; - import org.eehouse.android.xw4.jni.*; import org.eehouse.android.xw4.jni.JNIThread.*; @@ -66,82 +66,141 @@ public class CommsTransport implements TransportProcs { public int m_nMissing; } + private Selector m_selector; + private SocketChannel m_socketChannel; private int m_jniGamePtr; private CommsAddrRec m_addr; private JNIThread m_jniThread; + private CommsThread m_thread; private Handler m_handler; + private boolean m_done = false; - private Socket m_socket; - private ReaderThread m_reader; - private WriterThread m_writer; - BlockingQueue m_queue; + private Vector m_buffersOut; + private ByteBuffer m_bytesOut; + private ByteBuffer m_bytesIn; private Context m_context; + // assembling inbound packet + private byte[] m_packetIn; + private int m_haveLen = -1; + public CommsTransport( int jniGamePtr, Context context, Handler handler, DeviceRole role ) { m_jniGamePtr = jniGamePtr; m_context = context; m_handler = handler; + m_buffersOut = new Vector(); + m_bytesIn = ByteBuffer.allocate( 2048 ); } - public class WriterThread extends Thread { + public class CommsThread extends Thread { + + @Override public void run() { - DataOutputStream os; - try { - os = new DataOutputStream( m_socket.getOutputStream() ); + try { + if ( Build.PRODUCT.contains("sdk") ) { + System.setProperty("java.net.preferIPv6Addresses", "false"); + } + + m_selector = Selector.open(); + + loop(); + + closeSocket(); } catch ( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); - return; + Utils.logf( ioe.toString() ); + } catch ( UnresolvedAddressException uae ) { + Utils.logf( "bad address: name: %s; port: %s; exception: %s", + m_addr.ip_relay_hostName, m_addr.ip_relay_port, + uae.toString() ); } - - for ( ; ; ) { + m_thread = null; + } + + private void loop() + { + while ( !m_done ) { try { - byte[] buf = m_queue.take(); // blocks + synchronized( this ) { - os.writeShort( buf.length ); - os.write( buf ); - Utils.logf( "wrote %d bytes to socket", buf.length ); - } catch ( InterruptedException inte ) { - Utils.logf( "%s", inte.toString() ); - break; - } catch( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); - break; + // if we have data and no socket, try to connect. + if ( null == m_socketChannel + && 0 < m_buffersOut.size() ) { + try { + m_socketChannel = SocketChannel.open(); + m_socketChannel.configureBlocking( false ); + Utils.logf( "connecting to %s:%d", + m_addr.ip_relay_hostName, + m_addr.ip_relay_port ); + InetSocketAddress isa + = new InetSocketAddress( m_addr.ip_relay_hostName, + m_addr.ip_relay_port ); + m_socketChannel.connect( isa ); + } catch ( java.io.IOException ioe ) { + Utils.logf( ioe.toString() ); + } + } + + if ( null != m_socketChannel ) { + int ops = figureOps(); + // Utils.logf( "calling with ops=%x", ops ); + m_socketChannel.register( m_selector, ops ); + } + } + m_selector.select(); + } catch ( ClosedChannelException cce ) { + // we get this when relay goes down. Need to notify! + m_jniThread.handle( JNICmd.CMD_TRANSFAIL ); + closeSocket(); + Utils.logf( "exiting: " + cce.toString() ); + break; // don't try again + } catch ( java.io.IOException ioe ) { + closeSocket(); + Utils.logf( "exiting: " + ioe.toString() ); + Utils.logf( ioe.toString() ); + } + + Iterator iter = m_selector.selectedKeys().iterator(); + while ( iter.hasNext() ) { + SelectionKey key = (SelectionKey)iter.next(); + SocketChannel channel = (SocketChannel)key.channel(); + iter.remove(); + try { + if (key.isValid() && key.isConnectable()) { + if ( !channel.finishConnect() ) { + key.cancel(); + } + } + if (key.isValid() && key.isReadable()) { + m_bytesIn.clear(); // will wipe any pending data! + // Utils.logf( "socket is readable; buffer has space for " + // + m_bytesIn.remaining() ); + int nRead = channel.read( m_bytesIn ); + if ( nRead == -1 ) { + channel.close(); + } else { + addIncoming(); + } + } + if (key.isValid() && key.isWritable()) { + getOut(); + if ( null != m_bytesOut ) { + int nWritten = channel.write( m_bytesOut ); + //Utils.logf( "wrote " + nWritten + " bytes" ); + } + } + } catch ( java.io.IOException ioe ) { + key.cancel(); + } } } - } + } // loop + } - - public class ReaderThread extends Thread { - public void run() - { - DataInputStream dis; - try { - dis = new DataInputStream( m_socket.getInputStream() ); - } catch ( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); - return; - } - - for ( ; ; ) { - try { - short len = dis.readShort(); - Utils.logf( "ReaderThread: read length short: %d", len ); - byte[] buf = new byte[len]; - dis.readFully( buf ); - Utils.logf( "returned from readFully()" ); - m_jniThread.handle( JNICmd.CMD_RECEIVE, buf ); - } catch( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); - break; - } - } - } - } - + public void setReceiver( JNIThread jnit ) { m_jniThread = jnit; @@ -149,35 +208,104 @@ public class CommsTransport implements TransportProcs { public void waitToStop() { - if ( null != m_socket ) { + m_done = true; + if ( null != m_selector ) { + m_selector.wakeup(); + } + if ( null != m_thread ) { // synchronized this? Or use Thread method try { - m_socket.close(); - m_socket = null; - } catch ( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); + m_thread.join(100); // wait up to 1/10 second + } catch ( java.lang.InterruptedException ie ) { + Utils.logf( "got InterruptedException: " + ie.toString() ); } + m_thread = null; } } - private void startThreadsIf() + private synchronized void putOut( final byte[] buf ) { - if ( null == m_socket ) { + int len = buf.length; + ByteBuffer netbuf = ByteBuffer.allocate( len + 2 ); + netbuf.putShort( (short)len ); + netbuf.put( buf ); + m_buffersOut.add( netbuf ); + Assert.assertEquals( netbuf.remaining(), 0 ); + + if ( null != m_selector ) { + m_selector.wakeup(); // tell it it's got some writing to do + } + } + + private synchronized void closeSocket() + { + if ( null != m_socketChannel ) { try { - m_socket = new Socket( m_addr.ip_relay_hostName, - m_addr.ip_relay_port ); - if ( null != m_socket ) { - m_queue = new ArrayBlockingQueue(16); - m_reader = new ReaderThread(); - m_reader.start(); - m_writer = new WriterThread(); - m_writer.start(); + m_socketChannel.close(); + } catch ( Exception e ) { + Utils.logf( "closing socket: %s", e.toString() ); + } + m_socketChannel = null; + } + } + + private synchronized void getOut() + { + if ( null != m_bytesOut && m_bytesOut.remaining() == 0 ) { + m_bytesOut = null; + } + + if ( null == m_bytesOut && m_buffersOut.size() > 0 ) { + m_bytesOut = m_buffersOut.remove(0); + m_bytesOut.flip(); + } + } + + private synchronized int figureOps() { + int ops; + if ( null == m_socketChannel ) { + ops = 0; + } else if ( m_socketChannel.isConnected() ) { + ops = SelectionKey.OP_READ; + if ( (null != m_bytesOut && m_bytesOut.hasRemaining()) + || m_buffersOut.size() > 0 ) { + ops |= SelectionKey.OP_WRITE; + } + } else { + ops = SelectionKey.OP_CONNECT; + } + return ops; + } + + private void addIncoming( ) + { + m_bytesIn.flip(); + + for ( ; ; ) { + int len = m_bytesIn.remaining(); + if ( len <= 0 ) { + break; + } + + if ( null == m_packetIn ) { // we're not mid-packet + Assert.assertTrue( len > 1 ); // tell me if I see this case + if ( len == 1 ) { // half a length byte... + break; // can I leave it in the buffer? + } else { + m_packetIn = new byte[m_bytesIn.getShort()]; + m_haveLen = 0; + } + } else { // we're mid-packet + int wantLen = m_packetIn.length - m_haveLen; + if ( wantLen > len ) { + wantLen = len; + } + m_bytesIn.get( m_packetIn, m_haveLen, wantLen ); + m_haveLen += wantLen; + if ( m_haveLen == m_packetIn.length ) { + // send completed packet + m_jniThread.handle( JNICmd.CMD_RECEIVE, m_packetIn ); + m_packetIn = null; } - } catch ( java.net.UnknownHostException uhe ) { - Utils.logf( "%s", uhe.toString() ); - m_socket = null; - } catch ( java.io.IOException ioe ) { - Utils.logf( "%s", ioe.toString() ); - m_socket = null; // need to notify user on some of these } } } @@ -199,14 +327,12 @@ public class CommsTransport implements TransportProcs { switch ( m_addr.conType ) { case COMMS_CONN_RELAY: - startThreadsIf(); - try { - // add(), not put(): don't block thread in comms if full - m_queue.add( buf ); - nSent = buf.length; - } catch ( IllegalStateException ise ) { - Utils.logf( "%s", ise.toString() ); + putOut( buf ); // add to queue + if ( null == m_thread ) { + m_thread = new CommsThread(); + m_thread.start(); } + nSent = buf.length; break; case COMMS_CONN_SMS: Assert.fail();