loads of changes: get settings from config file; remove sockets and

kill crefs via state machine, and protect access to a cref so it can
die without another thread being in it; do timers via timeout to
poll() rather than interrupt (and integrate into state machine);
detect when all players are present and change state so new
connections on that cookie will get a new cref.
This commit is contained in:
ehouse 2005-09-02 06:56:34 +00:00
parent 5d22be174a
commit 8f32f4f99a
10 changed files with 512 additions and 418 deletions

View file

@ -16,10 +16,18 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
CC = g++
SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp states.cpp
SRC = xwrelay.cpp \
cref.cpp \
ctrl.cpp \
tpool.cpp \
states.cpp \
timermgr.cpp \
configs.cpp \
crefmgr.cpp \
OBJ = $(patsubst %.cpp,%.o,$(SRC))
LDFLAGS += -lpthread -g
CPPFLAGS += -g -Wall -DHEARTBEAT=10
CPPFLAGS += -g -Wall
memdebug all: xwrelay

View file

@ -29,245 +29,41 @@
#include "mlock.h"
#include "tpool.h"
#include "states.h"
#include "timermgr.h"
#include "configs.h"
#include "crefmgr.h"
using namespace std;
static CookieMap gCookieMap;
pthread_rwlock_t gCookieMapRWLock = PTHREAD_RWLOCK_INITIALIZER;
pthread_mutex_t g_IdsMutex = PTHREAD_MUTEX_INITIALIZER;
CookieID CookieRef::ms_nextConnectionID = 1000;
/* static */ CookieRef*
CookieRef::AddNew( string s, CookieID id )
{
RWWriteLock rwl( &gCookieMapRWLock );
CookieRef* ref = new CookieRef( s, id );
gCookieMap.insert( pair<CookieID, CookieRef*>(ref->GetCookieID(), ref ) );
logf( "paired cookie %s with id %d", s.c_str(), ref->GetCookieID() );
return ref;
}
/* static */ void
CookieRef::Delete( CookieID id )
{
CookieRef* cref = get_cookieRef( id );
if ( cref != NULL ) {
delete cref;
}
}
/* static */ void
CookieRef::Delete( const char* name )
{
CookieID id = CookieIdForName( name );
Delete( id );
} /* Delete */
CookieID
CookieIdForName( const char* name )
{
CookieRef* ref = NULL;
string s(name);
RWReadLock rwl( &gCookieMapRWLock );
CookieMap::iterator iter = gCookieMap.begin();
while ( iter != gCookieMap.end() ) {
ref = iter->second;
if ( ref->Name() == s && ref->NotFullyConnected() ) {
return ref->GetCookieID();
}
++iter;
}
return 0;
} /* CookieIdForName */
void
CheckHeartbeats( time_t now, vector<int>* sockets )
{
RWReadLock rwl( &gCookieMapRWLock );
CookieMap::iterator iter = gCookieMap.begin();
while ( iter != gCookieMap.end() ) {
CookieRef* ref = iter->second;
ref->CheckHeartbeats( now, sockets );
++iter;
}
} /* CheckHeartbeats */
/* [Re]connecting. If there was a game in progress and this host disconnected
* briefly then we can just reconnect. Otherwise we have to create just as if
* it were a from-scratch connect, but without choosing the CookieID.
*/
CookieRef*
get_make_cookieRef( const char* cookie, CookieID cookieID )
{
/* start with the cookieID if it's set */
CookieRef* cref = cookieID == 0 ? NULL: get_cookieRef( cookieID );
if ( cref == NULL ) { /* need to keep looking? */
CookieID newId = CookieIdForName( cookie );
if ( newId == 0 ) { /* not in the system */
cref = CookieRef::AddNew( string(cookie), cookieID );
} else {
cref = get_cookieRef( newId );
}
}
return cref;
}
CookieRef*
get_cookieRef( CookieID cookieID )
{
CookieRef* ref = NULL;
RWReadLock rwl( &gCookieMapRWLock );
CookieMap::iterator iter = gCookieMap.find( cookieID);
while ( iter != gCookieMap.end() ) {
CookieRef* sec = iter->second;
if ( sec->GetCookieID() == cookieID ) {
ref = sec;
break;
}
++iter;
}
return ref;
} /* get_cookieRef */
static void
ForgetCref( CookieRef* cref )
{
RWWriteLock ml( &gCookieMapRWLock );
CookieMap::iterator iter = gCookieMap.begin();
while ( iter != gCookieMap.end() ) {
CookieRef* ref = iter->second;
if ( ref == cref ) {
logf( "erasing cref" );
gCookieMap.erase( iter );
break;
}
++iter;
}
assert( iter != gCookieMap.end() ); /* didn't find it */
}
class SocketStuff {
public:
SocketStuff( pthread_t id, CookieRef* cref )
: m_threadID(id),
m_cref(cref)
{
pthread_mutex_init( &m_writeMutex, NULL );
}
~SocketStuff() { pthread_mutex_destroy( &m_writeMutex ); }
pthread_t m_threadID;
CookieRef* m_cref;
pthread_mutex_t m_writeMutex; /* so only one thread writes at a time */
};
SocketMap SocketMgr::ms_SocketStuff;
pthread_mutex_t SocketMgr::ms_SocketStuffMutex = PTHREAD_MUTEX_INITIALIZER;
/* static */ void
SocketMgr::Associate( int socket, CookieRef* cref )
{
logf( "ms_SocketStuffMutex=%x", &ms_SocketStuffMutex );
MutexLock ml( &ms_SocketStuffMutex );
SocketMap::iterator iter = ms_SocketStuff.find( socket );
if ( iter == ms_SocketStuff.end() ) {
logf( "replacing existing cref/threadID pair for socket %d", socket );
}
pthread_t self = pthread_self();
SocketStuff* stuff = new SocketStuff( self, cref );
ms_SocketStuff.insert( pair< int, SocketStuff* >( socket, stuff ) );
} /* Associate */
/*static*/ CookieRef*
SocketMgr::CookieRefForSocket( int socket )
{
MutexLock ml( &ms_SocketStuffMutex );
SocketMap::iterator iter = ms_SocketStuff.find( socket );
if ( iter != ms_SocketStuff.end() ) {
SocketStuff* stuff = iter->second;
return stuff->m_cref;
}
return NULL;
}
/* static */ pthread_mutex_t*
SocketMgr::GetWriteMutexForSocket( int socket )
{
MutexLock ml( &ms_SocketStuffMutex );
SocketMap::iterator iter = ms_SocketStuff.find( socket );
if ( iter != ms_SocketStuff.end() ) {
SocketStuff* stuff = iter->second;
return &stuff->m_writeMutex;
}
assert( 0 );
}
/* static */ void
SocketMgr::RemoveSocketRefs( int socket )
{
CookieRef* cref = CookieRefForSocket( socket );
if ( cref != NULL ) {
MutexLock ml( &ms_SocketStuffMutex );
SocketMap::iterator iter = ms_SocketStuff.find( socket );
assert( iter != ms_SocketStuff.end() );
delete iter->second;
ms_SocketStuff.erase( iter );
cref->Remove( socket );
} else {
logf( "socket already dead" );
}
} /* RemoveSocketRefs */
/* static */ void
SocketMgr::PrintSocketInfo( int socket, string& out )
{
CookieRef* me = SocketMgr::CookieRefForSocket( socket );
assert( me );
char buf[64];
snprintf( buf, sizeof(buf), "* socket: %d\n", socket );
out += buf;
snprintf( buf, sizeof(buf), " in cookie: %s\n", me->Name().c_str() );
out += buf;
}
/* static */ SocketsIterator
SocketMgr::MakeSocketsIterator()
{
SocketsIterator iter( ms_SocketStuff.begin() );
return iter;
}
/*****************************************************************************
* SocketsIterator class
*****************************************************************************/
SocketsIterator::SocketsIterator( SocketMap::iterator iter )
SocketsIterator::SocketsIterator( SocketMap::iterator iter,
SocketMap::iterator end,
pthread_mutex_t* rwlock )
: m_iter( iter )
, m_end( end )
, m_mutex( rwlock )
{
}
SocketsIterator::~SocketsIterator()
{
pthread_mutex_unlock( m_mutex );
}
int
SocketsIterator::Next()
{
int socket = m_iter->first;
++m_iter;
int socket = 0;
if ( m_iter != m_end ) {
socket = m_iter->first;
++m_iter;
}
return socket;
}
@ -276,7 +72,8 @@ SocketsIterator::Next()
*****************************************************************************/
CookieRef::CookieRef( string s, CookieID id )
: m_name(s)
: m_heatbeat(RelayConfigs::GetConfigs()->GetHeartbeatInterval())
, m_name(s)
, m_totalSent(0)
, m_curState(XW_ST_INITED)
{
@ -293,6 +90,8 @@ CookieRef::CookieRef( string s, CookieID id )
CookieRef::~CookieRef()
{
cancelAllConnectedTimer();
/* get rid of any sockets still contained */
XWThreadPool* tPool = XWThreadPool::GetTPool();
@ -311,15 +110,26 @@ CookieRef::~CookieRef()
pthread_rwlock_destroy( &m_sockets_rwlock );
logf( "CookieRef for %d being deleted", m_connectionID );
pthread_mutex_destroy( &m_EventsMutex );
pthread_rwlock_destroy( &m_sockets_rwlock );
} /* ~CookieRef */
void
CookieRef::_Connect( int socket, HostID srcID )
{
CRefMgr::Get()->Associate( socket, this );
MutexLock ml( &m_EventsMutex );
pushConnectEvent( socket, srcID );
handleEvents();
}
void
CookieRef::Connect( int socket, HostID srcID )
CookieRef::_Reconnect( int socket, HostID srcID )
{
SocketMgr::Associate( socket, this );
CRefMgr::Get()->Associate( socket, this );
MutexLock ml( &m_EventsMutex );
pushConnectEvent( socket, srcID );
pushReconnectEvent( socket, srcID );
handleEvents();
}
@ -339,8 +149,9 @@ CookieRef::SocketForHost( HostID dest )
}
void
CookieRef::Remove( int socket )
CookieRef::removeSocket( const CRefEvent* evt )
{
int socket = evt->u.rmsock.socket;
int count;
{
RWWriteLock rwl( &m_sockets_rwlock );
@ -362,13 +173,31 @@ CookieRef::Remove( int socket )
XWThreadPool::GetTPool()->CloseSocket( socket );
if ( count == 0 ) {
ForgetCref( this );
delete this;
pushLastSocketGoneEvent();
}
}
} /* Remove */
int
CookieRef::HasSocket( int socket )
{
int found = 0;
logf( "CookieRef::HasSocket" );
RWReadLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
if ( iter->second.m_socket == socket ) {
found = 1;
break;
}
++iter;
}
return found;
} /* HasSocket */
void
CookieRef::HandleHeartbeat( HostID id, int socket )
CookieRef::_HandleHeartbeat( HostID id, int socket )
{
MutexLock ml( &m_EventsMutex );
pushHeartbeatEvent( id, socket );
@ -380,19 +209,26 @@ CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
{
logf( "CookieRef::CheckHeartbeats" );
MutexLock ml( &m_EventsMutex );
pushHeartTimerEvent( now, victims );
handleEvents();
} /* CheckHeartbeats */
void
CookieRef::Forward( HostID src, HostID dest, unsigned char* buf, int buflen )
CookieRef::_Forward( HostID src, HostID dest, unsigned char* buf, int buflen )
{
MutexLock ml( &m_EventsMutex );
pushForwardEvent( src, dest, buf, buflen );
handleEvents();
} /* Forward */
void
CookieRef::_Remove( int socket )
{
MutexLock ml( &m_EventsMutex );
pushRemoveSocketEvent( socket );
handleEvents();
} /* Forward */
void
CookieRef::pushConnectEvent( int socket, HostID srcID )
{
@ -403,6 +239,16 @@ CookieRef::pushConnectEvent( int socket, HostID srcID )
m_eventQueue.push_front( evt );
} /* pushConnectEvent */
void
CookieRef::pushReconnectEvent( int socket, HostID srcID )
{
CRefEvent evt;
evt.type = XW_EVENT_RECONNECTMSG;
evt.u.recon.socket = socket;
evt.u.recon.srcID = srcID;
m_eventQueue.push_front( evt );
} /* pushConnectEvent */
void
CookieRef::pushHeartbeatEvent( HostID id, int socket )
{
@ -436,6 +282,15 @@ CookieRef::pushForwardEvent( HostID src, HostID dest,
m_eventQueue.push_front( evt );
}
void
CookieRef::pushRemoveSocketEvent( int socket )
{
CRefEvent evt;
evt.type = XW_EVENT_REMOVESOCKET;
evt.u.rmsock.socket = socket;
m_eventQueue.push_front( evt );
}
void
CookieRef::pushDestBadEvent()
{
@ -453,15 +308,41 @@ CookieRef::pushDestOkEvent( const CRefEvent* oldEvt )
m_eventQueue.push_front( evt );
}
void
CookieRef::pushCanLockEvent( const CRefEvent* oldEvt )
{
CRefEvent evt;
memcpy( &evt, oldEvt, sizeof(evt) );
evt.type = XW_EVENT_CAN_LOCK;
m_eventQueue.push_front( evt );
}
void
CookieRef::pushCantLockEvent( const CRefEvent* oldEvt )
{
CRefEvent evt;
memcpy( &evt, oldEvt, sizeof(evt) );
evt.type = XW_EVENT_CANT_LOCK;
m_eventQueue.push_front( evt );
}
void
CookieRef::pushLastSocketGoneEvent()
{
CRefEvent evt;
evt.type = XW_EVENT_NOMORESOCKETS;
m_eventQueue.push_front( evt );
}
void
CookieRef::handleEvents()
{
XW_RELAY_ACTION takeAction;
while ( m_eventQueue.size() > 0 ) {
/* Assumption: has mutex!!!! */
while ( m_eventQueue.size () > 0 ) {
CRefEvent evt = m_eventQueue.front();
m_eventQueue.pop_front();
XW_RELAY_ACTION takeAction;
if ( getFromTable( m_curState, evt.type, &takeAction, &m_nextState ) ) {
logf( "moving from state %s to state %s for event %s",
@ -469,6 +350,9 @@ CookieRef::handleEvents()
eventString(evt.type) );
switch( takeAction ) {
case XW_ACTION_SEND_1ST_RSP:
setAllConnectedTimer();
/* fallthru */
case XW_ACTION_SENDRSP:
sendResponse( &evt );
break;
@ -481,6 +365,10 @@ CookieRef::handleEvents()
checkDest( &evt );
break;
case XW_ACTION_CHECK_CAN_LOCK:
checkFromServer( &evt );
break;
case XW_ACTION_DISCONNECTALL:
disconnectAll( &evt );
break;
@ -493,6 +381,10 @@ CookieRef::handleEvents()
checkHeartbeats( &evt );
break;
case XW_ACTION_REMOVESOCKET:
removeSocket( &evt );
break;
case XW_ACTION_HEARTOK:
case XW_ACTION_NONE:
/* nothing to do for these */
@ -504,6 +396,8 @@ CookieRef::handleEvents()
}
m_curState = m_nextState;
} else {
assert(0);
}
}
} /* handleEvents */
@ -536,6 +430,21 @@ putNetLong( unsigned char** bufpp, unsigned long s )
assert( sizeof(s) == 4 ); /* otherwise need to hardcode */
}
void
CookieRef::setAllConnectedTimer()
{
time_t inHowLong;
inHowLong = RelayConfigs::GetConfigs()->GetAllConnectedInterval();
TimerMgr::getTimerMgr()->setTimer( inHowLong,
s_checkAllConnected, this, 0 );
}
void
CookieRef::cancelAllConnectedTimer()
{
TimerMgr::getTimerMgr()->clearTimer( s_checkAllConnected, this );
}
void
CookieRef::sendResponse( const CRefEvent* evt )
{
@ -595,9 +504,27 @@ CookieRef::checkDest( const CRefEvent* evt )
}
} /* checkDest */
void
CookieRef::checkFromServer( const CRefEvent* evt )
{
HostID src = evt->u.fwd.src;
if ( src == HOST_ID_SERVER ) {
pushCanLockEvent( evt );
} else {
pushCantLockEvent( evt );
}
}
void
CookieRef::disconnectAll( const CRefEvent* evt )
{
logf( "disconnectAll" );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
pushRemoveSocketEvent( iter->second.m_socket );
++iter;
}
logf( "disconnectAll done" );
}
void
@ -609,15 +536,18 @@ CookieRef::noteHeartbeat( const CRefEvent* evt )
RWWriteLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.find(id);
assert( iter != m_hostSockets.end() );
if ( iter == m_hostSockets.end() ) {
logf( "no socket for HostID %d", id );
} else {
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. */
assert( iter->second.m_socket == socket );
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. */
assert( iter->second.m_socket == socket );
logf( "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, now() );
iter->second.m_lastHeartbeat = now();
logf( "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, now() );
iter->second.m_lastHeartbeat = now();
}
} /* noteHeartbeat */
void
@ -632,7 +562,7 @@ CookieRef::checkHeartbeats( const CRefEvent* evt )
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > HEARTBEAT * 2 ) {
if ( (now - last) > GetHeartbeat() * 2 ) {
victims->push_back( iter->second.m_socket );
++vcount;
}
@ -646,8 +576,31 @@ CookieRef::checkHeartbeats( const CRefEvent* evt )
m_eventQueue.push_front( newEvt );
} /* checkHeartbeats */
/* timer callback */
/* static */ void
CookieRef::s_checkAllConnected( void* closure )
{
/* Need to ensure */
CookieRef* self = (CookieRef*)closure;
SafeCref scr(self);
if ( scr.IsValid() ) {
scr.CheckAllConnected();
}
}
void
CookieRef::PrintCookieInfo( string& out )
CookieRef::_CheckAllConnected()
{
logf( "checkAllConnected" );
MutexLock ml( &m_EventsMutex );
CRefEvent newEvt;
newEvt.type = XW_EVENT_CONNTIMER;
m_eventQueue.push_front( newEvt );
handleEvents();
}
void
CookieRef::_PrintCookieInfo( string& out )
{
out += "Name: ";
out += Name();
@ -666,28 +619,3 @@ CookieRef::PrintCookieInfo( string& out )
/* sockets */
} /* PrintCookieInfo */
/* static */ CookieMapIterator
CookieRef::GetCookieIterator()
{
CookieMapIterator iter;
return iter;
}
CookieMapIterator:: CookieMapIterator()
: _iter( gCookieMap.begin() )
{
}
CookieID
CookieMapIterator::Next()
{
CookieID id = 0;
if ( _iter != gCookieMap.end() ) {
CookieRef* cref = _iter->second;
id = cref->GetCookieID();
++_iter;
}
return id;
}

