make it a single thread

This commit is contained in:
Eric House 2019-04-02 15:26:18 -07:00
parent bade9f6b40
commit 71af4d3187
2 changed files with 277 additions and 243 deletions

View file

@ -1,6 +1,6 @@
/* -*- compile-command: "find-and-gradle.sh inXw4Deb"; -*- */
/*
* Copyright 2010 - 2018 by Eric House (xwords@eehouse.org). All rights
* Copyright 2010 - 2019 by Eric House (xwords@eehouse.org). All rights
* reserved.
*
* This program is free software; you can redistribute it and/or
@ -49,67 +49,110 @@ public class NBSProto {
private static final String MSG_SENT = "MSG_SENT";
private static final String MSG_DELIVERED = "MSG_DELIVERED";
private static final int TOAST_FREQ = 5;
private static Boolean s_showToasts;
private static int s_nReceived = 0;
private static int s_nSent = 0;
private static Set<Integer> s_sentDied = new HashSet<Integer>();
public static void handleFrom( Context context, byte[] buffer,
String phone, short port )
{
getCurReceiver( phone, port ).addPacket( context, buffer );
getCurThread( phone, port ).addPacketFrom( context, buffer );
if ( (0 == (++s_nReceived % TOAST_FREQ)) && showToasts( context ) ) {
DbgUtils.showf( context, "Got msg %d", s_nReceived );
}
}
public static void onGameDictDownload( Context context, Intent intentOld )
public static void onGameDictDownload( Context context, Intent oldIntent )
{
NetLaunchInfo nli = MultiService.getMissingDictData( context, oldIntent );
getCurThread( nli.phone ).addInviteFrom( context, nli );
}
public static void inviteRemote( Context context, String phone,
NetLaunchInfo nli )
{
getCurSender( phone ).addInvite( context, nli );
getCurThread( phone ).addInviteTo( context, nli );
}
public static int sendPacket( Context context, String phone,
int gameID, byte[] binmsg )
{
getCurSender( phone ).addPacket( context, gameID, binmsg );
getCurThread( phone ).addPacketTo( context, gameID, binmsg );
return binmsg.length;
}
public static void gameDied( Context context, int gameID, String phone )
{
getCurSender( phone ).addDeathNotice( context, gameID );
getCurThread( phone ).addGameDied( context, gameID );
}
public static void stopService( Context context )
public static void stopThreads()
{
Log.d( TAG, "stopService() does nothing" );
stopCurThreads();
}
private static boolean s_showToasts;
public static void smsToastEnable( boolean newVal )
{
s_showToasts = newVal;
}
static abstract class NBSProtoThread extends Thread {
static class NBSProtoThread extends Thread {
private String mPhone;
private short mPort;
private LinkedBlockingQueue<QueueElem> mQueue = new LinkedBlockingQueue<>();
private boolean mForceNow;
private int[] mWaitSecs = { 0 };
NBSProtoThread( String name, String phone, short port )
NBSProtoThread( String phone, short port )
{
super( name );
super( "NBSProtoThread" );
mPhone = phone;
mPort = port;
boolean newSMSEnabled = XWPrefs.getSMSProtoEnabled( XWApp.getContext() );
mForceNow = !newSMSEnabled;
}
String getPhone() { return mPhone; }
short getPort() { return mPort; }
abstract void removeSelf();
abstract void process( QueueElem elem, boolean exiting );
void addPacketFrom( Context context, byte[] data )
{
add( new ReceiveElem( context, data ) );
}
void add( QueueElem elem ) { mQueue.add( elem ); }
void addInviteFrom( Context context, NetLaunchInfo nli )
{
add( new ReceiveElem( context, nli ) );
}
void addPacketTo( Context context, int gameID, byte[] binmsg )
{
add( new SendElem( context, SMS_CMD.DATA, gameID, binmsg ) );
}
void addInviteTo( Context context, NetLaunchInfo nli )
{
add( new SendElem( context, SMS_CMD.INVITE, nli ) );
}
void addGameDied( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.DEATH, gameID, null ) );
}
void addAck( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.ACK_INVITE, gameID, null ) );
}
void removeSelf()
{
NBSProto.removeSelf( this );
}
@Override
public void run() {
@ -117,9 +160,14 @@ public class NBSProto {
while ( !isInterrupted() ) {
try {
QueueElem elem = mQueue.poll( 5, TimeUnit.MINUTES );
process( elem, false );
if ( null == elem ) {
// We want to time out quickly IFF there's a potential
// message combination going on, i.e. if mWaitSecs[0] was
// set by smsproto_prepOutbound(). Otherwise sleep until
// there's something in the queue.
long waitSecs = mWaitSecs[0] <= 0 ? 10 * 60 : mWaitSecs[0];
QueueElem elem = mQueue.poll( waitSecs, TimeUnit.SECONDS );
boolean handled = process( elem, false );
if ( /*null == elem && */!handled ) {
break;
}
} catch ( InterruptedException iex ) {
@ -128,7 +176,7 @@ public class NBSProto {
}
}
removeSelf();
removeSelf(); // should stop additions to the queue
// Now just empty out the queue, in case anything was added
// late. Note that if we're abandoning a half-assembled
@ -147,6 +195,66 @@ public class NBSProto {
Log.d( TAG, "%s.run() DONE", this );
}
private void add( QueueElem elem ) {
if ( XWPrefs.getNBSEnabled( elem.context ) ) {
mQueue.add( elem );
}
}
private boolean processReceive( ReceiveElem elem, boolean exiting )
{
if ( null != elem.data ) {
SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, mPhone, mPort );
if ( null != msgs ) {
Log.d( TAG, "got %d msgs combined!", msgs.length );
for ( int ii = 0; ii < msgs.length; ++ii ) {
Log.d( TAG, "%d: type: %s; len: %d", ii, msgs[ii].cmd, msgs[ii].data.length );
}
for ( SMSProtoMsg msg : msgs ) {
receive( elem.context, msg );
}
getHelper(elem.context).postEvent( MultiEvent.SMS_RECEIVE_OK );
} else {
Log.d( TAG, "receiveBuffer(): bogus or incomplete message from %s",
getPhone() );
}
}
if ( null != elem.nli ) {
makeForInvite( elem.context, elem.nli );
}
return true;
}
private boolean processSend( SendElem elem, boolean exiting )
{
byte[][] msgs;
boolean forceNow = mForceNow || exiting;
if ( null != elem ) {
msgs = XwJNI.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
mPhone, mPort, forceNow, mWaitSecs );
} else { // timed out
msgs = XwJNI.smsproto_prepOutbound( SMS_CMD.NONE, 0, null, mPhone,
mPort, forceNow, mWaitSecs );
}
if ( null != msgs ) {
sendBuffers( msgs );
}
return null != msgs || mWaitSecs[0] > 0;
}
private boolean process( QueueElem qelm, boolean exiting )
{
boolean handled;
if ( null == qelm || qelm instanceof SendElem ) {
handled = processSend( (SendElem)qelm, exiting );
} else {
handled = processReceive( (ReceiveElem)qelm, exiting );
}
Log.d( TAG, "%s.process(%s) => %b", this, qelm, handled );
return handled;
}
private SMSServiceHelper mHelper = null;
protected SMSServiceHelper getHelper( Context context )
{
@ -156,213 +264,6 @@ public class NBSProto {
return mHelper;
}
static class QueueElem {
Context context;
QueueElem( Context context ) { this.context = context; }
}
}
private static class SendThread extends NBSProtoThread {
private int[] mWaitSecs = { 0 };
private boolean mForceNow;
SendThread( String phone ) {
super( "SendThread", phone, getNBSPort() );
boolean newSMSEnabled = XWPrefs.getSMSProtoEnabled( XWApp.getContext() );
mForceNow = !newSMSEnabled;
}
void addPacket( Context context, int gameID, byte[] binmsg )
{
add( new SendElem( context, SMS_CMD.DATA, gameID, binmsg ) );
}
void addInvite( Context context, NetLaunchInfo nli )
{
add( new SendElem( context, SMS_CMD.INVITE, nli ) );
}
void addDeathNotice( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.DEATH, gameID, null ) );
}
void addAck( Context context, int gameID )
{
add( new SendElem( context, SMS_CMD.ACK_INVITE, gameID, null ) );
}
@Override
protected void removeSelf() { NBSProto.removeSelf( this ); }
@Override
protected void process( QueueElem qelm, boolean exiting )
{
SendElem elem = (SendElem)qelm;
byte[][] msgs;
boolean forceNow = mForceNow || exiting;
if ( null != elem ) {
msgs = XwJNI.smsproto_prepOutbound( elem.cmd, elem.gameID, elem.data,
getPhone(), getPort(), forceNow, mWaitSecs );
} else { // timed out
msgs = XwJNI.smsproto_prepOutbound( SMS_CMD.NONE, 0, null, getPhone(),
getPort(), forceNow, mWaitSecs );
}
if ( null != msgs ) {
sendBuffers( msgs );
}
// if ( mWaitSecs[0] <= 0 ) {
// mWaitSecs[0] = 5;
// }
}
private void sendBuffers( byte[][] fragments )
{
Context context = XWApp.getContext();
boolean success = false;
if ( XWPrefs.getNBSEnabled( context ) ) {
String phone = getPhone();
short port = getPort();
// Try send-to-self
if ( XWPrefs.getSMSToSelfEnabled( context ) ) {
String myPhone = SMSPhoneInfo.get( context ).number;
if ( null != myPhone
&& PhoneNumberUtils.compare( phone, myPhone ) ) {
for ( byte[] fragment : fragments ) {
handleFrom( context, fragment, phone, port );
}
success = true;
}
}
if ( !success ) {
try {
SmsManager mgr = SmsManager.getDefault();
boolean useProxy = Perms23.Perm.SEND_SMS.isBanned( context )
&& NBSProxy.isInstalled( context );
PendingIntent sent = useProxy ? null
: makeStatusIntent( context, MSG_SENT );
PendingIntent delivery = useProxy ? null
: makeStatusIntent( context, MSG_DELIVERED );
for ( byte[] fragment : fragments ) {
if ( useProxy ) {
NBSProxy.send( context, phone, port, fragment );
} else {
mgr.sendDataMessage( phone, null, port, fragment,
sent, delivery );
}
}
success = true;
} catch ( IllegalArgumentException iae ) {
Log.w( TAG, "sendBuffers(%s): %s", phone, iae.toString() );
} catch ( NullPointerException npe ) {
Assert.assertFalse( BuildConfig.DEBUG ); // shouldn't be trying to do this!!!
} catch ( java.lang.SecurityException se ) {
getHelper(context).postEvent( MultiEvent.SMS_SEND_FAILED_NOPERMISSION );
} catch ( Exception ee ) {
Log.ex( TAG, ee );
}
}
} else {
Log.i( TAG, "dropping because SMS disabled" );
}
// if ( showToasts( context ) && success && (0 == (++s_nSent % 5)) ) {
// DbgUtils.showf( context, "Sent msg %d", s_nSent );
// }
ConnStatusHandler.updateStatusOut( context, null,
CommsConnType.COMMS_CONN_SMS,
success );
}
private PendingIntent makeStatusIntent( Context context, String msg )
{
Intent intent = new Intent( msg );
return PendingIntent.getBroadcast( context, 0, intent, 0 );
}
private static class SendElem extends QueueElem {
SMS_CMD cmd;
int gameID;
byte[] data;
SendElem( Context context, SMS_CMD cmd, int gameID, byte[] data ) {
super( context );
this.cmd = cmd;
this.gameID = gameID;
this.data = data;
}
SendElem( Context context, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, cmd, 0, nli.asByteArray() );
}
}
}
private static Map<String, SendThread> sSendersMap = new HashMap<>();
private static SendThread getCurSender( String toPhone )
{
SendThread result = null;
synchronized ( sSendersMap ) {
result = sSendersMap.get( toPhone );
if ( result == null ) {
result = new SendThread( toPhone );
result.start();
sSendersMap.put( toPhone, result );
}
}
return result;
}
private static void removeSelf( SendThread self )
{
synchronized ( sSendersMap ) {
String phone = self.getPhone();
if ( sSendersMap.get(phone) == self ) {
sSendersMap.remove( phone );
}
}
}
private static class ReceiveThread extends NBSProtoThread {
private short mPort;
ReceiveThread( String fromPhone, short port ) {
super( "ReceiveThread", fromPhone, port );
}
void addPacket( Context context, byte[] data )
{
add( new ReceiveElem( context, data ) );
}
@Override
protected void removeSelf() { NBSProto.removeSelf( this ); }
@Override
protected void process( QueueElem qelem, boolean exiting )
{
ReceiveElem elem = (ReceiveElem)qelem;
SMSProtoMsg[] msgs = XwJNI.smsproto_prepInbound( elem.data, getPhone(), getPort() );
if ( null != msgs ) {
Log.d( TAG, "got %d msgs combined!", msgs.length );
for ( int ii = 0; ii < msgs.length; ++ii ) {
Log.d( TAG, "%d: type: %s; len: %d", ii, msgs[ii].cmd, msgs[ii].data.length );
}
for ( SMSProtoMsg msg : msgs ) {
receive( elem.context, msg );
}
getHelper(elem.context).postEvent( MultiEvent.SMS_RECEIVE_OK );
} else {
Log.d( TAG, "receiveBuffer(): bogus or incomplete message from %s",
getPhone() );
}
}
private void receive( Context context, SMSProtoMsg msg )
{
Log.i( TAG, "receive(cmd=%s)", msg.cmd );
@ -402,7 +303,7 @@ public class NBSProto {
private void sendDiedPacket( Context context, String phone, int gameID )
{
if ( !s_sentDied.contains( gameID ) ) {
getCurSender( phone ).addDeathNotice( context, gameID );
getCurThread( phone ).addGameDied( context, gameID );
// resendFor( phone, SMS_CMD.DEATH, gameID, null );
s_sentDied.add( gameID );
}
@ -411,45 +312,158 @@ public class NBSProto {
private void makeForInvite( Context context, NetLaunchInfo nli )
{
if ( nli != null ) {
getHelper(context).handleInvitation( nli, mPhone, DictFetchOwner.OWNER_SMS );
getCurThread(mPhone).addAck( context, nli.gameID() );
}
}
private void sendBuffers( byte[][] fragments )
{
Context context = XWApp.getContext();
boolean success = false;
if ( XWPrefs.getNBSEnabled( context ) ) {
String phone = getPhone();
getHelper(context).handleInvitation( nli, phone, DictFetchOwner.OWNER_SMS );
getCurSender( phone ).addAck( context, nli.gameID() );
short port = getPort();
// Try send-to-self
if ( XWPrefs.getSMSToSelfEnabled( context ) ) {
String myPhone = SMSPhoneInfo.get( context ).number;
if ( null != myPhone
&& PhoneNumberUtils.compare( phone, myPhone ) ) {
for ( byte[] fragment : fragments ) {
handleFrom( context, fragment, phone, port );
}
success = true;
}
}
if ( !success ) {
try {
SmsManager mgr = SmsManager.getDefault();
boolean useProxy = Perms23.Perm.SEND_SMS.isBanned( context )
&& NBSProxy.isInstalled( context );
PendingIntent sent = useProxy ? null
: makeStatusIntent( context, MSG_SENT );
PendingIntent delivery = useProxy ? null
: makeStatusIntent( context, MSG_DELIVERED );
for ( byte[] fragment : fragments ) {
if ( useProxy ) {
NBSProxy.send( context, phone, port, fragment );
} else {
mgr.sendDataMessage( phone, null, port, fragment,
sent, delivery );
}
Log.d( TAG, "sendBuffers(): sent fragment of len %d",
fragment.length );
}
success = true;
} catch ( IllegalArgumentException iae ) {
Log.w( TAG, "sendBuffers(%s): %s", phone, iae.toString() );
} catch ( NullPointerException npe ) {
Assert.assertFalse( BuildConfig.DEBUG ); // shouldn't be trying to do this!!!
} catch ( java.lang.SecurityException se ) {
getHelper(context).postEvent( MultiEvent.SMS_SEND_FAILED_NOPERMISSION );
} catch ( Exception ee ) {
Log.ex( TAG, ee );
}
}
} else {
Log.i( TAG, "dropping because SMS disabled" );
}
if ( success && (0 == (++s_nSent % TOAST_FREQ) && showToasts( context )) ) {
DbgUtils.showf( context, "Sent msg %d", s_nSent );
}
ConnStatusHandler.updateStatusOut( context, null,
CommsConnType.COMMS_CONN_SMS,
success );
}
private PendingIntent makeStatusIntent( Context context, String msg )
{
Intent intent = new Intent( msg );
return PendingIntent.getBroadcast( context, 0, intent, 0 );
}
static class QueueElem {
Context context;
QueueElem( Context context ) { this.context = context; }
}
private static class SendElem extends QueueElem {
SMS_CMD cmd;
int gameID;
byte[] data;
SendElem( Context context, SMS_CMD cmd, int gameID, byte[] data ) {
super( context );
this.cmd = cmd;
this.gameID = gameID;
this.data = data;
}
SendElem( Context context, SMS_CMD cmd, NetLaunchInfo nli ) {
this( context, cmd, 0, nli.asByteArray() );
}
@Override
public String toString()
{
return String.format( "SendElem: {cmd: %s, dataLen: %d}", cmd,
data == null ? 0 : data.length );
}
}
private static class ReceiveElem extends QueueElem {
byte[] data;
NetLaunchInfo nli;
ReceiveElem( Context context, byte[] data )
{
super( context );
this.data = data;
}
ReceiveElem( Context context, NetLaunchInfo nli )
{
super( context );
this.nli = nli;
}
@Override
public String toString()
{
return String.format( "ReceiveElem: {nli: %s, data: %s}", nli, data );
}
}
}
private static Map<String, ReceiveThread> sReceiversMap = new HashMap<>();
private static ReceiveThread getCurReceiver( String fromPhone, short port )
private static Map<String, NBSProtoThread> sThreadMap = new HashMap<>();
private static NBSProtoThread getCurThread( String phone )
{
ReceiveThread result = null;
synchronized ( sReceiversMap ) {
result = sReceiversMap.get( fromPhone );
return getCurThread( phone, getNBSPort() );
}
private static NBSProtoThread getCurThread( String phone, short port )
{
NBSProtoThread result = null;
synchronized ( sThreadMap ) {
result = sThreadMap.get( phone );
if ( result == null ) {
result = new ReceiveThread( fromPhone, port );
result = new NBSProtoThread( phone, port );
result.start();
sReceiversMap.put( fromPhone, result );
} else {
Assert.assertTrue( port == result.getPort() || !BuildConfig.DEBUG );
sThreadMap.put( phone, result );
}
}
Assert.assertTrue( result.getPort() == port || !BuildConfig.DEBUG );
return result;
}
private static void removeSelf( ReceiveThread self )
private static void removeSelf( NBSProtoThread self )
{
synchronized ( sReceiversMap ) {
synchronized ( sThreadMap ) {
String phone = self.getPhone();
if ( sReceiversMap.get(phone) == self ) {
sReceiversMap.remove( phone );
if ( sThreadMap.get(phone) == self ) {
sThreadMap.remove( phone );
}
}
}
@ -492,6 +506,16 @@ public class NBSProto {
}
}
private static void stopCurThreads()
{
synchronized( sThreadMap ) {
for ( String phone : sThreadMap.keySet() ) {
// should cause them all to call removeSelf() soon
sThreadMap.get( phone ).interrupt();
}
}
}
private static Short s_nbsPort = null;
private static short getNBSPort()
{
@ -501,4 +525,14 @@ public class NBSProto {
}
return s_nbsPort;
}
private static boolean showToasts( Context context )
{
if ( null == s_showToasts ) {
s_showToasts =
XWPrefs.getPrefsBoolean( context, R.string.key_show_sms, false );
}
boolean result = s_showToasts;
return result;
}
}

View file

@ -193,7 +193,7 @@ public class PrefsDelegate extends DelegateBase
break;
case R.string.key_enable_nbs:
if ( ! sp.getBoolean( key, true ) ) {
NBSProto.stopService( m_activity );
NBSProto.stopThreads();
}
break;
case R.string.key_download_path: