final set of changes -- all test cases now seem to be passing. Stop

duplicating set of sockets owned by a cref, moving it from cinfo into
cref and caching a copy outside when cref is unclaimed (after which no
change is possible until it's claimed again.)
This commit is contained in:
Andy2 2011-06-24 18:34:34 -07:00
parent df60ec648b
commit 6c121dac57
6 changed files with 79 additions and 234 deletions

View file

@ -25,6 +25,13 @@
#include "cidlock.h"
#include "mlock.h"
const set<int>
CidInfo::GetSockets( void )
{
return 0 == m_owner || NULL == m_cref ?
m_sockets : m_cref->GetSockets();
}
CidLock* CidLock::s_instance = NULL;
CidLock::CidLock() : m_nextCID(0)
@ -38,20 +45,18 @@ CidLock::~CidLock()
pthread_mutex_destroy( &m_infos_mutex );
}
#define PRINT_CLAIMED() print_claimed(__func__)
void
CidLock::print_claimed()
CidLock::print_claimed( const char* caller )
{
char buf[256] = {0};
int len = 0;
char buf[512] = {0};
int len = snprintf( buf, sizeof(buf), "after %s: ", caller );
// Assume we have the mutex!!!!
map< CookieID, CidInfo*>::iterator iter = m_infos.begin();
while ( iter != m_infos.end() ) {
map< CookieID, CidInfo*>::iterator iter;
for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) {
CidInfo* info = iter->second;
if ( 0 != info->GetOwner() ) {
len += snprintf( &buf[len], sizeof(buf)-len, "%d,",
info->GetCid() );
}
++iter;
CookieID cid = 0 == info->GetOwner() ? 0 : info->GetCid();
len += snprintf( &buf[len], sizeof(buf)-len, "%d,", cid );
}
logf( XW_LOGINFO, "%s: claimed: %s", __func__, buf );
}
@ -81,7 +86,7 @@ CidLock::Claim( CookieID cid )
if ( NULL != info ) { // we're done
info->SetOwner( pthread_self() );
print_claimed();
PRINT_CLAIMED();
break;
}
@ -101,16 +106,17 @@ CidLock::ClaimSocket( int sock )
for ( ; ; ) {
MutexLock ml( &m_infos_mutex );
map< CookieID, CidInfo*>::iterator iter = m_infos.begin();
while ( iter != m_infos.end() ) {
if ( sock == iter->second->GetSocket() ) {
map< CookieID, CidInfo*>::iterator iter;
for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) {
const set<int>& sockets = iter->second->GetSockets();
if ( sockets.end() != sockets.find( sock ) ) {
if ( 0 == iter->second->GetOwner() ) {
info = iter->second;
info->SetOwner( pthread_self() );
PRINT_CLAIMED();
}
break;
}
++iter;
}
/* break if socket isn't here or if it's not claimed */
@ -120,44 +126,11 @@ CidLock::ClaimSocket( int sock )
logf( XW_LOGINFO, "%s(sock=%d): waiting....", __func__, sock );
pthread_cond_wait( &m_infos_condvar, &m_infos_mutex );
}
print_claimed();
logf( XW_LOGINFO, "%s(%d): DONE", __func__, info? info->GetCid():0 );
return info;
}
bool
CidLock::Associate_locked( const CookieRef* cref, int socket )
{
map< CookieID, CidInfo*>::iterator iter = m_infos.begin();
while ( iter != m_infos.end() ) {
if ( cref == iter->second->GetRef() ) {
iter->second->SetSocket( socket );
break;
}
++iter;
}
bool isNew = m_sockets.find( socket ) == m_sockets.end();
if ( isNew ) {
m_sockets.insert( socket );
}
return isNew;
}
bool
CidLock::Associate( const CookieRef* cref, int socket )
{
MutexLock ml( &m_infos_mutex );
return Associate_locked( cref, socket );
}
void
CidLock::DisAssociate( const CookieRef* cref, int socket )
{
MutexLock ml( &m_infos_mutex );
Associate_locked( cref, 0 );
m_sockets.erase( socket );
}
void
CidLock::Relinquish( CidInfo* claim, bool drop )
{
@ -167,15 +140,17 @@ CidLock::Relinquish( CidInfo* claim, bool drop )
MutexLock ml( &m_infos_mutex );
map< CookieID, CidInfo*>::iterator iter = m_infos.find( cid );
assert( iter != m_infos.end() );
assert( iter->second->GetOwner() == pthread_self() );
assert( iter->second == claim );
assert( claim->GetOwner() == pthread_self() );
if ( drop ) {
logf( XW_LOGINFO, "%s: deleting %p", __func__, iter->second );
delete iter->second;
m_infos.erase( iter );
delete claim;
} else {
iter->second->SetOwner( 0 );
claim->SetSockets( claim->GetRef()->GetSockets() ); /* cache these */
claim->SetOwner( 0 );
}
print_claimed();
PRINT_CLAIMED();
pthread_cond_signal( &m_infos_condvar );
logf( XW_LOGINFO, "%s(%d,drop=%d): DONE", __func__, cid, drop );
}

