Revert "switch from nio to regular old io, with separate reader and writer"

This reverts commit b23de9a958.
This commit is contained in:
eehouse@eehouse.org 2010-12-07 18:12:35 -08:00 committed by Andy2
parent b389478350
commit 57bfe4c943

View file

@ -20,8 +20,13 @@
package org.eehouse.android.xw4;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.UnresolvedAddressException;
import java.nio.ByteBuffer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Vector;
import java.util.Iterator;
import junit.framework.Assert;
@ -32,11 +37,6 @@ import android.content.Context;
import android.os.Build;
import android.os.Handler;
import android.os.Message;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.eehouse.android.xw4.jni.*;
import org.eehouse.android.xw4.jni.JNIThread.*;
@ -66,82 +66,141 @@ public class CommsTransport implements TransportProcs {
public int m_nMissing;
}
private Selector m_selector;
private SocketChannel m_socketChannel;
private int m_jniGamePtr;
private CommsAddrRec m_addr;
private JNIThread m_jniThread;
private CommsThread m_thread;
private Handler m_handler;
private boolean m_done = false;
private Socket m_socket;
private ReaderThread m_reader;
private WriterThread m_writer;
BlockingQueue<byte[]> m_queue;
private Vector<ByteBuffer> m_buffersOut;
private ByteBuffer m_bytesOut;
private ByteBuffer m_bytesIn;
private Context m_context;
// assembling inbound packet
private byte[] m_packetIn;
private int m_haveLen = -1;
public CommsTransport( int jniGamePtr, Context context, Handler handler,
DeviceRole role )
{
m_jniGamePtr = jniGamePtr;
m_context = context;
m_handler = handler;
m_buffersOut = new Vector<ByteBuffer>();
m_bytesIn = ByteBuffer.allocate( 2048 );
}
public class WriterThread extends Thread {
public class CommsThread extends Thread {
@Override
public void run()
{
DataOutputStream os;
try {
os = new DataOutputStream( m_socket.getOutputStream() );
try {
if ( Build.PRODUCT.contains("sdk") ) {
System.setProperty("java.net.preferIPv6Addresses", "false");
}
m_selector = Selector.open();
loop();
closeSocket();
} catch ( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
return;
Utils.logf( ioe.toString() );
} catch ( UnresolvedAddressException uae ) {
Utils.logf( "bad address: name: %s; port: %s; exception: %s",
m_addr.ip_relay_hostName, m_addr.ip_relay_port,
uae.toString() );
}
for ( ; ; ) {
m_thread = null;
}
private void loop()
{
while ( !m_done ) {
try {
byte[] buf = m_queue.take(); // blocks
synchronized( this ) {
os.writeShort( buf.length );
os.write( buf );
Utils.logf( "wrote %d bytes to socket", buf.length );
} catch ( InterruptedException inte ) {
Utils.logf( "%s", inte.toString() );
break;
} catch( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
break;
// if we have data and no socket, try to connect.
if ( null == m_socketChannel
&& 0 < m_buffersOut.size() ) {
try {
m_socketChannel = SocketChannel.open();
m_socketChannel.configureBlocking( false );
Utils.logf( "connecting to %s:%d",
m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
InetSocketAddress isa
= new InetSocketAddress( m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
m_socketChannel.connect( isa );
} catch ( java.io.IOException ioe ) {
Utils.logf( ioe.toString() );
}
}
if ( null != m_socketChannel ) {
int ops = figureOps();
// Utils.logf( "calling with ops=%x", ops );
m_socketChannel.register( m_selector, ops );
}
}
m_selector.select();
} catch ( ClosedChannelException cce ) {
// we get this when relay goes down. Need to notify!
m_jniThread.handle( JNICmd.CMD_TRANSFAIL );
closeSocket();
Utils.logf( "exiting: " + cce.toString() );
break; // don't try again
} catch ( java.io.IOException ioe ) {
closeSocket();
Utils.logf( "exiting: " + ioe.toString() );
Utils.logf( ioe.toString() );
}
Iterator<SelectionKey> iter = m_selector.selectedKeys().iterator();
while ( iter.hasNext() ) {
SelectionKey key = (SelectionKey)iter.next();
SocketChannel channel = (SocketChannel)key.channel();
iter.remove();
try {
if (key.isValid() && key.isConnectable()) {
if ( !channel.finishConnect() ) {
key.cancel();
}
}
if (key.isValid() && key.isReadable()) {
m_bytesIn.clear(); // will wipe any pending data!
// Utils.logf( "socket is readable; buffer has space for "
// + m_bytesIn.remaining() );
int nRead = channel.read( m_bytesIn );
if ( nRead == -1 ) {
channel.close();
} else {
addIncoming();
}
}
if (key.isValid() && key.isWritable()) {
getOut();
if ( null != m_bytesOut ) {
int nWritten = channel.write( m_bytesOut );
//Utils.logf( "wrote " + nWritten + " bytes" );
}
}
} catch ( java.io.IOException ioe ) {
key.cancel();
}
}
}
}
} // loop
}
public class ReaderThread extends Thread {
public void run()
{
DataInputStream dis;
try {
dis = new DataInputStream( m_socket.getInputStream() );
} catch ( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
return;
}
for ( ; ; ) {
try {
short len = dis.readShort();
Utils.logf( "ReaderThread: read length short: %d", len );
byte[] buf = new byte[len];
dis.readFully( buf );
Utils.logf( "returned from readFully()" );
m_jniThread.handle( JNICmd.CMD_RECEIVE, buf );
} catch( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
break;
}
}
}
}
public void setReceiver( JNIThread jnit )
{
m_jniThread = jnit;
@ -149,35 +208,104 @@ public class CommsTransport implements TransportProcs {
public void waitToStop()
{
if ( null != m_socket ) {
m_done = true;
if ( null != m_selector ) {
m_selector.wakeup();
}
if ( null != m_thread ) { // synchronized this? Or use Thread method
try {
m_socket.close();
m_socket = null;
} catch ( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
m_thread.join(100); // wait up to 1/10 second
} catch ( java.lang.InterruptedException ie ) {
Utils.logf( "got InterruptedException: " + ie.toString() );
}
m_thread = null;
}
}
private void startThreadsIf()
private synchronized void putOut( final byte[] buf )
{
if ( null == m_socket ) {
int len = buf.length;
ByteBuffer netbuf = ByteBuffer.allocate( len + 2 );
netbuf.putShort( (short)len );
netbuf.put( buf );
m_buffersOut.add( netbuf );
Assert.assertEquals( netbuf.remaining(), 0 );
if ( null != m_selector ) {
m_selector.wakeup(); // tell it it's got some writing to do
}
}
private synchronized void closeSocket()
{
if ( null != m_socketChannel ) {
try {
m_socket = new Socket( m_addr.ip_relay_hostName,
m_addr.ip_relay_port );
if ( null != m_socket ) {
m_queue = new ArrayBlockingQueue<byte[]>(16);
m_reader = new ReaderThread();
m_reader.start();
m_writer = new WriterThread();
m_writer.start();
m_socketChannel.close();
} catch ( Exception e ) {
Utils.logf( "closing socket: %s", e.toString() );
}
m_socketChannel = null;
}
}
private synchronized void getOut()
{
if ( null != m_bytesOut && m_bytesOut.remaining() == 0 ) {
m_bytesOut = null;
}
if ( null == m_bytesOut && m_buffersOut.size() > 0 ) {
m_bytesOut = m_buffersOut.remove(0);
m_bytesOut.flip();
}
}
private synchronized int figureOps() {
int ops;
if ( null == m_socketChannel ) {
ops = 0;
} else if ( m_socketChannel.isConnected() ) {
ops = SelectionKey.OP_READ;
if ( (null != m_bytesOut && m_bytesOut.hasRemaining())
|| m_buffersOut.size() > 0 ) {
ops |= SelectionKey.OP_WRITE;
}
} else {
ops = SelectionKey.OP_CONNECT;
}
return ops;
}
private void addIncoming( )
{
m_bytesIn.flip();
for ( ; ; ) {
int len = m_bytesIn.remaining();
if ( len <= 0 ) {
break;
}
if ( null == m_packetIn ) { // we're not mid-packet
Assert.assertTrue( len > 1 ); // tell me if I see this case
if ( len == 1 ) { // half a length byte...
break; // can I leave it in the buffer?
} else {
m_packetIn = new byte[m_bytesIn.getShort()];
m_haveLen = 0;
}
} else { // we're mid-packet
int wantLen = m_packetIn.length - m_haveLen;
if ( wantLen > len ) {
wantLen = len;
}
m_bytesIn.get( m_packetIn, m_haveLen, wantLen );
m_haveLen += wantLen;
if ( m_haveLen == m_packetIn.length ) {
// send completed packet
m_jniThread.handle( JNICmd.CMD_RECEIVE, m_packetIn );
m_packetIn = null;
}
} catch ( java.net.UnknownHostException uhe ) {
Utils.logf( "%s", uhe.toString() );
m_socket = null;
} catch ( java.io.IOException ioe ) {
Utils.logf( "%s", ioe.toString() );
m_socket = null; // need to notify user on some of these
}
}
}
@ -199,14 +327,12 @@ public class CommsTransport implements TransportProcs {
switch ( m_addr.conType ) {
case COMMS_CONN_RELAY:
startThreadsIf();
try {
// add(), not put(): don't block thread in comms if full
m_queue.add( buf );
nSent = buf.length;
} catch ( IllegalStateException ise ) {
Utils.logf( "%s", ise.toString() );
putOut( buf ); // add to queue
if ( null == m_thread ) {
m_thread = new CommsThread();
m_thread.start();
}
nSent = buf.length;
break;
case COMMS_CONN_SMS:
Assert.fail();