diff --git a/xwords4/relay/ctrl.cpp b/xwords4/relay/ctrl.cpp index 76cb50bd3..a5b1ce059 100644 --- a/xwords4/relay/ctrl.cpp +++ b/xwords4/relay/ctrl.cpp @@ -46,6 +46,7 @@ #include "xwrelay_priv.h" #include "configs.h" #include "lstnrmgr.h" +#include "tpool.h" #define MAX_ARGS 10 @@ -190,7 +191,7 @@ cmd_kill_eject( int socket, const char** args ) if ( 0 == strcmp( args[1], "socket" ) ) { int victim = atoi( args[2] ); if ( victim != 0 ) { - killSocket( victim, "ctrl command" ); + XWThreadPool::GetTPool()-> EnqueueKill( victim, "ctrl command" ); found = true; } } else if ( 0 == strcmp( args[1], "cref" ) ) { diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 284300e1c..de30c47eb 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -78,10 +78,11 @@ XWThreadPool::~XWThreadPool() } /* ~XWThreadPool */ void -XWThreadPool::Setup( int nThreads, packet_func pFunc ) +XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) { m_nThreads = nThreads; m_pFunc = pFunc; + m_kFunc = kFunc; pthread_t thread; @@ -148,7 +149,7 @@ XWThreadPool::CloseSocket( int socket ) RWWriteLock rwl( &m_activeSocketsRWLock ); deque::iterator iter = m_queue.begin(); while ( iter != m_queue.end() ) { - if ( iter->second == socket ) { + if ( iter->m_socket == socket ) { m_queue.erase( iter ); /* do_interrupt = true; */ break; @@ -167,6 +168,12 @@ XWThreadPool::CloseSocket( int socket ) /* } */ } +void +XWThreadPool::EnqueueKill( int socket, const char* const why ) +{ + enqueue( socket, Q_KILL ); +} + bool XWThreadPool::get_process_packet( int socket ) { @@ -177,18 +184,18 @@ XWThreadPool::get_process_packet( int socket ) ssize_t nRead = recv( socket, &packetSize, sizeof(packetSize), MSG_WAITALL ); if ( nRead != 2 ) { - killSocket( socket, "nRead != 2" ); + EnqueueKill( socket, "nRead != 2" ); } else { packetSize = ntohs( packetSize ); if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) { - killSocket( socket, "packetSize wrong" ); + EnqueueKill( socket, "packetSize wrong" ); } else { unsigned char buf[MAX_MSG_LEN]; nRead = recv( socket, buf, packetSize, MSG_WAITALL ); if ( nRead != packetSize ) { - killSocket( socket, "nRead != packetSize" ); + EnqueueKill( socket, "nRead != packetSize" ); } else { logf( XW_LOGINFO, "read %d bytes", nRead ); @@ -213,7 +220,6 @@ XWThreadPool::real_tpool_main() logf( XW_LOGINFO, "tpool worker thread starting" ); int socket = -1; for ( ; ; ) { - pthread_mutex_lock( &m_queueMutex ); release_socket_locked( socket ); @@ -227,17 +233,26 @@ XWThreadPool::real_tpool_main() } QueuePr pr; - if ( grab_elem_locked( socket, &pr ) ) { - socket = pr.second; - } else { - socket = -1; - } - pthread_mutex_unlock( &m_queueMutex ); - logf( XW_LOGINFO, "worker thread got socket %d from queue", socket ); + grab_elem_locked( socket, &pr ); - if ( socket >= 0 && get_process_packet( socket ) ) { - AddSocket( socket ); - } /* else drop it: error */ + pthread_mutex_unlock( &m_queueMutex ); + + if ( pr.m_socket >= 0 ) { + logf( XW_LOGINFO, "worker thread got socket %d from queue", + pr.m_socket ); + switch ( pr.m_act ) { + case Q_READ: + if ( get_process_packet( pr.m_socket ) ) { + AddSocket( pr.m_socket ); + } + break; + case Q_KILL: + (*m_kFunc)( pr.m_socket ); + CloseSocket( pr.m_socket ); + break; + } + } + socket = pr.m_socket; } logf( XW_LOGINFO, "tpool worker thread exiting" ); return NULL; @@ -359,7 +374,7 @@ XWThreadPool::real_listener() enqueue( socket ); } else { logf( XW_LOGERROR, "odd revents: %x", curfd->revents ); - killSocket( socket, "error/hup in poll()" ); + EnqueueKill( socket, "error/hup in poll()" ); } --nEvents; } @@ -381,10 +396,10 @@ XWThreadPool::listener_main( void* closure ) } void -XWThreadPool::enqueue( int socket ) +XWThreadPool::enqueue( int socket, QAction act ) { MutexLock ml( &m_queueMutex ); - pair pr(Q_READ, socket); + QueuePr pr = { act, socket }; m_queue.push_back( pr ); logf( XW_LOGINFO, "calling pthread_cond_signal" ); @@ -392,13 +407,14 @@ XWThreadPool::enqueue( int socket ) /* implicit unlock */ } -bool +void XWThreadPool::grab_elem_locked( int curSock, QueuePr* prp ) { bool found = false; + prp->m_socket = -1; deque::iterator iter; for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) { - int socket = iter->second; + int socket = iter->m_socket; if ( socket == curSock || m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) { *prp = *iter; @@ -407,8 +423,14 @@ XWThreadPool::grab_elem_locked( int curSock, QueuePr* prp ) found = true; } } + + /* I think once an event isn't "found" here there's a chance of events + sitting in the queue without the threads knowing to go after them. So + IFF this happens need to deal with it or at least confirm that there's + no chance of starvation */ + // assert( found ); THIS IS FIRING + logf( XW_LOGINFO, "%s()=>%d", __func__, found ); - return found; } void diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index b99fcecc9..a1e28e45d 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -37,11 +37,12 @@ class XWThreadPool { public: static XWThreadPool* GetTPool(); typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket ); + typedef void (*kill_func)( int socket ); XWThreadPool(); ~XWThreadPool(); - void Setup( int nThreads, packet_func pFunc ); + void Setup( int nThreads, packet_func pFunc, kill_func kFunc ); void Stop(); /* Add to set being listened on */ @@ -49,18 +50,18 @@ class XWThreadPool { /* remove from tpool altogether, and close */ void CloseSocket( int socket ); - void EnqueueKill( int socket ); + void EnqueueKill( int socket, const char* const why ); private: typedef enum { Q_READ, Q_KILL } QAction; - typedef pair QueuePr; + typedef struct { QAction m_act; int m_socket; } QueuePr; /* Remove from set being listened on */ bool RemoveSocket( int socket ); - void enqueue( int socket ); + void enqueue( int socket, QAction act = Q_READ ); void release_socket_locked( int socket ); - bool grab_elem_locked( int curSock, QueuePr* qpp ); + void grab_elem_locked( int curSock, QueuePr* qpp ); bool get_process_packet( int socket ); void interrupt_poll(); @@ -88,6 +89,7 @@ class XWThreadPool { bool m_timeToDie; int m_nThreads; packet_func m_pFunc; + kill_func m_kFunc; static XWThreadPool* g_instance; }; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index fa5702262..8c088e7e6 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -218,9 +218,6 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket ) SafeCref scr( socket ); success = scr.HandleHeartbeat( hostID, socket ); } - if ( !success ) { - killSocket( socket, "no cref for socket" ); - } } return success; } /* processHeartbeat */ @@ -424,15 +421,11 @@ processDisconnect( unsigned char* bufp, int bufLen, int socket ) return success; } /* processDisconnect */ -void -killSocket( int socket, const char* why ) +static void +killSocket( int socket ) { - logf( XW_LOGINFO, "killSocket(%d): %s", socket, why ); + logf( XW_LOGINFO, "killSocket(%d)", socket ); CRefMgr::Get()->RemoveSocketRefs( socket ); - /* Might want to kill the thread it belongs to if we're not in it, - e.g. when unable to write to another socket. */ - logf( XW_LOGINFO, "killSocket done" ); - XWThreadPool::GetTPool()->CloseSocket( socket ); } time_t @@ -512,7 +505,7 @@ processMessage( unsigned char* buf, int bufLen, int socket ) } if ( !success ) { - killSocket( socket, "failure" ); + XWThreadPool::GetTPool()->EnqueueKill( socket, "failure" ); } return success; @@ -974,7 +967,7 @@ main( int argc, char** argv ) (void)sigaction( SIGINT, &act, NULL ); XWThreadPool* tPool = XWThreadPool::GetTPool(); - tPool->Setup( nWorkerThreads, processMessage ); + tPool->Setup( nWorkerThreads, processMessage, killSocket ); /* set up select call */ fd_set rfds; diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 6ebeebcdd..50f10a8a2 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -19,7 +19,6 @@ typedef enum { void logf( XW_LogLevel level, const char* format, ... ); void denyConnection( int socket, XWREASON err ); -void killSocket( int socket, const char* why ); bool send_with_length_unsafe( int socket, unsigned char* buf, int bufLen ); time_t uptime(void);