Run proxy sockets through the same thread pool as game sockets.

This prevents the race conditions that are turning up when I'm running
multiple threads -- by allowing me to really not be running multiple
threads at all.
Tested with the usual script.
This commit is contained in:
Eric House 2011-01-20 18:14:56 -08:00
parent c4bf153d36
commit 725888cb6c
4 changed files with 46 additions and 55 deletions

View file

@ -104,19 +104,19 @@ XWThreadPool::Stop()
int ii; int ii;
for ( ii = 0; ii < m_nThreads; ++ii ) { for ( ii = 0; ii < m_nThreads; ++ii ) {
enqueue( 0 ); enqueue( 0, STYPE_UNKNOWN );
} }
interrupt_poll(); interrupt_poll();
} }
void void
XWThreadPool::AddSocket( int socket ) XWThreadPool::AddSocket( int socket, SockType stype )
{ {
logf( XW_LOGINFO, "%s(%d)", __func__, socket ); logf( XW_LOGINFO, "%s(%d)", __func__, socket );
{ {
RWWriteLock ml( &m_activeSocketsRWLock ); RWWriteLock ml( &m_activeSocketsRWLock );
m_activeSockets.push_back( socket ); m_activeSockets.push_back( pair<int,SockType>(socket, stype) );
} }
interrupt_poll(); interrupt_poll();
} }
@ -128,9 +128,9 @@ XWThreadPool::RemoveSocket( int socket )
{ {
RWWriteLock ml( &m_activeSocketsRWLock ); RWWriteLock ml( &m_activeSocketsRWLock );
vector<int>::iterator iter = m_activeSockets.begin(); vector< pair<int,SockType> >::iterator iter = m_activeSockets.begin();
while ( iter != m_activeSockets.end() ) { while ( iter != m_activeSockets.end() ) {
if ( *iter == socket ) { if ( iter->first == socket ) {
m_activeSockets.erase( iter ); m_activeSockets.erase( iter );
found = true; found = true;
break; break;
@ -171,23 +171,27 @@ XWThreadPool::CloseSocket( int socket )
void void
XWThreadPool::EnqueueKill( int socket, const char* const why ) XWThreadPool::EnqueueKill( int socket, const char* const why )
{ {
enqueue( socket, Q_KILL ); enqueue( socket, STYPE_UNKNOWN, Q_KILL );
} }
bool bool
XWThreadPool::get_process_packet( int socket ) XWThreadPool::get_process_packet( int socket, SockType stype )
{ {
bool success = false; bool success = false;
short packetSize; short packetSize;
assert( sizeof(packetSize) == 2 ); assert( sizeof(packetSize) == 2 );
unsigned char buf[MAX_MSG_LEN]; unsigned char buf[MAX_MSG_LEN+1];
int nRead = read_packet( socket, buf, sizeof(buf) ); int nRead = read_packet( socket, buf, sizeof(buf) );
if ( nRead < 0 ) { if ( nRead < 0 ) {
EnqueueKill( socket, "bad packet" ); EnqueueKill( socket, "bad packet" );
} else { } else if ( STYPE_GAME == stype ) {
logf( XW_LOGINFO, "calling m_pFunc" ); logf( XW_LOGINFO, "calling m_pFunc" );
success = (*m_pFunc)( buf, nRead, socket ); success = (*m_pFunc)( buf, nRead, socket );
} else {
buf[nRead] = '\0';
handle_proxy_packet( buf, nRead, socket );
CloseSocket( socket );
} }
return success; return success;
} /* get_process_packet */ } /* get_process_packet */
@ -230,8 +234,8 @@ XWThreadPool::real_tpool_main()
pr.m_socket ); pr.m_socket );
switch ( pr.m_act ) { switch ( pr.m_act ) {
case Q_READ: case Q_READ:
if ( get_process_packet( pr.m_socket ) ) { if ( get_process_packet( pr.m_socket, pr.m_type ) ) {
AddSocket( pr.m_socket ); AddSocket( pr.m_socket, pr.m_type );
} }
break; break;
case Q_KILL: case Q_KILL:
@ -267,6 +271,7 @@ XWThreadPool::real_listener()
int nSocketsAllocd = 1; int nSocketsAllocd = 1;
struct pollfd* fds = (pollfd*)calloc( nSocketsAllocd, sizeof(fds[0]) ); struct pollfd* fds = (pollfd*)calloc( nSocketsAllocd, sizeof(fds[0]) );
SockType* stypes = (SockType*)calloc( nSocketsAllocd, sizeof(stypes[0]) );
#ifdef LOG_POLL #ifdef LOG_POLL
char* log = (char*)malloc( 4 * nSocketsAllocd ); char* log = (char*)malloc( 4 * nSocketsAllocd );
#endif #endif
@ -282,6 +287,7 @@ XWThreadPool::real_listener()
if ( nSockets > nSocketsAllocd ) { if ( nSockets > nSocketsAllocd ) {
fds = (struct pollfd*)realloc( fds, nSockets * sizeof(fds[0]) ); fds = (struct pollfd*)realloc( fds, nSockets * sizeof(fds[0]) );
stypes = (SockType*)realloc( stypes, nSockets * sizeof(stypes[0]) );
#ifdef LOG_POLL #ifdef LOG_POLL
log = (char*)realloc( log, nSockets * 4 ); log = (char*)realloc( log, nSockets * 4 );
#endif #endif
@ -297,9 +303,11 @@ XWThreadPool::real_listener()
#endif #endif
++curfd; ++curfd;
vector<int>::iterator iter = m_activeSockets.begin(); vector< pair<int,SockType> >::iterator iter;
while ( iter != m_activeSockets.end() ) { for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end();
fds[curfd].fd = *iter++; ++iter ) {
fds[curfd].fd = iter->first;
stypes[curfd] = iter->second;
fds[curfd].events = flags; fds[curfd].events = flags;
#ifdef LOG_POLL #ifdef LOG_POLL
if ( logCapacity > logLen ) { if ( logCapacity > logLen ) {
@ -360,7 +368,7 @@ XWThreadPool::real_listener()
if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) { if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) {
logf( XW_LOGINFO, "enqueuing %d", socket ); logf( XW_LOGINFO, "enqueuing %d", socket );
enqueue( socket ); enqueue( socket, stypes[curfd] );
} else { } else {
logf( XW_LOGERROR, "odd revents: %x", logf( XW_LOGERROR, "odd revents: %x",
fds[curfd].revents ); fds[curfd].revents );
@ -388,10 +396,10 @@ XWThreadPool::listener_main( void* closure )
} }
void void
XWThreadPool::enqueue( int socket, QAction act ) XWThreadPool::enqueue( int socket, SockType stype, QAction act )
{ {
MutexLock ml( &m_queueMutex ); MutexLock ml( &m_queueMutex );
QueuePr pr = { act, socket }; QueuePr pr = { act, socket, stype };
m_queue.push_back( pr ); m_queue.push_back( pr );
logf( XW_LOGINFO, "calling pthread_cond_signal" ); logf( XW_LOGINFO, "calling pthread_cond_signal" );

View file

@ -35,6 +35,8 @@ using namespace std;
class XWThreadPool { class XWThreadPool {
public: public:
typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType;
static XWThreadPool* GetTPool(); static XWThreadPool* GetTPool();
typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket ); typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket );
typedef void (*kill_func)( int socket ); typedef void (*kill_func)( int socket );
@ -46,7 +48,7 @@ class XWThreadPool {
void Stop(); void Stop();
/* Add to set being listened on */ /* Add to set being listened on */
void AddSocket( int socket ); void AddSocket( int socket, SockType stype );
/* remove from tpool altogether, and close */ /* remove from tpool altogether, and close */
void CloseSocket( int socket ); void CloseSocket( int socket );
@ -54,17 +56,18 @@ class XWThreadPool {
private: private:
typedef enum { Q_READ, Q_KILL } QAction; typedef enum { Q_READ, Q_KILL } QAction;
typedef struct { QAction m_act; int m_socket; } QueuePr; typedef struct { QAction m_act; int m_socket; SockType m_type; } QueuePr;
/* Remove from set being listened on */ /* Remove from set being listened on */
bool RemoveSocket( int socket ); bool RemoveSocket( int socket );
void enqueue( int socket, QAction act = Q_READ ); void enqueue( int socket, QAction act = Q_READ );
void enqueue( int socket, SockType stype, QAction act = Q_READ );
void release_socket_locked( int socket ); void release_socket_locked( int socket );
void grab_elem_locked( QueuePr* qpp ); void grab_elem_locked( QueuePr* qpp );
void print_in_use( void ); void print_in_use( void );
bool get_process_packet( int socket ); bool get_process_packet( int socket, SockType stype );
void interrupt_poll(); void interrupt_poll();
void* real_tpool_main(); void* real_tpool_main();
@ -74,7 +77,7 @@ class XWThreadPool {
static void* listener_main( void* closure ); static void* listener_main( void* closure );
/* Sockets main thread listens on */ /* Sockets main thread listens on */
vector<int> m_activeSockets; vector< pair<int,SockType> >m_activeSockets;
pthread_rwlock_t m_activeSocketsRWLock; pthread_rwlock_t m_activeSocketsRWLock;
/* Sockets waiting for a thread to read 'em */ /* Sockets waiting for a thread to read 'em */

View file

@ -683,7 +683,7 @@ read_packet( int sock, unsigned char* buf, int buflen )
nread = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); nread = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL );
if ( nread == sizeof(msgLen) ) { if ( nread == sizeof(msgLen) ) {
msgLen = ntohs( msgLen ); msgLen = ntohs( msgLen );
if ( msgLen <= buflen ) { if ( msgLen < buflen ) {
nread = recv( sock, buf, msgLen, MSG_WAITALL ); nread = recv( sock, buf, msgLen, MSG_WAITALL );
if ( nread == msgLen ) { if ( nread == msgLen ) {
result = nread; result = nread;
@ -693,16 +693,10 @@ read_packet( int sock, unsigned char* buf, int buflen )
return result; return result;
} }
static void* void
handle_proxy_tproc( void* closure ) handle_proxy_packet( unsigned char* buf, int len, int sock )
{ {
blockSignals();
int sock = (int)closure;
unsigned char buf[MAX_PROXY_MSGLEN];
int len = read_packet( sock, buf, sizeof(buf)-1 );
if ( len > 0 ) { if ( len > 0 ) {
buf[len] = '\0'; /* so can use strtok */
unsigned char* bufp = buf; unsigned char* bufp = buf;
unsigned char* end = bufp + len; unsigned char* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */ if ( (0 == *bufp++) ) { /* protocol */
@ -764,7 +758,7 @@ handle_proxy_tproc( void* closure )
} }
} }
} }
break; break; /* PRX_HAS_MSGS */
case PRX_DEVICE_GONE: case PRX_DEVICE_GONE:
logf( XW_LOGINFO, "%s: got PRX_DEVICE_GONE", __func__ ); logf( XW_LOGINFO, "%s: got PRX_DEVICE_GONE", __func__ );
if ( len >= 2 ) { if ( len >= 2 ) {
@ -799,24 +793,11 @@ handle_proxy_tproc( void* closure )
} }
len = 0; /* return a 0-length message */ len = 0; /* return a 0-length message */
write( sock, &len, sizeof(len) ); write( sock, &len, sizeof(len) );
break; break; /* PRX_DEVICE_GONE */
} }
} }
} }
sleep( 2 ); } /* handle_proxy_packet */
close( sock );
return NULL;
} /* handle_proxy_tproc */
static void
handle_proxy_connect( int sock )
{
pthread_t thread;
if ( 0 == pthread_create( &thread, NULL, handle_proxy_tproc,
(void*)sock ) ) {
pthread_detach( thread );
}
} /* handle_proxy_connect */
int int
main( int argc, char** argv ) main( int argc, char** argv )
@ -839,7 +820,6 @@ main( int argc, char** argv )
/* Verify sizes here... */ /* Verify sizes here... */
assert( sizeof(CookieID) == 2 ); assert( sizeof(CookieID) == 2 );
/* Read options. Options trump config file values when they conflict, but /* Read options. Options trump config file values when they conflict, but
the name of the config file is an option so we have to get that the name of the config file is an option so we have to get that
@ -1120,15 +1100,14 @@ main( int argc, char** argv )
int newSock = accept( listener, (sockaddr*)&newaddr, int newSock = accept( listener, (sockaddr*)&newaddr,
&siz ); &siz );
if ( perGame ) { logf( XW_LOGINFO,
logf( XW_LOGINFO, "accepting connection from %s on socket %d",
"accepting connection from %s on socket %d", inet_ntoa(newaddr.sin_addr), newSock );
inet_ntoa(newaddr.sin_addr), newSock );
tPool->AddSocket( newSock,
perGame ? XWThreadPool::STYPE_GAME
: XWThreadPool::STYPE_PROXY );
tPool->AddSocket( newSock );
} else {
handle_proxy_connect( newSock );
}
--retval; --retval;
} }
} }

View file

@ -30,6 +30,7 @@ int GetNSpawns(void);
int make_socket( unsigned long addr, unsigned short port ); int make_socket( unsigned long addr, unsigned short port );
int read_packet( int sock, unsigned char* buf, int buflen ); int read_packet( int sock, unsigned char* buf, int buflen );
void handle_proxy_packet( unsigned char* buf, int bufLen, int socket );
const char* cmdToStr( XWRELAY_Cmd cmd ); const char* cmdToStr( XWRELAY_Cmd cmd );