Handle case where packet contains several messages; attempt to send on

socket-writable and on receiving message to be sent; cleanup.  With
this change full robot-vs-robot game has worked over relay, but not
reliably.  I think it's the relay's fault.  Still tested only on Win32.
This commit is contained in:
ehouse 2009-02-01 16:46:00 +00:00
parent 8026322e8d
commit d4de570358

View file

@ -51,7 +51,6 @@ typedef enum {
#define MAX_QUEUE_SIZE 3 #define MAX_QUEUE_SIZE 3
struct CeSocketWrapper { struct CeSocketWrapper {
WSADATA wsaData;
DataRecvProc dataProc; DataRecvProc dataProc;
CEAppGlobals* globals; CEAppGlobals* globals;
@ -75,7 +74,7 @@ struct CeSocketWrapper {
SOCKET socket; SOCKET socket;
CeConnState connState; CeConnState connState;
HANDLE queueMutex; HANDLE queueMutex; /* there's only one thread; get rid of this! */
#ifdef DEBUG #ifdef DEBUG
XP_U16 nSent; XP_U16 nSent;
@ -129,10 +128,7 @@ queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
XP_LOGF( "%s: there are now %d packets on send queue", XP_LOGF( "%s: there are now %d packets on send queue",
__func__, self->nPackets ); __func__, self->nPackets );
/* signal the writer thread */ success = XP_TRUE;
/* XP_LOGF( "%s: calling SetEvent(%p)", __func__, self->queueAddEvent ); */
/* SetEvent( self->queueAddEvent ); */
/* success = XP_TRUE; */
} }
if ( !ReleaseMutex( self->queueMutex ) ) { if ( !ReleaseMutex( self->queueMutex ) ) {
@ -143,13 +139,15 @@ queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
} }
return success; return success;
} } /* queue_packet */
static XP_Bool static XP_Bool
get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len ) get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
{ {
XP_Bool success = CE_IPST_CONNECTED == self->connState;
if ( success ) {
DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE ); DWORD wres = WaitForSingleObject( self->queueMutex, INFINITE );
XP_Bool success = wres == WAIT_OBJECT_0; success = wres == WAIT_OBJECT_0;
if ( success ) { if ( success ) {
success = self->nPackets > 0; success = self->nPackets > 0;
@ -161,7 +159,7 @@ get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
logLastError( "ReleaseMutex" ); logLastError( "ReleaseMutex" );
} }
} }
}
return success; return success;
} /* get_packet */ } /* get_packet */
@ -321,12 +319,12 @@ static void
closeConnection( CeSocketWrapper* self ) closeConnection( CeSocketWrapper* self )
{ {
if ( self->connState >= CE_IPST_CONNECTED ) { if ( self->connState >= CE_IPST_CONNECTED ) {
XP_ASSERT( self->socket != -1 );
if ( self->socket != -1 ) { if ( self->socket != -1 ) {
MS(closesocket)( self->socket ); closesocket( self->socket );
self->socket = -1;
} }
self->socket = -1;
stateChanged( self, CE_IPST_START ); stateChanged( self, CE_IPST_START );
} }
} /* closeConnection */ } /* closeConnection */
@ -467,17 +465,9 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
{ {
CeSocketWrapper* self = NULL; CeSocketWrapper* self = NULL;
WSADATA wsaData;
int iResult = WSAStartup(MAKEWORD(2,2), &wsaData);
if (iResult != NO_ERROR) {
XP_WARNF("Error at WSAStartup()\n");
} else {
self = XP_MALLOC( mpool, sizeof(*self) ); self = XP_MALLOC( mpool, sizeof(*self) );
XP_MEMSET( self, 0, sizeof(*self) ); XP_MEMSET( self, 0, sizeof(*self) );
self->wsaData = wsaData;
self->dataProc = proc; self->dataProc = proc;
self->globals = globals; self->globals = globals;
MPASSIGN(self->mpool, mpool ); MPASSIGN(self->mpool, mpool );
@ -487,7 +477,6 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
XP_ASSERT( self->queueMutex != NULL ); XP_ASSERT( self->queueMutex != NULL );
getHostAddr( self ); getHostAddr( self );
}
return self; return self;
} /* ce_sockwrap_new */ } /* ce_sockwrap_new */
@ -543,34 +532,39 @@ ce_sockwrap_hostname( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam )
LOG_RETURN_VOID(); LOG_RETURN_VOID();
} /* ce_sockwrap_hostname */ } /* ce_sockwrap_hostname */
/* MSDN: When one of the nominated network events occurs on the specified
socket s, the application window hWnd receives message wMsg. The wParam
parameter identifies the socket on which a network event has occurred. The
low word of lParam specifies the network event that has occurred. The high
word of lParam contains any error code. The error code be any error as
defined in Winsock2.h.
*/
static XP_Bool static XP_Bool
dispatch_if_complete( CeSocketWrapper* self, XP_U16 nBytesRecvd ) dispatch_msgs( CeSocketWrapper* self )
{ {
XP_U16 lenInBuffer = nBytesRecvd + self->in_offset;
XP_U16 msgLen;
XP_Bool draw = XP_FALSE; XP_Bool draw = XP_FALSE;
if ( lenInBuffer >= sizeof(msgLen) ) {
/* Repeat until we don't have a complete message in the buffer */
for ( ; ; ) {
XP_U16 lenInBuffer = self->in_offset;
XP_U16 msgLen;
XP_U16 lenUsed, lenLeft;
XP_LOGF( "%s: have %d bytes", __func__, lenInBuffer );
/* Do we even have the length header? */
if ( lenInBuffer < sizeof(msgLen) ) {
break;
}
XP_MEMCPY( &msgLen, self->in_buf, sizeof(msgLen) ); XP_MEMCPY( &msgLen, self->in_buf, sizeof(msgLen) );
msgLen = XP_NTOHS( msgLen ); msgLen = XP_NTOHS( msgLen );
XP_LOGF( "%s: at least we have len: %d", __func__, msgLen ); XP_LOGF( "%s: at least we have len: %d", __func__, msgLen );
/* We know the length of the full buffer. Do we have it? */ /* We know the length of the full buffer. Do we have it? */
if ( lenInBuffer >= (msgLen + sizeof(msgLen)) ) { if ( lenInBuffer < (msgLen + sizeof(msgLen)) ) {
XP_U16 lenLeft, lenUsed; break;
}
/* first send */ /* first send */
XP_LOGF( "%s: sending %d bytes to dataProc", __func__, msgLen ); XP_LOGF( "%s: sending %d bytes to dataProc", __func__, msgLen );
draw = (*self->dataProc)( (XP_U8*)&self->in_buf[sizeof(msgLen)], draw = (*self->dataProc)( (XP_U8*)&self->in_buf[sizeof(msgLen)],
msgLen, self->globals ); msgLen, self->globals )
|| draw;
/* then move down any additional bytes */ /* then move down any additional bytes */
lenUsed = msgLen + sizeof(msgLen); lenUsed = msgLen + sizeof(msgLen);
@ -580,16 +574,13 @@ dispatch_if_complete( CeSocketWrapper* self, XP_U16 nBytesRecvd )
XP_MEMCPY( self->in_buf, &self->in_buf[lenUsed], lenLeft ); XP_MEMCPY( self->in_buf, &self->in_buf[lenUsed], lenLeft );
} }
self->in_offset = 0; self->in_offset = lenLeft;
nBytesRecvd = lenLeft; /* will set below */
}
} }
self->in_offset += nBytesRecvd;
return draw; return draw;
} /* dispatch_if_complete */ } /* dispatch_msgs */
static XP_U16 static XP_Bool
read_from_socket( CeSocketWrapper* self ) read_from_socket( CeSocketWrapper* self )
{ {
WSABUF wsabuf; WSABUF wsabuf;
@ -601,18 +592,27 @@ read_from_socket( CeSocketWrapper* self )
int err = WSARecv( self->socket, &wsabuf, 1, &nBytesRecvd, int err = WSARecv( self->socket, &wsabuf, 1, &nBytesRecvd,
&flags, NULL, NULL ); &flags, NULL, NULL );
XP_ASSERT( nBytesRecvd < 0xFFFF );
if ( 0 == err ) { if ( 0 == err ) {
XP_LOGF( "%s: got %ld bytes", __func__, nBytesRecvd ); XP_LOGF( "%s: got %ld bytes", __func__, nBytesRecvd );
self->in_offset += nBytesRecvd;
} else { } else {
XP_ASSERT( err == SOCKET_ERROR ); XP_ASSERT( err == SOCKET_ERROR );
err = WSAGetLastError(); err = WSAGetLastError();
XP_LOGF( "%s: WSARecv=>%d", __func__, err ); XP_LOGF( "%s: WSARecv=>%d", __func__, err );
} }
XP_ASSERT( nBytesRecvd < 0xFFFF ); return nBytesRecvd > 0;
return (XP_U16)nBytesRecvd;
} /* read_from_socket */ } /* read_from_socket */
/* MSDN: When one of the nominated network events occurs on the specified
socket s, the application window hWnd receives message wMsg. The wParam
parameter identifies the socket on which a network event has occurred. The
low word of lParam specifies the network event that has occurred. The high
word of lParam contains any error code. The error code be any error as
defined in Winsock2.h.
*/
XP_Bool XP_Bool
ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam ) ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam )
{ {
@ -620,19 +620,20 @@ ce_sockwrap_event( CeSocketWrapper* self, WPARAM wParam, LPARAM lParam )
long event = (long)LOWORD(lParam); long event = (long)LOWORD(lParam);
XP_Bool draw = XP_FALSE; XP_Bool draw = XP_FALSE;
if ( 0 != (FD_READ & event) ) {
XP_U16 nReceived;
XP_LOGF( "%s: got FD_READ", __func__ );
nReceived = read_from_socket( self );
if ( nReceived > 0 ) {
draw = dispatch_if_complete( self, nReceived );
}
event &= ~FD_READ;
}
if ( 0 != (FD_WRITE & event) ) { if ( 0 != (FD_WRITE & event) ) {
send_packet_if( self );
event &= ~FD_WRITE; event &= ~FD_WRITE;
XP_LOGF( "%s: got FD_WRITE", __func__ ); XP_LOGF( "%s: got FD_WRITE", __func__ );
} }
if ( 0 != (FD_READ & event) ) {
XP_LOGF( "%s: got FD_READ", __func__ );
if ( read_from_socket( self ) ) {
draw = dispatch_msgs( self );
}
event &= ~FD_READ;
}
if ( 0 != (FD_CONNECT & event) ) { if ( 0 != (FD_CONNECT & event) ) {
XP_LOGF( "%s: got FD_CONNECT", __func__ ); XP_LOGF( "%s: got FD_CONNECT", __func__ );
event &= ~FD_CONNECT; event &= ~FD_CONNECT;
@ -668,7 +669,9 @@ ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
packet = XP_MALLOC( self->mpool, len ); packet = XP_MALLOC( self->mpool, len );
XP_MEMCPY( packet, buf, len ); XP_MEMCPY( packet, buf, len );
if ( !queue_packet( self, packet, len ) ) { if ( queue_packet( self, packet, len ) ) {
send_packet_if( self );
} else {
len = 0; /* error */ len = 0; /* error */
} }