fix leak of packet buffers; get rid of multithread code.

This commit is contained in:
ehouse 2009-02-07 18:20:16 +00:00
parent 326e57e742
commit 9ce82a7c94
3 changed files with 39 additions and 177 deletions

View file

@ -843,9 +843,8 @@ ceInitAndStartBoard( CEAppGlobals* globals, XP_Bool newGame,
}
if ( newGame ) {
XP_U16 newGameID = 0;
game_reset( MEMPOOL &globals->game, &globals->gameInfo, &globals->util,
newGameID, &globals->appPrefs.cp, CE_SEND_PROC,
0, &globals->appPrefs.cp, CE_SEND_PROC,
CE_RESET_PROC globals );
#if defined XWFEATURE_RELAY || defined XWFEATURE_BLUETOOTH
@ -1371,9 +1370,8 @@ InitInstance(HINSTANCE hInstance, int nCmdShow
oldGameLoaded = prevStateExists && ceLoadSavedGame( globals );
if ( !oldGameLoaded ) {
XP_U16 gameID = 0; /* good enough until I get networking going */
game_makeNewGame( MPPARM(mpool) &globals->game, &globals->gameInfo,
&globals->util, (DrawCtx*)globals->draw, gameID,
&globals->util, (DrawCtx*)globals->draw, 0,
&globals->appPrefs.cp,
CE_SEND_PROC, CE_RESET_PROC globals );
@ -1865,9 +1863,6 @@ ceSaveAndExit( CEAppGlobals* globals )
{
(void)ceSaveCurGame( globals, XP_TRUE );
ceSavePrefs( globals );
if ( !!globals->socketWrap ) {
ce_sockwrap_delete( globals->socketWrap );
}
DestroyWindow(globals->hWnd);
} /* ceSaveAndExit */
@ -1894,6 +1889,11 @@ freeGlobals( CEAppGlobals* globals )
closeGame( globals );
if ( !!globals->socketWrap ) {
ce_sockwrap_delete( globals->socketWrap );
globals->socketWrap = NULL;
}
if ( !!globals->vtMgr ) {
vtmgr_destroy( MPPARM(mpool) globals->vtMgr );
}
@ -2651,7 +2651,6 @@ messageBoxStream( CEAppGlobals* globals, XWStreamCtxt* stream, wchar_t* title,
XP_UCHAR* buf = ceStreamToStrBuf( MPPARM(globals->mpool) stream );
int result;
assertOnTop( globals->hWnd );
result = ceMessageBoxChar( globals, buf, title, buttons );
XP_FREE( globals->mpool, buf );

View file

@ -48,7 +48,7 @@ typedef enum {
,CE_IPST_CONNECTED
} CeConnState;
#define MAX_QUEUE_SIZE 3
#define MAX_QUEUE_SIZE 6
struct CeSocketWrapper {
DataRecvProc dataProc;
@ -60,6 +60,7 @@ struct CeSocketWrapper {
} hostNameUnion;
HANDLE getHostTask;
/* PENDING rewrite this as one sliding buffer */
/* Outgoing queue */
XP_U8* packets[MAX_QUEUE_SIZE];
XP_U16 lens[MAX_QUEUE_SIZE];
@ -74,8 +75,6 @@ struct CeSocketWrapper {
SOCKET socket;
CeConnState connState;
HANDLE queueMutex; /* there's only one thread; get rid of this! */
#ifdef DEBUG
XP_U16 nSent;
#endif
@ -111,31 +110,17 @@ static XP_Bool connectIfNot( CeSocketWrapper* self );
static XP_Bool
queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
{
DWORD wres;
XP_Bool success = XP_FALSE;
// 2/5 second time-out interval. This is called from the UI thread, so
// long pauses are unacceptable. comms will have to try again if for
// some reason the queue is locked for that long.
wres = WaitForSingleObject( self->queueMutex, 200L );
if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) {
/* add it to the queue */
self->packets[self->nPackets] = packet;
self->lens[self->nPackets] = len;
++self->nPackets;
XP_LOGF( "%s: there are now %d packets on send queue",
__func__, self->nPackets );
if ( wres == WAIT_OBJECT_0 ) {
if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) {
/* add it to the queue */
self->packets[self->nPackets] = packet;
self->lens[self->nPackets] = len;
++self->nPackets;
XP_LOGF( "%s: there are now %d packets on send queue",
__func__, self->nPackets );
success = XP_TRUE;
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
} else {
XP_LOGF( "timed out" );
success = XP_TRUE;
}
return success;
@ -146,18 +131,10 @@ get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
{
XP_Bool success = CE_IPST_CONNECTED == self->connState;
if ( success ) {
DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE );
success = wres == WAIT_OBJECT_0;
success = self->nPackets > 0;
if ( success ) {
success = self->nPackets > 0;
if ( success ) {
*packet = self->packets[0];
*len = self->lens[0];
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
*packet = self->packets[0];
*len = self->lens[0];
}
}
return success;
@ -167,20 +144,13 @@ get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
static void
remove_packet( CeSocketWrapper* self )
{
DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE );
if ( wres == WAIT_OBJECT_0 ) {
XP_ASSERT( self->nPackets > 0 );
if ( --self->nPackets > 0 ) {
XP_MEMCPY( &self->packets[0], &self->packets[1],
self->nPackets * sizeof(self->packets[0]) );
XP_MEMCPY( &self->lens[0], &self->lens[1],
self->nPackets * sizeof(self->lens[0]) );
} else {
XP_ASSERT( self->nPackets == 0 );
}
if ( !ReleaseMutex( self->queueMutex ) ) {
logLastError( "ReleaseMutex" );
}
XP_ASSERT( self->nPackets > 0 );
XP_FREE( self->mpool, self->packets[0] );
if ( --self->nPackets > 0 ) {
XP_MEMCPY( &self->packets[0], &self->packets[1],
self->nPackets * sizeof(self->packets[0]) );
XP_MEMCPY( &self->lens[0], &self->lens[1],
self->nPackets * sizeof(self->lens[0]) );
}
XP_LOGF( "%d packets left on queue", self->nPackets );
} /* remove_packet */
@ -226,7 +196,6 @@ send_packet_if( CeSocketWrapper* self )
if ( sendLenAndData( self, packet, len ) ) {
/* successful send. Remove our copy */
remove_packet( self );
XP_FREE( self->mpool, packet );
}
}
}
@ -329,116 +298,6 @@ closeConnection( CeSocketWrapper* self )
}
} /* closeConnection */
#if 0
static DWORD
WriterThreadProc( LPVOID lpParameter )
{
CeSocketWrapper* self = (CeSocketWrapper*)lpParameter;
connectSocket( self );
/* Then loop waiting for packets to write to it. */
for ( ; ; ) {
XP_U8* packet;
XP_U16 len;
WaitForSingleObject( self->queueAddEvent, INFINITE );
if ( get_packet( self, &packet, &len ) && connectIfNot( self ) ) {
if ( sendLenAndData( self, packet, len ) ) {
/* successful send. Remove our copy */
remove_packet( self );
XP_FREE( self->mpool, packet );
}
}
/* Should this happen sooner? What if other thread signals in the
meantime? */
ResetEvent( self->queueAddEvent );
}
ExitThread(0); /* docs say to exit this way */
return 0;
} /* WriterThreadProc */
#endif
#if 0
/* Read until we get the number of bytes sought or until an error's
received. */
static XP_Bool
read_bytes_blocking( CeSocketWrapper* self, XP_U8* buf, XP_U16 len )
{
while ( len > 0 ) {
fd_set readSet;
int sres;
FD_ZERO( &readSet );
/* There also needs to be a pipe in here for interrupting */
FD_SET( self->socket, &readSet );
sres = MS(select)( 0, /* nFds is ignored on wince */
&readSet, NULL, NULL, /* others not interesting */
NULL ); /* no timeout */
XP_LOGF( "back from select: got %d", sres );
if ( sres == 0 ) {
break;
} else if ( sres == 1 && FD_ISSET( self->socket, &readSet ) ) {
int nRead = MS(recv)( self->socket, (char*)buf, len, 0 );
if ( nRead > 0 ) {
XP_LOGF( "read %d bytes", nRead );
XP_ASSERT( nRead <= len );
buf += nRead;
len -= nRead;
} else {
break;
}
} else {
XP_ASSERT(0);
break;
}
}
/* We probably want to close the socket if something's wrong here. Once
we get out of sync somehow we'll never get the framing right again. */
XP_ASSERT( len == 0 );
return len == 0;
} /* read_bytes_blocking */
#endif
#if 0
static DWORD
ReaderThreadProc( LPVOID lpParameter )
{
XP_U8 buf[MAX_MSG_LEN];
CeSocketWrapper* self = (CeSocketWrapper*)lpParameter;
for ( ; ; ) {
WaitForSingleObject( self->socketConnEvent, INFINITE );
for ( ; ; ) {
XP_U16 len;
XP_LOGF( "ReaderThreadProc running" );
/* This will block in select */
if ( !read_bytes_blocking( self, (XP_U8*)&len, sizeof(len) ) ) {
break; /* bad socket. Go back to waiting new
one. */
}
len = XP_NTOHS( len );
if ( !read_bytes_blocking( self, buf, len ) ) {
break; /* bad socket */
}
(*self->dataProc)( buf, len, self->globals );
}
}
ExitThread(0); /* docs say to exit this way */
return 0;
} /* ReaderThreadProc */
#endif
static void
getHostAddr( CeSocketWrapper* self )
{
@ -473,9 +332,6 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
MPASSIGN(self->mpool, mpool );
self->socket = -1;
self->queueMutex = CreateMutex( NULL, FALSE, NULL );
XP_ASSERT( self->queueMutex != NULL );
getHostAddr( self );
return self;
} /* ce_sockwrap_new */
@ -483,14 +339,19 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
void
ce_sockwrap_delete( CeSocketWrapper* self )
{
XP_U8* packet;
XP_U16 len;
/* This isn't a good thing to do. Better to signal them to exit
some other way */
closeConnection( self );
CloseHandle( self->queueMutex );
WSACleanup();
while ( get_packet( self, &packet, &len ) ) {
remove_packet(self);
}
XP_FREE( self->mpool, self );
} /* ce_sockwrap_delete */
@ -647,7 +508,7 @@ ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam )
return draw;
}
XP_U16
XP_S16
ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr )
{
@ -672,7 +533,9 @@ ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
if ( queue_packet( self, packet, len ) ) {
send_packet_if( self );
} else {
len = 0; /* error */
XP_WARNF( "dropping packet; queue full" );
XP_FREE( self->mpool, packet );
len = -1; /* error */
}
return len;

View file

@ -36,7 +36,7 @@ void ce_sockwrap_delete( CeSocketWrapper* self );
void ce_sockwrap_hostname( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam );
XP_Bool ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam );
XP_U16 ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
XP_S16 ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr );
#endif