Send disconnect events with error codes on heartbeat and connect timer
failures; change how the state machine handles heartbeats: only put events
into the machine when there's a failure.
This commit is contained in:
ehouse 2005-09-03 06:55:08 +00:00
parent 497b589558
commit 175f4eb87d
7 changed files with 120 additions and 58 deletions

View file

@ -148,6 +148,18 @@ CookieRef::SocketForHost( HostID dest )
return socket;
}
/* Send a two-byte disconnect notice on the socket named in the event:
 * the XWRELAY_DISCONNECT byte followed by the event's reason code. */
void
CookieRef::notifyDisconn( const CRefEvent* evt )
{
    unsigned char msg[2];
    msg[0] = XWRELAY_DISCONNECT;     /* message kind */
    msg[1] = evt->u.disnote.why;     /* reason carried by the event */

    send_with_length( evt->u.disnote.socket, msg, sizeof(msg) );
} /* notifyDisconn */
void
CookieRef::removeSocket( const CRefEvent* evt )
{
@ -177,7 +189,6 @@ CookieRef::removeSocket( const CRefEvent* evt )
}
} /* Remove */
int
CookieRef::HasSocket( int socket )
{
@ -205,11 +216,21 @@ CookieRef::_HandleHeartbeat( HostID id, int socket )
} /* HandleHeartbeat */
void
CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
CookieRef::_CheckHeartbeats( time_t now )
{
logf( "CookieRef::CheckHeartbeats" );
MutexLock ml( &m_EventsMutex );
pushHeartTimerEvent( now, victims );
logf( "CookieRef::_CheckHeartbeats" );
MutexLock ml( &m_EventsMutex );
{
RWReadLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > GetHeartbeat() * 2 ) {
pushHeartFailedEvent( iter->second.m_socket );
}
++iter;
}
}
handleEvents();
} /* CheckHeartbeats */
@ -236,7 +257,7 @@ CookieRef::pushConnectEvent( int socket, HostID srcID )
evt.type = XW_EVENT_CONNECTMSG;
evt.u.con.socket = socket;
evt.u.con.srcID = srcID;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
} /* pushConnectEvent */
void
@ -246,7 +267,7 @@ CookieRef::pushReconnectEvent( int socket, HostID srcID )
evt.type = XW_EVENT_RECONNECTMSG;
evt.u.recon.socket = socket;
evt.u.recon.srcID = srcID;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
} /* pushConnectEvent */
void
@ -256,7 +277,16 @@ CookieRef::pushHeartbeatEvent( HostID id, int socket )
evt.type = XW_EVENT_HEARTMSG;
evt.u.heart.id = id;
evt.u.heart.socket = socket;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
CookieRef::pushHeartFailedEvent( int socket )
{
CRefEvent evt;
evt.type = XW_EVENT_HEARTFAILED;
evt.u.heart.socket = socket;
m_eventQueue.push_back( evt );
}
void
@ -266,7 +296,7 @@ CookieRef::pushHeartTimerEvent( time_t now, vector<int>* victims )
evt.type = XW_EVENT_HEARTTIMER;
evt.u.htime.now = now;
evt.u.htime.victims = victims;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -279,7 +309,7 @@ CookieRef::pushForwardEvent( HostID src, HostID dest,
evt.u.fwd.dest = dest;
evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -288,7 +318,17 @@ CookieRef::pushRemoveSocketEvent( int socket )
CRefEvent evt;
evt.type = XW_EVENT_REMOVESOCKET;
evt.u.rmsock.socket = socket;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
CookieRef::pushNotifyDisconEvent( int socket, XWREASON why )
{
CRefEvent evt;
evt.type = XW_EVENT_NOTIFYDISCON;
evt.u.disnote.socket = socket;
evt.u.disnote.why = why;
m_eventQueue.push_back( evt );
}
void
@ -296,7 +336,7 @@ CookieRef::pushDestBadEvent()
{
CRefEvent evt;
evt.type = XW_EVENT_DESTBAD;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -305,7 +345,7 @@ CookieRef::pushDestOkEvent( const CRefEvent* oldEvt )
CRefEvent evt;
memcpy( &evt, oldEvt, sizeof(evt) );
evt.type = XW_EVENT_DESTOK;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -314,7 +354,7 @@ CookieRef::pushCanLockEvent( const CRefEvent* oldEvt )
CRefEvent evt;
memcpy( &evt, oldEvt, sizeof(evt) );
evt.type = XW_EVENT_CAN_LOCK;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -323,7 +363,7 @@ CookieRef::pushCantLockEvent( const CRefEvent* oldEvt )
CRefEvent evt;
memcpy( &evt, oldEvt, sizeof(evt) );
evt.type = XW_EVENT_CANT_LOCK;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -331,7 +371,7 @@ CookieRef::pushLastSocketGoneEvent()
{
CRefEvent evt;
evt.type = XW_EVENT_NOMORESOCKETS;
m_eventQueue.push_front( evt );
m_eventQueue.push_back( evt );
}
void
@ -369,8 +409,11 @@ CookieRef::handleEvents()
checkFromServer( &evt );
break;
case XW_ACTION_DISCONNECTALL:
disconnectAll( &evt );
case XW_ACTION_TIMERDISCONNECT:
disconnectSockets( 0, XWRELAY_ERROR_TIMEOUT );
break;
case XW_ACTION_HEARTDISCONNECT:
disconnectSockets( evt.u.heart.socket, XWRELAY_ERROR_HEART );
break;
case XW_ACTION_NOTEHEART:
@ -381,6 +424,10 @@ CookieRef::handleEvents()
checkHeartbeats( &evt );
break;
case XW_ACTION_NOTIFYDISCON:
notifyDisconn( &evt );
break;
case XW_ACTION_REMOVESOCKET:
removeSocket( &evt );
break;
@ -517,16 +564,21 @@ CookieRef::checkFromServer( const CRefEvent* evt )
}
void
CookieRef::disconnectAll( const CRefEvent* evt )
CookieRef::disconnectSockets( int socket, XWREASON why )
{
logf( "disconnectAll" );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
pushRemoveSocketEvent( iter->second.m_socket );
++iter;
if ( socket == 0 ) {
RWReadLock ml( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
assert( iter->second.m_socket != 0 );
disconnectSockets( iter->second.m_socket, why );
++iter;
}
} else {
pushNotifyDisconEvent( socket, why );
pushRemoveSocketEvent( socket );
}
logf( "disconnectAll done" );
}
} /* disconnectSockets */
void
CookieRef::noteHeartbeat( const CRefEvent* evt )
@ -555,17 +607,15 @@ void
CookieRef::checkHeartbeats( const CRefEvent* evt )
{
int vcount = 0;
vector<int>* victims = evt->u.htime.victims;
time_t now = evt->u.htime.now;
RWWriteLock rwl( &m_sockets_rwlock );
RWReadLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > GetHeartbeat() * 2 ) {
victims->push_back( iter->second.m_socket );
++vcount;
pushHeartFailedEvent( iter->second.m_socket );
}
++iter;
}
@ -574,7 +624,7 @@ CookieRef::checkHeartbeats( const CRefEvent* evt )
/* Post an event */
CRefEvent newEvt;
newEvt.type = vcount > 0 ? XW_EVENT_HEARTFAILED : XW_EVENT_HEARTOK;
m_eventQueue.push_front( newEvt );
m_eventQueue.push_back( newEvt );
} /* checkHeartbeats */
/* timer callback */
@ -596,7 +646,7 @@ CookieRef::_CheckAllConnected()
MutexLock ml( &m_EventsMutex );
CRefEvent newEvt;
newEvt.type = XW_EVENT_CONNTIMER;
m_eventQueue.push_front( newEvt );
m_eventQueue.push_back( newEvt );
handleEvents();
}

