diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java index 1fa4f6a98..94fec7fc1 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java @@ -86,6 +86,7 @@ public class RelayService extends JobIntentService private static final String TIMESTAMP = "TIMESTAMP"; private static enum MsgCmds { INVALID, + DO_WORK, PROCESS_GAME_MSGS, PROCESS_DEV_MSGS, UDP_CHANGED, @@ -97,6 +98,7 @@ public class RelayService extends JobIntentService UPGRADE, INVITE, GOT_INVITE, + GOT_PACKET, STOP, } @@ -110,6 +112,8 @@ public class RelayService extends JobIntentService private static final String BINBUFFER = "BINBUFFER"; private static final String MSGNUM = "MSGNUM"; + private static LinkedBlockingQueue s_queue = + new LinkedBlockingQueue(); private static List s_packetsSentUDP = new ArrayList<>(); private static List s_packetsSentWeb = new ArrayList<>(); private static final PacketData sEOQPacket = new PacketData(); @@ -120,12 +124,15 @@ public class RelayService extends JobIntentService new CommsAddrRec( CommsConnType.COMMS_CONN_RELAY ); private static int s_curBackoff; private static long s_curNextTimer; + private static DatagramSocket s_UDPSocket; + static { resetBackoffTimer(); } private Thread m_fetchThread = null; // no longer used - private static final AtomicReference sUDPThreadsRef = new AtomicReference<>(); + private static final AtomicReference sUDPReadThreadRef + = new AtomicReference<>(); private Handler m_handler; - private UDPThreads mThreads; + private UDPReadThread mReadThread; private Runnable m_onInactivity; private int m_maxIntervalSeconds = 0; private long m_lastGamePacketReceived; @@ -140,7 +147,6 @@ public class RelayService extends JobIntentService ,XWPDEV_PROTO_VERSION_1 }; - // private static final int XWPDEV_NONE = 0; // Must be kept in sync with eponymous enum in xwrelay.h private enum XWRelayReg { XWPDEV_NONE, @@ -359,6 +365,7 @@ public class RelayService extends JobIntentService { Log.d( TAG, "%s.onCreate()", this ); super.onCreate(); + mHelper = new RelayServiceHelper( this ); m_lastGamePacketReceived = XWPrefs.getPrefsLong( this, R.string.key_last_packet, @@ -377,8 +384,8 @@ public class RelayService extends JobIntentService } } }; - mThreads = startUDPThreadsOnce(); - if ( null == mThreads ) { + mReadThread = startUDPReadThreadOnce(); + if ( null == mReadThread ) { stopSelf(); } sSkipUPDSet = XWPrefs.getSkipToWebAPI( this ); @@ -389,12 +396,20 @@ public class RelayService extends JobIntentService { DbgUtils.assertOnUIThread( false ); Log.d( TAG, "%s.onHandleWork(cmd=%s)", this, cmdFrom( intent ) ); - handleCommand( intent ); - boolean goOn = mThreads.serviceQueue(); - if ( !goOn ) { - Log.e( TAG, "onHandleWork(): need to exit... HELP!!!!" ); - mThreads.killThreads(); + try { + connectSocketOnce(); // must not be on UI thread + + handleCommand( intent ); + + boolean goOn = serviceQueue(); + if ( !goOn ) { + Log.e( TAG, "onHandleWork(): need to exit... HELP!!!!" ); + mReadThread.interrupt(); + } + } catch (InterruptedException ie ) { + Log.e( TAG, "InterruptedException in onHandleWork(): %s", + ie.getMessage() ); } resetExitTimer(); @@ -406,17 +421,14 @@ public class RelayService extends JobIntentService { Log.d( TAG, "onDestroy() called" ); - boolean startImmediately = false; - if ( null != mThreads ) { - startImmediately = 0 < mThreads.m_queue.size(); - mThreads.unsetService(); + if ( null != mReadThread ) { + mReadThread.unsetService(); + if ( 0 < s_queue.size() ) { + enqueueWork( getIntentTo( this, MsgCmds.DO_WORK ) ); + } } - if ( startImmediately ) { - // Log.d( TAG, "onDestroy(): restarting: %d in queue", - // mThreads.m_queue.size() ); - timerFired( this ); - } else if ( shouldMaintainConnection() ) { + if ( shouldMaintainConnection() ) { long interval_millis = getMaxIntervalSeconds() * 1000; RelayReceiver.setTimer( this, interval_millis ); Log.d( TAG, "onDestroy(): rescheduling in %d ms", @@ -472,6 +484,8 @@ public class RelayService extends JobIntentService Log.d( TAG, "handleCommand(): cmd=%s (age=%dms)", cmd.toString(), System.currentTimeMillis() - timestamp ); switch( cmd ) { + case DO_WORK: // exists only to launch service + break; case PROCESS_GAME_MSGS: String[] relayIDs = new String[1]; relayIDs[0] = intent.getStringExtra( RELAY_ID ); @@ -505,12 +519,16 @@ public class RelayService extends JobIntentService = NetLaunchInfo.makeFrom( this, intent.getStringExtra(NLI_DATA) ); receiveInvitation( srcDevID, nli ); break; + case GOT_PACKET: + byte[] msg = intent.getByteArrayExtra( BINBUFFER ); + gotPacket( msg, false, true ); + break; case SEND: case RECEIVE: case SENDNOCONN: - startUDPThreadsOnce(); + startUDPReadThreadOnce(); long rowid = intent.getLongExtra( ROWID, -1 ); - byte[] msg = intent.getByteArrayExtra( BINBUFFER ); + msg = intent.getByteArrayExtra( BINBUFFER ); if ( MsgCmds.SEND == cmd ) { sendMessage( rowid, msg, timestamp ); } else if ( MsgCmds.SENDNOCONN == cmd ) { @@ -522,7 +540,7 @@ public class RelayService extends JobIntentService } break; case INVITE: - startUDPThreadsOnce(); + startUDPReadThreadOnce(); srcDevID = intent.getIntExtra( DEV_ID_SRC, 0 ); int destDevID = intent.getIntExtra( DEV_ID_DEST, 0 ); String relayID = intent.getStringExtra( RELAY_ID ); @@ -549,6 +567,229 @@ public class RelayService extends JobIntentService } } + private synchronized void connectSocketOnce() throws InterruptedException + { + if ( null == s_UDPSocket ) { + final RelayService service = this; + int port = XWPrefs.getDefaultRelayPort( service ); + String host = XWPrefs.getDefaultRelayHost( service ); + + try { + s_UDPSocket = new DatagramSocket(); + s_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log + + InetAddress addr = InetAddress.getByName( host ); + s_UDPSocket.connect( addr, port ); // remember this address + Log.d( TAG, "connectSocket(%s:%d): s_UDPSocket now %H", + host, port, s_UDPSocket ); + } catch( java.net.SocketException se ) { + Log.ex( TAG, se ); + Assert.fail(); + } catch( java.net.UnknownHostException uhe ) { + Log.ex( TAG, uhe ); + Assert.assertFalse( BuildConfig.DEBUG ); + } + } else { + Assert.assertTrue( s_UDPSocket.isConnected() ); + Log.i( TAG, "s_UDPSocket not null" ); + } + } + + private boolean serviceQueue() + { + Log.d( TAG, "serviceQueue()" ); + boolean shouldGoOn = true; + List dataListUDP = new ArrayList<>(); + List dataListWeb = new ArrayList<>(); + PacketData outData; + try { + long ts = s_packetsSentUDP.size() > 0 ? 10 : 1000; + Log.d( TAG, "blocking %dms on poll()", ts ); + for ( outData = s_queue.poll( ts, TimeUnit.MILLISECONDS ); + null != outData; + outData = s_queue.poll() ) { // doesn't block + Log.d( TAG, "removed packet from queue (%d left): %s", + s_queue.size(), outData ); + if ( outData == sEOQPacket ) { + shouldGoOn = false; + break; + } else if ( skipNativeSend() || outData.getForWeb() ) { + dataListWeb.add( outData ); + } else { + dataListUDP.add( outData ); + } + } + + sendViaWeb( dataListWeb ); + sendViaUDP( dataListUDP ); + + resetExitTimer(); + + runUDPAckTimer(); + + ConnStatusHandler.showSuccessOut(); + } catch ( InterruptedException ie ) { + Log.w( TAG, "write thread killed" ); + shouldGoOn = false; + } + Log.d( TAG, "serviceQueue() => %b", shouldGoOn ); + return shouldGoOn; + } + + private long m_lastRunMS = 0; + private void runUDPAckTimer() + { + long nowMS = System.currentTimeMillis(); + if ( 0 == m_lastRunMS ) { + m_lastRunMS = nowMS; + } + long age = nowMS - m_lastRunMS; + if ( age < 3000 ) { // never more frequently than 3 sec. + // Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" ); + } else { + m_lastRunMS = nowMS; + + long minSentMS = nowMS - 10000; // 10 seconds ago + long prevSentMS = 0; + List forResend = new ArrayList<>(); + boolean foundNonAck = false; + synchronized ( s_packetsSentUDP ) { + for ( Iterator iter = s_packetsSentUDP.iterator(); + iter.hasNext(); ) { + PacketData packet = iter.next(); + long sentMS = packet.getSentMS(); + Assert.assertTrue( prevSentMS <= sentMS ); + prevSentMS = sentMS; + if ( sentMS > minSentMS ) { + break; + } + + forResend.add( packet ); + if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { + foundNonAck = true; + sNativeFailScore.incrementAndGet(); + } + iter.remove(); + } + Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining", + s_packetsSentUDP.size() ); + } + if ( foundNonAck ) { + Log.d( TAG, "runUDPAckTimer(): reposting %d packets", + forResend.size() ); + s_queue.addAll( forResend ); + } + } + } + + private int sendViaWeb( List packets ) throws InterruptedException + { + int sentLen = 0; + if ( packets.size() > 0 ) { + Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() ); + + final RelayService service = this; + HttpsURLConnection conn = NetUtils + .makeHttpsRelayConn( service, "post" ); + if ( null == conn ) { + Log.e( TAG, "sendViaWeb(): null conn for POST" ); + } else { + try { + JSONArray dataArray = new JSONArray(); + for ( PacketData packet : packets ) { + Assert.assertFalse( packet == sEOQPacket ); + byte[] datum = packet.assemble(); + dataArray.put( Utils.base64Encode(datum) ); + sentLen += datum.length; + } + JSONObject params = new JSONObject(); + params.put( "data", dataArray ); + + String result = NetUtils.runConn( conn, params ); + boolean succeeded = null != result; + if ( succeeded ) { + Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result ); + JSONObject resultObj = new JSONObject( result ); + JSONArray resData = resultObj.getJSONArray( "data" ); + int nReplies = resData.length(); + // Log.d( TAG, "sendViaWeb(): got %d replies", nReplies ); + + service + .noteSent( packets, false ); // before we process the acks below :-) + + for ( int ii = 0; ii < nReplies; ++ii ) { + byte[] datum = Utils.base64Decode( resData.getString( ii ) ); + // PENDING: skip ack or not + service.gotPacket( datum, false, false ); + } + } else { + Log.e( TAG, "sendViaWeb(): failed result for POST" ); + + } + + ConnStatusHandler.updateStatus( service, null, + CommsConnType.COMMS_CONN_RELAY, + succeeded ); + } catch ( JSONException ex ) { + Assert.assertFalse( BuildConfig.DEBUG ); + } + } + } + return sentLen; + } + + private int sendViaUDP( List packets ) throws InterruptedException + { + int sentLen = 0; + + if ( packets.size() > 0 ) { + Log.d( TAG, "sendViaUDP(): sending %d at once", packets.size() ); + final RelayService service = this; + service.noteSent( packets, true ); + for ( PacketData packet : packets ) { + boolean getOut = true; + byte[] data = packet.assemble(); + try { + DatagramPacket udpPacket = new DatagramPacket( data, data.length ); + s_UDPSocket.send( udpPacket ); + + sentLen += udpPacket.getLength(); + // packet.setSentMS( nowMS ); + getOut = false; + } catch ( java.net.SocketException se ) { + Log.ex( TAG, se ); + Log.i( TAG, "Restarting threads to force new socket" ); + ConnStatusHandler.updateStatusOut( service, null, + CommsConnType.COMMS_CONN_RELAY, + true ); + + service.m_handler.post( new Runnable() { + public void run() { + service.stopUDPReadThread(); + } + } ); + break; + } catch ( java.io.IOException ioe ) { + Log.e( TAG, "sendViaUDP(): failure \"%s\" sending on %s", + s_UDPSocket, ioe.getMessage() ); + } catch ( NullPointerException npe ) { + Log.w( TAG, "network problem; dropping packet" ); + } + if ( getOut ) { + break; + } + } + + ConnStatusHandler.updateStatus( service, null, + CommsConnType.COMMS_CONN_RELAY, + sentLen > 0 ); + Log.d( TAG, "sendViaUDP(): sent %d bytes (%d packets)", + sentLen, packets.size() ); + } + + return sentLen; + } + private void setupNotifications( String[] relayIDs, BackMoveResult[] bmrs, ArrayList locals ) { @@ -598,33 +839,33 @@ public class RelayService extends JobIntentService } } - private UDPThreads startUDPThreadsOnce() + private UDPReadThread startUDPReadThreadOnce() { - UDPThreads threads = null; + UDPReadThread thread = null; if ( BuildConfig.UDP_ENABLED && relayEnabled( this ) ) { - synchronized ( sUDPThreadsRef ) { - threads = sUDPThreadsRef.get(); - if ( null == threads ) { - threads = new UDPThreads(); - sUDPThreadsRef.set( threads ); - threads.start(); + synchronized ( sUDPReadThreadRef ) { + thread = sUDPReadThreadRef.get(); + if ( null == thread ) { + thread = new UDPReadThread(); + sUDPReadThreadRef.set( thread ); + thread.start(); } else { - Assert.assertTrue( null != threads.m_UDPSocket || !BuildConfig.DEBUG ); + Assert.assertTrue( null != s_UDPSocket || !BuildConfig.DEBUG ); } - threads.setService( this ); + thread.setService( this ); } } else { Log.i( TAG, "startUDPThreadsOnce(): UDP disabled" ); } - return threads; + return thread; } // startUDPThreadsOnce - private void stopUDPThreads() + private void stopUDPReadThread() { - synchronized ( sUDPThreadsRef ) { - UDPThreads threads = sUDPThreadsRef.getAndSet( null ); - if ( null != threads ) { - threads.stop(); + synchronized ( sUDPReadThreadRef ) { + UDPReadThread thread = sUDPReadThreadRef.getAndSet( null ); + if ( null != thread ) { + thread.interrupt(); } } } @@ -731,7 +972,7 @@ public class RelayService extends JobIntentService break; case XWPDEV_UPGRADE: intent = getIntentTo( this, MsgCmds.UPGRADE ); - enqueueWork( this, intent ); + enqueueWork( intent ); break; case XWPDEV_GOTINVITE: resetBackoff = true; @@ -744,7 +985,7 @@ public class RelayService extends JobIntentService intent = getIntentTo( this, MsgCmds.GOT_INVITE ) .putExtra( INVITE_FROM, srcDevID ) .putExtra( NLI_DATA, asStr ); - enqueueWork( this, intent ); + enqueueWork( intent ); break; case XWPDEV_ACK: noteAck( vli2un( dis ), fromUDP ); @@ -768,6 +1009,11 @@ public class RelayService extends JobIntentService } } // gotPacket() + private void enqueueWork( Intent intent ) + { + enqueueWork( this, intent ); + } + private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP ) { gotPacket( data, skipAck, fromUDP, -1 ); @@ -985,13 +1231,10 @@ public class RelayService extends JobIntentService private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd, long timestamp ) { - UDPThreads threads = startUDPThreadsOnce(); - if ( threads != null ) { - PacketData packet = new PacketData( bas, cmd, timestamp ); - threads.add( packet ); - Log.d( TAG, "postPacket(%s); (now %d in queue)", packet, - threads.m_queue.size() ); - } + PacketData packet = new PacketData( bas, cmd, timestamp ); + s_queue.add( packet ); + Log.d( TAG, "postPacket(%s); (now %d in queue)", packet, + s_queue.size() ); } private String getDevID( DevIDType[] typp ) @@ -1054,12 +1297,7 @@ public class RelayService extends JobIntentService } } - private static class UDPThreads { - private DatagramSocket m_UDPSocket; - private LinkedBlockingQueue m_queue = - new LinkedBlockingQueue(); - private Thread m_UDPReadThread; - private Thread m_UDPWriteThread; + private static class UDPReadThread extends Thread { private RelayService[] mServiceHolder = {null}; void setService( RelayService service ) @@ -1101,312 +1339,52 @@ public class RelayService extends JobIntentService } } - void start() - { - m_UDPReadThread = new Thread( null, new Runnable() { - @Override - public void run() { - try { - connectSocket(); // block until this is done - startWriteThread(); - - Log.i( TAG, "read thread running" ); - byte[] buf = new byte[1024]; - for ( ; ; ) { - DatagramPacket packet = - new DatagramPacket( buf, buf.length ); - try { - m_UDPSocket.receive( packet ); - final RelayService service = getService(); - service.resetExitTimer(); - service.gotPacket( packet ); - } catch ( java.io.InterruptedIOException iioe ) { - // Log.d( TAG, "iioe from receive(): %s", iioe.getMessage() ); - } catch( java.io.IOException ioe ) { - Log.d( TAG, "ioe from receive(): %s/%s", ioe.getMessage() ); - Assert.assertFalse( BuildConfig.DEBUG ); - break; - } - } - - } catch ( InterruptedException ie ) { - Log.d( TAG, "exiting on interrupt: %s", - ie.getMessage() ); - } - Log.i( TAG, "read thread exiting" ); - } - }, getClass().getName() ); - m_UDPReadThread.start(); - } - - void stop() - { - m_queue.add( sEOQPacket ); // will kill the writer thread - } - - void add( PacketData packet ) - { - m_queue.add( packet ); - } - - private void connectSocket() throws InterruptedException - { - if ( null == m_UDPSocket ) { - final RelayService service = getService(); - int port = XWPrefs.getDefaultRelayPort( service ); - String host = XWPrefs.getDefaultRelayHost( service ); - - try { - m_UDPSocket = new DatagramSocket(); - m_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log - - InetAddress addr = InetAddress.getByName( host ); - m_UDPSocket.connect( addr, port ); // remember this address - Log.d( TAG, "connectSocket(%s:%d): m_UDPSocket now %H", - host, port, m_UDPSocket ); - } catch( java.net.SocketException se ) { - Log.ex( TAG, se ); - Assert.fail(); - } catch( java.net.UnknownHostException uhe ) { - Log.ex( TAG, uhe ); - } - } else { - Assert.assertTrue( m_UDPSocket.isConnected() ); - Log.i( TAG, "m_UDPSocket not null" ); - } - } - - // Break out the work of sending queued packets so it can be called - // from several threads. Fixes starvation problems on some devices. - private synchronized boolean serviceQueue() - { - Log.d( TAG, "serviceQueue()" ); - boolean shouldGoOn = true; - List dataListUDP = new ArrayList<>(); - List dataListWeb = new ArrayList<>(); - PacketData outData; + @Override + public void run() { + Context context = XWApp.getContext(); try { - long ts = s_packetsSentUDP.size() > 0 ? 10 : 1000; - Log.d( TAG, "blocking %dms on poll()", ts ); - for ( outData = m_queue.poll( ts, TimeUnit.MILLISECONDS ); - null != outData; - outData = m_queue.poll() ) { // doesn't block - Log.d( TAG, "removed packet from queue (%d left): %s", - m_queue.size(), outData ); - if ( outData == sEOQPacket ) { - shouldGoOn = false; - break; - } else if ( skipNativeSend() || outData.getForWeb() ) { - dataListWeb.add( outData ); - } else { - dataListUDP.add( outData ); - } + if ( null == s_UDPSocket ) { + getService().connectSocketOnce(); // block until this is done } - sendViaWeb( dataListWeb ); - sendViaUDP( dataListUDP ); - - getService().resetExitTimer(); - - runUDPAckTimer(); - - ConnStatusHandler.showSuccessOut(); - } catch ( InterruptedException ie ) { - Log.w( TAG, "write thread killed" ); - shouldGoOn = false; - } - Log.d( TAG, "serviceQueue() => %b", shouldGoOn ); - return shouldGoOn; - } - - private void startWriteThread() - { - Assert.assertNull( m_UDPWriteThread ); - - m_UDPWriteThread = new Thread( null, new Runnable() { - public void run() { - Log.i( TAG, "write thread starting" ); - for ( ; ; ) { - if ( !serviceQueue() ) { - break; - } - } - - Log.i( TAG, "write thread killing read thread" ); - killThreads(); - // now kill the read thread - } - }, getClass().getName() ); - m_UDPWriteThread.start(); - } - - private void killThreads() - { - m_UDPSocket.close(); - try { - m_UDPReadThread.join(); - } catch( java.lang.InterruptedException ie ) { - Log.ex( TAG, ie ); - } - - m_UDPSocket = null; - Log.i( TAG, "write thread exiting (with %d in queue)", - m_queue.size() ); - } - - private int sendViaWeb( List packets ) throws InterruptedException - { - int sentLen = 0; - if ( packets.size() > 0 ) { - Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() ); - - final RelayService service = getService(); - HttpsURLConnection conn = NetUtils - .makeHttpsRelayConn( service, "post" ); - if ( null == conn ) { - Log.e( TAG, "sendViaWeb(): null conn for POST" ); - } else { + Log.i( TAG, "%s.run() starting", this ); + byte[] buf = new byte[1024]; + for ( ; ; ) { + DatagramPacket packet = + new DatagramPacket( buf, buf.length ); try { - JSONArray dataArray = new JSONArray(); - for ( PacketData packet : packets ) { - Assert.assertFalse( packet == sEOQPacket ); - byte[] datum = packet.assemble(); - dataArray.put( Utils.base64Encode(datum) ); - sentLen += datum.length; - } - JSONObject params = new JSONObject(); - params.put( "data", dataArray ); - - String result = NetUtils.runConn( conn, params ); - boolean succeeded = null != result; - if ( succeeded ) { - Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result ); - JSONObject resultObj = new JSONObject( result ); - JSONArray resData = resultObj.getJSONArray( "data" ); - int nReplies = resData.length(); - // Log.d( TAG, "sendViaWeb(): got %d replies", nReplies ); - - service - .noteSent( packets, false ); // before we process the acks below :-) - - for ( int ii = 0; ii < nReplies; ++ii ) { - byte[] datum = Utils.base64Decode( resData.getString( ii ) ); - // PENDING: skip ack or not - service.gotPacket( datum, false, false ); - } - } else { - Log.e( TAG, "sendViaWeb(): failed result for POST" ); - - } - - ConnStatusHandler.updateStatus( service, null, - CommsConnType.COMMS_CONN_RELAY, - succeeded ); - } catch ( JSONException ex ) { + s_UDPSocket.receive( packet ); + postGotPacket( context, packet ); + // final RelayService service = getService(); + // service.resetExitTimer(); + // service.gotPacket( packet ); + } catch ( java.io.InterruptedIOException iioe ) { + // Log.d( TAG, "iioe from receive(): %s", iioe.getMessage() ); + } catch( java.io.IOException ioe ) { + Log.d( TAG, "ioe from receive(): %s/%s", ioe.getMessage() ); Assert.assertFalse( BuildConfig.DEBUG ); - } - } - } - return sentLen; - } - - private int sendViaUDP( List packets ) throws InterruptedException - { - int sentLen = 0; - - if ( packets.size() > 0 ) { - Log.d( TAG, "sendViaUDP(): sending %d at once", packets.size() ); - final RelayService service = getService(); - service.noteSent( packets, true ); - for ( PacketData packet : packets ) { - boolean getOut = true; - byte[] data = packet.assemble(); - try { - DatagramPacket udpPacket = new DatagramPacket( data, data.length ); - m_UDPSocket.send( udpPacket ); - - sentLen += udpPacket.getLength(); - // packet.setSentMS( nowMS ); - getOut = false; - } catch ( java.net.SocketException se ) { - Log.ex( TAG, se ); - Log.i( TAG, "Restarting threads to force new socket" ); - ConnStatusHandler.updateStatusOut( service, null, - CommsConnType.COMMS_CONN_RELAY, - true ); - - service.m_handler.post( new Runnable() { - public void run() { - service.stopUDPThreads(); - } - } ); - break; - } catch ( java.io.IOException ioe ) { - Log.ex( TAG, ioe ); - } catch ( NullPointerException npe ) { - Log.w( TAG, "network problem; dropping packet" ); - } - if ( getOut ) { break; } } - ConnStatusHandler.updateStatus( service, null, - CommsConnType.COMMS_CONN_RELAY, - sentLen > 0 ); - Log.d( TAG, "sendViaUDP(): sent %d bytes (%d packets)", - sentLen, packets.size() ); + } catch ( InterruptedException ie ) { + Log.d( TAG, "exiting on interrupt: %s", + ie.getMessage() ); } - - return sentLen; + Log.i( TAG, "%s.run() exiting", this ); } - private long m_lastRunMS = 0; - private void runUDPAckTimer() + private void postGotPacket( Context context, DatagramPacket packet ) { - long nowMS = System.currentTimeMillis(); - if ( 0 == m_lastRunMS ) { - m_lastRunMS = nowMS; - } - long age = nowMS - m_lastRunMS; - if ( age < 3000 ) { // never more frequently than 3 sec. - // Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" ); - } else { - m_lastRunMS = nowMS; + Log.d( TAG, "postGotPacket()" ); + int packetLen = packet.getLength(); + byte[] data = new byte[packetLen]; + System.arraycopy( packet.getData(), 0, data, 0, packetLen ); - long minSentMS = nowMS - 10000; // 10 seconds ago - long prevSentMS = 0; - List forResend = new ArrayList<>(); - boolean foundNonAck = false; - synchronized ( s_packetsSentUDP ) { - for ( Iterator iter = s_packetsSentUDP.iterator(); - iter.hasNext(); ) { - PacketData packet = iter.next(); - long sentMS = packet.getSentMS(); - Assert.assertTrue( prevSentMS <= sentMS ); - prevSentMS = sentMS; - if ( sentMS > minSentMS ) { - break; - } - - forResend.add( packet ); - if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { - foundNonAck = true; - sNativeFailScore.incrementAndGet(); - } - iter.remove(); - } - Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining", - s_packetsSentUDP.size() ); - } - if ( foundNonAck ) { - Log.d( TAG, "runUDPAckTimer(): reposting %d packets", - forResend.size() ); - m_queue.addAll( forResend ); - } - } + Intent intent = getIntentTo( context, MsgCmds.GOT_PACKET ) + .putExtra( BINBUFFER, data ); + enqueueWork( context, intent ); } - } private static class AsyncSender extends Thread { @@ -1616,11 +1594,11 @@ public class RelayService extends JobIntentService stopThreads(); } else if ( BuildConfig.UDP_ENABLED ) { stopFetchThreadIf(); - startUDPThreadsOnce(); + startUDPReadThreadOnce(); registerWithRelay( -1 ); } else { Assert.assertFalse( BuildConfig.DEBUG ); - stopUDPThreads(); + stopUDPReadThread(); startFetchThreadIfNotUDP(); } } @@ -1629,7 +1607,7 @@ public class RelayService extends JobIntentService { Log.d( TAG, "stopThreads()" ); stopFetchThreadIf(); - stopUDPThreads(); + stopUDPReadThread(); } private static void un2vli( int nn, OutputStream os )