diff --git a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java index ee01c6b4b..24f17a8e2 100644 --- a/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java +++ b/xwords4/android/app/src/main/java/org/eehouse/android/xw4/RelayService.java @@ -60,6 +60,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class RelayService extends XWService @@ -402,7 +403,7 @@ public class RelayService extends XWService cmd = null; } if ( null != cmd ) { - Log.d( TAG, "onStartCommand(): cmd=%s", cmd.toString() ); + // Log.d( TAG, "onStartCommand(): cmd=%s", cmd.toString() ); switch( cmd ) { case PROCESS_GAME_MSGS: String[] relayIDs = new String[1]; @@ -632,8 +633,10 @@ public class RelayService extends XWService for ( ; ; ) { List dataListUDP = new ArrayList<>(); List dataListWeb = new ArrayList<>(); + PacketData outData; try { - for ( PacketData outData = m_queue.take(); // blocks + long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600; + for ( outData = m_queue.poll(ts, TimeUnit.SECONDS); null != outData; outData = m_queue.poll() ) { // doesn't block if ( outData.isEOQ() ) { @@ -654,6 +657,8 @@ public class RelayService extends XWService sendViaUDP( dataListUDP ); resetExitTimer(); + runUDPAckTimer(); + ConnStatusHandler.showSuccessOut(); } Log.i( TAG, "write thread exiting" ); @@ -686,15 +691,16 @@ public class RelayService extends XWService JSONObject params = new JSONObject(); params.put( "data", dataArray ); - String result = NetUtils.runConn(conn, params); - if ( null != result ) { + String result = NetUtils.runConn( conn, params ); + boolean succeeded = null != result; + if ( succeeded ) { Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result ); JSONObject resultObj = new JSONObject( result ); JSONArray resData = resultObj.getJSONArray( "data" ); int nReplies = resData.length(); // Log.d( TAG, "sendViaWeb(): got %d replies", nReplies ); - noteSent( packets, s_packetsSentWeb ); // before we process the acks below :-) + noteSent( packets, false ); // before we process the acks below :-) for ( int ii = 0; ii < nReplies; ++ii ) { byte[] datum = Utils.base64Decode( resData.getString( ii ) ); @@ -703,7 +709,12 @@ public class RelayService extends XWService } } else { Log.e( TAG, "sendViaWeb(): failed result for POST" ); + } + + ConnStatusHandler.updateStatus( this, null, + CommsConnType.COMMS_CONN_RELAY, + succeeded ); } catch ( JSONException ex ) { Assert.assertFalse( BuildConfig.DEBUG ); } @@ -715,7 +726,7 @@ public class RelayService extends XWService private int sendViaUDP( List packets ) { int sentLen = 0; - long now = System.currentTimeMillis(); + long nowMS = System.currentTimeMillis(); for ( PacketData packet : packets ) { boolean getOut = true; @@ -725,8 +736,8 @@ public class RelayService extends XWService m_UDPSocket.send( udpPacket ); sentLen += udpPacket.getLength(); - noteSent( packet, s_packetsSentUDP ); - packet.setSent( now ); + noteSent( packet, true ); + packet.setSentMS( nowMS ); getOut = false; } catch ( java.net.SocketException se ) { Log.ex( TAG, se ); @@ -748,58 +759,71 @@ public class RelayService extends XWService } if ( sentLen > 0 ) { - startAckTimer( packets ); + ConnStatusHandler.showSuccessOut(); } return sentLen; } - private void startAckTimer( final List packets ) + private long m_lastRunMS = 0; + private void runUDPAckTimer() { - Assert.assertTrue( packets.size() > 0 ); - Runnable ackTimer = new Runnable() { - @Override - public void run() { - boolean resend = false; - // Log.d( TAG, "ackTimer.run() called for %d packets", packets.size() ); - synchronized ( s_packetsSentUDP ) { - for ( PacketData packet : packets ) { - if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { - PacketData stillThere = s_packetsSentUDP.remove(packet.m_packetID); - if ( stillThere == null ) { - --m_nativeFailScore; // got an ack: decrement - } else { - ++m_nativeFailScore; // FAILED: increment - resend = true; // if ANY fails, resend all - } - } + Log.d( TAG, "runUDPAckTimer()" ); + long nowMS = System.currentTimeMillis(); + if ( m_lastRunMS + 3000 > nowMS ) { + // Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" ); + } else { + m_lastRunMS = nowMS; + + long minSentMS = nowMS - 10000; // 10 seconds ago + List forResend = new ArrayList<>(); + boolean foundNonAck = false; + synchronized ( s_packetsSentUDP ) { + Iterator> iter; + for ( iter = s_packetsSentUDP.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry entry = iter.next(); + PacketData packet = entry.getValue(); + if ( packet.getSentMS() < minSentMS ) { + forResend.add( packet ); + if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { + foundNonAck = true; + ++m_nativeFailScore; } - // Log.d( TAG, "ackScore: %d", m_nativeFailScore ); - } - if ( resend ) { - m_queue.addAll( packets ); + iter.remove(); } } - }; - m_handler.postDelayed( ackTimer, 10 * 1000 ); + Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining", + s_packetsSentUDP.size() ); + } + if ( foundNonAck ) { + Log.d( TAG, "runUDPAckTimer(): reposting %d packets", forResend.size() ); + m_queue.addAll( forResend ); + } + } } - private void noteSent( PacketData packet, Map map ) + // So it's a map. The timer iterates over the whole map, which should + // never be *that* big, and pulls everything older than 10 seconds. If + // anything in that list isn't an ACK (since ACKs will always be there + // because they're not ACK'd) then the whole thing gets resent. + + private void noteSent( PacketData packet, boolean fromUDP ) { + Map map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb; int pid = packet.m_packetID; Log.d( TAG, "Sent [udp?] packet: cmd=%s, id=%d", packet.m_cmd.toString(), pid ); - if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { + if ( !fromUDP || packet.m_cmd != XWRelayReg.XWPDEV_ACK ) { synchronized( map ) { map.put( pid, packet ); } } } - private void noteSent( List packets, Map map ) + private void noteSent( List packets, boolean fromUDP ) { for ( PacketData packet : packets ) { - noteSent( packet, map ); + noteSent( packet, fromUDP ); } } @@ -1367,6 +1391,9 @@ public class RelayService extends XWService if ( packet != null ) { // Log.d( TAG, "noteAck(fromUDP=%b): removed for id %d: %s", // fromUDP, packetID, packet ); + if ( fromUDP ) { + --m_nativeFailScore; + } } else { Log.w( TAG, "Weird: got ack %d but never sent", packetID ); } @@ -1604,7 +1631,8 @@ public class RelayService extends XWService System.currentTimeMillis() - m_created ); } - void setSent( long ms ) { m_sentUDP = ms; } + void setSentMS( long ms ) { m_sentUDP = ms; } + long getSentMS() { return m_sentUDP; } boolean getForWeb() { return m_sentUDP != 0; } public boolean isEOQ() { return 0 == getLength(); }