View file

@ -28,6 +28,7 @@
#include <deque>
#include <pthread.h>
#include "xwrelay_priv.h"
#include "xwrelay.h"
#include "states.h"
using namespace std;
@ -59,8 +60,6 @@ class CookieRef {
int NotFullyConnected() { return m_curState != XW_ST_ALLCONNECTED; }
void CheckHeartbeats( time_t now, vector<int>* victims );
/* for console */
void _PrintCookieInfo( string& out );
void PrintSocketInfo( string& out, int socket );
@ -75,6 +74,7 @@ class CookieRef {
void _Connect( int socket, HostID srcID );
void _Reconnect( int socket, HostID srcID );
void _HandleHeartbeat( HostID id, int socket );
void _CheckHeartbeats( time_t now );
void _Forward( HostID src, HostID dest, unsigned char* buf, int buflen );
void _Remove( int socket );
void _CheckAllConnected();
@ -115,6 +115,10 @@ class CookieRef {
struct {
int socket;
} rmsock;
struct {
int socket;
XWREASON why;
} disnote;
} u;
} CRefEvent;
@ -131,6 +135,8 @@ class CookieRef {
void pushConnectEvent( int socket, HostID srcID );
void pushReconnectEvent( int socket, HostID srcID );
void pushHeartbeatEvent( HostID id, int socket );
void pushHeartFailedEvent( int socket );
void pushHeartTimerEvent( time_t now, vector<int>* victims );
void pushForwardEvent( HostID src, HostID dest, unsigned char* buf,
@ -138,6 +144,7 @@ class CookieRef {
void pushDestBadEvent();
void pushLastSocketGoneEvent();
void pushRemoveSocketEvent( int socket );
void pushNotifyDisconEvent( int socket, XWREASON why );
void pushDestOkEvent( const CRefEvent* evt );
void pushCanLockEvent( const CRefEvent* evt );
@ -153,10 +160,12 @@ class CookieRef {
void checkDest( const CRefEvent* evt );
void checkFromServer( const CRefEvent* evt );
void disconnectAll(const CRefEvent* evt);
void disconnectSockets( int socket, XWREASON why );
void noteHeartbeat(const CRefEvent* evt);
void checkHeartbeats(const CRefEvent* evt);
void notifyDisconn(const CRefEvent* evt);
void removeSocket(const CRefEvent* evt);
/* timer callback */
static void s_checkAllConnected( void* closure );

View file

@ -313,13 +313,15 @@ CRefMgr::getCookieRef_impl( CookieID cookieID )
}
void
CRefMgr::CheckHeartbeats( time_t now, vector<int>* sockets )
CRefMgr::CheckHeartbeats( time_t now )
{
RWReadLock rwl( &m_cookieMapRWLock );
CookieMap::iterator iter = m_cookieMap.begin();
while ( iter != m_cookieMap.end() ) {
CookieRef* ref = iter->second;
ref->CheckHeartbeats( now, sockets );
SafeCref scr( iter->second );
if ( scr.IsValid() ) {
scr.CheckHeartbeats( now );
}
++iter;
}
} /* CheckHeartbeats */

View file

@ -57,7 +57,7 @@ class CRefMgr {
CookieMapIterator GetCookieIterator();
/* PENDING. These need to go through SafeCref */
void CheckHeartbeats( time_t now, vector<int>* sockets );
void CheckHeartbeats( time_t now );
void Delete( CookieID id );
void Delete( CookieRef* cref );
void Delete( const char* name );
@ -128,6 +128,9 @@ class SafeCref {
/* Forwards id and socket unchanged to m_cref->_HandleHeartbeat().  No
   validity check is made here. */
void HandleHeartbeat( HostID id, int socket ) {
m_cref->_HandleHeartbeat( id, socket );
}
/* Forwards now unchanged to m_cref->_CheckHeartbeats().  No validity check
   is made here; CRefMgr::CheckHeartbeats tests IsValid() before calling. */
void CheckHeartbeats( time_t now ) {
m_cref->_CheckHeartbeats( now );
}
/* Forwards the caller's string to m_cref->_PrintCookieInfo(), which
   receives it by non-const reference.  No validity check is made here. */
void PrintCookieInfo( string& out ) {
m_cref->_PrintCookieInfo( out );
}

View file

@ -77,14 +77,16 @@ StateTable g_stateTable[] = {
{ XW_ST_CHECKINGDEST, XW_EVENT_DESTBAD, XW_ACTION_NONE, XW_ST_CONNECTING },
/* Timeout before all connected */
{ XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_CONNECTING, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_TIMERDISCONNECT,XW_ST_DEAD },
{ XW_ST_CONNECTING, XW_EVENT_HEARTFAILED, XW_ACTION_HEARTDISCONNECT, XW_ST_CONNECTING },
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_HEARTDISCONNECT, XW_ST_ALLCONNECTED },
{ XW_ST_CONNECTING, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_CONNECTING },
{ XW_ST_ALLCONNECTED, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_CONNECTING },
{ XW_ST_CONNECTING, XW_EVENT_NOMORESOCKETS, XW_ACTION_NONE, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_NOMORESOCKETS, XW_ACTION_NONE, XW_ST_DEAD },
/* This is the entry we'll use most of the time */
{ XW_ST_ALLCONNECTED, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED },
@ -94,21 +96,15 @@ StateTable g_stateTable[] = {
{ XW_ST_CONNECTING, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_CONNECTING },
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_ALLCONNECTED },
/* Heartbeat timer means check for dead connections. Post event to self if there's a problem. */
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTED },
{ XW_ST_CONNECTING, XW_EVENT_NOTIFYDISCON, XW_ACTION_NOTIFYDISCON, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_NOTIFYDISCON, XW_ACTION_NOTIFYDISCON, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_DEAD },
{ XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_CONNECTING },
{ XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_ALLCONNECTED },
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_REMOVESOCKET, XW_ACTION_REMOVESOCKET, XW_ST_DEAD },
{ XW_ST_DEAD, XW_EVENT_ANY, XW_ACTION_NONE, XW_ST_DEAD },
// { XW_ST_DEAD, XW_EVENT_ANY, XW_ACTION_NONE, XW_ST_DEAD },
/* Reconnect. Just like a connect but cookieID is supplied. Can it
happen in the middle of a game when state is XW_ST_ALLCONNECTED? */
/* Marks end of table */
{ XW_ST_NONE, XW_EVENT_NONE, XW_ACTION_NONE, XW_ST_NONE }
};
@ -183,6 +179,7 @@ eventString( XW_RELAY_EVENT evt )
CASESTR(XW_EVENT_ANY);
CASESTR(XW_EVENT_REMOVESOCKET);
CASESTR(XW_EVENT_NOMORESOCKETS);
CASESTR(XW_EVENT_NOTIFYDISCON);
}
assert(0);
return "";

View file

@ -107,6 +107,8 @@ typedef enum {
,XW_EVENT_REMOVESOCKET /* Need to remove socket from this cref */
,XW_EVENT_NOTIFYDISCON /* Send a discon */
,XW_EVENT_NOMORESOCKETS /* last socket's been removed */
,XW_EVENT_ANY /* wildcard; matches all */
@ -128,17 +130,22 @@ typedef enum {
,XW_ACTION_CHECKHEART /* Check for heartbeats */
,XW_ACTION_DISCONNECTALL
,XW_ACTION_TIMERDISCONNECT /* disconnect all because of a timer */
,XW_ACTION_HEARTOK /* allows transition back to stationary
state */
,XW_ACTION_CHECKDEST /* check that a given hostID has a socket */
,XW_ACTION_NOTIFYDISCON
,XW_ACTION_REMOVESOCKET
,XW_ACTION_CHECK_CAN_LOCK /* check whether this message implies all
expected players present */
,XW_ACTION_HEARTDISCONNECT
} XW_RELAY_ACTION;

View file

@ -316,13 +316,7 @@ make_socket( unsigned long addr, unsigned short port )
static void
HeartbeatProc( void* closure )
{
vector<int> victims;
CRefMgr::Get()->CheckHeartbeats( now(), &victims );
unsigned int i;
for ( i = 0; i < victims.size(); ++i ) {
killSocket( victims[i], "heartbeat check failed" );
}
CRefMgr::Get()->CheckHeartbeats( now() );
} /* HeartbeatProc */
enum { FLAG_HELP