network connection stuff must not be run on main/UI thread, so to

preserve socket connection completing before startup of read and write
threads, move it and start of write thread to beginning of read
thread.
This commit is contained in:
Eric House 2013-07-16 08:01:59 -07:00
parent b7e00bfc0c
commit 403401745e

View file

@ -70,7 +70,8 @@ public class RelayService extends XWService {
private Thread m_UDPReadThread = null;
private Thread m_UDPWriteThread = null;
private DatagramSocket m_UDPSocket;
private LinkedBlockingQueue<DatagramPacket> m_queue = null;
private LinkedBlockingQueue<DatagramPacket> m_queue =
new LinkedBlockingQueue<DatagramPacket>();
// These must match the enum XWRelayReg in xwrelay.h
private static final int XWPDEV_PROTO_VERSION = 0;
@ -108,13 +109,13 @@ public class RelayService extends XWService {
context.startService( intent );
}
public static int sendPacket( Context context, long rowid, byte[] buf )
public static int sendPacket( Context context, long rowid, byte[] msg )
{
Intent intent = getIntentTo( context, SEND )
.putExtra( ROWID, rowid )
.putExtra( BINBUFFER, buf );
.putExtra( BINBUFFER, msg );
context.startService( intent );
return buf.length;
return msg.length;
}
// Exists to get incoming data onto the main thread
@ -260,41 +261,31 @@ public class RelayService extends XWService {
private void startUDPThreadsIfNot()
{
if ( XWPrefs.getUDPEnabled( this ) ) {
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
String host = XWPrefs.getDefaultRelayHost( RelayService.this );
try {
m_UDPSocket = new DatagramSocket();
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // remember this address
} catch( java.net.SocketException se ) {
DbgUtils.loge( se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
DbgUtils.loge( uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
DbgUtils.logf( "m_UDPSocket not null" );
}
if ( null == m_UDPReadThread ) {
m_UDPReadThread = new Thread( null, new Runnable() {
public void run() {
connectSocket(); // block until this is done
startWriteThread();
DbgUtils.logf( "read thread running" );
byte[] buf = new byte[1024];
for ( ; ; ) {
DatagramPacket packet =
new DatagramPacket( buf, buf.length );
try {
DbgUtils.logf( "UPD read thread blocking on receive" );
DbgUtils.logf( "UPD read thread blocking "
+ "on receive" );
m_UDPSocket.receive( packet );
DbgUtils.logf( "UPD read thread: receive returned" );
DbgUtils.logf( "UPD read thread: "
+ "receive returned" );
} catch( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
break; // ???
}
DbgUtils.logf( "received %d bytes", packet.getLength() );
DbgUtils.logf( "received %d bytes",
packet.getLength() );
gotPacket( packet );
}
DbgUtils.logf( "read thread exiting" );
@ -302,48 +293,78 @@ public class RelayService extends XWService {
}, getClass().getName() );
m_UDPReadThread.start();
} else {
DbgUtils.logf( "m_UDPReadThread not null and assumed to be running" );
DbgUtils.logf( "m_UDPReadThread not null and assumed to "
+ "be running" );
}
if ( null == m_UDPWriteThread ) {
m_queue = new LinkedBlockingQueue<DatagramPacket>();
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
DbgUtils.logf( "write thread running" );
for ( ; ; ) {
DatagramPacket outPacket;
try {
outPacket = m_queue.take();
} catch ( InterruptedException ie ) {
DbgUtils.logf( "RelayService; write thread killed" );
break;
}
if ( null == outPacket || 0 == outPacket.getLength() ) {
DbgUtils.logf( "stopping write thread" );
break;
}
DbgUtils.logf( "Sending udp packet of length %d",
outPacket.getLength() );
try {
m_UDPSocket.send( outPacket );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
DbgUtils.logf( "write thread exiting" );
}
}, getClass().getName() );
m_UDPWriteThread.start();
} else {
DbgUtils.logf( "m_UDPWriteThread not null and assumed to be running" );
}
}
// Some of this must not be done on main (UI) thread
private void connectSocket()
{
if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( this );
String host = XWPrefs.getDefaultRelayHost( this );
try {
m_UDPSocket = new DatagramSocket();
// put on background thread!!
InetAddress addr = InetAddress.getByName( host );
m_UDPSocket.connect( addr, port ); // remember this address
} catch( java.net.SocketException se ) {
DbgUtils.loge( se );
Assert.fail();
} catch( java.net.UnknownHostException uhe ) {
DbgUtils.loge( uhe );
}
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
DbgUtils.logf( "m_UDPSocket not null" );
}
}
private void startWriteThread()
{
if ( null == m_UDPWriteThread ) {
m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() {
DbgUtils.logf( "write thread running" );
for ( ; ; ) {
DatagramPacket outPacket;
try {
outPacket = m_queue.take();
} catch ( InterruptedException ie ) {
DbgUtils.logf( "RelayService; write thread "
+ "killed" );
break;
}
if ( null == outPacket
|| 0 == outPacket.getLength() ) {
DbgUtils.logf( "stopping write thread" );
break;
}
DbgUtils.logf( "Sending udp packet of length %d",
outPacket.getLength() );
try {
m_UDPSocket.send( outPacket );
} catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe );
}
}
DbgUtils.logf( "write thread exiting" );
}
}, getClass().getName() );
m_UDPWriteThread.start();
} else {
DbgUtils.logf( "m_UDPWriteThread not null and assumed to "
+ "be running" );
}
}
private void stopUDPThreadsIf()
{
DbgUtils.logf( "stopUDPThreadsIf" );
if ( null != m_queue && null != m_UDPWriteThread ) {
if ( null != m_UDPWriteThread ) {
// can't add null
m_queue.add( new DatagramPacket( new byte[0], 0 ) );
try {
@ -354,7 +375,7 @@ public class RelayService extends XWService {
DbgUtils.loge( ie );
}
m_UDPWriteThread = null;
m_queue = null;
m_queue.clear();
}
if ( null != m_UDPSocket && null != m_UDPReadThread ) {
m_UDPSocket.close();
@ -615,16 +636,17 @@ public class RelayService extends XWService {
byte[][] forOne = msgs[ii];
// if game has messages, open it and feed 'em to it.
if ( null == forOne ) {
// Nothing for this relayID
} else if ( BoardActivity.feedMessages( rowIDs[ii], forOne )
|| GameUtils.feedMessages( this, rowIDs[ii],
forOne, null,
sink ) ) {
idsWMsgs.add( relayIDs[ii] );
} else {
DbgUtils.logf( "message for %s (rowid %d) not consumed",
relayIDs[ii], rowIDs[ii] );
if ( null != forOne ) {
sink.setRowID( rowIDs[ii] );
if ( BoardActivity.feedMessages( rowIDs[ii], forOne )
|| GameUtils.feedMessages( this, rowIDs[ii],
forOne, null,
sink ) ) {
idsWMsgs.add( relayIDs[ii] );
} else {
DbgUtils.logf( "message for %s (rowid %d) not consumed",
relayIDs[ii], rowIDs[ii] );
}
}
}
if ( 0 < idsWMsgs.size() ) {