From 5d3876fa7b7ca5f38d2954e2e83c6b29a3536a1b Mon Sep 17 00:00:00 2001 From: ehouse Date: Sat, 19 Mar 2005 22:13:43 +0000 Subject: [PATCH] total rewrite. New protocol eliminates need to store and forward messages: clients connect, then messages are passthru only. Add control port. Use stl map and vector to remove limit on number of connections. Also removed synchronization, which need to be re-added. --- relay/xwrelay.cpp | 602 ++++++++++++++++++++-------------------------- 1 file changed, 257 insertions(+), 345 deletions(-) diff --git a/relay/xwrelay.cpp b/relay/xwrelay.cpp index 317f3f818..75ef351a8 100644 --- a/relay/xwrelay.cpp +++ b/relay/xwrelay.cpp @@ -28,10 +28,6 @@ // be useful for other things. It also needs a lot of work, and I hacked it // up before making an exhaustive search for other alternatives. // -// The extreme limitations it has now are meant to be fixed by using stl-based -// structs rather than arrays to track stuff, and by adding mutexes on a -// per-cookie basis. -// ////////////////////////////////////////////////////////////////////////////// #include @@ -44,380 +40,297 @@ #include #include #include -#include #include #include #include #include +#include +#include +#include +#include "xwrelay.h" +#include "cref.h" +#include "ctrl.h" -typedef unsigned short HostID; - -typedef struct ThreadData { - int socket; - HostID srcID; -} ThreadData; - -typedef struct SavedBuf { - unsigned char* buf; - int bufLen; - HostID srcID; -} SavedBuf; - -#define MAX_BUFS_SAVES 10 -#define ILLEGAL_ID (HostID)0 -/* One of these puppies needed per cookie, or one, roughly, per two - threads. */ -#define MAX_HOSTS_PER_COOKIE 4 -typedef struct CookieData { - int nSavedBufs; - SavedBuf savedBufs[MAX_BUFS_SAVES]; - - int nHosts; - struct { - HostID id; - int socket; - } hostSockets[MAX_HOSTS_PER_COOKIE]; -} CookieData; - -/* This is too gross in scope! Need per-cookie mutexes once there are more - than one cookie on a system a once so removing data for a dying socket in - one game doesn't lock other games out.*/ -pthread_mutex_t gCookieDataMutex = PTHREAD_MUTEX_INITIALIZER; - -#define MAX_COOKIES 10 -typedef struct CookieHash { - char* cookie; - CookieData* ref; -} CookieHash; -static CookieHash gCookieRefs[MAX_COOKIES]; -static int gNCookies = 0; - -static CookieData* -getCookieRef( const char* cookie ) +void +logf( const char* format, ... ) { - int i; - for ( i = 0; i < gNCookies; ++i ) { - if ( 0 == strcmp( gCookieRefs[i].cookie, cookie ) ) { - return gCookieRefs[i].ref; - } - } - assert( gNCookies < MAX_COOKIES + 1 ); - CookieData* newRef = new CookieData; - char* c = new char[strlen(cookie)+1]; - strcpy( c, cookie ); - gCookieRefs[gNCookies].cookie = c; - gCookieRefs[gNCookies].ref = newRef; - ++gNCookies; + FILE* where = stderr; + struct tm* timp; + struct timeval tv; + struct timezone tz; + gettimeofday( &tv, &tz ); + timp = localtime( &tv.tv_sec ); - newRef->nSavedBufs = 0; - return newRef; -} /* getCookieRef */ + pthread_t me = pthread_self(); + + fprintf( where, "<%lx>%d:%d:%d: ", me, timp->tm_hour, timp->tm_min, + timp->tm_sec ); + + va_list ap; + va_start( ap, format ); + vfprintf( where, format, ap ); + va_end(ap); + fprintf( where, "\n" ); +} /* logf */ + +static unsigned short +getNetShort( unsigned char** bufpp ) +{ + unsigned short tmp; + memcpy( &tmp, *bufpp, 2 ); + *bufpp += 2; + return ntohs( tmp ); +} static void -deleteCookieRef( CookieData* cref ) +putNetShort( unsigned char** bufpp, unsigned short s ) { - int i; - int found = 0; + s = htons( s ); + memcpy( *bufpp, &s, sizeof(s) ); + *bufpp += sizeof(s); +} - for ( i = 0; i < cref->nSavedBufs; ++i ) { - delete [] cref->savedBufs[i].buf; - } - - for ( i = 0; i < gNCookies; ++i ) { - if ( gCookieRefs[i].ref == cref ) { - found = 1; +static void +processHeartbeat( const unsigned char* buf, int bufLen ) +{ +} /* processHeartbeat */ - fprintf( stderr, "removing ref for cookie %s\n", - gCookieRefs[i].cookie ); - delete [] gCookieRefs[i].cookie; +/* A CONNECT message from a device gives us the hostID and socket we'll + * associate with one participant in a relayed session. We'll store this + * information with the cookie where other participants can find it when they + * arrive. + * + * What to do if we already have a game going? In that case the connection ID + * passed in will be non-zero. If the device can be associated with an + * ongoing game, with its new socket, associate it and forward any messages + * outstanding. Otherwise close down the socket. And maybe the others in the + * game? + */ +static CookieRef* +processConnect( unsigned char* bufp, int bufLen, int socket ) +{ + logf( "processConnect" ); + CookieRef* cref = NULL; + unsigned char* end = bufp + bufLen; + unsigned char clen = *bufp++; + if ( bufp < end && clen < MAX_COOKIE_LEN ) { + char cookie[MAX_COOKIE_LEN+1]; + memcpy( cookie, bufp, clen ); + cookie[clen] = '\0'; + logf( "got cookie: %s", cookie ); + bufp += clen; - int nToMove = gNCookies - i - 1; - if ( nToMove > 0 ) { - memmove( &gCookieRefs[i].ref, gCookieRefs[i+1].ref, - nToMove * sizeof(gCookieRefs[0].ref) ); + if ( bufp < end ) { + HostID srcID = getNetShort( &bufp ); + unsigned short connID = getNetShort( &bufp ); + if ( bufp == end ) { + cref = get_make_cookieRef( cookie ); + cref->Associate( socket, srcID ); + Associate( socket, cref ); } - - break; } } - assert( found ); - - delete cref; - - --gNCookies; -} /* deleteCookieRef */ + return cref; +} /* processConnect */ static void -setSocketForId( CookieData* cref, HostID id, int socket ) +killSocket( int socket, char* why ) { - int i; - for ( i = 0; i < cref->nHosts; ++i ) { - if ( cref->hostSockets[i].id == id ) { - cref->hostSockets[i].socket == socket; - return; - } - } - - /* Not found; need to add */ - fprintf( stderr, "adding slot for hostID %x\n", id ); - assert( cref->nHosts < MAX_HOSTS_PER_COOKIE - 1 ); - cref->hostSockets[cref->nHosts].id = id; - cref->hostSockets[cref->nHosts].socket = socket; - ++cref->nHosts; -} /* setSocketForId */ - -static const SavedBuf* -getSavedBufs( const CookieData* cref, int* nBufs ) -{ - *nBufs = cref->nSavedBufs; - return cref->savedBufs; -} - -static int -getIds( const CookieData* cref, HostID* ids ) -{ - int i; - for ( i = 0; i < cref->nHosts; ++i ) { - *ids++ = cref->hostSockets[i].id; - } - return cref->nHosts; -} /* getIds */ - -static void -peekData( unsigned char* buf, char** cookie, HostID* srcIDP, long* connIdP, - unsigned short* channelNoP, HostID* destIDP ) -{ - short cklen = (short)*buf++; - *cookie = new char[cklen+1]; - memcpy( *cookie, buf, cklen ); - (*cookie)[cklen] = '\0'; - fprintf( stderr, "got cookie %s\n", *cookie ); - buf += cklen; - - long connId; - HostID id; - - memcpy( &id, buf, 2 ); - *srcIDP = htons( id ); - buf += 2; - - memcpy( &connId, buf, 4 ); - *connIdP = htons( connId ); - buf += 4; - - memcpy( &id, buf, 2 ); - *channelNoP = htons( id ); - buf += 2; - - memcpy( &id, buf, 2 ); - *destIDP = htons( id ); - buf += 2; - - fprintf( stderr, "0x%x %ld %d 0x%x\n", - *srcIDP, *connIdP, *channelNoP, *destIDP ); -} - -static int -socketForHostID( const CookieData* cref, HostID id ) -{ - assert( id != ILLEGAL_ID ); - int i; - for ( i = 0; i < cref->nHosts; ++i ) { - if ( cref->hostSockets[i].id == id ) { - return cref->hostSockets[i].socket; - } - } - assert( false ); -} - -/* Remove all data for a socket being closed down. No point in remembering - * its hostID or any buffers it originated. If it's the last thread with this - * cookie, remove the cookie ref as well */ -static void -removeSocket( CookieData* cref, ThreadData* td ) -{ - if ( cref != NULL ) { - HostID dyingID = td->srcID; - int i; - - fprintf( stderr, "removing socket %d for host 0x%x\n", - td->socket, dyingID ); - - pthread_mutex_lock( &gCookieDataMutex ); - - /* If this is the last/only ref for this cookie, just nuke the - cookie ref */ - if ( cref->nHosts == 1 ) { - - assert( cref->hostSockets[0].id == dyingID ); - deleteCookieRef( cref ); - - } else { - - /* Remove all bufs */ - int nBufs = cref->nSavedBufs; - for ( i = nBufs - 1; i >= 0; --i ) { - SavedBuf* sbuf = &cref->savedBufs[i]; - if ( sbuf->srcID == dyingID ) { - delete [] sbuf->buf; - - int nToMove = nBufs - i - 1; - assert( nToMove >= 0 ); - if ( nToMove > 0 ) { - memcpy( sbuf, sbuf + 1, - sizeof(*sbuf) * (nBufs - i - 1) ); - } - --nBufs; - } - } - cref->nSavedBufs = nBufs; - - /* remove ref to HostID */ - int nHosts = cref->nHosts; - for ( i = nHosts - 1; i >= 0; --i ) { - if ( cref->hostSockets[i].id == dyingID ) { - int nToMove = --nHosts - i; - if ( nToMove > 0 ) { - memmove( &cref->hostSockets[i], - &cref->hostSockets[i+1], - nToMove * sizeof( cref->hostSockets[0] ) ); - } - - } - } - cref->nHosts = nHosts; - } - - pthread_mutex_unlock( &gCookieDataMutex ) ; - } -} /* removeSocket */ - -static void -saveBuffer( CookieData* cref, unsigned char* buf, int bufLen, HostID srcID ) -{ - assert( cref->nSavedBufs < (MAX_BUFS_SAVES - 1) ); - SavedBuf* bufs = &cref->savedBufs[cref->nSavedBufs++]; - bufs->buf = buf; - bufs->bufLen = bufLen; - bufs->srcID = srcID; - fprintf( stderr, "%d bufs now saved\n", cref->nSavedBufs ); + logf( "killSocket(%d): %s", socket, why ); + RemoveSocketRefs( socket ); + /* Might want to kill the thread it belongs to if we're not in it, + e.g. when unable to write to another socket. */ } static void -forwardMsg( const CookieData* cref, const void* buf, int bufLen, - HostID destID ) +send_with_length( int socket, unsigned char* buf, int bufLen ) { - int socket = socketForHostID( cref, destID ); + int ok = 0; unsigned short len = htons( bufLen ); ssize_t nSent = send( socket, &len, 2, 0 ); - assert( nSent == 2 ); - fprintf( stderr, "sent len %x (%x)\n", bufLen, len ); - nSent += send( socket, buf, bufLen, 0 ); - fprintf( stderr, "sent %d bytes to host %x on socket %d\n", - nSent, destID, socket ); -} - -static void -forwardToAllBut( const CookieData* cref, void* buf, int bufLen, - HostID thisID ) -{ - fprintf( stderr, "forwardToAllBut(%x)\n", thisID ); - HostID ids[MAX_HOSTS_PER_COOKIE]; - int nIds = getIds( cref, ids ); - HostID destID; - int i; - for ( i = 0; i < nIds; ++i ) { - if ( ids[i] != thisID ) { - forwardMsg( cref, buf, bufLen, ids[i] ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == bufLen ) { + logf( "sent %d bytes on socket %d", nSent, socket ); + ok = 1; } } + if ( !ok ) { + killSocket( socket, "couldn't send" ); + } } static void -forwardSavedTo( const CookieData* cref, HostID destID ) +sendConnResp( CookieRef* cref, int socket ) { - fprintf( stderr, "forwardSavedTo(%x)\n", destID ); - int nBufs; - const SavedBuf* bufs = getSavedBufs( cref, &nBufs ); - while ( nBufs-- ) { - forwardMsg( cref, bufs->buf, bufs->bufLen, destID ); - } -} /* forwardSavedTo */ + /* send cmd, heartbeat, connid */ + short tmp; + unsigned char buf[5]; + unsigned char* bufp = buf; -static CookieData* + *bufp++ = XWRELAY_CONNECTRESP; + putNetShort( &bufp, cref->GetHeartbeat() ); + putNetShort( &bufp, cref->GetConnID() ); + + send_with_length( socket, buf, sizeof(buf) ); + logf( "sent CONNECTIONRSP" ); +} + +/* forward the message. Need only change the command after looking up the + * socket and it's ready to go. */ +static int +forwardMessage( unsigned char* buf, int bufLen ) +{ + int success = 0; + unsigned char* bufp = buf + 1; /* skip cmd */ + unsigned short cookieID = getNetShort( &bufp ); + logf( "cookieID = %d", cookieID ); + CookieRef* cref = get_cookieRef( cookieID ); + if ( cref != NULL ) { + HostID src = getNetShort( &bufp ); + HostID dest = getNetShort( &bufp ); + logf( "forwarding from %x to %x", src, dest ); + int socket = cref->SocketForHost( dest ); + logf( "got socket %d for dest %x", socket, dest ); + if ( socket != -1 ) { + *buf = XWRELAY_MSG_FROMRELAY; + send_with_length( socket, buf, bufLen ); + success = 1; + } + } + return success; +} /* forwardMessage */ + +static CookieRef* processMessage( unsigned char* buf, int bufLen, ThreadData* ts ) { HostID srcID, destID; unsigned short channelNo; long connId; char* cookie; - peekData( buf, &cookie, &srcID, &connId, &channelNo, &destID ); - ts->srcID = srcID; + CookieRef* cref = NULL; - CookieData* cref = getCookieRef( cookie ); - - setSocketForId( cref, srcID, ts->socket ); - - if ( destID != ILLEGAL_ID ) { - fprintf( stderr, "JUST FORWARDING: %x -> %x\n", srcID, destID ); - forwardMsg( cref, buf, bufLen, destID ); - /* I'm pretty sure that at this point we can nuke all buffers saved - for this cookie: the device acting as server now has all the - addresses it needs. */ - } else { - pthread_mutex_lock( &gCookieDataMutex ); - - /* comms not set up yet. Need to get all existing messages */ - /* to this socket *and* get its message to all that came */ - /* before. So we'll forward this first, then send it the */ - /* full set, and then add this message to the full set. */ - - forwardToAllBut( cref, buf, bufLen, srcID ); - forwardSavedTo( cref, srcID ); - saveBuffer( cref, buf, bufLen, srcID ); - - pthread_mutex_unlock( &gCookieDataMutex ) ; + XWRELAY_Cmd cmd = *buf; + switch( cmd ) { + case XWRELAY_CONNECT: + logf( "processMessage got XWRELAY_CONNECT" ); + cref = processConnect( buf+1, bufLen-1, ts->socket ); + if ( cref != NULL ) { + sendConnResp( cref, ts->socket ); + } else { + killSocket( ts->socket, "no cref found" ); + } + break; + case XWRELAY_CONNECTRESP: + logf( "bad: processMessage got XWRELAY_CONNECTRESP" ); + break; + case XWRELAY_MSG_FROMRELAY: + logf( "bad: processMessage got XWRELAY_MSG_FROMRELAY" ); + break; + case XWRELAY_HEARTBEAT: + logf( "processMessage got XWRELAY_HEARTBEAT" ); + processHeartbeat( buf + 1, bufLen - 1 ); + break; + case XWRELAY_MSG_TORELAY: + logf( "processMessage got XWRELAY_MSG_TORELAY" ); + if ( !forwardMessage( buf, bufLen ) ) { + killSocket( ts->socket, "couldn't forward message" ); + } + break; } return cref; } /* processMessage */ static void* -thread_main( void* arg ) +relay_thread_main( void* arg ) { - ThreadData localStorage; - memcpy( &localStorage, arg, sizeof(localStorage) ); - CookieData* cRef = NULL; + ThreadData* localStorage = (ThreadData*)arg; + int socket = localStorage->socket; + CookieRef* cref = NULL; + + logf( "relay_thread_main called" ); for ( ; ; ) { short packetSize; assert( sizeof(packetSize) == 2 ); - ssize_t rcvd = recv( localStorage.socket, &packetSize, - sizeof(packetSize), 0 ); - if ( rcvd < 2 ) break; + ssize_t nRead = recv( socket, &packetSize, + sizeof(packetSize), MSG_WAITALL ); + if ( nRead != 2 ) { + killSocket( socket, "nRead != 2" ); + break; + } + packetSize = ntohs( packetSize ); - if ( packetSize < 0 ) break; - assert( rcvd == 2 ); + if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) { + killSocket( socket, "packetSize wrong" ); + break; + } - unsigned char* buf = new unsigned char[packetSize]; - rcvd = recv( localStorage.socket, buf, packetSize, 0 ); - assert( rcvd == packetSize ); - fprintf( stderr, "read %d bytes\n", rcvd ); + unsigned char buf[MAX_MSG_LEN]; + nRead = recv( socket, buf, packetSize, MSG_WAITALL ); + if ( nRead != packetSize ) { + killSocket( socket, "nRead != packetSize" ); break; + } + logf( "read %d bytes\n", nRead ); - cRef = processMessage( buf, packetSize, &localStorage ); + cref = processMessage( buf, packetSize, localStorage ); } - close( localStorage.socket ); + close( socket ); - removeSocket( cRef, &localStorage ); + delete localStorage; - fprintf( stderr, "exiting thread\n" ); + logf( "exiting thread\n" ); return NULL; -} /* thread_main */ +} /* relay_thread_main */ + +static int +make_socket( unsigned long addr, unsigned short port ) +{ + int sock = socket( AF_INET, SOCK_STREAM, 0 ); + + struct sockaddr_in sockAddr; + sockAddr.sin_family = AF_INET; + sockAddr.sin_addr.s_addr = htonl(addr); + sockAddr.sin_port = htons(port); + + int result = bind( sock, (struct sockaddr*)&sockAddr, sizeof(sockAddr) ); + if ( result != 0 ) { + logf( "exiting: unable to bind port %d: %d, errno = %d\n", + port, result, errno ); + return -1; + } + logf( "bound socket %d on port %d", sock, port ); + + result = listen( sock, 5 ); + if ( result != 0 ) { + logf( "exiting: unable to listen: %d, errno = %d\n", result, errno ); + return -1; + } + return sock; +} /* make_socket */ + +static void +accept_and_fork( int socket, void * (*start_routine)(void *) ) +{ + logf( "calling accept on socket %d\n", socket ); + + sockaddr newaddr; + socklen_t siz = sizeof(newaddr); + int newSock = accept( socket, &newaddr, &siz ); + logf( "got one\n" ); + + ThreadData* td = new ThreadData(); + td->socket = newSock; + + pthread_t thread; + int result = pthread_create( &thread, NULL, start_routine, td ); +} /* accept_and_fork */ int main( int argc, char** argv ) { - int port = 12000; + int port = 10999; int result; if ( argc > 1 ) { @@ -426,39 +339,38 @@ int main( int argc, char** argv ) /* Open a listening socket. For each received message, fork a thread into which relevant stuff is passed. */ - int listener = socket( AF_INET, SOCK_STREAM, 0 ); + int listener = make_socket( INADDR_ANY, port ); + if ( listener == -1 ) exit( 1 ); + int control = make_socket( INADDR_LOOPBACK, port + 1 ); + if ( control == -1 ) exit( 1 ); - struct sockaddr_in sockAddr; - sockAddr.sin_family = AF_INET; - sockAddr.sin_addr.s_addr = htonl(INADDR_ANY); - sockAddr.sin_port = htons(port); - - result = bind( listener, (struct sockaddr*)&sockAddr, sizeof(sockAddr) ); - if ( result != 0 ) { - fprintf( stderr, "exiting: unable to bind: %d, errno = %d\n", result, errno ); - exit( 1 ); - } - - result = listen( listener, 5 ); - if ( result != 0 ) { - fprintf( stderr, "exiting: unable to listen: %d, errno = %d\n", result, errno ); - exit( 1 ); - } - - fprintf( stderr, "listening on port %d, socket %d\n", port, listener ); + /* set up select call */ + fd_set rfds; for ( ; ; ) { - sockaddr newaddr; - socklen_t siz = sizeof(newaddr); - int newSock = accept( listener, &newaddr, &siz ); - fprintf( stderr, "got one\n" ); + FD_ZERO(&rfds); + FD_SET( listener, &rfds ); + FD_SET( control, &rfds ); + int highest = listener; + if ( control > listener ) { + highest = control; + } + ++highest; - ThreadData td; - td.socket = newSock; - - pthread_t thread; - int result = pthread_create( &thread, NULL, thread_main, &td ); + int retval = select( highest, &rfds, NULL, NULL, NULL ); + assert( retval > 0 ); + + if ( FD_ISSET( listener, &rfds ) ) { + accept_and_fork( listener, relay_thread_main ); + --retval; + } + if ( FD_ISSET( control, &rfds ) ) { + accept_and_fork( control, ctrl_thread_main ); + --retval; + } + assert( retval == 0 ); } close( listener ); + close( control ); return 0; } // main