use new thread pool class

This commit is contained in:
ehouse 2005-03-30 01:42:28 +00:00
parent 12795788ea
commit 863e241fc6

View file

@ -52,6 +52,9 @@
#include "cref.h"
#include "ctrl.h"
#include "mlock.h"
#include "tpool.h"
#define N_WORKER_THREADS 5
void
logf( const char* format, ... )
@ -135,7 +138,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket )
return cref;
} /* processConnect */
static void
void
killSocket( int socket, char* why )
{
logf( "killSocket(%d): %s", socket, why );
@ -207,24 +210,24 @@ forwardMessage( unsigned char* buf, int bufLen )
return success;
} /* forwardMessage */
static CookieRef*
processMessage( unsigned char* buf, int bufLen, ThreadData* ts )
static void
processMessage( unsigned char* buf, int bufLen, int socket )
{
HostID srcID, destID;
unsigned short channelNo;
long connId;
char* cookie;
CookieRef* cref = NULL;
CookieRef* cref;
XWRELAY_Cmd cmd = *buf;
switch( cmd ) {
case XWRELAY_CONNECT:
logf( "processMessage got XWRELAY_CONNECT" );
cref = processConnect( buf+1, bufLen-1, ts->socket );
cref = processConnect( buf+1, bufLen-1, socket );
if ( cref != NULL ) {
sendConnResp( cref, ts->socket );
sendConnResp( cref, socket );
} else {
killSocket( ts->socket, "no cref found" );
killSocket( socket, "no cref found" );
}
break;
case XWRELAY_CONNECTRESP:
@ -240,56 +243,12 @@ processMessage( unsigned char* buf, int bufLen, ThreadData* ts )
case XWRELAY_MSG_TORELAY:
logf( "processMessage got XWRELAY_MSG_TORELAY" );
if ( !forwardMessage( buf, bufLen ) ) {
killSocket( ts->socket, "couldn't forward message" );
killSocket( socket, "couldn't forward message" );
}
break;
}
return cref;
} /* processMessage */
static void*
relay_thread_main( void* arg )
{
ThreadData* localStorage = (ThreadData*)arg;
int socket = localStorage->socket;
CookieRef* cref = NULL;
logf( "relay_thread_main called" );
for ( ; ; ) {
short packetSize;
assert( sizeof(packetSize) == 2 );
ssize_t nRead = recv( socket, &packetSize,
sizeof(packetSize), MSG_WAITALL );
if ( nRead != 2 ) {
killSocket( socket, "nRead != 2" );
break;
}
packetSize = ntohs( packetSize );
if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) {
killSocket( socket, "packetSize wrong" );
break;
}
unsigned char buf[MAX_MSG_LEN];
nRead = recv( socket, buf, packetSize, MSG_WAITALL );
if ( nRead != packetSize ) {
killSocket( socket, "nRead != packetSize" ); break;
}
logf( "read %d bytes\n", nRead );
cref = processMessage( buf, packetSize, localStorage );
}
close( socket );
delete localStorage;
logf( "exiting thread\n" );
return NULL;
} /* relay_thread_main */
static int
make_socket( unsigned long addr, unsigned short port )
{
@ -316,23 +275,6 @@ make_socket( unsigned long addr, unsigned short port )
return sock;
} /* make_socket */
static void
accept_and_fork( int socket, void * (*start_routine)(void *) )
{
logf( "calling accept on socket %d\n", socket );
sockaddr newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( socket, &newaddr, &siz );
logf( "got one\n" );
ThreadData* td = new ThreadData();
td->socket = newSock;
pthread_t thread;
int result = pthread_create( &thread, NULL, start_routine, td );
} /* accept_and_fork */
int main( int argc, char** argv )
{
int port = 10999;
@ -349,6 +291,9 @@ int main( int argc, char** argv )
int control = make_socket( INADDR_LOOPBACK, port + 1 );
if ( control == -1 ) exit( 1 );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( N_WORKER_THREADS, processMessage );
/* set up select call */
fd_set rfds;
for ( ; ; ) {
@ -365,11 +310,14 @@ int main( int argc, char** argv )
assert( retval > 0 );
if ( FD_ISSET( listener, &rfds ) ) {
accept_and_fork( listener, relay_thread_main );
sockaddr newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( listener, &newaddr, &siz );
tPool->AddSocket( newSock );
--retval;
}
if ( FD_ISSET( control, &rfds ) ) {
accept_and_fork( control, ctrl_thread_main );
run_ctrl_thread( control );
--retval;
}
assert( retval == 0 );