From da4d8412202b0b6fb4f61236d47f2e253e4c9733 Mon Sep 17 00:00:00 2001 From: Eric House Date: Wed, 23 Jan 2013 07:46:13 -0800 Subject: [PATCH] use per-device UDP rather than per-board TCP to communicate with relay, including latest UDP protocol and acking changes. Basically works (in emulator at least) but there are problems especially with initial game creation. --- .../eehouse/android/xw4/CommsTransport.java | 20 +- .../org/eehouse/android/xw4/GamesList.java | 33 +- .../org/eehouse/android/xw4/RelayService.java | 461 ++++++++++++++---- 3 files changed, 401 insertions(+), 113 deletions(-) diff --git a/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java b/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java index ba89de474..22c9e0bce 100644 --- a/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java +++ b/xwords4/android/XWords4/src/org/eehouse/android/xw4/CommsTransport.java @@ -57,6 +57,7 @@ public class CommsTransport implements TransportProcs, private ByteBuffer m_bytesIn; private Context m_context; + private long m_rowid; // assembling inbound packet private byte[] m_packetIn; @@ -64,11 +65,12 @@ public class CommsTransport implements TransportProcs, public CommsTransport( int jniGamePtr, Context context, TransportProcs.TPMsgHandler handler, - DeviceRole role ) + long rowid, DeviceRole role ) { m_jniGamePtr = jniGamePtr; m_context = context; m_tpHandler = handler; + m_rowid = rowid; m_buffersOut = new Vector(); m_bytesIn = ByteBuffer.allocate( 2048 ); @@ -377,13 +379,17 @@ public class CommsTransport implements TransportProcs, switch ( addr.conType ) { case COMMS_CONN_RELAY: - if ( NetStateCache.netAvail( m_context ) ) { - putOut( buf ); // add to queue - if ( null == m_thread ) { - m_thread = new CommsThread(); - m_thread.start(); + if ( XWPrefs.getUDPEnabled( m_context ) ) { + nSent = RelayService.sendPacket( m_context, m_rowid, buf ); + } else { + if ( NetStateCache.netAvail( m_context ) ) { + putOut( buf ); // add to queue + if ( null == m_thread ) { + m_thread = new CommsThread(); + m_thread.start(); + } + nSent = buf.length; } - nSent = buf.length; } break; case COMMS_CONN_SMS: diff --git a/xwords4/android/XWords4/src/org/eehouse/android/xw4/GamesList.java b/xwords4/android/XWords4/src/org/eehouse/android/xw4/GamesList.java index ff1677320..e5d9c0cc0 100644 --- a/xwords4/android/XWords4/src/org/eehouse/android/xw4/GamesList.java +++ b/xwords4/android/XWords4/src/org/eehouse/android/xw4/GamesList.java @@ -74,6 +74,7 @@ public class GamesList extends XWExpandableListActivity private static final String SAVE_DICTNAMES = "SAVE_DICTNAMES"; private static final String RELAYIDS_EXTRA = "relayids"; + private static final String ROWID_EXTRA = "rowid"; private static final String GAMEID_EXTRA = "gameid"; private static final String REMATCH_ROWID_EXTRA = "rowid"; @@ -386,6 +387,7 @@ public class GamesList extends XWExpandableListActivity super.onNewIntent( intent ); Assert.assertNotNull( intent ); invalRelayIDs( intent.getStringArrayExtra( RELAYIDS_EXTRA ) ); + invalRowID( intent.getLongExtra( ROWID_EXTRA, -1 ) ); startFirstHasDict( intent ); startNewNetGame( intent ); startHasGameID( intent ); @@ -916,10 +918,18 @@ public class GamesList extends XWExpandableListActivity } } + private void invalRowID( long rowid ) + { + if ( -1 != rowid ) { + m_adapter.inval( rowid ); + } + } + // Launch the first of these for which there's a dictionary // present. - private void startFirstHasDict( String[] relayIDs ) + private boolean startFirstHasDict( String[] relayIDs ) { + boolean launched = false; if ( null != relayIDs ) { outer: for ( String relayID : relayIDs ) { @@ -928,19 +938,33 @@ public class GamesList extends XWExpandableListActivity for ( long rowid : rowids ) { if ( GameUtils.gameDictsHere( this, rowid ) ) { launchGame( rowid ); + launched = true; break outer; } } } } } + return launched; + } + + private void startFirstHasDict( long rowid ) + { + if ( -1 != rowid ) { + if ( GameUtils.gameDictsHere( this, rowid ) ) { + launchGame( rowid ); + } + } } private void startFirstHasDict( Intent intent ) { if ( null != intent ) { String[] relayIDs = intent.getStringArrayExtra( RELAYIDS_EXTRA ); - startFirstHasDict( relayIDs ); + if ( !startFirstHasDict( relayIDs ) ) { + long rowid = intent.getLongExtra( ROWID_EXTRA, -1 ); + startFirstHasDict( rowid ); + } } } @@ -1093,11 +1117,10 @@ public class GamesList extends XWExpandableListActivity return intent; } - public static Intent makeRelayIdsIntent( Context context, - String[] relayIDs ) + public static Intent makeRowidIntent( Context context, long rowid ) { Intent intent = makeSelfIntent( context ); - intent.putExtra( RELAYIDS_EXTRA, relayIDs ); + intent.putExtra( ROWID_EXTRA, rowid ); return intent; } diff --git a/xwords4/android/XWords4/src/org/eehouse/android/xw4/RelayService.java b/xwords4/android/XWords4/src/org/eehouse/android/xw4/RelayService.java index 2e92d7c9d..963697c44 100644 --- a/xwords4/android/XWords4/src/org/eehouse/android/xw4/RelayService.java +++ b/xwords4/android/XWords4/src/org/eehouse/android/xw4/RelayService.java @@ -48,6 +48,11 @@ public class RelayService extends XWService { private static final String CMD_STR = "CMD"; private static final int UDP_CHANGED = 1; + private static final int SEND = 2; + private static final int RECEIVE = 3; + + private static final String ROWID = "ROWID"; + private static final String BINBUFFER = "BINBUFFER"; private Thread m_fetchThread = null; private Thread m_UDPReadThread = null; @@ -58,8 +63,32 @@ public class RelayService extends XWService { // These must match the enum XWRelayReg in xwrelay.h private static final int XWPDEV_PROTO_VERSION = 0; // private static final int XWPDEV_NONE = 0; - private static final int XWPDEV_ALERT = 1; - private static final int XWPDEV_REG = 2; + + private enum XWRelayReg { + XWPDEV_NONE + ,XWPDEV_ALERT + ,XWPDEV_REG + ,XWPDEV_REGRSP + ,XWPDEV_PING + ,XWPDEV_HAVEMSGS + ,XWPDEV_RQSTMSGS + ,XWPDEV_MSG + ,XWPDEV_MSGNOCONN + ,XWPDEV_MSGRSP + ,XWPDEV_BADREG + ,XWPDEV_ACK + }; + + // private static final int XWPDEV_ALERT = 1; + // private static final int XWPDEV_REG = 2; + // private static final int XWPDEV_REGRSP = 3; + // private static final int XWPDEV_PING = 4; + // private static final int XWPDEV_HAVEMSGS = 5; + // private static final int XWPDEV_RQSTMSGS = 6; + // private static final int XWPDEV_MSG = 7; + // private static final int XWPDEV_MSGNOCONN = 8; + // private static final int XWPDEV_MSGRSP = 9; + // private static final int XWPDEV_BADREG = 10; public static void startService( Context context ) { @@ -67,6 +96,26 @@ public class RelayService extends XWService { context.startService( intent ); } + public static int sendPacket( Context context, long rowid, byte[] buf ) + { + Intent intent = getIntentTo( context, SEND ); + intent.putExtra( ROWID, rowid ); + intent.putExtra( BINBUFFER, buf ); + context.startService( intent ); + return buf.length; + } + + // Exists to get incoming data onto the main thread + private static void postData( Context context, long rowid, byte[] msg ) + { + DbgUtils.logf( "RelayService::postData: packet of length %d for token %d", + msg.length, rowid ); + Intent intent = getIntentTo( context, RECEIVE ); + intent.putExtra( ROWID, rowid ); + intent.putExtra( BINBUFFER, msg ); + context.startService( intent ); + } + public static void udpChanged( Context context ) { startService( context ); @@ -94,6 +143,8 @@ public class RelayService extends XWService { if ( null != intent ) { int cmd = intent.getIntExtra( CMD_STR, -1 ); switch( cmd ) { + case -1: + break; case UDP_CHANGED: DbgUtils.logf( "RelayService::onStartCommand::UDP_CHANGED" ); if ( XWPrefs.getUDPEnabled( this ) ) { @@ -105,6 +156,16 @@ public class RelayService extends XWService { startFetchThreadIf(); } break; + case SEND: + case RECEIVE: + long rowid = intent.getLongExtra( ROWID, -1 ); + byte[] msg = intent.getByteArrayExtra( BINBUFFER ); + if ( SEND == cmd ) { + sendMessage( rowid, msg ); + } else { + feedMessage( rowid, msg ); + } + break; default: Assert.fail(); } @@ -122,17 +183,20 @@ public class RelayService extends XWService { long[] rowids = DBUtils.getRowIDsFor( this, relayID ); if ( null != rowids ) { for ( long rowid : rowids ) { - Intent intent = - GamesList.makeRelayIdsIntent( this, - new String[] {relayID} ); - String msg = Utils.format( this, R.string.notify_bodyf, - GameUtils.getName( this, rowid ) ); - Utils.postNotification( this, intent, R.string.notify_title, - msg, (int)rowid ); + setupNotification( rowid ); } } } } + + private void setupNotification( long rowid ) + { + Intent intent = GamesList.makeRowidIntent( this, rowid ); + String msg = Utils.format( this, R.string.notify_bodyf, + GameUtils.getName( this, rowid ) ); + Utils.postNotification( this, intent, R.string.notify_title, + msg, (int)rowid ); + } private void startFetchThreadIf() { @@ -159,70 +223,85 @@ public class RelayService extends XWService { private void startUDPThreads() { DbgUtils.logf( "startUDPThreads" ); - Assert.assertNull( m_UDPWriteThread ); - Assert.assertNull( m_UDPReadThread ); Assert.assertTrue( XWPrefs.getUDPEnabled( this ) ); - int port = XWPrefs.getDefaultRelayPort( RelayService.this ); - String host = XWPrefs.getDefaultRelayHost( RelayService.this ); - try { - m_UDPSocket = new DatagramSocket(); - InetAddress addr = InetAddress.getByName( host ); - m_UDPSocket.connect( addr, port ); - } catch( java.net.SocketException se ) { - DbgUtils.loge( se ); - Assert.fail(); - } catch( java.net.UnknownHostException uhe ) { - DbgUtils.loge( uhe ); + if ( null == m_UDPSocket ) { + int port = XWPrefs.getDefaultRelayPort( RelayService.this ); + String host = XWPrefs.getDefaultRelayHost( RelayService.this ); + try { + m_UDPSocket = new DatagramSocket(); + InetAddress addr = InetAddress.getByName( host ); + m_UDPSocket.connect( addr, port ); // meaning: remember this address + } catch( java.net.SocketException se ) { + DbgUtils.loge( se ); + Assert.fail(); + } catch( java.net.UnknownHostException uhe ) { + DbgUtils.loge( uhe ); + } + } else { + Assert.assertTrue( m_UDPSocket.isConnected() ); + DbgUtils.logf( "m_UDPSocket not null" ); } - m_UDPReadThread = new Thread( null, new Runnable() { - public void run() { - byte[] buf = new byte[1024]; - for ( ; ; ) { - DatagramPacket packet = - new DatagramPacket( buf, buf.length ); - try { - DbgUtils.logf( "UPD read thread blocking on receive" ); - m_UDPSocket.receive( packet ); - DbgUtils.logf( "UPD read thread: receive returned" ); - } catch( java.io.IOException ioe ) { - DbgUtils.loge( ioe ); - break; // ??? + if ( null == m_UDPReadThread ) { + m_UDPReadThread = new Thread( null, new Runnable() { + public void run() { + DbgUtils.logf( "read thread running" ); + byte[] buf = new byte[1024]; + for ( ; ; ) { + DatagramPacket packet = + new DatagramPacket( buf, buf.length ); + try { + DbgUtils.logf( "UPD read thread blocking on receive" ); + m_UDPSocket.receive( packet ); + DbgUtils.logf( "UPD read thread: receive returned" ); + } catch( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + break; // ??? + } + DbgUtils.logf( "received %d bytes", packet.getLength() ); + gotPacket( packet ); } - DbgUtils.logf( "received %d bytes", packet.getLength() ); - gotPacket( packet ); + DbgUtils.logf( "read thread exiting" ); } - } - }, getClass().getName() ); - m_UDPReadThread.start(); + }, getClass().getName() ); + m_UDPReadThread.start(); + } else { + DbgUtils.logf( "m_UDPReadThread not null and assumed to be running" ); + } - m_queue = new LinkedBlockingQueue(); - m_UDPWriteThread = new Thread( null, new Runnable() { - public void run() { - for ( ; ; ) { - DatagramPacket outPacket; - try { - outPacket = m_queue.take(); - } catch ( InterruptedException ie ) { - DbgUtils.logf( "RelayService; write thread killed" ); - break; - } - if ( null == outPacket || 0 == outPacket.getLength() ) { - DbgUtils.logf( "stopping write thread" ); - break; - } - DbgUtils.logf( "Sending packet of length %d", - outPacket.getLength() ); - try { - m_UDPSocket.send( outPacket ); - } catch ( java.io.IOException ioe ) { - DbgUtils.loge( ioe ); + if ( null == m_UDPWriteThread ) { + m_queue = new LinkedBlockingQueue(); + m_UDPWriteThread = new Thread( null, new Runnable() { + public void run() { + DbgUtils.logf( "write thread running" ); + for ( ; ; ) { + DatagramPacket outPacket; + try { + outPacket = m_queue.take(); + } catch ( InterruptedException ie ) { + DbgUtils.logf( "RelayService; write thread killed" ); + break; + } + if ( null == outPacket || 0 == outPacket.getLength() ) { + DbgUtils.logf( "stopping write thread" ); + break; + } + DbgUtils.logf( "Sending udp packet of length %d", + outPacket.getLength() ); + try { + m_UDPSocket.send( outPacket ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } } + DbgUtils.logf( "write thread exiting" ); } - } - }, getClass().getName() ); - m_UDPWriteThread.start(); + }, getClass().getName() ); + m_UDPWriteThread.start(); + } else { + DbgUtils.logf( "m_UDPWriteThread not null and assumed to be running" ); + } } private void stopUDPThreadsIf() @@ -256,35 +335,185 @@ public class RelayService extends XWService { DbgUtils.logf( "stopUDPThreadsIf DONE" ); } + // Running on reader thread private void gotPacket( DatagramPacket packet ) { - DbgUtils.logf( "gotPacket" ); - ByteArrayInputStream bis = new ByteArrayInputStream( packet.getData() ); + int packetLen = packet.getLength(); + byte[] data = new byte[packetLen]; + System.arraycopy( packet.getData(), 0, data, 0, packetLen ); + DbgUtils.logf( "RelayService::gotPacket: %d bytes of data", packetLen ); + ByteArrayInputStream bis = new ByteArrayInputStream( data ); DataInputStream dis = new DataInputStream( bis ); try { - byte proto = dis.readByte(); - if ( XWPDEV_PROTO_VERSION == proto ) { - byte cmd = dis.readByte(); - switch ( cmd ) { + PacketHeader header = readHeader( dis ); + if ( null != header ) { + sendAckIf( header ); + switch ( header.m_cmd ) { case XWPDEV_ALERT: - short len = dis.readShort(); - byte[] tmp = new byte[len]; - dis.read( tmp ); - sendResult( MultiEvent.RELAY_ALERT, new String( tmp ) ); + String str = getStringWithLength( dis ); + sendResult( MultiEvent.RELAY_ALERT, str ); + break; + case XWPDEV_BADREG: + str = getStringWithLength( dis ); + DbgUtils.logf( "bad relayID \"%s\" reported", str ); + XWPrefs.clearRelayDevID( this ); + registerWithRelay(); + break; + case XWPDEV_REGRSP: + DbgUtils.logf( "got XWPDEV_REGRSP" ); + str = getStringWithLength( dis ); + DbgUtils.logf( "got relayid %s", str ); + XWPrefs.setRelayDevID( this, str ); + break; + case XWPDEV_HAVEMSGS: + requestMessages(); + break; + case XWPDEV_MSG: + DbgUtils.logf( "got XWPDEV_MSG" ); + int token = dis.readInt(); + byte[] msg = new byte[dis.available()]; + Assert.assertTrue( packet.getLength() >= msg.length ); + Assert.assertTrue( packetLen >= msg.length ); + dis.read( msg ); + postData( RelayService.this, token, msg ); break; default: - DbgUtils.logf( "RelayService: Unhandled cmd: %d", cmd ); + DbgUtils.logf( "RelayService: Unhandled cmd: %d", + header.m_cmd ); break; } - } else { - DbgUtils.logf( "bad proto %d", proto ); } } catch ( java.io.IOException ioe ) { DbgUtils.loge( ioe ); } + } // gotPacket + + private void registerWithRelay() + { + DbgUtils.logf( "registerWithRelay" ); + byte[] typ = new byte[1]; + String devid = getDevID(typ); + + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + DataOutputStream out = addProtoAndCmd( bas, XWRelayReg.XWPDEV_REG ); + out.writeByte( typ[0] ); + out.writeShort( devid.length() ); + out.writeBytes( devid ); + postPacket( bas ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } } - private void registerWithRelay() + private void requestMessages() + { + DbgUtils.logf( "requestMessages" ); + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + DataOutputStream out = + addProtoAndCmd( bas, XWRelayReg.XWPDEV_RQSTMSGS ); + String devid = getDevID( null ); + out.writeShort( devid.length() ); + out.writeBytes( devid ); + postPacket( bas ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } + } + + private void sendMessage( long rowid, byte[] msg ) + { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + DataOutputStream out = addProtoAndCmd( bas, XWRelayReg.XWPDEV_MSG ); + Assert.assertTrue( rowid < Integer.MAX_VALUE ); + out.writeInt( (int)rowid ); + out.write( msg, 0, msg.length ); + postPacket( bas ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } + } + + private void sendNoConnMessage( long rowid, String relayID, byte[] msg ) + { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + DataOutputStream out = + addProtoAndCmd( bas, XWRelayReg.XWPDEV_MSGNOCONN ); + Assert.assertTrue( rowid < Integer.MAX_VALUE ); + out.writeInt( (int)rowid ); + out.writeBytes( relayID ); + out.write( '\n' ); + out.write( msg, 0, msg.length ); + postPacket( bas ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } + } + + private void sendAckIf( PacketHeader header ) + { + DbgUtils.logf( "sendAckIf" ); + if ( XWRelayReg.XWPDEV_ACK != header.m_cmd ) { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + DataOutputStream out = + addProtoAndCmd( bas, XWRelayReg.XWPDEV_ACK ); + out.writeInt( header.m_packetID ); + postPacket( bas ); + } catch ( java.io.IOException ioe ) { + DbgUtils.loge( ioe ); + } + } + } + + private PacketHeader readHeader( DataInputStream dis ) + throws java.io.IOException + { + PacketHeader result = null; + byte proto = dis.readByte(); + if ( XWPDEV_PROTO_VERSION == proto ) { + int packetID = dis.readInt(); + DbgUtils.logf( "readHeader: got packetID %d", packetID ); + byte ordinal = dis.readByte(); + XWRelayReg cmd = XWRelayReg.values()[ordinal]; + result = new PacketHeader( cmd, packetID ); + } else { + DbgUtils.logf( "bad proto: %d", proto ); + } + DbgUtils.logf( "readHeader => %H", result ); + return result; + } + + private String getStringWithLength( DataInputStream dis ) + throws java.io.IOException + { + short len = dis.readShort(); + byte[] tmp = new byte[len]; + dis.read( tmp ); + return new String( tmp ); + } + + private DataOutputStream addProtoAndCmd( ByteArrayOutputStream bas, + XWRelayReg cmd ) + throws java.io.IOException + { + DataOutputStream out = new DataOutputStream( bas ); + out.writeByte( XWPDEV_PROTO_VERSION ); + out.writeInt( 0 ); // packetID + out.writeByte( cmd.ordinal() ); + return out; + } + + private void postPacket( ByteArrayOutputStream bas ) + { + byte[] data = bas.toByteArray(); + m_queue.add( new DatagramPacket( data, data.length ) ); + } + + private String getDevID( byte[] typp ) { byte typ; String devid = XWPrefs.getRelayDevID( this ); @@ -299,20 +528,30 @@ public class RelayService extends XWService { typ = UtilCtxt.ID_TYPE_ANDROID_OTHER; } } + if ( null != typp ) { + typp[0] = typ; + } else { + Assert.assertTrue( typ == UtilCtxt.ID_TYPE_RELAY ); + } + return devid; + } - ByteArrayOutputStream bas = new ByteArrayOutputStream(); - DataOutputStream outBuf = new DataOutputStream( bas ); - try { - outBuf.writeByte( XWPDEV_PROTO_VERSION ); - outBuf.writeByte( XWPDEV_REG ); - outBuf.writeByte( typ ); - outBuf.writeShort( devid.length() ); - outBuf.writeBytes( devid ); - - byte[] data = bas.toByteArray(); - m_queue.add( new DatagramPacket( data, data.length ) ); - } catch ( java.io.IOException ioe ) { - DbgUtils.loge( ioe ); + private void feedMessage( long rowid, byte[] msg ) + { + DbgUtils.logf( "RelayService::feedMessage: %d bytes for rowid %d", + msg.length, rowid ); + if ( BoardActivity.feedMessage( rowid, msg ) ) { + DbgUtils.logf( "feedMessage: board ate it" ); + // do nothing + } else { + RelayMsgSink sink = new RelayMsgSink(); + sink.setRowID( rowid ); + if ( GameUtils.feedMessage( this, rowid, msg, null, + sink ) ) { + setupNotification( rowid ); + } else { + DbgUtils.logf( "feedMessage: background dropped it" ); + } } } @@ -428,29 +667,49 @@ public class RelayService extends XWService { private class RelayMsgSink extends MultiMsgSink { private HashMap> m_msgLists = null; + private long m_rowid = -1; + + public void setRowID( long rowid ) { m_rowid = rowid; } public void send( Context context ) { - sendToRelay( context, m_msgLists ); + if ( -1 == m_rowid ) { + sendToRelay( context, m_msgLists ); + } else { + Assert.assertNull( m_msgLists ); + } } /***** TransportProcs interface *****/ public boolean relayNoConnProc( byte[] buf, String relayID ) { - if ( null == m_msgLists ) { - m_msgLists = new HashMap>(); - } + if ( -1 != m_rowid ) { + sendNoConnMessage( m_rowid, relayID, buf ); + } else { + if ( null == m_msgLists ) { + m_msgLists = new HashMap>(); + } - ArrayList list = m_msgLists.get( relayID ); - if ( list == null ) { - list = new ArrayList(); - m_msgLists.put( relayID, list ); + ArrayList list = m_msgLists.get( relayID ); + if ( list == null ) { + list = new ArrayList(); + m_msgLists.put( relayID, list ); + } + list.add( buf ); } - list.add( buf ); - return true; } } + private class PacketHeader { + public int m_packetID; + public XWRelayReg m_cmd; + public PacketHeader( XWRelayReg cmd, int packetID ) { + DbgUtils.logf( "in PacketHeader contructor" ); + m_packetID = packetID; + m_cmd = cmd; + } + } + }