diff --git a/relay/tpool.cpp b/relay/tpool.cpp index 3e60383e1..ca2366d6f 100644 --- a/relay/tpool.cpp +++ b/relay/tpool.cpp @@ -46,7 +46,7 @@ XWThreadPool::GetTPool() XWThreadPool::XWThreadPool() { - pthread_mutex_init ( &m_activeSocketsMutex, NULL ); + pthread_rwlock_init( &m_activeSocketsRWLock, NULL ); pthread_mutex_init ( &m_queueMutex, NULL ); pthread_cond_init( &m_queueCondVar, NULL ); @@ -84,27 +84,53 @@ XWThreadPool::AddSocket( int socket ) { logf( "AddSocket(%d)", socket ); { - MutexLock ml( &m_activeSocketsMutex ); + RWWriteLock ml( &m_activeSocketsRWLock ); m_activeSockets.push_back( socket ); } interrupt_poll(); } -void +int XWThreadPool::RemoveSocket( int socket ) { + int found = 0; { - MutexLock ml( &m_activeSocketsMutex ); + RWWriteLock ml( &m_activeSocketsRWLock ); vector::iterator iter = m_activeSockets.begin(); while ( iter != m_activeSockets.end() ) { if ( *iter == socket ) { m_activeSockets.erase( iter ); + found = 1; break; } ++iter; } } + return found; +} + + +void +XWThreadPool::CloseSocket( int socket ) +{ + int do_interrupt = 0; + if ( !RemoveSocket( socket ) ) { + RWWriteLock rwl( &m_activeSocketsRWLock ); + deque::iterator iter = m_queue.begin(); + while ( iter != m_queue.end() ) { + if ( *iter == socket ) { + m_queue.erase( iter ); + do_interrupt = 1; + break; + } + ++iter; + } + } + close( socket ); + if ( do_interrupt ) { + interrupt_poll(); + } } int @@ -189,7 +215,7 @@ XWThreadPool::real_listener() for ( ; ; ) { - pthread_mutex_lock( &m_activeSocketsMutex ); + pthread_rwlock_rdlock( &m_activeSocketsRWLock ); int nSockets = m_activeSockets.size() + 1; /* for pipe */ pollfd* fds = (pollfd*)malloc( sizeof(fds[0]) * nSockets ); pollfd* curfd = fds; @@ -209,7 +235,7 @@ XWThreadPool::real_listener() len += sprintf( log+len, "%d,", curfd->fd ); ++curfd; } - pthread_mutex_unlock( &m_activeSocketsMutex ); + pthread_rwlock_unlock( &m_activeSocketsRWLock ); logf( "calling poll on %s", log ); int nEvents = poll( fds, nSockets, -1 ); /* -1: infinite timeout */