try send via udp, then web

Send each packet via UDP if that's thought to be working (always is,
now) and start a 10-second timer. If it hasn't been ack'd by then,
resend via Web API. Tested by configuring to use a UDP socket that the
relay isn't listening on. Only problem is that the backoff timers are
broken: never stops sending every few seconds.
This commit is contained in:
Eric House 2017-12-15 07:12:14 -08:00
parent c7a635285c
commit 2db67ed339

View file

@ -99,7 +99,8 @@ public class RelayService extends XWService
private static final String ROWID = "ROWID";
private static final String BINBUFFER = "BINBUFFER";
private static Map<Integer, PacketData> s_packetsSent = new HashMap<>();
private static Map<Integer, PacketData> s_packetsSentUDP = new HashMap<>();
private static Map<Integer, PacketData> s_packetsSentWeb = new HashMap<>();
private static AtomicInteger s_nextPacketID = new AtomicInteger();
private static boolean s_gcmWorking = false;
private static boolean s_registered = false;
@ -119,6 +120,8 @@ public class RelayService extends XWService
private Runnable m_onInactivity;
private int m_maxIntervalSeconds = 0;
private long m_lastGamePacketReceived;
// m_nativeNotWorking: set to true if too many acks missed?
private boolean m_nativeNotWorking = false;
private static DevIDType s_curType = DevIDType.ID_TYPE_NONE;
private static long s_regStartTime = 0;
@ -411,7 +414,7 @@ public class RelayService extends XWService
byte[][][] msgss = expandMsgsArray( intent );
for ( byte[][] msgs : msgss ) {
for ( byte[] msg : msgs ) {
gotPacket( msg, true );
gotPacket( msg, true, false );
}
}
break;
@ -609,6 +612,15 @@ public class RelayService extends XWService
}
}
private boolean skipNativeSend()
{
boolean skip = m_nativeNotWorking;
if ( ! skip ) {
skip = XWPrefs.getSkipToWebAPI( RelayService.this );
}
return skip;
}
private void startWriteThread()
{
if ( null == m_UDPWriteThread ) {
@ -616,35 +628,35 @@ public class RelayService extends XWService
public void run() {
Log.i( TAG, "write thread starting" );
for ( ; ; ) {
List<PacketData> dataList = null;
boolean exitNow = false;
boolean useWeb = skipNativeSend();
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
try {
dataList = new ArrayList<>();
for ( PacketData outData = m_queue.take(); // blocks
null != outData;
outData = m_queue.poll() ) { // doesn't block
if ( outData.isEOQ() ) {
dataList = null;
exitNow = true;
break;
}
dataList.add(outData);
Log.d( TAG, "got %d packets; %d more left", dataList.size(),
m_queue.size());
if ( useWeb || outData.getForWeb() ) {
dataListWeb.add(outData);
} else {
dataListUDP.add(outData);
}
}
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
break;
}
if ( null == dataList ) {
if ( exitNow ) {
Log.i( TAG, "stopping write thread" );
break;
}
int sentLen;
if ( XWPrefs.getSkipToWebAPI( RelayService.this ) ) {
sentLen = sendViaWeb( dataList );
} else {
sentLen = sendViaUDP( dataList );
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
resetExitTimer();
ConnStatusHandler.showSuccessOut();
@ -663,44 +675,43 @@ public class RelayService extends XWService
{
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
int sentLen = 0;
HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet.isEOQ() );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn(conn, params);
if ( null != result ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets ); // before we process the acks below :-)
if ( nReplies > 0 ) {
resetExitTimer();
if ( packets.size() > 0 ) {
HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet.isEOQ() );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
gotPacket( datum, false );
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn(conn, params);
if ( null != result ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets, s_packetsSentWeb ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
return sentLen;
@ -717,7 +728,7 @@ public class RelayService extends XWService
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
noteSent( packet );
noteSent( packet, s_packetsSentUDP );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
@ -737,25 +748,54 @@ public class RelayService extends XWService
break;
}
}
if ( sentLen > 0 ) {
startAckTimer( packets );
}
return sentLen;
}
private void noteSent( PacketData packet )
private void startAckTimer( final List<PacketData> packets )
{
Runnable ackTimer = new Runnable() {
@Override
public void run() {
List<PacketData> forResend = new ArrayList<>();
Log.d( TAG, "ackTimer.run() called" );
synchronized ( s_packetsSentUDP ) {
for ( PacketData packet : packets ) {
PacketData stillThere = s_packetsSentUDP.remove(packet.m_packetID);
if ( stillThere != null ) {
Log.d( TAG, "packed %d not yet acked; resending",
stillThere.m_packetID );
stillThere.setForWeb();
forResend.add( stillThere );
}
}
}
m_queue.addAll( forResend );
}
};
m_handler.postDelayed( ackTimer, 10 * 1000 );
}
private void noteSent( PacketData packet, Map<Integer, PacketData> map )
{
int pid = packet.m_packetID;
Log.d( TAG, "Sent [udp?] packet: cmd=%s, id=%d",
packet.m_cmd.toString(), pid );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
synchronized( s_packetsSent ) {
s_packetsSent.put( pid, packet );
synchronized( map ) {
map.put( pid, packet );
}
}
}
private void noteSent( List<PacketData> packets )
private void noteSent( List<PacketData> packets, Map<Integer, PacketData> map )
{
for ( PacketData packet : packets ) {
noteSent( packet );
noteSent( packet, map );
}
}
@ -789,7 +829,7 @@ public class RelayService extends XWService
}
// MIGHT BE Running on reader thread
private void gotPacket( byte[] data, boolean skipAck )
private void gotPacket( byte[] data, boolean skipAck, boolean fromUDP )
{
boolean resetBackoff = false;
ByteArrayInputStream bis = new ByteArrayInputStream( data );
@ -868,7 +908,7 @@ public class RelayService extends XWService
startService( intent );
break;
case XWPDEV_ACK:
noteAck( vli2un( dis ) );
noteAck( vli2un( dis ), fromUDP );
break;
// case XWPDEV_MSGFWDOTHERS:
// Assert.assertTrue( 0 == dis.readByte() ); // protocol; means "invite", I guess.
@ -897,7 +937,7 @@ public class RelayService extends XWService
byte[] data = new byte[packetLen];
System.arraycopy( packet.getData(), 0, data, 0, packetLen );
// DbgUtils.logf( "RelayService::gotPacket: %d bytes of data", packetLen );
gotPacket( data, false );
gotPacket( data, false, true );
} // gotPacket
private boolean shouldRegister()
@ -1317,23 +1357,25 @@ public class RelayService extends XWService
return nextPacketID;
}
private static void noteAck( int packetID )
private static void noteAck( int packetID, boolean fromUDP )
{
PacketData packet;
synchronized( s_packetsSent ) {
packet = s_packetsSent.remove( packetID );
Map<Integer, PacketData> map = fromUDP ? s_packetsSentUDP : s_packetsSentWeb;
synchronized( map ) {
packet = map.remove( packetID );
if ( packet != null ) {
Log.d( TAG, "noteAck(): removed for id %d: %s", packetID, packet );
Log.d( TAG, "noteAck(fromUDP=%b): removed for id %d: %s",
fromUDP, packetID, packet );
} else {
Log.w( TAG, "Weird: got ack %d but never sent", packetID );
}
if ( BuildConfig.DEBUG ) {
ArrayList<String> pstrs = new ArrayList<>();
for ( Integer pkid : s_packetsSent.keySet() ) {
pstrs.add( s_packetsSent.get(pkid).toString() );
for ( Integer pkid : map.keySet() ) {
pstrs.add( map.get(pkid).toString() );
}
Log.d( TAG, "noteAck(): Got ack for %d; there are %d unacked packets: %s",
packetID, s_packetsSent.size(), TextUtils.join( ",", pstrs ) );
Log.d( TAG, "noteAck(fromUDP=%b): Got ack for %d; there are %d unacked packets: %s",
fromUDP, packetID, map.size(), TextUtils.join( ",", pstrs ) );
}
}
}
@ -1540,6 +1582,7 @@ public class RelayService extends XWService
public byte[] m_header;
public int m_packetID;
private long m_created;
private boolean m_useWeb;
public PacketData() {
m_bas = null;
@ -1560,6 +1603,9 @@ public class RelayService extends XWService
System.currentTimeMillis() - m_created );
}
void setForWeb() { m_useWeb = true; }
boolean getForWeb() { return m_useWeb; }
public boolean isEOQ() { return 0 == getLength(); }
public int getLength()