fix relay interactions again

Eliminate the writer thread, since onHandleWork() is already getting
called on a thread and can process the outbound queue itself. And get
rid of reader thread needing to wait for an instance of the RelayService
to be available to process a packet (which was taking 5-10 seconds all
the time): just post received packets via an enqueueWork() call using
the app context that's always available.
This commit is contained in:
Eric House 2019-02-06 22:32:46 -08:00
parent 34b462b97c
commit 29661d2a7e

View file

@ -86,6 +86,7 @@ public class RelayService extends JobIntentService
private static final String TIMESTAMP = "TIMESTAMP"; private static final String TIMESTAMP = "TIMESTAMP";
private static enum MsgCmds { INVALID, private static enum MsgCmds { INVALID,
DO_WORK,
PROCESS_GAME_MSGS, PROCESS_GAME_MSGS,
PROCESS_DEV_MSGS, PROCESS_DEV_MSGS,
UDP_CHANGED, UDP_CHANGED,
@ -97,6 +98,7 @@ public class RelayService extends JobIntentService
UPGRADE, UPGRADE,
INVITE, INVITE,
GOT_INVITE, GOT_INVITE,
GOT_PACKET,
STOP, STOP,
} }
@ -110,6 +112,8 @@ public class RelayService extends JobIntentService
private static final String BINBUFFER = "BINBUFFER"; private static final String BINBUFFER = "BINBUFFER";
private static final String MSGNUM = "MSGNUM"; private static final String MSGNUM = "MSGNUM";
private static LinkedBlockingQueue<PacketData> s_queue =
new LinkedBlockingQueue<PacketData>();
private static List<PacketData> s_packetsSentUDP = new ArrayList<>(); private static List<PacketData> s_packetsSentUDP = new ArrayList<>();
private static List<PacketData> s_packetsSentWeb = new ArrayList<>(); private static List<PacketData> s_packetsSentWeb = new ArrayList<>();
private static final PacketData sEOQPacket = new PacketData(); private static final PacketData sEOQPacket = new PacketData();
@ -120,12 +124,15 @@ public class RelayService extends JobIntentService
new CommsAddrRec( CommsConnType.COMMS_CONN_RELAY ); new CommsAddrRec( CommsConnType.COMMS_CONN_RELAY );
private static int s_curBackoff; private static int s_curBackoff;
private static long s_curNextTimer; private static long s_curNextTimer;
private static DatagramSocket s_UDPSocket;
static { resetBackoffTimer(); } static { resetBackoffTimer(); }
private Thread m_fetchThread = null; // no longer used private Thread m_fetchThread = null; // no longer used
private static final AtomicReference<UDPThreads> sUDPThreadsRef = new AtomicReference<>(); private static final AtomicReference<UDPReadThread> sUDPReadThreadRef
= new AtomicReference<>();
private Handler m_handler; private Handler m_handler;
private UDPThreads mThreads; private UDPReadThread mReadThread;
private Runnable m_onInactivity; private Runnable m_onInactivity;
private int m_maxIntervalSeconds = 0; private int m_maxIntervalSeconds = 0;
private long m_lastGamePacketReceived; private long m_lastGamePacketReceived;
@ -140,7 +147,6 @@ public class RelayService extends JobIntentService
,XWPDEV_PROTO_VERSION_1 ,XWPDEV_PROTO_VERSION_1
}; };
// private static final int XWPDEV_NONE = 0;
// Must be kept in sync with eponymous enum in xwrelay.h // Must be kept in sync with eponymous enum in xwrelay.h
private enum XWRelayReg { XWPDEV_NONE, private enum XWRelayReg { XWPDEV_NONE,
@ -359,6 +365,7 @@ public class RelayService extends JobIntentService
{ {
Log.d( TAG, "%s.onCreate()", this ); Log.d( TAG, "%s.onCreate()", this );
super.onCreate(); super.onCreate();
mHelper = new RelayServiceHelper( this ); mHelper = new RelayServiceHelper( this );
m_lastGamePacketReceived = m_lastGamePacketReceived =
XWPrefs.getPrefsLong( this, R.string.key_last_packet, XWPrefs.getPrefsLong( this, R.string.key_last_packet,
@ -377,8 +384,8 @@ public class RelayService extends JobIntentService
} }
} }
}; };
mThreads = startUDPThreadsOnce(); mReadThread = startUDPReadThreadOnce();
if ( null == mThreads ) { if ( null == mReadThread ) {
stopSelf(); stopSelf();
} }
sSkipUPDSet = XWPrefs.getSkipToWebAPI( this ); sSkipUPDSet = XWPrefs.getSkipToWebAPI( this );
@ -389,12 +396,20 @@ public class RelayService extends JobIntentService
{ {
DbgUtils.assertOnUIThread( false ); DbgUtils.assertOnUIThread( false );
Log.d( TAG, "%s.onHandleWork(cmd=%s)", this, cmdFrom( intent ) ); Log.d( TAG, "%s.onHandleWork(cmd=%s)", this, cmdFrom( intent ) );
handleCommand( intent );
boolean goOn = mThreads.serviceQueue(); try {
if ( !goOn ) { connectSocketOnce(); // must not be on UI thread
Log.e( TAG, "onHandleWork(): need to exit... HELP!!!!" );
mThreads.killThreads(); handleCommand( intent );
boolean goOn = serviceQueue();
if ( !goOn ) {
Log.e( TAG, "onHandleWork(): need to exit... HELP!!!!" );
mReadThread.interrupt();
}
} catch (InterruptedException ie ) {
Log.e( TAG, "InterruptedException in onHandleWork(): %s",
ie.getMessage() );
} }
resetExitTimer(); resetExitTimer();
@ -406,17 +421,14 @@ public class RelayService extends JobIntentService
{ {
Log.d( TAG, "onDestroy() called" ); Log.d( TAG, "onDestroy() called" );
boolean startImmediately = false; if ( null != mReadThread ) {
if ( null != mThreads ) { mReadThread.unsetService();
startImmediately = 0 < mThreads.m_queue.size(); if ( 0 < s_queue.size() ) {
mThreads.unsetService(); enqueueWork( getIntentTo( this, MsgCmds.DO_WORK ) );
}
} }
if ( startImmediately ) { if ( shouldMaintainConnection() ) {
// Log.d( TAG, "onDestroy(): restarting: %d in queue",
// mThreads.m_queue.size() );
timerFired( this );
} else if ( shouldMaintainConnection() ) {
long interval_millis = getMaxIntervalSeconds() * 1000; long interval_millis = getMaxIntervalSeconds() * 1000;
RelayReceiver.setTimer( this, interval_millis ); RelayReceiver.setTimer( this, interval_millis );
Log.d( TAG, "onDestroy(): rescheduling in %d ms", Log.d( TAG, "onDestroy(): rescheduling in %d ms",
@ -472,6 +484,8 @@ public class RelayService extends JobIntentService
Log.d( TAG, "handleCommand(): cmd=%s (age=%dms)", cmd.toString(), Log.d( TAG, "handleCommand(): cmd=%s (age=%dms)", cmd.toString(),
System.currentTimeMillis() - timestamp ); System.currentTimeMillis() - timestamp );
switch( cmd ) { switch( cmd ) {
case DO_WORK: // exists only to launch service
break;
case PROCESS_GAME_MSGS: case PROCESS_GAME_MSGS:
String[] relayIDs = new String[1]; String[] relayIDs = new String[1];
relayIDs[0] = intent.getStringExtra( RELAY_ID ); relayIDs[0] = intent.getStringExtra( RELAY_ID );
@ -505,12 +519,16 @@ public class RelayService extends JobIntentService
= NetLaunchInfo.makeFrom( this, intent.getStringExtra(NLI_DATA) ); = NetLaunchInfo.makeFrom( this, intent.getStringExtra(NLI_DATA) );
receiveInvitation( srcDevID, nli ); receiveInvitation( srcDevID, nli );
break; break;
case GOT_PACKET:
byte[] msg = intent.getByteArrayExtra( BINBUFFER );
gotPacket( msg, false, true );
break;
case SEND: case SEND:
case RECEIVE: case RECEIVE:
case SENDNOCONN: case SENDNOCONN:
startUDPThreadsOnce(); startUDPReadThreadOnce();
long rowid = intent.getLongExtra( ROWID, -1 ); long rowid = intent.getLongExtra( ROWID, -1 );
byte[] msg = intent.getByteArrayExtra( BINBUFFER ); msg = intent.getByteArrayExtra( BINBUFFER );
if ( MsgCmds.SEND == cmd ) { if ( MsgCmds.SEND == cmd ) {
sendMessage( rowid, msg, timestamp ); sendMessage( rowid, msg, timestamp );
} else if ( MsgCmds.SENDNOCONN == cmd ) { } else if ( MsgCmds.SENDNOCONN == cmd ) {
@ -522,7 +540,7 @@ public class RelayService extends JobIntentService
} }
break; break;
case INVITE: case INVITE:
startUDPThreadsOnce(); startUDPReadThreadOnce();
srcDevID = intent.getIntExtra( DEV_ID_SRC, 0 ); srcDevID = intent.getIntExtra( DEV_ID_SRC, 0 );
int destDevID = intent.getIntExtra( DEV_ID_DEST, 0 ); int destDevID = intent.getIntExtra( DEV_ID_DEST, 0 );
String relayID = intent.getStringExtra( RELAY_ID ); String relayID = intent.getStringExtra( RELAY_ID );
@ -549,6 +567,229 @@ public class RelayService extends JobIntentService
} }
} }
private synchronized void connectSocketOnce() throws InterruptedException
{
if ( null == s_UDPSocket ) {
final RelayService service = this;
int port = XWPrefs.getDefaultRelayPort( service );
String host = XWPrefs.getDefaultRelayHost( service );
try {
s_UDPSocket = new DatagramSocket();
s_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log
InetAddress addr = InetAddress.getByName( host );
s_UDPSocket.connect( addr, port ); // remember this address
Log.d( TAG, "connectSocket(%s:%d): s_UDPSocket now %H",
host, port, s_UDPSocket );
} catch( java.net.SocketException se ) {
Log.ex( TAG, se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
Log.ex( TAG, uhe );
Assert.assertFalse( BuildConfig.DEBUG );
}
} else {
Assert.assertTrue( s_UDPSocket.isConnected() );
Log.i( TAG, "s_UDPSocket not null" );
}
}
private boolean serviceQueue()
{
Log.d( TAG, "serviceQueue()" );
boolean shouldGoOn = true;
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 1000;
Log.d( TAG, "blocking %dms on poll()", ts );
for ( outData = s_queue.poll( ts, TimeUnit.MILLISECONDS );
null != outData;
outData = s_queue.poll() ) { // doesn't block
Log.d( TAG, "removed packet from queue (%d left): %s",
s_queue.size(), outData );
if ( outData == sEOQPacket ) {
shouldGoOn = false;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add( outData );
} else {
dataListUDP.add( outData );
}
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
shouldGoOn = false;
}
Log.d( TAG, "serviceQueue() => %b", shouldGoOn );
return shouldGoOn;
}
private long m_lastRunMS = 0;
private void runUDPAckTimer()
{
long nowMS = System.currentTimeMillis();
if ( 0 == m_lastRunMS ) {
m_lastRunMS = nowMS;
}
long age = nowMS - m_lastRunMS;
if ( age < 3000 ) { // never more frequently than 3 sec.
// Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" );
} else {
m_lastRunMS = nowMS;
long minSentMS = nowMS - 10000; // 10 seconds ago
long prevSentMS = 0;
List<PacketData> forResend = new ArrayList<>();
boolean foundNonAck = false;
synchronized ( s_packetsSentUDP ) {
for ( Iterator<PacketData> iter = s_packetsSentUDP.iterator();
iter.hasNext(); ) {
PacketData packet = iter.next();
long sentMS = packet.getSentMS();
Assert.assertTrue( prevSentMS <= sentMS );
prevSentMS = sentMS;
if ( sentMS > minSentMS ) {
break;
}
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
sNativeFailScore.incrementAndGet();
}
iter.remove();
}
Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining",
s_packetsSentUDP.size() );
}
if ( foundNonAck ) {
Log.d( TAG, "runUDPAckTimer(): reposting %d packets",
forResend.size() );
s_queue.addAll( forResend );
}
}
}
private int sendViaWeb( List<PacketData> packets ) throws InterruptedException
{
int sentLen = 0;
if ( packets.size() > 0 ) {
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
final RelayService service = this;
HttpsURLConnection conn = NetUtils
.makeHttpsRelayConn( service, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet == sEOQPacket );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
service
.noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
service.gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( service, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
return sentLen;
}
private int sendViaUDP( List<PacketData> packets ) throws InterruptedException
{
int sentLen = 0;
if ( packets.size() > 0 ) {
Log.d( TAG, "sendViaUDP(): sending %d at once", packets.size() );
final RelayService service = this;
service.noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
s_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( service, null,
CommsConnType.COMMS_CONN_RELAY,
true );
service.m_handler.post( new Runnable() {
public void run() {
service.stopUDPReadThread();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.e( TAG, "sendViaUDP(): failure \"%s\" sending on %s",
s_UDPSocket, ioe.getMessage() );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
}
}
ConnStatusHandler.updateStatus( service, null,
CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
Log.d( TAG, "sendViaUDP(): sent %d bytes (%d packets)",
sentLen, packets.size() );
}
return sentLen;
}
private void setupNotifications( String[] relayIDs, BackMoveResult[] bmrs, private void setupNotifications( String[] relayIDs, BackMoveResult[] bmrs,
ArrayList<Boolean> locals ) ArrayList<Boolean> locals )
{ {
@ -598,33 +839,33 @@ public class RelayService extends JobIntentService
} }
} }
private UDPThreads startUDPThreadsOnce() private UDPReadThread startUDPReadThreadOnce()
{ {
UDPThreads threads = null; UDPReadThread thread = null;
if ( BuildConfig.UDP_ENABLED && relayEnabled( this ) ) { if ( BuildConfig.UDP_ENABLED && relayEnabled( this ) ) {
synchronized ( sUDPThreadsRef ) { synchronized ( sUDPReadThreadRef ) {
threads = sUDPThreadsRef.get(); thread = sUDPReadThreadRef.get();
if ( null == threads ) { if ( null == thread ) {
threads = new UDPThreads(); thread = new UDPReadThread();
sUDPThreadsRef.set( threads ); sUDPReadThreadRef.set( thread );
threads.start(); thread.start();
} else { } else {
Assert.assertTrue( null != threads.m_UDPSocket || !BuildConfig.DEBUG ); Assert.assertTrue( null != s_UDPSocket || !BuildConfig.DEBUG );
} }
threads.setService( this ); thread.setService( this );
} }
} else { } else {
Log.i( TAG, "startUDPThreadsOnce(): UDP disabled" ); Log.i( TAG, "startUDPThreadsOnce(): UDP disabled" );
} }
return threads; return thread;
} // startUDPThreadsOnce } // startUDPThreadsOnce
private void stopUDPThreads() private void stopUDPReadThread()
{ {
synchronized ( sUDPThreadsRef ) { synchronized ( sUDPReadThreadRef ) {
UDPThreads threads = sUDPThreadsRef.getAndSet( null ); UDPReadThread thread = sUDPReadThreadRef.getAndSet( null );
if ( null != threads ) { if ( null != thread ) {
threads.stop(); thread.interrupt();
} }
} }
} }
@ -731,7 +972,7 @@ public class RelayService extends JobIntentService
break; break;
case XWPDEV_UPGRADE: case XWPDEV_UPGRADE:
intent = getIntentTo( this, MsgCmds.UPGRADE ); intent = getIntentTo( this, MsgCmds.UPGRADE );
enqueueWork( this, intent ); enqueueWork( intent );
break; break;
case XWPDEV_GOTINVITE: case XWPDEV_GOTINVITE:
resetBackoff = true; resetBackoff = true;
@ -744,7 +985,7 @@ public class RelayService extends JobIntentService
intent = getIntentTo( this, MsgCmds.GOT_INVITE ) intent = getIntentTo( this, MsgCmds.GOT_INVITE )
.putExtra( INVITE_FROM, srcDevID ) .putExtra( INVITE_FROM, srcDevID )
.putExtra( NLI_DATA, asStr ); .putExtra( NLI_DATA, asStr );
enqueueWork( this, intent ); enqueueWork( intent );
break; break;
case XWPDEV_ACK: case XWPDEV_ACK:
noteAck( vli2un( dis ), fromUDP ); noteAck( vli2un( dis ), fromUDP );
@ -768,6 +1009,11 @@ public class RelayService extends JobIntentService
} }
} // gotPacket() } // gotPacket()
private void enqueueWork( Intent intent )
{
enqueueWork( this, intent );
}
private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP ) private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP )
{ {
gotPacket( data, skipAck, fromUDP, -1 ); gotPacket( data, skipAck, fromUDP, -1 );
@ -985,13 +1231,10 @@ public class RelayService extends JobIntentService
private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd, private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd,
long timestamp ) long timestamp )
{ {
UDPThreads threads = startUDPThreadsOnce(); PacketData packet = new PacketData( bas, cmd, timestamp );
if ( threads != null ) { s_queue.add( packet );
PacketData packet = new PacketData( bas, cmd, timestamp ); Log.d( TAG, "postPacket(%s); (now %d in queue)", packet,
threads.add( packet ); s_queue.size() );
Log.d( TAG, "postPacket(%s); (now %d in queue)", packet,
threads.m_queue.size() );
}
} }
private String getDevID( DevIDType[] typp ) private String getDevID( DevIDType[] typp )
@ -1054,12 +1297,7 @@ public class RelayService extends JobIntentService
} }
} }
private static class UDPThreads { private static class UDPReadThread extends Thread {
private DatagramSocket m_UDPSocket;
private LinkedBlockingQueue<PacketData> m_queue =
new LinkedBlockingQueue<PacketData>();
private Thread m_UDPReadThread;
private Thread m_UDPWriteThread;
private RelayService[] mServiceHolder = {null}; private RelayService[] mServiceHolder = {null};
void setService( RelayService service ) void setService( RelayService service )
@ -1101,312 +1339,52 @@ public class RelayService extends JobIntentService
} }
} }
void start() @Override
{ public void run() {
m_UDPReadThread = new Thread( null, new Runnable() { Context context = XWApp.getContext();
@Override
public void run() {
try {
connectSocket(); // block until this is done
startWriteThread();
Log.i( TAG, "read thread running" );
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
m_UDPSocket.receive( packet );
final RelayService service = getService();
service.resetExitTimer();
service.gotPacket( packet );
} catch ( java.io.InterruptedIOException iioe ) {
// Log.d( TAG, "iioe from receive(): %s", iioe.getMessage() );
} catch( java.io.IOException ioe ) {
Log.d( TAG, "ioe from receive(): %s/%s", ioe.getMessage() );
Assert.assertFalse( BuildConfig.DEBUG );
break;
}
}
} catch ( InterruptedException ie ) {
Log.d( TAG, "exiting on interrupt: %s",
ie.getMessage() );
}
Log.i( TAG, "read thread exiting" );
}
}, getClass().getName() );
m_UDPReadThread.start();
}
void stop()
{
m_queue.add( sEOQPacket ); // will kill the writer thread
}
void add( PacketData packet )
{
m_queue.add( packet );
}
private void connectSocket() throws InterruptedException
{
if ( null == m_UDPSocket ) {
final RelayService service = getService();
int port = XWPrefs.getDefaultRelayPort( service );
String host = XWPrefs.getDefaultRelayHost( service );
try {
m_UDPSocket = new DatagramSocket();
m_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // remember this address
Log.d( TAG, "connectSocket(%s:%d): m_UDPSocket now %H",
host, port, m_UDPSocket );
} catch( java.net.SocketException se ) {
Log.ex( TAG, se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
Log.ex( TAG, uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
Log.i( TAG, "m_UDPSocket not null" );
}
}
// Break out the work of sending queued packets so it can be called
// from several threads. Fixes starvation problems on some devices.
private synchronized boolean serviceQueue()
{
Log.d( TAG, "serviceQueue()" );
boolean shouldGoOn = true;
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try { try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 1000; if ( null == s_UDPSocket ) {
Log.d( TAG, "blocking %dms on poll()", ts ); getService().connectSocketOnce(); // block until this is done
for ( outData = m_queue.poll( ts, TimeUnit.MILLISECONDS );
null != outData;
outData = m_queue.poll() ) { // doesn't block
Log.d( TAG, "removed packet from queue (%d left): %s",
m_queue.size(), outData );
if ( outData == sEOQPacket ) {
shouldGoOn = false;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add( outData );
} else {
dataListUDP.add( outData );
}
} }
sendViaWeb( dataListWeb ); Log.i( TAG, "%s.run() starting", this );
sendViaUDP( dataListUDP ); byte[] buf = new byte[1024];
for ( ; ; ) {
getService().resetExitTimer(); DatagramPacket packet =
new DatagramPacket( buf, buf.length );
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
shouldGoOn = false;
}
Log.d( TAG, "serviceQueue() => %b", shouldGoOn );
return shouldGoOn;
}
private void startWriteThread()
{
Assert.assertNull( m_UDPWriteThread );
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
Log.i( TAG, "write thread starting" );
for ( ; ; ) {
if ( !serviceQueue() ) {
break;
}
}
Log.i( TAG, "write thread killing read thread" );
killThreads();
// now kill the read thread
}
}, getClass().getName() );
m_UDPWriteThread.start();
}
private void killThreads()
{
m_UDPSocket.close();
try {
m_UDPReadThread.join();
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
m_UDPSocket = null;
Log.i( TAG, "write thread exiting (with %d in queue)",
m_queue.size() );
}
private int sendViaWeb( List<PacketData> packets ) throws InterruptedException
{
int sentLen = 0;
if ( packets.size() > 0 ) {
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
final RelayService service = getService();
HttpsURLConnection conn = NetUtils
.makeHttpsRelayConn( service, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try { try {
JSONArray dataArray = new JSONArray(); s_UDPSocket.receive( packet );
for ( PacketData packet : packets ) { postGotPacket( context, packet );
Assert.assertFalse( packet == sEOQPacket ); // final RelayService service = getService();
byte[] datum = packet.assemble(); // service.resetExitTimer();
dataArray.put( Utils.base64Encode(datum) ); // service.gotPacket( packet );
sentLen += datum.length; } catch ( java.io.InterruptedIOException iioe ) {
} // Log.d( TAG, "iioe from receive(): %s", iioe.getMessage() );
JSONObject params = new JSONObject(); } catch( java.io.IOException ioe ) {
params.put( "data", dataArray ); Log.d( TAG, "ioe from receive(): %s/%s", ioe.getMessage() );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
service
.noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
service.gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( service, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
return sentLen;
}
private int sendViaUDP( List<PacketData> packets ) throws InterruptedException
{
int sentLen = 0;
if ( packets.size() > 0 ) {
Log.d( TAG, "sendViaUDP(): sending %d at once", packets.size() );
final RelayService service = getService();
service.noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( service, null,
CommsConnType.COMMS_CONN_RELAY,
true );
service.m_handler.post( new Runnable() {
public void run() {
service.stopUDPThreads();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break; break;
} }
} }
ConnStatusHandler.updateStatus( service, null, } catch ( InterruptedException ie ) {
CommsConnType.COMMS_CONN_RELAY, Log.d( TAG, "exiting on interrupt: %s",
sentLen > 0 ); ie.getMessage() );
Log.d( TAG, "sendViaUDP(): sent %d bytes (%d packets)",
sentLen, packets.size() );
} }
Log.i( TAG, "%s.run() exiting", this );
return sentLen;
} }
private long m_lastRunMS = 0; private void postGotPacket( Context context, DatagramPacket packet )
private void runUDPAckTimer()
{ {
long nowMS = System.currentTimeMillis(); Log.d( TAG, "postGotPacket()" );
if ( 0 == m_lastRunMS ) { int packetLen = packet.getLength();
m_lastRunMS = nowMS; byte[] data = new byte[packetLen];
} System.arraycopy( packet.getData(), 0, data, 0, packetLen );
long age = nowMS - m_lastRunMS;
if ( age < 3000 ) { // never more frequently than 3 sec.
// Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" );
} else {
m_lastRunMS = nowMS;
long minSentMS = nowMS - 10000; // 10 seconds ago Intent intent = getIntentTo( context, MsgCmds.GOT_PACKET )
long prevSentMS = 0; .putExtra( BINBUFFER, data );
List<PacketData> forResend = new ArrayList<>(); enqueueWork( context, intent );
boolean foundNonAck = false;
synchronized ( s_packetsSentUDP ) {
for ( Iterator<PacketData> iter = s_packetsSentUDP.iterator();
iter.hasNext(); ) {
PacketData packet = iter.next();
long sentMS = packet.getSentMS();
Assert.assertTrue( prevSentMS <= sentMS );
prevSentMS = sentMS;
if ( sentMS > minSentMS ) {
break;
}
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
sNativeFailScore.incrementAndGet();
}
iter.remove();
}
Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining",
s_packetsSentUDP.size() );
}
if ( foundNonAck ) {
Log.d( TAG, "runUDPAckTimer(): reposting %d packets",
forResend.size() );
m_queue.addAll( forResend );
}
}
} }
} }
private static class AsyncSender extends Thread { private static class AsyncSender extends Thread {
@ -1616,11 +1594,11 @@ public class RelayService extends JobIntentService
stopThreads(); stopThreads();
} else if ( BuildConfig.UDP_ENABLED ) { } else if ( BuildConfig.UDP_ENABLED ) {
stopFetchThreadIf(); stopFetchThreadIf();
startUDPThreadsOnce(); startUDPReadThreadOnce();
registerWithRelay( -1 ); registerWithRelay( -1 );
} else { } else {
Assert.assertFalse( BuildConfig.DEBUG ); Assert.assertFalse( BuildConfig.DEBUG );
stopUDPThreads(); stopUDPReadThread();
startFetchThreadIfNotUDP(); startFetchThreadIfNotUDP();
} }
} }
@ -1629,7 +1607,7 @@ public class RelayService extends JobIntentService
{ {
Log.d( TAG, "stopThreads()" ); Log.d( TAG, "stopThreads()" );
stopFetchThreadIf(); stopFetchThreadIf();
stopUDPThreads(); stopUDPReadThread();
} }
private static void un2vli( int nn, OutputStream os ) private static void un2vli( int nn, OutputStream os )