cleanup; and: collapse HostRec and socket set into map of socket to

HostRec, protecting all accesses with ReadWrite locks.
This commit is contained in:
Andy2 2011-06-25 15:40:12 -07:00
parent 6c121dac57
commit 38ad3aa81c
2 changed files with 163 additions and 226 deletions

View file

@ -114,7 +114,7 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id,
CookieRef::CookieRef( const char* cookie, const char* connName, CookieID id,
int langCode, int nPlayersT, int nAlreadyHere )
{
pthread_mutex_init( &m_sockSetMutex, NULL );
pthread_rwlock_init( &m_socketsRWLock, NULL );
ReInit( cookie, connName, id, langCode, nPlayersT, nAlreadyHere );
}
@ -127,17 +127,18 @@ CookieRef::~CookieRef()
XWThreadPool* tPool = XWThreadPool::GetTPool();
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int socket = iter->m_socket;
tPool->CloseSocket( socket );
RmSocket( socket );
m_sockets.erase( iter );
map<int,HostRec>::iterator iter;
{
RWWriteLock rwl( &m_socketsRWLock );
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int socket = iter->first;
tPool->CloseSocket( socket );
m_sockets.erase( iter );
}
}
printSeeds(__func__);
pthread_mutex_destroy( &m_sockSetMutex );
pthread_rwlock_destroy( &m_socketsRWLock );
} /* ~CookieRef */
void
@ -278,13 +279,11 @@ CookieRef::HostForSocket( int sock )
{
HostID hid = -1;
ASSERT_LOCKED();
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == sock ) {
hid = iter->m_hostID;
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid );
break;
}
RWReadLock rrl( &m_socketsRWLock );
map<int, HostRec>::const_iterator iter = m_sockets.find( sock );
if ( iter != m_sockets.end() ) {
hid = iter->second.m_hostID;
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid );
}
return hid;
}
@ -294,11 +293,13 @@ CookieRef::SocketForHost( HostID dest )
{
int socket = -1;
ASSERT_LOCKED();
vector<HostRec>::const_iterator iter;
assert( dest != 0 ); /* don't use as lookup before assigned */
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == dest ) {
socket = iter->m_socket;
if ( iter->second.m_hostID == dest ) {
socket = iter->first;
break;
}
}
@ -313,14 +314,15 @@ CookieRef::AlreadyHere( unsigned short seed, int socket, HostID* prevHostID )
logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket );
bool here = false;
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
here = iter->m_seed == seed; /* client already registered */
here = iter->second.m_seed == seed; /* client already registered */
if ( here ) {
if ( iter->m_socket != socket ) { /* not just a dupe packet */
if ( iter->first != socket ) { /* not just a dupe packet */
logf( XW_LOGINFO, "%s: seeds match; socket %d assumed closed",
__func__, iter->m_socket );
*prevHostID = iter->m_hostID;
__func__, iter->first );
*prevHostID = iter->second.m_hostID;
}
break;
}
@ -338,18 +340,18 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket,
hid, seed, seed, socket );
bool here = false;
vector<HostRec>::iterator iter;
RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == hid ) {
if ( seed != iter->m_seed ) {
if ( iter->second.m_hostID == hid ) {
if ( seed != iter->second.m_seed ) {
*spotTaken = true;
} else if ( socket == iter->m_socket ) {
} else if ( socket == iter->first ) {
here = true; /* dup packet */
} else {
logf( XW_LOGINFO, "%s: hids match; nuking existing record "
"for socket %d b/c assumed closed", __func__,
iter->m_socket );
RmSocket( iter->m_socket );
iter->first );
m_sockets.erase( iter );
}
break;
@ -376,44 +378,33 @@ void
CookieRef::removeSocket( int socket )
{
logf( XW_LOGINFO, "%s(socket=%d)", __func__, socket );
int count;
{
bool found = false;
ASSERT_LOCKED();
bool found = false;
ASSERT_LOCKED();
count = m_sockets.size();
assert( count <= 4 );
if ( count > 0 ) {
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin();
!found && iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == socket ) {
if ( iter->m_ackPending ) {
logf( XW_LOGINFO,
"Never got ack; removing hid %d from DB",
iter->m_hostID );
DBMgr::Get()->RmDeviceByHid( ConnName(), iter->m_hostID );
m_nPlayersHere -= iter->m_nPlayersH;
cancelAckTimer( iter->m_hostID );
}
RmSocket( socket );
m_sockets.erase(iter);
--count;
found = true;
}
{
RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter = m_sockets.find(socket);
if ( iter != m_sockets.end() ) {
if ( iter->second.m_ackPending ) {
logf( XW_LOGINFO,
"Never got ack; removing hid %d from DB",
iter->second.m_hostID );
DBMgr::Get()->RmDeviceByHid( ConnName(),
iter->second.m_hostID );
m_nPlayersHere -= iter->second.m_nPlayersH;
cancelAckTimer( iter->second.m_hostID );
}
} else {
logf( XW_LOGERROR, "%s: no socket %d to remove", __func__,
socket );
}
if ( !found ) {
logf( XW_LOGINFO, "%s: socket %d not found", __func__, socket );
m_sockets.erase(iter);
found = true;
}
}
if ( !found ) {
logf( XW_LOGINFO, "%s: socket %d not found", __func__, socket );
}
printSeeds(__func__);
if ( count == 0 ) {
if ( m_sockets.size() == 0 ) {
pushLastSocketGoneEvent();
}
} /* removeSocket */
@ -443,39 +434,22 @@ CookieRef::HasSocket( int socket )
set<int>
CookieRef::GetSockets()
{
MutexLock ml( &m_sockSetMutex );
return m_sockSet;
}
void
CookieRef::AddSocket( int socket )
{
MutexLock ml( &m_sockSetMutex );
assert( m_sockSet.find(socket) == m_sockSet.end() );
m_sockSet.insert( socket );
}
void
CookieRef::RmSocket( int socket )
{
MutexLock ml( &m_sockSetMutex );
assert( m_sockSet.find(socket) != m_sockSet.end() );
m_sockSet.erase( socket );
set<int> result;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
result.insert( iter->first );
}
return result;
}
bool
CookieRef::HasSocket_locked( int socket )
{
bool found = false;
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == socket ) {
found = true;
break;
}
}
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter = m_sockets.find( socket );
bool found = iter != m_sockets.end();
logf( XW_LOGINFO, "%s=>%d", __func__, found );
return found;
@ -493,11 +467,14 @@ void
CookieRef::_CheckHeartbeats( time_t now )
{
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
time_t last = iter->m_lastHeartbeat;
if ( (now - last) > GetHeartbeat() ) {
pushHeartFailedEvent( iter->m_socket );
{
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > GetHeartbeat() ) {
pushHeartFailedEvent( iter->first );
}
}
}
@ -683,11 +660,11 @@ CookieRef::handleEvents()
break;
case XWA_TIMERDISCONN:
disconnectSockets( 0, XWRELAY_ERROR_TIMEOUT );
disconnectSockets( XWRELAY_ERROR_TIMEOUT );
break;
case XWA_SHUTDOWN:
disconnectSockets( 0, XWRELAY_ERROR_SHUTDOWN );
disconnectSockets( XWRELAY_ERROR_SHUTDOWN );
break;
case XWA_HEARTDISCONN:
@ -695,8 +672,8 @@ CookieRef::handleEvents()
XWRELAY_ERROR_HEART_OTHER );
setAllConnectedTimer();
// reducePlayerCounts( evt.u.discon.socket );
disconnectSockets( evt.u.heart.socket,
XWRELAY_ERROR_HEART_YOU );
disconnectSocket( evt.u.heart.socket,
XWRELAY_ERROR_HEART_YOU );
break;
case XWA_DISCONNECT:
@ -740,7 +717,6 @@ CookieRef::handleEvents()
CRefMgr::Get()->IncrementFullCount();
cancelAllConnectedTimer();
sendAllHere( true );
/* checkSomeMissing(); */
break;
case XWA_SNDALLHERE_2:
@ -889,9 +865,11 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp )
assert( m_sockets.size() < 4 );
HostRec hr( hostid, socket, nPlayersH, seed, !reconn );
m_sockets.push_back( hr );
AddSocket( socket );
{
RWWriteLock rwl( &m_socketsRWLock );
HostRec hr( hostid, nPlayersH, seed, !reconn );
m_sockets.insert( pair<int,HostRec>(socket, hr) );
}
printSeeds(__func__);
@ -910,17 +888,19 @@ CookieRef::updateAck( HostID hostID, bool keep )
cancelAckTimer( hostID );
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_ackPending && iter->m_hostID == hostID ) {
if ( keep ) {
iter->m_ackPending = false;
DBMgr::Get()->NoteAckd( ConnName(), hostID );
} else {
socket = iter->m_socket;
{
RWWriteLock rwl( &m_socketsRWLock );
map<int, HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_ackPending && iter->second.m_hostID == hostID ) {
if ( keep ) {
iter->second.m_ackPending = false;
DBMgr::Get()->NoteAckd( ConnName(), hostID );
} else {
socket = iter->first;
}
break;
}
break;
}
}
@ -1112,11 +1092,12 @@ CookieRef::notifyOthers( int socket, XWRelayMsg msg, XWREASON why )
assert( socket != 0 );
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int other = iter->m_socket;
int other = iter->first;
if ( other != socket ) {
send_msg( other, iter->m_hostID, msg, why, false );
send_msg( other, iter->second.m_hostID, msg, why, false );
}
}
} /* notifyOthers */
@ -1138,7 +1119,7 @@ CookieRef::notifyGameDead( int socket )
/* ASSERT_LOCKED(); */
/* vector<int> sockets; */
/* vector<HostRec>::iterator iter; */
/* map<int,HostRec>::iterator iter; */
/* for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { */
/* sockets.push_back( iter->m_socket ); */
/* } */
@ -1176,15 +1157,18 @@ CookieRef::sendAllHere( bool initial )
for ( dest = 1; dest <= m_nPlayersHere; ++dest ) {
bool sent = false;
*idLoc = dest; /* write in this target's hostId */
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == dest ) {
sent = send_with_length( iter->m_socket, buf, bufp-buf,
true );
break;
{
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_hostID == dest ) {
sent = send_with_length( iter->first, buf, bufp-buf,
true );
break;
}
}
}
if ( !sent ) {
store_message( dest, buf, bufp-buf );
}
@ -1192,63 +1176,11 @@ CookieRef::sendAllHere( bool initial )
} /* sendAllHere */
#define CONNNAME_DELIM ' ' /* ' ' so will wrap in browser */
/* Does my seed belong as part of existing connName */
bool
CookieRef::SeedBelongs( int gameSeed )
{
bool belongs = false;
const char* ptr = ConnName();
const char* end = ptr + strlen(ptr);
assert( '\0' != ptr[0] );
char buf[5];
snprintf( buf, sizeof(buf), "%.4X", gameSeed );
for ( ; *ptr != CONNNAME_DELIM && ptr < end; ptr += 4 ) {
if ( 0 == strncmp( ptr, buf, 4 ) ) {
belongs = true;
break;
}
}
return belongs;
} /* SeedBelongs */
/* does my connName provide a home for seeds already in this connName-less
ref? */
bool
CookieRef::SeedsBelong( const char* connName )
{
bool found = true;
assert( !m_connName[0] );
const char* delim = strchr( connName, CONNNAME_DELIM );
assert( !!delim );
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
char buf[5];
snprintf( buf, sizeof(buf), "%.4X", iter->m_seed );
const char* match = strstr( connName, buf );
if ( !match || match > delim ) {
found = false;
break;
}
}
return found;
} /* SeedsBelong */
void
CookieRef::assignConnName( void )
{
if ( '\0' == ConnName()[0] ) {
/* vector<HostRec>::iterator iter; */
/* for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { */
/* char buf[5]; */
/* snprintf( buf, sizeof(buf), "%.4X", iter->m_seed ); */
/* m_connName += buf; */
/* } */
m_connName += /*CONNNAME_DELIM + */PermID::GetNextUniqueID();
logf( XW_LOGINFO, "%s: assigning name: %s", __func__, ConnName() );
@ -1258,20 +1190,26 @@ CookieRef::assignConnName( void )
}
void
CookieRef::disconnectSockets( int socket, XWREASON why )
CookieRef::disconnectSockets( XWREASON why )
{
if ( socket == 0 ) {
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
assert( iter->m_socket != 0 );
disconnectSockets( iter->m_socket, why );
ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
assert( iter->first != 0 );
if ( iter->first != 0 ) {
disconnectSocket( iter->first, why );
}
} else {
pushNotifyDisconEvent( socket, why );
pushRemoveSocketEvent( socket );
}
} /* disconnectSockets */
}
void
CookieRef::disconnectSocket( int socket, XWREASON why )
{
ASSERT_LOCKED();
pushNotifyDisconEvent( socket, why );
pushRemoveSocketEvent( socket );
} /* disconnectSocket */
void
CookieRef::removeDevice( const CRefEvent* const evt )
@ -1281,9 +1219,10 @@ CookieRef::removeDevice( const CRefEvent* const evt )
evt->u.devgone.seed ) ) {
dbmgr->KillGame( ConnName(), evt->u.devgone.hid );
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
notifyGameDead( iter->m_socket );
notifyGameDead( iter->first );
}
}
}
@ -1295,29 +1234,28 @@ CookieRef::noteHeartbeat( const CRefEvent* evt )
HostID id = evt->u.heart.id;
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == id ) {
if ( iter->second.m_hostID == id ) {
int second_socket = iter->first;
if ( second_socket == socket ) {
logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, uptime() );
iter->second.m_lastHeartbeat = uptime();
} else {
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. But:
now it's happening after a crash and clients reconnect. */
logf( XW_LOGERROR, "wrong socket record for HostID %x; "
"wanted %d, found %d", id, socket, second_socket );
}
break;
}
}
if ( iter == m_sockets.end() ) {
logf( XW_LOGERROR, "no socket for HostID %x", id );
} else {
int second_socket = iter->m_socket;
if ( second_socket == socket ) {
logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d",
iter->m_lastHeartbeat, uptime() );
iter->m_lastHeartbeat = uptime();
} else {
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. But:
now it's happening after a crash and clients reconnect. */
logf( XW_LOGERROR, "wrong socket record for HostID %x; wanted %d, "
"found %d", id, socket, second_socket );
}
}
} /* noteHeartbeat */
@ -1368,11 +1306,14 @@ CookieRef::printSeeds( const char* caller )
{
int len = 0;
char buf[64] = {0};
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
len += snprintf( &buf[len], sizeof(buf)-len, "[%d]%.4x(%d)/%d/%c ",
iter->m_hostID, iter->m_seed, iter->m_seed,
iter->m_socket, iter->m_ackPending?'a':'A' );
iter->second.m_hostID, iter->second.m_seed,
iter->second.m_seed, iter->first,
iter->second.m_ackPending?'a':'A' );
}
logf( XW_LOGINFO, "seeds/sockets/ack'd after %s(): %s", caller, buf );
}
@ -1426,11 +1367,13 @@ CookieRef::_PrintCookieInfo( string& out )
snprintf( buf, sizeof(buf), "Hosts connected=%d; cur time = %ld\n",
m_sockets.size(), uptime() );
out += buf;
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n",
iter->m_hostID, iter->m_socket,
iter->m_lastHeartbeat );
iter->second.m_hostID, iter->first,
iter->second.m_lastHeartbeat );
out += buf;
}
@ -1440,23 +1383,24 @@ void
CookieRef::_FormatHostInfo( string* hostIds, string* seeds, string* addrs )
{
ASSERT_LOCKED();
vector<HostRec>::iterator iter;
RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( !!hostIds ) {
char buf[8];
snprintf( buf, sizeof(buf), "%d ", iter->m_hostID );
snprintf( buf, sizeof(buf), "%d ", iter->second.m_hostID );
*hostIds += buf;
}
if ( !!seeds ) {
char buf[6];
snprintf( buf, sizeof(buf), "%.4X ", iter->m_seed );
snprintf( buf, sizeof(buf), "%.4X ", iter->second.m_seed );
*seeds += buf;
}
if ( !!addrs ) {
int s = iter->m_socket;
int s = iter->first;
struct sockaddr_in name;
socklen_t siz = sizeof(name);
if ( 0 == getpeername( s, (struct sockaddr*)&name, &siz) ) {

View file

@ -41,9 +41,8 @@ class CookieMapIterator; /* forward */
struct HostRec {
public:
HostRec(HostID hostID, int socket, int nPlayersH, int seed, bool ackPending )
HostRec(HostID hostID, int nPlayersH, int seed, bool ackPending )
: m_hostID(hostID)
, m_socket(socket)
, m_nPlayersH(nPlayersH)
, m_seed(seed)
, m_lastHeartbeat(uptime())
@ -52,7 +51,6 @@ HostRec(HostID hostID, int socket, int nPlayersH, int seed, bool ackPending )
::logf( XW_LOGINFO, "created HostRec with id %d", m_hostID);
}
HostID m_hostID;
int m_socket;
int m_nPlayersH;
int m_seed;
time_t m_lastHeartbeat;
@ -235,19 +233,15 @@ class CookieRef {
void notifyOthers( int socket, XWRelayMsg msg, XWREASON why );
void notifyGameDead( int socket );
void disconnectSockets( int socket, XWREASON why );
void disconnectSockets( XWREASON why );
void disconnectSocket( int socket, XWREASON why );
void removeDevice( const CRefEvent* const evt );
void noteHeartbeat(const CRefEvent* evt);
void notifyDisconn(const CRefEvent* evt);
void removeSocket( int socket );
void sendAllHere( bool initial );
void checkSomeMissing( void );
void moveSockets( void );
bool SeedBelongs( int gameSeed );
bool SeedsBelong( const char* connName );
void assignConnName( void );
void assignHostIds( void );
time_t GetStarttime( void ) { return m_starttime; }
int GetLangCode( void ) { return m_langCode; }
@ -267,15 +261,14 @@ class CookieRef {
static void s_checkAllConnected( void* closure );
static void s_checkAck( void* closure );
vector<HostRec> m_sockets;
pthread_rwlock_t m_socketsRWLock;
map<int, HostRec> m_sockets;
int m_heatbeat; /* might change per carrier or something. */
string m_cookie; /* cookie used for initial connections */
string m_connName; /* globally unique name */
CookieID m_cookieID; /* Unique among current games on this server */
pthread_mutex_t m_sockSetMutex;
set<int> m_sockSet;
XW_RELAY_STATE m_curState;
deque<CRefEvent> m_eventQueue;