make packet accumulator a thread that sends

Get rid of the single sender thread and the complexity of querying
individual queues for their state (and the slowdown that happened when
successful writes had to wait for those to devices that weren't
responding). Instead each PacketAccumulator does its own waiting with
timeouts and backoffs, wakes itself when appropriate, and periodically
sends if it can make a connection. Now when there are a bunch of
messages ready they'll get sent pretty quickly once connections to the
remote device start to be successful.
This commit is contained in:
Eric House 2019-02-20 21:54:51 -08:00
parent 89338f43dc
commit 7d48fdd00a

View file

@ -104,11 +104,11 @@ public class BTService extends XWJIService {
private enum BTAction implements XWJICmds { _NONE, private enum BTAction implements XWJICmds { _NONE,
ACL_CONN, ACL_CONN,
START_BACKGROUND, _START_BACKGROUND, // unused
RESEND, RESEND,
SCAN, SCAN,
INVITE, // old INVITE,
SEND, // old SEND,
RADIO, RADIO,
REMOVE, REMOVE,
PINGHOST, PINGHOST,
@ -418,19 +418,11 @@ public class BTService extends XWJIService {
void onHandleWorkImpl( Intent intent, XWJICmds jicmd, long timestamp ) void onHandleWorkImpl( Intent intent, XWJICmds jicmd, long timestamp )
{ {
if ( BTEnabled() ) { if ( BTEnabled() ) {
BTSenderThread.startYourself( this );
BTAction cmd = (BTAction)jicmd; BTAction cmd = (BTAction)jicmd;
switch( cmd ) { switch( cmd ) {
case ACL_CONN: // just forces onCreate to run case ACL_CONN: // just forces onCreate to run
startListener(); startListener();
break; break;
case START_BACKGROUND:
noteLastUsed( this ); // prevent timer from killing immediately
setTimeoutTimer();
break;
case SCAN: case SCAN:
int timeout = intent.getIntExtra( SCAN_TIMEOUT_KEY, -1 ); int timeout = intent.getIntExtra( SCAN_TIMEOUT_KEY, -1 );
startScanThread( timeout ); startScanThread( timeout );
@ -440,12 +432,12 @@ public class BTService extends XWJIService {
String jsonData = intent.getStringExtra( GAMEDATA_KEY ); String jsonData = intent.getStringExtra( GAMEDATA_KEY );
NetLaunchInfo nli = NetLaunchInfo.makeFrom( this, jsonData ); NetLaunchInfo nli = NetLaunchInfo.makeFrom( this, jsonData );
// Log.i( TAG, "onHandleWorkImpl(): nli: %s", nli ); // Log.i( TAG, "onHandleWorkImpl(): nli: %s", nli );
getSenderFor( btAddr ).addInvite( nli ); getPA( btAddr ).addInvite( nli );
break; break;
case PINGHOST: case PINGHOST:
btAddr = intent.getStringExtra( ADDR_KEY ); btAddr = intent.getStringExtra( ADDR_KEY );
int gameID = intent.getIntExtra( GAMEID_KEY, 0 ); int gameID = intent.getIntExtra( GAMEID_KEY, 0 );
getSenderFor( btAddr ).addPing( gameID ); getPA( btAddr ).addPing( gameID );
break; break;
case SEND: case SEND:
@ -454,7 +446,7 @@ public class BTService extends XWJIService {
String msgID = intent.getStringExtra( MSGID_KEY ); String msgID = intent.getStringExtra( MSGID_KEY );
gameID = intent.getIntExtra( GAMEID_KEY, -1 ); gameID = intent.getIntExtra( GAMEID_KEY, -1 );
if ( -1 != gameID ) { if ( -1 != gameID ) {
getSenderFor( btAddr ).addMsg( gameID, buf, msgID ); getPA( btAddr ).addMsg( gameID, buf, msgID );
} }
break; break;
case RADIO: case RADIO:
@ -476,7 +468,7 @@ public class BTService extends XWJIService {
case REMOVE: case REMOVE:
gameID = intent.getIntExtra( GAMEID_KEY, -1 ); gameID = intent.getIntExtra( GAMEID_KEY, -1 );
btAddr = intent.getStringExtra( ADDR_KEY ); btAddr = intent.getStringExtra( ADDR_KEY );
getSenderFor( btAddr ).addDied( gameID ); getPA( btAddr ).addDied( gameID );
break; break;
case MAKE_OR_NOTIFY: case MAKE_OR_NOTIFY:
@ -552,14 +544,13 @@ public class BTService extends XWJIService {
} }
} // onHandleWorkImpl() } // onHandleWorkImpl()
private void startScanThread( final int timeout ) private void startScanThread( final int timeoutMS )
{ {
new Thread( new Runnable() { new Thread( new Runnable() {
@Override @Override
public void run() { public void run() {
Log.d( TAG, "scan thread starting" ); Log.d( TAG, "scan thread starting (timeout=%dms)", timeoutMS );
BTSenderThread.startYourself( BTService.this ) sendPings( MultiEvent.HOST_PONGED, timeoutMS );
.sendPings( MultiEvent.HOST_PONGED, timeout );
mHelper.postEvent( MultiEvent.SCAN_DONE ); mHelper.postEvent( MultiEvent.SCAN_DONE );
Log.d( TAG, "scan thread done" ); Log.d( TAG, "scan thread done" );
} }
@ -744,245 +735,41 @@ public class BTService extends XWJIService {
return btAddr; return btAddr;
} }
// sender thread private void sendPings( MultiEvent event, int timeoutMS )
//
// Thing wants to outlive an instance of BTService, but not live
// forever. Ideally it exists long enough to send the elems posted by one
// instance then dies.
private static class BTSenderThread extends Thread {
private volatile boolean mFinishing = false;
private BTService mService;
private static BTSenderThread[] sSenderHolder = {null};
private static BTSenderThread startYourself( BTService bts )
{ {
synchronized ( sSenderHolder ) { Set<BluetoothDevice> pairedDevs = m_adapter.getBondedDevices();
BTSenderThread result = sSenderHolder[0]; Map<BluetoothDevice, PacketAccumulator> pas = new HashMap<>();
if ( null == result || !result.isAlive() ) {
result = new BTSenderThread( bts );
result.start();
sSenderHolder[0] = result;
} else {
result.setService( bts );
}
return result;
}
}
private BTSenderThread( BTService service ) {
setService( service );
}
void setService( BTService service ) { mService = service; }
@Override
public void run()
{
final String className = getClass().getSimpleName();
final AtomicInteger nDone = new AtomicInteger();
Log.d( TAG, "%s.run() starting", className );
while ( !mFinishing && BTEnabled() ) {
try {
List<PacketAccumulator> pas = getHasData(); // blocks
Thread[] threads = new Thread[pas.size()];
for ( int ii = 0; ii < threads.length; ++ii ) {
final PacketAccumulator pa = pas.get( ii );
threads[ii] = new Thread( new Runnable() {
@Override
public void run() {
boolean success = false;
BluetoothSocket socket = null;
try {
socket = mService.m_adapter.getRemoteDevice( pa.getAddr() )
.createRfcommSocketToServiceRecord( XWApp.getAppUUID() );
DataOutputStream dos = connect( socket );
if ( null == dos ) {
pa.setNoHost();
updateStatusOut( false );
} else {
Log.d( TAG, "%s.thread.run(): connect(%s) => %s",
className, pa.getName(), dos );
nDone.addAndGet( pa.writeAndCheck( socket, dos,
mService.mHelper ) );
success = true;
}
} catch ( IOException ioe ) {
Log.e( TAG, "%s.run(): ioe: %s", className,
ioe.getMessage() );
} finally {
if ( null != socket ) {
try { socket.close(); }
catch (Exception ex) {}
}
}
if ( !success ) {
pa.setNoHost();
}
}
} );
threads[ii].start();
}
Log.d( TAG, "%s.run(): waiting on %d threads", className,
threads.length );
for ( Thread thread : threads ) {
try {
thread.join();
} catch ( InterruptedException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
Log.d( TAG, "%s.run(): DONE waiting on %d threads", className,
threads.length );
// } catch ( IOException ie ) {
// Log.e( TAG, "run(): ioe: %s", ie.getMessage() );
} catch ( InterruptedException ie ) {
Log.w( TAG, "%s.run() interrupted; killing thread", className );
break;
}
}
Log.d( TAG, "%s.run() exiting after sending %d packets (owner was %s)", className,
nDone.get(), mService );
}
private void sendPings( MultiEvent event, int timeout )
{
Set<BluetoothDevice> pairedDevs = mService.m_adapter.getBondedDevices();
Map<BluetoothDevice, PingThread> threads = new HashMap<>();
for ( BluetoothDevice dev : pairedDevs ) { for ( BluetoothDevice dev : pairedDevs ) {
// Skip things that can't host an Android app // Skip things that can't host an Android app
int clazz = dev.getBluetoothClass().getMajorDeviceClass(); int clazz = dev.getBluetoothClass().getMajorDeviceClass();
if ( Major.PHONE == clazz || Major.COMPUTER == clazz ) { if ( Major.PHONE == clazz || Major.COMPUTER == clazz ) {
PingThread thread = new PingThread( dev, timeout, event ); PacketAccumulator pa =
threads.put( dev, thread ); new PacketAccumulator( dev.getAddress(), timeoutMS )
thread.start(); .addPing( 0 )
.setExitWhenEmpty()
.setLifetimeMS(timeoutMS)
.setService( this )
;
pas.put( dev, pa );
} else { } else {
Log.d( TAG, "skipping %s (clazz=%d); not an android device!", Log.d( TAG, "skipping %s (clazz=%d); not an android device!",
dev.getName(), clazz ); dev.getName(), clazz );
} }
} }
for ( BluetoothDevice dev : threads.keySet() ) { for ( BluetoothDevice dev : pas.keySet() ) {
PingThread thread = threads.get( dev ); PacketAccumulator pa = pas.get( dev );
try { try {
thread.join(); pa.join();
if ( 0 < pa.getResponseCount() ) {
mHelper.postEvent( event, dev );
}
} catch ( InterruptedException ex ) { } catch ( InterruptedException ex ) {
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
} }
} }
} }
private class PingThread extends Thread {
private boolean mGotResponse;
private BluetoothDevice mDev;
private int mTimeout;
private MultiEvent mEvent;
PingThread(BluetoothDevice dev, int timeout, MultiEvent event)
{
mDev = dev; mTimeout = timeout; mEvent = event;
}
@Override
public void run() {
mGotResponse = sendPing( mDev, 0, mTimeout );
if ( mGotResponse && null != mEvent) {
mService.mHelper.postEvent( mEvent, mDev );
}
}
boolean gotResponse() { return mGotResponse; }
}
private boolean sendPing( BluetoothDevice dev, int gameID, int timeout )
{
Log.d( TAG, "sendPing(dev=%s)", dev );
boolean gotReply = false;
boolean sendWorking = false;
boolean receiveWorking = false;
try {
BluetoothSocket socket =
dev.createRfcommSocketToServiceRecord( XWApp.getAppUUID() );
if ( null != socket ) {
DataOutputStream os = connect( socket, timeout );
if ( null != os ) {
// quick hack to use new formatting code here too
PacketAccumulator pa = new PacketAccumulator(socket.getRemoteDevice().getAddress());
pa.addPing( gameID );
int nSent = pa.writeAndCheck( socket, os, mService.mHelper );
gotReply = nSent == 1;
Assert.assertTrue( nSent == 1 );
receiveWorking = true;
sendWorking = true;
}
socket.close();
}
} catch ( IOException ioe ) {
Log.e( TAG, "sendPing() failure; %s", ioe.getMessage() );
DbgUtils.printStack( TAG, ioe );
}
updateStatusOut( sendWorking );
updateStatusIn( receiveWorking );
// Log.d( TAG, "sendPing(%s) => %b", dev.getName(), gotReply );
return gotReply;
} // sendPing
private boolean sendPing( String btAddr, int gameID, int timeout )
{
boolean success = false;
BluetoothDevice dev = mService.m_adapter.getRemoteDevice( btAddr );
success = sendPing( dev, gameID, timeout );
return success;
}
private DataOutputStream connect( BluetoothSocket socket )
{
return connect( socket, 20000 );
}
private DataOutputStream connect( BluetoothSocket socket, int timeout )
{
String name = socket.getRemoteDevice().getName();
String addr = socket.getRemoteDevice().getAddress();
Log.w( TAG, "connect(%s/%s, timeout=%d) starting", name, addr, timeout );
// DbgUtils.logf( "connecting to %s to send cmd %s", name, cmd.toString() );
// Docs say always call cancelDiscovery before trying to connect
mService.m_adapter.cancelDiscovery();
DataOutputStream dos = null;
// Retry for some time. Some devices take a long time to generate and
// broadcast ACL conn ACTION
int nTries = 0;
for ( long end = timeout + System.currentTimeMillis(); ; ) {
try {
// Log.d( TAG, "trying connect(%s/%s) (check accept() logs)", name, addr );
++nTries;
socket.connect();
Log.i( TAG, "connect(%s/%s) succeeded after %d tries",
name, addr, nTries );
dos = new DataOutputStream( socket.getOutputStream() );
dos.writeByte( BT_PROTO );
break; // success!!!
} catch (IOException ioe) {
// Log.d( TAG, "connect(): %s", ioe.getMessage() );
long msLeft = end - System.currentTimeMillis();
if ( msLeft <= 0 ) {
break;
}
try {
Thread.sleep( Math.min( CONNECT_SLEEP_MS, msLeft ) );
} catch ( InterruptedException ex ) {
break;
}
}
}
Log.w( TAG, "connect(%s/%s) => %s", name, addr, dos );
return dos;
}
} // class BTSenderThread
private void startListener() private void startListener()
{ {
Assert.assertNotNull( mHelper ); Assert.assertNotNull( mHelper );
@ -1008,60 +795,12 @@ public class BTService extends XWJIService {
return result; return result;
} }
private static void noteLastUsed( Context context )
{
Log.d( TAG, "noteLastUsed(" + context + ")" );
// synchronized (BTService.class) {
// int nowSecs = (int)(SystemClock.uptimeMillis() / 1000);
// int newKeepSecs = nowSecs + DEFAULT_KEEPALIVE_SECONDS;
// DBUtils.setIntFor( context, KEY_KEEPALIVE_UNTIL_SECS, newKeepSecs );
// }
}
private void setTimeoutTimer()
{
// // DbgUtils.assertOnUIThread();
// long dieTimeMillis;
// synchronized (BTService.class) {
// dieTimeMillis = 1000 * DBUtils.getIntFor( this, KEY_KEEPALIVE_UNTIL_SECS, 0 );
// }
// long nowMillis = SystemClock.uptimeMillis();
// if ( dieTimeMillis <= nowMillis ) {
// Log.d( TAG, "setTimeoutTimer(): killing the thing" );
// stopListener();
// stopForeground(true);
// } else {
// mHandler.removeCallbacksAndMessages( this );
// mHandler.postAtTime( new Runnable() {
// @Override
// public void run() {
// setTimeoutTimer();
// }
// }, this, dieTimeMillis );
// Log.d( TAG, "setTimeoutTimer(): set for %dms from now", dieTimeMillis - nowMillis );
// }
}
private static void logIOE( IOException ioe ) private static void logIOE( IOException ioe )
{ {
Log.ex( TAG, ioe ); Log.ex( TAG, ioe );
++s_errCount; ++s_errCount;
} }
private void sendBadProto( BluetoothSocket socket )
{
sendBadProto( mHelper, socket );
}
private static void sendBadProto( BTServiceHelper helper, BluetoothSocket socket )
{
helper.postEvent( MultiEvent.BAD_PROTO_BT,
socket.getRemoteDevice().getName() );
DbgUtils.printStack( TAG );
}
private static void updateStatusOut( boolean success ) private static void updateStatusOut( boolean success )
{ {
Context context = XWApp.getContext(); Context context = XWApp.getContext();
@ -1116,8 +855,7 @@ public class BTService extends XWJIService {
int nSent = -1; int nSent = -1;
String btAddr = getSafeAddr( addr ); String btAddr = getSafeAddr( addr );
if ( null != btAddr && 0 < btAddr.length() ) { if ( null != btAddr && 0 < btAddr.length() ) {
getSenderFor( btAddr ).addMsg( gameID, buf, msgID ); getPA( btAddr ).addMsg( gameID, buf, msgID );
BTSenderThread.startYourself( BTService.this );
nSent = buf.length; nSent = buf.length;
} else { } else {
Log.i( TAG, "sendViaBluetooth(): no addr for dev %s", Log.i( TAG, "sendViaBluetooth(): no addr for dev %s",
@ -1153,7 +891,7 @@ public class BTService extends XWJIService {
} }
} }
private static class PacketAccumulator { private static class PacketAccumulator extends Thread {
private static class MsgElem { private static class MsgElem {
BTCmd mCmd; BTCmd mCmd;
@ -1218,22 +956,128 @@ public class BTService extends XWJIService {
private int mFailCount; private int mFailCount;
private int mLength; private int mLength;
private int mCounter; private int mCounter;
private BTService mService;
private boolean mShouldExit = false;
private long mDieTimeMS = Long.MAX_VALUE;
private int mResponseCount;
private int mTimeoutMS;
private volatile boolean mExitWhenEmpty = false;
PacketAccumulator( String addr ) { PacketAccumulator( String addr ) { this(addr, 20000); }
PacketAccumulator( String addr, int timeoutMS )
{
mAddr = addr; mAddr = addr;
mName = getName( addr ); mName = getName( addr );
mElems = new ArrayList<>(); mElems = new ArrayList<>();
mFailCount = 0; mFailCount = 0;
mLength = 0; mLength = 0;
mTimeoutMS = timeoutMS;
start();
} }
String getAddr() { return mAddr; } synchronized PacketAccumulator setService( BTService service )
String getName() { return mName; } {
Assert.assertNotNull( service );
mService = service;
long getNextReadyMS() notifyAll();
return this;
}
PacketAccumulator setExitWhenEmpty()
{
mExitWhenEmpty = true;
return this;
}
PacketAccumulator setLifetimeMS( long msToLive )
{
mDieTimeMS = System.currentTimeMillis() + msToLive;
return this;
}
int getResponseCount()
{
return mResponseCount;
}
@Override
public void run()
{
Log.d( TAG, "run starting for %s", this );
// Run as long as I have something to send. Sleep for as long as
// appropriate based on backoff logic, and be awakened when
// something new comes in or there's reason to hope a send try
// will succeed.
while ( ! mShouldExit ) {
synchronized ( this ) {
if ( mExitWhenEmpty && 0 == mElems.size() ) {
break;
} else if ( System.currentTimeMillis() >= mDieTimeMS ) {
break;
}
long waitTimeMS = null == mService
? Long.MAX_VALUE : figureWait();
if ( waitTimeMS > 0 ) {
Log.d( TAG, "%s: waiting %dms", this, waitTimeMS );
try {
wait( waitTimeMS );
Log.d( TAG, "%s: done waiting", this );
continue; // restart in case state's changed
} catch ( InterruptedException ie ) {
Log.d( TAG, "ie inside wait: %s", ie.getMessage() );
}
}
}
mResponseCount += trySend();
}
Log.d( TAG, "run finishing for %s after sending %d packets",
this, mResponseCount );
// A hack: mExitWhenEmpty only set in the ping case
if ( !mExitWhenEmpty ) {
removeSenderFor( this );
}
}
String getBTAddr() { return mAddr; }
String getBTName() { return mName; }
private int trySend()
{
int nDone = 0;
BluetoothSocket socket = null;
try {
socket = mService.m_adapter.getRemoteDevice( getBTAddr() )
.createRfcommSocketToServiceRecord( XWApp.getAppUUID() );
DataOutputStream dos = connect( socket, mTimeoutMS );
if ( null == dos ) {
setNoHost();
updateStatusOut( false );
} else {
Log.d( TAG, "PacketAccumulator.run(): connect(%s) => %s",
getBTName(), dos );
nDone += writeAndCheck( socket, dos, mService.mHelper );
updateStatusOut( true );
}
} catch ( IOException ioe ) {
Log.e( TAG, "PacketAccumulator.run(): ioe: %s",
ioe.getMessage() );
} finally {
if ( null != socket ) {
try { socket.close(); }
catch (Exception ex) {}
}
}
return nDone;
}
private long figureWait()
{ {
long waitFromNow; long waitFromNow;
// Log.d( TAG, "getNextReadyMS() IN" );
try ( DeadlockWatch dw = new DeadlockWatch( this ) ) { try ( DeadlockWatch dw = new DeadlockWatch( this ) ) {
synchronized ( this ) { synchronized ( this ) {
if ( 0 == mElems.size() ) { // nothing to send if ( 0 == mElems.size() ) { // nothing to send
@ -1247,7 +1091,7 @@ public class BTService extends XWJIService {
} }
} }
} }
Log.d( TAG, "%s.getNextReadyMS() => %dms", this, waitFromNow ); Log.d( TAG, "%s.figureWait() => %dms", this, waitFromNow );
return waitFromNow; return waitFromNow;
} }
@ -1288,7 +1132,7 @@ public class BTService extends XWJIService {
return sb.append('}').toString(); return sb.append('}').toString();
} }
int writeAndCheck( BluetoothSocket socket, DataOutputStream dos, private int writeAndCheck( BluetoothSocket socket, DataOutputStream dos,
BTServiceHelper helper ) BTServiceHelper helper )
throws IOException throws IOException
{ {
@ -1355,7 +1199,12 @@ public class BTService extends XWJIService {
Log.d( TAG, "writeAndCheck() %s: got response %s to cmd[%d] %s", Log.d( TAG, "writeAndCheck() %s: got response %s to cmd[%d] %s",
this, reply, ii, cmd ); this, reply, ii, cmd );
handleReply( helper, socket, inStream, cmd, gameID, reply ); if ( reply == BTCmd.BAD_PROTO ) {
helper.postEvent( MultiEvent.BAD_PROTO_BT,
socket.getRemoteDevice().getName() );
} else {
handleReply( helper, inStream, cmd, gameID, reply );
}
++nDone; ++nDone;
} }
} catch ( IOException ioe ) { } catch ( IOException ioe ) {
@ -1370,14 +1219,10 @@ public class BTService extends XWJIService {
return nDone; return nDone;
} }
private void handleReply( BTServiceHelper helper, BluetoothSocket socket, private void handleReply( BTServiceHelper helper, DataInputStream inStream,
DataInputStream inStream, BTCmd cmd, int gameID, BTCmd cmd, int gameID, BTCmd reply ) throws IOException
BTCmd reply ) throws IOException
{ {
MultiEvent evt = null; MultiEvent evt = null;
if ( reply == BTCmd.BAD_PROTO ) {
sendBadProto( helper, socket );
} else {
switch ( cmd ) { switch ( cmd ) {
case MESG_SEND: case MESG_SEND:
case MESG_GAMEGONE: case MESG_GAMEGONE:
@ -1417,16 +1262,51 @@ public class BTService extends XWJIService {
if ( null != evt ) { if ( null != evt ) {
helper.postEvent( evt, gameID, 0, mName ); helper.postEvent( evt, gameID, 0, mName );
// if ( ! success ) {
// int failCount = elem.incrFailCount();
// mHelper.postEvent( MultiEvent.MESSAGE_RESEND, btName,
// RESEND_TIMEOUT, failCount );
// }
}
} }
} }
void addPing( int gameID ) private DataOutputStream connect( BluetoothSocket socket, int timeout )
{
String name = socket.getRemoteDevice().getName();
String addr = socket.getRemoteDevice().getAddress();
Log.w( TAG, "connect(%s/%s, timeout=%d) starting", name, addr, timeout );
// DbgUtils.logf( "connecting to %s to send cmd %s", name, cmd.toString() );
// Docs say always call cancelDiscovery before trying to connect
mService.m_adapter.cancelDiscovery();
DataOutputStream dos = null;
// Retry for some time. Some devices take a long time to generate and
// broadcast ACL conn ACTION
int nTries = 0;
for ( long end = timeout + System.currentTimeMillis(); ; ) {
try {
// Log.d( TAG, "trying connect(%s/%s) (check accept() logs)", name, addr );
++nTries;
socket.connect();
Log.i( TAG, "connect(%s/%s) succeeded after %d tries",
name, addr, nTries );
dos = new DataOutputStream( socket.getOutputStream() );
dos.writeByte( BT_PROTO );
break; // success!!!
} catch (IOException ioe) {
// Log.d( TAG, "connect(): %s", ioe.getMessage() );
long msLeft = end - System.currentTimeMillis();
if ( msLeft <= 0 ) {
break;
}
try {
Thread.sleep( Math.min( CONNECT_SLEEP_MS, msLeft ) );
} catch ( InterruptedException ex ) {
break;
}
}
}
Log.w( TAG, "connect(%s/%s) => %s", name, addr, dos );
return dos;
}
PacketAccumulator addPing( int gameID )
{ {
try { try {
OutputPair op = new OutputPair(); OutputPair op = new OutputPair();
@ -1435,6 +1315,7 @@ public class BTService extends XWJIService {
} catch ( IOException ioe ) { } catch ( IOException ioe ) {
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
} }
return this;
} }
void addInvite( NetLaunchInfo nli ) void addInvite( NetLaunchInfo nli )
@ -1515,7 +1396,7 @@ public class BTService extends XWJIService {
} }
// for now, we restart timer on new data, even if a dupe // for now, we restart timer on new data, even if a dupe
mFailCount = 0; mFailCount = 0;
tellSomebody(); notifyAll();
} }
} }
} }
@ -1562,17 +1443,6 @@ public class BTService extends XWJIService {
int length() { return bos.toByteArray().length; } int length() { return bos.toByteArray().length; }
} }
private void tellSomebody()
{
// Log.d( TAG, "tellSomebody() IN" );
try ( DeadlockWatch dw = new DeadlockWatch( sBlocker ) ) {
synchronized ( sBlocker ) {
sBlocker.notifyAll();
}
}
// Log.d( TAG, "tellSomebody() OUT" );
}
private String getName( String addr ) private String getName( String addr )
{ {
Assert.assertFalse( BOGUS_MARSHMALLOW_ADDR.equals( addr ) ); Assert.assertFalse( BOGUS_MARSHMALLOW_ADDR.equals( addr ) );
@ -1597,9 +1467,7 @@ public class BTService extends XWJIService {
private static class PacketParser { private static class PacketParser {
private int mProto; private int mProto;
PacketParser(int proto) { PacketParser( int proto ) { mProto = proto; }
mProto = proto;
}
void dispatchAll( DataInputStream inStream, BluetoothSocket socket, void dispatchAll( DataInputStream inStream, BluetoothSocket socket,
BTListenerThread processor ) BTListenerThread processor )
@ -1690,38 +1558,10 @@ public class BTService extends XWJIService {
} }
} // class PacketParser } // class PacketParser
// Blocks until can return an Accumulator with data private PacketAccumulator getPA( String addr )
private static Object sBlocker = new Object();
private static List<PacketAccumulator> getHasData() throws InterruptedException
{ {
// Log.d( TAG, "getHasData() IN" ); PacketAccumulator pa = getSenderFor( addr );
List<PacketAccumulator> result = new ArrayList<>(); return pa.setService( this );
while ( 0 == result.size() ) {
long newMin = 60 * 60 * 1000; // longest wait: 1 hour
try ( DeadlockWatch dw = new DeadlockWatch( sSenders ) ) {
synchronized ( sSenders ) {
for ( String addr : sSenders.keySet() ) {
PacketAccumulator pa = sSenders.get( addr );
long nextReady = pa.getNextReadyMS();
if ( nextReady <= 0 ) {
result.add( pa );
} else {
newMin = Math.min( newMin, nextReady );
}
}
}
}
if ( 0 == result.size() ) {
synchronized ( sBlocker ) {
Log.d( TAG, "getHasData(): waiting %dms", newMin );
sBlocker.wait( 1 + newMin ); // 0 might mean forever
Log.d( TAG, "getHasData(): DONE waiting" );
}
}
}
// Log.d( TAG, "getHasData() => %s", result );
return result;
} }
private static Map<String, PacketAccumulator> sSenders = new HashMap<>(); private static Map<String, PacketAccumulator> sSenders = new HashMap<>();
@ -1744,6 +1584,19 @@ public class BTService extends XWJIService {
return result; return result;
} }
private static void removeSenderFor( PacketAccumulator pa )
{
try ( DeadlockWatch dw = new DeadlockWatch( sSenders ) ) {
synchronized ( sSenders ) {
if ( pa == sSenders.get( pa.getBTAddr() ) ) {
sSenders.remove( pa );
} else {
Log.e( TAG, "race? There's a different PA for %s", pa.getBTAddr() );
}
}
}
}
private static void resetSenderFor( BluetoothSocket socket ) private static void resetSenderFor( BluetoothSocket socket )
{ {
// Log.d( TAG, "resetSenderFor(%s)", socket ); // Log.d( TAG, "resetSenderFor(%s)", socket );