ensure writer gets run every time OS schedules via onHandleWork()

This commit is contained in:
Eric House 2019-02-05 09:58:39 -08:00 committed by Eric House
parent 130487f7df
commit 37085b0ba1

View file

@ -389,6 +389,13 @@ public class RelayService extends JobIntentService
DbgUtils.assertOnUIThread( false );
Log.d( TAG, "%s.onHandleWork(cmd=%s)", this, cmdFrom( intent ) );
handleCommand( intent );
boolean goOn = mThreads.serviceQueue();
if ( !goOn ) {
Log.e( TAG, "onHandleWork(): need to exit... HELP!!!!" );
mThreads.killThreads();
}
resetExitTimer();
Log.d( TAG, "%s.onHandleWork(cmd=%s) DONE", this, cmdFrom( intent ) );
}
@ -405,6 +412,8 @@ public class RelayService extends JobIntentService
}
if ( startImmediately ) {
// Log.d( TAG, "onDestroy(): restarting: %d in queue",
// mThreads.m_queue.size() );
timerFired( this );
} else if ( shouldMaintainConnection() ) {
long interval_millis = getMaxIntervalSeconds() * 1000;
@ -1068,9 +1077,12 @@ public class RelayService extends JobIntentService
// continue to use the instance for a short while after returning.
void unsetService()
{
// RelayService oldService;
synchronized ( mServiceHolder ) {
// oldService = mServiceHolder[0];
mServiceHolder[0] = null;
}
// Log.d( TAG, "unsetService() DONE (was %s)", oldService );
}
private RelayService getService() throws InterruptedException
@ -1163,6 +1175,49 @@ public class RelayService extends JobIntentService
}
}
// Break out the work of sending queued packets so it can be called
// from several threads. Fixes starvation problems on some devices.
private synchronized boolean serviceQueue()
{
Log.d( TAG, "serviceQueue()" );
boolean shouldGoOn = true;
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 1000;
Log.d( TAG, "blocking %dms on poll()", ts );
for ( outData = m_queue.poll( ts, TimeUnit.MILLISECONDS );
null != outData;
outData = m_queue.poll() ) { // doesn't block
Log.d( TAG, "removed packet from queue (%d left): %s",
m_queue.size(), outData );
if ( outData == sEOQPacket ) {
shouldGoOn = false;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add( outData );
} else {
dataListUDP.add( outData );
}
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
getService().resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
shouldGoOn = false;
}
Log.d( TAG, "serviceQueue() => %b", shouldGoOn );
return shouldGoOn;
}
private void startWriteThread()
{
Assert.assertNull( m_UDPWriteThread );
@ -1170,59 +1225,34 @@ public class RelayService extends JobIntentService
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
Log.i( TAG, "write thread starting" );
for ( boolean gotEOQ = false; !gotEOQ; ) {
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600;
Log.d( TAG, "blocking %d sec on poll()", ts );
for ( outData = m_queue.poll(ts, TimeUnit.SECONDS);
null != outData;
outData = m_queue.poll() ) { // doesn't block
Log.d( TAG, "removed packet from queue: %s", outData );
if ( outData == sEOQPacket ) {
gotEOQ = true;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add( outData );
} else {
dataListUDP.add( outData );
}
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
getService().resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
for ( ; ; ) {
if ( !serviceQueue() ) {
break;
}
}
Log.i( TAG, "write thread killing read thread" );
killThreads();
// now kill the read thread
m_UDPSocket.close();
try {
m_UDPReadThread.join();
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
m_UDPSocket = null;
Log.i( TAG, "write thread exiting (with %d in queue)",
m_queue.size() );
}
}, getClass().getName() );
m_UDPWriteThread.start();
}
private void killThreads()
{
m_UDPSocket.close();
try {
m_UDPReadThread.join();
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
m_UDPSocket = null;
Log.i( TAG, "write thread exiting (with %d in queue)",
m_queue.size() );
}
private int sendViaWeb( List<PacketData> packets ) throws InterruptedException
{
int sentLen = 0;