View file

@ -34,23 +34,22 @@ class CidInfo {
CidInfo( CookieID cid )
:m_cid(cid),
m_cref(NULL),
m_owner(NULL),
m_socket(0) {}
m_owner(NULL) {}
CookieID GetCid( void ) { return m_cid; }
CookieRef* GetRef( void ) { return m_cref; }
pthread_t GetOwner( void ) { return m_owner; }
int GetSocket( void ) { return m_socket; }
const set<int> GetSockets( void );
void SetSockets( set<int> sockets ) { m_sockets = sockets; };
void SetRef( CookieRef* cref ) { m_cref = cref; }
void SetOwner( pthread_t owner ) { m_owner = owner; }
void SetSocket( int socket ) { m_socket = socket; }
private:
CookieID m_cid;
CookieRef* m_cref;
pthread_t m_owner;
int m_socket;
set<int> m_sockets;
};
class CidLock {
@ -69,16 +68,12 @@ class CidLock {
CidInfo* ClaimSocket( int sock );
void Relinquish( CidInfo* claim, bool drop );
bool Associate( const CookieRef* cref, int socket ); /* return true if new association */
void DisAssociate( const CookieRef* cref, int socket );
private:
CidLock();
void print_claimed();
bool Associate_locked( const CookieRef* cref, int socket );
static CidLock* s_instance;
set<int> m_sockets;
CidLock();
void print_claimed( const char* caller );
map< CookieID, CidInfo* > m_infos;
pthread_mutex_t m_infos_mutex;
pthread_cond_t m_infos_condvar;

View file

@ -114,11 +114,13 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id,
CookieRef::CookieRef( const char* cookie, const char* connName, CookieID id,
int langCode, int nPlayersT, int nAlreadyHere )
{
pthread_mutex_init( &m_sockSetMutex, NULL );
ReInit( cookie, connName, id, langCode, nPlayersT, nAlreadyHere );
}
CookieRef::~CookieRef()
{
logf( XW_LOGINFO, "CookieRef for %d being deleted", m_cookieID );
cancelAllConnectedTimer();
/* get rid of any sockets still contained */
@ -129,12 +131,13 @@ CookieRef::~CookieRef()
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int socket = iter->m_socket;
tPool->CloseSocket( socket );
RmSocket( socket );
m_sockets.erase( iter );
}
printSeeds(__func__);
logf( XW_LOGINFO, "CookieRef for %d being deleted", m_cookieID );
pthread_mutex_destroy( &m_sockSetMutex );
} /* ~CookieRef */
void
@ -193,7 +196,8 @@ CookieRef::_Connect( int socket, int nPlayersH, int nPlayersS, int seed,
}
if ( !connected ) {
if ( CRefMgr::Get()->Associate( socket, this ) ) {
set<int> sockets = GetSockets();
if ( sockets.end() == sockets.find( socket ) ) {
pushConnectEvent( socket, nPlayersH, nPlayersS, seed );
handleEvents();
connected = HasSocket_locked( socket );
@ -217,7 +221,6 @@ CookieRef::_Reconnect( int socket, HostID hid, int nPlayersH, int nPlayersS,
logf( XW_LOGINFO, "%s: dropping because already here",
__func__ );
} else {
(void)CRefMgr::Get()->Associate( socket, this );
pushReconnectEvent( socket, hid, nPlayersH, nPlayersS, seed );
}
if ( gameDead ) {
@ -241,7 +244,6 @@ void
CookieRef::_Disconnect( int socket, HostID hostID )
{
logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID );
CRefMgr::Get()->Disassociate( socket, this );
CRefEvent evt( XWE_DISCONN );
evt.u.discon.socket = socket;
@ -344,9 +346,10 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket,
} else if ( socket == iter->m_socket ) {
here = true; /* dup packet */
} else {
logf( XW_LOGINFO, "%s: hids match; nuking existing record"
logf( XW_LOGINFO, "%s: hids match; nuking existing record "
"for socket %d b/c assumed closed", __func__,
iter->m_socket );
RmSocket( iter->m_socket );
m_sockets.erase( iter );
}
break;
@ -393,6 +396,7 @@ CookieRef::removeSocket( int socket )
m_nPlayersHere -= iter->m_nPlayersH;
cancelAckTimer( iter->m_hostID );
}
RmSocket( socket );
m_sockets.erase(iter);
--count;
found = true;
@ -436,6 +440,29 @@ CookieRef::HasSocket( int socket )
return result;
}
set<int>
CookieRef::GetSockets()
{
MutexLock ml( &m_sockSetMutex );
return m_sockSet;
}
void
CookieRef::AddSocket( int socket )
{
MutexLock ml( &m_sockSetMutex );
assert( m_sockSet.find(socket) == m_sockSet.end() );
m_sockSet.insert( socket );
}
void
CookieRef::RmSocket( int socket )
{
MutexLock ml( &m_sockSetMutex );
assert( m_sockSet.find(socket) != m_sockSet.end() );
m_sockSet.erase( socket );
}
bool
CookieRef::HasSocket_locked( int socket )
{
@ -864,6 +891,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp )
HostRec hr( hostid, socket, nPlayersH, seed, !reconn );
m_sockets.push_back( hr );
AddSocket( socket );
printSeeds(__func__);

View file

@ -23,6 +23,7 @@
#define _CREF_H_
#include <map>
#include <set>
#include <vector>
#include <string>
#include <deque>
@ -65,6 +66,8 @@ public:
};
class CookieRef {
public:
set<int> GetSockets();
private:
/* These classes have access to CookieRef. All others should go through
@ -257,6 +260,9 @@ class CookieRef {
void printSeeds( const char* caller );
void AddSocket( int socket );
void RmSocket( int socket );
/* timer callback */
static void s_checkAllConnected( void* closure );
static void s_checkAck( void* closure );
@ -267,11 +273,8 @@ class CookieRef {
string m_connName; /* globally unique name */
CookieID m_cookieID; /* Unique among current games on this server */
/* Guard the event queue. Only one thread at a time can post to the
queue, but once in a thread can post new events while processing
current ones. */
/* pthread_mutex_t m_EventsMutex; */
pthread_mutex_t m_sockSetMutex;
set<int> m_sockSet;
XW_RELAY_STATE m_curState;
deque<CRefEvent> m_eventQueue;
@ -284,7 +287,7 @@ class CookieRef {
AckTimer m_timers[4];
pthread_t m_locking_thread; /* for debugging only */
pthread_t m_locking_thread;
bool m_in_handleEvents; /* for debugging only */
int m_delayMicros;
}; /* CookieRef */

View file

@ -77,12 +77,6 @@ CRefMgr::~CRefMgr()
pthread_mutex_destroy( &m_freeList_mutex );
pthread_rwlock_destroy( &m_cookieMapRWLock );
/* SocketMap::iterator iter; */
/* for ( iter = m_SocketStuff.begin(); iter != m_SocketStuff.end(); ++iter ) { */
/* SocketStuff* stuff = iter->second; */
/* delete stuff; */
/* } */
s_instance = NULL;
}
@ -108,15 +102,6 @@ CRefMgr::CloseAll()
}
} /* CloseAll */
/* CookieID */
/* CRefMgr::nextCID( const char* connName ) */
/* { */
/* /\* Later may want to guarantee that wrap-around doesn't cause an overlap. */
/* But that's really only a theoretical possibility. *\/ */
/* MutexLock ml(&m_nextCIDMutex); */
/* return ++m_nextCID; */
/* } /\* nextCID *\/ */
void
CRefMgr::IncrementFullCount( void )
{
@ -393,88 +378,6 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead )
return cinfo;
}
bool
CRefMgr::Associate( int socket, CookieRef* cref )
{
return m_cidlock->Associate( cref, socket );
/* MutexLock ml( &m_SocketStuffMutex ); */
/* return Associate_locked( socket, cref ); */
}
/* bool */
/* CRefMgr::Associate_locked( int socket, CookieRef* cref ) */
/* { */
/* bool isNew = false; */
/* SocketMap::iterator iter = m_SocketStuff.find( socket ); */
/* /\* This isn't enough. Must provide a way to reuse sockets should a */
/* genuinely different connection appear. Now maybe we already remove */
/* this reference when a socket is closed. Test this! Or assert */
/* something here. Bottom line: need to swallow repeated/duplicate */
/* connect messages from same host. *\/ */
/* if ( iter == m_SocketStuff.end() ) { */
/* SocketStuff* stuff = new SocketStuff( cref ); */
/* m_SocketStuff.insert( pair< int, SocketStuff* >( socket, stuff ) ); */
/* isNew = true; */
/* } else { */
/* logf( XW_LOGERROR, "Already have cref/threadID pair for socket %d; " */
/* "error???", socket ); */
/* } */
/* return isNew; */
/* } */
/* void */
/* CRefMgr::Disassociate_locked( int socket, CookieRef* cref ) */
/* { */
/* SocketMap::iterator iter = m_SocketStuff.find( socket ); */
/* if ( iter == m_SocketStuff.end() ) { */
/* logf( XW_LOGERROR, "can't find SocketStuff for socket %d", socket ); */
/* } else { */
/* SocketStuff* stuff = iter->second; */
/* assert( cref == NULL || stuff->m_cref == cref ); */
/* delete stuff; */
/* m_SocketStuff.erase( iter ); */
/* } */
/* } */
void
CRefMgr::Disassociate( int socket, CookieRef* cref )
{
m_cidlock->DisAssociate( cref, socket );
/* MutexLock ml( &m_SocketStuffMutex ); */
/* Disassociate_locked( socket, cref ); */
}
/* void */
/* CRefMgr::MoveSockets( vector<int> sockets, CookieRef* cref ) */
/* { */
/* MutexLock ml( &m_SocketStuffMutex ); */
/* vector<int>::iterator iter; */
/* for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) { */
/* Disassociate_locked( *iter, NULL ); */
/* Associate_locked( *iter, cref ); */
/* } */
/* } */
#if 0
pthread_mutex_t*
CRefMgr::GetWriteMutexForSocket( int socket )
{
pthread_mutex_t* mutex = NULL;
MutexLock ml( &m_SocketStuffMutex );
SocketMap::iterator iter = m_SocketStuff.find( socket );
if ( iter != m_SocketStuff.end() ) {
SocketStuff* stuff = iter->second;
/* this is dangerous! What if we want to nuke SocketStuff while this
is locked? And shouldn't it be the case that only one thread at a
time can be trying to write to one of a given cref's sockets since
only one thread at a time is handling a cref? */
mutex = &stuff->m_writeMutex;
}
logf( XW_LOGERROR, "GetWriteMutexForSocket: not found" );
return mutex;
} /* GetWriteMutexForSocket */
#endif
void
CRefMgr::RemoveSocketRefs( int socket )
{
@ -482,8 +385,6 @@ CRefMgr::RemoveSocketRefs( int socket )
SafeCref scr( socket );
scr.Remove( socket );
}
Disassociate( socket, NULL );
}
void
@ -502,16 +403,6 @@ CRefMgr::PrintSocketInfo( int socket, string& out )
}
}
/* /\* static *\/ SocketsIterator */
/* CRefMgr::MakeSocketsIterator() */
/* { */
/* assert( 0 ); /\* called? *\/ */
/* pthread_mutex_lock( &m_SocketStuffMutex ); */
/* SocketsIterator iter( m_SocketStuff.begin(), m_SocketStuff.end(), */
/* &m_SocketStuffMutex ); */
/* return iter; */
/* } */
CidInfo*
CRefMgr::getCookieRef( CookieID cid )
{
@ -522,14 +413,6 @@ CidInfo*
CRefMgr::getCookieRef( int socket )
{
CidInfo* cinfo = m_cidlock->ClaimSocket( socket );
/* if ( NULL == cinfo ) { */
/* MutexLock ml( &m_SocketStuffMutex ); */
/* SocketMap::iterator iter = m_SocketStuff.find( socket ); */
/* if ( iter != m_SocketStuff.end() ) { */
/* SocketStuff* stuff = iter->second; */
/* cinfo = m_cidlock->Claim( stuff->m_cref->GetCookieID() ); */
/* } */
/* } */
assert( NULL == cinfo || NULL != cinfo->GetRef() );
return cinfo;
@ -639,26 +522,6 @@ CRefMgr::Recycle( const char* connName )
Recycle( id );
} /* Delete */
/* CidInfo* */
/* CRefMgr::getCookieRef_impl( CookieID cid ) */
/* { */
/* CidInfo* info = m_cidlock->Claim( cid ); */
/* /\* CookieRef* ref = NULL; *\/ */
/* /\* RWReadLock rwl( &m_cookieMapRWLock ); *\/ */
/* /\* CookieMap::iterator iter = m_cookieMap.find( cid ); *\/ */
/* /\* while ( iter != m_cookieMap.end() ) { *\/ */
/* /\* CookieRef* second = iter->second; *\/ */
/* /\* if ( second->GetCookieID() == cid ) { *\/ */
/* /\* ref = second; *\/ */
/* /\* break; *\/ */
/* /\* } *\/ */
/* /\* ++iter; *\/ */
/* /\* } *\/ */
/* /\* info->SetRef( ref ); *\/ */
/* return info; */
/* } */
#ifdef RELAY_HEARTBEAT
void
CRefMgr::checkHeartbeats( time_t now )
@ -813,17 +676,6 @@ SafeCref::SafeCref( int socket )
}
}
/* SafeCref::SafeCref( CookieRef* cref ) */
/* : m_cinfo( NULL ) */
/* , m_mgr( CRefMgr::Get() ) */
/* , m_isValid( false ) */
/* { */
/* assert(0); /\* path to this? *\/ */
/* m_locked = cref->Lock(); */
/* m_isValid = m_locked; */
/* // m_cref = cref; */
/* } */
SafeCref::~SafeCref()
{
if ( m_cinfo != NULL ) {

View file

@ -86,16 +86,11 @@ class CRefMgr {
/* void LockAll() { pthread_rwlock_wrlock( &m_cookieMapRWLock ); } */
/* void UnlockAll() { pthread_rwlock_unlock( &m_cookieMapRWLock ); } */
/* Track sockets independent of cookie refs */
bool Associate( int socket, CookieRef* cref );
bool Associate_locked( int socket, CookieRef* cref );
void Disassociate( int socket, CookieRef* cref );
void Disassociate_locked( int socket, CookieRef* cref );
void MoveSockets( vector<int> sockets, CookieRef* cref );
pthread_mutex_t* GetWriteMutexForSocket( int socket );
void RemoveSocketRefs( int socket );
void PrintSocketInfo( int socket, string& out );
/* SocketsIterator MakeSocketsIterator(); */
void IncrementFullCount( void );
int GetNumRoomsFilled( void );
@ -153,9 +148,6 @@ class CRefMgr {
static void heartbeatProc( void* closure );
void checkHeartbeats( time_t now );
/* pthread_mutex_t m_nextCIDMutex; */
/* CookieID m_nextCID; */
pthread_mutex_t m_roomsFilledMutex;
int m_nRoomsFilled;