From e5447c5029c237625ff66a1c2846be52748cc09b Mon Sep 17 00:00:00 2001 From: ehouse Date: Sat, 7 Feb 2009 18:20:16 +0000 Subject: [PATCH] fix leak of packet buffers; get rid of multithread code. --- wince/cemain.c | 15 ++-- wince/cesockwr.c | 199 ++++++++--------------------------------------- wince/cesockwr.h | 2 +- 3 files changed, 39 insertions(+), 177 deletions(-) diff --git a/wince/cemain.c b/wince/cemain.c index c3591349b..7de955ce0 100755 --- a/wince/cemain.c +++ b/wince/cemain.c @@ -843,9 +843,8 @@ ceInitAndStartBoard( CEAppGlobals* globals, XP_Bool newGame, } if ( newGame ) { - XP_U16 newGameID = 0; game_reset( MEMPOOL &globals->game, &globals->gameInfo, &globals->util, - newGameID, &globals->appPrefs.cp, CE_SEND_PROC, + 0, &globals->appPrefs.cp, CE_SEND_PROC, CE_RESET_PROC globals ); #if defined XWFEATURE_RELAY || defined XWFEATURE_BLUETOOTH @@ -1371,9 +1370,8 @@ InitInstance(HINSTANCE hInstance, int nCmdShow oldGameLoaded = prevStateExists && ceLoadSavedGame( globals ); if ( !oldGameLoaded ) { - XP_U16 gameID = 0; /* good enough until I get networking going */ game_makeNewGame( MPPARM(mpool) &globals->game, &globals->gameInfo, - &globals->util, (DrawCtx*)globals->draw, gameID, + &globals->util, (DrawCtx*)globals->draw, 0, &globals->appPrefs.cp, CE_SEND_PROC, CE_RESET_PROC globals ); @@ -1865,9 +1863,6 @@ ceSaveAndExit( CEAppGlobals* globals ) { (void)ceSaveCurGame( globals, XP_TRUE ); ceSavePrefs( globals ); - if ( !!globals->socketWrap ) { - ce_sockwrap_delete( globals->socketWrap ); - } DestroyWindow(globals->hWnd); } /* ceSaveAndExit */ @@ -1894,6 +1889,11 @@ freeGlobals( CEAppGlobals* globals ) closeGame( globals ); + if ( !!globals->socketWrap ) { + ce_sockwrap_delete( globals->socketWrap ); + globals->socketWrap = NULL; + } + if ( !!globals->vtMgr ) { vtmgr_destroy( MPPARM(mpool) globals->vtMgr ); } @@ -2651,7 +2651,6 @@ messageBoxStream( CEAppGlobals* globals, XWStreamCtxt* stream, wchar_t* title, XP_UCHAR* buf = ceStreamToStrBuf( MPPARM(globals->mpool) stream ); int result; - assertOnTop( globals->hWnd ); result = ceMessageBoxChar( globals, buf, title, buttons ); XP_FREE( globals->mpool, buf ); diff --git a/wince/cesockwr.c b/wince/cesockwr.c index d1f68342b..2360416fc 100755 --- a/wince/cesockwr.c +++ b/wince/cesockwr.c @@ -48,7 +48,7 @@ typedef enum { ,CE_IPST_CONNECTED } CeConnState; -#define MAX_QUEUE_SIZE 3 +#define MAX_QUEUE_SIZE 6 struct CeSocketWrapper { DataRecvProc dataProc; @@ -60,6 +60,7 @@ struct CeSocketWrapper { } hostNameUnion; HANDLE getHostTask; + /* PENDING rewrite this as one sliding buffer */ /* Outgoing queue */ XP_U8* packets[MAX_QUEUE_SIZE]; XP_U16 lens[MAX_QUEUE_SIZE]; @@ -74,8 +75,6 @@ struct CeSocketWrapper { SOCKET socket; CeConnState connState; - HANDLE queueMutex; /* there's only one thread; get rid of this! */ - #ifdef DEBUG XP_U16 nSent; #endif @@ -111,31 +110,17 @@ static XP_Bool connectIfNot( CeSocketWrapper* self ); static XP_Bool queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len ) { - DWORD wres; XP_Bool success = XP_FALSE; - // 2/5 second time-out interval. This is called from the UI thread, so - // long pauses are unacceptable. comms will have to try again if for - // some reason the queue is locked for that long. - wres = WaitForSingleObject( self->queueMutex, 200L ); + if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) { + /* add it to the queue */ + self->packets[self->nPackets] = packet; + self->lens[self->nPackets] = len; + ++self->nPackets; + XP_LOGF( "%s: there are now %d packets on send queue", + __func__, self->nPackets ); - if ( wres == WAIT_OBJECT_0 ) { - if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) { - /* add it to the queue */ - self->packets[self->nPackets] = packet; - self->lens[self->nPackets] = len; - ++self->nPackets; - XP_LOGF( "%s: there are now %d packets on send queue", - __func__, self->nPackets ); - - success = XP_TRUE; - } - - if ( !ReleaseMutex( self->queueMutex ) ) { - logLastError( "ReleaseMutex" ); - } - } else { - XP_LOGF( "timed out" ); + success = XP_TRUE; } return success; @@ -146,18 +131,10 @@ get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len ) { XP_Bool success = CE_IPST_CONNECTED == self->connState; if ( success ) { - DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE ); - success = wres == WAIT_OBJECT_0; - + success = self->nPackets > 0; if ( success ) { - success = self->nPackets > 0; - if ( success ) { - *packet = self->packets[0]; - *len = self->lens[0]; - } - if ( !ReleaseMutex( self->queueMutex ) ) { - logLastError( "ReleaseMutex" ); - } + *packet = self->packets[0]; + *len = self->lens[0]; } } return success; @@ -167,20 +144,13 @@ get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len ) static void remove_packet( CeSocketWrapper* self ) { - DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE ); - if ( wres == WAIT_OBJECT_0 ) { - XP_ASSERT( self->nPackets > 0 ); - if ( --self->nPackets > 0 ) { - XP_MEMCPY( &self->packets[0], &self->packets[1], - self->nPackets * sizeof(self->packets[0]) ); - XP_MEMCPY( &self->lens[0], &self->lens[1], - self->nPackets * sizeof(self->lens[0]) ); - } else { - XP_ASSERT( self->nPackets == 0 ); - } - if ( !ReleaseMutex( self->queueMutex ) ) { - logLastError( "ReleaseMutex" ); - } + XP_ASSERT( self->nPackets > 0 ); + XP_FREE( self->mpool, self->packets[0] ); + if ( --self->nPackets > 0 ) { + XP_MEMCPY( &self->packets[0], &self->packets[1], + self->nPackets * sizeof(self->packets[0]) ); + XP_MEMCPY( &self->lens[0], &self->lens[1], + self->nPackets * sizeof(self->lens[0]) ); } XP_LOGF( "%d packets left on queue", self->nPackets ); } /* remove_packet */ @@ -226,7 +196,6 @@ send_packet_if( CeSocketWrapper* self ) if ( sendLenAndData( self, packet, len ) ) { /* successful send. Remove our copy */ remove_packet( self ); - XP_FREE( self->mpool, packet ); } } } @@ -329,116 +298,6 @@ closeConnection( CeSocketWrapper* self ) } } /* closeConnection */ -#if 0 -static DWORD -WriterThreadProc( LPVOID lpParameter ) -{ - CeSocketWrapper* self = (CeSocketWrapper*)lpParameter; - - connectSocket( self ); - - /* Then loop waiting for packets to write to it. */ - for ( ; ; ) { - XP_U8* packet; - XP_U16 len; - - WaitForSingleObject( self->queueAddEvent, INFINITE ); - - if ( get_packet( self, &packet, &len ) && connectIfNot( self ) ) { - if ( sendLenAndData( self, packet, len ) ) { - - /* successful send. Remove our copy */ - remove_packet( self ); - XP_FREE( self->mpool, packet ); - } - } - - /* Should this happen sooner? What if other thread signals in the - meantime? */ - ResetEvent( self->queueAddEvent ); - } - - ExitThread(0); /* docs say to exit this way */ - return 0; -} /* WriterThreadProc */ -#endif - -#if 0 -/* Read until we get the number of bytes sought or until an error's - received. */ -static XP_Bool -read_bytes_blocking( CeSocketWrapper* self, XP_U8* buf, XP_U16 len ) -{ - while ( len > 0 ) { - fd_set readSet; - int sres; - - FD_ZERO( &readSet ); - /* There also needs to be a pipe in here for interrupting */ - FD_SET( self->socket, &readSet ); - - sres = MS(select)( 0, /* nFds is ignored on wince */ - &readSet, NULL, NULL, /* others not interesting */ - NULL ); /* no timeout */ - XP_LOGF( "back from select: got %d", sres ); - if ( sres == 0 ) { - break; - } else if ( sres == 1 && FD_ISSET( self->socket, &readSet ) ) { - int nRead = MS(recv)( self->socket, (char*)buf, len, 0 ); - if ( nRead > 0 ) { - XP_LOGF( "read %d bytes", nRead ); - XP_ASSERT( nRead <= len ); - buf += nRead; - len -= nRead; - } else { - break; - } - } else { - XP_ASSERT(0); - break; - } - } - - /* We probably want to close the socket if something's wrong here. Once - we get out of sync somehow we'll never get the framing right again. */ - XP_ASSERT( len == 0 ); - return len == 0; -} /* read_bytes_blocking */ -#endif - -#if 0 -static DWORD -ReaderThreadProc( LPVOID lpParameter ) -{ - XP_U8 buf[MAX_MSG_LEN]; - CeSocketWrapper* self = (CeSocketWrapper*)lpParameter; - - for ( ; ; ) { - WaitForSingleObject( self->socketConnEvent, INFINITE ); - - for ( ; ; ) { - XP_U16 len; - XP_LOGF( "ReaderThreadProc running" ); - - /* This will block in select */ - if ( !read_bytes_blocking( self, (XP_U8*)&len, sizeof(len) ) ) { - break; /* bad socket. Go back to waiting new - one. */ - } - len = XP_NTOHS( len ); - if ( !read_bytes_blocking( self, buf, len ) ) { - break; /* bad socket */ - } - - (*self->dataProc)( buf, len, self->globals ); - } - } - - ExitThread(0); /* docs say to exit this way */ - return 0; -} /* ReaderThreadProc */ -#endif - static void getHostAddr( CeSocketWrapper* self ) { @@ -473,9 +332,6 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals ) MPASSIGN(self->mpool, mpool ); self->socket = -1; - self->queueMutex = CreateMutex( NULL, FALSE, NULL ); - XP_ASSERT( self->queueMutex != NULL ); - getHostAddr( self ); return self; } /* ce_sockwrap_new */ @@ -483,14 +339,19 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals ) void ce_sockwrap_delete( CeSocketWrapper* self ) { + XP_U8* packet; + XP_U16 len; + /* This isn't a good thing to do. Better to signal them to exit some other way */ closeConnection( self ); - CloseHandle( self->queueMutex ); - WSACleanup(); + while ( get_packet( self, &packet, &len ) ) { + remove_packet(self); + } + XP_FREE( self->mpool, self ); } /* ce_sockwrap_delete */ @@ -647,7 +508,7 @@ ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam ) return draw; } -XP_U16 +XP_S16 ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len, const CommsAddrRec* addr ) { @@ -672,7 +533,9 @@ ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len, if ( queue_packet( self, packet, len ) ) { send_packet_if( self ); } else { - len = 0; /* error */ + XP_WARNF( "dropping packet; queue full" ); + XP_FREE( self->mpool, packet ); + len = -1; /* error */ } return len; diff --git a/wince/cesockwr.h b/wince/cesockwr.h index 572c6accb..0fbf395f8 100755 --- a/wince/cesockwr.h +++ b/wince/cesockwr.h @@ -36,7 +36,7 @@ void ce_sockwrap_delete( CeSocketWrapper* self ); void ce_sockwrap_hostname( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam ); XP_Bool ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam ); -XP_U16 ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len, +XP_S16 ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len, const CommsAddrRec* addr ); #endif