use a single queue for all nbsproto threads

No point in worrying about whether a thread exists without emptying its
queue. Instead have a thread start when something's added to the queue
and exit when it's been empty for a while. Only trick is the need to
remember the phone number[s] on behalf of which a retry's necessary
because jni code is waiting to see if combining can be done.
This commit is contained in:
Eric House 2019-04-09 11:45:32 -07:00
parent 8c2582b9a2
commit 993fff8e13
3 changed files with 202 additions and 174 deletions

View file

@ -428,7 +428,7 @@ public class CommsTransport implements TransportProcs,
break;
case COMMS_CONN_SMS:
nSent = NBSProto.sendPacket( context, addr.sms_phone,
gameID, buf );
gameID, buf, msgID );
break;
case COMMS_CONN_BT:
nSent = BTService.sendPacket( context, buf, msgID, addr, gameID );

View file

@ -68,9 +68,9 @@ public class MultiMsgSink implements TransportProcs {
return BTService.sendPacket( m_context, buf, msgID, addr, gameID );
}
public int sendViaSMS( byte[] buf, int gameID, CommsAddrRec addr )
public int sendViaSMS( byte[] buf, String msgID, int gameID, CommsAddrRec addr )
{
return NBSProto.sendPacket( m_context, addr.sms_phone, gameID, buf );
return NBSProto.sendPacket( m_context, addr.sms_phone, gameID, buf, msgID );
}
public int sendViaP2P( byte[] buf, int gameID, CommsAddrRec addr )
@ -101,7 +101,7 @@ public class MultiMsgSink implements TransportProcs {
nSent = sendViaBluetooth( buf, msgID, gameID, addr );
break;
case COMMS_CONN_SMS:
nSent = sendViaSMS( buf, gameID, addr );
nSent = sendViaSMS( buf, msgID, gameID, addr );
break;
case COMMS_CONN_P2P:
nSent = sendViaP2P( buf, gameID, addr );

View file

@ -28,6 +28,7 @@ import android.telephony.SmsManager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
@ -60,7 +61,7 @@ public class NBSProto {
public static void handleFrom( Context context, byte[] buffer,
String phone, short port )
{
getCurThread( phone, port ).addPacketFrom( context, buffer );
addPacketFrom( context, phone, port, buffer );
if ( (0 == (++s_nReceived % TOAST_FREQ)) && showToasts( context ) ) {
DbgUtils.showf( context, "Got msg %d", s_nReceived );
}
@ -69,25 +70,27 @@ public class NBSProto {
public static void inviteRemote( Context context, String phone,
NetLaunchInfo nli )
{
getCurThread( phone ).addInviteTo( context, nli );
addInviteTo( context, phone, nli );
}
public static int sendPacket( Context context, String phone,
int gameID, byte[] binmsg )
int gameID, byte[] binmsg, String msgID )
{
getCurThread( phone ).addPacketTo( context, gameID, binmsg );
Log.d( TAG, "sendPacket(phone=%s, gameID=%d, len=%d, msgID=%s)",
phone, gameID, binmsg.length, msgID );
addPacketTo( context, phone, gameID, binmsg );
return binmsg.length;
}
public static void gameDied( Context context, int gameID, String phone )
{
getCurThread( phone ).addGameDied( context, gameID );
addGameDied( context, phone, gameID );
}
public static void onGameDictDownload( Context context, Intent oldIntent )
{
NetLaunchInfo nli = MultiService.getMissingDictData( context, oldIntent );
getCurThread( nli.phone ).addInviteFrom( context, nli );
addInviteFrom( context, nli );
}
public static void stopThreads()
@ -100,57 +103,61 @@ public class NBSProto {
s_showToasts = newVal;
}
static class NBSProtoThread extends Thread {
private String mPhone;
private short mPort;
private LinkedBlockingQueue<QueueElem> mQueue = new LinkedBlockingQueue<>();
private boolean mForceNow;
private int[] mWaitSecs = { 0 };
NBSProtoThread( String phone, short port )
private static void addPacketFrom( Context context, String phone,
short port, byte[] data )
{
super( "NBSProtoThread" );
mPhone = phone;
mPort = port;
mForceNow = !XWPrefs.getSMSProtoEnabled( XWApp.getContext() );
add( new ReceiveElem( context, phone, port, data ) );
}
private String getPhone() { return mPhone; }
private short getPort() { return mPort; }
void addPacketFrom( Context context, byte[] data )
{
add( new ReceiveElem( context, data ) );
}
void addInviteFrom( Context context, NetLaunchInfo nli )
private static void addInviteFrom( Context context, NetLaunchInfo nli )
{
add( new ReceiveElem( context, nli ) );
}
void addPacketTo( Context context, int gameID, byte[] binmsg )
private static void addPacketTo( Context context, String phone,
int gameID, byte[] binmsg )
{
add( new SendElem( context, SMS_CMD.DATA, gameID, binmsg ) );
add( new SendElem( context, phone, SMS_CMD.DATA, gameID, binmsg ) );
}
void addInviteTo( Context context, NetLaunchInfo nli )
private static void addInviteTo( Context context, String phone, NetLaunchInfo nli )
{
add( new SendElem( context, SMS_CMD.INVITE, nli ) );
add( new SendElem( context, phone, SMS_CMD.INVITE, nli ) );
}
void addGameDied( Context context, int gameID )
private static void addGameDied( Context context, String phone, int gameID )
{
add( new SendElem( context, SMS_CMD.DEATH, gameID, null ) );
add( new SendElem( context, phone, SMS_CMD.DEATH, gameID, null ) );
}
void addAck( Context context, int gameID )
private static void addAck( Context context, String phone, int gameID )
{
add( new SendElem( context, SMS_CMD.ACK_INVITE, gameID, null ) );
add( new SendElem( context, phone, SMS_CMD.ACK_INVITE, gameID, null ) );
}
private static void add( QueueElem elem ) {
if ( XWPrefs.getNBSEnabled( elem.context ) ) {
sQueue.add( elem );
startThreadOnce();
}
}
private static LinkedBlockingQueue<QueueElem> sQueue = new LinkedBlockingQueue<>();
static class NBSProtoThread extends Thread {
private boolean mForceNow;
private int[] mWaitSecs = { 0 };
private Set<String> mCachedDests = new HashSet<>();
NBSProtoThread()
{
super( "NBSProtoThread" );
mForceNow = !XWPrefs.getSMSProtoEnabled( XWApp.getContext() );
}
@Override
public void run() {
Log.d( TAG, "%s.run() starting for %s", this, mPhone );
Log.d( TAG, "%s.run() starting", this );
while ( !isInterrupted() ) {
try {
@ -159,8 +166,8 @@ public class NBSProto {
// set by smsproto_prepOutbound(). Otherwise sleep until
// there's something in the queue.
long waitSecs = mWaitSecs[0] <= 0 ? 10 * 60 : mWaitSecs[0];
QueueElem elem = mQueue.poll( waitSecs, TimeUnit.SECONDS );
if ( !process( elem, false ) ) {
QueueElem elem = sQueue.poll( waitSecs, TimeUnit.SECONDS );
if ( !process( elem ) ) {
break;
}
} catch ( InterruptedException iex ) {
@ -169,76 +176,89 @@ public class NBSProto {
}
}
removeSelf( this ); // should stop additions to the queue
removeSelf( this );
// Now just empty out the queue, in case anything was added
// late. Note that if we're abandoning a half-assembled
// multi-message buffer that data's lost until a higher level
// resends. So don't let that happen.
for ( ; ; ) {
QueueElem elem = mQueue.poll();
process( elem, true );
if ( null == elem ) {
break;
}
}
// Better not be leaving anything behind!
Assert.assertTrue( 0 == mQueue.size() || !BuildConfig.DEBUG );
Log.d( TAG, "%s.run() DONE", this );
}
private void add( QueueElem elem ) {
if ( XWPrefs.getNBSEnabled( elem.context ) ) {
mQueue.add( elem );
}
}
private boolean processReceive( ReceiveElem elem, boolean exiting )
private boolean processReceive( ReceiveElem elem )
{
if ( null != elem.data ) {
SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, mPhone, mPort );
SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, elem.phone, elem.port );
if ( null != msgs ) {
Log.d( TAG, "got %d msgs combined!", msgs.length );
for ( int ii = 0; ii < msgs.length; ++ii ) {
Log.d( TAG, "%d: type: %s; len: %d", ii, msgs[ii].cmd, msgs[ii].data.length );
}
for ( SMSProtoMsg msg : msgs ) {
receive( elem.context, msg );
receive( elem.context, elem.phone, msg );
}
getHelper().postEvent( MultiEvent.SMS_RECEIVE_OK );
} else {
Log.d( TAG, "receiveBuffer(): bogus or incomplete message from %s",
getPhone() );
elem.phone );
}
}
if ( null != elem.nli ) {
makeForInvite( elem.context, elem.nli );
makeForInvite( elem.context, elem.phone, elem.nli );
}
return true;
}
private boolean processSend( SendElem elem, boolean exiting )
// Called when we have nothing to add, but might be sending what's
// already waiting for possible combination with other messages.
private boolean processRetry()
{
boolean forceNow = mForceNow || exiting;
byte[][] msgs = null != elem
? XwJNI.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
mPhone, mPort, forceNow, mWaitSecs )
: XwJNI.smsproto_prepOutbound( mPhone, mPort, forceNow, mWaitSecs );
Assert.assertTrue( !mForceNow || !BuildConfig.DEBUG );
boolean handled = false;
for ( Iterator<String> iter = mCachedDests.iterator();
iter.hasNext(); ) {
String[] portAndPhone = iter.next().split( "\0", 2 );
short port = Short.valueOf(portAndPhone[0]);
byte[][] msgs = XwJNI
.smsproto_prepOutbound( portAndPhone[1], port,
mForceNow, mWaitSecs );
if ( null != msgs ) {
sendBuffers( msgs );
sendBuffers( msgs, portAndPhone[1], port );
handled = true;
}
return null != msgs || mWaitSecs[0] > 0;
boolean needsRetry = mWaitSecs[0] > 0;
if ( !needsRetry ) {
iter.remove();
}
handled = handled || needsRetry;
}
return handled;
}
private boolean process( QueueElem qelm, boolean exiting )
private boolean processSend( SendElem elem )
{
boolean forceNow = mForceNow;
byte[][] msgs = XwJNI
.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
elem.phone, elem.port, forceNow, mWaitSecs );
if ( null != msgs ) {
sendBuffers( msgs, elem.phone, elem.port );
}
boolean needsRetry = mWaitSecs[0] > 0;
if ( needsRetry ) {
cacheForRetry( elem );
}
return null != msgs || needsRetry;
}
private boolean process( QueueElem qelm )
{
boolean handled;
if ( null == qelm || qelm instanceof SendElem ) {
handled = processSend( (SendElem)qelm, exiting );
if ( null == qelm ) {
handled = processRetry();
} else if ( qelm instanceof SendElem ) {
handled = processSend( (SendElem)qelm );
} else {
handled = processReceive( (ReceiveElem)qelm, exiting );
handled = processReceive( (ReceiveElem)qelm );
}
Log.d( TAG, "%s.process(%s) => %b", this, qelm, handled );
return handled;
@ -253,15 +273,17 @@ public class NBSProto {
return mHelper;
}
private void receive( Context context, SMSProtoMsg msg )
private void receive( Context context, String phone, SMSProtoMsg msg )
{
Log.i( TAG, "receive(cmd=%s)", msg.cmd );
switch( msg.cmd ) {
case INVITE:
makeForInvite( context, NetLaunchInfo.makeFrom( context, msg.data ) );
makeForInvite( context, phone,
NetLaunchInfo.makeFrom( context, msg.data ) );
break;
case DATA:
if ( feedMessage( context, msg.gameID, msg.data, new CommsAddrRec( getPhone() ) ) ) {
if ( feedMessage( context, msg.gameID, msg.data,
new CommsAddrRec( phone ) ) ) {
SMSResendReceiver.resetTimer( context );
}
break;
@ -292,28 +314,25 @@ public class NBSProto {
private void sendDiedPacket( Context context, String phone, int gameID )
{
if ( !s_sentDied.contains( gameID ) ) {
getCurThread( phone ).addGameDied( context, gameID );
addGameDied( context, phone, gameID );
s_sentDied.add( gameID );
}
}
private void makeForInvite( Context context, NetLaunchInfo nli )
private void makeForInvite( Context context, String phone, NetLaunchInfo nli )
{
if ( nli != null ) {
getHelper().handleInvitation( nli, mPhone, DictFetchOwner.OWNER_SMS );
getCurThread(mPhone).addAck( context, nli.gameID() );
getHelper().handleInvitation( nli, phone, DictFetchOwner.OWNER_SMS );
addAck( context, phone, nli.gameID() );
}
}
private void sendBuffers( byte[][] fragments )
private void sendBuffers( byte[][] fragments, String phone, short port )
{
Context context = XWApp.getContext();
boolean success = false;
if ( XWPrefs.getNBSEnabled( context ) ) {
String phone = getPhone();
short port = getPort();
// Try send-to-self
if ( XWPrefs.getSMSToSelfEnabled( context ) ) {
String myPhone = SMSPhoneInfo.get( context ).number;
@ -373,24 +392,43 @@ public class NBSProto {
return PendingIntent.getBroadcast( context, 0, intent, 0 );
}
private void cacheForRetry( QueueElem elem )
{
String dest = elem.port + "\0" + elem.phone;
mCachedDests.add( dest );
}
}
static class QueueElem {
private static class QueueElem {
Context context;
QueueElem( Context context ) { this.context = context; }
String phone;
short port;
QueueElem( Context context, String phone, short port )
{
this.context = context;
this.phone = phone;
this.port = port;
}
QueueElem( Context context, String phone )
{
this( context, phone, getNBSPort() );
}
}
private static class SendElem extends QueueElem {
SMS_CMD cmd;
int gameID;
byte[] data;
SendElem( Context context, SMS_CMD cmd, int gameID, byte[] data ) {
super( context );
SendElem( Context context, String phone, SMS_CMD cmd, int gameID,
byte[] data ) {
super( context, phone );
this.cmd = cmd;
this.gameID = gameID;
this.data = data;
}
SendElem( Context context, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, cmd, 0, nli.asByteArray() );
SendElem( Context context, String phone, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, phone, cmd, 0, nli.asByteArray() );
}
@Override
@ -405,14 +443,16 @@ public class NBSProto {
// One of these two will be set
byte[] data;
NetLaunchInfo nli;
ReceiveElem( Context context, byte[] data )
ReceiveElem( Context context, String phone, short port, byte[] data )
{
super( context );
super( context, phone, port );
this.data = data;
}
ReceiveElem( Context context, NetLaunchInfo nli )
{
super( context );
super( context, nli.phone );
this.nli = nli;
}
@ -422,36 +462,24 @@ public class NBSProto {
return String.format( "ReceiveElem: {nli: %s, data: %s}", nli, data );
}
}
}
private static Map<String, NBSProtoThread> sThreadMap = new HashMap<>();
private static NBSProtoThread[] sThreadHolder = { null };
private static NBSProtoThread getCurThread( String phone )
private static void startThreadOnce()
{
return getCurThread( phone, getNBSPort() );
}
private static NBSProtoThread getCurThread( String phone, short port )
{
NBSProtoThread result = null;
synchronized ( sThreadMap ) {
result = sThreadMap.get( phone );
if ( result == null ) {
result = new NBSProtoThread( phone, port );
result.start();
sThreadMap.put( phone, result );
synchronized ( sThreadHolder ) {
if ( sThreadHolder[0] == null ) {
sThreadHolder[0] = new NBSProtoThread();
sThreadHolder[0].start();
}
}
Assert.assertTrue( result.getPort() == port || !BuildConfig.DEBUG );
return result;
}
private static void removeSelf( NBSProtoThread self )
{
synchronized ( sThreadMap ) {
String phone = self.getPhone();
if ( sThreadMap.get(phone) == self ) {
sThreadMap.remove( phone );
synchronized ( sThreadHolder ) {
if ( sThreadHolder[0] == self ) {
sThreadHolder[0] = null;
}
}
}
@ -464,9 +492,9 @@ public class NBSProto {
}
@Override
public int sendViaSMS( byte[] buf, int gameID, CommsAddrRec addr )
public int sendViaSMS( byte[] buf, String msgID, int gameID, CommsAddrRec addr )
{
return sendPacket( mContext, addr.sms_phone, gameID, buf );
return sendPacket( mContext, addr.sms_phone, gameID, buf, msgID );
}
}
@ -496,10 +524,10 @@ public class NBSProto {
private static void stopCurThreads()
{
synchronized( sThreadMap ) {
for ( String phone : sThreadMap.keySet() ) {
// should cause them all to call removeSelf() soon
sThreadMap.get( phone ).interrupt();
synchronized( sThreadHolder ) {
NBSProtoThread self = sThreadHolder[0];
if ( null != self ) {
self.interrupt();
}
}
}