diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 2cacc9bae..284300e1c 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -146,9 +146,9 @@ XWThreadPool::CloseSocket( int socket ) /* bool do_interrupt = false; */ if ( !RemoveSocket( socket ) ) { RWWriteLock rwl( &m_activeSocketsRWLock ); - deque::iterator iter = m_queue.begin(); + deque::iterator iter = m_queue.begin(); while ( iter != m_queue.end() ) { - if ( *iter == socket ) { + if ( iter->second == socket ) { m_queue.erase( iter ); /* do_interrupt = true; */ break; @@ -211,9 +211,12 @@ void* XWThreadPool::real_tpool_main() { logf( XW_LOGINFO, "tpool worker thread starting" ); + int socket = -1; for ( ; ; ) { pthread_mutex_lock( &m_queueMutex ); + release_socket_locked( socket ); + while ( !m_timeToDie && m_queue.size() == 0 ) { pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); } @@ -223,12 +226,16 @@ XWThreadPool::real_tpool_main() break; } - int socket = m_queue.front(); - m_queue.pop_front(); + QueuePr pr; + if ( grab_elem_locked( socket, &pr ) ) { + socket = pr.second; + } else { + socket = -1; + } pthread_mutex_unlock( &m_queueMutex ); logf( XW_LOGINFO, "worker thread got socket %d from queue", socket ); - if ( get_process_packet( socket ) ) { + if ( socket >= 0 && get_process_packet( socket ) ) { AddSocket( socket ); } /* else drop it: error */ } @@ -377,9 +384,40 @@ void XWThreadPool::enqueue( int socket ) { MutexLock ml( &m_queueMutex ); - m_queue.push_back( socket ); + pair pr(Q_READ, socket); + m_queue.push_back( pr ); logf( XW_LOGINFO, "calling pthread_cond_signal" ); pthread_cond_signal( &m_queueCondVar ); /* implicit unlock */ } + +bool +XWThreadPool::grab_elem_locked( int curSock, QueuePr* prp ) +{ + bool found = false; + deque::iterator iter; + for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) { + int socket = iter->second; + if ( socket == curSock + || m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) { + *prp = *iter; + m_queue.erase( iter ); + m_sockets_in_use.insert( socket ); + found = true; + } + } + logf( XW_LOGINFO, "%s()=>%d", __func__, found ); + return found; +} + +void +XWThreadPool::release_socket_locked( int socket ) +{ + logf( XW_LOGINFO, "%s(%d)", __func__, socket ); + if ( -1 != socket ) { + set::iterator iter = m_sockets_in_use.find( socket ); + assert( iter != m_sockets_in_use.end() ); + m_sockets_in_use.erase( iter ); + } +} diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 5aec6c9ad..b99fcecc9 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -28,6 +28,7 @@ #include #include +#include using namespace std; @@ -48,13 +49,19 @@ class XWThreadPool { /* remove from tpool altogether, and close */ void CloseSocket( int socket ); - void Poll(); + void EnqueueKill( int socket ); private: + typedef enum { Q_READ, Q_KILL } QAction; + typedef pair QueuePr; + /* Remove from set being listened on */ bool RemoveSocket( int socket ); void enqueue( int socket ); + void release_socket_locked( int socket ); + bool grab_elem_locked( int curSock, QueuePr* qpp ); + bool get_process_packet( int socket ); void interrupt_poll(); @@ -69,7 +76,8 @@ class XWThreadPool { pthread_rwlock_t m_activeSocketsRWLock; /* Sockets waiting for a thread to read 'em */ - deque m_queue; + deque m_queue; + set m_sockets_in_use; pthread_mutex_t m_queueMutex; pthread_cond_t m_queueCondVar;