diff --git a/xwords4/relay/Makefile b/xwords4/relay/Makefile index 06fc29fa8..82cfaeae5 100644 --- a/xwords4/relay/Makefile +++ b/xwords4/relay/Makefile @@ -67,6 +67,7 @@ endif # turn on semaphore debugging # CPPFLAGS += -DDEBUG_LOCKS +# CPPFLAGS += -DLOG_POLL memdebug all: xwrelay rq diff --git a/xwords4/relay/addrinfo.cpp b/xwords4/relay/addrinfo.cpp index ead4105ca..546b828e6 100644 --- a/xwords4/relay/addrinfo.cpp +++ b/xwords4/relay/addrinfo.cpp @@ -20,13 +20,16 @@ */ #include +#include #include #include +#include #include "addrinfo.h" #include "xwrelay_priv.h" #include "tpool.h" #include "udpager.h" +#include "mlock.h" // static uint32_t s_prevCreated = 0L; @@ -82,3 +85,40 @@ AddrInfo::equals( const AddrInfo& other ) const return equal; } +static pthread_mutex_t s_refMutex = PTHREAD_MUTEX_INITIALIZER; +static map s_socketRefs; + +void AddrInfo::ref() const +{ + // logf( XW_LOGVERBOSE0, "%s(socket=%d)", __func__, m_socket ); + MutexLock ml( &s_refMutex ); + ++s_socketRefs[m_socket]; + printRefMap(); +} + +void +AddrInfo::unref() const +{ + // logf( XW_LOGVERBOSE0, "%s(socket=%d)", __func__, m_socket ); + + MutexLock ml( &s_refMutex ); + assert( s_socketRefs[m_socket] > 0 ); + --s_socketRefs[m_socket]; + if ( s_socketRefs[m_socket] == 0 ) { + XWThreadPool::GetTPool()->CloseSocket( this ); + } + printRefMap(); +} + +/* private, and assumes have mutex */ +void +AddrInfo::printRefMap() const +{ + /* for ( map::const_iterator iter = s_socketRefs.begin(); */ + /* iter != s_socketRefs.end(); ++iter ) { */ + /* int count = iter->second; */ + /* if ( count > 0 ) { */ + /* logf( XW_LOGVERBOSE0, "socket: %d; count: %d", iter->first, count ); */ + /* } */ + /* } */ +} diff --git a/xwords4/relay/addrinfo.h b/xwords4/relay/addrinfo.h index d92e70a1b..94b04a816 100644 --- a/xwords4/relay/addrinfo.h +++ b/xwords4/relay/addrinfo.h @@ -81,12 +81,18 @@ class AddrInfo { bool equals( const AddrInfo& other ) const; + /* refcount the underlying socket (doesn't modify instance) */ + void ref() const; + void unref() const; + int getref() const; + private: void construct( int sock, const AddrUnion* saddr, bool isTCP ); void init( int sock, ClientToken clientToken, const AddrUnion* saddr ) { construct( sock, saddr, false ); m_clientToken = clientToken; } + void printRefMap() const; // AddrInfo& operator=(const AddrInfo&); // Prevent assignment int m_socket; diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 804be3de5..3ea401022 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -119,6 +119,7 @@ XWThreadPool::Stop() void XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from ) { + from->ref(); int sock = from->getSocket(); logf( XW_LOGVERBOSE0, "%s(sock=%d, isTCP=%d)", __func__, sock, from->isTCP() ); SockInfo si = { .m_type = stype, @@ -174,7 +175,6 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr ) void XWThreadPool::CloseSocket( const AddrInfo* addr ) { - int sock = addr->getSocket(); if ( addr->isTCP() ) { if ( !RemoveSocket( addr ) ) { MutexLock ml( &m_queueMutex ); @@ -187,6 +187,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) ++iter; } } + int sock = addr->getSocket(); int err = close( sock ); if ( 0 != err ) { logf( XW_LOGERROR, "%s(): close(socket=%d) => %d/%s", __func__, @@ -284,7 +285,7 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip ) case Q_KILL: logf( XW_LOGINFO, "worker thread got socket %d from queue (to close it)", sock ); (*m_kFunc)( &pr.m_info.m_addr ); - CloseSocket( &pr.m_info.m_addr ); + pr.m_info.m_addr.unref(); break; } } else { diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index 6ffab296e..bb59a0892 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -58,7 +58,7 @@ PartialPacket::readAtMost( int len ) logf( XW_LOGERROR, "%s(len=%d, socket=%d): recv failed: %d (%s)", __func__, len, m_sock, m_errno, strerror(m_errno) ); } - } else if ( 0 == nRead ) { // remote socket closed + } else if ( 0 == nRead ) { // remote socket half-closed logf( XW_LOGINFO, "%s(): remote closed (socket=%d)", __func__, m_sock ); m_errno = -1; // so stillGood will fail } else { @@ -160,11 +160,12 @@ void UdpQueue::handle( const AddrInfo* addr, const uint8_t* buf, int len, QueueCallback cb ) { + // addr->ref(); PacketThreadClosure* ptc = new PacketThreadClosure( addr, buf, len, cb ); MutexLock ml( &m_queueMutex ); int id = ++m_nextID; ptc->setID( id ); - logf( XW_LOGINFO, "%s: enqueuing packet %d (socket %d, len %d)", + logf( XW_LOGINFO, "%s(): enqueuing packet %d (socket %d, len %d)", __func__, id, addr->getSocket(), len ); m_queue.push_back( ptc ); @@ -223,6 +224,7 @@ UdpQueue::thread_main() logf( XW_LOGINFO, "%s: dropping packet %d; it's %d seconds old!", __func__, age ); } + // ptc->addr()->unref(); delete ptc; } return NULL; diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index 3926b3903..befa99893 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -44,10 +44,14 @@ public: , m_cb(cb) , m_created(time( NULL )) { - memcpy( m_buf, buf, len ); + memcpy( m_buf, buf, len ); + m_addr.ref(); } - ~PacketThreadClosure() { delete[] m_buf; } + ~PacketThreadClosure() { + m_addr.unref(); + delete[] m_buf; + } const uint8_t* buf() const { return m_buf; } int len() const { return m_len; } diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 7b2363541..e94fd7666 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -1490,7 +1490,7 @@ game_thread_proc( PacketThreadClosure* ptc ) { logf( XW_LOGVERBOSE0, "%s()", __func__ ); if ( !processMessage( ptc->buf(), ptc->len(), ptc->addr(), 0 ) ) { - XWThreadPool::GetTPool()->CloseSocket( ptc->addr() ); + // XWThreadPool::GetTPool()->CloseSocket( ptc->addr() ); } }