add socket refcounting

AddrInfo now has ref()/unref() and keeps a global socket->refcount map
(since actual AddrInfo instances come and go.) When the
count drops to 0, the existing CloseSocket() method is called. This
seems to fix a bunch of race conditions that had a socket being closed
and reused while old code was still expecting to write to the device
attached to the socket the first time (along with lots of calls to close()
already-closed sockets, attempts to write() to closed sockets, etc.)
This commit is contained in:
Eric House 2017-12-06 19:24:52 -08:00
parent dbf38f7759
commit 811c8f535e
7 changed files with 61 additions and 7 deletions

View file

@ -67,6 +67,7 @@ endif
# turn on semaphore debugging
# CPPFLAGS += -DDEBUG_LOCKS
# CPPFLAGS += -DLOG_POLL
memdebug all: xwrelay rq

View file

@ -20,13 +20,16 @@
*/
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "addrinfo.h"
#include "xwrelay_priv.h"
#include "tpool.h"
#include "udpager.h"
#include "mlock.h"
// static uint32_t s_prevCreated = 0L;
@ -82,3 +85,40 @@ AddrInfo::equals( const AddrInfo& other ) const
return equal;
}
static pthread_mutex_t s_refMutex = PTHREAD_MUTEX_INITIALIZER;
static map<int, int > s_socketRefs;
void AddrInfo::ref() const
{
// logf( XW_LOGVERBOSE0, "%s(socket=%d)", __func__, m_socket );
MutexLock ml( &s_refMutex );
++s_socketRefs[m_socket];
printRefMap();
}
void
AddrInfo::unref() const
{
// logf( XW_LOGVERBOSE0, "%s(socket=%d)", __func__, m_socket );
MutexLock ml( &s_refMutex );
assert( s_socketRefs[m_socket] > 0 );
--s_socketRefs[m_socket];
if ( s_socketRefs[m_socket] == 0 ) {
XWThreadPool::GetTPool()->CloseSocket( this );
}
printRefMap();
}
/* private, and assumes have mutex */
void
AddrInfo::printRefMap() const
{
/* for ( map<int,int>::const_iterator iter = s_socketRefs.begin(); */
/* iter != s_socketRefs.end(); ++iter ) { */
/* int count = iter->second; */
/* if ( count > 0 ) { */
/* logf( XW_LOGVERBOSE0, "socket: %d; count: %d", iter->first, count ); */
/* } */
/* } */
}

View file

@ -81,12 +81,18 @@ class AddrInfo {
bool equals( const AddrInfo& other ) const;
/* refcount the underlying socket (doesn't modify instance) */
void ref() const;
void unref() const;
int getref() const;
private:
void construct( int sock, const AddrUnion* saddr, bool isTCP );
void init( int sock, ClientToken clientToken, const AddrUnion* saddr ) {
construct( sock, saddr, false );
m_clientToken = clientToken;
}
void printRefMap() const;
// AddrInfo& operator=(const AddrInfo&); // Prevent assignment
int m_socket;

View file

@ -119,6 +119,7 @@ XWThreadPool::Stop()
void
XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from )
{
from->ref();
int sock = from->getSocket();
logf( XW_LOGVERBOSE0, "%s(sock=%d, isTCP=%d)", __func__, sock, from->isTCP() );
SockInfo si = { .m_type = stype,
@ -174,7 +175,6 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr )
void
XWThreadPool::CloseSocket( const AddrInfo* addr )
{
int sock = addr->getSocket();
if ( addr->isTCP() ) {
if ( !RemoveSocket( addr ) ) {
MutexLock ml( &m_queueMutex );
@ -187,6 +187,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
++iter;
}
}
int sock = addr->getSocket();
int err = close( sock );
if ( 0 != err ) {
logf( XW_LOGERROR, "%s(): close(socket=%d) => %d/%s", __func__,
@ -284,7 +285,7 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
case Q_KILL:
logf( XW_LOGINFO, "worker thread got socket %d from queue (to close it)", sock );
(*m_kFunc)( &pr.m_info.m_addr );
CloseSocket( &pr.m_info.m_addr );
pr.m_info.m_addr.unref();
break;
}
} else {

View file

@ -58,7 +58,7 @@ PartialPacket::readAtMost( int len )
logf( XW_LOGERROR, "%s(len=%d, socket=%d): recv failed: %d (%s)", __func__,
len, m_sock, m_errno, strerror(m_errno) );
}
} else if ( 0 == nRead ) { // remote socket closed
} else if ( 0 == nRead ) { // remote socket half-closed
logf( XW_LOGINFO, "%s(): remote closed (socket=%d)", __func__, m_sock );
m_errno = -1; // so stillGood will fail
} else {
@ -160,11 +160,12 @@ void
UdpQueue::handle( const AddrInfo* addr, const uint8_t* buf, int len,
QueueCallback cb )
{
// addr->ref();
PacketThreadClosure* ptc = new PacketThreadClosure( addr, buf, len, cb );
MutexLock ml( &m_queueMutex );
int id = ++m_nextID;
ptc->setID( id );
logf( XW_LOGINFO, "%s: enqueuing packet %d (socket %d, len %d)",
logf( XW_LOGINFO, "%s(): enqueuing packet %d (socket %d, len %d)",
__func__, id, addr->getSocket(), len );
m_queue.push_back( ptc );
@ -223,6 +224,7 @@ UdpQueue::thread_main()
logf( XW_LOGINFO, "%s: dropping packet %d; it's %d seconds old!",
__func__, age );
}
// ptc->addr()->unref();
delete ptc;
}
return NULL;

View file

@ -44,10 +44,14 @@ public:
, m_cb(cb)
, m_created(time( NULL ))
{
memcpy( m_buf, buf, len );
memcpy( m_buf, buf, len );
m_addr.ref();
}
~PacketThreadClosure() { delete[] m_buf; }
~PacketThreadClosure() {
m_addr.unref();
delete[] m_buf;
}
const uint8_t* buf() const { return m_buf; }
int len() const { return m_len; }

View file

@ -1490,7 +1490,7 @@ game_thread_proc( PacketThreadClosure* ptc )
{
logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( !processMessage( ptc->buf(), ptc->len(), ptc->addr(), 0 ) ) {
XWThreadPool::GetTPool()->CloseSocket( ptc->addr() );
// XWThreadPool::GetTPool()->CloseSocket( ptc->addr() );
}
}