send an array of packets to post()

Somethimes there are several on the queue, so why not send all at
once. Note: this will break the linux implementation which I'll fix next.
This commit is contained in:
Eric House 2017-11-11 10:47:26 -08:00
parent b9800b22f5
commit f428538afc
2 changed files with 78 additions and 41 deletions

View file

@ -57,6 +57,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
public class RelayService extends XWService
@ -613,35 +614,31 @@ public class RelayService extends XWService
public void run() {
Log.i( TAG, "write thread starting" );
for ( ; ; ) {
PacketData outData;
List<PacketData> dataList = null;
try {
outData = m_queue.take();
PacketData outData = m_queue.take();
if ( null != outData ) {
dataList = new ArrayList<>();
dataList.add(outData);
m_queue.drainTo(dataList);
}
Log.d( TAG, "got %d packets; %d more left", dataList.size(), m_queue.size());
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
break;
}
if ( null == outData
|| 0 == outData.getLength() ) {
if ( null == dataList ) {
Log.i( TAG, "stopping write thread" );
break;
}
int sentLen;
byte[] data = outData.assemble();
if ( XWPrefs.getPreferWebAPI( RelayService.this ) ) {
sentLen = sendViaWeb( data );
sentLen = sendViaWeb( dataList );
} else {
sentLen = sendViaUDP( data );
sentLen = sendViaUDP( dataList );
}
int pid = outData.m_packetID;
Log.d( TAG, "Sent udp packet, cmd=%s, id=%d,"
+ " of length %d",
outData.m_cmd.toString(),
pid, sentLen);
synchronized( s_packetsSent ) {
s_packetsSent.add( pid );
}
resetExitTimer();
ConnStatusHandler.showSuccessOut();
}
@ -655,16 +652,28 @@ public class RelayService extends XWService
}
}
private int sendViaWeb( byte[] data )
private int sendViaWeb( List<PacketData> packets )
{
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
int sentLen = 0;
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
String b64Data = Utils.base64Encode(data);
params.put( "data", b64Data );
HttpURLConnection conn = NetUtils.makeHttpRelayConn(this, "post");
params.put( "data", dataArray );
HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" );
String result = NetUtils.runConn(conn, params);
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
Log.d( TAG, "sendViaWeb(): got %d replies", resData.length() );
noteSent( packets ); // before we process the acks below :-)
for ( int ii = 0; ii < resData.length(); ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
@ -673,33 +682,59 @@ public class RelayService extends XWService
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
return data.length;
return sentLen;
}
private int sendViaUDP( byte[] data )
private int sendViaUDP( List<PacketData> packets )
{
int sentLen = -1;
try {
DatagramPacket outPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( outPacket );
sentLen = outPacket.getLength();
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force"
+ " new socket" );
m_handler.post( new Runnable() {
public void run() {
stopUDPThreadsIf();
}
} );
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
int sentLen = 0;
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
noteSent( packet );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force"
+ " new socket" );
m_handler.post( new Runnable() {
public void run() {
stopUDPThreadsIf();
}
} );
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
}
}
return sentLen;
}
private void noteSent( PacketData packet )
{
int pid = packet.m_packetID;
Log.d( TAG, "Sent [udp?] packet: cmd=%s, id=%d",
packet.m_cmd.toString(), pid);
synchronized( s_packetsSent ) {
s_packetsSent.add( pid );
}
}
private void noteSent( List<PacketData> packets )
{
for ( PacketData packet : packets ) {
noteSent( packet );
}
}
private void stopUDPThreadsIf()
{
if ( null != m_UDPWriteThread ) {

View file

@ -144,12 +144,14 @@ def kill(req, params):
def post(req, params, timeoutSecs = 1.0):
err = 'none'
params = json.loads(params)
data = base64.b64decode(params['data'])
data = params['data']
binData = [base64.b64decode(datum) for datum in data]
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.settimeout(float(timeoutSecs)) # seconds
addr = ("127.0.0.1", 10997)
udpSock.sendto(data, addr)
for binDatum in binData:
udpSock.sendto(binDatum, addr)
responses = []
while True: