From 2db67ed3395a97f178028752fa20afd55b439fe3 Mon Sep 17 00:00:00 2001 From: Eric House Date: Fri, 15 Dec 2017 07:12:14 -0800 Subject: [PATCH] try send via udp, then web Send each packet via UDP if that's thought to be working (always is, now) and start a 10-second timer. If it hasn't been ack'd by then, resend via Web API. Tested by configuring to use a UDP socket that the relay isn't listening on. Only problem is that the backoff timers are broken: never stops sending every few seconds. --- .../org/eehouse/android/xw4/RelayService.java | 180 +++++++++++------- 1 file changed, 113 insertions(+), 67 deletions(-) diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java index f4b7ac714..42d2f0fd7 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java @@ -99,7 +99,8 @@ public class RelayService extends XWService private static final String ROWID = "ROWID"; private static final String BINBUFFER = "BINBUFFER"; - private static Map s_packetsSent = new HashMap<>(); + private static Map s_packetsSentUDP = new HashMap<>(); + private static Map s_packetsSentWeb = new HashMap<>(); private static AtomicInteger s_nextPacketID = new AtomicInteger(); private static boolean s_gcmWorking = false; private static boolean s_registered = false; @@ -119,6 +120,8 @@ public class RelayService extends XWService private Runnable m_onInactivity; private int m_maxIntervalSeconds = 0; private long m_lastGamePacketReceived; + // m_nativeNotWorking: set to true if too many acks missed? + private boolean m_nativeNotWorking = false; private static DevIDType s_curType = DevIDType.ID_TYPE_NONE; private static long s_regStartTime = 0; @@ -411,7 +414,7 @@ public class RelayService extends XWService byte[][][] msgss = expandMsgsArray( intent ); for ( byte[][] msgs : msgss ) { for ( byte[] msg : msgs ) { - gotPacket( msg, true ); + gotPacket( msg, true, false ); } } break; @@ -609,6 +612,15 @@ public class RelayService extends XWService } } + private boolean skipNativeSend() + { + boolean skip = m_nativeNotWorking; + if ( ! skip ) { + skip = XWPrefs.getSkipToWebAPI( RelayService.this ); + } + return skip; + } + private void startWriteThread() { if ( null == m_UDPWriteThread ) { @@ -616,35 +628,35 @@ public class RelayService extends XWService public void run() { Log.i( TAG, "write thread starting" ); for ( ; ; ) { - List dataList = null; + boolean exitNow = false; + boolean useWeb = skipNativeSend(); + List dataListUDP = new ArrayList<>(); + List dataListWeb = new ArrayList<>(); try { - dataList = new ArrayList<>(); for ( PacketData outData = m_queue.take(); // blocks null != outData; outData = m_queue.poll() ) { // doesn't block if ( outData.isEOQ() ) { - dataList = null; + exitNow = true; break; } - dataList.add(outData); - Log.d( TAG, "got %d packets; %d more left", dataList.size(), - m_queue.size()); + if ( useWeb || outData.getForWeb() ) { + dataListWeb.add(outData); + } else { + dataListUDP.add(outData); + } } } catch ( InterruptedException ie ) { Log.w( TAG, "write thread killed" ); break; } - if ( null == dataList ) { + if ( exitNow ) { Log.i( TAG, "stopping write thread" ); break; } - int sentLen; - if ( XWPrefs.getSkipToWebAPI( RelayService.this ) ) { - sentLen = sendViaWeb( dataList ); - } else { - sentLen = sendViaUDP( dataList ); - } + sendViaWeb( dataListWeb ); + sendViaUDP( dataListUDP ); resetExitTimer(); ConnStatusHandler.showSuccessOut(); @@ -663,44 +675,43 @@ public class RelayService extends XWService { Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() ); int sentLen = 0; - HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" ); - if ( null == conn ) { - Log.e( TAG, "sendViaWeb(): null conn for POST" ); - } else { - try { - JSONArray dataArray = new JSONArray(); - for ( PacketData packet : packets ) { - Assert.assertFalse( packet.isEOQ() ); - byte[] datum = packet.assemble(); - dataArray.put( Utils.base64Encode(datum) ); - sentLen += datum.length; - } - JSONObject params = new JSONObject(); - params.put( "data", dataArray ); - - String result = NetUtils.runConn(conn, params); - if ( null != result ) { - Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result ); - JSONObject resultObj = new JSONObject( result ); - JSONArray resData = resultObj.getJSONArray( "data" ); - int nReplies = resData.length(); - Log.d( TAG, "sendViaWeb(): got %d replies", nReplies ); - - noteSent( packets ); // before we process the acks below :-) - - if ( nReplies > 0 ) { - resetExitTimer(); + if ( packets.size() > 0 ) { + HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" ); + if ( null == conn ) { + Log.e( TAG, "sendViaWeb(): null conn for POST" ); + } else { + try { + JSONArray dataArray = new JSONArray(); + for ( PacketData packet : packets ) { + Assert.assertFalse( packet.isEOQ() ); + byte[] datum = packet.assemble(); + dataArray.put( Utils.base64Encode(datum) ); + sentLen += datum.length; } - for ( int ii = 0; ii < nReplies; ++ii ) { - byte[] datum = Utils.base64Decode( resData.getString( ii ) ); - // PENDING: skip ack or not - gotPacket( datum, false ); + JSONObject params = new JSONObject(); + params.put( "data", dataArray ); + + String result = NetUtils.runConn(conn, params); + if ( null != result ) { + Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result ); + JSONObject resultObj = new JSONObject( result ); + JSONArray resData = resultObj.getJSONArray( "data" ); + int nReplies = resData.length(); + Log.d( TAG, "sendViaWeb(): got %d replies", nReplies ); + + noteSent( packets, s_packetsSentWeb ); // before we process the acks below :-) + + for ( int ii = 0; ii < nReplies; ++ii ) { + byte[] datum = Utils.base64Decode( resData.getString( ii ) ); + // PENDING: skip ack or not + gotPacket( datum, false, false ); + } + } else { + Log.e( TAG, "sendViaWeb(): failed result for POST" ); } - } else { - Log.e( TAG, "sendViaWeb(): failed result for POST" ); + } catch ( JSONException ex ) { + Assert.assertFalse( BuildConfig.DEBUG ); } - } catch ( JSONException ex ) { - Assert.assertFalse( BuildConfig.DEBUG ); } } return sentLen; @@ -717,7 +728,7 @@ public class RelayService extends XWService m_UDPSocket.send( udpPacket ); sentLen += udpPacket.getLength(); - noteSent( packet ); + noteSent( packet, s_packetsSentUDP ); getOut = false; } catch ( java.net.SocketException se ) { Log.ex( TAG, se ); @@ -737,25 +748,54 @@ public class RelayService extends XWService break; } } + + if ( sentLen > 0 ) { + startAckTimer( packets ); + } + return sentLen; } - private void noteSent( PacketData packet ) + private void startAckTimer( final List packets ) + { + Runnable ackTimer = new Runnable() { + @Override + public void run() { + List forResend = new ArrayList<>(); + Log.d( TAG, "ackTimer.run() called" ); + synchronized ( s_packetsSentUDP ) { + for ( PacketData packet : packets ) { + PacketData stillThere = s_packetsSentUDP.remove(packet.m_packetID); + if ( stillThere != null ) { + Log.d( TAG, "packed %d not yet acked; resending", + stillThere.m_packetID ); + stillThere.setForWeb(); + forResend.add( stillThere ); + } + } + } + m_queue.addAll( forResend ); + } + }; + m_handler.postDelayed( ackTimer, 10 * 1000 ); + } + + private void noteSent( PacketData packet, Map map ) { int pid = packet.m_packetID; Log.d( TAG, "Sent [udp?] packet: cmd=%s, id=%d", packet.m_cmd.toString(), pid ); if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { - synchronized( s_packetsSent ) { - s_packetsSent.put( pid, packet ); + synchronized( map ) { + map.put( pid, packet ); } } } - private void noteSent( List packets ) + private void noteSent( List packets, Map map ) { for ( PacketData packet : packets ) { - noteSent( packet ); + noteSent( packet, map ); } } @@ -789,7 +829,7 @@ public class RelayService extends XWService } // MIGHT BE Running on reader thread - private void gotPacket( byte[] data, boolean skipAck ) + private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP ) { boolean resetBackoff = false; ByteArrayInputStream bis = new ByteArrayInputStream( data ); @@ -868,7 +908,7 @@ public class RelayService extends XWService startService( intent ); break; case XWPDEV_ACK: - noteAck( vli2un( dis ) ); + noteAck( vli2un( dis ), fromUDP ); break; // case XWPDEV_MSGFWDOTHERS: // Assert.assertTrue( 0 == dis.readByte() ); // protocol; means "invite", I guess. @@ -897,7 +937,7 @@ public class RelayService extends XWService byte[] data = new byte[packetLen]; System.arraycopy( packet.getData(), 0, data, 0, packetLen ); // DbgUtils.logf( "RelayService::gotPacket: %d bytes of data", packetLen ); - gotPacket( data, false ); + gotPacket( data, false, true ); } // gotPacket private boolean shouldRegister() @@ -1317,23 +1357,25 @@ public class RelayService extends XWService return nextPacketID; } - private static void noteAck( int packetID ) + private static void noteAck( int packetID, boolean fromUDP ) { PacketData packet; - synchronized( s_packetsSent ) { - packet = s_packetsSent.remove( packetID ); + Map map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb; + synchronized( map ) { + packet = map.remove( packetID ); if ( packet != null ) { - Log.d( TAG, "noteAck(): removed for id %d: %s", packetID, packet ); + Log.d( TAG, "noteAck(fromUDP=%b): removed for id %d: %s", + fromUDP, packetID, packet ); } else { Log.w( TAG, "Weird: got ack %d but never sent", packetID ); } if ( BuildConfig.DEBUG ) { ArrayList pstrs = new ArrayList<>(); - for ( Integer pkid : s_packetsSent.keySet() ) { - pstrs.add( s_packetsSent.get(pkid).toString() ); + for ( Integer pkid : map.keySet() ) { + pstrs.add( map.get(pkid).toString() ); } - Log.d( TAG, "noteAck(): Got ack for %d; there are %d unacked packets: %s", - packetID, s_packetsSent.size(), TextUtils.join( ",", pstrs ) ); + Log.d( TAG, "noteAck(fromUDP=%b): Got ack for %d; there are %d unacked packets: %s", + fromUDP, packetID, map.size(), TextUtils.join( ",", pstrs ) ); } } } @@ -1540,6 +1582,7 @@ public class RelayService extends XWService public byte[] m_header; public int m_packetID; private long m_created; + private boolean m_useWeb; public PacketData() { m_bas = null; @@ -1560,6 +1603,9 @@ public class RelayService extends XWService System.currentTimeMillis() - m_created ); } + void setForWeb() { m_useWeb = true; } + boolean getForWeb() { return m_useWeb; } + public boolean isEOQ() { return 0 == getLength(); } public int getLength()