decouple networking threads from RelayService

Oreo's creating a new service instance for every single intent passed to
enqueueWork, meaning a brand new set of threads with a new queue, empty
set of messages to be retried, etc. was created every time, and all
attempts to optimize and retry were broken. So: make the threads a
static singleton that are given a Service instance from onCreate() and
told to drop it from onDestroy(). The threads proceed until they need an
instance, then block until one's available. Seems to work on Oreo and an
older Android as well.
This commit is contained in:
Eric House 2018-12-09 18:21:30 -08:00
parent 450c39ed57
commit 5284dce9b8

View file

@ -114,13 +114,14 @@ public class RelayService extends JobIntentService
static { resetBackoffTimer(); }
private Thread m_fetchThread = null; // no longer used
private AtomicReference<UDPThreads> m_UDPThreadsRef = new AtomicReference<>();
private static final AtomicReference<UDPThreads> sUDPThreadsRef = new AtomicReference<>();
private Handler m_handler;
private UDPThreads mThreads;
private Runnable m_onInactivity;
private int m_maxIntervalSeconds = 0;
private long m_lastGamePacketReceived;
private int m_nativeFailScore;
private boolean m_skipUPDSet;
private static AtomicInteger sNativeFailScore = new AtomicInteger();;
private static boolean sSkipUPDSet;
private RelayServiceHelper mHelper;
private static DevIDType s_curType = DevIDType.ID_TYPE_NONE;
private static long s_regStartTime = 0;
@ -358,7 +359,7 @@ public class RelayService extends JobIntentService
@Override
public void onCreate()
{
Log.d( TAG, "onCreate()" );
Log.d( TAG, "%s.onCreate()", this );
super.onCreate();
mHelper = new RelayServiceHelper( this );
m_lastGamePacketReceived =
@ -378,31 +379,20 @@ public class RelayService extends JobIntentService
}
}
};
m_skipUPDSet = XWPrefs.getSkipToWebAPI( this );
}
@Override
public int onStartCommand( Intent intent, int flags, int startId )
{
Log.d( TAG, "onStartCommand(%s)", intent );
super.onStartCommand( intent, flags, startId );
Integer result = handleCommand( intent );
if ( null == result ) {
result = Service.START_STICKY_COMPATIBILITY;
mThreads = startUDPThreadsOnce();
if ( null == mThreads ) {
stopSelf();
}
NetStateCache.register( this, this );
resetExitTimer();
return result;
sSkipUPDSet = XWPrefs.getSkipToWebAPI( this );
}
@Override
public void onHandleWork( Intent intent )
{
Log.d( TAG, "onHandleWork(%s)", intent );
/*void*/ handleCommand( intent );
DbgUtils.assertOnUIThread( false );
handleCommand( intent );
resetExitTimer();
}
@Override
@ -412,8 +402,11 @@ public class RelayService extends JobIntentService
long interval_millis = getMaxIntervalSeconds() * 1000;
RelayReceiver.setTimer( this, interval_millis );
}
stopThreads();
if ( null != mThreads ) {
mThreads.unsetService();
}
super.onDestroy();
Log.d( TAG, "%s.onDestroy() DONE", this );
}
// NetStateCache.StateChangedIf interface
@ -422,9 +415,8 @@ public class RelayService extends JobIntentService
startService( this ); // bad name: will *stop* threads too
}
private Integer handleCommand( Intent intent )
private void handleCommand( Intent intent )
{
Integer result = null;
MsgCmds cmd;
try {
cmd = MsgCmds.values()[intent.getIntExtra( CMD_STR, -1 )];
@ -432,7 +424,7 @@ public class RelayService extends JobIntentService
cmd = null;
}
if ( null != cmd ) {
Log.d( TAG, "onStartCommand(): cmd=%s", cmd.toString() );
Log.d( TAG, "handleCommand(): cmd=%s", cmd.toString() );
switch( cmd ) {
case PROCESS_GAME_MSGS:
String[] relayIDs = new String[1];
@ -470,7 +462,7 @@ public class RelayService extends JobIntentService
case SEND:
case RECEIVE:
case SENDNOCONN:
startUDPThreadsIfNot();
startUDPThreadsOnce();
long rowid = intent.getLongExtra( ROWID, -1 );
byte[] msg = intent.getByteArrayExtra( BINBUFFER );
if ( MsgCmds.SEND == cmd ) {
@ -483,7 +475,7 @@ public class RelayService extends JobIntentService
}
break;
case INVITE:
startUDPThreadsIfNot();
startUDPThreadsOnce();
srcDevID = intent.getIntExtra( DEV_ID_SRC, 0 );
int destDevID = intent.getIntExtra( DEV_ID_DEST, 0 );
String relayID = intent.getStringExtra( RELAY_ID );
@ -505,12 +497,9 @@ public class RelayService extends JobIntentService
stopSelf();
break;
default:
Assert.fail();
Assert.assertFalse( BuildConfig.DEBUG );
}
result = Service.START_STICKY;
}
return result;
}
private void setupNotifications( String[] relayIDs, BackMoveResult[] bmrs,
@ -553,6 +542,7 @@ public class RelayService extends JobIntentService
{
while ( null != m_fetchThread ) {
Log.w( TAG, "2: m_fetchThread NOT NULL; WHAT TO DO???" );
Assert.assertFalse( BuildConfig.DEBUG );
try {
Thread.sleep( 20 );
} catch( java.lang.InterruptedException ie ) {
@ -561,24 +551,38 @@ public class RelayService extends JobIntentService
}
}
private void startUDPThreadsIfNot()
private UDPThreads startUDPThreadsOnce()
{
UDPThreads threads = null;
if ( XWApp.UDP_ENABLED && relayEnabled( this ) ) {
synchronized ( m_UDPThreadsRef ) {
if ( null == m_UDPThreadsRef.get() ) {
UDPThreads threads = new UDPThreads();
m_UDPThreadsRef.set( threads );
synchronized ( sUDPThreadsRef ) {
threads = sUDPThreadsRef.get();
if ( null == threads ) {
threads = new UDPThreads();
sUDPThreadsRef.set( threads );
threads.start();
}
threads.setService( this );
}
} else {
Log.i( TAG, "startUDPThreadsIfNot(): UDP disabled" );
Log.i( TAG, "startUDPThreadsOnce(): UDP disabled" );
}
} // startUDPThreadsIfNot
return threads;
} // startUDPThreadsOnce
private boolean skipNativeSend()
private void stopUDPThreads()
{
boolean skip = m_nativeFailScore > UDP_FAIL_LIMIT || m_skipUPDSet;
synchronized ( sUDPThreadsRef ) {
UDPThreads threads = sUDPThreadsRef.getAndSet( null );
if ( null != threads ) {
threads.stop();
}
}
}
private static boolean skipNativeSend()
{
boolean skip = sNativeFailScore.get() > UDP_FAIL_LIMIT || sSkipUPDSet;
// Log.d( TAG, "skipNativeSend(score=%d)) => %b", m_nativeFailScore, skip );
return skip;
}
@ -615,14 +619,6 @@ public class RelayService extends JobIntentService
Log.d( TAG, "noteSent(fromUDP=%b): size after: %d", fromUDP, map.size() );
}
private void stopUDPThreadsIf()
{
UDPThreads threads = m_UDPThreadsRef.getAndSet( null );
if ( null != threads ) {
threads.stop();
}
}
// MIGHT BE Running on reader thread
private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP )
{
@ -687,7 +683,7 @@ public class RelayService extends JobIntentService
break;
case XWPDEV_UPGRADE:
intent = getIntentTo( this, MsgCmds.UPGRADE );
startService( intent );
startService( this, intent );
break;
case XWPDEV_GOTINVITE:
resetBackoff = true;
@ -700,7 +696,7 @@ public class RelayService extends JobIntentService
String asStr = nli.toString();
Log.d( TAG, "got invitation: %s", asStr );
intent.putExtra( NLI_DATA, asStr );
startService( intent );
startService( this, intent );
break;
case XWPDEV_ACK:
noteAck( vli2un( dis ), fromUDP );
@ -932,8 +928,7 @@ public class RelayService extends JobIntentService
private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd )
{
startUDPThreadsIfNot();
UDPThreads threads = m_UDPThreadsRef.get();
UDPThreads threads = startUDPThreadsOnce();
if ( threads != null ) {
threads.add( new PacketData( bas, cmd ) );
}
@ -1001,14 +996,111 @@ public class RelayService extends JobIntentService
}
}
private class UDPThreads {
private static class UDPThreads {
private DatagramSocket m_UDPSocket;
private LinkedBlockingQueue<PacketData> m_queue =
new LinkedBlockingQueue<PacketData>();
private Thread m_UDPReadThread;
private Thread m_UDPWriteThread;
private RelayService[] mServiceHolder = {null};
private AtomicInteger mHaveServiceCount = new AtomicInteger();
UDPThreads() {}
// Here's the problem: In the Oreo world, RelayService instances come
// and go, typically lasting only long enough to disptach a single
// onHandleWork() call. So the UDPThreads instance is now a static
// singleton that lasts a really long time, across the lives of
// hundreds of RelayService instances. It and its threads, however,
// have to make calls on RelayService instances. So I need a way to
// block them when there's no instance available AND to block
// onDestroy() returning until all references to the current instance
// have been forgotten.
//
// The solution is to set and clear the instance through
// [un]setService(). And to wrap the code inside that needs
// references in a AutoCloseable that when instantiated blocks when
// the instance is null. Thus when setService() is called with a new
// instance, all the requests for an instance are unblocked and each
// of them increments a counter. When the AutoCloseable's close() is
// called, the counter is decremented. unsetService() waits until that
// counter is 0 (and so the close() method calls notifyAll() when the
// counter's dropped to 0.)
//
// It works like this: the threads run. When they need a reference to
// the current RelayService they block until there's one
// available. They're able to get the reference as long as it's
// non-null, but once it's null they start to block again. Once the
// last that didn't block finishes with its instance unsetService()
// returns (and so onDestroy() can as well.) This clears it so the OS
// can create another RelayService instance that will eventually get
// passed to setService() again.
private class ServiceWrapper implements AutoCloseable {
private final RelayService mService;
ServiceWrapper( RelayService service ) { mService = service; }
RelayService get() { return mService; }
@Override
public void close()
{
synchronized ( mServiceHolder ) {
int newVal = mHaveServiceCount.decrementAndGet();
Log.d( TAG, "close(): count now %d", newVal );
if ( 0 == newVal ) {
mServiceHolder.notifyAll();
}
}
}
}
void setService( RelayService service )
{
synchronized ( mServiceHolder ) {
mServiceHolder[0] = service;
mServiceHolder.notifyAll();
}
}
// When service is cleared, DO NOT RETURN until all threads using the
// old value of the service have relinquished it. This is called from
// onDestroy(), and once onDestroy() had returned nobody should
// reference its RelayService instance again.
void unsetService()
{
Log.d( TAG, "unsetService()" );
synchronized ( mServiceHolder ) {
mServiceHolder[0] = null;
// Now block until the used count drops to 0
while ( mHaveServiceCount.get() > 0 ) {
try {
mServiceHolder.wait();
} catch (InterruptedException ie) {
Log.e( TAG, "wait() threw: %s", ie.getMessage() );
// Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
Log.d( TAG, "unsetService() DONE" );
}
// called by various threads that need to run
private ServiceWrapper acquireWrapper()
{
synchronized ( mServiceHolder ) {
while ( null == mServiceHolder[0] ) {
try {
mServiceHolder.wait();
} catch (InterruptedException ie) {
Log.e( TAG, "wait() threw: %s", ie.getMessage() );
// InterruptedException is how we get unblocked!
// Assert.assertFalse( BuildConfig.DEBUG );
}
}
int newVal = mHaveServiceCount.incrementAndGet();
Log.d( TAG, "acquireWrapper(): count now %d", newVal );
return new ServiceWrapper( mServiceHolder[0] );
}
}
void start()
{
@ -1025,8 +1117,10 @@ public class RelayService extends JobIntentService
new DatagramPacket( buf, buf.length );
try {
m_UDPSocket.receive( packet );
resetExitTimer();
gotPacket( packet );
try (ServiceWrapper wrapper = acquireWrapper() ) {
wrapper.get().resetExitTimer();
wrapper.get().gotPacket( packet );
}
} catch ( java.io.InterruptedIOException iioe ) {
// DbgUtils.logf( "FYI: udp receive timeout" );
} catch( java.io.IOException ioe ) {
@ -1052,8 +1146,12 @@ public class RelayService extends JobIntentService
private void connectSocket()
{
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
String host = XWPrefs.getDefaultRelayHost( RelayService.this );
int port;
String host;
try ( ServiceWrapper wrapper = acquireWrapper() ) {
port = XWPrefs.getDefaultRelayPort( wrapper.get() );
host = XWPrefs.getDefaultRelayHost( wrapper.get() );
}
try {
m_UDPSocket = new DatagramSocket();
m_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log
@ -1108,7 +1206,9 @@ public class RelayService extends JobIntentService
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
resetExitTimer();
try ( ServiceWrapper wrapper = acquireWrapper() ) {
wrapper.get().resetExitTimer();
}
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
@ -1135,47 +1235,52 @@ public class RelayService extends JobIntentService
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
int sentLen = 0;
if ( packets.size() > 0 ) {
HttpURLConnection conn = NetUtils.makeHttpRelayConn( RelayService.this, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet instanceof EOQPacketData );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
gotPacket( datum, false, false );
try ( ServiceWrapper wrapper = acquireWrapper() ) {
RelayService service = wrapper.get();
HttpURLConnection conn = NetUtils
.makeHttpRelayConn( service, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet instanceof EOQPacketData );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
service
.noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
service.gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( service, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
ConnStatusHandler.updateStatus( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
@ -1187,43 +1292,47 @@ public class RelayService extends JobIntentService
int sentLen = 0;
if ( packets.size() > 0 ) {
noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
try ( ServiceWrapper wrapper = acquireWrapper() ) {
RelayService service = wrapper.get();
service.noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
true );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( service, null,
CommsConnType.COMMS_CONN_RELAY,
true );
m_handler.post( new Runnable() {
public void run() {
stopUDPThreadsIf();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
service.m_handler.post( new Runnable() {
public void run() {
Assert.assertFalse( BuildConfig.DEBUG );
// stopUDPThreadsIf();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
}
}
ConnStatusHandler.updateStatus( service, null,
CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
}
ConnStatusHandler.updateStatus( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
}
return sentLen;
@ -1256,7 +1365,7 @@ public class RelayService extends JobIntentService
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
++m_nativeFailScore;
sNativeFailScore.incrementAndGet();
}
iter.remove();
}
@ -1433,7 +1542,7 @@ public class RelayService extends JobIntentService
// Log.d( TAG, "noteAck(fromUDP=%b): removed for id %d: %s",
// fromUDP, packetID, packet );
if ( fromUDP ) {
--m_nativeFailScore;
sNativeFailScore.decrementAndGet();
}
} else {
Log.w( TAG, "Weird: got ack %d but never sent", packetID );
@ -1462,6 +1571,7 @@ public class RelayService extends JobIntentService
// Called from any thread
private void resetExitTimer()
{
Log.d( TAG, "resetExitTimer()" );
m_handler.removeCallbacks( m_onInactivity );
// UDP socket's no good as a return address after several
@ -1477,10 +1587,11 @@ public class RelayService extends JobIntentService
stopThreads();
} else if ( XWApp.UDP_ENABLED ) {
stopFetchThreadIf();
startUDPThreadsIfNot();
startUDPThreadsOnce();
registerWithRelay();
} else {
stopUDPThreadsIf();
Assert.assertFalse( BuildConfig.DEBUG );
stopUDPThreads();
startFetchThreadIfNotUDP();
}
}
@ -1489,7 +1600,7 @@ public class RelayService extends JobIntentService
{
Log.d( TAG, "stopThreads()" );
stopFetchThreadIf();
stopUDPThreadsIf();
stopUDPThreads();
}
private static void un2vli( int nn, OutputStream os )
@ -1656,7 +1767,7 @@ public class RelayService extends JobIntentService
}
}
private class PacketData {
private static class PacketData {
public ByteArrayOutputStream m_bas;
public XWRelayReg m_cmd;
public byte[] m_header;
@ -1745,5 +1856,6 @@ public class RelayService extends JobIntentService
}
// Exits only to exist, so instanceof can distinguish
private class EOQPacketData extends PacketData {}
// Can this be replaced by a final Object tested with ==?
private static class EOQPacketData extends PacketData {}
}