use per-device UDP rather than per-board TCP to communicate with

relay, including latest UDP protocol and acking changes.  Basically
works (in emulator at least) but there are problems especially with
initial game creation.
This commit is contained in:
Eric House 2013-01-23 07:46:13 -08:00
parent 12f70154a0
commit da4d841220
3 changed files with 401 additions and 113 deletions

View file

@ -57,6 +57,7 @@ public class CommsTransport implements TransportProcs,
private ByteBuffer m_bytesIn;
private Context m_context;
private long m_rowid;
// assembling inbound packet
private byte[] m_packetIn;
@ -64,11 +65,12 @@ public class CommsTransport implements TransportProcs,
public CommsTransport( int jniGamePtr, Context context,
TransportProcs.TPMsgHandler handler,
DeviceRole role )
long rowid, DeviceRole role )
{
m_jniGamePtr = jniGamePtr;
m_context = context;
m_tpHandler = handler;
m_rowid = rowid;
m_buffersOut = new Vector<ByteBuffer>();
m_bytesIn = ByteBuffer.allocate( 2048 );
@ -377,13 +379,17 @@ public class CommsTransport implements TransportProcs,
switch ( addr.conType ) {
case COMMS_CONN_RELAY:
if ( NetStateCache.netAvail( m_context ) ) {
putOut( buf ); // add to queue
if ( null == m_thread ) {
m_thread = new CommsThread();
m_thread.start();
if ( XWPrefs.getUDPEnabled( m_context ) ) {
nSent = RelayService.sendPacket( m_context, m_rowid, buf );
} else {
if ( NetStateCache.netAvail( m_context ) ) {
putOut( buf ); // add to queue
if ( null == m_thread ) {
m_thread = new CommsThread();
m_thread.start();
}
nSent = buf.length;
}
nSent = buf.length;
}
break;
case COMMS_CONN_SMS:

View file

@ -74,6 +74,7 @@ public class GamesList extends XWExpandableListActivity
private static final String SAVE_DICTNAMES = "SAVE_DICTNAMES";
private static final String RELAYIDS_EXTRA = "relayids";
private static final String ROWID_EXTRA = "rowid";
private static final String GAMEID_EXTRA = "gameid";
private static final String REMATCH_ROWID_EXTRA = "rowid";
@ -386,6 +387,7 @@ public class GamesList extends XWExpandableListActivity
super.onNewIntent( intent );
Assert.assertNotNull( intent );
invalRelayIDs( intent.getStringArrayExtra( RELAYIDS_EXTRA ) );
invalRowID( intent.getLongExtra( ROWID_EXTRA, -1 ) );
startFirstHasDict( intent );
startNewNetGame( intent );
startHasGameID( intent );
@ -916,10 +918,18 @@ public class GamesList extends XWExpandableListActivity
}
}
private void invalRowID( long rowid )
{
if ( -1 != rowid ) {
m_adapter.inval( rowid );
}
}
// Launch the first of these for which there's a dictionary
// present.
private void startFirstHasDict( String[] relayIDs )
private boolean startFirstHasDict( String[] relayIDs )
{
boolean launched = false;
if ( null != relayIDs ) {
outer:
for ( String relayID : relayIDs ) {
@ -928,19 +938,33 @@ public class GamesList extends XWExpandableListActivity
for ( long rowid : rowids ) {
if ( GameUtils.gameDictsHere( this, rowid ) ) {
launchGame( rowid );
launched = true;
break outer;
}
}
}
}
}
return launched;
}
private void startFirstHasDict( long rowid )
{
if ( -1 != rowid ) {
if ( GameUtils.gameDictsHere( this, rowid ) ) {
launchGame( rowid );
}
}
}
private void startFirstHasDict( Intent intent )
{
if ( null != intent ) {
String[] relayIDs = intent.getStringArrayExtra( RELAYIDS_EXTRA );
startFirstHasDict( relayIDs );
if ( !startFirstHasDict( relayIDs ) ) {
long rowid = intent.getLongExtra( ROWID_EXTRA, -1 );
startFirstHasDict( rowid );
}
}
}
@ -1093,11 +1117,10 @@ public class GamesList extends XWExpandableListActivity
return intent;
}
public static Intent makeRelayIdsIntent( Context context,
String[] relayIDs )
public static Intent makeRowidIntent( Context context, long rowid )
{
Intent intent = makeSelfIntent( context );
intent.putExtra( RELAYIDS_EXTRA, relayIDs );
intent.putExtra( ROWID_EXTRA, rowid );
return intent;
}

