prune only those messages successfully sent

Fix race condition that would have cleared all messages including those
added after a send began. In the process, move to storing outbound
messages individually rather than concatenated in their stream format.
This commit is contained in:
Eric House 2019-02-19 08:31:49 -08:00
parent ff2a06592f
commit 341293dc55

View file

@ -21,8 +21,6 @@
package org.eehouse.android.xw4;
import android.app.Activity;
import android.app.Notification;
import android.app.PendingIntent;
import android.app.Service;
import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothClass.Device.Major;
@ -33,9 +31,7 @@ import android.content.Context;
import android.content.Intent;
import android.os.Build;
import android.os.Handler;
import android.os.SystemClock;
import android.provider.Settings;
import android.support.v4.app.NotificationCompat;
import android.text.TextUtils;
import org.eehouse.android.xw4.MultiService.DictFetchOwner;
@ -98,8 +94,6 @@ public class BTService extends XWJIService {
private static final int MAX_PACKET_LEN = 4 * 1024;
private static final int BT_PROTO_ORIG = 0;
// private static final int BT_PROTO_JSONS = 1; // using jsons instead of lots of fields
private static final int BT_PROTO_NLI = 2; // using binary/common form of NLI
private static final int BT_PROTO_BATCH = 3;
private static final int BT_PROTO = BT_PROTO_BATCH;
@ -1119,22 +1113,49 @@ public class BTService extends XWJIService {
}
private static class PacketAccumulator {
/**
 * One queued outbound message: its command, the game it belongs to, the
 * time it was created, and its bytes already framed for transmission.
 *
 * mData is laid out as
 * <cmd ordinal: 1 byte><payload length: 2 bytes><payload bytes>,
 * where the payload is whatever the caller had written into op.
 */
private static class MsgElem {
    BTCmd mCmd;
    int mGameID;
    long mStamp;            // creation time, from System.currentTimeMillis()
    byte[] mData;           // framed bytes; left null if framing throws

    MsgElem( BTCmd cmd, int gameID, OutputPair op )
    {
        mStamp = System.currentTimeMillis();
        mGameID = gameID;
        mCmd = cmd;

        byte[] framed = null;
        OutputPair scratch = new OutputPair();
        try {
            scratch.dos.writeByte( cmd.ordinal() );
            byte[] payload = op.bos.toByteArray();
            scratch.dos.writeShort( payload.length );
            scratch.dos.write( payload, 0, payload.length );
            framed = scratch.bos.toByteArray();
        } catch ( IOException ioe ) {
            // Not expected when the streams are memory-backed. If it does
            // happen mData stays null and size() will throw.
            Log.e( TAG, "MsgElem.__init(): got ioe!: %s",
                   ioe.getMessage() );
        }
        mData = framed;
    }

    /** Number of framed bytes this message contributes to a packet. */
    int size() { return mData.length; }
}
private String mAddr;
private String mName;
private List<BTCmd> mCmds;
private List<Integer> mGameIDs;
private OutputPair mOP;
private long mStamp;
private List<MsgElem> mElems;
private long mLastFailTime;
private int mFailCount;
// private int mPacketCount;
private int mLength;
PacketAccumulator( String addr ) {
mAddr = addr;
mName = getName( addr );
clearAll();
// Log.d( TAG, "PacketAccumulator.__init() => %s", this );
mElems = new ArrayList<>();
mFailCount = 0;
mLength = 0;
}
String getAddr() { return mAddr; }
@ -1146,7 +1167,7 @@ public class BTService extends XWJIService {
// Log.d( TAG, "getNextReadyMS() IN" );
try ( DbgUtils.DeadlockWatch dw = new DbgUtils.DeadlockWatch( this ) ) {
synchronized ( this ) {
if ( 0 == mCmds.size() ) { // nothing to send
if ( 0 == mElems.size() ) { // nothing to send
waitFromNow = Long.MAX_VALUE;
} else if ( 0 == mFailCount ) {
waitFromNow = 0;
@ -1174,10 +1195,17 @@ public class BTService extends XWJIService {
@Override
public synchronized String toString()
{
long age = System.currentTimeMillis() - mStamp;
long age = 0;
if ( 0 < mElems.size() ) {
age = System.currentTimeMillis() - mElems.get(0).mStamp;
}
List<BTCmd> cmds = new ArrayList<>();
for ( MsgElem elem : mElems ) {
cmds.add( elem.mCmd );
}
return String.format("{name: %s, addr: %s, age: %dms, failCount: %d, len: %d, cmds: %s}",
mName, mAddr, age, mFailCount, mOP.length(),
TextUtils.join(",", mCmds) );
mName, mAddr, age, mFailCount, mLength,
TextUtils.join(",", cmds) );
}
int writeAndCheck( BluetoothSocket socket, DataOutputStream dos,
@ -1185,50 +1213,40 @@ public class BTService extends XWJIService {
throws IOException
{
Log.d( TAG, "%s.writeAndCheck() IN", this );
Assert.assertNotNull( dos );
List<BTCmd> localCmds = null;
List<Integer> localGameIDs = null;
List<MsgElem> localElems = null;
try ( DbgUtils.DeadlockWatch dw = new DbgUtils.DeadlockWatch( this ) ) {
synchronized ( this ) {
byte[] data = mOP.bos.toByteArray();
if ( 0 < data.length ) {
if ( 0 < mLength ) {
try {
// Format is <proto><len-of-rest><msgCount><msgsData> To
// insert the count at the beginning we have to create a
// whole new byte array since there's no random access.
int msgCount = mElems.size();
OutputPair tmpOP = new OutputPair();
tmpOP.dos.writeByte( mCmds.size() ); // count of messages
Log.d( TAG, "writeAndCheck(): wrote msg count: %d", mCmds.size() );
tmpOP.dos.write( data, 0, data.length );
data = tmpOP.bos.toByteArray(); // replace data
tmpOP.dos.writeByte( msgCount ); // count of messages
// Log.d( TAG, "writeAndCheck(): wrote msg count: %d", msgCount );
// now write to the socket
Assert.assertNotNull( dos );
// dos.writeByte( BT_PROTO );
for ( MsgElem elem : mElems ) {
byte[] elemData = elem.mData;
tmpOP.dos.write( elemData, 0, elemData.length );
}
byte[] data = tmpOP.bos.toByteArray();
// now write to the socket. Note that connect()
// writes BT_PROTO as the first byte.
dos.writeShort( data.length );
dos.write( data, 0, data.length );
dos.flush();
Log.d( TAG, "writeAndCheck(): wrote %d-byte payload with sum %s",
data.length, Utils.getMD5SumFor( data ) );
Log.d( TAG, "writeAndCheck(): wrote %d msgs as"
+ " %d-byte payload with sum %s (for %s)",
msgCount, data.length, Utils.getMD5SumFor( data ),
this );
// If we get this far, we're going to assume the send has
// succeeded. Now we need to get out of the synchronized
// block because handling all the reads on the socket can
// take a long time and will block lots of stuff
localCmds = mCmds;
localGameIDs = mGameIDs;
// Now read responses
// int nCmds = mCmds.size();
// On remote, socket is being passed through
// onHandleWork. Could take a f*ck of a long time. Maybe we
// don't want this any more?
// try ( KillerIn killer = new KillerIn( socket, 5 * nCmds ) ) {
// DataInputStream inStream =
// new DataInputStream( socket.getInputStream() );
localElems = mElems;
} catch ( IOException ioe ) {
Log.e( TAG, "writeAndCheck(): ioe: %s", ioe.getMessage() );
}
@ -1236,29 +1254,32 @@ public class BTService extends XWJIService {
}
}
Log.d( TAG, "writeAndCheck(): reading replies" );
int nDone = 0;
if ( null != localCmds ) {
if ( null != localElems ) {
Log.d( TAG, "writeAndCheck(): reading %d replies", localElems.size() );
try {
DataInputStream inStream =
new DataInputStream( socket.getInputStream() );
for ( int ii = 0; ii < localCmds.size(); ++ii ) {
BTCmd cmd = localCmds.get(ii);
int gameID = localGameIDs.get(ii);
BTCmd reply = BTCmd.values()[inStream.readByte()];
for ( int ii = 0; ii < localElems.size(); ++ii ) {
MsgElem elem = localElems.get(ii);
BTCmd cmd = elem.mCmd;
int gameID = elem.mGameID;
byte cmdOrd = inStream.readByte();
if ( cmdOrd >= BTCmd.values().length ) {
break; // SNAFU!!!
}
BTCmd reply = BTCmd.values()[cmdOrd];
Log.d( TAG, "writeAndCheck() %s: got response %s to cmd[%d] %s",
this, reply, ii, cmd );
handleReply( helper, socket, inStream, cmd, gameID, reply );
nDone = localCmds.size();
}
if ( nDone > 0 ) {
clearAll();
++nDone;
}
} catch ( IOException ioe ) {
Log.d( TAG, "failed reading replies: %s", ioe.getMessage() );
}
}
unappend( nDone );
Log.d( TAG, "writeAndCheck() => %d", nDone );
return nDone;
}
@ -1319,16 +1340,6 @@ public class BTService extends XWJIService {
}
}
/**
 * Reset the accumulator to its empty state: log how many messages are
 * being discarded, replace the output buffer and the command and game-ID
 * lists with fresh empty ones, and zero the failure count and timestamp.
 */
private void clearAll() {
    int nWiped;
    if ( null == mCmds ) {
        nWiped = 0;             // lists not created yet
    } else {
        nWiped = mCmds.size();
    }
    Log.d( TAG, "PacketAccumulator.clearAll(): wiping %d messages",
           nWiped );

    mOP = new OutputPair();
    mCmds = new ArrayList<>();
    mGameIDs = new ArrayList<>();
    mFailCount = 0;
    mStamp = 0;
}
void addPing( int gameID )
{
try {
@ -1385,22 +1396,13 @@ public class BTService extends XWJIService {
private boolean append( BTCmd cmd, int gameID, OutputPair op ) throws IOException
{
boolean haveSpace;
// Log.d( TAG, "append() IN" );
try ( DbgUtils.DeadlockWatch dw = new DbgUtils.DeadlockWatch( this ) ) {
synchronized ( this ) {
haveSpace = mOP.length() + op.length() + 3 < MAX_PACKET_LEN;
MsgElem newElem = new MsgElem( cmd, gameID, op );
haveSpace = mLength + newElem.size() < MAX_PACKET_LEN;
if ( haveSpace ) {
if ( 0 == mCmds.size() ) {
mStamp = System.currentTimeMillis();
}
mCmds.add( cmd );
mGameIDs.add( gameID );
mOP.dos.writeByte( cmd.ordinal() );
byte[] data = op.bos.toByteArray();
mOP.dos.writeShort( data.length );
mOP.dos.write( data, 0, data.length );
mElems.add( newElem );
mLength += newElem.size();
mFailCount = 0; // for now, we restart timer on new data
tellSomebody();
}
@ -1410,6 +1412,21 @@ public class BTService extends XWJIService {
return haveSpace;
}
/**
 * Remove the first nToRemove queued messages and subtract their sizes
 * from mLength.
 *
 * @param nToRemove number of messages to drop from the head of mElems;
 *                  must not exceed the number currently queued
 */
private void unappend( int nToRemove )
{
    try ( DbgUtils.DeadlockWatch dw = new DbgUtils.DeadlockWatch( this ) ) {
        synchronized ( this ) {
            // Check the precondition while holding the lock: mElems is
            // modified under this same lock elsewhere, so reading its
            // size outside the lock is a data race and the answer could
            // be stale by the time the removals run.
            Assert.assertTrue( nToRemove <= mElems.size() );

            for ( int ii = 0; ii < nToRemove; ++ii ) {
                MsgElem elem = mElems.remove(0);
                mLength -= elem.size();
            }
            Log.d( TAG, "unappend(): after removing %d, have %d left for size %d",
                   nToRemove, mElems.size(), mLength );
        }
    }
}
void resetBackoff()
{
// Log.d( TAG, "resetBackoff() IN" );
@ -1486,37 +1503,45 @@ public class BTService extends XWJIService {
data.length, Utils.getMD5SumFor( data ), nMessages );
for ( int ii = 0; ii < nMessages; ++ii ) {
BTCmd cmd = BTCmd.values()[dis.readByte()];
final short oneLen = dis.readShort(); // used only to skip
byte cmdOrd = dis.readByte();
short oneLen = dis.readShort(); // used only to skip
int availableBefore = dis.available();
switch ( cmd ) {
case PING:
int gameID = dis.readInt();
processor.receivePing( gameID, socket );
break;
case INVITE:
data = new byte[dis.readShort()];
dis.readFully( data );
NetLaunchInfo nli = XwJNI.nliFromStream( data );
processor.receiveInvitation( nli, socket );
break;
case MESG_SEND:
gameID = dis.readInt();
data = new byte[dis.readShort()];
dis.readFully( data );
processor.receiveMessage( gameID, data, socket );
break;
case MESG_GAMEGONE:
gameID = dis.readInt();
processor.receiveGameGone( gameID, socket );
break;
default:
Log.e( TAG, "unexpected command %s; skipping %d bytes", cmd, oneLen );
if ( cmdOrd < BTCmd.values().length ) {
BTCmd cmd = BTCmd.values()[cmdOrd];
Log.d( TAG, "dispatchAll(): reading msg %d: %s, len=%d",
ii, cmd, oneLen );
switch ( cmd ) {
case PING:
int gameID = dis.readInt();
processor.receivePing( gameID, socket );
break;
case INVITE:
data = new byte[dis.readShort()];
dis.readFully( data );
NetLaunchInfo nli = XwJNI.nliFromStream( data );
processor.receiveInvitation( nli, socket );
break;
case MESG_SEND:
gameID = dis.readInt();
data = new byte[dis.readShort()];
dis.readFully( data );
processor.receiveMessage( gameID, data, socket );
break;
case MESG_GAMEGONE:
gameID = dis.readInt();
processor.receiveGameGone( gameID, socket );
break;
default:
Assert.assertFalse( BuildConfig.DEBUG );
break;
}
} else {
Log.e( TAG, "unexpected command (ord: %d);"
+ " skipping %d bytes", cmdOrd, oneLen );
if ( oneLen <= dis.available() ) {
dis.readFully( new byte[oneLen] );
Assert.assertFalse( BuildConfig.DEBUG );
}
break;
}
// sanity-check based on packet length