diff --git a/relay/xwrelay.cpp b/relay/xwrelay.cpp index 412271313..df4754484 100644 --- a/relay/xwrelay.cpp +++ b/relay/xwrelay.cpp @@ -52,6 +52,9 @@ #include "cref.h" #include "ctrl.h" #include "mlock.h" +#include "tpool.h" + +#define N_WORKER_THREADS 5 void logf( const char* format, ... ) @@ -135,7 +138,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket ) return cref; } /* processConnect */ -static void +void killSocket( int socket, char* why ) { logf( "killSocket(%d): %s", socket, why ); @@ -207,24 +210,24 @@ forwardMessage( unsigned char* buf, int bufLen ) return success; } /* forwardMessage */ -static CookieRef* -processMessage( unsigned char* buf, int bufLen, ThreadData* ts ) +static void +processMessage( unsigned char* buf, int bufLen, int socket ) { HostID srcID, destID; unsigned short channelNo; long connId; char* cookie; - CookieRef* cref = NULL; + CookieRef* cref; XWRELAY_Cmd cmd = *buf; switch( cmd ) { case XWRELAY_CONNECT: logf( "processMessage got XWRELAY_CONNECT" ); - cref = processConnect( buf+1, bufLen-1, ts->socket ); + cref = processConnect( buf+1, bufLen-1, socket ); if ( cref != NULL ) { - sendConnResp( cref, ts->socket ); + sendConnResp( cref, socket ); } else { - killSocket( ts->socket, "no cref found" ); + killSocket( socket, "no cref found" ); } break; case XWRELAY_CONNECTRESP: @@ -240,56 +243,12 @@ processMessage( unsigned char* buf, int bufLen, ThreadData* ts ) case XWRELAY_MSG_TORELAY: logf( "processMessage got XWRELAY_MSG_TORELAY" ); if ( !forwardMessage( buf, bufLen ) ) { - killSocket( ts->socket, "couldn't forward message" ); + killSocket( socket, "couldn't forward message" ); } break; } - return cref; } /* processMessage */ -static void* -relay_thread_main( void* arg ) -{ - ThreadData* localStorage = (ThreadData*)arg; - int socket = localStorage->socket; - CookieRef* cref = NULL; - - logf( "relay_thread_main called" ); - - for ( ; ; ) { - short packetSize; - assert( sizeof(packetSize) == 2 ); - - ssize_t nRead = recv( socket, &packetSize, - sizeof(packetSize), MSG_WAITALL ); - if ( nRead != 2 ) { - killSocket( socket, "nRead != 2" ); - break; - } - - packetSize = ntohs( packetSize ); - if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) { - killSocket( socket, "packetSize wrong" ); - break; - } - - unsigned char buf[MAX_MSG_LEN]; - nRead = recv( socket, buf, packetSize, MSG_WAITALL ); - if ( nRead != packetSize ) { - killSocket( socket, "nRead != packetSize" ); break; - } - logf( "read %d bytes\n", nRead ); - - cref = processMessage( buf, packetSize, localStorage ); - } - close( socket ); - - delete localStorage; - - logf( "exiting thread\n" ); - return NULL; -} /* relay_thread_main */ - static int make_socket( unsigned long addr, unsigned short port ) { @@ -316,23 +275,6 @@ make_socket( unsigned long addr, unsigned short port ) return sock; } /* make_socket */ -static void -accept_and_fork( int socket, void * (*start_routine)(void *) ) -{ - logf( "calling accept on socket %d\n", socket ); - - sockaddr newaddr; - socklen_t siz = sizeof(newaddr); - int newSock = accept( socket, &newaddr, &siz ); - logf( "got one\n" ); - - ThreadData* td = new ThreadData(); - td->socket = newSock; - - pthread_t thread; - int result = pthread_create( &thread, NULL, start_routine, td ); -} /* accept_and_fork */ - int main( int argc, char** argv ) { int port = 10999; @@ -349,6 +291,9 @@ int main( int argc, char** argv ) int control = make_socket( INADDR_LOOPBACK, port + 1 ); if ( control == -1 ) exit( 1 ); + XWThreadPool* tPool = XWThreadPool::GetTPool(); + tPool->Setup( N_WORKER_THREADS, processMessage ); + /* set up select call */ fd_set rfds; for ( ; ; ) { @@ -365,11 +310,14 @@ int main( int argc, char** argv ) assert( retval > 0 ); if ( FD_ISSET( listener, &rfds ) ) { - accept_and_fork( listener, relay_thread_main ); + sockaddr newaddr; + socklen_t siz = sizeof(newaddr); + int newSock = accept( listener, &newaddr, &siz ); + tPool->AddSocket( newSock ); --retval; } if ( FD_ISSET( control, &rfds ) ) { - accept_and_fork( control, ctrl_thread_main ); + run_ctrl_thread( control ); --retval; } assert( retval == 0 );