View file

@ -48,6 +48,11 @@ public class RelayService extends XWService {
private static final String CMD_STR = "CMD";
private static final int UDP_CHANGED = 1;
private static final int SEND = 2;
private static final int RECEIVE = 3;
private static final String ROWID = "ROWID";
private static final String BINBUFFER = "BINBUFFER";
private Thread m_fetchThread = null;
private Thread m_UDPReadThread = null;
@ -58,8 +63,32 @@ public class RelayService extends XWService {
// These must match the enum XWRelayReg in xwrelay.h
private static final int XWPDEV_PROTO_VERSION = 0;
// private static final int XWPDEV_NONE = 0;
private static final int XWPDEV_ALERT = 1;
private static final int XWPDEV_REG = 2;
private enum XWRelayReg {
XWPDEV_NONE
,XWPDEV_ALERT
,XWPDEV_REG
,XWPDEV_REGRSP
,XWPDEV_PING
,XWPDEV_HAVEMSGS
,XWPDEV_RQSTMSGS
,XWPDEV_MSG
,XWPDEV_MSGNOCONN
,XWPDEV_MSGRSP
,XWPDEV_BADREG
,XWPDEV_ACK
};
// private static final int XWPDEV_ALERT = 1;
// private static final int XWPDEV_REG = 2;
// private static final int XWPDEV_REGRSP = 3;
// private static final int XWPDEV_PING = 4;
// private static final int XWPDEV_HAVEMSGS = 5;
// private static final int XWPDEV_RQSTMSGS = 6;
// private static final int XWPDEV_MSG = 7;
// private static final int XWPDEV_MSGNOCONN = 8;
// private static final int XWPDEV_MSGRSP = 9;
// private static final int XWPDEV_BADREG = 10;
public static void startService( Context context )
{
@ -67,6 +96,26 @@ public class RelayService extends XWService {
context.startService( intent );
}
public static int sendPacket( Context context, long rowid, byte[] buf )
{
Intent intent = getIntentTo( context, SEND );
intent.putExtra( ROWID, rowid );
intent.putExtra( BINBUFFER, buf );
context.startService( intent );
return buf.length;
}
// Exists to get incoming data onto the main thread
private static void postData( Context context, long rowid, byte[] msg )
{
DbgUtils.logf( "RelayService::postData: packet of length %d for token %d",
msg.length, rowid );
Intent intent = getIntentTo( context, RECEIVE );
intent.putExtra( ROWID, rowid );
intent.putExtra( BINBUFFER, msg );
context.startService( intent );
}
public static void udpChanged( Context context )
{
startService( context );
@ -94,6 +143,8 @@ public class RelayService extends XWService {
if ( null != intent ) {
int cmd = intent.getIntExtra( CMD_STR, -1 );
switch( cmd ) {
case -1:
break;
case UDP_CHANGED:
DbgUtils.logf( "RelayService::onStartCommand::UDP_CHANGED" );
if ( XWPrefs.getUDPEnabled( this ) ) {
@ -105,6 +156,16 @@ public class RelayService extends XWService {
startFetchThreadIf();
}
break;
case SEND:
case RECEIVE:
long rowid = intent.getLongExtra( ROWID, -1 );
byte[] msg = intent.getByteArrayExtra( BINBUFFER );
if ( SEND == cmd ) {
sendMessage( rowid, msg );
} else {
feedMessage( rowid, msg );
}
break;
default:
Assert.fail();
}
@ -122,17 +183,20 @@ public class RelayService extends XWService {
long[] rowids = DBUtils.getRowIDsFor( this, relayID );
if ( null != rowids ) {
for ( long rowid : rowids ) {
Intent intent =
GamesList.makeRelayIdsIntent( this,
new String[] {relayID} );
String msg = Utils.format( this, R.string.notify_bodyf,
GameUtils.getName( this, rowid ) );
Utils.postNotification( this, intent, R.string.notify_title,
msg, (int)rowid );
setupNotification( rowid );
}
}
}
}
private void setupNotification( long rowid )
{
Intent intent = GamesList.makeRowidIntent( this, rowid );
String msg = Utils.format( this, R.string.notify_bodyf,
GameUtils.getName( this, rowid ) );
Utils.postNotification( this, intent, R.string.notify_title,
msg, (int)rowid );
}
private void startFetchThreadIf()
{
@ -159,70 +223,85 @@ public class RelayService extends XWService {
private void startUDPThreads()
{
DbgUtils.logf( "startUDPThreads" );
Assert.assertNull( m_UDPWriteThread );
Assert.assertNull( m_UDPReadThread );
Assert.assertTrue( XWPrefs.getUDPEnabled( this ) );
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
String host = XWPrefs.getDefaultRelayHost( RelayService.this );
try {
m_UDPSocket = new DatagramSocket();
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port );
} catch( java.net.SocketException se ) {
DbgUtils.loge( se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
DbgUtils.loge( uhe );
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
String host = XWPrefs.getDefaultRelayHost( RelayService.this );
try {
m_UDPSocket = new DatagramSocket();
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // meaning: remember this address
} catch( java.net.SocketException se ) {
DbgUtils.loge( se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
DbgUtils.loge( uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
DbgUtils.logf( "m_UDPSocket not null" );
}
m_UDPReadThread = new Thread( null, new Runnable() {
public void run() {
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
DbgUtils.logf( "UPD read thread blocking on receive" );
m_UDPSocket.receive( packet );
DbgUtils.logf( "UPD read thread: receive returned" );
} catch( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
break; // ???
if ( null == m_UDPReadThread ) {
m_UDPReadThread = new Thread( null, new Runnable() {
public void run() {
DbgUtils.logf( "read thread running" );
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
DbgUtils.logf( "UPD read thread blocking on receive" );
m_UDPSocket.receive( packet );
DbgUtils.logf( "UPD read thread: receive returned" );
} catch( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
break; // ???
}
DbgUtils.logf( "received %d bytes", packet.getLength() );
gotPacket( packet );
}
DbgUtils.logf( "received %d bytes", packet.getLength() );
gotPacket( packet );
DbgUtils.logf( "read thread exiting" );
}
}
}, getClass().getName() );
m_UDPReadThread.start();
}, getClass().getName() );
m_UDPReadThread.start();
} else {
DbgUtils.logf( "m_UDPReadThread not null and assumed to be running" );
}
m_queue = new LinkedBlockingQueue<DatagramPacket>();
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
for ( ; ; ) {
DatagramPacket outPacket;
try {
outPacket = m_queue.take();
} catch ( InterruptedException ie ) {
DbgUtils.logf( "RelayService; write thread killed" );
break;
}
if ( null == outPacket || 0 == outPacket.getLength() ) {
DbgUtils.logf( "stopping write thread" );
break;
}
DbgUtils.logf( "Sending packet of length %d",
outPacket.getLength() );
try {
m_UDPSocket.send( outPacket );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
if ( null == m_UDPWriteThread ) {
m_queue = new LinkedBlockingQueue<DatagramPacket>();
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
DbgUtils.logf( "write thread running" );
for ( ; ; ) {
DatagramPacket outPacket;
try {
outPacket = m_queue.take();
} catch ( InterruptedException ie ) {
DbgUtils.logf( "RelayService; write thread killed" );
break;
}
if ( null == outPacket || 0 == outPacket.getLength() ) {
DbgUtils.logf( "stopping write thread" );
break;
}
DbgUtils.logf( "Sending udp packet of length %d",
outPacket.getLength() );
try {
m_UDPSocket.send( outPacket );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
DbgUtils.logf( "write thread exiting" );
}
}
}, getClass().getName() );
m_UDPWriteThread.start();
}, getClass().getName() );
m_UDPWriteThread.start();
} else {
DbgUtils.logf( "m_UDPWriteThread not null and assumed to be running" );
}
}
private void stopUDPThreadsIf()
@ -256,35 +335,185 @@ public class RelayService extends XWService {
DbgUtils.logf( "stopUDPThreadsIf DONE" );
}
// Running on reader thread
private void gotPacket( DatagramPacket packet )
{
DbgUtils.logf( "gotPacket" );
ByteArrayInputStream bis = new ByteArrayInputStream( packet.getData() );
int packetLen = packet.getLength();
byte[] data = new byte[packetLen];
System.arraycopy( packet.getData(), 0, data, 0, packetLen );
DbgUtils.logf( "RelayService::gotPacket: %d bytes of data", packetLen );
ByteArrayInputStream bis = new ByteArrayInputStream( data );
DataInputStream dis = new DataInputStream( bis );
try {
byte proto = dis.readByte();
if ( XWPDEV_PROTO_VERSION == proto ) {
byte cmd = dis.readByte();
switch ( cmd ) {
PacketHeader header = readHeader( dis );
if ( null != header ) {
sendAckIf( header );
switch ( header.m_cmd ) {
case XWPDEV_ALERT:
short len = dis.readShort();
byte[] tmp = new byte[len];
dis.read( tmp );
sendResult( MultiEvent.RELAY_ALERT, new String( tmp ) );
String str = getStringWithLength( dis );
sendResult( MultiEvent.RELAY_ALERT, str );
break;
case XWPDEV_BADREG:
str = getStringWithLength( dis );
DbgUtils.logf( "bad relayID \"%s\" reported", str );
XWPrefs.clearRelayDevID( this );
registerWithRelay();
break;
case XWPDEV_REGRSP:
DbgUtils.logf( "got XWPDEV_REGRSP" );
str = getStringWithLength( dis );
DbgUtils.logf( "got relayid %s", str );
XWPrefs.setRelayDevID( this, str );
break;
case XWPDEV_HAVEMSGS:
requestMessages();
break;
case XWPDEV_MSG:
DbgUtils.logf( "got XWPDEV_MSG" );
int token = dis.readInt();
byte[] msg = new byte[dis.available()];
Assert.assertTrue( packet.getLength() >= msg.length );
Assert.assertTrue( packetLen >= msg.length );
dis.read( msg );
postData( RelayService.this, token, msg );
break;
default:
DbgUtils.logf( "RelayService: Unhandled cmd: %d", cmd );
DbgUtils.logf( "RelayService: Unhandled cmd: %d",
header.m_cmd );
break;
}
} else {
DbgUtils.logf( "bad proto %d", proto );
}
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
} // gotPacket
private void registerWithRelay()
{
DbgUtils.logf( "registerWithRelay" );
byte[] typ = new byte[1];
String devid = getDevID(typ);
ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
DataOutputStream out = addProtoAndCmd( bas, XWRelayReg.XWPDEV_REG );
out.writeByte( typ[0] );
out.writeShort( devid.length() );
out.writeBytes( devid );
postPacket( bas );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
private void registerWithRelay()
private void requestMessages()
{
DbgUtils.logf( "requestMessages" );
ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
DataOutputStream out =
addProtoAndCmd( bas, XWRelayReg.XWPDEV_RQSTMSGS );
String devid = getDevID( null );
out.writeShort( devid.length() );
out.writeBytes( devid );
postPacket( bas );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
private void sendMessage( long rowid, byte[] msg )
{
ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
DataOutputStream out = addProtoAndCmd( bas, XWRelayReg.XWPDEV_MSG );
Assert.assertTrue( rowid < Integer.MAX_VALUE );
out.writeInt( (int)rowid );
out.write( msg, 0, msg.length );
postPacket( bas );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
private void sendNoConnMessage( long rowid, String relayID, byte[] msg )
{
ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
DataOutputStream out =
addProtoAndCmd( bas, XWRelayReg.XWPDEV_MSGNOCONN );
Assert.assertTrue( rowid < Integer.MAX_VALUE );
out.writeInt( (int)rowid );
out.writeBytes( relayID );
out.write( '\n' );
out.write( msg, 0, msg.length );
postPacket( bas );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
private void sendAckIf( PacketHeader header )
{
DbgUtils.logf( "sendAckIf" );
if ( XWRelayReg.XWPDEV_ACK != header.m_cmd ) {
ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
DataOutputStream out =
addProtoAndCmd( bas, XWRelayReg.XWPDEV_ACK );
out.writeInt( header.m_packetID );
postPacket( bas );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
}
private PacketHeader readHeader( DataInputStream dis )
throws java.io.IOException
{
PacketHeader result = null;
byte proto = dis.readByte();
if ( XWPDEV_PROTO_VERSION == proto ) {
int packetID = dis.readInt();
DbgUtils.logf( "readHeader: got packetID %d", packetID );
byte ordinal = dis.readByte();
XWRelayReg cmd = XWRelayReg.values()[ordinal];
result = new PacketHeader( cmd, packetID );
} else {
DbgUtils.logf( "bad proto: %d", proto );
}
DbgUtils.logf( "readHeader => %H", result );
return result;
}
private String getStringWithLength( DataInputStream dis )
throws java.io.IOException
{
short len = dis.readShort();
byte[] tmp = new byte[len];
dis.read( tmp );
return new String( tmp );
}
private DataOutputStream addProtoAndCmd( ByteArrayOutputStream bas,
XWRelayReg cmd )
throws java.io.IOException
{
DataOutputStream out = new DataOutputStream( bas );
out.writeByte( XWPDEV_PROTO_VERSION );
out.writeInt( 0 ); // packetID
out.writeByte( cmd.ordinal() );
return out;
}
private void postPacket( ByteArrayOutputStream bas )
{
byte[] data = bas.toByteArray();
m_queue.add( new DatagramPacket( data, data.length ) );
}
private String getDevID( byte[] typp )
{
byte typ;
String devid = XWPrefs.getRelayDevID( this );
@ -299,20 +528,30 @@ public class RelayService extends XWService {
typ = UtilCtxt.ID_TYPE_ANDROID_OTHER;
}
}
if ( null != typp ) {
typp[0] = typ;
} else {
Assert.assertTrue( typ == UtilCtxt.ID_TYPE_RELAY );
}
return devid;
}
ByteArrayOutputStream bas = new ByteArrayOutputStream();
DataOutputStream outBuf = new DataOutputStream( bas );
try {
outBuf.writeByte( XWPDEV_PROTO_VERSION );
outBuf.writeByte( XWPDEV_REG );
outBuf.writeByte( typ );
outBuf.writeShort( devid.length() );
outBuf.writeBytes( devid );
byte[] data = bas.toByteArray();
m_queue.add( new DatagramPacket( data, data.length ) );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
private void feedMessage( long rowid, byte[] msg )
{
DbgUtils.logf( "RelayService::feedMessage: %d bytes for rowid %d",
msg.length, rowid );
if ( BoardActivity.feedMessage( rowid, msg ) ) {
DbgUtils.logf( "feedMessage: board ate it" );
// do nothing
} else {
RelayMsgSink sink = new RelayMsgSink();
sink.setRowID( rowid );
if ( GameUtils.feedMessage( this, rowid, msg, null,
sink ) ) {
setupNotification( rowid );
} else {
DbgUtils.logf( "feedMessage: background dropped it" );
}
}
}
@ -428,29 +667,49 @@ public class RelayService extends XWService {
private class RelayMsgSink extends MultiMsgSink {
private HashMap<String,ArrayList<byte[]>> m_msgLists = null;
private long m_rowid = -1;
public void setRowID( long rowid ) { m_rowid = rowid; }
public void send( Context context )
{
sendToRelay( context, m_msgLists );
if ( -1 == m_rowid ) {
sendToRelay( context, m_msgLists );
} else {
Assert.assertNull( m_msgLists );
}
}
/***** TransportProcs interface *****/
public boolean relayNoConnProc( byte[] buf, String relayID )
{
if ( null == m_msgLists ) {
m_msgLists = new HashMap<String,ArrayList<byte[]>>();
}
if ( -1 != m_rowid ) {
sendNoConnMessage( m_rowid, relayID, buf );
} else {
if ( null == m_msgLists ) {
m_msgLists = new HashMap<String,ArrayList<byte[]>>();
}
ArrayList<byte[]> list = m_msgLists.get( relayID );
if ( list == null ) {
list = new ArrayList<byte[]>();
m_msgLists.put( relayID, list );
ArrayList<byte[]> list = m_msgLists.get( relayID );
if ( list == null ) {
list = new ArrayList<byte[]>();
m_msgLists.put( relayID, list );
}
list.add( buf );
}
list.add( buf );
return true;
}
}
private class PacketHeader {
public int m_packetID;
public XWRelayReg m_cmd;
public PacketHeader( XWRelayReg cmd, int packetID ) {
DbgUtils.logf( "in PacketHeader contructor" );
m_packetID = packetID;
m_cmd = cmd;
}
}
}