Use new bufqueue util

This commit is contained in:
ehouse 2009-02-09 02:51:43 +00:00
parent b2acdbae91
commit 2be682b4e9
2 changed files with 14 additions and 75 deletions

View file

@ -50,7 +50,7 @@ HAVE_COMMCTRL = 1
ifeq ($(TARGET_OS),wince)
#SMS = -DXWFEATURE_SMS
RELAY = -DXWFEATURE_RELAY -DCOMMS_HEARTBEAT
RELAY = -DXWFEATURE_RELAY -DCOMMS_HEARTBEAT -DUSE_BUFQUEUE
# IPDIRECT = -DXWFEATURE_IP_DIRECT
# BLUETOOTH = -DXWFEATURE_BLUETOOTH
CC = ${CE_ARCH}-gcc
@ -84,7 +84,7 @@ STANDALONE = -DPREV_WAS_STANDALONE_ONLY
# NO_DRAW = -DNO_DRAW
#BLUETOOTH = -DXWFEATURE_BLUETOOTH
#SMS = -DXWFEATURE_SMS
RELAY = -DXWFEATURE_RELAY
RELAY = -DXWFEATURE_RELAY -DUSE_BUFQUEUE
# IPDIRECT = -DXWFEATURE_IP_DIRECT
CC = i586-mingw32msvc-gcc
WINDRES = i586-mingw32msvc-windres

View file

@ -22,6 +22,8 @@
#include <winsock2.h>
#include <stdio.h>
#include "bufqueue.h"
#include "cesockwr.h"
#include "cemain.h"
#include "cedebug.h"
@ -62,9 +64,8 @@ struct CeSocketWrapper {
/* PENDING rewrite this as one sliding buffer */
/* Outgoing queue */
XP_U8* packets[MAX_QUEUE_SIZE];
XP_U16 lens[MAX_QUEUE_SIZE];
XP_U16 nPackets;
XP_U8 bufOut[512];
BufQueue queueOut;
/* Incoming */
char in_buf[512]; /* char is what WSARecv wants */
@ -103,60 +104,8 @@ ConnState2Str( CeConnState connState )
static XP_Bool connectIfNot( CeSocketWrapper* self );
/* queue_packet: Place packet on queue using semaphore. Return false
* if no room or fail for some other reason.
*/
static XP_Bool
queue_packet( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
{
XP_Bool success = XP_FALSE;
if ( self->nPackets < MAX_QUEUE_SIZE - 1 ) {
/* add it to the queue */
self->packets[self->nPackets] = packet;
self->lens[self->nPackets] = len;
++self->nPackets;
XP_LOGF( "%s: there are now %d packets on send queue",
__func__, self->nPackets );
success = XP_TRUE;
}
return success;
} /* queue_packet */
static XP_Bool
get_packet( CeSocketWrapper* self, XP_U8** packet, XP_U16* len )
{
XP_Bool success = CE_IPST_CONNECTED == self->connState;
if ( success ) {
success = self->nPackets > 0;
if ( success ) {
*packet = self->packets[0];
*len = self->lens[0];
}
}
return success;
} /* get_packet */
/* called by WriterThreadProc */
static void
remove_packet( CeSocketWrapper* self )
{
XP_ASSERT( self->nPackets > 0 );
XP_FREE( self->mpool, self->packets[0] );
if ( --self->nPackets > 0 ) {
XP_MEMCPY( &self->packets[0], &self->packets[1],
self->nPackets * sizeof(self->packets[0]) );
XP_MEMCPY( &self->lens[0], &self->lens[1],
self->nPackets * sizeof(self->lens[0]) );
}
XP_LOGF( "%d packets left on queue", self->nPackets );
} /* remove_packet */
static XP_Bool
sendAll( CeSocketWrapper* self, XP_U8* buf, XP_U16 len )
sendAll( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len )
{
for ( ; ; ) {
int nSent = send( self->socket, (char*)buf, len, 0 ); /* flags? */
@ -175,7 +124,7 @@ sendAll( CeSocketWrapper* self, XP_U8* buf, XP_U16 len )
} /* sendAll */
static XP_Bool
sendLenAndData( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
sendLenAndData( CeSocketWrapper* self, const XP_U8* packet, XP_U16 len )
{
XP_Bool success;
XP_U16 lenData;
@ -190,12 +139,12 @@ sendLenAndData( CeSocketWrapper* self, XP_U8* packet, XP_U16 len )
static void
send_packet_if( CeSocketWrapper* self )
{
XP_U8* packet;
const XP_U8* packet;
XP_U16 len;
if ( get_packet( self, &packet, &len ) ) {
if ( self->socket != -1 && bqGet( &self->queueOut, &packet, &len ) ) {
if ( sendLenAndData( self, packet, len ) ) {
/* successful send. Remove our copy */
remove_packet( self );
bqRemoveOne( &self->queueOut );
}
}
}
@ -332,6 +281,8 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
MPASSIGN(self->mpool, mpool );
self->socket = -1;
bqInit( &self->queueOut, self->bufOut, sizeof(self->bufOut) );
getHostAddr( self );
return self;
} /* ce_sockwrap_new */
@ -339,19 +290,12 @@ ce_sockwrap_new( MPFORMAL DataRecvProc proc, CEAppGlobals* globals )
void
ce_sockwrap_delete( CeSocketWrapper* self )
{
XP_U8* packet;
XP_U16 len;
/* This isn't a good thing to do. Better to signal them to exit
some other way */
closeConnection( self );
WSACleanup();
while ( get_packet( self, &packet, &len ) ) {
remove_packet(self);
}
XP_FREE( self->mpool, self );
} /* ce_sockwrap_delete */
@ -512,8 +456,6 @@ XP_S16
ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
const CommsAddrRec* addr )
{
XP_U8* packet;
/* If the address has changed, we need to close the connection. Send
thread will take care of opening it again. */
XP_ASSERT( addr->conType == COMMS_CONN_RELAY );
@ -528,13 +470,10 @@ ce_sockwrap_send( CeSocketWrapper* self, const XP_U8* buf, XP_U16 len,
getHostAddr( self );
}
packet = XP_MALLOC( self->mpool, len );
XP_MEMCPY( packet, buf, len );
if ( queue_packet( self, packet, len ) ) {
if ( bqAdd( &self->queueOut, buf, len ) ) {
send_packet_if( self );
} else {
XP_WARNF( "dropping packet; queue full" );
XP_FREE( self->mpool, packet );
len = -1; /* error */
}