Add and use driver for communication over TCP. Driver uses two

threads, reader and writer, on a single socket.  With this checkin a
connect request reaches the relay and a response comes back and is
passed to and recognized by the common code.  A full game should now
work, but hasn't been tried.  Nor is there any handling of socket
errors, retries, etc.
This commit is contained in:
ehouse 2005-07-30 02:02:49 +00:00
parent 7d79e79105
commit 3b79617c61
4 changed files with 560 additions and 13 deletions

View file

@ -73,8 +73,14 @@ typedef struct FileWriteState {
} FileWriteState;
/* forward util function decls */
static XP_S16 ce_send_proc( XP_U8* buf, XP_U16 len, CommsAddrRec* addr,
#ifndef XWFEATURE_STANDALONE_ONLY
static XP_S16 ce_send_proc( XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr,
void* closure );
#define CE_SEND_PROC ce_send_proc
#else
#define CE_SEND_PROC NULL
#endif
static VTableMgr* ce_util_getVTManager( XW_UtilCtxt* uc );
static void ce_util_userError( XW_UtilCtxt* uc, UtilErrID id );
@ -157,14 +163,17 @@ WinMain( HINSTANCE hInstance,
hAccelTable = LoadAccelerators(hInstance, (LPCTSTR)IDC_XWORDS4);
// Main message loop:
while (GetMessage(&msg, NULL, 0, 0)) {
// Main message loop. Return of 0 indicates quit message. Return of -1
// indicates major error (so we just bail.)
while ( 0 < GetMessage(&msg, NULL, 0, 0) ) {
if (!TranslateAccelerator(msg.hwnd, hAccelTable, &msg)) {
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}
/* This would be a good place to free up memory, close sockets, etc. */
return msg.wParam;
}
@ -534,7 +543,7 @@ ceInitAndStartBoard( CEAppGlobals* globals, XP_Bool newGame, CeGamePrefs* gp,
if ( newGame ) {
XP_U16 newGameID = 0;
game_reset( MEMPOOL &globals->game, &globals->gameInfo, &globals->util,
newGameID, &globals->appPrefs.cp, ce_send_proc,
newGameID, &globals->appPrefs.cp, CE_SEND_PROC,
globals );
if ( !!gp ) {
@ -765,7 +774,7 @@ ceLoadSavedGame( CEAppGlobals* globals )
game_makeFromStream( MEMPOOL stream, &globals->game,
&globals->gameInfo,
dict, &globals->util, globals->draw,
&globals->appPrefs.cp, ce_send_proc, globals );
&globals->appPrefs.cp, CE_SEND_PROC, globals );
}
stream_destroy( stream );
@ -920,7 +929,7 @@ InitInstance(HINSTANCE hInstance, int nCmdShow)
game_makeNewGame( MPPARM(mpool) &globals->game, &globals->gameInfo,
&globals->util, globals->draw, gameID,
&globals->appPrefs.cp,
ce_send_proc, globals );
CE_SEND_PROC, globals );
newDone = doNewGame( globals, XP_TRUE ); /* calls ceInitAndStartBoard */
if ( !newDone ) {
@ -1463,6 +1472,25 @@ ceFireTimer( CEAppGlobals* globals, XWTimerReason why )
(*proc)( closure, why );
}
#ifndef XWFEATURE_STANDALONE_ONLY
static XP_Bool
processPacket( CEAppGlobals* globals, XWStreamCtxt* instream )
{
XP_Bool draw = XP_FALSE;
XP_ASSERT( globals->game.comms != NULL );
if ( comms_checkIncomingStream( globals->game.comms,
instream, NULL ) ) {
draw = server_receiveMessage( globals->game.server, instream );
}
stream_destroy( instream );
ce_util_requestTime( &globals->util );
return draw;
} /* processPacket */
#endif
LRESULT CALLBACK
WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
@ -1703,6 +1731,12 @@ WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
draw = server_do( globals->game.server );
break;
#ifndef XWFEATURE_STANDALONE_ONLY
case XWWM_PACKET_ARRIVED:
draw = processPacket( globals, (XWStreamCtxt*)lParam );
break;
#endif
default:
return DefWindowProc(hWnd, message, wParam, lParam);
}
@ -1866,8 +1900,13 @@ static void
makeTimeStamp( XP_UCHAR* timeStamp, XP_U16 size )
{
SYSTEMTIME st;
DWORD tid;
tid = GetCurrentThreadId();
GetLocalTime( &st );
sprintf( timeStamp, "%d:%.2d:%.2d ", st.wHour, st.wMinute, st.wSecond );
sprintf( timeStamp, "<%lx>%d:%.2d:%.2d ", tid, st.wHour, st.wMinute,
st.wSecond );
XP_ASSERT( size > strlen(timeStamp) );
} /* makeTimeStamp */
@ -1944,12 +1983,38 @@ wince_snprintf( XP_UCHAR* buf, XP_U16 len, XP_UCHAR* format, ... )
return strlen(buf);
} /* wince_snprintf */
static XP_S16
ce_send_proc( XP_U8* buf, XP_U16 len, CommsAddrRec* addr, void* closure )
#ifndef XWFEATURE_STANDALONE_ONLY
static void
got_data_proc( XP_U8* data, XP_U16 len, void* closure )
{
/* Remember that this gets called by the reader thread, not by the one
running the window loop. */
CEAppGlobals* globals = (CEAppGlobals*)closure;
BOOL posted;
XWStreamCtxt* stream;
stream = make_generic_stream( globals );
stream_putBytes( stream, data, len );
posted = PostMessage( globals->hWnd, XWWM_PACKET_ARRIVED,
0, (DWORD)stream );
XP_ASSERT( posted );
} /* got_data_proc */
static XP_S16
ce_send_proc( XP_U8* buf, XP_U16 len, const CommsAddrRec* addr, void* closure )
{
CEAppGlobals* globals = (CEAppGlobals*)closure;
XP_LOGF( "ce_send_proc called" );
return 0;
if ( !globals->socketWrap ) {
globals->socketWrap = ce_sockwrap_new( MPPARM(globals->mpool)
got_data_proc, globals );
}
return ce_sockwrap_send( globals->socketWrap, buf, len, addr );
} /* ce_send_proc */
#endif
/* I can't believe the stupid compiler's making me implement this */
void p_ignore(XP_UCHAR* c, ...){}
@ -2228,6 +2293,13 @@ ce_util_makeEmptyDict( XW_UtilCtxt* uc )
static XWStreamCtxt*
ce_util_makeStreamFromAddr( XW_UtilCtxt* uc, XP_U16 channelNo )
{
XWStreamCtxt* stream;
CEAppGlobals* globals = (CEAppGlobals*)uc->closure;
stream = make_generic_stream( globals );
stream_setAddress( stream, channelNo );
return stream;
} /* ce_util_makeStreamFromAddr */
#endif

View file

@ -25,7 +25,7 @@
#include "game.h"
#include "util.h"
#include "mempool.h"
#include "cesockwr.h"
enum { BONUS1_COLOR,
BONUS2_COLOR,
@ -84,6 +84,8 @@ typedef struct CEAppGlobals {
HWND scrollHandle;
#endif
CeSocketWrapper* socketWrap;
CEAppPrefs appPrefs;
XP_Bool isNewGame;
@ -101,9 +103,9 @@ typedef struct CEAppGlobals {
#define GAME_IN_PROGRESS(g) ((g)->gameInfo.dictName != 0)
enum {
XWWM_TIME_RQST = WM_USER,
XWWM_TIME_RQST = WM_APP
,XWWM_PACKET_ARRIVED
XW_TIME_RQST
};
#define NUM_EDITABLE_COLORS BLACK_COLOR

437
wince/cesockwr.c Executable file
View file

@ -0,0 +1,437 @@
/* -*-mode: C; fill-column: 77; c-basic-offset: 4; -*- */
/*
* Copyright 2005 by Eric House (fixin@peak.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef XWFEATURE_STANDALONE_ONLY
#include "cesockwr.h"
#include "cemain.h"
#include <winsock.h>
/* This object owns all network activity: sending and receiving packets. It
maintains two threads, one to send and the other to listen. Incoming
packets are passed out via a proc passed into the "constructor". Outgoing
packets are passed in directly. Uses TCP, and the relay framing protocol
wherein each packet is proceeded by its length in two bytes, network byte
order.
*/
enum { WRITER_THREAD,
READER_THREAD,
N_THREADS };
typedef enum {
CE_IP_NONE, /* shouldn't be used */
CE_IP_UNCONNECTED,
CE_IP_CONNECTED
} CE_CONNSTATE;
#define MAX_QUEUE_SIZE 3
typedef struct CeSocketWrapper {
DataRecvProc dataProc;
void* dataClosure;
XP_U8* packets[MAX_QUEUE_SIZE];
XP_U16 lens[MAX_QUEUE_SIZE];
XP_U16 nPackets;
CommsAddrRec addrRec;
SOCKET socket;
CE_CONNSTATE connState;
HANDLE queueAddEvent;
HANDLE socketConnEvent;
HANDLE queueMutex;
HANDLE threads[N_THREADS];
#ifdef DEBUG
XP_U16 nSent;
#endif
MPSLOT
} CeSocketWrapper;
/* queue_packet: Place packet on queue using semaphore. Return false
* if no room or fail for some other reason.
*/
static XP_Bool
queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
{
DWORD wres;
XP_Bool success = XP_FALSE;
// 2/5 second time-out interval. This is called from the UI thread, so
// long pauses are unacceptable. comms will have to try again if for
// some reason the queue is locked for that long.
wres = WaitForSingleObject( self->queueMutex, 200L );
if ( wres == WAIT_OBJECT_0 ) {
if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) {
/* add it to the queue */
self->packets[self->nPackets] = packet;
self->lens[self->nPackets] = len;
++self->nPackets;
XP_LOGF( "there are now %d packets on send queue", self->nPackets );
/* signal the writer thread */
SetEvent( self->queueAddEvent );
success = XP_TRUE;
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
} else {
XP_ASSERT(0);
}
return success;
}
static XP_Bool
get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
{
DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE );
XP_Bool success = wres == WAIT_OBJECT_0;
if ( success ) {
success = self->nPackets > 0;
if ( success ) {
*packet = self->packets[0];
*len = self->lens[0];
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
}
return success;
} /* get_packet */
static void
remove_packet( CeSocketWrapper* self )
{
DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE );
if ( wres == WAIT_OBJECT_0 ) {
XP_ASSERT( self->nPackets > 0 );
if ( --self->nPackets > 0 ) {
XP_MEMCPY( &self->packets[0], &self->packets[1],
self->nPackets * sizeof(self->packets[0]) );
XP_MEMCPY( &self->lens[0], &self->lens[1],
self->nPackets * sizeof(self->lens[0]) );
} else {
XP_ASSERT( self->nPackets == 0 );
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
}
XP_LOGF( "%d packets left on queue", self->nPackets );
} /* remove_packet */
static XP_Bool
sendAll( CeSocketWrapper* self, XP_U8* buf, XP_U16 len )
{
for ( ; ; ) {
int nSent = send( self->socket, buf, len, 0 ); /* flags? */
if ( nSent == SOCKET_ERROR ) {
return XP_FALSE;
} else if ( nSent == len ) {
XP_LOGF( "sent %d bytes", nSent );
return XP_TRUE;
} else {
XP_LOGF( "sent %d bytes", nSent );
XP_ASSERT( nSent < len );
len -= nSent;
buf += nSent;
}
}
} /* sendAll */
static XP_Bool
sendLenAndData( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
{
XP_Bool success = XP_FALSE;
XP_U16 lenData;
XP_ASSERT( self->socket != -1 );
lenData = XP_HTONS( len );
if ( sendAll( self, (XP_U8*)&lenData, sizeof(lenData) ) ) {
success = sendAll( self, packet, len );
}
return success;
} /* sendLenAndData */
static XP_Bool
connectSocket( CeSocketWrapper* self )
{
SOCKET sock;
/* first look up the ip address */
if ( self->addrRec.u.ip_relay.ipAddr == 0 ) {
struct hostent* ent;
ent = gethostbyname( self->addrRec.u.ip_relay.hostName );
if ( ent != NULL ) {
XP_U32 tmp;
XP_MEMCPY( &tmp, &ent->h_addr_list[0][0],
sizeof(self->addrRec.u.ip_relay.ipAddr) );
self->addrRec.u.ip_relay.ipAddr = XP_NTOHL( tmp );
} else {
logLastError( "gethostbyname" );
}
}
if ( self->addrRec.u.ip_relay.ipAddr != 0 ) {
sock = socket( AF_INET, SOCK_STREAM, IPPROTO_IP );
XP_LOGF( "got socket %d", sock );
if ( sock != INVALID_SOCKET ) {
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = XP_HTONS( self->addrRec.u.ip_relay.port );
name.sin_addr.S_un.S_addr = XP_HTONL(self->addrRec.u.ip_relay.ipAddr);
if ( SOCKET_ERROR != connect( sock, (struct sockaddr *)&name,
sizeof(name) ) ) {
self->connState = CE_IP_CONNECTED;
self->socket = sock;
/* Let the reader thread know there's now a socket to listen on */
SetEvent( self->socketConnEvent );
} else {
logLastError( "connect" );
}
} else {
logLastError( "socket" );
}
}
return self->connState == CE_IP_CONNECTED;
} /* connectSocket */
static XP_Bool
connectIfNot( CeSocketWrapper* self )
{
XP_Bool success = self->connState == CE_IP_CONNECTED;
if ( !success ) {
success = connectSocket( self );
}
return success;
} /* connectIfNot */
static void
closeConnection( CeSocketWrapper* self )
{
if ( self->connState >= CE_IP_UNCONNECTED ) {
if ( self->socket != -1 ) {
closesocket( self->socket );
}
self->socket = -1;
self->connState = CE_IP_UNCONNECTED;
}
} /* closeConnection */
static DWORD
WriterThreadProc( LPVOID lpParameter )
{
CeSocketWrapper* self = (CeSocketWrapper*)lpParameter;
/* PENDING: Start up network so we'll have a socket to use. Once the
socket's open and connected, start the reader thread.*/
connectSocket( self );
/* Then loop waiting for packets to write to it. */
for ( ; ; ) {
XP_U8* packet;
XP_U16 len;
WaitForSingleObject( self->queueAddEvent, INFINITE );
if ( get_packet( self, &packet, &len ) && connectIfNot( self ) ) {
if ( sendLenAndData( self, packet, len ) ) {
/* successful send. Remove our copy */
remove_packet( self );
XP_FREE( self->mpool, packet );
}
}
/* Should this happen sooner? What if other thread signals in the
meantime? */
ResetEvent( self->queueAddEvent );
}
ExitThread(0); /* docs say to exit this way */
return 0;
} /* WriterThreadProc */
/* Read until we get the number of bytes sought or until an error's
received. */
static XP_Bool
read_bytes_blocking( CeSocketWrapper* self, XP_U8* buf, XP_U16 len )
{
while ( len > 0 ) {
fd_set readSet;
int sres;
FD_ZERO( &readSet );
/* There also needs to be a pipe in here for interrupting */
FD_SET( self->socket, &readSet );
sres = select( 0, /* nFds is ignored on wince */
&readSet, NULL, NULL, /* others not interesting */
NULL ); /* no timeout */
XP_LOGF( "back from select: got %d", sres );
if ( sres == 0 ) {
break;
} else if ( sres == 1 && FD_ISSET( self->socket, &readSet ) ) {
int nRead = recv( self->socket, buf, len, 0 );
if ( nRead > 0 ) {
XP_LOGF( "read %d bytes", nRead );
XP_ASSERT( nRead <= len );
buf += nRead;
len -= nRead;
} else {
break;
}
} else {
XP_ASSERT(0);
break;
}
}
/* We probably want to close the socket if something's wrong here. Once
we get out of sync somehow we'll never get the framing right again. */
XP_ASSERT( len == 0 );
return len == 0;
} /* read_bytes_blocking */
static DWORD
ReaderThreadProc( LPVOID lpParameter )
{
XP_U8 buf[MAX_MSG_LEN];
CeSocketWrapper* self = (CeSocketWrapper*)lpParameter;
for ( ; ; ) {
WaitForSingleObject( self->socketConnEvent, INFINITE );
for ( ; ; ) {
XP_U16 len;
XP_LOGF( "ReaderThreadProc running" );
/* This will block in select */
if ( !read_bytes_blocking( self, (XP_U8*)&len, sizeof(len) ) ) {
break; /* bad socket. Go back to waiting new
one. */
}
len = XP_NTOHS( len );
if ( !read_bytes_blocking( self, buf, len ) ) {
break; /* bad socket */
}
(*self->dataProc)( buf, len, self->dataClosure );
}
}
ExitThread(0); /* docs say to exit this way */
return 0;
} /* ReaderThreadProc */
CeSocketWrapper*
ce_sockwrap_new( MPFORMAL DataRecvProc proc, void* closure )
{
CeSocketWrapper* self = XP_MALLOC( mpool, sizeof(*self) );
XP_MEMSET( self, 0, sizeof(*self) );
self->dataProc = proc;
self->dataClosure = closure;
MPASSIGN(self->mpool, mpool );
self->socket = -1;
self->queueMutex = CreateMutex( NULL, FALSE, NULL );
XP_ASSERT( self->queueMutex != NULL );
self->queueAddEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
self->socketConnEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
self->threads[WRITER_THREAD] = CreateThread( NULL, 0, WriterThreadProc,
self, 0, NULL );
self->threads[READER_THREAD] = CreateThread( NULL, 0, ReaderThreadProc,
self, 0, NULL );
return self;
} /* ce_sockwrap_new */
void
ce_sockwrap_delete( CeSocketWrapper* self )
{
/* This isn't a good thing to do. Better to signal them to exit
some other way */
TerminateThread( self->threads[WRITER_THREAD], 0 );
TerminateThread( self->threads[READER_THREAD], 0 );
WaitForMultipleObjects( N_THREADS, self->threads, TRUE, INFINITE );
closeConnection( self );
CloseHandle( self->threads[WRITER_THREAD] );
CloseHandle( self->threads[READER_THREAD] );
CloseHandle( self->queueMutex );
CloseHandle( self->queueAddEvent );
CloseHandle( self->socketConnEvent );
XP_FREE( self->mpool, self );
} /* ce_sockwrap_delete */
XP_U16
ce_sockwrap_send( CeSocketWrapper* self, XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr )
{
XP_U8* packet;
/* If the address has changed, we need to close the connection. Send
thread will take care of opening it again. */
XP_ASSERT( addr->conType == COMMS_CONN_RELAY );
if ( 0 != XP_STRCMP( addr->u.ip_relay.hostName, self->addrRec.u.ip_relay.hostName )
|| 0 != XP_STRCMP( addr->u.ip_relay.cookie, self->addrRec.u.ip_relay.cookie )
|| addr->u.ip_relay.port != self->addrRec.u.ip_relay.port ) {
closeConnection( self );
XP_MEMCPY( &self->addrRec, addr, sizeof(self->addrRec) );
}
packet = XP_MALLOC( self->mpool, len );
XP_MEMCPY( packet, buf, len );
if ( !queue_packet( self, packet, len ) ) {
len = 0; /* error */
}
return len;
}
#endif

36
wince/cesockwr.h Executable file
View file

@ -0,0 +1,36 @@
/* -*-mode: C; fill-column: 77; c-basic-offset: 4; -*- */
/*
* Copyright 2005 by Eric House (fixin@peak.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _CESOCKWR_H_
#define _CESOCKWR_H_
#include "comms.h"
#include "mempool.h"
typedef struct CeSocketWrapper CeSocketWrapper; /* forward */
typedef void (*DataRecvProc)( XP_U8* data, XP_U16 len, void* closure );
CeSocketWrapper* ce_sockwrap_new( MPFORMAL DataRecvProc proc, void* closure );
void ce_sockwrap_delete( CeSocketWrapper* self );
XP_U16 ce_sockwrap_send( CeSocketWrapper* self, XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr );
#endif