Move back to old one-msg-per-connect() model

Add support for the old protocol, and define to use it. After the next
release has been adopted and everybody can read the new protocol another
release will turn it on.
This commit is contained in:
Eric House 2019-02-19 18:31:20 -08:00
parent 31d8345e6b
commit 9915c464af

View file

@ -93,9 +93,12 @@ public class BTService extends XWJIService {
private static final int CONNECT_TIMEOUT_MS = 10000; private static final int CONNECT_TIMEOUT_MS = 10000;
private static final int MAX_PACKET_LEN = 4 * 1024; private static final int MAX_PACKET_LEN = 4 * 1024;
private static final int BT_PROTO_ORIG = 0; private static final int BT_PROTO_JSONS = 1; // using jsons instead of lots of fields
private static final int BT_PROTO_BATCH = 3; private static final int BT_PROTO_BATCH = 2;
private static final int BT_PROTO = BT_PROTO_BATCH; // Move to BT_PROTO_BATCH after everybody's upgraded.
private static final int BT_PROTO = BT_PROTO_JSONS; /* BT_PROTO_BATCH */
private static boolean IS_BATCH_PROTO() { return BT_PROTO_BATCH == BT_PROTO; }
private enum BTAction implements XWJICmds { _NONE, private enum BTAction implements XWJICmds { _NONE,
ACL_CONN, ACL_CONN,
@ -615,10 +618,10 @@ public class BTService extends XWJIService {
new DataInputStream( socket.getInputStream() ); new DataInputStream( socket.getInputStream() );
byte proto = inStream.readByte(); byte proto = inStream.readByte();
if ( proto == BT_PROTO_BATCH ) { if ( proto == BT_PROTO_BATCH || proto == BT_PROTO_JSONS ) {
resetSenderFor( socket ); // still looks good here? resetSenderFor( socket ); // still looks good here?
new PacketParser() new PacketParser( proto )
.dispatchAll( inStream, socket, BTListenerThread.this ); .dispatchAll( inStream, socket, BTListenerThread.this );
// hack: will close if nobody ref'd it inside dispatchAll() // hack: will close if nobody ref'd it inside dispatchAll()
closeForRef( makeRefFor( socket ) ); closeForRef( makeRefFor( socket ) );
@ -947,7 +950,7 @@ public class BTService extends XWJIService {
int nTries = 0; int nTries = 0;
for ( long end = timeout + System.currentTimeMillis(); ; ) { for ( long end = timeout + System.currentTimeMillis(); ; ) {
try { try {
Log.d( TAG, "trying connect(%s/%s) (check accept() logs)", name, addr ); // Log.d( TAG, "trying connect(%s/%s) (check accept() logs)", name, addr );
++nTries; ++nTries;
socket.connect(); socket.connect();
Log.i( TAG, "connect(%s/%s) succeeded after %d tries", Log.i( TAG, "connect(%s/%s) succeeded after %d tries",
@ -1129,7 +1132,9 @@ public class BTService extends XWJIService {
try { try {
tmpOp.dos.writeByte( cmd.ordinal() ); tmpOp.dos.writeByte( cmd.ordinal() );
byte[] data = op.bos.toByteArray(); byte[] data = op.bos.toByteArray();
tmpOp.dos.writeShort( data.length ); if ( IS_BATCH_PROTO() ) {
tmpOp.dos.writeShort( data.length );
}
tmpOp.dos.write( data, 0, data.length ); tmpOp.dos.write( data, 0, data.length );
mData = tmpOp.bos.toByteArray(); mData = tmpOp.bos.toByteArray();
} catch (IOException ioe ) { } catch (IOException ioe ) {
@ -1220,24 +1225,27 @@ public class BTService extends XWJIService {
synchronized ( this ) { synchronized ( this ) {
if ( 0 < mLength ) { if ( 0 < mLength ) {
try { try {
// Format is <proto><len-of-rest><msgCount><msgsData> To // Format is <proto><len-of-rest><msgCount><msg1>..<msgN> To
// insert the count at the beginning we have to create a // insert len-of-rest at the beginning we have to create a
// whole new byte array since there's no random access. // tmp byte array then append it after writing its length.
int msgCount = mElems.size();
OutputPair tmpOP = new OutputPair(); OutputPair tmpOP = new OutputPair();
tmpOP.dos.writeByte( msgCount ); // count of messages int msgCount = IS_BATCH_PROTO() ? mElems.size() : 1;
// Log.d( TAG, "writeAndCheck(): wrote msg count: %d", msgCount ); if ( IS_BATCH_PROTO() ) {
tmpOP.dos.writeByte( msgCount );
}
for ( MsgElem elem : mElems ) { for ( int ii = 0; ii < msgCount; ++ii ) {
byte[] elemData = elem.mData; byte[] elemData = mElems.get(ii).mData;
tmpOP.dos.write( elemData, 0, elemData.length ); tmpOP.dos.write( elemData, 0, elemData.length );
} }
byte[] data = tmpOP.bos.toByteArray(); byte[] data = tmpOP.bos.toByteArray();
// now write to the socket. Note that connect() // now write to the socket. Note that connect()
// writes BT_PROTO as the first byte. // writes BT_PROTO as the first byte.
dos.writeShort( data.length ); if ( IS_BATCH_PROTO() ) {
dos.writeShort( data.length );
}
dos.write( data, 0, data.length ); dos.write( data, 0, data.length );
dos.flush(); dos.flush();
Log.d( TAG, "writeAndCheck(): wrote %d msgs as" Log.d( TAG, "writeAndCheck(): wrote %d msgs as"
@ -1245,7 +1253,11 @@ public class BTService extends XWJIService {
msgCount, data.length, Utils.getMD5SumFor( data ), msgCount, data.length, Utils.getMD5SumFor( data ),
this ); this );
localElems = mElems; if ( IS_BATCH_PROTO() ) {
localElems = mElems;
} else {
localElems = mElems.subList(0, 1);
}
} catch ( IOException ioe ) { } catch ( IOException ioe ) {
Log.e( TAG, "writeAndCheck(): ioe: %s", ioe.getMessage() ); Log.e( TAG, "writeAndCheck(): ioe: %s", ioe.getMessage() );
} }
@ -1354,9 +1366,13 @@ public class BTService extends XWJIService {
{ {
try { try {
OutputPair op = new OutputPair(); OutputPair op = new OutputPair();
byte[] nliData = XwJNI.nliToStream( nli ); if ( IS_BATCH_PROTO() ) {
op.dos.writeShort( nliData.length ); byte[] nliData = XwJNI.nliToStream( nli );
op.dos.write( nliData, 0, nliData.length ); op.dos.writeShort( nliData.length );
op.dos.write( nliData, 0, nliData.length );
} else {
op.dos.writeUTF( nli.toString() );
}
append( BTCmd.INVITE, op ); append( BTCmd.INVITE, op );
} catch ( IOException ioe ) { } catch ( IOException ioe ) {
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
@ -1482,11 +1498,18 @@ public class BTService extends XWJIService {
} }
private static class PacketParser { private static class PacketParser {
private int mProto;
PacketParser(int proto) {
mProto = proto;
}
void dispatchAll( DataInputStream inStream, BluetoothSocket socket, void dispatchAll( DataInputStream inStream, BluetoothSocket socket,
BTListenerThread processor ) BTListenerThread processor )
{ {
try { try {
short isLen = inStream.readShort(); boolean isOldProto = mProto == BT_PROTO_JSONS;
short isLen = isOldProto
? (short)inStream.available() : inStream.readShort();
if ( isLen >= MAX_PACKET_LEN ) { if ( isLen >= MAX_PACKET_LEN ) {
Log.e( TAG, "packet too big; dropping!!!" ); Log.e( TAG, "packet too big; dropping!!!" );
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
@ -1496,14 +1519,14 @@ public class BTService extends XWJIService {
ByteArrayInputStream bis = new ByteArrayInputStream( data ); ByteArrayInputStream bis = new ByteArrayInputStream( data );
DataInputStream dis = new DataInputStream( bis ); DataInputStream dis = new DataInputStream( bis );
int nMessages = dis.readByte(); int nMessages = isOldProto ? 1 : dis.readByte();
Log.d( TAG, "dispatchAll(): read %d-byte payload with sum %s containing %d messages", Log.d( TAG, "dispatchAll(): read %d-byte payload with sum %s containing %d messages",
data.length, Utils.getMD5SumFor( data ), nMessages ); data.length, Utils.getMD5SumFor( data ), nMessages );
for ( int ii = 0; ii < nMessages; ++ii ) { for ( int ii = 0; ii < nMessages; ++ii ) {
byte cmdOrd = dis.readByte(); byte cmdOrd = dis.readByte();
short oneLen = dis.readShort(); // used only to skip short oneLen = isOldProto ? 0 : dis.readShort(); // used only to skip
int availableBefore = dis.available(); int availableBefore = dis.available();
if ( cmdOrd < BTCmd.values().length ) { if ( cmdOrd < BTCmd.values().length ) {
BTCmd cmd = BTCmd.values()[cmdOrd]; BTCmd cmd = BTCmd.values()[cmdOrd];
@ -1515,9 +1538,15 @@ public class BTService extends XWJIService {
processor.receivePing( gameID, socket ); processor.receivePing( gameID, socket );
break; break;
case INVITE: case INVITE:
data = new byte[dis.readShort()]; NetLaunchInfo nli;
dis.readFully( data ); if ( isOldProto ) {
NetLaunchInfo nli = XwJNI.nliFromStream( data ); nli = NetLaunchInfo.makeFrom( XWApp.getContext(),
dis.readUTF() );
} else {
data = new byte[dis.readShort()];
dis.readFully( data );
nli = XwJNI.nliFromStream( data );
}
processor.receiveInvitation( nli, socket ); processor.receiveInvitation( nli, socket );
break; break;
case MESG_SEND: case MESG_SEND:
@ -1545,7 +1574,8 @@ public class BTService extends XWJIService {
// sanity-check based on packet length // sanity-check based on packet length
int availableAfter = dis.available(); int availableAfter = dis.available();
Assert.assertTrue( oneLen == availableBefore - availableAfter Assert.assertTrue( 0 == oneLen
|| oneLen == availableBefore - availableAfter
|| !BuildConfig.DEBUG ); || !BuildConfig.DEBUG );
} }
} }