move thread inside so it can be started and stopped. And exit the

thread on failure to connect, allowing comms' timer to control when we
retry.  This fixes problem where would try multiple times/second to
connect when relay was down.  Now we try every 15 seconds (per comms,
which can as well back-off), and shortly after the relay comes back up
connect successfully.
This commit is contained in:
Andy2 2010-11-24 18:14:34 -08:00
parent 8623c569e7
commit b4fb57de9c

View file

@ -42,7 +42,7 @@ import org.eehouse.android.xw4.jni.*;
import org.eehouse.android.xw4.jni.JNIThread.*;
import org.eehouse.android.xw4.jni.CurGameInfo.DeviceRole;
public class CommsTransport extends Thread implements TransportProcs {
public class CommsTransport implements TransportProcs {
public static final int DIALOG = 0;
public static final int DIALOG_RETRY = 1;
@ -69,9 +69,9 @@ public class CommsTransport extends Thread implements TransportProcs {
private Selector m_selector;
private SocketChannel m_socketChannel;
private int m_jniGamePtr;
private boolean m_running = false;
private CommsAddrRec m_addr;
private JNIThread m_jniThread;
private CommsThread m_thread;
private Handler m_handler;
private boolean m_done = false;
@ -94,6 +94,112 @@ public class CommsTransport extends Thread implements TransportProcs {
m_buffersOut = new Vector<ByteBuffer>();
m_bytesIn = ByteBuffer.allocate( 2048 );
}
public class CommsThread extends Thread {
@Override
public void run()
{
try {
if ( Build.PRODUCT.contains("sdk") ) {
System.setProperty("java.net.preferIPv6Addresses", "false");
}
m_selector = Selector.open();
loop();
closeSocket();
} catch ( java.io.IOException ioe ) {
Utils.logf( ioe.toString() );
} catch ( UnresolvedAddressException uae ) {
Utils.logf( "bad address: name: %s; port: %s; exception: %s",
m_addr.ip_relay_hostName, m_addr.ip_relay_port,
uae.toString() );
}
m_thread = null;
}
private void loop()
{
while ( !m_done ) {
try {
synchronized( this ) {
// if we have data and no socket, try to connect.
if ( null == m_socketChannel
&& 0 < m_buffersOut.size() ) {
try {
m_socketChannel = SocketChannel.open();
m_socketChannel.configureBlocking( false );
Utils.logf( "connecting to %s:%d",
m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
InetSocketAddress isa
= new InetSocketAddress( m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
m_socketChannel.connect( isa );
} catch ( java.io.IOException ioe ) {
Utils.logf( ioe.toString() );
}
}
if ( null != m_socketChannel ) {
int ops = figureOps();
// Utils.logf( "calling with ops=%x", ops );
m_socketChannel.register( m_selector, ops );
}
}
m_selector.select();
} catch ( ClosedChannelException cce ) {
// we get this when relay goes down. Need to notify!
m_jniThread.handle( JNICmd.CMD_TRANSFAIL );
closeSocket();
Utils.logf( "exiting: " + cce.toString() );
break; // don't try again
} catch ( java.io.IOException ioe ) {
closeSocket();
Utils.logf( "exiting: " + ioe.toString() );
Utils.logf( ioe.toString() );
}
Iterator<SelectionKey> iter = m_selector.selectedKeys().iterator();
while ( iter.hasNext() ) {
SelectionKey key = (SelectionKey)iter.next();
SocketChannel channel = (SocketChannel)key.channel();
iter.remove();
try {
if (key.isValid() && key.isConnectable()) {
if ( !channel.finishConnect() ) {
key.cancel();
}
}
if (key.isValid() && key.isReadable()) {
m_bytesIn.clear(); // will wipe any pending data!
// Utils.logf( "socket is readable; buffer has space for "
// + m_bytesIn.remaining() );
int nRead = channel.read( m_bytesIn );
if ( nRead == -1 ) {
channel.close();
} else {
addIncoming();
}
}
if (key.isValid() && key.isWritable()) {
getOut();
if ( null != m_bytesOut ) {
int nWritten = channel.write( m_bytesOut );
//Utils.logf( "wrote " + nWritten + " bytes" );
}
}
} catch ( java.io.IOException ioe ) {
key.cancel();
}
}
}
} // loop
}
public void setReceiver( JNIThread jnit )
{
@ -106,115 +212,16 @@ public class CommsTransport extends Thread implements TransportProcs {
if ( null != m_selector ) {
m_selector.wakeup();
}
if ( m_running ) { // synchronized this? Or use Thread method
if ( null != m_thread ) { // synchronized this? Or use Thread method
try {
join(100); // wait up to 1/10 second
m_thread.join(100); // wait up to 1/10 second
} catch ( java.lang.InterruptedException ie ) {
Utils.logf( "got InterruptedException: " + ie.toString() );
}
m_thread = null;
}
}
@Override
public void run()
{
try {
if ( Build.PRODUCT.contains("sdk") ) {
System.setProperty("java.net.preferIPv6Addresses", "false");
}
m_selector = Selector.open();
loop();
closeSocket();
} catch ( java.io.IOException ioe ) {
Utils.logf( ioe.toString() );
} catch ( UnresolvedAddressException uae ) {
Utils.logf( "bad address: name: %s; port: %s; exception: %s",
m_addr.ip_relay_hostName, m_addr.ip_relay_port,
uae.toString() );
}
}
private void loop()
{
while ( !m_done ) {
try {
synchronized( this ) {
// if we have data and no socket, try to connect.
if ( null == m_socketChannel
&& 0 < m_buffersOut.size() ) {
try {
m_socketChannel = SocketChannel.open();
m_socketChannel.configureBlocking( false );
Utils.logf( "connecting to %s:%d",
m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
InetSocketAddress isa
= new InetSocketAddress( m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
m_socketChannel.connect( isa );
} catch ( java.io.IOException ioe ) {
Utils.logf( ioe.toString() );
}
}
if ( null != m_socketChannel ) {
int ops = figureOps();
// Utils.logf( "calling with ops=%x", ops );
m_socketChannel.register( m_selector, ops );
}
}
m_selector.select();
} catch ( ClosedChannelException cce ) {
// we get this when relay goes down. Need to notify!
m_jniThread.handle( JNICmd.CMD_TRANSFAIL );
closeSocket();
Utils.logf( "exiting: " + cce.toString() );
} catch ( java.io.IOException ioe ) {
closeSocket();
Utils.logf( "exiting: " + ioe.toString() );
Utils.logf( ioe.toString() );
}
Iterator<SelectionKey> iter = m_selector.selectedKeys().iterator();
while ( iter.hasNext() ) {
SelectionKey key = (SelectionKey)iter.next();
SocketChannel channel = (SocketChannel)key.channel();
iter.remove();
try {
if (key.isValid() && key.isConnectable()) {
if ( !channel.finishConnect() ) {
key.cancel();
}
}
if (key.isValid() && key.isReadable()) {
m_bytesIn.clear(); // will wipe any pending data!
// Utils.logf( "socket is readable; buffer has space for "
// + m_bytesIn.remaining() );
int nRead = channel.read( m_bytesIn );
if ( nRead == -1 ) {
channel.close();
} else {
addIncoming();
}
}
if (key.isValid() && key.isWritable()) {
getOut();
if ( null != m_bytesOut ) {
int nWritten = channel.write( m_bytesOut );
//Utils.logf( "wrote " + nWritten + " bytes" );
}
}
} catch ( java.io.IOException ioe ) {
key.cancel();
}
}
}
} // loop
private synchronized void putOut( final byte[] buf )
{
int len = buf.length;
@ -306,6 +313,7 @@ public class CommsTransport extends Thread implements TransportProcs {
// TransportProcs interface
public int transportSend( byte[] buf, final CommsAddrRec faddr )
{
Utils.logf( "CommsTransport::transportSend(nbytes=%d)", buf.length );
int nSent = -1;
if ( null == m_addr ) {
@ -320,9 +328,9 @@ public class CommsTransport extends Thread implements TransportProcs {
switch ( m_addr.conType ) {
case COMMS_CONN_RELAY:
putOut( buf ); // add to queue
if ( !m_running ) {
m_running = true;
start();
if ( null == m_thread ) {
m_thread = new CommsThread();
m_thread.start();
}
nSent = buf.length;
break;