From 38ad3aa81c8e8aa64aa223a8a25f975de12ca9cf Mon Sep 17 00:00:00 2001 From: Andy2 Date: Sat, 25 Jun 2011 15:40:12 -0700 Subject: [PATCH] cleanup; and: collapse HostRec and socket set into map of socket to HostRec, protecting all accesses with ReadWrite locks. --- xwords4/relay/cref.cpp | 370 +++++++++++++++++------------------------ xwords4/relay/cref.h | 19 +-- 2 files changed, 163 insertions(+), 226 deletions(-) diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 3a6de40ea..da6fd7a44 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -114,7 +114,7 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id, CookieRef::CookieRef( const char* cookie, const char* connName, CookieID id, int langCode, int nPlayersT, int nAlreadyHere ) { - pthread_mutex_init( &m_sockSetMutex, NULL ); + pthread_rwlock_init( &m_socketsRWLock, NULL ); ReInit( cookie, connName, id, langCode, nPlayersT, nAlreadyHere ); } @@ -127,17 +127,18 @@ CookieRef::~CookieRef() XWThreadPool* tPool = XWThreadPool::GetTPool(); ASSERT_LOCKED(); - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - int socket = iter->m_socket; - tPool->CloseSocket( socket ); - RmSocket( socket ); - m_sockets.erase( iter ); + map::iterator iter; + { + RWWriteLock rwl( &m_socketsRWLock ); + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + int socket = iter->first; + tPool->CloseSocket( socket ); + m_sockets.erase( iter ); + } } - printSeeds(__func__); - pthread_mutex_destroy( &m_sockSetMutex ); + pthread_rwlock_destroy( &m_socketsRWLock ); } /* ~CookieRef */ void @@ -278,13 +279,11 @@ CookieRef::HostForSocket( int sock ) { HostID hid = -1; ASSERT_LOCKED(); - vector::const_iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_socket == sock ) { - hid = iter->m_hostID; - logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid ); - break; - } + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter = m_sockets.find( sock ); + if ( iter != m_sockets.end() ) { + hid = iter->second.m_hostID; + logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid ); } return hid; } @@ -294,11 +293,13 @@ CookieRef::SocketForHost( HostID dest ) { int socket = -1; ASSERT_LOCKED(); - vector::const_iterator iter; assert( dest != 0 ); /* don't use as lookup before assigned */ + + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_hostID == dest ) { - socket = iter->m_socket; + if ( iter->second.m_hostID == dest ) { + socket = iter->first; break; } } @@ -313,14 +314,15 @@ CookieRef::AlreadyHere( unsigned short seed, int socket, HostID* prevHostID ) logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket ); bool here = false; - vector::iterator iter; + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - here = iter->m_seed == seed; /* client already registered */ + here = iter->second.m_seed == seed; /* client already registered */ if ( here ) { - if ( iter->m_socket != socket ) { /* not just a dupe packet */ + if ( iter->first != socket ) { /* not just a dupe packet */ logf( XW_LOGINFO, "%s: seeds match; socket %d assumed closed", - __func__, iter->m_socket ); - *prevHostID = iter->m_hostID; + __func__, iter->first ); + *prevHostID = iter->second.m_hostID; } break; } @@ -338,18 +340,18 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket, hid, seed, seed, socket ); bool here = false; - vector::iterator iter; + RWWriteLock rwl( &m_socketsRWLock ); + map::iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_hostID == hid ) { - if ( seed != iter->m_seed ) { + if ( iter->second.m_hostID == hid ) { + if ( seed != iter->second.m_seed ) { *spotTaken = true; - } else if ( socket == iter->m_socket ) { + } else if ( socket == iter->first ) { here = true; /* dup packet */ } else { logf( XW_LOGINFO, "%s: hids match; nuking existing record " "for socket %d b/c assumed closed", __func__, - iter->m_socket ); - RmSocket( iter->m_socket ); + iter->first ); m_sockets.erase( iter ); } break; @@ -376,44 +378,33 @@ void CookieRef::removeSocket( int socket ) { logf( XW_LOGINFO, "%s(socket=%d)", __func__, socket ); - int count; - { - bool found = false; - ASSERT_LOCKED(); + bool found = false; + ASSERT_LOCKED(); - count = m_sockets.size(); - assert( count <= 4 ); - if ( count > 0 ) { - vector::iterator iter; - for ( iter = m_sockets.begin(); - !found && iter != m_sockets.end(); ++iter ) { - if ( iter->m_socket == socket ) { - if ( iter->m_ackPending ) { - logf( XW_LOGINFO, - "Never got ack; removing hid %d from DB", - iter->m_hostID ); - DBMgr::Get()->RmDeviceByHid( ConnName(), iter->m_hostID ); - m_nPlayersHere -= iter->m_nPlayersH; - cancelAckTimer( iter->m_hostID ); - } - RmSocket( socket ); - m_sockets.erase(iter); - --count; - found = true; - } + { + RWWriteLock rwl( &m_socketsRWLock ); + map::iterator iter = m_sockets.find(socket); + if ( iter != m_sockets.end() ) { + if ( iter->second.m_ackPending ) { + logf( XW_LOGINFO, + "Never got ack; removing hid %d from DB", + iter->second.m_hostID ); + DBMgr::Get()->RmDeviceByHid( ConnName(), + iter->second.m_hostID ); + m_nPlayersHere -= iter->second.m_nPlayersH; + cancelAckTimer( iter->second.m_hostID ); } - } else { - logf( XW_LOGERROR, "%s: no socket %d to remove", __func__, - socket ); - } - if ( !found ) { - logf( XW_LOGINFO, "%s: socket %d not found", __func__, socket ); + m_sockets.erase(iter); + found = true; } } + if ( !found ) { + logf( XW_LOGINFO, "%s: socket %d not found", __func__, socket ); + } printSeeds(__func__); - if ( count == 0 ) { + if ( m_sockets.size() == 0 ) { pushLastSocketGoneEvent(); } } /* removeSocket */ @@ -443,39 +434,22 @@ CookieRef::HasSocket( int socket ) set CookieRef::GetSockets() { - MutexLock ml( &m_sockSetMutex ); - return m_sockSet; -} - -void -CookieRef::AddSocket( int socket ) -{ - MutexLock ml( &m_sockSetMutex ); - assert( m_sockSet.find(socket) == m_sockSet.end() ); - m_sockSet.insert( socket ); -} - -void -CookieRef::RmSocket( int socket ) -{ - MutexLock ml( &m_sockSetMutex ); - assert( m_sockSet.find(socket) != m_sockSet.end() ); - m_sockSet.erase( socket ); + set result; + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + result.insert( iter->first ); + } + return result; } bool CookieRef::HasSocket_locked( int socket ) { - bool found = false; - ASSERT_LOCKED(); - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_socket == socket ) { - found = true; - break; - } - } + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter = m_sockets.find( socket ); + bool found = iter != m_sockets.end(); logf( XW_LOGINFO, "%s=>%d", __func__, found ); return found; @@ -493,11 +467,14 @@ void CookieRef::_CheckHeartbeats( time_t now ) { ASSERT_LOCKED(); - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - time_t last = iter->m_lastHeartbeat; - if ( (now - last) > GetHeartbeat() ) { - pushHeartFailedEvent( iter->m_socket ); + { + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + time_t last = iter->second.m_lastHeartbeat; + if ( (now - last) > GetHeartbeat() ) { + pushHeartFailedEvent( iter->first ); + } } } @@ -683,11 +660,11 @@ CookieRef::handleEvents() break; case XWA_TIMERDISCONN: - disconnectSockets( 0, XWRELAY_ERROR_TIMEOUT ); + disconnectSockets( XWRELAY_ERROR_TIMEOUT ); break; case XWA_SHUTDOWN: - disconnectSockets( 0, XWRELAY_ERROR_SHUTDOWN ); + disconnectSockets( XWRELAY_ERROR_SHUTDOWN ); break; case XWA_HEARTDISCONN: @@ -695,8 +672,8 @@ CookieRef::handleEvents() XWRELAY_ERROR_HEART_OTHER ); setAllConnectedTimer(); // reducePlayerCounts( evt.u.discon.socket ); - disconnectSockets( evt.u.heart.socket, - XWRELAY_ERROR_HEART_YOU ); + disconnectSocket( evt.u.heart.socket, + XWRELAY_ERROR_HEART_YOU ); break; case XWA_DISCONNECT: @@ -740,7 +717,6 @@ CookieRef::handleEvents() CRefMgr::Get()->IncrementFullCount(); cancelAllConnectedTimer(); sendAllHere( true ); - /* checkSomeMissing(); */ break; case XWA_SNDALLHERE_2: @@ -889,9 +865,11 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp ) assert( m_sockets.size() < 4 ); - HostRec hr( hostid, socket, nPlayersH, seed, !reconn ); - m_sockets.push_back( hr ); - AddSocket( socket ); + { + RWWriteLock rwl( &m_socketsRWLock ); + HostRec hr( hostid, nPlayersH, seed, !reconn ); + m_sockets.insert( pair(socket, hr) ); + } printSeeds(__func__); @@ -910,17 +888,19 @@ CookieRef::updateAck( HostID hostID, bool keep ) cancelAckTimer( hostID ); - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_ackPending && iter->m_hostID == hostID ) { - - if ( keep ) { - iter->m_ackPending = false; - DBMgr::Get()->NoteAckd( ConnName(), hostID ); - } else { - socket = iter->m_socket; + { + RWWriteLock rwl( &m_socketsRWLock ); + map::iterator iter; + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + if ( iter->second.m_ackPending && iter->second.m_hostID == hostID ) { + if ( keep ) { + iter->second.m_ackPending = false; + DBMgr::Get()->NoteAckd( ConnName(), hostID ); + } else { + socket = iter->first; + } + break; } - break; } } @@ -1112,11 +1092,12 @@ CookieRef::notifyOthers( int socket, XWRelayMsg msg, XWREASON why ) assert( socket != 0 ); ASSERT_LOCKED(); - vector::iterator iter; + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - int other = iter->m_socket; + int other = iter->first; if ( other != socket ) { - send_msg( other, iter->m_hostID, msg, why, false ); + send_msg( other, iter->second.m_hostID, msg, why, false ); } } } /* notifyOthers */ @@ -1138,7 +1119,7 @@ CookieRef::notifyGameDead( int socket ) /* ASSERT_LOCKED(); */ /* vector sockets; */ -/* vector::iterator iter; */ +/* map::iterator iter; */ /* for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { */ /* sockets.push_back( iter->m_socket ); */ /* } */ @@ -1176,15 +1157,18 @@ CookieRef::sendAllHere( bool initial ) for ( dest = 1; dest <= m_nPlayersHere; ++dest ) { bool sent = false; *idLoc = dest; /* write in this target's hostId */ - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_hostID == dest ) { - sent = send_with_length( iter->m_socket, buf, bufp-buf, - true ); - break; + + { + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + if ( iter->second.m_hostID == dest ) { + sent = send_with_length( iter->first, buf, bufp-buf, + true ); + break; + } } } - if ( !sent ) { store_message( dest, buf, bufp-buf ); } @@ -1192,63 +1176,11 @@ CookieRef::sendAllHere( bool initial ) } /* sendAllHere */ #define CONNNAME_DELIM ' ' /* ' ' so will wrap in browser */ -/* Does my seed belong as part of existing connName */ -bool -CookieRef::SeedBelongs( int gameSeed ) -{ - bool belongs = false; - const char* ptr = ConnName(); - const char* end = ptr + strlen(ptr); - assert( '\0' != ptr[0] ); - char buf[5]; - snprintf( buf, sizeof(buf), "%.4X", gameSeed ); - - for ( ; *ptr != CONNNAME_DELIM && ptr < end; ptr += 4 ) { - if ( 0 == strncmp( ptr, buf, 4 ) ) { - belongs = true; - break; - } - } - - return belongs; -} /* SeedBelongs */ - -/* does my connName provide a home for seeds already in this connName-less - ref? */ -bool -CookieRef::SeedsBelong( const char* connName ) -{ - bool found = true; - assert( !m_connName[0] ); - const char* delim = strchr( connName, CONNNAME_DELIM ); - assert( !!delim ); - - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - char buf[5]; - snprintf( buf, sizeof(buf), "%.4X", iter->m_seed ); - const char* match = strstr( connName, buf ); - if ( !match || match > delim ) { - found = false; - break; - } - } - - return found; -} /* SeedsBelong */ void CookieRef::assignConnName( void ) { if ( '\0' == ConnName()[0] ) { - - /* vector::iterator iter; */ - /* for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { */ - /* char buf[5]; */ - /* snprintf( buf, sizeof(buf), "%.4X", iter->m_seed ); */ - /* m_connName += buf; */ - /* } */ - m_connName += /*CONNNAME_DELIM + */PermID::GetNextUniqueID(); logf( XW_LOGINFO, "%s: assigning name: %s", __func__, ConnName() ); @@ -1258,20 +1190,26 @@ CookieRef::assignConnName( void ) } void -CookieRef::disconnectSockets( int socket, XWREASON why ) +CookieRef::disconnectSockets( XWREASON why ) { - if ( socket == 0 ) { - ASSERT_LOCKED(); - vector::iterator iter; - for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - assert( iter->m_socket != 0 ); - disconnectSockets( iter->m_socket, why ); + ASSERT_LOCKED(); + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; + for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { + assert( iter->first != 0 ); + if ( iter->first != 0 ) { + disconnectSocket( iter->first, why ); } - } else { - pushNotifyDisconEvent( socket, why ); - pushRemoveSocketEvent( socket ); } -} /* disconnectSockets */ +} + +void +CookieRef::disconnectSocket( int socket, XWREASON why ) +{ + ASSERT_LOCKED(); + pushNotifyDisconEvent( socket, why ); + pushRemoveSocketEvent( socket ); +} /* disconnectSocket */ void CookieRef::removeDevice( const CRefEvent* const evt ) @@ -1281,9 +1219,10 @@ CookieRef::removeDevice( const CRefEvent* const evt ) evt->u.devgone.seed ) ) { dbmgr->KillGame( ConnName(), evt->u.devgone.hid ); - vector::iterator iter; + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - notifyGameDead( iter->m_socket ); + notifyGameDead( iter->first ); } } } @@ -1295,29 +1234,28 @@ CookieRef::noteHeartbeat( const CRefEvent* evt ) HostID id = evt->u.heart.id; ASSERT_LOCKED(); - vector::iterator iter; + RWWriteLock rwl( &m_socketsRWLock ); + map::iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { - if ( iter->m_hostID == id ) { + if ( iter->second.m_hostID == id ) { + int second_socket = iter->first; + if ( second_socket == socket ) { + logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d", + iter->second.m_lastHeartbeat, uptime() ); + iter->second.m_lastHeartbeat = uptime(); + } else { + /* PENDING If the message came on an unexpected socket, kill the + connection. An attack is the most likely explanation. But: + now it's happening after a crash and clients reconnect. */ + logf( XW_LOGERROR, "wrong socket record for HostID %x; " + "wanted %d, found %d", id, socket, second_socket ); + } break; } } if ( iter == m_sockets.end() ) { logf( XW_LOGERROR, "no socket for HostID %x", id ); - } else { - - int second_socket = iter->m_socket; - if ( second_socket == socket ) { - logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d", - iter->m_lastHeartbeat, uptime() ); - iter->m_lastHeartbeat = uptime(); - } else { - /* PENDING If the message came on an unexpected socket, kill the - connection. An attack is the most likely explanation. But: - now it's happening after a crash and clients reconnect. */ - logf( XW_LOGERROR, "wrong socket record for HostID %x; wanted %d, " - "found %d", id, socket, second_socket ); - } } } /* noteHeartbeat */ @@ -1368,11 +1306,14 @@ CookieRef::printSeeds( const char* caller ) { int len = 0; char buf[64] = {0}; - vector::iterator iter; + + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { len += snprintf( &buf[len], sizeof(buf)-len, "[%d]%.4x(%d)/%d/%c ", - iter->m_hostID, iter->m_seed, iter->m_seed, - iter->m_socket, iter->m_ackPending?'a':'A' ); + iter->second.m_hostID, iter->second.m_seed, + iter->second.m_seed, iter->first, + iter->second.m_ackPending?'a':'A' ); } logf( XW_LOGINFO, "seeds/sockets/ack'd after %s(): %s", caller, buf ); } @@ -1426,11 +1367,13 @@ CookieRef::_PrintCookieInfo( string& out ) snprintf( buf, sizeof(buf), "Hosts connected=%d; cur time = %ld\n", m_sockets.size(), uptime() ); out += buf; - vector::iterator iter; + + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n", - iter->m_hostID, iter->m_socket, - iter->m_lastHeartbeat ); + iter->second.m_hostID, iter->first, + iter->second.m_lastHeartbeat ); out += buf; } @@ -1440,23 +1383,24 @@ void CookieRef::_FormatHostInfo( string* hostIds, string* seeds, string* addrs ) { ASSERT_LOCKED(); - vector::iterator iter; + RWReadLock rrl( &m_socketsRWLock ); + map::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { if ( !!hostIds ) { char buf[8]; - snprintf( buf, sizeof(buf), "%d ", iter->m_hostID ); + snprintf( buf, sizeof(buf), "%d ", iter->second.m_hostID ); *hostIds += buf; } if ( !!seeds ) { char buf[6]; - snprintf( buf, sizeof(buf), "%.4X ", iter->m_seed ); + snprintf( buf, sizeof(buf), "%.4X ", iter->second.m_seed ); *seeds += buf; } if ( !!addrs ) { - int s = iter->m_socket; + int s = iter->first; struct sockaddr_in name; socklen_t siz = sizeof(name); if ( 0 == getpeername( s, (struct sockaddr*)&name, &siz) ) { diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 65ff1450f..d0aeda988 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -41,9 +41,8 @@ class CookieMapIterator; /* forward */ struct HostRec { public: -HostRec(HostID hostID, int socket, int nPlayersH, int seed, bool ackPending ) +HostRec(HostID hostID, int nPlayersH, int seed, bool ackPending ) : m_hostID(hostID) - , m_socket(socket) , m_nPlayersH(nPlayersH) , m_seed(seed) , m_lastHeartbeat(uptime()) @@ -52,7 +51,6 @@ HostRec(HostID hostID, int socket, int nPlayersH, int seed, bool ackPending ) ::logf( XW_LOGINFO, "created HostRec with id %d", m_hostID); } HostID m_hostID; - int m_socket; int m_nPlayersH; int m_seed; time_t m_lastHeartbeat; @@ -235,19 +233,15 @@ class CookieRef { void notifyOthers( int socket, XWRelayMsg msg, XWREASON why ); void notifyGameDead( int socket ); - void disconnectSockets( int socket, XWREASON why ); + void disconnectSockets( XWREASON why ); + void disconnectSocket( int socket, XWREASON why ); void removeDevice( const CRefEvent* const evt ); void noteHeartbeat(const CRefEvent* evt); void notifyDisconn(const CRefEvent* evt); void removeSocket( int socket ); void sendAllHere( bool initial ); - void checkSomeMissing( void ); - void moveSockets( void ); - bool SeedBelongs( int gameSeed ); - bool SeedsBelong( const char* connName ); void assignConnName( void ); - void assignHostIds( void ); time_t GetStarttime( void ) { return m_starttime; } int GetLangCode( void ) { return m_langCode; } @@ -267,15 +261,14 @@ class CookieRef { static void s_checkAllConnected( void* closure ); static void s_checkAck( void* closure ); - vector m_sockets; + pthread_rwlock_t m_socketsRWLock; + map m_sockets; + int m_heatbeat; /* might change per carrier or something. */ string m_cookie; /* cookie used for initial connections */ string m_connName; /* globally unique name */ CookieID m_cookieID; /* Unique among current games on this server */ - pthread_mutex_t m_sockSetMutex; - set m_sockSet; - XW_RELAY_STATE m_curState; deque m_eventQueue;