From bdf1bd3b8411aeafc6bd3a85961a0671639faacf Mon Sep 17 00:00:00 2001 From: Eric House Date: Mon, 4 Feb 2019 13:21:24 -0800 Subject: [PATCH] fix problems with RelayService Darned thing was dropping packets, failing to connect games built in response to invitations, and otherwise misbehaving. First was due to not resheduling when exited with outbound packets in queue; second to not overriding relayNoConnProc() (due to signature change.) Though it still happens occasionally.... Also added timestamps to track how long it takes a packet to be sent and ACK'd. --- .../eehouse/android/xw4/CommsTransport.java | 14 +- .../org/eehouse/android/xw4/MultiMsgSink.java | 2 +- .../org/eehouse/android/xw4/RelayService.java | 177 ++++++++++++------ xwords4/common/comms.c | 6 +- 4 files changed, 135 insertions(+), 64 deletions(-) diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/CommsTransport.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/CommsTransport.java index b91129832..06f63ac5b 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/CommsTransport.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/CommsTransport.java @@ -347,8 +347,11 @@ public class CommsTransport implements TransportProcs, } // TransportProcs interface + private static final boolean TRANSPORT_DOES_NOCONN = true; @Override - public int getFlags() { return COMMS_XPORT_FLAGS_NONE; } + public int getFlags() { + return TRANSPORT_DOES_NOCONN ? COMMS_XPORT_FLAGS_HASNOCONN : COMMS_XPORT_FLAGS_NONE; + } @Override public int transportSend( byte[] buf, String msgNo, CommsAddrRec addr, @@ -403,8 +406,13 @@ public class CommsTransport implements TransportProcs, @Override public boolean relayNoConnProc( byte[] buf, String msgNo, String relayID ) { - Assert.fail(); - return false; + Assert.assertTrue( TRANSPORT_DOES_NOCONN ); + int nSent = RelayService.sendNoConnPacket( m_context, m_rowid, + relayID, buf, msgNo ); + boolean success = buf.length == nSent; + Log.d( TAG, "relayNoConnProc(msgNo=%s, len=%d) => %b", msgNo, + buf.length, success ); + return success; } private static int sendForAddr( Context context, CommsAddrRec addr, diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/MultiMsgSink.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/MultiMsgSink.java index b067a12a7..17c78f387 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/MultiMsgSink.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/MultiMsgSink.java @@ -135,7 +135,7 @@ public class MultiMsgSink implements TransportProcs { { // Assert.fail(); int nSent = RelayService.sendNoConnPacket( m_context, getRowID(), - relayID, buf ); + relayID, buf, msgNo ); boolean success = buf.length == nSent; if ( success ) { Log.d( TAG, "relayNoConnProc: adding %s", msgNo ); diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java index 875ba6c7e..2c64fe4a1 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java @@ -82,7 +82,8 @@ public class RelayService extends JobIntentService // One day, in seconds. Probably should be configurable. private static final long MAX_KEEPALIVE_SECS = 24 * 60 * 60; - private static final String CMD_STR = "CMD"; + private static final String CMD_KEY = "CMD"; + private static final String TIMESTAMP = "TIMESTAMP"; private static enum MsgCmds { INVALID, PROCESS_GAME_MSGS, @@ -107,6 +108,7 @@ public class RelayService extends JobIntentService private static final String INVITE_FROM = "INVITE_FROM"; private static final String ROWID = "ROWID"; private static final String BINBUFFER = "BINBUFFER"; + private static final String MSGNUM = "MSGNUM"; private static List s_packetsSentUDP = new ArrayList<>(); private static List s_packetsSentWeb = new ArrayList<>(); @@ -222,7 +224,7 @@ public class RelayService extends JobIntentService { MsgCmds cmd; try { - cmd = MsgCmds.values()[intent.getIntExtra( CMD_STR, -1 )]; + cmd = MsgCmds.values()[intent.getIntExtra( CMD_KEY, -1 )]; } catch (Exception ex) { // OOB most likely cmd = null; } @@ -277,14 +279,15 @@ public class RelayService extends JobIntentService } public static int sendNoConnPacket( Context context, long rowid, String relayID, - byte[] msg ) + byte[] msg, String msgNo ) { int result = -1; if ( NetStateCache.netAvail( context ) ) { Intent intent = getIntentTo( context, MsgCmds.SENDNOCONN ) .putExtra( ROWID, rowid ) .putExtra( RELAY_ID, relayID ) - .putExtra( BINBUFFER, msg ); + .putExtra( BINBUFFER, msg ) + .putExtra( MSGNUM, msgNo ); // not used yet enqueueWork( context, intent ); result = msg.length; } @@ -345,7 +348,8 @@ public class RelayService extends JobIntentService private static Intent getIntentTo( Context context, MsgCmds cmd ) { Intent intent = new Intent( context, RelayService.class ) - .putExtra( CMD_STR, cmd.ordinal() ); + .putExtra( CMD_KEY, cmd.ordinal() ) + .putExtra( TIMESTAMP, System.currentTimeMillis() ); return intent; } @@ -393,13 +397,22 @@ public class RelayService extends JobIntentService public void onDestroy() { Log.d( TAG, "onDestroy() called" ); - if ( shouldMaintainConnection() ) { - long interval_millis = getMaxIntervalSeconds() * 1000; - RelayReceiver.setTimer( this, interval_millis ); - } + + boolean startImmediately = false; if ( null != mThreads ) { + startImmediately = 0 < mThreads.m_queue.size(); mThreads.unsetService(); } + + if ( startImmediately ) { + timerFired( this ); + } else if ( shouldMaintainConnection() ) { + long interval_millis = getMaxIntervalSeconds() * 1000; + RelayReceiver.setTimer( this, interval_millis ); + Log.d( TAG, "onDestroy(): rescheduling in %d ms", + interval_millis ); + } + super.onDestroy(); Log.d( TAG, "%s.onDestroy() DONE", this ); } @@ -445,7 +458,9 @@ public class RelayService extends JobIntentService { MsgCmds cmd = cmdFrom( intent ); if ( null != cmd ) { - // Log.d( TAG, "handleCommand(): cmd=%s", cmd.toString() ); + long timestamp = intent.getLongExtra( TIMESTAMP, 0 ); + Log.d( TAG, "handleCommand(): cmd=%s (age=%dms)", cmd.toString(), + System.currentTimeMillis() - timestamp ); switch( cmd ) { case PROCESS_GAME_MSGS: String[] relayIDs = new String[1]; @@ -460,7 +475,7 @@ public class RelayService extends JobIntentService byte[][][] msgss = expandMsgsArray( intent ); for ( byte[][] msgs : msgss ) { for ( byte[] msg : msgs ) { - gotPacket( msg, true, false ); + gotPacket( msg, true, false, timestamp ); } } break; @@ -487,10 +502,11 @@ public class RelayService extends JobIntentService long rowid = intent.getLongExtra( ROWID, -1 ); byte[] msg = intent.getByteArrayExtra( BINBUFFER ); if ( MsgCmds.SEND == cmd ) { - sendMessage( rowid, msg ); + sendMessage( rowid, msg, timestamp ); } else if ( MsgCmds.SENDNOCONN == cmd ) { String relayID = intent.getStringExtra( RELAY_ID ); - sendNoConnMessage( rowid, relayID, msg ); + String msgNo = intent.getStringExtra( MSGNUM ); + sendNoConnMessage( rowid, relayID, msg, msgNo, timestamp ); } else { mHelper.receiveMessage( this, rowid, null, msg, s_addr ); } @@ -501,15 +517,15 @@ public class RelayService extends JobIntentService int destDevID = intent.getIntExtra( DEV_ID_DEST, 0 ); String relayID = intent.getStringExtra( RELAY_ID ); String nliData = intent.getStringExtra( NLI_DATA ); - sendInvitation( srcDevID, destDevID, relayID, nliData ); + sendInvitation( srcDevID, destDevID, relayID, nliData, timestamp ); break; case TIMER_FIRED: if ( !NetStateCache.netAvail( this ) ) { Log.w( TAG, "not connecting: no network" ); } else if ( startFetchThreadIfNotUDP() ) { // do nothing - } else if ( registerWithRelayIfNot() ) { - requestMessages(); + } else if ( registerWithRelayIfNot( timestamp ) ) { + requestMessages( timestamp ); } RelayReceiver.setTimer( this ); break; @@ -582,6 +598,8 @@ public class RelayService extends JobIntentService threads = new UDPThreads(); sUDPThreadsRef.set( threads ); threads.start(); + } else { + Assert.assertTrue( null != threads.m_UDPSocket || !BuildConfig.DEBUG ); } threads.setService( this ); } @@ -615,8 +633,10 @@ public class RelayService extends JobIntentService private void noteSent( PacketData packet, boolean fromUDP ) { - // Log.d( TAG, "Sent (fromUDP=%b) packet: cmd=%s, id=%d", - // fromUDP, packet.m_cmd.toString(), packet.m_packetID ); + Log.d( TAG, "noteSent(packet=%s, fromUDP=%b)", packet, fromUDP ); + if ( fromUDP ) { + packet.setSentMS(); + } if ( fromUDP || packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { List list = fromUDP ? s_packetsSentUDP : s_packetsSentWeb; synchronized( list ) { @@ -627,21 +647,18 @@ public class RelayService extends JobIntentService private void noteSent( List packets, boolean fromUDP ) { - long nowMS = System.currentTimeMillis(); List map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb; // Log.d( TAG, "noteSent(fromUDP=%b): adding %d; size before: %d", // fromUDP, packets.size(), map.size() ); for ( PacketData packet : packets ) { - if ( fromUDP ) { - packet.setSentMS( nowMS ); - } noteSent( packet, fromUDP ); } // Log.d( TAG, "noteSent(fromUDP=%b): size after: %d", fromUDP, map.size() ); } // MIGHT BE Running on reader thread - private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP ) + private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP, + long timestamp ) { boolean resetBackoff = false; ByteArrayInputStream bis = new ByteArrayInputStream( data ); @@ -673,7 +690,7 @@ public class RelayService extends JobIntentService Log.i( TAG, "bad relayID \"%s\" reported", str ); DevID.clearRelayDevID( this ); s_registered = false; - registerWithRelay(); + registerWithRelay( timestamp ); break; case XWPDEV_REGRSP: str = getVLIString( dis ); @@ -685,7 +702,7 @@ public class RelayService extends JobIntentService s_registered = true; break; case XWPDEV_HAVEMSGS: - requestMessages(); + requestMessages( timestamp ); break; case XWPDEV_MSG: int token = dis.readInt(); @@ -741,6 +758,11 @@ public class RelayService extends JobIntentService } } // gotPacket() + private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP ) + { + gotPacket( data, skipAck, fromUDP, -1 ); + } + private void gotPacket( DatagramPacket packet ) { ConnStatusHandler.showSuccessIn(); @@ -772,7 +794,7 @@ public class RelayService extends JobIntentService // How do we know if we need to register? We keep a timestamp // indicating when we last got a reg-response. When the GCM id // changes, that timestamp is cleared. - private void registerWithRelay() + private void registerWithRelay( long timestamp ) { long now = Utils.getCurSeconds(); long interval = now - s_regStartTime; @@ -807,7 +829,7 @@ public class RelayService extends JobIntentService writeVLIString( out, Build.VERSION.RELEASE ); writeVLIString( out, BuildConfig.VARIANT_NAME ); - postPacket( bas, XWRelayReg.XWPDEV_REG ); + postPacket( bas, XWRelayReg.XWPDEV_REG, timestamp ); s_regStartTime = now; } catch ( java.io.IOException ioe ) { Log.ex( TAG, ioe ); @@ -815,15 +837,15 @@ public class RelayService extends JobIntentService } } - private boolean registerWithRelayIfNot() + private boolean registerWithRelayIfNot( long timestamp ) { if ( !s_registered && shouldRegister() ) { - registerWithRelay(); + registerWithRelay( timestamp ); } return s_registered; } - private void requestMessages() + private void requestMessages( long timestamp ) { try { DevIDType[] typp = new DevIDType[1]; @@ -832,7 +854,7 @@ public class RelayService extends JobIntentService ByteArrayOutputStream bas = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream( bas ); writeVLIString( out, devid ); - postPacket( bas, XWRelayReg.XWPDEV_RQSTMSGS ); + postPacket( bas, XWRelayReg.XWPDEV_RQSTMSGS, timestamp ); } else { Log.d(TAG, "requestMessages(): devid is null" ); } @@ -841,7 +863,7 @@ public class RelayService extends JobIntentService } } - private void sendMessage( long rowid, byte[] msg ) + private void sendMessage( long rowid, byte[] msg, long timestamp ) { ByteArrayOutputStream bas = new ByteArrayOutputStream(); try { @@ -849,14 +871,17 @@ public class RelayService extends JobIntentService Assert.assertTrue( rowid < Integer.MAX_VALUE ); out.writeInt( (int)rowid ); out.write( msg, 0, msg.length ); - postPacket( bas, XWRelayReg.XWPDEV_MSG ); + postPacket( bas, XWRelayReg.XWPDEV_MSG, timestamp ); } catch ( java.io.IOException ioe ) { Log.ex( TAG, ioe ); } } - private void sendNoConnMessage( long rowid, String relayID, byte[] msg ) + private void sendNoConnMessage( long rowid, String relayID, + byte[] msg, String msgNo, // not used yet + long timestamp ) { + Log.d( TAG, "sendNoConnMessage(msgNo=%s, len=%d)", msgNo, msg.length ); ByteArrayOutputStream bas = new ByteArrayOutputStream(); try { DataOutputStream out = new DataOutputStream( bas ); @@ -865,14 +890,14 @@ public class RelayService extends JobIntentService out.writeBytes( relayID ); out.write( '\n' ); out.write( msg, 0, msg.length ); - postPacket( bas, XWRelayReg.XWPDEV_MSGNOCONN ); + postPacket( bas, XWRelayReg.XWPDEV_MSGNOCONN, timestamp ); } catch ( java.io.IOException ioe ) { Log.ex( TAG, ioe ); } } private void sendInvitation( int srcDevID, int destDevID, String relayID, - String nliStr ) + String nliStr, long timestamp ) { Log.d( TAG, "sendInvitation(%d->%d/%s [%s])", srcDevID, destDevID, relayID, nliStr ); @@ -899,7 +924,7 @@ public class RelayService extends JobIntentService } out.writeShort( nliData.length ); out.write( nliData, 0, nliData.length ); - postPacket( bas, XWRelayReg.XWPDEV_INVITE ); + postPacket( bas, XWRelayReg.XWPDEV_INVITE, timestamp ); } catch ( java.io.IOException ioe ) { Log.ex( TAG, ioe ); } @@ -912,7 +937,7 @@ public class RelayService extends JobIntentService try { DataOutputStream out = new DataOutputStream( bas ); un2vli( header.m_packetID, out ); - postPacket( bas, XWRelayReg.XWPDEV_ACK ); + postPacket( bas, XWRelayReg.XWPDEV_ACK, -1 ); } catch ( java.io.IOException ioe ) { Log.ex( TAG, ioe ); } @@ -947,14 +972,16 @@ public class RelayService extends JobIntentService return result; } - private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd ) + private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd, + long timestamp ) { UDPThreads threads = startUDPThreadsOnce(); if ( threads != null ) { - threads.add( new PacketData( bas, cmd ) ); + PacketData packet = new PacketData( bas, cmd, timestamp ); + threads.add( packet ); + Log.d( TAG, "postPacket(%s); (now %d in queue)", packet, + threads.m_queue.size() ); } - // 0 ok; thread will often have sent already! - // DbgUtils.logf( "postPacket() done; %d in queue", m_queue.size() ); } private String getDevID( DevIDType[] typp ) @@ -1049,9 +1076,14 @@ public class RelayService extends JobIntentService private RelayService getService() throws InterruptedException { synchronized ( mServiceHolder ) { + long startMS = System.currentTimeMillis(); while ( null == mServiceHolder[0] ) { mServiceHolder.wait(); } + long tookMS = System.currentTimeMillis() - startMS; + if ( tookMS > 10 ) { + Log.d( TAG, "getService(): blocked for %s ms", tookMS ); + } return mServiceHolder[0]; } } @@ -1059,6 +1091,7 @@ public class RelayService extends JobIntentService void start() { m_UDPReadThread = new Thread( null, new Runnable() { + @Override public void run() { try { connectSocket(); // block until this is done @@ -1075,8 +1108,10 @@ public class RelayService extends JobIntentService service.resetExitTimer(); service.gotPacket( packet ); } catch ( java.io.InterruptedIOException iioe ) { - // DbgUtils.logf( "FYI: udp receive timeout" ); + // Log.d( TAG, "iioe from receive(): %s", iioe.getMessage() ); } catch( java.io.IOException ioe ) { + Log.d( TAG, "ioe from receive(): %s/%s", ioe.getMessage() ); + Assert.assertFalse( BuildConfig.DEBUG ); break; } } @@ -1145,6 +1180,7 @@ public class RelayService extends JobIntentService for ( outData = m_queue.poll(ts, TimeUnit.SECONDS); null != outData; outData = m_queue.poll() ) { // doesn't block + Log.d( TAG, "removed packet from queue: %s", outData ); if ( outData == sEOQPacket ) { gotEOQ = true; break; @@ -1179,7 +1215,9 @@ public class RelayService extends JobIntentService Log.ex( TAG, ie ); } - Log.i( TAG, "write thread exiting" ); + m_UDPSocket = null; + Log.i( TAG, "write thread exiting (with %d in queue)", + m_queue.size() ); } }, getClass().getName() ); m_UDPWriteThread.start(); @@ -1296,7 +1334,11 @@ public class RelayService extends JobIntentService private void runUDPAckTimer() { long nowMS = System.currentTimeMillis(); - if ( m_lastRunMS + 3000 > nowMS ) { // never more frequently than 3 sec. + if ( 0 == m_lastRunMS ) { + m_lastRunMS = nowMS; + } + long age = nowMS - m_lastRunMS; + if ( age < 3000 ) { // never more frequently than 3 sec. // Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" ); } else { m_lastRunMS = nowMS; @@ -1306,8 +1348,8 @@ public class RelayService extends JobIntentService List forResend = new ArrayList<>(); boolean foundNonAck = false; synchronized ( s_packetsSentUDP ) { - Iterator iter; - for ( iter = s_packetsSentUDP.iterator(); iter.hasNext(); ) { + for ( Iterator iter = s_packetsSentUDP.iterator(); + iter.hasNext(); ) { PacketData packet = iter.next(); long sentMS = packet.getSentMS(); Assert.assertTrue( prevSentMS <= sentMS ); @@ -1447,11 +1489,12 @@ public class RelayService extends JobIntentService return buf.length; } - public boolean relayNoConnProc( byte[] buf, String relayID ) + @Override + public boolean relayNoConnProc( byte[] buf, String msgNo, String relayID ) { long rowID = getRowID(); if ( -1 != rowID ) { - sendNoConnMessage( rowID, relayID, buf ); + sendNoConnMessage( rowID, relayID, buf, msgNo, -1 ); } else { if ( null == m_msgLists ) { m_msgLists = new HashMap>(); @@ -1543,7 +1586,7 @@ public class RelayService extends JobIntentService } else if ( XWApp.UDP_ENABLED ) { stopFetchThreadIf(); startUDPThreadsOnce(); - registerWithRelay(); + registerWithRelay( -1 ); } else { Assert.assertFalse( BuildConfig.DEBUG ); stopUDPThreads(); @@ -1728,25 +1771,46 @@ public class RelayService extends JobIntentService public XWRelayReg m_cmd; public byte[] m_header; public int m_packetID; - private long m_created; + private long m_requested; // when the request came into the static API + private long m_created; // when this packet was created (to service request) private long m_sentUDP; private PacketData() {} - public PacketData( ByteArrayOutputStream bas, XWRelayReg cmd ) + public PacketData( ByteArrayOutputStream bas, XWRelayReg cmd, + long requestTS ) { m_bas = bas; m_cmd = cmd; + m_requested = requestTS; + m_created = System.currentTimeMillis(); + + makeHeader(); } @Override public String toString() { - return String.format( "{cmd: %s; age: %d ms}", m_cmd, - System.currentTimeMillis() - m_created ); + long now = System.currentTimeMillis(); + StringBuilder sb = new StringBuilder() + .append( "{cmd: " ) + .append( m_cmd ) + .append( "; id: " ) + .append( m_packetID ); + if ( m_requested > 0 ) { + sb.append( "; requestAge: " ) + .append( now - m_requested ) + .append( "ms" ); + } + sb.append( "; packetAge: " ) + .append( now - m_created ) + .append( "ms}" ); + return sb.toString(); + // return String.format( "{cmd: %s; id: %d; packetAge: %d ms; requestAge: %d}", + // m_cmd, m_packetID, now - m_created, ); } - void setSentMS( long ms ) { m_sentUDP = ms; } + void setSentMS() { m_sentUDP = System.currentTimeMillis(); } long getSentMS() { return m_sentUDP; } boolean getForWeb() { return m_sentUDP != 0; } @@ -1754,9 +1818,6 @@ public class RelayService extends JobIntentService { int result = 0; if ( null != m_bas ) { // empty case? - if ( null == m_header ) { - makeHeader(); - } result = m_header.length + m_bas.size(); } return result; diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index 2d3da8487..8881f7783 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -2181,7 +2181,10 @@ validateChannelMessage( CommsCtxt* comms, const CommsAddrRec* addr, augmentChannelAddr( comms, rec, addr, senderID ); - if ( msgID != rec->lastMsgRcd + 1 ) { + if ( msgID == rec->lastMsgRcd + 1 ) { + XP_LOGF( TAGFMT() "expected %d AND got %d", TAGPRMS, + msgID, msgID ); + } else if ( msgID != rec->lastMsgRcd + 1 ) { XP_LOGF( TAGFMT() "expected %d, got %d", TAGPRMS, rec->lastMsgRcd + 1, msgID ); rec = NULL; @@ -3054,7 +3057,6 @@ send_via_relay( CommsCtxt* comms, XWRELAY_Cmd cmd, XWHostID destID, XP_LOGF( "%s: dropping message because %s disabled", __func__, ConnType2Str( COMMS_CONN_RELAY ) ); } else { - XWStreamCtxt* tmpStream = relay_msg_to_stream( comms, cmd, destID, data, dlen );