always try starting UDP threads in case Service has been stopped;

implement transportSend in RelayMsgSink since relay-connection packets
need it.
This commit is contained in:
Eric House 2013-01-25 06:22:37 -08:00
parent c8b3050c8d
commit f91af77438

View file

@ -38,9 +38,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import junit.framework.Assert; import junit.framework.Assert;
import org.eehouse.android.xw4.MultiService.MultiEvent;
import org.eehouse.android.xw4.jni.CommsAddrRec;
import org.eehouse.android.xw4.jni.GameSummary; import org.eehouse.android.xw4.jni.GameSummary;
import org.eehouse.android.xw4.jni.UtilCtxt; import org.eehouse.android.xw4.jni.UtilCtxt;
import org.eehouse.android.xw4.MultiService.MultiEvent;
public class RelayService extends XWService { public class RelayService extends XWService {
private static final int MAX_SEND = 1024; private static final int MAX_SEND = 1024;
@ -149,7 +150,7 @@ public class RelayService extends XWService {
DbgUtils.logf( "RelayService::onStartCommand::UDP_CHANGED" ); DbgUtils.logf( "RelayService::onStartCommand::UDP_CHANGED" );
if ( XWPrefs.getUDPEnabled( this ) ) { if ( XWPrefs.getUDPEnabled( this ) ) {
stopFetchThreadIf(); stopFetchThreadIf();
startUDPThreads(); startUDPThreadsIfNot();
registerWithRelay(); registerWithRelay();
} else { } else {
stopUDPThreadsIf(); stopUDPThreadsIf();
@ -158,6 +159,7 @@ public class RelayService extends XWService {
break; break;
case SEND: case SEND:
case RECEIVE: case RECEIVE:
startUDPThreadsIfNot();
long rowid = intent.getLongExtra( ROWID, -1 ); long rowid = intent.getLongExtra( ROWID, -1 );
byte[] msg = intent.getByteArrayExtra( BINBUFFER ); byte[] msg = intent.getByteArrayExtra( BINBUFFER );
if ( SEND == cmd ) { if ( SEND == cmd ) {
@ -220,87 +222,86 @@ public class RelayService extends XWService {
} }
} }
private void startUDPThreads() private void startUDPThreadsIfNot()
{ {
DbgUtils.logf( "startUDPThreads" ); if ( XWPrefs.getUDPEnabled( this ) ) {
Assert.assertTrue( XWPrefs.getUDPEnabled( this ) ); if ( null == m_UDPSocket ) {
int port = XWPrefs.getDefaultRelayPort( RelayService.this );
if ( null == m_UDPSocket ) { String host = XWPrefs.getDefaultRelayHost( RelayService.this );
int port = XWPrefs.getDefaultRelayPort( RelayService.this ); try {
String host = XWPrefs.getDefaultRelayHost( RelayService.this ); m_UDPSocket = new DatagramSocket();
try { InetAddress addr = InetAddress.getByName( host );
m_UDPSocket = new DatagramSocket(); m_UDPSocket.connect( addr, port ); // remember this address
InetAddress addr = InetAddress.getByName( host ); } catch( java.net.SocketException se ) {
m_UDPSocket.connect( addr, port ); // meaning: remember this address DbgUtils.loge( se );
} catch( java.net.SocketException se ) { Assert.fail();
DbgUtils.loge( se ); } catch( java.net.UnknownHostException uhe ) {
Assert.fail(); DbgUtils.loge( uhe );
} catch( java.net.UnknownHostException uhe ) { }
DbgUtils.loge( uhe ); } else {
Assert.assertTrue( m_UDPSocket.isConnected() );
DbgUtils.logf( "m_UDPSocket not null" );
} }
} else {
Assert.assertTrue( m_UDPSocket.isConnected() );
DbgUtils.logf( "m_UDPSocket not null" );
}
if ( null == m_UDPReadThread ) { if ( null == m_UDPReadThread ) {
m_UDPReadThread = new Thread( null, new Runnable() { m_UDPReadThread = new Thread( null, new Runnable() {
public void run() { public void run() {
DbgUtils.logf( "read thread running" ); DbgUtils.logf( "read thread running" );
byte[] buf = new byte[1024]; byte[] buf = new byte[1024];
for ( ; ; ) { for ( ; ; ) {
DatagramPacket packet = DatagramPacket packet =
new DatagramPacket( buf, buf.length ); new DatagramPacket( buf, buf.length );
try { try {
DbgUtils.logf( "UPD read thread blocking on receive" ); DbgUtils.logf( "UPD read thread blocking on receive" );
m_UDPSocket.receive( packet ); m_UDPSocket.receive( packet );
DbgUtils.logf( "UPD read thread: receive returned" ); DbgUtils.logf( "UPD read thread: receive returned" );
} catch( java.io.IOException ioe ) { } catch( java.io.IOException ioe ) {
DbgUtils.loge( ioe ); DbgUtils.loge( ioe );
break; // ??? break; // ???
}
DbgUtils.logf( "received %d bytes", packet.getLength() );
gotPacket( packet );
} }
DbgUtils.logf( "received %d bytes", packet.getLength() ); DbgUtils.logf( "read thread exiting" );
gotPacket( packet );
} }
DbgUtils.logf( "read thread exiting" ); }, getClass().getName() );
} m_UDPReadThread.start();
}, getClass().getName() ); } else {
m_UDPReadThread.start(); DbgUtils.logf( "m_UDPReadThread not null and assumed to be running" );
} else { }
DbgUtils.logf( "m_UDPReadThread not null and assumed to be running" );
}
if ( null == m_UDPWriteThread ) { if ( null == m_UDPWriteThread ) {
m_queue = new LinkedBlockingQueue<DatagramPacket>(); m_queue = new LinkedBlockingQueue<DatagramPacket>();
m_UDPWriteThread = new Thread( null, new Runnable() { m_UDPWriteThread = new Thread( null, new Runnable() {
public void run() { public void run() {
DbgUtils.logf( "write thread running" ); DbgUtils.logf( "write thread running" );
for ( ; ; ) { for ( ; ; ) {
DatagramPacket outPacket; DatagramPacket outPacket;
try { try {
outPacket = m_queue.take(); outPacket = m_queue.take();
} catch ( InterruptedException ie ) { } catch ( InterruptedException ie ) {
DbgUtils.logf( "RelayService; write thread killed" ); DbgUtils.logf( "RelayService; write thread killed" );
break; break;
} }
if ( null == outPacket || 0 == outPacket.getLength() ) { if ( null == outPacket || 0 == outPacket.getLength() ) {
DbgUtils.logf( "stopping write thread" ); DbgUtils.logf( "stopping write thread" );
break; break;
} }
DbgUtils.logf( "Sending udp packet of length %d", DbgUtils.logf( "Sending udp packet of length %d",
outPacket.getLength() ); outPacket.getLength() );
try { try {
m_UDPSocket.send( outPacket ); m_UDPSocket.send( outPacket );
} catch ( java.io.IOException ioe ) { } catch ( java.io.IOException ioe ) {
DbgUtils.loge( ioe ); DbgUtils.loge( ioe );
}
} }
DbgUtils.logf( "write thread exiting" );
} }
DbgUtils.logf( "write thread exiting" ); }, getClass().getName() );
} m_UDPWriteThread.start();
}, getClass().getName() ); } else {
m_UDPWriteThread.start(); DbgUtils.logf( "m_UDPWriteThread not null and assumed to be running" );
} else { }
DbgUtils.logf( "m_UDPWriteThread not null and assumed to be running" );
} }
} }
@ -682,6 +683,14 @@ public class RelayService extends XWService {
/***** TransportProcs interface *****/ /***** TransportProcs interface *****/
public int transportSend( byte[] buf, final CommsAddrRec addr,
int gameID )
{
Assert.assertTrue( -1 != m_rowid );
sendPacket( RelayService.this, m_rowid, buf );
return buf.length;
}
public boolean relayNoConnProc( byte[] buf, String relayID ) public boolean relayNoConnProc( byte[] buf, String relayID )
{ {
if ( -1 != m_rowid ) { if ( -1 != m_rowid ) {