diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index d1ab51a33..16514707b 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -104,19 +104,19 @@ XWThreadPool::Stop() int ii; for ( ii = 0; ii < m_nThreads; ++ii ) { - enqueue( 0 ); + enqueue( 0, STYPE_UNKNOWN ); } interrupt_poll(); } void -XWThreadPool::AddSocket( int socket ) +XWThreadPool::AddSocket( int socket, SockType stype ) { logf( XW_LOGINFO, "%s(%d)", __func__, socket ); { RWWriteLock ml( &m_activeSocketsRWLock ); - m_activeSockets.push_back( socket ); + m_activeSockets.push_back( pair(socket, stype) ); } interrupt_poll(); } @@ -128,9 +128,9 @@ XWThreadPool::RemoveSocket( int socket ) { RWWriteLock ml( &m_activeSocketsRWLock ); - vector::iterator iter = m_activeSockets.begin(); + vector< pair >::iterator iter = m_activeSockets.begin(); while ( iter != m_activeSockets.end() ) { - if ( *iter == socket ) { + if ( iter->first == socket ) { m_activeSockets.erase( iter ); found = true; break; @@ -171,23 +171,27 @@ XWThreadPool::CloseSocket( int socket ) void XWThreadPool::EnqueueKill( int socket, const char* const why ) { - enqueue( socket, Q_KILL ); + enqueue( socket, STYPE_UNKNOWN, Q_KILL ); } bool -XWThreadPool::get_process_packet( int socket ) +XWThreadPool::get_process_packet( int socket, SockType stype ) { bool success = false; short packetSize; assert( sizeof(packetSize) == 2 ); - unsigned char buf[MAX_MSG_LEN]; + unsigned char buf[MAX_MSG_LEN+1]; int nRead = read_packet( socket, buf, sizeof(buf) ); if ( nRead < 0 ) { EnqueueKill( socket, "bad packet" ); - } else { + } else if ( STYPE_GAME == stype ) { logf( XW_LOGINFO, "calling m_pFunc" ); success = (*m_pFunc)( buf, nRead, socket ); + } else { + buf[nRead] = '\0'; + handle_proxy_packet( buf, nRead, socket ); + CloseSocket( socket ); } return success; } /* get_process_packet */ @@ -230,8 +234,8 @@ XWThreadPool::real_tpool_main() pr.m_socket ); switch ( pr.m_act ) { case Q_READ: - if ( get_process_packet( pr.m_socket ) ) { - AddSocket( pr.m_socket ); + if ( get_process_packet( pr.m_socket, pr.m_type ) ) { + AddSocket( pr.m_socket, pr.m_type ); } break; case Q_KILL: @@ -267,6 +271,7 @@ XWThreadPool::real_listener() int nSocketsAllocd = 1; struct pollfd* fds = (pollfd*)calloc( nSocketsAllocd, sizeof(fds[0]) ); + SockType* stypes = (SockType*)calloc( nSocketsAllocd, sizeof(stypes[0]) ); #ifdef LOG_POLL char* log = (char*)malloc( 4 * nSocketsAllocd ); #endif @@ -282,6 +287,7 @@ XWThreadPool::real_listener() if ( nSockets > nSocketsAllocd ) { fds = (struct pollfd*)realloc( fds, nSockets * sizeof(fds[0]) ); + stypes = (SockType*)realloc( stypes, nSockets * sizeof(stypes[0]) ); #ifdef LOG_POLL log = (char*)realloc( log, nSockets * 4 ); #endif @@ -297,9 +303,11 @@ XWThreadPool::real_listener() #endif ++curfd; - vector::iterator iter = m_activeSockets.begin(); - while ( iter != m_activeSockets.end() ) { - fds[curfd].fd = *iter++; + vector< pair >::iterator iter; + for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end(); + ++iter ) { + fds[curfd].fd = iter->first; + stypes[curfd] = iter->second; fds[curfd].events = flags; #ifdef LOG_POLL if ( logCapacity > logLen ) { @@ -360,7 +368,7 @@ XWThreadPool::real_listener() if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) { logf( XW_LOGINFO, "enqueuing %d", socket ); - enqueue( socket ); + enqueue( socket, stypes[curfd] ); } else { logf( XW_LOGERROR, "odd revents: %x", fds[curfd].revents ); @@ -388,10 +396,10 @@ XWThreadPool::listener_main( void* closure ) } void -XWThreadPool::enqueue( int socket, QAction act ) +XWThreadPool::enqueue( int socket, SockType stype, QAction act ) { MutexLock ml( &m_queueMutex ); - QueuePr pr = { act, socket }; + QueuePr pr = { act, socket, stype }; m_queue.push_back( pr ); logf( XW_LOGINFO, "calling pthread_cond_signal" ); diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 5b2e6a16e..353b32d58 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -35,6 +35,8 @@ using namespace std; class XWThreadPool { public: + typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType; + static XWThreadPool* GetTPool(); typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket ); typedef void (*kill_func)( int socket ); @@ -46,7 +48,7 @@ class XWThreadPool { void Stop(); /* Add to set being listened on */ - void AddSocket( int socket ); + void AddSocket( int socket, SockType stype ); /* remove from tpool altogether, and close */ void CloseSocket( int socket ); @@ -54,17 +56,18 @@ class XWThreadPool { private: typedef enum { Q_READ, Q_KILL } QAction; - typedef struct { QAction m_act; int m_socket; } QueuePr; + typedef struct { QAction m_act; int m_socket; SockType m_type; } QueuePr; /* Remove from set being listened on */ bool RemoveSocket( int socket ); void enqueue( int socket, QAction act = Q_READ ); + void enqueue( int socket, SockType stype, QAction act = Q_READ ); void release_socket_locked( int socket ); void grab_elem_locked( QueuePr* qpp ); void print_in_use( void ); - bool get_process_packet( int socket ); + bool get_process_packet( int socket, SockType stype ); void interrupt_poll(); void* real_tpool_main(); @@ -74,7 +77,7 @@ class XWThreadPool { static void* listener_main( void* closure ); /* Sockets main thread listens on */ - vector m_activeSockets; + vector< pair >m_activeSockets; pthread_rwlock_t m_activeSocketsRWLock; /* Sockets waiting for a thread to read 'em */ diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 9181675fa..19769b42e 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -683,7 +683,7 @@ read_packet( int sock, unsigned char* buf, int buflen ) nread = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); if ( nread == sizeof(msgLen) ) { msgLen = ntohs( msgLen ); - if ( msgLen <= buflen ) { + if ( msgLen < buflen ) { nread = recv( sock, buf, msgLen, MSG_WAITALL ); if ( nread == msgLen ) { result = nread; @@ -693,16 +693,10 @@ read_packet( int sock, unsigned char* buf, int buflen ) return result; } -static void* -handle_proxy_tproc( void* closure ) +void +handle_proxy_packet( unsigned char* buf, int len, int sock ) { - blockSignals(); - int sock = (int)closure; - - unsigned char buf[MAX_PROXY_MSGLEN]; - int len = read_packet( sock, buf, sizeof(buf)-1 ); if ( len > 0 ) { - buf[len] = '\0'; /* so can use strtok */ unsigned char* bufp = buf; unsigned char* end = bufp + len; if ( (0 == *bufp++) ) { /* protocol */ @@ -764,7 +758,7 @@ handle_proxy_tproc( void* closure ) } } } - break; + break; /* PRX_HAS_MSGS */ case PRX_DEVICE_GONE: logf( XW_LOGINFO, "%s: got PRX_DEVICE_GONE", __func__ ); if ( len >= 2 ) { @@ -799,24 +793,11 @@ handle_proxy_tproc( void* closure ) } len = 0; /* return a 0-length message */ write( sock, &len, sizeof(len) ); - break; + break; /* PRX_DEVICE_GONE */ } } } - sleep( 2 ); - close( sock ); - return NULL; -} /* handle_proxy_tproc */ - -static void -handle_proxy_connect( int sock ) -{ - pthread_t thread; - if ( 0 == pthread_create( &thread, NULL, handle_proxy_tproc, - (void*)sock ) ) { - pthread_detach( thread ); - } -} /* handle_proxy_connect */ +} /* handle_proxy_packet */ int main( int argc, char** argv ) @@ -839,7 +820,6 @@ main( int argc, char** argv ) /* Verify sizes here... */ assert( sizeof(CookieID) == 2 ); - /* Read options. Options trump config file values when they conflict, but the name of the config file is an option so we have to get that @@ -1120,15 +1100,14 @@ main( int argc, char** argv ) int newSock = accept( listener, (sockaddr*)&newaddr, &siz ); - if ( perGame ) { - logf( XW_LOGINFO, - "accepting connection from %s on socket %d", - inet_ntoa(newaddr.sin_addr), newSock ); + logf( XW_LOGINFO, + "accepting connection from %s on socket %d", + inet_ntoa(newaddr.sin_addr), newSock ); + + tPool->AddSocket( newSock, + perGame ? XWThreadPool::STYPE_GAME + : XWThreadPool::STYPE_PROXY ); - tPool->AddSocket( newSock ); - } else { - handle_proxy_connect( newSock ); - } --retval; } } diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 44f989967..5226609a2 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -30,6 +30,7 @@ int GetNSpawns(void); int make_socket( unsigned long addr, unsigned short port ); int read_packet( int sock, unsigned char* buf, int buflen ); +void handle_proxy_packet( unsigned char* buf, int bufLen, int socket ); const char* cmdToStr( XWRELAY_Cmd cmd );