wip: get rid of join() from main thread

Getting ANRs because (I think) the main thread's waiting for the write
thread to die and now the write thread's doing a ton of work
sometimes. So move the threads into a standalone object that can be
allowed to die on its own time without anybody waiting.
This commit is contained in:
Eric House 2018-01-01 19:46:29 -08:00
parent ea985e2f7a
commit b24a004cab

View file

@ -112,12 +112,8 @@ public class RelayService extends XWService
private static long s_curNextTimer; private static long s_curNextTimer;
static { resetBackoffTimer(); } static { resetBackoffTimer(); }
private Thread m_fetchThread = null; private Thread m_fetchThread = null; // no longer used
private Thread m_UDPReadThread = null; private UDPThreads m_UDPThreads = null;
private Thread m_UDPWriteThread = null;
private DatagramSocket m_UDPSocket;
private LinkedBlockingQueue<PacketData> m_queue =
new LinkedBlockingQueue<PacketData>();
private Handler m_handler; private Handler m_handler;
private Runnable m_onInactivity; private Runnable m_onInactivity;
private int m_maxIntervalSeconds = 0; private int m_maxIntervalSeconds = 0;
@ -558,32 +554,9 @@ public class RelayService extends XWService
private void startUDPThreadsIfNot() private void startUDPThreadsIfNot()
{ {
if ( XWApp.UDP_ENABLED && relayEnabled( this ) ) { if ( XWApp.UDP_ENABLED && relayEnabled( this ) ) {
if ( null == m_UDPReadThread ) { if ( null == m_UDPThreads ) {
m_UDPReadThread = new Thread( null, new Runnable() { m_UDPThreads = new UDPThreads();
public void run() { m_UDPThreads.start();
connectSocket(); // block until this is done
startWriteThread();
Log.i( TAG, "read thread running" );
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
m_UDPSocket.receive( packet );
resetExitTimer();
gotPacket( packet );
} catch ( java.io.InterruptedIOException iioe ) {
// DbgUtils.logf( "FYI: udp receive timeout" );
} catch( java.io.IOException ioe ) {
break;
}
}
Log.i( TAG, "read thread exiting" );
}
}, getClass().getName() );
m_UDPReadThread.start();
} else { } else {
// Log.i( TAG, "m_UDPReadThread not null and assumed to be running" ); // Log.i( TAG, "m_UDPReadThread not null and assumed to be running" );
} }
@ -592,31 +565,6 @@ public class RelayService extends XWService
} }
} // startUDPThreadsIfNot } // startUDPThreadsIfNot
private void connectSocket()
{
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( this );
String host = XWPrefs.getDefaultRelayHost( this );
try {
m_UDPSocket = new DatagramSocket();
m_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // remember this address
Log.d( TAG, "connectSocket(%s:%d): m_UDPSocket now %H",
host, port, m_UDPSocket );
} catch( java.net.SocketException se ) {
Log.ex( TAG, se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
Log.ex( TAG, uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
Log.i( TAG, "m_UDPSocket not null" );
}
}
private boolean skipNativeSend() private boolean skipNativeSend()
{ {
boolean skip = m_nativeFailScore > UDP_FAIL_LIMIT || m_skipUPDSet; boolean skip = m_nativeFailScore > UDP_FAIL_LIMIT || m_skipUPDSet;
@ -624,194 +572,6 @@ public class RelayService extends XWService
return skip; return skip;
} }
private void startWriteThread()
{
if ( null == m_UDPWriteThread ) {
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
Log.i( TAG, "write thread starting" );
for ( boolean gotEOQ = false; !gotEOQ; ) {
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600;
Log.d( TAG, "blocking %d sec on poll()", ts );
for ( outData = m_queue.poll(ts, TimeUnit.SECONDS);
null != outData;
outData = m_queue.poll() ) { // doesn't block
if ( outData instanceof EOQPacketData ) {
gotEOQ = true;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add (outData );
} else {
dataListUDP.add( outData );
}
}
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
break;
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
}
Log.i( TAG, "write thread exiting" );
}
}, getClass().getName() );
m_UDPWriteThread.start();
} else {
Log.i( TAG, "m_UDPWriteThread not null and assumed to "
+ "be running" );
}
}
private int sendViaWeb( List<PacketData> packets )
{
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
int sentLen = 0;
if ( packets.size() > 0 ) {
HttpURLConnection conn = NetUtils.makeHttpRelayConn( this, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet instanceof EOQPacketData );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( this, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
return sentLen;
}
private int sendViaUDP( List<PacketData> packets )
{
int sentLen = 0;
if ( packets.size() > 0 ) {
noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( this, null,
CommsConnType.COMMS_CONN_RELAY,
true );
m_handler.post( new Runnable() {
public void run() {
stopUDPThreadsIf();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
}
}
ConnStatusHandler.updateStatus( this, null,
CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
}
return sentLen;
}
private long m_lastRunMS = 0;
private void runUDPAckTimer()
{
long nowMS = System.currentTimeMillis();
if ( m_lastRunMS + 3000 > nowMS ) { // never more frequently than 3 sec.
// Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" );
} else {
m_lastRunMS = nowMS;
long minSentMS = nowMS - 10000; // 10 seconds ago
long prevSentMS = 0;
List<PacketData> forResend = new ArrayList<>();
boolean foundNonAck = false;
synchronized ( s_packetsSentUDP ) {
Iterator<PacketData> iter;
for ( iter = s_packetsSentUDP.iterator(); iter.hasNext(); ) {
PacketData packet = iter.next();
long sentMS = packet.getSentMS();
Assert.assertTrue( prevSentMS <= sentMS );
prevSentMS = sentMS;
if ( sentMS > minSentMS ) {
break;
}
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
++m_nativeFailScore;
}
iter.remove();
}
Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining",
s_packetsSentUDP.size() );
}
if ( foundNonAck ) {
Log.d( TAG, "runUDPAckTimer(): reposting %d packets", forResend.size() );
m_queue.addAll( forResend );
}
}
}
// So it's a map. The timer iterates over the whole map, which should // So it's a map. The timer iterates over the whole map, which should
// never be *that* big, and pulls everything older than 10 seconds. If // never be *that* big, and pulls everything older than 10 seconds. If
// anything in that list isn't an ACK (since ACKs will always be there // anything in that list isn't an ACK (since ACKs will always be there
@ -848,28 +608,9 @@ public class RelayService extends XWService
{ {
DbgUtils.assertOnUIThread(); DbgUtils.assertOnUIThread();
if ( null != m_UDPWriteThread ) { if ( null != m_UDPThreads ) {
// can't add null m_UDPThreads.stop();
m_queue.add( new EOQPacketData() ); m_UDPThreads = null;
try {
Log.d( TAG, "joining m_UDPWriteThread" );
m_UDPWriteThread.join();
Log.d( TAG, "SUCCESSFULLY joined m_UDPWriteThread" );
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
m_UDPWriteThread = null;
m_queue.clear();
}
if ( null != m_UDPSocket && null != m_UDPReadThread ) {
m_UDPSocket.close();
try {
m_UDPReadThread.join();
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
m_UDPReadThread = null;
m_UDPSocket = null;
} }
} }
@ -1193,8 +934,8 @@ public class RelayService extends XWService
private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd ) private void postPacket( ByteArrayOutputStream bas, XWRelayReg cmd )
{ {
m_queue.add( new PacketData( bas, cmd ) );
startUDPThreadsIfNot(); startUDPThreadsIfNot();
m_UDPThreads.add( new PacketData( bas, cmd ) );
// 0 ok; thread will often have sent already! // 0 ok; thread will often have sent already!
// DbgUtils.logf( "postPacket() done; %d in queue", m_queue.size() ); // DbgUtils.logf( "postPacket() done; %d in queue", m_queue.size() );
} }
@ -1258,6 +999,277 @@ public class RelayService extends XWService
} }
} }
private class UDPThreads {
private DatagramSocket m_UDPSocket;
private LinkedBlockingQueue<PacketData> m_queue =
new LinkedBlockingQueue<PacketData>();
private Thread m_UDPReadThread;
private Thread m_UDPWriteThread;
UDPThreads() {}
void start()
{
m_UDPReadThread = new Thread( null, new Runnable() {
public void run() {
connectSocket(); // block until this is done
startWriteThread();
Log.i( TAG, "read thread running" );
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
m_UDPSocket.receive( packet );
resetExitTimer();
gotPacket( packet );
} catch ( java.io.InterruptedIOException iioe ) {
// DbgUtils.logf( "FYI: udp receive timeout" );
} catch( java.io.IOException ioe ) {
break;
}
}
Log.i( TAG, "read thread exiting" );
}
}, getClass().getName() );
m_UDPReadThread.start();
}
void stop()
{
m_queue.add( new EOQPacketData() ); // will kill the writer thread
}
void add( PacketData packet )
{
m_queue.add( packet );
}
private void connectSocket()
{
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
String host = XWPrefs.getDefaultRelayHost( RelayService.this );
try {
m_UDPSocket = new DatagramSocket();
m_UDPSocket.setSoTimeout(30 * 1000); // timeout so we can log
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // remember this address
Log.d( TAG, "connectSocket(%s:%d): m_UDPSocket now %H",
host, port, m_UDPSocket );
} catch( java.net.SocketException se ) {
Log.ex( TAG, se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
Log.ex( TAG, uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
Log.i( TAG, "m_UDPSocket not null" );
}
}
private void startWriteThread()
{
Assert.assertNull( m_UDPWriteThread );
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
Log.i( TAG, "write thread starting" );
for ( boolean gotEOQ = false; !gotEOQ; ) {
List<PacketData> dataListUDP = new ArrayList<>();
List<PacketData> dataListWeb = new ArrayList<>();
PacketData outData;
try {
long ts = s_packetsSentUDP.size() > 0 ? 10 : 3600;
Log.d( TAG, "blocking %d sec on poll()", ts );
for ( outData = m_queue.poll(ts, TimeUnit.SECONDS);
null != outData;
outData = m_queue.poll() ) { // doesn't block
if ( outData instanceof EOQPacketData ) {
gotEOQ = true;
break;
} else if ( skipNativeSend() || outData.getForWeb() ) {
dataListWeb.add (outData );
} else {
dataListUDP.add( outData );
}
}
} catch ( InterruptedException ie ) {
Log.w( TAG, "write thread killed" );
break;
}
sendViaWeb( dataListWeb );
sendViaUDP( dataListUDP );
resetExitTimer();
runUDPAckTimer();
ConnStatusHandler.showSuccessOut();
}
Log.i( TAG, "write thread killing read thread" );
// now kill the read thread
m_UDPSocket.close();
try {
m_UDPReadThread.join();
} catch( java.lang.InterruptedException ie ) {
Log.ex( TAG, ie );
}
Log.i( TAG, "write thread exiting" );
}
}, getClass().getName() );
m_UDPWriteThread.start();
}
private int sendViaWeb( List<PacketData> packets )
{
Log.d( TAG, "sendViaWeb(): sending %d at once", packets.size() );
int sentLen = 0;
if ( packets.size() > 0 ) {
HttpURLConnection conn = NetUtils.makeHttpRelayConn( RelayService.this, "post" );
if ( null == conn ) {
Log.e( TAG, "sendViaWeb(): null conn for POST" );
} else {
try {
JSONArray dataArray = new JSONArray();
for ( PacketData packet : packets ) {
Assert.assertFalse( packet instanceof EOQPacketData );
byte[] datum = packet.assemble();
dataArray.put( Utils.base64Encode(datum) );
sentLen += datum.length;
}
JSONObject params = new JSONObject();
params.put( "data", dataArray );
String result = NetUtils.runConn( conn, params );
boolean succeeded = null != result;
if ( succeeded ) {
Log.d( TAG, "sendViaWeb(): POST(%s) => %s", params, result );
JSONObject resultObj = new JSONObject( result );
JSONArray resData = resultObj.getJSONArray( "data" );
int nReplies = resData.length();
// Log.d( TAG, "sendViaWeb(): got %d replies", nReplies );
noteSent( packets, false ); // before we process the acks below :-)
for ( int ii = 0; ii < nReplies; ++ii ) {
byte[] datum = Utils.base64Decode( resData.getString( ii ) );
// PENDING: skip ack or not
gotPacket( datum, false, false );
}
} else {
Log.e( TAG, "sendViaWeb(): failed result for POST" );
}
ConnStatusHandler.updateStatus( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
succeeded );
} catch ( JSONException ex ) {
Assert.assertFalse( BuildConfig.DEBUG );
}
}
}
return sentLen;
}
private int sendViaUDP( List<PacketData> packets )
{
int sentLen = 0;
if ( packets.size() > 0 ) {
noteSent( packets, true );
for ( PacketData packet : packets ) {
boolean getOut = true;
byte[] data = packet.assemble();
try {
DatagramPacket udpPacket = new DatagramPacket( data, data.length );
m_UDPSocket.send( udpPacket );
sentLen += udpPacket.getLength();
// packet.setSentMS( nowMS );
getOut = false;
} catch ( java.net.SocketException se ) {
Log.ex( TAG, se );
Log.i( TAG, "Restarting threads to force new socket" );
ConnStatusHandler.updateStatusOut( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
true );
m_handler.post( new Runnable() {
public void run() {
stopUDPThreadsIf();
}
} );
break;
} catch ( java.io.IOException ioe ) {
Log.ex( TAG, ioe );
} catch ( NullPointerException npe ) {
Log.w( TAG, "network problem; dropping packet" );
}
if ( getOut ) {
break;
}
}
ConnStatusHandler.updateStatus( RelayService.this, null,
CommsConnType.COMMS_CONN_RELAY,
sentLen > 0 );
}
return sentLen;
}
private long m_lastRunMS = 0;
private void runUDPAckTimer()
{
long nowMS = System.currentTimeMillis();
if ( m_lastRunMS + 3000 > nowMS ) { // never more frequently than 3 sec.
// Log.d( TAG, "runUDPAckTimer(): too soon, so skipping" );
} else {
m_lastRunMS = nowMS;
long minSentMS = nowMS - 10000; // 10 seconds ago
long prevSentMS = 0;
List<PacketData> forResend = new ArrayList<>();
boolean foundNonAck = false;
synchronized ( s_packetsSentUDP ) {
Iterator<PacketData> iter;
for ( iter = s_packetsSentUDP.iterator(); iter.hasNext(); ) {
PacketData packet = iter.next();
long sentMS = packet.getSentMS();
Assert.assertTrue( prevSentMS <= sentMS );
prevSentMS = sentMS;
if ( sentMS > minSentMS ) {
break;
}
forResend.add( packet );
if ( packet.m_cmd != XWRelayReg.XWPDEV_ACK ) {
foundNonAck = true;
++m_nativeFailScore;
}
iter.remove();
}
Log.d( TAG, "runUDPAckTimer(): %d too-new packets remaining",
s_packetsSentUDP.size() );
}
if ( foundNonAck ) {
Log.d( TAG, "runUDPAckTimer(): reposting %d packets", forResend.size() );
m_queue.addAll( forResend );
}
}
}
}
private static class AsyncSender extends AsyncTask<Void, Void, Void> { private static class AsyncSender extends AsyncTask<Void, Void, Void> {
private Context m_context; private Context m_context;
private HashMap<String,ArrayList<byte[]>> m_msgHash; private HashMap<String,ArrayList<byte[]>> m_msgHash;