move ack timer off of ui thread

Seemed to be causing ANRs. Integrate instead into outgoing message queue
by using poll(timeout) then checking for unack'd packets every time
through the loop (but not more than once/3 seconds or so.)
This commit is contained in:
Eric House 2017-12-21 07:30:20 -08:00
parent af58110489
commit d134077d87

View file

@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class RelayService extends XWService
@ -402,7 +403,7 @@ public class RelayService extends XWService
cmd = null;
}
if ( null != cmd ) {
Log.d( TAG, "onStartCommand(): cmd=%s", cmd.toString() );
// Log.d( TAG, "onStartCommand(): cmd=%s", cmd.toString() );
switch( cmd ) {
case PROCESS_GAME_MSGS:
String[] relayIDs = new String[1];
@ -632,8 +633,10 @@ public class RelayService extends XWService
for ( ; ; ) {
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
for ( PacketData outData = m_queue.take(); // blocks
long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600;
for ( outData = m_queue.poll(ts, TimeUnit.SECONDS);
null != outData;
outData = m_queue.poll() ) { // doesn't block
if ( outData.isEOQ() ) {
@ -654,6 +657,8 @@ public class RelayService extends XWService
sendViaUDP( dataListUDP );
resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
}
Log.i( TAG, "write thread exiting" );
@ -686,15 +691,16 @@ public class RelayService extends XWService
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn(conn, params);
if ( null != result ) {
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets, s_packetsSentWeb ); // before we process the acks below :-)
noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
@ -703,7 +709,12 @@ public class RelayService extends XWService
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( this, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
@ -715,7 +726,7 @@ public class RelayService extends XWService
private int sendViaUDP( List<PacketData> packets )
{
int sentLen = 0;
long now = System.currentTimeMillis();
long nowMS = System.currentTimeMillis();
for ( PacketData packet : packets ) {
boolean getOut = true;
@ -725,8 +736,8 @@ public class RelayService extends XWService
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
noteSent( packet, s_packetsSentUDP );
packet.setSent( now );
noteSent( packet, true );
packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
@ -748,58 +759,71 @@ public class RelayService extends XWService
}
if ( sentLen > 0 ) {
startAckTimer( packets );
ConnStatusHandler.showSuccessOut();
}
return sentLen;
}
private void startAckTimer( final List<PacketData> packets )
private long m_lastRunMS = 0;
private void runUDPAckTimer()
{
Assert.assertTrue( packets.size() > 0 );
Runnable ackTimer = new Runnable() {
@Override
public void run() {
boolean resend = false;
// Log.d( TAG, "ackTimer.run() called for %d packets", packets.size() );
synchronized ( s_packetsSentUDP ) {
for ( PacketData packet : packets ) {
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
PacketData stillThere = s_packetsSentUDP.remove(packet.m_packetID);
if ( stillThere == null ) {
--m_nativeFailScore; // got an ack: decrement
} else {
++m_nativeFailScore; // FAILED: increment
resend = true; // if ANY fails, resend all
}
}
Log.d( TAG, "runUDPAckTimer()" );
long nowMS = System.currentTimeMillis();
if ( m_lastRunMS + 3000 > nowMS ) {
// Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" );
} else {
m_lastRunMS = nowMS;
long minSentMS = nowMS - 10000; // 10 seconds ago
List<PacketData> forResend = new ArrayList<>();
boolean foundNonAck = false;
synchronized ( s_packetsSentUDP ) {
Iterator<Map.Entry<Integer, PacketData>> iter;
for ( iter = s_packetsSentUDP.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, PacketData> entry = iter.next();
PacketData packet = entry.getValue();
if ( packet.getSentMS() < minSentMS ) {
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
++m_nativeFailScore;
}
// Log.d( TAG, "ackScore: %d", m_nativeFailScore );
}
if ( resend ) {
m_queue.addAll( packets );
iter.remove();
}
}
};
m_handler.postDelayed( ackTimer, 10 * 1000 );
Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining",
s_packetsSentUDP.size() );
}
if ( foundNonAck ) {
Log.d( TAG, "runUDPAckTimer(): reposting %d packets", forResend.size() );
m_queue.addAll( forResend );
}
}
}
private void noteSent( PacketData packet, Map<Integer, PacketData> map )
// So it's a map. The timer iterates over the whole map, which should
// never be *that* big, and pulls everything older than 10 seconds. If
// anything in that list isn't an ACK (since ACKs will always be there
// because they're not ACK'd) then the whole thing gets resent.
private void noteSent( PacketData packet, boolean fromUDP )
{
Map<Integer, PacketData> map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb;
int pid = packet.m_packetID;
Log.d( TAG, "Sent [udp?] packet: cmd=%s, id=%d",
packet.m_cmd.toString(), pid );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
if ( !fromUDP || packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
synchronized( map ) {
map.put( pid, packet );
}
}
}
private void noteSent( List<PacketData> packets, Map<Integer, PacketData> map )
private void noteSent( List<PacketData> packets, boolean fromUDP )
{
for ( PacketData packet : packets ) {
noteSent( packet, map );
noteSent( packet, fromUDP );
}
}
@ -1367,6 +1391,9 @@ public class RelayService extends XWService
if ( packet != null ) {
// Log.d( TAG, "noteAck(fromUDP=%b): removed for id %d: %s",
// fromUDP, packetID, packet );
if ( fromUDP ) {
--m_nativeFailScore;
}
} else {
Log.w( TAG, "Weird: got ack %d but never sent", packetID );
}
@ -1604,7 +1631,8 @@ public class RelayService extends XWService
System.currentTimeMillis() - m_created );
}
void setSent( long ms ) { m_sentUDP = ms; }
void setSentMS( long ms ) { m_sentUDP = ms; }
long getSentMS() { return m_sentUDP; }
boolean getForWeb() { return m_sentUDP != 0; }
public boolean isEOQ() { return 0 == getLength(); }