try to speed up delivery when udp not working

Having reconfigured to use non-existent relay port as a test of falling
back to the web apis, tweak stuff: send the packets that have been
accumulated when an EOQ is found (rather than dropping all of them
immediately) before exiting the write thread; and start the threads up
when posting a packet in case they aren't (they may not be when the post
happens via timer firing.)
This commit is contained in:
Eric House 2017-12-27 15:03:28 -08:00
parent 9fd4a90ccd
commit 5e20e54638

View file

@ -629,20 +629,20 @@ public class RelayService extends XWService
m_UDPWriteThread = new Thread( null, new Runnable() { m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() { public void run() {
Log.i( TAG, "write thread starting" ); Log.i( TAG, "write thread starting" );
outer: for ( boolean gotEOQ = false; !gotEOQ; ) {
for ( ; ; ) {
List<PacketData> dataListUDP = new ArrayList<>(); List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>(); List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData; PacketData outData;
try { try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600; long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600;
Log.d( TAG, "blocking %d sec on poll()", ts );
for ( outData = m_queue.poll(ts, TimeUnit.SECONDS); for ( outData = m_queue.poll(ts, TimeUnit.SECONDS);
null != outData; null != outData;
outData = m_queue.poll() ) { // doesn't block outData = m_queue.poll() ) { // doesn't block
if ( outData.isEOQ() ) { if ( outData instanceof EOQPacketData ) {
break outer; gotEOQ = true;
} break;
if ( skipNativeSend() || outData.getForWeb() ) { } else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add (outData ); dataListWeb.add (outData );
} else { } else {
dataListUDP.add( outData ); dataListUDP.add( outData );
@ -683,7 +683,7 @@ public class RelayService extends XWService
try { try {
JSONArray dataArray = new JSONArray(); JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) { for ( PacketData packet : packets ) {
Assert.assertFalse( packet.isEOQ() ); Assert.assertFalse( packet instanceof EOQPacketData );
byte[] datum = packet.assemble(); byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) ); dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length; sentLen += datum.length;
@ -726,8 +726,9 @@ public class RelayService extends XWService
private int sendViaUDP( List<PacketData> packets ) private int sendViaUDP( List<PacketData> packets )
{ {
int sentLen = 0; int sentLen = 0;
long nowMS = System.currentTimeMillis();
if ( packets.size() > 0 ) {
noteSent( packets, true );
for ( PacketData packet : packets ) { for ( PacketData packet : packets ) {
boolean getOut = true; boolean getOut = true;
byte[] data = packet.assemble(); byte[] data = packet.assemble();
@ -736,18 +737,21 @@ public class RelayService extends XWService
m_UDPSocket.send( udpPacket ); m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength(); sentLen += udpPacket.getLength();
noteSent( packet, true ); // packet.setSentMS( nowMS );
packet.setSentMS( nowMS );
getOut = false; getOut = false;
} catch ( java.net.SocketException se ) { } catch ( java.net.SocketException se ) {
Log.ex( TAG, se ); Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force" Log.i( TAG, "Restarting threads to force new socket" );
+ " new socket" ); ConnStatusHandler.updateStatusOut( this, null,
CommsConnType.COMMS_CONN_RELAY,
true );
m_handler.post( new Runnable() { m_handler.post( new Runnable() {
public void run() { public void run() {
stopUDPThreadsIf(); stopUDPThreadsIf();
} }
} ); } );
break;
} catch ( java.io.IOException ioe ) { } catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe ); Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) { } catch ( NullPointerException npe ) {
@ -758,8 +762,9 @@ public class RelayService extends XWService
} }
} }
if ( sentLen > 0 ) { ConnStatusHandler.updateStatus( this, null,
ConnStatusHandler.showSuccessOut(); CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
} }
return sentLen; return sentLen;
@ -825,10 +830,14 @@ public class RelayService extends XWService
private void noteSent( List<PacketData> packets, boolean fromUDP ) private void noteSent( List<PacketData> packets, boolean fromUDP )
{ {
long nowMS = System.currentTimeMillis();
List<PacketData> map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb; List<PacketData> map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb;
Log.d( TAG, "noteSent(fromUDP=%b): adding %d; size before: %d", Log.d( TAG, "noteSent(fromUDP=%b): adding %d; size before: %d",
fromUDP, packets.size(), map.size() ); fromUDP, packets.size(), map.size() );
for ( PacketData packet : packets ) { for ( PacketData packet : packets ) {
if ( fromUDP ) {
packet.setSentMS( nowMS );
}
noteSent( packet, fromUDP ); noteSent( packet, fromUDP );
} }
Log.d( TAG, "noteSent(fromUDP=%b): size after: %d", fromUDP, map.size() ); Log.d( TAG, "noteSent(fromUDP=%b): size after: %d", fromUDP, map.size() );
@ -840,7 +849,7 @@ public class RelayService extends XWService
if ( null != m_UDPWriteThread ) { if ( null != m_UDPWriteThread ) {
// can't add null // can't add null
m_queue.add( new PacketData() ); m_queue.add( new EOQPacketData() );
try { try {
Log.d( TAG, "joining m_UDPWriteThread" ); Log.d( TAG, "joining m_UDPWriteThread" );
m_UDPWriteThread.join(); m_UDPWriteThread.join();
@ -1184,6 +1193,7 @@ public class RelayService extends XWService
private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd ) private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd )
{ {
m_queue.add( new PacketData( bas, cmd ) ); m_queue.add( new PacketData( bas, cmd ) );
startUDPThreadsIfNot();
// 0 ok; thread will often have sent already! // 0 ok; thread will often have sent already!
// DbgUtils.logf( "postPacket() done; %d in queue", m_queue.size() ); // DbgUtils.logf( "postPacket() done; %d in queue", m_queue.size() );
} }
@ -1424,6 +1434,13 @@ public class RelayService extends XWService
fromUDP, packetID, map.size(), TextUtils.join( ",", pstrs ) ); fromUDP, packetID, map.size(), TextUtils.join( ",", pstrs ) );
} }
} }
// If we get an ACK, things are working, even if it's not found above
// (which would be the case for an ACK sent via web, which we don't
// save.)
ConnStatusHandler.updateStatus( this, null,
CommsConnType.COMMS_CONN_RELAY,
true );
} }
// Called from any thread // Called from any thread
@ -1535,7 +1552,7 @@ public class RelayService extends XWService
result = figureBackoffSeconds(); result = figureBackoffSeconds();
} }
Log.d( TAG, "getMaxIntervalSeconds() => %d", result ); Log.d( TAG, "getMaxIntervalSeconds() => %d", result ); // WFT? went from 40 to 1000
return result; return result;
} }
@ -1630,14 +1647,10 @@ public class RelayService extends XWService
private long m_created; private long m_created;
private long m_sentUDP; private long m_sentUDP;
public PacketData() { private PacketData() {}
m_bas = null;
m_created = System.currentTimeMillis();
}
public PacketData( ByteArrayOutputStream bas, XWRelayReg cmd ) public PacketData( ByteArrayOutputStream bas, XWRelayReg cmd )
{ {
this();
m_bas = bas; m_bas = bas;
m_cmd = cmd; m_cmd = cmd;
} }
@ -1653,8 +1666,6 @@ public class RelayService extends XWService
long getSentMS() { return m_sentUDP; } long getSentMS() { return m_sentUDP; }
boolean getForWeb() { return m_sentUDP != 0; } boolean getForWeb() { return m_sentUDP != 0; }
public boolean isEOQ() { return 0 == getLength(); }
public int getLength() public int getLength()
{ {
int result = 0; int result = 0;
@ -1693,4 +1704,7 @@ public class RelayService extends XWService
} }
} }
} }
// Exits only to exist, so instanceof can distinguish
private class EOQPacketData extends PacketData {}
} }