mirror of
git://xwords.git.sourceforge.net/gitroot/xwords/xwords
synced 2025-01-28 07:58:08 +01:00
total rewrite. New protocol eliminates need to store and forward
messages: clients connect, then messages are passthru only. Add control port. Use stl map and vector to remove limit on number of connections. Also removed synchronization, which need to be re-added.
This commit is contained in:
parent
02588f6244
commit
5d3876fa7b
1 changed files with 257 additions and 345 deletions
|
@ -28,10 +28,6 @@
|
||||||
// be useful for other things. It also needs a lot of work, and I hacked it
|
// be useful for other things. It also needs a lot of work, and I hacked it
|
||||||
// up before making an exhaustive search for other alternatives.
|
// up before making an exhaustive search for other alternatives.
|
||||||
//
|
//
|
||||||
// The extreme limitations it has now are meant to be fixed by using stl-based
|
|
||||||
// structs rather than arrays to track stuff, and by adding mutexes on a
|
|
||||||
// per-cookie basis.
|
|
||||||
//
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
@ -44,380 +40,297 @@
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <time.h>
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
#include <sys/select.h>
|
||||||
|
#include <stdarg.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
|
#include "xwrelay.h"
|
||||||
|
#include "cref.h"
|
||||||
|
#include "ctrl.h"
|
||||||
|
|
||||||
typedef unsigned short HostID;
|
void
|
||||||
|
logf( const char* format, ... )
|
||||||
typedef struct ThreadData {
|
|
||||||
int socket;
|
|
||||||
HostID srcID;
|
|
||||||
} ThreadData;
|
|
||||||
|
|
||||||
typedef struct SavedBuf {
|
|
||||||
unsigned char* buf;
|
|
||||||
int bufLen;
|
|
||||||
HostID srcID;
|
|
||||||
} SavedBuf;
|
|
||||||
|
|
||||||
#define MAX_BUFS_SAVES 10
|
|
||||||
#define ILLEGAL_ID (HostID)0
|
|
||||||
/* One of these puppies needed per cookie, or one, roughly, per two
|
|
||||||
threads. */
|
|
||||||
#define MAX_HOSTS_PER_COOKIE 4
|
|
||||||
typedef struct CookieData {
|
|
||||||
int nSavedBufs;
|
|
||||||
SavedBuf savedBufs[MAX_BUFS_SAVES];
|
|
||||||
|
|
||||||
int nHosts;
|
|
||||||
struct {
|
|
||||||
HostID id;
|
|
||||||
int socket;
|
|
||||||
} hostSockets[MAX_HOSTS_PER_COOKIE];
|
|
||||||
} CookieData;
|
|
||||||
|
|
||||||
/* This is too gross in scope! Need per-cookie mutexes once there are more
|
|
||||||
than one cookie on a system a once so removing data for a dying socket in
|
|
||||||
one game doesn't lock other games out.*/
|
|
||||||
pthread_mutex_t gCookieDataMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
||||||
|
|
||||||
#define MAX_COOKIES 10
|
|
||||||
typedef struct CookieHash {
|
|
||||||
char* cookie;
|
|
||||||
CookieData* ref;
|
|
||||||
} CookieHash;
|
|
||||||
static CookieHash gCookieRefs[MAX_COOKIES];
|
|
||||||
static int gNCookies = 0;
|
|
||||||
|
|
||||||
static CookieData*
|
|
||||||
getCookieRef( const char* cookie )
|
|
||||||
{
|
{
|
||||||
int i;
|
FILE* where = stderr;
|
||||||
for ( i = 0; i < gNCookies; ++i ) {
|
struct tm* timp;
|
||||||
if ( 0 == strcmp( gCookieRefs[i].cookie, cookie ) ) {
|
struct timeval tv;
|
||||||
return gCookieRefs[i].ref;
|
struct timezone tz;
|
||||||
}
|
gettimeofday( &tv, &tz );
|
||||||
}
|
timp = localtime( &tv.tv_sec );
|
||||||
assert( gNCookies < MAX_COOKIES + 1 );
|
|
||||||
CookieData* newRef = new CookieData;
|
|
||||||
char* c = new char[strlen(cookie)+1];
|
|
||||||
strcpy( c, cookie );
|
|
||||||
gCookieRefs[gNCookies].cookie = c;
|
|
||||||
gCookieRefs[gNCookies].ref = newRef;
|
|
||||||
++gNCookies;
|
|
||||||
|
|
||||||
newRef->nSavedBufs = 0;
|
pthread_t me = pthread_self();
|
||||||
return newRef;
|
|
||||||
} /* getCookieRef */
|
fprintf( where, "<%lx>%d:%d:%d: ", me, timp->tm_hour, timp->tm_min,
|
||||||
|
timp->tm_sec );
|
||||||
|
|
||||||
|
va_list ap;
|
||||||
|
va_start( ap, format );
|
||||||
|
vfprintf( where, format, ap );
|
||||||
|
va_end(ap);
|
||||||
|
fprintf( where, "\n" );
|
||||||
|
} /* logf */
|
||||||
|
|
||||||
|
static unsigned short
|
||||||
|
getNetShort( unsigned char** bufpp )
|
||||||
|
{
|
||||||
|
unsigned short tmp;
|
||||||
|
memcpy( &tmp, *bufpp, 2 );
|
||||||
|
*bufpp += 2;
|
||||||
|
return ntohs( tmp );
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
deleteCookieRef( CookieData* cref )
|
putNetShort( unsigned char** bufpp, unsigned short s )
|
||||||
{
|
{
|
||||||
int i;
|
s = htons( s );
|
||||||
int found = 0;
|
memcpy( *bufpp, &s, sizeof(s) );
|
||||||
|
*bufpp += sizeof(s);
|
||||||
|
}
|
||||||
|
|
||||||
for ( i = 0; i < cref->nSavedBufs; ++i ) {
|
static void
|
||||||
delete [] cref->savedBufs[i].buf;
|
processHeartbeat( const unsigned char* buf, int bufLen )
|
||||||
}
|
{
|
||||||
|
} /* processHeartbeat */
|
||||||
for ( i = 0; i < gNCookies; ++i ) {
|
|
||||||
if ( gCookieRefs[i].ref == cref ) {
|
|
||||||
found = 1;
|
|
||||||
|
|
||||||
fprintf( stderr, "removing ref for cookie %s\n",
|
/* A CONNECT message from a device gives us the hostID and socket we'll
|
||||||
gCookieRefs[i].cookie );
|
* associate with one participant in a relayed session. We'll store this
|
||||||
delete [] gCookieRefs[i].cookie;
|
* information with the cookie where other participants can find it when they
|
||||||
|
* arrive.
|
||||||
|
*
|
||||||
|
* What to do if we already have a game going? In that case the connection ID
|
||||||
|
* passed in will be non-zero. If the device can be associated with an
|
||||||
|
* ongoing game, with its new socket, associate it and forward any messages
|
||||||
|
* outstanding. Otherwise close down the socket. And maybe the others in the
|
||||||
|
* game?
|
||||||
|
*/
|
||||||
|
static CookieRef*
|
||||||
|
processConnect( unsigned char* bufp, int bufLen, int socket )
|
||||||
|
{
|
||||||
|
logf( "processConnect" );
|
||||||
|
CookieRef* cref = NULL;
|
||||||
|
unsigned char* end = bufp + bufLen;
|
||||||
|
unsigned char clen = *bufp++;
|
||||||
|
if ( bufp < end && clen < MAX_COOKIE_LEN ) {
|
||||||
|
char cookie[MAX_COOKIE_LEN+1];
|
||||||
|
memcpy( cookie, bufp, clen );
|
||||||
|
cookie[clen] = '\0';
|
||||||
|
logf( "got cookie: %s", cookie );
|
||||||
|
bufp += clen;
|
||||||
|
|
||||||
int nToMove = gNCookies - i - 1;
|
if ( bufp < end ) {
|
||||||
if ( nToMove > 0 ) {
|
HostID srcID = getNetShort( &bufp );
|
||||||
memmove( &gCookieRefs[i].ref, gCookieRefs[i+1].ref,
|
unsigned short connID = getNetShort( &bufp );
|
||||||
nToMove * sizeof(gCookieRefs[0].ref) );
|
if ( bufp == end ) {
|
||||||
|
cref = get_make_cookieRef( cookie );
|
||||||
|
cref->Associate( socket, srcID );
|
||||||
|
Associate( socket, cref );
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert( found );
|
return cref;
|
||||||
|
} /* processConnect */
|
||||||
delete cref;
|
|
||||||
|
|
||||||
--gNCookies;
|
|
||||||
} /* deleteCookieRef */
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
setSocketForId( CookieData* cref, HostID id, int socket )
|
killSocket( int socket, char* why )
|
||||||
{
|
{
|
||||||
int i;
|
logf( "killSocket(%d): %s", socket, why );
|
||||||
for ( i = 0; i < cref->nHosts; ++i ) {
|
RemoveSocketRefs( socket );
|
||||||
if ( cref->hostSockets[i].id == id ) {
|
/* Might want to kill the thread it belongs to if we're not in it,
|
||||||
cref->hostSockets[i].socket == socket;
|
e.g. when unable to write to another socket. */
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Not found; need to add */
|
|
||||||
fprintf( stderr, "adding slot for hostID %x\n", id );
|
|
||||||
assert( cref->nHosts < MAX_HOSTS_PER_COOKIE - 1 );
|
|
||||||
cref->hostSockets[cref->nHosts].id = id;
|
|
||||||
cref->hostSockets[cref->nHosts].socket = socket;
|
|
||||||
++cref->nHosts;
|
|
||||||
} /* setSocketForId */
|
|
||||||
|
|
||||||
static const SavedBuf*
|
|
||||||
getSavedBufs( const CookieData* cref, int* nBufs )
|
|
||||||
{
|
|
||||||
*nBufs = cref->nSavedBufs;
|
|
||||||
return cref->savedBufs;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
getIds( const CookieData* cref, HostID* ids )
|
|
||||||
{
|
|
||||||
int i;
|
|
||||||
for ( i = 0; i < cref->nHosts; ++i ) {
|
|
||||||
*ids++ = cref->hostSockets[i].id;
|
|
||||||
}
|
|
||||||
return cref->nHosts;
|
|
||||||
} /* getIds */
|
|
||||||
|
|
||||||
static void
|
|
||||||
peekData( unsigned char* buf, char** cookie, HostID* srcIDP, long* connIdP,
|
|
||||||
unsigned short* channelNoP, HostID* destIDP )
|
|
||||||
{
|
|
||||||
short cklen = (short)*buf++;
|
|
||||||
*cookie = new char[cklen+1];
|
|
||||||
memcpy( *cookie, buf, cklen );
|
|
||||||
(*cookie)[cklen] = '\0';
|
|
||||||
fprintf( stderr, "got cookie %s\n", *cookie );
|
|
||||||
buf += cklen;
|
|
||||||
|
|
||||||
long connId;
|
|
||||||
HostID id;
|
|
||||||
|
|
||||||
memcpy( &id, buf, 2 );
|
|
||||||
*srcIDP = htons( id );
|
|
||||||
buf += 2;
|
|
||||||
|
|
||||||
memcpy( &connId, buf, 4 );
|
|
||||||
*connIdP = htons( connId );
|
|
||||||
buf += 4;
|
|
||||||
|
|
||||||
memcpy( &id, buf, 2 );
|
|
||||||
*channelNoP = htons( id );
|
|
||||||
buf += 2;
|
|
||||||
|
|
||||||
memcpy( &id, buf, 2 );
|
|
||||||
*destIDP = htons( id );
|
|
||||||
buf += 2;
|
|
||||||
|
|
||||||
fprintf( stderr, "0x%x %ld %d 0x%x\n",
|
|
||||||
*srcIDP, *connIdP, *channelNoP, *destIDP );
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
socketForHostID( const CookieData* cref, HostID id )
|
|
||||||
{
|
|
||||||
assert( id != ILLEGAL_ID );
|
|
||||||
int i;
|
|
||||||
for ( i = 0; i < cref->nHosts; ++i ) {
|
|
||||||
if ( cref->hostSockets[i].id == id ) {
|
|
||||||
return cref->hostSockets[i].socket;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert( false );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Remove all data for a socket being closed down. No point in remembering
|
|
||||||
* its hostID or any buffers it originated. If it's the last thread with this
|
|
||||||
* cookie, remove the cookie ref as well */
|
|
||||||
static void
|
|
||||||
removeSocket( CookieData* cref, ThreadData* td )
|
|
||||||
{
|
|
||||||
if ( cref != NULL ) {
|
|
||||||
HostID dyingID = td->srcID;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
fprintf( stderr, "removing socket %d for host 0x%x\n",
|
|
||||||
td->socket, dyingID );
|
|
||||||
|
|
||||||
pthread_mutex_lock( &gCookieDataMutex );
|
|
||||||
|
|
||||||
/* If this is the last/only ref for this cookie, just nuke the
|
|
||||||
cookie ref */
|
|
||||||
if ( cref->nHosts == 1 ) {
|
|
||||||
|
|
||||||
assert( cref->hostSockets[0].id == dyingID );
|
|
||||||
deleteCookieRef( cref );
|
|
||||||
|
|
||||||
} else {
|
|
||||||
|
|
||||||
/* Remove all bufs */
|
|
||||||
int nBufs = cref->nSavedBufs;
|
|
||||||
for ( i = nBufs - 1; i >= 0; --i ) {
|
|
||||||
SavedBuf* sbuf = &cref->savedBufs[i];
|
|
||||||
if ( sbuf->srcID == dyingID ) {
|
|
||||||
delete [] sbuf->buf;
|
|
||||||
|
|
||||||
int nToMove = nBufs - i - 1;
|
|
||||||
assert( nToMove >= 0 );
|
|
||||||
if ( nToMove > 0 ) {
|
|
||||||
memcpy( sbuf, sbuf + 1,
|
|
||||||
sizeof(*sbuf) * (nBufs - i - 1) );
|
|
||||||
}
|
|
||||||
--nBufs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cref->nSavedBufs = nBufs;
|
|
||||||
|
|
||||||
/* remove ref to HostID */
|
|
||||||
int nHosts = cref->nHosts;
|
|
||||||
for ( i = nHosts - 1; i >= 0; --i ) {
|
|
||||||
if ( cref->hostSockets[i].id == dyingID ) {
|
|
||||||
int nToMove = --nHosts - i;
|
|
||||||
if ( nToMove > 0 ) {
|
|
||||||
memmove( &cref->hostSockets[i],
|
|
||||||
&cref->hostSockets[i+1],
|
|
||||||
nToMove * sizeof( cref->hostSockets[0] ) );
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cref->nHosts = nHosts;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock( &gCookieDataMutex ) ;
|
|
||||||
}
|
|
||||||
} /* removeSocket */
|
|
||||||
|
|
||||||
static void
|
|
||||||
saveBuffer( CookieData* cref, unsigned char* buf, int bufLen, HostID srcID )
|
|
||||||
{
|
|
||||||
assert( cref->nSavedBufs < (MAX_BUFS_SAVES - 1) );
|
|
||||||
SavedBuf* bufs = &cref->savedBufs[cref->nSavedBufs++];
|
|
||||||
bufs->buf = buf;
|
|
||||||
bufs->bufLen = bufLen;
|
|
||||||
bufs->srcID = srcID;
|
|
||||||
fprintf( stderr, "%d bufs now saved\n", cref->nSavedBufs );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
forwardMsg( const CookieData* cref, const void* buf, int bufLen,
|
send_with_length( int socket, unsigned char* buf, int bufLen )
|
||||||
HostID destID )
|
|
||||||
{
|
{
|
||||||
int socket = socketForHostID( cref, destID );
|
int ok = 0;
|
||||||
unsigned short len = htons( bufLen );
|
unsigned short len = htons( bufLen );
|
||||||
ssize_t nSent = send( socket, &len, 2, 0 );
|
ssize_t nSent = send( socket, &len, 2, 0 );
|
||||||
assert( nSent == 2 );
|
if ( nSent == 2 ) {
|
||||||
fprintf( stderr, "sent len %x (%x)\n", bufLen, len );
|
nSent = send( socket, buf, bufLen, 0 );
|
||||||
nSent += send( socket, buf, bufLen, 0 );
|
if ( nSent == bufLen ) {
|
||||||
fprintf( stderr, "sent %d bytes to host %x on socket %d\n",
|
logf( "sent %d bytes on socket %d", nSent, socket );
|
||||||
nSent, destID, socket );
|
ok = 1;
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
forwardToAllBut( const CookieData* cref, void* buf, int bufLen,
|
|
||||||
HostID thisID )
|
|
||||||
{
|
|
||||||
fprintf( stderr, "forwardToAllBut(%x)\n", thisID );
|
|
||||||
HostID ids[MAX_HOSTS_PER_COOKIE];
|
|
||||||
int nIds = getIds( cref, ids );
|
|
||||||
HostID destID;
|
|
||||||
int i;
|
|
||||||
for ( i = 0; i < nIds; ++i ) {
|
|
||||||
if ( ids[i] != thisID ) {
|
|
||||||
forwardMsg( cref, buf, bufLen, ids[i] );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if ( !ok ) {
|
||||||
|
killSocket( socket, "couldn't send" );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
forwardSavedTo( const CookieData* cref, HostID destID )
|
sendConnResp( CookieRef* cref, int socket )
|
||||||
{
|
{
|
||||||
fprintf( stderr, "forwardSavedTo(%x)\n", destID );
|
/* send cmd, heartbeat, connid */
|
||||||
int nBufs;
|
short tmp;
|
||||||
const SavedBuf* bufs = getSavedBufs( cref, &nBufs );
|
unsigned char buf[5];
|
||||||
while ( nBufs-- ) {
|
unsigned char* bufp = buf;
|
||||||
forwardMsg( cref, bufs->buf, bufs->bufLen, destID );
|
|
||||||
}
|
|
||||||
} /* forwardSavedTo */
|
|
||||||
|
|
||||||
static CookieData*
|
*bufp++ = XWRELAY_CONNECTRESP;
|
||||||
|
putNetShort( &bufp, cref->GetHeartbeat() );
|
||||||
|
putNetShort( &bufp, cref->GetConnID() );
|
||||||
|
|
||||||
|
send_with_length( socket, buf, sizeof(buf) );
|
||||||
|
logf( "sent CONNECTIONRSP" );
|
||||||
|
}
|
||||||
|
|
||||||
|
/* forward the message. Need only change the command after looking up the
|
||||||
|
* socket and it's ready to go. */
|
||||||
|
static int
|
||||||
|
forwardMessage( unsigned char* buf, int bufLen )
|
||||||
|
{
|
||||||
|
int success = 0;
|
||||||
|
unsigned char* bufp = buf + 1; /* skip cmd */
|
||||||
|
unsigned short cookieID = getNetShort( &bufp );
|
||||||
|
logf( "cookieID = %d", cookieID );
|
||||||
|
CookieRef* cref = get_cookieRef( cookieID );
|
||||||
|
if ( cref != NULL ) {
|
||||||
|
HostID src = getNetShort( &bufp );
|
||||||
|
HostID dest = getNetShort( &bufp );
|
||||||
|
logf( "forwarding from %x to %x", src, dest );
|
||||||
|
int socket = cref->SocketForHost( dest );
|
||||||
|
logf( "got socket %d for dest %x", socket, dest );
|
||||||
|
if ( socket != -1 ) {
|
||||||
|
*buf = XWRELAY_MSG_FROMRELAY;
|
||||||
|
send_with_length( socket, buf, bufLen );
|
||||||
|
success = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
} /* forwardMessage */
|
||||||
|
|
||||||
|
static CookieRef*
|
||||||
processMessage( unsigned char* buf, int bufLen, ThreadData* ts )
|
processMessage( unsigned char* buf, int bufLen, ThreadData* ts )
|
||||||
{
|
{
|
||||||
HostID srcID, destID;
|
HostID srcID, destID;
|
||||||
unsigned short channelNo;
|
unsigned short channelNo;
|
||||||
long connId;
|
long connId;
|
||||||
char* cookie;
|
char* cookie;
|
||||||
peekData( buf, &cookie, &srcID, &connId, &channelNo, &destID );
|
CookieRef* cref = NULL;
|
||||||
ts->srcID = srcID;
|
|
||||||
|
|
||||||
CookieData* cref = getCookieRef( cookie );
|
XWRELAY_Cmd cmd = *buf;
|
||||||
|
switch( cmd ) {
|
||||||
setSocketForId( cref, srcID, ts->socket );
|
case XWRELAY_CONNECT:
|
||||||
|
logf( "processMessage got XWRELAY_CONNECT" );
|
||||||
if ( destID != ILLEGAL_ID ) {
|
cref = processConnect( buf+1, bufLen-1, ts->socket );
|
||||||
fprintf( stderr, "JUST FORWARDING: %x -> %x\n", srcID, destID );
|
if ( cref != NULL ) {
|
||||||
forwardMsg( cref, buf, bufLen, destID );
|
sendConnResp( cref, ts->socket );
|
||||||
/* I'm pretty sure that at this point we can nuke all buffers saved
|
} else {
|
||||||
for this cookie: the device acting as server now has all the
|
killSocket( ts->socket, "no cref found" );
|
||||||
addresses it needs. */
|
}
|
||||||
} else {
|
break;
|
||||||
pthread_mutex_lock( &gCookieDataMutex );
|
case XWRELAY_CONNECTRESP:
|
||||||
|
logf( "bad: processMessage got XWRELAY_CONNECTRESP" );
|
||||||
/* comms not set up yet. Need to get all existing messages */
|
break;
|
||||||
/* to this socket *and* get its message to all that came */
|
case XWRELAY_MSG_FROMRELAY:
|
||||||
/* before. So we'll forward this first, then send it the */
|
logf( "bad: processMessage got XWRELAY_MSG_FROMRELAY" );
|
||||||
/* full set, and then add this message to the full set. */
|
break;
|
||||||
|
case XWRELAY_HEARTBEAT:
|
||||||
forwardToAllBut( cref, buf, bufLen, srcID );
|
logf( "processMessage got XWRELAY_HEARTBEAT" );
|
||||||
forwardSavedTo( cref, srcID );
|
processHeartbeat( buf + 1, bufLen - 1 );
|
||||||
saveBuffer( cref, buf, bufLen, srcID );
|
break;
|
||||||
|
case XWRELAY_MSG_TORELAY:
|
||||||
pthread_mutex_unlock( &gCookieDataMutex ) ;
|
logf( "processMessage got XWRELAY_MSG_TORELAY" );
|
||||||
|
if ( !forwardMessage( buf, bufLen ) ) {
|
||||||
|
killSocket( ts->socket, "couldn't forward message" );
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return cref;
|
return cref;
|
||||||
} /* processMessage */
|
} /* processMessage */
|
||||||
|
|
||||||
static void*
|
static void*
|
||||||
thread_main( void* arg )
|
relay_thread_main( void* arg )
|
||||||
{
|
{
|
||||||
ThreadData localStorage;
|
ThreadData* localStorage = (ThreadData*)arg;
|
||||||
memcpy( &localStorage, arg, sizeof(localStorage) );
|
int socket = localStorage->socket;
|
||||||
CookieData* cRef = NULL;
|
CookieRef* cref = NULL;
|
||||||
|
|
||||||
|
logf( "relay_thread_main called" );
|
||||||
|
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
short packetSize;
|
short packetSize;
|
||||||
assert( sizeof(packetSize) == 2 );
|
assert( sizeof(packetSize) == 2 );
|
||||||
|
|
||||||
ssize_t rcvd = recv( localStorage.socket, &packetSize,
|
ssize_t nRead = recv( socket, &packetSize,
|
||||||
sizeof(packetSize), 0 );
|
sizeof(packetSize), MSG_WAITALL );
|
||||||
if ( rcvd < 2 ) break;
|
if ( nRead != 2 ) {
|
||||||
|
killSocket( socket, "nRead != 2" );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
packetSize = ntohs( packetSize );
|
packetSize = ntohs( packetSize );
|
||||||
if ( packetSize < 0 ) break;
|
if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) {
|
||||||
assert( rcvd == 2 );
|
killSocket( socket, "packetSize wrong" );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
unsigned char* buf = new unsigned char[packetSize];
|
unsigned char buf[MAX_MSG_LEN];
|
||||||
rcvd = recv( localStorage.socket, buf, packetSize, 0 );
|
nRead = recv( socket, buf, packetSize, MSG_WAITALL );
|
||||||
assert( rcvd == packetSize );
|
if ( nRead != packetSize ) {
|
||||||
fprintf( stderr, "read %d bytes\n", rcvd );
|
killSocket( socket, "nRead != packetSize" ); break;
|
||||||
|
}
|
||||||
|
logf( "read %d bytes\n", nRead );
|
||||||
|
|
||||||
cRef = processMessage( buf, packetSize, &localStorage );
|
cref = processMessage( buf, packetSize, localStorage );
|
||||||
}
|
}
|
||||||
close( localStorage.socket );
|
close( socket );
|
||||||
|
|
||||||
removeSocket( cRef, &localStorage );
|
delete localStorage;
|
||||||
|
|
||||||
fprintf( stderr, "exiting thread\n" );
|
logf( "exiting thread\n" );
|
||||||
return NULL;
|
return NULL;
|
||||||
} /* thread_main */
|
} /* relay_thread_main */
|
||||||
|
|
||||||
|
static int
|
||||||
|
make_socket( unsigned long addr, unsigned short port )
|
||||||
|
{
|
||||||
|
int sock = socket( AF_INET, SOCK_STREAM, 0 );
|
||||||
|
|
||||||
|
struct sockaddr_in sockAddr;
|
||||||
|
sockAddr.sin_family = AF_INET;
|
||||||
|
sockAddr.sin_addr.s_addr = htonl(addr);
|
||||||
|
sockAddr.sin_port = htons(port);
|
||||||
|
|
||||||
|
int result = bind( sock, (struct sockaddr*)&sockAddr, sizeof(sockAddr) );
|
||||||
|
if ( result != 0 ) {
|
||||||
|
logf( "exiting: unable to bind port %d: %d, errno = %d\n",
|
||||||
|
port, result, errno );
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
logf( "bound socket %d on port %d", sock, port );
|
||||||
|
|
||||||
|
result = listen( sock, 5 );
|
||||||
|
if ( result != 0 ) {
|
||||||
|
logf( "exiting: unable to listen: %d, errno = %d\n", result, errno );
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return sock;
|
||||||
|
} /* make_socket */
|
||||||
|
|
||||||
|
static void
|
||||||
|
accept_and_fork( int socket, void * (*start_routine)(void *) )
|
||||||
|
{
|
||||||
|
logf( "calling accept on socket %d\n", socket );
|
||||||
|
|
||||||
|
sockaddr newaddr;
|
||||||
|
socklen_t siz = sizeof(newaddr);
|
||||||
|
int newSock = accept( socket, &newaddr, &siz );
|
||||||
|
logf( "got one\n" );
|
||||||
|
|
||||||
|
ThreadData* td = new ThreadData();
|
||||||
|
td->socket = newSock;
|
||||||
|
|
||||||
|
pthread_t thread;
|
||||||
|
int result = pthread_create( &thread, NULL, start_routine, td );
|
||||||
|
} /* accept_and_fork */
|
||||||
|
|
||||||
int main( int argc, char** argv )
|
int main( int argc, char** argv )
|
||||||
{
|
{
|
||||||
int port = 12000;
|
int port = 10999;
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if ( argc > 1 ) {
|
if ( argc > 1 ) {
|
||||||
|
@ -426,39 +339,38 @@ int main( int argc, char** argv )
|
||||||
/* Open a listening socket. For each received message, fork a thread into
|
/* Open a listening socket. For each received message, fork a thread into
|
||||||
which relevant stuff is passed. */
|
which relevant stuff is passed. */
|
||||||
|
|
||||||
int listener = socket( AF_INET, SOCK_STREAM, 0 );
|
int listener = make_socket( INADDR_ANY, port );
|
||||||
|
if ( listener == -1 ) exit( 1 );
|
||||||
|
int control = make_socket( INADDR_LOOPBACK, port + 1 );
|
||||||
|
if ( control == -1 ) exit( 1 );
|
||||||
|
|
||||||
struct sockaddr_in sockAddr;
|
/* set up select call */
|
||||||
sockAddr.sin_family = AF_INET;
|
fd_set rfds;
|
||||||
sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
||||||
sockAddr.sin_port = htons(port);
|
|
||||||
|
|
||||||
result = bind( listener, (struct sockaddr*)&sockAddr, sizeof(sockAddr) );
|
|
||||||
if ( result != 0 ) {
|
|
||||||
fprintf( stderr, "exiting: unable to bind: %d, errno = %d\n", result, errno );
|
|
||||||
exit( 1 );
|
|
||||||
}
|
|
||||||
|
|
||||||
result = listen( listener, 5 );
|
|
||||||
if ( result != 0 ) {
|
|
||||||
fprintf( stderr, "exiting: unable to listen: %d, errno = %d\n", result, errno );
|
|
||||||
exit( 1 );
|
|
||||||
}
|
|
||||||
|
|
||||||
fprintf( stderr, "listening on port %d, socket %d\n", port, listener );
|
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
sockaddr newaddr;
|
FD_ZERO(&rfds);
|
||||||
socklen_t siz = sizeof(newaddr);
|
FD_SET( listener, &rfds );
|
||||||
int newSock = accept( listener, &newaddr, &siz );
|
FD_SET( control, &rfds );
|
||||||
fprintf( stderr, "got one\n" );
|
int highest = listener;
|
||||||
|
if ( control > listener ) {
|
||||||
|
highest = control;
|
||||||
|
}
|
||||||
|
++highest;
|
||||||
|
|
||||||
ThreadData td;
|
int retval = select( highest, &rfds, NULL, NULL, NULL );
|
||||||
td.socket = newSock;
|
assert( retval > 0 );
|
||||||
|
|
||||||
pthread_t thread;
|
if ( FD_ISSET( listener, &rfds ) ) {
|
||||||
int result = pthread_create( &thread, NULL, thread_main, &td );
|
accept_and_fork( listener, relay_thread_main );
|
||||||
|
--retval;
|
||||||
|
}
|
||||||
|
if ( FD_ISSET( control, &rfds ) ) {
|
||||||
|
accept_and_fork( control, ctrl_thread_main );
|
||||||
|
--retval;
|
||||||
|
}
|
||||||
|
assert( retval == 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
close( listener );
|
close( listener );
|
||||||
|
close( control );
|
||||||
return 0;
|
return 0;
|
||||||
} // main
|
} // main
|
||||||
|
|
Loading…
Add table
Reference in a new issue