View file

@ -1,5 +1,24 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2005 by Eric House (fixin@peak.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _CREF_H_
#define _CREF_H_
@ -11,10 +30,6 @@
#include "xwrelay_priv.h"
#include "states.h"
#ifndef HEARTBEAT
# define HEARTBEAT 60
#endif
using namespace std;
class CookieMapIterator; /* forward */
@ -35,22 +50,19 @@ class CookieRef {
/* Within this cookie, remember that this hostID and socket go together.
If the hostID is HOST_ID_SERVER, it's the server. */
void Connect( int socket, HostID srcID );
short GetHeartbeat() { return HEARTBEAT; }
short GetHeartbeat() { return m_heatbeat; }
CookieID GetCookieID() { return m_connectionID; }
int SocketForHost( HostID dest );
void Remove( int socket );
int HostKnown( HostID host ) { return -1 != SocketForHost( host ); }
int CountSockets() { return m_hostSockets.size(); }
int HasSocket( int socket );
string Name() { return m_name; }
int NotFullyConnected() { return m_curState != XW_ST_ALLCONNECTED; }
void HandleHeartbeat( HostID id, int socket );
void CheckHeartbeats( time_t now, vector<int>* victims );
void Forward( HostID src, HostID dest, unsigned char* buf, int buflen );
/* for console */
void PrintCookieInfo( string& out );
void _PrintCookieInfo( string& out );
void PrintSocketInfo( string& out, int socket );
static CookieMapIterator GetCookieIterator();
@ -59,7 +71,18 @@ class CookieRef {
static void Delete( CookieID id );
static void Delete( const char* name );
/* These need to become private */
void _Connect( int socket, HostID srcID );
void _Reconnect( int socket, HostID srcID );
void _HandleHeartbeat( HostID id, int socket );
void _Forward( HostID src, HostID dest, unsigned char* buf, int buflen );
void _Remove( int socket );
void _CheckAllConnected();
int ShouldDie() { return m_curState == XW_ST_DEAD; }
private:
typedef struct CRefEvent {
XW_RELAY_EVENT type;
union {
@ -74,7 +97,8 @@ class CookieRef {
HostID srcID;
} con;
struct {
int socket;
HostID srcID;
} recon;
struct {
HostID id;
@ -84,38 +108,64 @@ class CookieRef {
time_t now;
vector<int>* victims;
} htime;
struct {
HostID hostID;
int reason;
} discon;
struct {
int socket;
} rmsock;
} u;
} CRefEvent;
friend class CRefMgr;
CookieRef( string s, CookieID id );
int SocketForHost( HostID dest );
void RecordSent( int nBytes, int socket ) {
/* This really needs a lock.... */
m_totalSent += nBytes;
}
void pushConnectEvent( int socket, HostID srcID );
void pushReconnectEvent( int socket, HostID srcID );
void pushHeartbeatEvent( HostID id, int socket );
void pushHeartTimerEvent( time_t now, vector<int>* victims );
void pushForwardEvent( HostID src, HostID dest, unsigned char* buf,
int buflen );
void pushDestBadEvent();
void pushLastSocketGoneEvent();
void pushRemoveSocketEvent( int socket );
void pushDestOkEvent( const CRefEvent* evt );
void pushCanLockEvent( const CRefEvent* evt );
void pushCantLockEvent( const CRefEvent* evt );
void handleEvents();
void sendResponse(const CRefEvent* evt);
void forward(const CRefEvent* evt);
void sendResponse( const CRefEvent* evt );
void setAllConnectedTimer();
void cancelAllConnectedTimer();
void forward( const CRefEvent* evt );
void checkDest( const CRefEvent* evt );
void checkFromServer( const CRefEvent* evt );
void disconnectAll(const CRefEvent* evt);
void noteHeartbeat(const CRefEvent* evt);
void checkHeartbeats(const CRefEvent* evt);
void removeSocket(const CRefEvent* evt);
/* timer callback */
static void s_checkAllConnected( void* closure );
map<HostID,HostRec> m_hostSockets;
pthread_rwlock_t m_sockets_rwlock;
CookieID m_connectionID;
short m_heatbeat; /* might change per carrier or something. */
string m_name;
int m_totalSent;
@ -124,6 +174,7 @@ class CookieRef {
current ones. */
pthread_mutex_t m_EventsMutex;
XW_RELAY_STATE m_curState;
XW_RELAY_STATE m_nextState;
deque<CRefEvent> m_eventQueue;
@ -131,46 +182,4 @@ class CookieRef {
static CookieID ms_nextConnectionID;
}; /* CookieRef */
typedef map<CookieID,CookieRef*> CookieMap;
class CookieMapIterator {
public:
CookieMapIterator();
~CookieMapIterator() {}
CookieID Next();
private:
CookieMap::const_iterator _iter;
};
CookieRef* get_make_cookieRef( const char* cookie, CookieID connID );
CookieRef* get_cookieRef( CookieID cookieID );
CookieID CookieIdForName( const char* name );
void CheckHeartbeats( time_t now, vector<int>* victims );
class SocketStuff;
typedef map< int, SocketStuff* > SocketMap;
class SocketsIterator {
public:
SocketsIterator( SocketMap::iterator iter );
int Next();
private:
SocketMap::iterator m_iter;
};
class SocketMgr {
public:
static void Associate( int socket, CookieRef* cref );
static pthread_mutex_t* GetWriteMutexForSocket( int socket );
static void RemoveSocketRefs( int socket );
static void PrintSocketInfo( int socket, string& out );
static SocketsIterator MakeSocketsIterator();
private:
static CookieRef* CookieRefForSocket( int socket );
static SocketMap ms_SocketStuff;
static pthread_mutex_t ms_SocketStuffMutex;
};
#endif

View file

@ -40,6 +40,7 @@
#include "ctrl.h"
#include "cref.h"
#include "crefmgr.h"
#include "xwrelay_priv.h"
/* this is *only* for testing. Don't abuse!!!! */
@ -116,15 +117,18 @@ cmd_discon( int socket, const char** args )
static void
print_cookies( int socket, CookieID theID )
{
CookieMapIterator iter = CookieRef::GetCookieIterator();
CRefMgr* cmgr = CRefMgr::Get();
CookieMapIterator iter = cmgr->GetCookieIterator();
CookieID id;
for ( id = iter.Next(); id != 0; id = iter.Next() ) {
if ( theID == 0 || theID == id ) {
CookieRef* cref = get_cookieRef( id );
string s;
cref->PrintCookieInfo( s );
SafeCref scr( id );
if ( scr.IsValid() ) {
string s;
scr.PrintCookieInfo( s );
print_to_sock( socket, s.c_str() );
print_to_sock( socket, s.c_str() );
}
}
}
}
@ -157,10 +161,10 @@ cmd_kill( int socket, const char** args )
const char* id = args[3];
if ( idhow != NULL && id != NULL ) {
if ( 0 == strcmp( idhow, "name" ) ) {
CookieRef::Delete( id );
CRefMgr::Get()->Delete( id );
found = 1;
} else if ( 0 == strcmp( idhow, "id" ) ) {
CookieRef::Delete( atoi( id ) );
CRefMgr::Get()->Delete( atoi( id ) );
found = 1;
}
}
@ -198,7 +202,7 @@ cmd_shutdown( int socket, const char** args )
static void
print_cookies( int socket, const char* name )
{
CookieID id = CookieIdForName( name );
CookieID id = CRefMgr::Get()->CookieIdForName( name );
print_cookies( socket, id );
}
@ -206,14 +210,14 @@ static void
print_socket_info( int out, int which )
{
string s;
SocketMgr::PrintSocketInfo( which, s );
CRefMgr::Get()->PrintSocketInfo( which, s );
print_to_sock( out, s.c_str() );
}
static void
print_sockets( int out, int sought )
{
SocketsIterator iter = SocketMgr::MakeSocketsIterator();
SocketsIterator iter = CRefMgr::Get()->MakeSocketsIterator();
int sock;
while ( (sock = iter.Next()) != 0 ) {
if ( sought == 0 || sought == sock ) {
@ -264,10 +268,11 @@ cmd_print( int socket, const char** args )
static int
cmd_lock( int socket, const char** args )
{
CRefMgr* mgr = CRefMgr::Get();
if ( 0 == strcmp( "on", args[1] ) ) {
pthread_rwlock_wrlock( &gCookieMapRWLock );
mgr->LockAll();
} else if ( 0 == strcmp( "off", args[1] ) ) {
pthread_rwlock_unlock( &gCookieMapRWLock );
mgr->UnlockAll();
} else {
print_to_sock( socket, "%s [on|off] (lock/unlock mutex)", args[0] );
}

View file

@ -25,6 +25,7 @@
#include "xwrelay_priv.h"
#include "cref.h"
#include "crefmgr.h"
class MutexLock {
public:
@ -51,9 +52,10 @@ class MutexLock {
class SocketWriteLock {
public:
SocketWriteLock( int socket ) {
m_socket = socket;
m_mutex = SocketMgr::GetWriteMutexForSocket( socket );
SocketWriteLock( int socket )
: m_socket( socket )
, m_mutex( CRefMgr::Get()->GetWriteMutexForSocket( socket ) )
{
#ifdef DEBUG_LOCKS
logf( "locking mutex %x for socket %d", m_mutex, socket );
#endif
@ -70,8 +72,8 @@ class SocketWriteLock {
}
private:
pthread_mutex_t* m_mutex;
int m_socket;
pthread_mutex_t* m_mutex;
};
class RWReadLock {

View file

@ -59,27 +59,37 @@ typedef struct StateTable {
StateTable g_stateTable[] = {
/* Initial msg comes in. Managing object created in init state, sends response */
{ XW_ST_INITED, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
{ XW_ST_INITED, XW_EVENT_CONNECTMSG, XW_ACTION_SEND_1ST_RSP, XW_ST_CONNECTING },
{ XW_ST_INITED, XW_EVENT_RECONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
/* Another connect msg comes in */
{ XW_ST_CONNECTING, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
/* Server can lock game when all players are accounted for. Not required
yet, though. In fact I'm not sure how to require it. :-) */
{ XW_ST_CONNECTING, XW_EVENT_ALLHEREMSG, XW_ACTION_LOCKGAME, XW_ST_ALLCONNECTED },
{ XW_ST_CONNECTING, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_RECONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
/* Forward requests while not locked are ok -- but we must check that the
target is actually present. If no socket available must drop the message */
{ XW_ST_CONNECTING, XW_EVENT_FORWARDMSG, XW_ACTION_CHECKDEST, XW_ST_CHECKINGDEST },
{ XW_ST_CHECKINGDEST, XW_EVENT_DESTOK, XW_ACTION_FWD, XW_ST_CONNECTING },
{ XW_ST_CHECKINGDEST, XW_EVENT_DESTBAD, XW_ACTION_NONE, XW_ST_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_FORWARDMSG, XW_ACTION_CHECKDEST, XW_ST_CHECKINGDEST },
{ XW_ST_CHECKINGDEST, XW_EVENT_DESTOK, XW_ACTION_CHECK_CAN_LOCK, XW_ST_CHECKING_CAN_LOCK },
{ XW_ST_CHECKING_CAN_LOCK, XW_EVENT_CAN_LOCK, XW_ACTION_FWD, XW_ST_ALLCONNECTED },
{ XW_ST_CHECKING_CAN_LOCK, XW_EVENT_CANT_LOCK, XW_ACTION_FWD, XW_ST_CONNECTING },
{ XW_ST_CHECKINGDEST, XW_EVENT_DESTBAD, XW_ACTION_NONE, XW_ST_CONNECTING },
/* Timeout before all connected */
{ XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_CONNECTING, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_CONNECTING },
{ XW_ST_ALLCONNECTED, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_NOMORESOCKETS, XW_ACTION_NONE, XW_ST_DEAD },
/* This is the entry we'll use most of the time */
{ XW_ST_ALLCONNECTED, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED },
{ XW_ST_ALLCONNECTED, XW_EVENT_CONNTIMER, XW_ACTION_NONE, XW_ST_ALLCONNECTED },
/* Heartbeat arrived */
{ XW_ST_CONNECTING, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_CONNECTING },
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_ALLCONNECTED },
@ -92,6 +102,9 @@ StateTable g_stateTable[] = {
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_ALLCONNECTED },
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_ANY, XW_ACTION_NONE, XW_ST_DEAD },
/* Reconnect. Just like a connect but cookieID is supplied. Can it
happen in the middle of a game when state is XW_ST_ALLCONNECTED? */
@ -107,10 +120,12 @@ getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent,
{
StateTable* stp = g_stateTable;
while ( stp->stateStart != XW_ST_NONE ) {
if ( stp->stateStart == curState && stp->stateEvent == curEvent ) {
*takeAction = stp->stateAction;
*nextState = stp->stateEnd;
return 1;
if ( stp->stateStart == curState ) {
if ( stp->stateEvent == curEvent || stp->stateEvent == XW_EVENT_ANY ) {
*takeAction = stp->stateAction;
*nextState = stp->stateEnd;
return 1;
}
}
++stp;
}
@ -140,6 +155,7 @@ stateString( XW_RELAY_STATE state )
CASESTR(XW_ST_DEAD);
CASESTR(XW_ST_CHECKING_CONN);
CASESTR(XW_ST_CHECKINGDEST);
CASESTR(XW_ST_CHECKING_CAN_LOCK);
}
assert(0);
return "";
@ -162,7 +178,11 @@ eventString( XW_RELAY_EVENT evt )
CASESTR(XW_EVENT_ALLHEREMSG);
CASESTR(XW_EVENT_DESTOK);
CASESTR(XW_EVENT_DESTBAD);
CASESTR(XW_EVENT_CAN_LOCK);
CASESTR(XW_EVENT_CANT_LOCK);
CASESTR(XW_EVENT_ANY);
CASESTR(XW_EVENT_REMOVESOCKET);
CASESTR(XW_EVENT_NOMORESOCKETS);
}
assert(0);
return "";

View file

@ -60,6 +60,9 @@ typedef enum {
,XW_ST_CHECKINGDEST /* Checking for valid socket */
,XW_ST_CHECKING_CAN_LOCK /* Is this message one that implies all
players are present? */
,XW_ST_DEAD /* About to kill the object */
} XW_RELAY_STATE;
@ -97,7 +100,16 @@ typedef enum {
,XW_EVENT_DESTBAD
,XW_EVENT_CAN_LOCK /* ready to stop allowing new connections */
,XW_EVENT_CANT_LOCK /* can't disallow new connections yet */
,XW_EVENT_HEARTFAILED
,XW_EVENT_REMOVESOCKET /* Need to remove socket from this cref */
,XW_EVENT_NOMORESOCKETS /* last socket's been removed */
,XW_EVENT_ANY /* wildcard; matches all */
} XW_RELAY_EVENT;
@ -105,6 +117,8 @@ typedef enum {
typedef enum {
XW_ACTION_NONE
,XW_ACTION_SEND_1ST_RSP
,XW_ACTION_SENDRSP /* Send a connection response */
,XW_ACTION_FWD /* Forward a message */
@ -115,12 +129,16 @@ typedef enum {
,XW_ACTION_DISCONNECTALL
,XW_ACTION_HEARTOK /* allows transition back to stationary state */
,XW_ACTION_LOCKGAME /* lock the cref/cookie session to new hosts */
,XW_ACTION_HEARTOK /* allows transition back to stationary
state */
,XW_ACTION_CHECKDEST /* check that a given hostID has a socket */
,XW_ACTION_REMOVESOCKET
,XW_ACTION_CHECK_CAN_LOCK /* check whether this message implies all
expected players present */
} XW_RELAY_ACTION;

View file

@ -29,6 +29,7 @@
#include "tpool.h"
#include "xwrelay_priv.h"
#include "xwrelay.h"
#include "timermgr.h"
#include "mlock.h"
XWThreadPool* XWThreadPool::g_instance = NULL;
@ -211,22 +212,11 @@ XWThreadPool::interrupt_poll()
}
}
static int
figureTimeout()
{
return -1;
}
static void
considerFireTimer()
{
/* logf( "timer fired" ); */
}
void*
XWThreadPool::real_listener()
{
int flags = POLLIN | POLLERR | POLLHUP;
TimerMgr* tmgr = TimerMgr::getTimerMgr();
for ( ; ; ) {
@ -252,14 +242,15 @@ XWThreadPool::real_listener()
}
pthread_rwlock_unlock( &m_activeSocketsRWLock );
int nMillis = figureTimeout();
int nMillis = tmgr->getPollTimeout();
logf( "calling poll on %s", log );
int nEvents = poll( fds, nSockets, nMillis ); /* -1: infinite timeout */
logf( "back from poll: %d", nEvents );
if ( nEvents < 0 ) {
if ( nEvents == 0 ) {
tmgr->fireElapsedTimers();
} else if ( nEvents < 0 ) {
logf( "errno: %d", errno );
}
}
if ( fds[0].revents != 0 ) {
logf( "poll interrupted" );
@ -300,8 +291,6 @@ XWThreadPool::real_listener()
assert( nEvents == 0 );
}
considerFireTimer();
free( fds );
free( log );
}

View file

@ -47,16 +47,19 @@
#include <assert.h>
#include <sys/select.h>
#include <stdarg.h>
#include <getopt.h>
#include <sys/time.h>
#include "xwrelay.h"
#include "cref.h"
#include "crefmgr.h"
#include "ctrl.h"
#include "mlock.h"
#include "tpool.h"
#include "configs.h"
#include "timermgr.h"
#define N_WORKER_THREADS 5
#define MILLIS 1000000
#define MILLIS 1000
void
logf( const char* format, ... )
@ -105,9 +108,10 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket )
CookieID cookieID = getNetLong( &buf );
HostID hostID = getNetShort( &buf );
logf( "processHeartbeat: cookieID 0x%lx, hostID 0x%x", cookieID, hostID );
CookieRef* cref = get_cookieRef( cookieID );
if ( cref != NULL ) {
cref->HandleHeartbeat( hostID, socket );
SafeCref scr( cookieID );
if ( scr.IsValid() ) {
scr.HandleHeartbeat( hostID, socket );
} else {
killSocket( socket, "no cref for socket" );
}
@ -176,16 +180,18 @@ send_with_length_unsafe( int socket, unsigned char* buf, int bufLen )
* game?
*/
static void
processConnect( unsigned char* bufp, int bufLen, int socket )
processConnect( unsigned char* bufp, int bufLen, int socket, int recon )
{
char cookie[MAX_COOKIE_LEN+1];
unsigned char* end = bufp + bufLen;
logf( "processConnect" );
cookie[0] = '\0';
unsigned char flags = *bufp++;
if ( flagsOK( flags ) ) {
if ( readCookie( &bufp, end, cookie ) ) {
if ( recon || readCookie( &bufp, end, cookie ) ) {
HostID srcID;
CookieID cookieID;
@ -194,9 +200,14 @@ processConnect( unsigned char* bufp, int bufLen, int socket )
srcID = getNetShort( &bufp );
cookieID = getNetLong( &bufp );
CookieRef* cref = get_make_cookieRef( cookie, cookieID );
assert( cref != NULL );
cref->Connect( socket, srcID );
SafeCref scr( cookie, cookieID );
if ( scr.IsValid() ) {
if ( recon ) {
scr.Reconnect( socket, srcID );
} else {
scr.Connect( socket, srcID );
}
}
}
}
} else {
@ -208,7 +219,7 @@ void
killSocket( int socket, char* why )
{
logf( "killSocket(%d): %s", socket, why );
SocketMgr::RemoveSocketRefs( socket );
CRefMgr::Get()->RemoveSocketRefs( socket );
/* Might want to kill the thread it belongs to if we're not in it,
e.g. when unable to write to another socket. */
logf( "killSocket done" );
@ -229,12 +240,14 @@ forwardMessage( unsigned char* buf, int buflen, int srcSocket )
unsigned char* bufp = buf + 1; /* skip cmd */
CookieID cookieID = getNetLong( &bufp );
logf( "cookieID = %d", cookieID );
CookieRef* cref = get_cookieRef( cookieID );
if ( cref != NULL ) {
SafeCref scr( cookieID );
if ( scr.IsValid() ) {
HostID src = getNetShort( &bufp );
HostID dest = getNetShort( &bufp );
cref->Forward( src, dest, buf, buflen );
scr.Forward( src, dest, buf, buflen );
success = 1;
}
return success;
@ -247,7 +260,11 @@ processMessage( unsigned char* buf, int bufLen, int socket )
switch( cmd ) {
case XWRELAY_CONNECT:
logf( "processMessage got XWRELAY_CONNECT" );
processConnect( buf+1, bufLen-1, socket );
processConnect( buf+1, bufLen-1, socket, 0 );
break;
case XWRELAY_RECONNECT:
logf( "processMessage got XWRELAY_RECONNECT" );
processConnect( buf+1, bufLen-1, socket, 1 );
break;
case XWRELAY_CONNECTRESP:
logf( "bad: processMessage got XWRELAY_CONNECTRESP" );
@ -263,7 +280,6 @@ processMessage( unsigned char* buf, int bufLen, int socket )
logf( "processMessage got XWRELAY_MSG_TORELAY" );
if ( !forwardMessage( buf, bufLen, socket ) ) {
killSocket( socket, "couldn't forward message" );
} else {
}
break;
default:
@ -298,42 +314,142 @@ make_socket( unsigned long addr, unsigned short port )
} /* make_socket */
static void
sighandler( int signal )
HeartbeatProc( void* closure )
{
vector<int> victims;
CheckHeartbeats( now(), &victims );
CRefMgr::Get()->CheckHeartbeats( now(), &victims );
unsigned int i;
for ( i = 0; i < victims.size(); ++i ) {
killSocket( victims[i], "heartbeat check failed" );
}
} /* sighandler */
} /* HeartbeatProc */
enum { FLAG_HELP
,FLAG_CONFFILE
,FLAG_PORT
,FLAG_CPORT
,FLAG_NTHREADS
};
struct option longopts[] = {
{
"help",
0,
NULL,
FLAG_HELP
}
,{
"conffile",
1,
NULL,
FLAG_CONFFILE
}
,{
"port",
1,
NULL,
FLAG_PORT
}
,{
"ctrlport",
1,
NULL,
FLAG_CPORT
}
,{
"nthreads",
1,
NULL,
FLAG_NTHREADS
}
};
static void
usage( char* arg0 )
{
unsigned int i;
fprintf( stderr, "usage: %s \\\n", arg0 );
for ( i = 0; i < sizeof(longopts)/sizeof(longopts[0]); ++i ) {
struct option* opt = &longopts[i];
fprintf( stderr, "\t--%s", opt->name );
if ( opt->has_arg ) {
fprintf( stderr, " <%s>", opt->name );
}
fprintf( stderr, "\\\n" );
}
}
int main( int argc, char** argv )
{
int port = 10999;
int port = 0;
int ctrlport = 0;
int nWorkerThreads = 0;
char* conffile = NULL;
if ( argc > 1 ) {
port = atoi( argv[1] );
/* Read options. Options trump config file values when they conflict, but
the name of the config file is an option so we have to get that
first. */
for ( ; ; ) {
int opt = getopt_long(argc, argv, "hc:p:l:",longopts, NULL);
if ( opt == -1 ) {
break;
}
switch( opt ) {
case FLAG_HELP:
usage( argv[0] );
exit( 0 );
case FLAG_CONFFILE:
conffile = optarg;
break;
case FLAG_PORT:
port = atoi( optarg );
break;
case FLAG_CPORT:
ctrlport = atoi( optarg );
break;
case FLAG_NTHREADS:
nWorkerThreads = atoi( optarg );
break;
default:
usage( argv[0] );
exit( 1 );
}
}
/* Open a listening socket. For each received message, fork a thread into
which relevant stuff is passed. */
RelayConfigs::InitConfigs( conffile );
RelayConfigs* cfg = RelayConfigs::GetConfigs();
if ( port == 0 ) {
port = cfg->GetPort();
}
if ( ctrlport == 0 ) {
ctrlport = cfg->GetCtrlPort();
}
if ( nWorkerThreads == 0 ) {
nWorkerThreads = cfg->GetNWorkerThreads();
}
int listener = make_socket( INADDR_ANY, port );
if ( listener == -1 ) {
exit( 1 );
}
int control = make_socket( INADDR_LOOPBACK, port + 1 );
int control = make_socket( INADDR_LOOPBACK, ctrlport );
if ( control == -1 ) {
exit( 1 );
}
/* generate a signal after n milliseconds, then every m milliseconds */
(void)signal( SIGALRM, sighandler );
(void)ualarm( 2 * HEARTBEAT * MILLIS, 2 * HEARTBEAT* MILLIS );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( N_WORKER_THREADS, processMessage );
tPool->Setup( nWorkerThreads, processMessage );
short heartbeat = cfg->GetHeartbeatInterval();
TimerMgr::getTimerMgr()->setTimer( heartbeat, HeartbeatProc, NULL,
heartbeat );
/* set up select call */
fd_set rfds;
@ -349,7 +465,9 @@ int main( int argc, char** argv )
int retval = select( highest, &rfds, NULL, NULL, NULL );
if ( retval < 0 ) {
logf( "errno: %d", errno );
if ( errno != 4 ) { /* 4's what we get when signal interrupts */
logf( "errno: %d", errno );
}
} else {
if ( FD_ISSET( listener, &rfds ) ) {
struct sockaddr_in newaddr;
@ -372,5 +490,8 @@ int main( int argc, char** argv )
close( listener );
close( control );
delete cfg;
return 0;
} // main

View file

@ -43,12 +43,6 @@ enum { XWRELAY_NONE /* 0 is an illegal value */
XWRELAY_RECONNECT. Format: heartbeat_seconds: 2; connectionID:
2; */
, XWRELAY_LOCKGAME
/* Sent by a participant in game when it's satisfied that all desired
participants are present. On seeing this message the relay goes
into a state where no further connection requests will be
allowed. */
, XWRELAY_CONNECTDENIED
/* The relay says go away. Format: reason code: 1 */