From 6c121dac577ee41304941224dd1746d92a83c5b7 Mon Sep 17 00:00:00 2001 From: Andy2 Date: Fri, 24 Jun 2011 18:34:34 -0700 Subject: [PATCH] final set of changes -- all test cases now seem to be passing. Stop duplicating set of sockets owned by a cref, moving it from cinfo into cref and caching a copy outside when cref is unclaimed (after which no change is possible until it's claimed again.) --- xwords4/relay/cidlock.cpp | 81 ++++++++------------- xwords4/relay/cidlock.h | 21 +++--- xwords4/relay/cref.cpp | 38 ++++++++-- xwords4/relay/cref.h | 15 ++-- xwords4/relay/crefmgr.cpp | 148 -------------------------------------- xwords4/relay/crefmgr.h | 10 +-- 6 files changed, 79 insertions(+), 234 deletions(-) diff --git a/xwords4/relay/cidlock.cpp b/xwords4/relay/cidlock.cpp index 3a522079e..a756e41bd 100644 --- a/xwords4/relay/cidlock.cpp +++ b/xwords4/relay/cidlock.cpp @@ -25,6 +25,13 @@ #include "cidlock.h" #include "mlock.h" +const set +CidInfo::GetSockets( void ) +{ + return 0 == m_owner || NULL == m_cref ? + m_sockets : m_cref->GetSockets(); +} + CidLock* CidLock::s_instance = NULL; CidLock::CidLock() : m_nextCID(0) @@ -38,20 +45,18 @@ CidLock::~CidLock() pthread_mutex_destroy( &m_infos_mutex ); } +#define PRINT_CLAIMED() print_claimed(__func__) void -CidLock::print_claimed() +CidLock::print_claimed( const char* caller ) { - char buf[256] = {0}; - int len = 0; + char buf[512] = {0}; + int len = snprintf( buf, sizeof(buf), "after %s: ", caller ); // Assume we have the mutex!!!! - map< CookieID, CidInfo*>::iterator iter = m_infos.begin(); - while ( iter != m_infos.end() ) { + map< CookieID, CidInfo*>::iterator iter; + for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) { CidInfo* info = iter->second; - if ( 0 != info->GetOwner() ) { - len += snprintf( &buf[len], sizeof(buf)-len, "%d,", - info->GetCid() ); - } - ++iter; + CookieID cid = 0 == info->GetOwner() ? 0 : info->GetCid(); + len += snprintf( &buf[len], sizeof(buf)-len, "%d,", cid ); } logf( XW_LOGINFO, "%s: claimed: %s", __func__, buf ); } @@ -81,7 +86,7 @@ CidLock::Claim( CookieID cid ) if ( NULL != info ) { // we're done info->SetOwner( pthread_self() ); - print_claimed(); + PRINT_CLAIMED(); break; } @@ -101,16 +106,17 @@ CidLock::ClaimSocket( int sock ) for ( ; ; ) { MutexLock ml( &m_infos_mutex ); - map< CookieID, CidInfo*>::iterator iter = m_infos.begin(); - while ( iter != m_infos.end() ) { - if ( sock == iter->second->GetSocket() ) { + map< CookieID, CidInfo*>::iterator iter; + for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) { + const set& sockets = iter->second->GetSockets(); + if ( sockets.end() != sockets.find( sock ) ) { if ( 0 == iter->second->GetOwner() ) { info = iter->second; info->SetOwner( pthread_self() ); + PRINT_CLAIMED(); } break; } - ++iter; } /* break if socket isn't here or if it's not claimed */ @@ -120,44 +126,11 @@ CidLock::ClaimSocket( int sock ) logf( XW_LOGINFO, "%s(sock=%d): waiting....", __func__, sock ); pthread_cond_wait( &m_infos_condvar, &m_infos_mutex ); } - print_claimed(); + logf( XW_LOGINFO, "%s(%d): DONE", __func__, info? info->GetCid():0 ); return info; } -bool -CidLock::Associate_locked( const CookieRef* cref, int socket ) -{ - map< CookieID, CidInfo*>::iterator iter = m_infos.begin(); - while ( iter != m_infos.end() ) { - if ( cref == iter->second->GetRef() ) { - iter->second->SetSocket( socket ); - break; - } - ++iter; - } - bool isNew = m_sockets.find( socket ) == m_sockets.end(); - if ( isNew ) { - m_sockets.insert( socket ); - } - return isNew; -} - -bool -CidLock::Associate( const CookieRef* cref, int socket ) -{ - MutexLock ml( &m_infos_mutex ); - return Associate_locked( cref, socket ); -} - -void -CidLock::DisAssociate( const CookieRef* cref, int socket ) -{ - MutexLock ml( &m_infos_mutex ); - Associate_locked( cref, 0 ); - m_sockets.erase( socket ); -} - void CidLock::Relinquish( CidInfo* claim, bool drop ) { @@ -167,15 +140,17 @@ CidLock::Relinquish( CidInfo* claim, bool drop ) MutexLock ml( &m_infos_mutex ); map< CookieID, CidInfo*>::iterator iter = m_infos.find( cid ); assert( iter != m_infos.end() ); - assert( iter->second->GetOwner() == pthread_self() ); + assert( iter->second == claim ); + assert( claim->GetOwner() == pthread_self() ); if ( drop ) { logf( XW_LOGINFO, "%s: deleting %p", __func__, iter->second ); - delete iter->second; m_infos.erase( iter ); + delete claim; } else { - iter->second->SetOwner( 0 ); + claim->SetSockets( claim->GetRef()->GetSockets() ); /* cache these */ + claim->SetOwner( 0 ); } - print_claimed(); + PRINT_CLAIMED(); pthread_cond_signal( &m_infos_condvar ); logf( XW_LOGINFO, "%s(%d,drop=%d): DONE", __func__, cid, drop ); } diff --git a/xwords4/relay/cidlock.h b/xwords4/relay/cidlock.h index 42ffe8a34..75ce5171d 100644 --- a/xwords4/relay/cidlock.h +++ b/xwords4/relay/cidlock.h @@ -34,23 +34,22 @@ class CidInfo { CidInfo( CookieID cid ) :m_cid(cid), m_cref(NULL), - m_owner(NULL), - m_socket(0) {} + m_owner(NULL) {} CookieID GetCid( void ) { return m_cid; } CookieRef* GetRef( void ) { return m_cref; } pthread_t GetOwner( void ) { return m_owner; } - int GetSocket( void ) { return m_socket; } + const set GetSockets( void ); + void SetSockets( set sockets ) { m_sockets = sockets; }; void SetRef( CookieRef* cref ) { m_cref = cref; } void SetOwner( pthread_t owner ) { m_owner = owner; } - void SetSocket( int socket ) { m_socket = socket; } private: CookieID m_cid; CookieRef* m_cref; pthread_t m_owner; - int m_socket; + set m_sockets; }; class CidLock { @@ -69,16 +68,12 @@ class CidLock { CidInfo* ClaimSocket( int sock ); void Relinquish( CidInfo* claim, bool drop ); - bool Associate( const CookieRef* cref, int socket ); /* return true if new association */ - void DisAssociate( const CookieRef* cref, int socket ); - private: - CidLock(); - void print_claimed(); - bool Associate_locked( const CookieRef* cref, int socket ); - static CidLock* s_instance; - set m_sockets; + + CidLock(); + void print_claimed( const char* caller ); + map< CookieID, CidInfo* > m_infos; pthread_mutex_t m_infos_mutex; pthread_cond_t m_infos_condvar; diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 51907dc8d..3a6de40ea 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -114,11 +114,13 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id, CookieRef::CookieRef( const char* cookie, const char* connName, CookieID id, int langCode, int nPlayersT, int nAlreadyHere ) { + pthread_mutex_init( &m_sockSetMutex, NULL ); ReInit( cookie, connName, id, langCode, nPlayersT, nAlreadyHere ); } CookieRef::~CookieRef() { + logf( XW_LOGINFO, "CookieRef for %d being deleted", m_cookieID ); cancelAllConnectedTimer(); /* get rid of any sockets still contained */ @@ -129,12 +131,13 @@ CookieRef::~CookieRef() for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { int socket = iter->m_socket; tPool->CloseSocket( socket ); + RmSocket( socket ); m_sockets.erase( iter ); } printSeeds(__func__); - logf( XW_LOGINFO, "CookieRef for %d being deleted", m_cookieID ); + pthread_mutex_destroy( &m_sockSetMutex ); } /* ~CookieRef */ void @@ -193,7 +196,8 @@ CookieRef::_Connect( int socket, int nPlayersH, int nPlayersS, int seed, } if ( !connected ) { - if ( CRefMgr::Get()->Associate( socket, this ) ) { + set sockets = GetSockets(); + if ( sockets.end() == sockets.find( socket ) ) { pushConnectEvent( socket, nPlayersH, nPlayersS, seed ); handleEvents(); connected = HasSocket_locked( socket ); @@ -217,7 +221,6 @@ CookieRef::_Reconnect( int socket, HostID hid, int nPlayersH, int nPlayersS, logf( XW_LOGINFO, "%s: dropping because already here", __func__ ); } else { - (void)CRefMgr::Get()->Associate( socket, this ); pushReconnectEvent( socket, hid, nPlayersH, nPlayersS, seed ); } if ( gameDead ) { @@ -241,7 +244,6 @@ void CookieRef::_Disconnect( int socket, HostID hostID ) { logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID ); - CRefMgr::Get()->Disassociate( socket, this ); CRefEvent evt( XWE_DISCONN ); evt.u.discon.socket = socket; @@ -344,9 +346,10 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket, } else if ( socket == iter->m_socket ) { here = true; /* dup packet */ } else { - logf( XW_LOGINFO, "%s: hids match; nuking existing record" + logf( XW_LOGINFO, "%s: hids match; nuking existing record " "for socket %d b/c assumed closed", __func__, iter->m_socket ); + RmSocket( iter->m_socket ); m_sockets.erase( iter ); } break; @@ -393,6 +396,7 @@ CookieRef::removeSocket( int socket ) m_nPlayersHere -= iter->m_nPlayersH; cancelAckTimer( iter->m_hostID ); } + RmSocket( socket ); m_sockets.erase(iter); --count; found = true; @@ -436,6 +440,29 @@ CookieRef::HasSocket( int socket ) return result; } +set +CookieRef::GetSockets() +{ + MutexLock ml( &m_sockSetMutex ); + return m_sockSet; +} + +void +CookieRef::AddSocket( int socket ) +{ + MutexLock ml( &m_sockSetMutex ); + assert( m_sockSet.find(socket) == m_sockSet.end() ); + m_sockSet.insert( socket ); +} + +void +CookieRef::RmSocket( int socket ) +{ + MutexLock ml( &m_sockSetMutex ); + assert( m_sockSet.find(socket) != m_sockSet.end() ); + m_sockSet.erase( socket ); +} + bool CookieRef::HasSocket_locked( int socket ) { @@ -864,6 +891,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp ) HostRec hr( hostid, socket, nPlayersH, seed, !reconn ); m_sockets.push_back( hr ); + AddSocket( socket ); printSeeds(__func__); diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 64687ec34..65ff1450f 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -23,6 +23,7 @@ #define _CREF_H_ #include +#include #include #include #include @@ -65,6 +66,8 @@ public: }; class CookieRef { + public: + set GetSockets(); private: /* These classes have access to CookieRef. All others should go through @@ -257,6 +260,9 @@ class CookieRef { void printSeeds( const char* caller ); + void AddSocket( int socket ); + void RmSocket( int socket ); + /* timer callback */ static void s_checkAllConnected( void* closure ); static void s_checkAck( void* closure ); @@ -267,11 +273,8 @@ class CookieRef { string m_connName; /* globally unique name */ CookieID m_cookieID; /* Unique among current games on this server */ - /* Guard the event queue. Only one thread at a time can post to the - queue, but once in a thread can post new events while processing - current ones. */ -/* pthread_mutex_t m_EventsMutex; */ - + pthread_mutex_t m_sockSetMutex; + set m_sockSet; XW_RELAY_STATE m_curState; deque m_eventQueue; @@ -284,7 +287,7 @@ class CookieRef { AckTimer m_timers[4]; - pthread_t m_locking_thread; /* for debugging only */ + pthread_t m_locking_thread; bool m_in_handleEvents; /* for debugging only */ int m_delayMicros; }; /* CookieRef */ diff --git a/xwords4/relay/crefmgr.cpp b/xwords4/relay/crefmgr.cpp index 2179a46c6..0cd32e747 100644 --- a/xwords4/relay/crefmgr.cpp +++ b/xwords4/relay/crefmgr.cpp @@ -77,12 +77,6 @@ CRefMgr::~CRefMgr() pthread_mutex_destroy( &m_freeList_mutex ); pthread_rwlock_destroy( &m_cookieMapRWLock ); - /* SocketMap::iterator iter; */ - /* for ( iter = m_SocketStuff.begin(); iter != m_SocketStuff.end(); ++iter ) { */ - /* SocketStuff* stuff = iter->second; */ - /* delete stuff; */ - /* } */ - s_instance = NULL; } @@ -108,15 +102,6 @@ CRefMgr::CloseAll() } } /* CloseAll */ -/* CookieID */ -/* CRefMgr::nextCID( const char* connName ) */ -/* { */ -/* /\* Later may want to guarantee that wrap-around doesn't cause an overlap. */ -/* But that's really only a theoretical possibility. *\/ */ -/* MutexLock ml(&m_nextCIDMutex); */ -/* return ++m_nextCID; */ -/* } /\* nextCID *\/ */ - void CRefMgr::IncrementFullCount( void ) { @@ -393,88 +378,6 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead ) return cinfo; } -bool -CRefMgr::Associate( int socket, CookieRef* cref ) -{ - return m_cidlock->Associate( cref, socket ); - /* MutexLock ml( &m_SocketStuffMutex ); */ - /* return Associate_locked( socket, cref ); */ -} - -/* bool */ -/* CRefMgr::Associate_locked( int socket, CookieRef* cref ) */ -/* { */ -/* bool isNew = false; */ -/* SocketMap::iterator iter = m_SocketStuff.find( socket ); */ -/* /\* This isn't enough. Must provide a way to reuse sockets should a */ -/* genuinely different connection appear. Now maybe we already remove */ -/* this reference when a socket is closed. Test this! Or assert */ -/* something here. Bottom line: need to swallow repeated/duplicate */ -/* connect messages from same host. *\/ */ -/* if ( iter == m_SocketStuff.end() ) { */ -/* SocketStuff* stuff = new SocketStuff( cref ); */ -/* m_SocketStuff.insert( pair< int, SocketStuff* >( socket, stuff ) ); */ -/* isNew = true; */ -/* } else { */ -/* logf( XW_LOGERROR, "Already have cref/threadID pair for socket %d; " */ -/* "error???", socket ); */ -/* } */ -/* return isNew; */ -/* } */ - -/* void */ -/* CRefMgr::Disassociate_locked( int socket, CookieRef* cref ) */ -/* { */ -/* SocketMap::iterator iter = m_SocketStuff.find( socket ); */ -/* if ( iter == m_SocketStuff.end() ) { */ -/* logf( XW_LOGERROR, "can't find SocketStuff for socket %d", socket ); */ -/* } else { */ -/* SocketStuff* stuff = iter->second; */ -/* assert( cref == NULL || stuff->m_cref == cref ); */ -/* delete stuff; */ -/* m_SocketStuff.erase( iter ); */ -/* } */ -/* } */ - -void -CRefMgr::Disassociate( int socket, CookieRef* cref ) -{ - m_cidlock->DisAssociate( cref, socket ); - /* MutexLock ml( &m_SocketStuffMutex ); */ - /* Disassociate_locked( socket, cref ); */ -} - -/* void */ -/* CRefMgr::MoveSockets( vector sockets, CookieRef* cref ) */ -/* { */ -/* MutexLock ml( &m_SocketStuffMutex ); */ -/* vector::iterator iter; */ -/* for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) { */ -/* Disassociate_locked( *iter, NULL ); */ -/* Associate_locked( *iter, cref ); */ -/* } */ -/* } */ - -#if 0 -pthread_mutex_t* -CRefMgr::GetWriteMutexForSocket( int socket ) -{ - pthread_mutex_t* mutex = NULL; - MutexLock ml( &m_SocketStuffMutex ); - SocketMap::iterator iter = m_SocketStuff.find( socket ); - if ( iter != m_SocketStuff.end() ) { - SocketStuff* stuff = iter->second; - /* this is dangerous! What if we want to nuke SocketStuff while this - is locked? And shouldn't it be the case that only one thread at a - time can be trying to write to one of a given cref's sockets since - only one thread at a time is handling a cref? */ - mutex = &stuff->m_writeMutex; - } - logf( XW_LOGERROR, "GetWriteMutexForSocket: not found" ); - return mutex; -} /* GetWriteMutexForSocket */ -#endif - void CRefMgr::RemoveSocketRefs( int socket ) { @@ -482,8 +385,6 @@ CRefMgr::RemoveSocketRefs( int socket ) SafeCref scr( socket ); scr.Remove( socket ); } - - Disassociate( socket, NULL ); } void @@ -502,16 +403,6 @@ CRefMgr::PrintSocketInfo( int socket, string& out ) } } -/* /\* static *\/ SocketsIterator */ -/* CRefMgr::MakeSocketsIterator() */ -/* { */ -/* assert( 0 ); /\* called? *\/ */ -/* pthread_mutex_lock( &m_SocketStuffMutex ); */ -/* SocketsIterator iter( m_SocketStuff.begin(), m_SocketStuff.end(), */ -/* &m_SocketStuffMutex ); */ -/* return iter; */ -/* } */ - CidInfo* CRefMgr::getCookieRef( CookieID cid ) { @@ -522,14 +413,6 @@ CidInfo* CRefMgr::getCookieRef( int socket ) { CidInfo* cinfo = m_cidlock->ClaimSocket( socket ); - /* if ( NULL == cinfo ) { */ - /* MutexLock ml( &m_SocketStuffMutex ); */ - /* SocketMap::iterator iter = m_SocketStuff.find( socket ); */ - /* if ( iter != m_SocketStuff.end() ) { */ - /* SocketStuff* stuff = iter->second; */ - /* cinfo = m_cidlock->Claim( stuff->m_cref->GetCookieID() ); */ - /* } */ - /* } */ assert( NULL == cinfo || NULL != cinfo->GetRef() ); return cinfo; @@ -639,26 +522,6 @@ CRefMgr::Recycle( const char* connName ) Recycle( id ); } /* Delete */ -/* CidInfo* */ -/* CRefMgr::getCookieRef_impl( CookieID cid ) */ -/* { */ -/* CidInfo* info = m_cidlock->Claim( cid ); */ -/* /\* CookieRef* ref = NULL; *\/ */ -/* /\* RWReadLock rwl( &m_cookieMapRWLock ); *\/ */ - -/* /\* CookieMap::iterator iter = m_cookieMap.find( cid ); *\/ */ -/* /\* while ( iter != m_cookieMap.end() ) { *\/ */ -/* /\* CookieRef* second = iter->second; *\/ */ -/* /\* if ( second->GetCookieID() == cid ) { *\/ */ -/* /\* ref = second; *\/ */ -/* /\* break; *\/ */ -/* /\* } *\/ */ -/* /\* ++iter; *\/ */ -/* /\* } *\/ */ -/* /\* info->SetRef( ref ); *\/ */ -/* return info; */ -/* } */ - #ifdef RELAY_HEARTBEAT void CRefMgr::checkHeartbeats( time_t now ) @@ -813,17 +676,6 @@ SafeCref::SafeCref( int socket ) } } -/* SafeCref::SafeCref( CookieRef* cref ) */ -/* : m_cinfo( NULL ) */ -/* , m_mgr( CRefMgr::Get() ) */ -/* , m_isValid( false ) */ -/* { */ -/* assert(0); /\* path to this? *\/ */ -/* m_locked = cref->Lock(); */ -/* m_isValid = m_locked; */ -/* // m_cref = cref; */ -/* } */ - SafeCref::~SafeCref() { if ( m_cinfo != NULL ) { diff --git a/xwords4/relay/crefmgr.h b/xwords4/relay/crefmgr.h index 75b739578..ba6afd580 100644 --- a/xwords4/relay/crefmgr.h +++ b/xwords4/relay/crefmgr.h @@ -86,16 +86,11 @@ class CRefMgr { /* void LockAll() { pthread_rwlock_wrlock( &m_cookieMapRWLock ); } */ /* void UnlockAll() { pthread_rwlock_unlock( &m_cookieMapRWLock ); } */ - /* Track sockets independent of cookie refs */ - bool Associate( int socket, CookieRef* cref ); - bool Associate_locked( int socket, CookieRef* cref ); - void Disassociate( int socket, CookieRef* cref ); - void Disassociate_locked( int socket, CookieRef* cref ); + void MoveSockets( vector sockets, CookieRef* cref ); pthread_mutex_t* GetWriteMutexForSocket( int socket ); void RemoveSocketRefs( int socket ); void PrintSocketInfo( int socket, string& out ); - /* SocketsIterator MakeSocketsIterator(); */ void IncrementFullCount( void ); int GetNumRoomsFilled( void ); @@ -153,9 +148,6 @@ class CRefMgr { static void heartbeatProc( void* closure ); void checkHeartbeats( time_t now ); - /* pthread_mutex_t m_nextCIDMutex; */ - /* CookieID m_nextCID; */ - pthread_mutex_t m_roomsFilledMutex; int m_nRoomsFilled;