diff --git a/xwords4/relay/addrinfo.h b/xwords4/relay/addrinfo.h index 1c8d04d87..631e6f19f 100644 --- a/xwords4/relay/addrinfo.h +++ b/xwords4/relay/addrinfo.h @@ -57,6 +57,7 @@ class AddrInfo { void setIsTCP( bool val ) { m_isTCP = val; } bool isTCP() const { return m_isTCP; } /* later UDP will be here too */ int socket() const { assert(m_isValid); return m_socket; } + void invalSocket() { m_socket = -1; } ClientToken clientToken() const { assert(m_isValid); return m_clientToken; } struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; } const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; } diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 396c5a4b1..cf6ddb99d 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -186,6 +186,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) { /* bool do_interrupt = false; */ assert( addr->isTCP() ); + UdpQueue::get()->forgetSocket( addr ); if ( !RemoveSocket( addr ) ) { MutexLock ml( &m_queueMutex ); deque::iterator iter = m_queue.begin(); @@ -482,7 +483,8 @@ XWThreadPool::grab_elem_locked( QueuePr* prp ) for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) { int socket = iter->m_info.m_addr.socket(); /* If NOT found */ - if ( m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) { + if ( -1 != socket + && m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) { *prp = *iter; m_queue.erase( iter ); /* this was a double-free once! */ m_sockets_in_use.insert( socket ); @@ -513,10 +515,10 @@ XWThreadPool::print_in_use( void ) for ( iter = m_sockets_in_use.begin(); iter != m_sockets_in_use.end(); ++iter ) { - string_printf( str, "%d ", *iter ); + string_printf( str, "%d ", *iter ); } if ( 0 < str.size() ) { - logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() ); + logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() ); } } diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index 17eae630c..ab08b97a6 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -107,9 +107,56 @@ UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, utc->setID( id ); logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id ); m_queue.push_back( utc ); + + int sock = addr->socket(); + map >::iterator iter = m_bySocket.find( sock ); + if ( iter == m_bySocket.end() ) { + logf( XW_LOGINFO, "%s: creating vector for socket %d", __func__, sock ); + vector vect; + vect.push_back( utc ); + m_bySocket.insert( pair >(sock, vect) ); + } else { + iter->second.push_back( utc ); + logf( XW_LOGINFO, "%s: now have %d packets for socket %d", + __func__, iter->second.size(), sock ); + } + pthread_cond_signal( &m_queueCondVar ); } +void +UdpQueue::forgetSocket( const AddrInfo* addr ) +{ + assert( addr->isTCP() ); + int sock = addr->socket(); + MutexLock ml( &m_queueMutex ); + + map >::iterator iter = m_bySocket.find( sock ); + if ( m_bySocket.end() != iter ) { + vector& vect = iter->second; + vector::iterator iter2; + for ( iter2 = vect.begin(); vect.end() != iter2; ++ iter2 ) { + UdpThreadClosure* utc = *iter2; + assert( -1 != utc->addr()->socket() ); + utc->invalSocket(); + logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", + __func__, sock, utc->getID() ); + // vect.erase( iter2 ); + } + vect.clear(); + } + + // deque::iterator iter; + // for ( iter = m_queue.begin(); iter != m_queue.end(); ++iter ) { + // const AddrInfo* addr = (*iter)->addr(); + // if ( sock == addr->socket() ) { + // logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", + // __func__, sock, (*iter)->getID() ); + // (*iter)->invalSocket(); + // } + // } +} + void* UdpQueue::thread_main() { @@ -120,6 +167,17 @@ UdpQueue::thread_main() } UdpThreadClosure* utc = m_queue.front(); m_queue.pop_front(); + + int sock = utc->addr()->socket(); + if ( -1 != sock ) { + map >::iterator iter = m_bySocket.find( sock ); + assert ( iter != m_bySocket.end() ); + vector& vect = iter->second; + assert( utc == *vect.begin() ); + vect.erase( vect.begin() ); + logf( XW_LOGINFO, "%s: %d packets remaining for socket %d", + __func__, vect.size(), sock ); + } pthread_mutex_unlock( &m_queueMutex ); utc->noteDequeued(); diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index a2ee730a8..544c47088 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -23,6 +23,7 @@ #include #include +#include #include "xwrelay_priv.h" #include "addrinfo.h" @@ -57,6 +58,7 @@ public: const QueueCallback cb() const { return m_cb; } void setID( int id ) { m_id = id; } int getID( void ) { return m_id; } + void invalSocket() { m_addr.invalSocket(); } private: unsigned char* m_buf; @@ -76,6 +78,7 @@ class UdpQueue { bool handle( const AddrInfo* addr, QueueCallback cb ); void handle( const AddrInfo* addr, unsigned char* buf, int len, QueueCallback cb ); + void forgetSocket( const AddrInfo* addr ); private: static void* thread_main_static( void* closure ); @@ -84,6 +87,7 @@ class UdpQueue { pthread_mutex_t m_queueMutex; pthread_cond_t m_queueCondVar; deque m_queue; + map > m_bySocket; int m_nextID; };