use a single queue for all nbsproto threads

No point in worrying about whether a thread exists without emptying its
queue. Instead have a thread start when something's added to the queue
and exit when it's been empty for a while. Only trick is the need to
remember the phone number[s] on behalf of which a retry's necessary
because jni code is waiting to see if combining can be done.
This commit is contained in:
Eric House 2019-04-09 11:45:32 -07:00
parent 012ebe26f7
commit c757ba1386
3 changed files with 202 additions and 174 deletions

View file

@ -428,7 +428,7 @@ public class CommsTransport implements TransportProcs,
break; break;
case COMMS_CONN_SMS: case COMMS_CONN_SMS:
nSent = NBSProto.sendPacket( context, addr.sms_phone, nSent = NBSProto.sendPacket( context, addr.sms_phone,
gameID, buf ); gameID, buf, msgID );
break; break;
case COMMS_CONN_BT: case COMMS_CONN_BT:
nSent = BTService.sendPacket( context, buf, msgID, addr, gameID ); nSent = BTService.sendPacket( context, buf, msgID, addr, gameID );

View file

@ -68,9 +68,9 @@ public class MultiMsgSink implements TransportProcs {
return BTService.sendPacket( m_context, buf, msgID, addr, gameID ); return BTService.sendPacket( m_context, buf, msgID, addr, gameID );
} }
public int sendViaSMS( byte[] buf, int gameID, CommsAddrRec addr ) public int sendViaSMS( byte[] buf, String msgID, int gameID, CommsAddrRec addr )
{ {
return NBSProto.sendPacket( m_context, addr.sms_phone, gameID, buf ); return NBSProto.sendPacket( m_context, addr.sms_phone, gameID, buf, msgID );
} }
public int sendViaP2P( byte[] buf, int gameID, CommsAddrRec addr ) public int sendViaP2P( byte[] buf, int gameID, CommsAddrRec addr )
@ -101,7 +101,7 @@ public class MultiMsgSink implements TransportProcs {
nSent = sendViaBluetooth( buf, msgID, gameID, addr ); nSent = sendViaBluetooth( buf, msgID, gameID, addr );
break; break;
case COMMS_CONN_SMS: case COMMS_CONN_SMS:
nSent = sendViaSMS( buf, gameID, addr ); nSent = sendViaSMS( buf, msgID, gameID, addr );
break; break;
case COMMS_CONN_P2P: case COMMS_CONN_P2P:
nSent = sendViaP2P( buf, gameID, addr ); nSent = sendViaP2P( buf, gameID, addr );

View file

@ -28,6 +28,7 @@ import android.telephony.SmsManager;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -60,7 +61,7 @@ public class NBSProto {
public static void handleFrom( Context context, byte[] buffer, public static void handleFrom( Context context, byte[] buffer,
String phone, short port ) String phone, short port )
{ {
getCurThread( phone, port ).addPacketFrom( context, buffer ); addPacketFrom( context, phone, port, buffer );
if ( (0 == (++s_nReceived % TOAST_FREQ)) && showToasts( context ) ) { if ( (0 == (++s_nReceived % TOAST_FREQ)) && showToasts( context ) ) {
DbgUtils.showf( context, "Got msg %d", s_nReceived ); DbgUtils.showf( context, "Got msg %d", s_nReceived );
} }
@ -69,25 +70,27 @@ public class NBSProto {
public static void inviteRemote( Context context, String phone, public static void inviteRemote( Context context, String phone,
NetLaunchInfo nli ) NetLaunchInfo nli )
{ {
getCurThread( phone ).addInviteTo( context, nli ); addInviteTo( context, phone, nli );
} }
public static int sendPacket( Context context, String phone, public static int sendPacket( Context context, String phone,
int gameID, byte[] binmsg ) int gameID, byte[] binmsg, String msgID )
{ {
getCurThread( phone ).addPacketTo( context, gameID, binmsg ); Log.d( TAG, "sendPacket(phone=%s, gameID=%d, len=%d, msgID=%s)",
phone, gameID, binmsg.length, msgID );
addPacketTo( context, phone, gameID, binmsg );
return binmsg.length; return binmsg.length;
} }
public static void gameDied( Context context, int gameID, String phone ) public static void gameDied( Context context, int gameID, String phone )
{ {
getCurThread( phone ).addGameDied( context, gameID ); addGameDied( context, phone, gameID );
} }
public static void onGameDictDownload( Context context, Intent oldIntent ) public static void onGameDictDownload( Context context, Intent oldIntent )
{ {
NetLaunchInfo nli = MultiService.getMissingDictData( context, oldIntent ); NetLaunchInfo nli = MultiService.getMissingDictData( context, oldIntent );
getCurThread( nli.phone ).addInviteFrom( context, nli ); addInviteFrom( context, nli );
} }
public static void stopThreads() public static void stopThreads()
@ -100,57 +103,61 @@ public class NBSProto {
s_showToasts = newVal; s_showToasts = newVal;
} }
private static void addPacketFrom( Context context, String phone,
short port, byte[] data )
{
add( new ReceiveElem( context, phone, port, data ) );
}
private static void addInviteFrom( Context context, NetLaunchInfo nli )
{
add( new ReceiveElem( context, nli ) );
}
private static void addPacketTo( Context context, String phone,
int gameID, byte[] binmsg )
{
add( new SendElem( context, phone, SMS_CMD.DATA, gameID, binmsg ) );
}
private static void addInviteTo( Context context, String phone, NetLaunchInfo nli )
{
add( new SendElem( context, phone, SMS_CMD.INVITE, nli ) );
}
private static void addGameDied( Context context, String phone, int gameID )
{
add( new SendElem( context, phone, SMS_CMD.DEATH, gameID, null ) );
}
private static void addAck( Context context, String phone, int gameID )
{
add( new SendElem( context, phone, SMS_CMD.ACK_INVITE, gameID, null ) );
}
private static void add( QueueElem elem ) {
if ( XWPrefs.getNBSEnabled( elem.context ) ) {
sQueue.add( elem );
startThreadOnce();
}
}
private static LinkedBlockingQueue<QueueElem> sQueue = new LinkedBlockingQueue<>();
static class NBSProtoThread extends Thread { static class NBSProtoThread extends Thread {
private String mPhone;
private short mPort;
private LinkedBlockingQueue<QueueElem> mQueue = new LinkedBlockingQueue<>();
private boolean mForceNow; private boolean mForceNow;
private int[] mWaitSecs = { 0 }; private int[] mWaitSecs = { 0 };
private Set<String> mCachedDests = new HashSet<>();
NBSProtoThread( String phone, short port ) NBSProtoThread()
{ {
super( "NBSProtoThread" ); super( "NBSProtoThread" );
mPhone = phone;
mPort = port;
mForceNow = !XWPrefs.getSMSProtoEnabled( XWApp.getContext() ); mForceNow = !XWPrefs.getSMSProtoEnabled( XWApp.getContext() );
} }
private String getPhone() { return mPhone; }
private short getPort() { return mPort; }
void addPacketFrom( Context context, byte[] data )
{
add( new ReceiveElem( context, data ) );
}
void addInviteFrom( Context context, NetLaunchInfo nli )
{
add( new ReceiveElem( context, nli ) );
}
void addPacketTo( Context context, int gameID, byte[] binmsg )
{
add( new SendElem( context, SMS_CMD.DATA, gameID, binmsg ) );
}
void addInviteTo( Context context, NetLaunchInfo nli )
{
add( new SendElem( context, SMS_CMD.INVITE, nli ) );
}
void addGameDied( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.DEATH, gameID, null ) );
}
void addAck( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.ACK_INVITE, gameID, null ) );
}
@Override @Override
public void run() { public void run() {
Log.d( TAG, "%s.run() starting for %s", this, mPhone ); Log.d( TAG, "%s.run() starting", this );
while ( !isInterrupted() ) { while ( !isInterrupted() ) {
try { try {
@ -159,8 +166,8 @@ public class NBSProto {
// set by smsproto_prepOutbound(). Otherwise sleep until // set by smsproto_prepOutbound(). Otherwise sleep until
// there's something in the queue. // there's something in the queue.
long waitSecs = mWaitSecs[0] <= 0 ? 10 * 60 : mWaitSecs[0]; long waitSecs = mWaitSecs[0] <= 0 ? 10 * 60 : mWaitSecs[0];
QueueElem elem = mQueue.poll( waitSecs, TimeUnit.SECONDS ); QueueElem elem = sQueue.poll( waitSecs, TimeUnit.SECONDS );
if ( !process( elem, false ) ) { if ( !process( elem ) ) {
break; break;
} }
} catch ( InterruptedException iex ) { } catch ( InterruptedException iex ) {
@ -169,76 +176,89 @@ public class NBSProto {
} }
} }
removeSelf( this ); // should stop additions to the queue removeSelf( this );
// Now just empty out the queue, in case anything was added
// late. Note that if we're abandoning a half-assembled
// multi-message buffer that data's lost until a higher level
// resends. So don't let that happen.
for ( ; ; ) {
QueueElem elem = mQueue.poll();
process( elem, true );
if ( null == elem ) {
break;
}
}
// Better not be leaving anything behind!
Assert.assertTrue( 0 == mQueue.size() || !BuildConfig.DEBUG );
Log.d( TAG, "%s.run() DONE", this ); Log.d( TAG, "%s.run() DONE", this );
} }
private void add( QueueElem elem ) { private boolean processReceive( ReceiveElem elem )
if ( XWPrefs.getNBSEnabled( elem.context ) ) {
mQueue.add( elem );
}
}
private boolean processReceive( ReceiveElem elem, boolean exiting )
{ {
if ( null != elem.data ) { if ( null != elem.data ) {
SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, mPhone, mPort ); SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, elem.phone, elem.port );
if ( null != msgs ) { if ( null != msgs ) {
Log.d( TAG, "got %d msgs combined!", msgs.length ); Log.d( TAG, "got %d msgs combined!", msgs.length );
for ( int ii = 0; ii < msgs.length; ++ii ) { for ( int ii = 0; ii < msgs.length; ++ii ) {
Log.d( TAG, "%d: type: %s; len: %d", ii, msgs[ii].cmd, msgs[ii].data.length ); Log.d( TAG, "%d: type: %s; len: %d", ii, msgs[ii].cmd, msgs[ii].data.length );
} }
for ( SMSProtoMsg msg : msgs ) { for ( SMSProtoMsg msg : msgs ) {
receive( elem.context, msg ); receive( elem.context, elem.phone, msg );
} }
getHelper().postEvent( MultiEvent.SMS_RECEIVE_OK ); getHelper().postEvent( MultiEvent.SMS_RECEIVE_OK );
} else { } else {
Log.d( TAG, "receiveBuffer(): bogus or incomplete message from %s", Log.d( TAG, "receiveBuffer(): bogus or incomplete message from %s",
getPhone() ); elem.phone );
} }
} }
if ( null != elem.nli ) { if ( null != elem.nli ) {
makeForInvite( elem.context, elem.nli ); makeForInvite( elem.context, elem.phone, elem.nli );
} }
return true; return true;
} }
private boolean processSend( SendElem elem, boolean exiting ) // Called when we have nothing to add, but might be sending what's
// already waiting for possible combination with other messages.
private boolean processRetry()
{ {
boolean forceNow = mForceNow || exiting; Assert.assertTrue( !mForceNow || !BuildConfig.DEBUG );
byte[][] msgs = null != elem boolean handled = false;
? XwJNI.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
mPhone, mPort, forceNow, mWaitSecs )
: XwJNI.smsproto_prepOutbound( mPhone, mPort, forceNow, mWaitSecs );
if ( null != msgs ) { for ( Iterator<String> iter = mCachedDests.iterator();
sendBuffers( msgs ); iter.hasNext(); ) {
String[] portAndPhone = iter.next().split( "\0", 2 );
short port = Short.valueOf(portAndPhone[0]);
byte[][] msgs = XwJNI
.smsproto_prepOutbound( portAndPhone[1], port,
mForceNow, mWaitSecs );
if ( null != msgs ) {
sendBuffers( msgs, portAndPhone[1], port );
handled = true;
}
boolean needsRetry = mWaitSecs[0] > 0;
if ( !needsRetry ) {
iter.remove();
}
handled = handled || needsRetry;
} }
return null != msgs || mWaitSecs[0] > 0; return handled;
} }
private boolean process( QueueElem qelm, boolean exiting ) private boolean processSend( SendElem elem )
{
boolean forceNow = mForceNow;
byte[][] msgs = XwJNI
.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
elem.phone, elem.port, forceNow, mWaitSecs );
if ( null != msgs ) {
sendBuffers( msgs, elem.phone, elem.port );
}
boolean needsRetry = mWaitSecs[0] > 0;
if ( needsRetry ) {
cacheForRetry( elem );
}
return null != msgs || needsRetry;
}
private boolean process( QueueElem qelm )
{ {
boolean handled; boolean handled;
if ( null == qelm || qelm instanceof SendElem ) { if ( null == qelm ) {
handled = processSend( (SendElem)qelm, exiting ); handled = processRetry();
} else if ( qelm instanceof SendElem ) {
handled = processSend( (SendElem)qelm );
} else { } else {
handled = processReceive( (ReceiveElem)qelm, exiting ); handled = processReceive( (ReceiveElem)qelm );
} }
Log.d( TAG, "%s.process(%s) => %b", this, qelm, handled ); Log.d( TAG, "%s.process(%s) => %b", this, qelm, handled );
return handled; return handled;
@ -253,15 +273,17 @@ public class NBSProto {
return mHelper; return mHelper;
} }
private void receive( Context context, SMSProtoMsg msg ) private void receive( Context context, String phone, SMSProtoMsg msg )
{ {
Log.i( TAG, "receive(cmd=%s)", msg.cmd ); Log.i( TAG, "receive(cmd=%s)", msg.cmd );
switch( msg.cmd ) { switch( msg.cmd ) {
case INVITE: case INVITE:
makeForInvite( context, NetLaunchInfo.makeFrom( context, msg.data ) ); makeForInvite( context, phone,
NetLaunchInfo.makeFrom( context, msg.data ) );
break; break;
case DATA: case DATA:
if ( feedMessage( context, msg.gameID, msg.data, new CommsAddrRec( getPhone() ) ) ) { if ( feedMessage( context, msg.gameID, msg.data,
new CommsAddrRec( phone ) ) ) {
SMSResendReceiver.resetTimer( context ); SMSResendReceiver.resetTimer( context );
} }
break; break;
@ -292,28 +314,25 @@ public class NBSProto {
private void sendDiedPacket( Context context, String phone, int gameID ) private void sendDiedPacket( Context context, String phone, int gameID )
{ {
if ( !s_sentDied.contains( gameID ) ) { if ( !s_sentDied.contains( gameID ) ) {
getCurThread( phone ).addGameDied( context, gameID ); addGameDied( context, phone, gameID );
s_sentDied.add( gameID ); s_sentDied.add( gameID );
} }
} }
private void makeForInvite( Context context, NetLaunchInfo nli ) private void makeForInvite( Context context, String phone, NetLaunchInfo nli )
{ {
if ( nli != null ) { if ( nli != null ) {
getHelper().handleInvitation( nli, mPhone, DictFetchOwner.OWNER_SMS ); getHelper().handleInvitation( nli, phone, DictFetchOwner.OWNER_SMS );
getCurThread(mPhone).addAck( context, nli.gameID() ); addAck( context, phone, nli.gameID() );
} }
} }
private void sendBuffers( byte[][] fragments ) private void sendBuffers( byte[][] fragments, String phone, short port )
{ {
Context context = XWApp.getContext(); Context context = XWApp.getContext();
boolean success = false; boolean success = false;
if ( XWPrefs.getNBSEnabled( context ) ) { if ( XWPrefs.getNBSEnabled( context ) ) {
String phone = getPhone();
short port = getPort();
// Try send-to-self // Try send-to-self
if ( XWPrefs.getSMSToSelfEnabled( context ) ) { if ( XWPrefs.getSMSToSelfEnabled( context ) ) {
String myPhone = SMSPhoneInfo.get( context ).number; String myPhone = SMSPhoneInfo.get( context ).number;
@ -373,85 +392,94 @@ public class NBSProto {
return PendingIntent.getBroadcast( context, 0, intent, 0 ); return PendingIntent.getBroadcast( context, 0, intent, 0 );
} }
private void cacheForRetry( QueueElem elem )
static class QueueElem { {
Context context; String dest = elem.port + "\0" + elem.phone;
QueueElem( Context context ) { this.context = context; } mCachedDests.add( dest );
}
private static class SendElem extends QueueElem {
SMS_CMD cmd;
int gameID;
byte[] data;
SendElem( Context context, SMS_CMD cmd, int gameID, byte[] data ) {
super( context );
this.cmd = cmd;
this.gameID = gameID;
this.data = data;
}
SendElem( Context context, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, cmd, 0, nli.asByteArray() );
}
@Override
public String toString()
{
return String.format( "SendElem: {cmd: %s, dataLen: %d}", cmd,
data == null ? 0 : data.length );
}
}
private static class ReceiveElem extends QueueElem {
// One of these two will be set
byte[] data;
NetLaunchInfo nli;
ReceiveElem( Context context, byte[] data )
{
super( context );
this.data = data;
}
ReceiveElem( Context context, NetLaunchInfo nli )
{
super( context );
this.nli = nli;
}
@Override
public String toString()
{
return String.format( "ReceiveElem: {nli: %s, data: %s}", nli, data );
}
} }
} }
private static Map<String, NBSProtoThread> sThreadMap = new HashMap<>(); private static class QueueElem {
Context context;
String phone;
short port;
QueueElem( Context context, String phone, short port )
{
this.context = context;
this.phone = phone;
this.port = port;
}
private static NBSProtoThread getCurThread( String phone ) QueueElem( Context context, String phone )
{ {
return getCurThread( phone, getNBSPort() ); this( context, phone, getNBSPort() );
}
} }
private static NBSProtoThread getCurThread( String phone, short port ) private static class SendElem extends QueueElem {
SMS_CMD cmd;
int gameID;
byte[] data;
SendElem( Context context, String phone, SMS_CMD cmd, int gameID,
byte[] data ) {
super( context, phone );
this.cmd = cmd;
this.gameID = gameID;
this.data = data;
}
SendElem( Context context, String phone, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, phone, cmd, 0, nli.asByteArray() );
}
@Override
public String toString()
{
return String.format( "SendElem: {cmd: %s, dataLen: %d}", cmd,
data == null ? 0 : data.length );
}
}
private static class ReceiveElem extends QueueElem {
// One of these two will be set
byte[] data;
NetLaunchInfo nli;
ReceiveElem( Context context, String phone, short port, byte[] data )
{
super( context, phone, port );
this.data = data;
}
ReceiveElem( Context context, NetLaunchInfo nli )
{
super( context, nli.phone );
this.nli = nli;
}
@Override
public String toString()
{
return String.format( "ReceiveElem: {nli: %s, data: %s}", nli, data );
}
}
private static NBSProtoThread[] sThreadHolder = { null };
private static void startThreadOnce()
{ {
NBSProtoThread result = null; synchronized ( sThreadHolder ) {
synchronized ( sThreadMap ) { if ( sThreadHolder[0] == null ) {
result = sThreadMap.get( phone ); sThreadHolder[0] = new NBSProtoThread();
if ( result == null ) { sThreadHolder[0].start();
result = new NBSProtoThread( phone, port );
result.start();
sThreadMap.put( phone, result );
} }
} }
Assert.assertTrue( result.getPort() == port || !BuildConfig.DEBUG );
return result;
} }
private static void removeSelf( NBSProtoThread self ) private static void removeSelf( NBSProtoThread self )
{ {
synchronized ( sThreadMap ) { synchronized ( sThreadHolder ) {
String phone = self.getPhone(); if ( sThreadHolder[0] == self ) {
if ( sThreadMap.get(phone) == self ) { sThreadHolder[0] = null;
sThreadMap.remove( phone );
} }
} }
} }
@ -464,9 +492,9 @@ public class NBSProto {
} }
@Override @Override
public int sendViaSMS( byte[] buf, int gameID, CommsAddrRec addr ) public int sendViaSMS( byte[] buf, String msgID, int gameID, CommsAddrRec addr )
{ {
return sendPacket( mContext, addr.sms_phone, gameID, buf ); return sendPacket( mContext, addr.sms_phone, gameID, buf, msgID );
} }
} }
@ -496,10 +524,10 @@ public class NBSProto {
private static void stopCurThreads() private static void stopCurThreads()
{ {
synchronized( sThreadMap ) { synchronized( sThreadHolder ) {
for ( String phone : sThreadMap.keySet() ) { NBSProtoThread self = sThreadHolder[0];
// should cause them all to call removeSelf() soon if ( null != self ) {
sThreadMap.get( phone ).interrupt(); self.interrupt();
} }
} }
} }