move to using a formal table-driven state machine for each game (cref

object).  While it makes things more complex at this point, it should
make it easier in the long run to add error handling, timeouts, etc.
This commit is contained in:
ehouse 2005-07-05 22:05:37 +00:00
parent d5858f7a21
commit db6f6c258d
6 changed files with 542 additions and 91 deletions

View file

@ -16,7 +16,7 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
CC = g++
SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp
SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp states.cpp
OBJ = $(patsubst %.cpp,%.o,$(SRC))
LDFLAGS += -lpthread -g
CPPFLAGS += -g -Wall -DHEARTBEAT=10

View file

@ -22,11 +22,13 @@
#include <map>
#include <assert.h>
#include <pthread.h>
#include <netinet/in.h>
#include "cref.h"
#include "xwrelay.h"
#include "mlock.h"
#include "tpool.h"
#include "states.h"
using namespace std;
@ -73,8 +75,7 @@ CookieIdForName( const char* name )
CookieMap::iterator iter = gCookieMap.begin();
while ( iter != gCookieMap.end() ) {
ref = iter->second;
if ( ref->Name() == s ) {
ref = iter->second;
if ( ref->Name() == s && ref->NotFullyConnected() ) {
return ref->GetCookieID();
}
++iter;
@ -211,7 +212,6 @@ SocketMgr::RemoveSocketRefs( int socket )
{
CookieRef* cref = CookieRefForSocket( socket );
if ( cref != NULL ) {
MutexLock ml( &ms_SocketStuffMutex );
SocketMap::iterator iter = ms_SocketStuff.find( socket );
assert( iter != ms_SocketStuff.end() );
@ -269,10 +269,12 @@ SocketsIterator::Next()
*****************************************************************************/
CookieRef::CookieRef(string s)
: m_name(s),
m_totalSent(0)
: m_name(s)
, m_totalSent(0)
, m_curState(XW_ST_INITED)
{
pthread_rwlock_init( &m_sockets_rwlock, NULL );
pthread_mutex_init( &m_EventsMutex, NULL );
m_connectionID = ms_nextConnectionID++; /* needs a mutex!!! */
}
@ -299,13 +301,13 @@ CookieRef::~CookieRef()
}
void
CookieRef::Associate( int socket, HostID srcID )
CookieRef::Connect( int socket, HostID srcID )
{
assert( srcID != HOST_ID_NONE );
logf( "remembering pair: hostid=%x, socket=%d", srcID, socket );
RWWriteLock ml( &m_sockets_rwlock );
HostRec hr(socket);
m_hostSockets.insert( pair<HostID,HostRec>(srcID,hr) );
SocketMgr::Associate( socket, this );
MutexLock ml( &m_EventsMutex );
pushConnectEvent( socket, srcID );
handleEvents();
}
int
@ -343,6 +345,9 @@ CookieRef::Remove( int socket )
}
}
/* Does this belong here or at a higher level? */
XWThreadPool::GetTPool()->CloseSocket( socket );
if ( count == 0 ) {
ForgetCref( this );
delete this;
@ -352,6 +357,204 @@ CookieRef::Remove( int socket )
void
CookieRef::HandleHeartbeat( HostID id, int socket )
{
MutexLock ml( &m_EventsMutex );
pushHeartbeatEvent( id, socket );
handleEvents();
} /* HandleHeartbeat */
void
CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
{
logf( "CookieRef::CheckHeartbeats" );
MutexLock ml( &m_EventsMutex );
pushHeartTimerEvent( now, victims );
handleEvents();
} /* CheckHeartbeats */
void
CookieRef::Forward( HostID src, HostID dest, unsigned char* buf, int buflen )
{
MutexLock ml( &m_EventsMutex );
pushForwardEvent( src, dest, buf, buflen );
handleEvents();
} /* Forward */
void
CookieRef::pushConnectEvent( int socket, HostID srcID )
{
CRefEvent evt;
evt.type = XW_EVENT_CONNECTMSG;
evt.u.con.socket = socket;
evt.u.con.srcID = srcID;
m_eventQueue.push_front( evt );
} /* pushConnectEvent */
void
CookieRef::pushHeartbeatEvent( HostID id, int socket )
{
CRefEvent evt;
evt.type = XW_EVENT_HEARTMSG;
evt.u.heart.id = id;
evt.u.heart.socket = socket;
m_eventQueue.push_front( evt );
}
void
CookieRef::pushHeartTimerEvent( time_t now, vector<int>* victims )
{
CRefEvent evt;
evt.type = XW_EVENT_HEARTTIMER;
evt.u.htime.now = now;
evt.u.htime.victims = victims;
m_eventQueue.push_front( evt );
}
void
CookieRef::pushForwardEvent( HostID src, HostID dest,
unsigned char* buf, int buflen )
{
CRefEvent evt;
evt.type = XW_EVENT_FORWARDMSG;
evt.u.fwd.src = src;
evt.u.fwd.dest = dest;
evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen;
m_eventQueue.push_front( evt );
}
void
CookieRef::handleEvents()
{
XW_RELAY_ACTION takeAction;
while ( m_eventQueue.size() > 0 ) {
CRefEvent evt = m_eventQueue.front();
m_eventQueue.pop_front();
if ( getFromTable( m_curState, evt.type, &takeAction, &m_nextState ) ) {
logf( "moving from state %s to state %s for event %s",
stateString(m_curState), stateString(m_nextState),
eventString(evt.type) );
switch( takeAction ) {
case XW_ACTION_SENDRSP:
sendResponse( &evt );
break;
case XW_ACTION_FWD:
forward( &evt );
break;
case XW_ACTION_DISCONNECTALL:
disconnectAll( &evt );
break;
case XW_ACTION_NOTEHEART:
noteHeartbeat( &evt );
break;
case XW_ACTION_CHECKHEART:
checkHeartbeats( &evt );
break;
case XW_ACTION_HEARTOK:
/* nothing to do for this */
break;
case XW_ACTION_NONE:
default:
assert(0);
break;
}
m_curState = m_nextState;
}
}
} /* handleEvents */
static void
send_with_length( int socket, unsigned char* buf, int bufLen )
{
SocketWriteLock slock( socket );
int ok = 0;
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
if ( nSent == 2 ) {
nSent = send( socket, buf, bufLen, 0 );
if ( nSent == bufLen ) {
logf( "sent %d bytes on socket %d", nSent, socket );
ok = 1;
}
}
if ( !ok ) {
killSocket( socket, "couldn't send" );
}
}
static void
putNetShort( unsigned char** bufpp, unsigned short s )
{
s = htons( s );
memcpy( *bufpp, &s, sizeof(s) );
*bufpp += sizeof(s);
}
void
CookieRef::sendResponse( const CRefEvent* evt )
{
int socket = evt->u.con.socket;
HostID id = evt->u.con.srcID;
assert( id != HOST_ID_NONE );
logf( "remembering pair: hostid=%x, socket=%d", id, socket );
RWWriteLock ml( &m_sockets_rwlock );
HostRec hr(socket);
m_hostSockets.insert( pair<HostID,HostRec>(id,hr) );
/* Now send the response */
unsigned char buf[5];
unsigned char* bufp = buf;
*bufp++ = XWRELAY_CONNECTRESP;
putNetShort( &bufp, GetHeartbeat() );
putNetShort( &bufp, GetCookieID() );
send_with_length( socket, buf, sizeof(buf) );
RecordSent( sizeof(buf), socket );
logf( "sent CONNECTIONRSP" );
} /* sendResponse */
void
CookieRef::forward( const CRefEvent* evt )
{
unsigned char* buf = evt->u.fwd.buf;
int buflen = evt->u.fwd.buflen;
HostID src = evt->u.fwd.src;
HostID dest = evt->u.fwd.dest;
int destSocket = SocketForHost( dest );
/* This is an ugly hack!!!! */
*buf = XWRELAY_MSG_FROMRELAY;
send_with_length( destSocket, buf, buflen );
/* also note that we've heard from src recently */
pushHeartbeatEvent( src, SocketForHost(src) );
} /* forward */
void
CookieRef::disconnectAll( const CRefEvent* evt )
{
}
void
CookieRef::noteHeartbeat( const CRefEvent* evt )
{
int socket = evt->u.heart.socket;
HostID id = evt->u.heart.id;
RWWriteLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.find(id);
@ -361,15 +564,17 @@ CookieRef::HandleHeartbeat( HostID id, int socket )
connection. An attack is the most likely explanation. */
assert( iter->second.m_socket == socket );
logf( "upping m_lastHeartbeat from %d to %d",
logf( "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, now() );
iter->second.m_lastHeartbeat = now();
} /* HandleHeartbeat */
} /* noteHeartbeat */
void
CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
CookieRef::checkHeartbeats( const CRefEvent* evt )
{
logf( "CookieRef::CheckHeartbeats" );
int vcount = 0;
vector<int>* victims = evt->u.htime.victims;
time_t now = evt->u.htime.now;
RWWriteLock rwl( &m_sockets_rwlock );
@ -378,11 +583,17 @@ CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > HEARTBEAT * 2 ) {
victims->push_back( iter->second.m_socket );
++vcount;
}
++iter;
}
logf( "CookieRef::CheckHeartbeats done" );
} /* CheckHeartbeats */
/* Post an event */
CRefEvent newEvt;
newEvt.type = vcount > 0 ? XW_EVENT_HEARTFAILED : XW_EVENT_HEARTOK;
m_eventQueue.push_front( newEvt );
} /* checkHeartbeats */
void
CookieRef::PrintCookieInfo( string& out )

View file

@ -6,8 +6,10 @@
#include <map>
#include <vector>
#include <string>
#include <deque>
#include <pthread.h>
#include "xwrelay_priv.h"
#include "states.h"
typedef unsigned short CookieID;
@ -35,7 +37,7 @@ class CookieRef {
/* Within this cookie, remember that this hostID and socket go together.
If the hostID is HOST_ID_SERVER, it's the server. */
void Associate( int socket, HostID srcID );
void Connect( int socket, HostID srcID );
short GetHeartbeat() { return HEARTBEAT; }
CookieID GetCookieID() { return m_connectionID; }
int SocketForHost( HostID dest );
@ -43,13 +45,11 @@ class CookieRef {
int CountSockets() { return m_hostSockets.size(); }
string Name() { return m_name; }
void RecordSent( int nBytes, int socket ) {
/* This really needs a lock.... */
m_totalSent += nBytes;
}
int NotFullyConnected() { return m_curState != XW_ST_ALLCONNECTED; }
void HandleHeartbeat( HostID id, int socket );
void CheckHeartbeats( time_t now, vector<int>* victims );
void Forward( HostID src, HostID dest, unsigned char* buf, int buflen );
/* for console */
void PrintCookieInfo( string& out );
@ -63,6 +63,51 @@ class CookieRef {
private:
CookieRef( string s );
typedef struct CRefEvent {
XW_RELAY_EVENT type;
union {
struct {
HostID src;
HostID dest;
unsigned char* buf;
int buflen;
} fwd;
struct {
int socket;
HostID srcID;
} con;
struct {
} recon;
struct {
HostID id;
int socket;
} heart;
struct {
time_t now;
vector<int>* victims;
} htime;
} u;
} CRefEvent;
void RecordSent( int nBytes, int socket ) {
/* This really needs a lock.... */
m_totalSent += nBytes;
}
void pushConnectEvent( int socket, HostID srcID );
void pushHeartbeatEvent( HostID id, int socket );
void pushHeartTimerEvent( time_t now, vector<int>* victims );
void pushForwardEvent( HostID src, HostID dest, unsigned char* buf,
int buflen );
void handleEvents();
void sendResponse(const CRefEvent* evt);
void forward(const CRefEvent* evt);
void disconnectAll(const CRefEvent* evt);
void noteHeartbeat(const CRefEvent* evt);
void checkHeartbeats(const CRefEvent* evt);
map<HostID,HostRec> m_hostSockets;
pthread_rwlock_t m_sockets_rwlock;
@ -70,8 +115,17 @@ class CookieRef {
string m_name;
int m_totalSent;
/* Guard the event queue. Only one thread at a time can post to the
queue, but once in a thread can post new events while processing
current ones. */
pthread_mutex_t m_EventsMutex;
XW_RELAY_STATE m_curState;
XW_RELAY_STATE m_nextState;
deque<CRefEvent> m_eventQueue;
static CookieID ms_nextConnectionID;
};
}; /* CookieRef */
typedef map<CookieID,CookieRef*> CookieMap;

129
xwords4/relay/states.cpp Normal file
View file

@ -0,0 +1,129 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2005 by Eric House (fixin@peak.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "assert.h"
#include "states.h"
#include "xwrelay_priv.h"
typedef struct StateTable {
XW_RELAY_STATE stateStart;
XW_RELAY_EVENT stateEvent;
XW_RELAY_ACTION stateAction;
XW_RELAY_STATE stateEnd; /* Do I need this? Or does the code that
performs the action determine the state? */
} StateTable;
StateTable g_stateTable[] = {
/* Initial msg comes in. Managing object created in init state, sends response */
{ XW_ST_INITED, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
/* Another connect msg comes in */
{ XW_ST_CONNECTING, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING },
/* Fwd message comes in, tells us we're now fully connected */
{ XW_ST_CONNECTING, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED },
{ XW_ST_CONNECTING, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_CONNECTING },
/* Timeout before all connected */
{ XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_CONNECTING, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTING },
/* This is the entry we'll use most of the time */
{ XW_ST_ALLCONNECTED, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED },
/* Heartbeat arrived */
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_ALLCONNECTED },
/* Heartbeat timer means check for dead connections. Post event to self if there's a problem. */
{ XW_ST_ALLCONNECTED, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTED },
{ XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_CONNECTING },
{ XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_ALLCONNECTED },
{ XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD },
/* Reconnect. Just like a connect but cookieID is supplied. Can it
happen in the middle of a game when state is XW_ST_ALLCONNECTED? */
/* Marks end of table */
{ XW_ST_NONE, XW_EVENT_NONE, XW_ACTION_NONE, XW_ST_NONE }
};
int
getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent,
XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState )
{
StateTable* stp = g_stateTable;
while ( stp->stateStart != XW_ST_NONE ) {
if ( stp->stateStart == curState && stp->stateEvent == curEvent ) {
*takeAction = stp->stateAction;
*nextState = stp->stateEnd;
return 1;
}
++stp;
}
logf( "unable to find transition from %s on event %s",
stateString(curState), eventString(curEvent) );
assert(0);
return 0;
} /* getFromTable */
#define CASESTR(s) case s: return #s
char*
stateString( XW_RELAY_STATE state )
{
switch( state ) {
CASESTR(XW_ST_NONE);
CASESTR(XW_ST_INITED);
CASESTR(XW_ST_CONNECTING);
CASESTR(XW_ST_ALLCONNECTED);
CASESTR(XW_ST_WAITING_RECON);
CASESTR(XW_ST_SENDING_DISCON);
CASESTR(XW_ST_DISCONNECTED);
CASESTR(XW_ST_HEARTCHECK_CONNECTING);
CASESTR(XW_ST_HEARTCHECK_CONNECTED);
CASESTR(XW_ST_DEAD);
}
assert(0);
return "";
}
char*
eventString( XW_RELAY_EVENT evt )
{
switch( evt ) {
CASESTR(XW_EVENT_NONE);
CASESTR(XW_EVENT_CONNECTMSG);
CASESTR(XW_EVENT_RECONNECTMSG);
CASESTR(XW_EVENT_FORWARDMSG);
CASESTR(XW_EVENT_HEARTMSG);
CASESTR(XW_EVENT_HEARTTIMER);
CASESTR(XW_EVENT_DISCONTIMER);
CASESTR(XW_EVENT_CONNTIMER);
CASESTR(XW_EVENT_HEARTOK);
CASESTR(XW_EVENT_HEARTFAILED);
}
assert(0);
return "";
}
#undef CASESTR

116
xwords4/relay/states.h Normal file
View file

@ -0,0 +1,116 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2005 by Eric House (fixin@peak.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _STATES_H_
#define _STATES_H_
/* states */
typedef enum {
XW_ST_NONE
,XW_ST_INITED /* Relay's running and the object's been
created, but nobody's signed up yet. This
is a very short-lived state since an
incoming connection is why the object was
created. */
,XW_ST_CONNECTING /* At least one device has connected, but no
packets have yet arrived to be
forwarded. */
,XW_ST_ALLCONNECTED /* All devices are connected and ready for the
relay to do its work. This is the state
we're in most of the time. */
,XW_ST_WAITING_RECON /* At least one device has been timed out or
sent a disconnect message. We can't flow
messages in this state, and will be killing
all connections if we don't hear back from
the missing guy soon. */
,XW_ST_HEARTCHECK_CONNECTING /* In the middle of checking heartbeat situation. */
,XW_ST_HEARTCHECK_CONNECTED /* Need two states so we know where to
go back to. */
,XW_ST_SENDING_DISCON /* We're in the process of pulling the plug on
all devices that remain connected. */
,XW_ST_DISCONNECTED /* We're about to kill this object. */
,XW_ST_DEAD /* About to kill the object */
} XW_RELAY_STATE;
/* events */
typedef enum {
XW_EVENT_NONE
,XW_EVENT_CONNECTMSG /* A device is connecting using the cookie for
this object */
,XW_EVENT_RECONNECTMSG /* A device is re-connecting using the
connID for this object */
,XW_EVENT_FORWARDMSG /* A message needs forwarding */
,XW_EVENT_HEARTMSG /* A heartbeat message arrived */
,XW_EVENT_HEARTTIMER /* Time to check for missing heartbeats */
,XW_EVENT_DISCONTIMER /* No reconnect received: time to kill the
remaining connections */
,XW_EVENT_CONNTIMER /* timer for did we get all players hooked
up */
,XW_EVENT_HEARTOK
,XW_EVENT_HEARTFAILED
} XW_RELAY_EVENT;
/* actions */
typedef enum {
XW_ACTION_NONE
,XW_ACTION_SENDRSP /* Send a connection response */
,XW_ACTION_FWD /* Forward a message */
,XW_ACTION_NOTEHEART /* Record heartbeat received */
,XW_ACTION_CHECKHEART /* Check for heartbeats */
,XW_ACTION_DISCONNECTALL
,XW_ACTION_HEARTOK /* allows transition back to stationary state */
} XW_RELAY_ACTION;
int getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent,
XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState );
char* stateString( XW_RELAY_STATE state );
char* eventString( XW_RELAY_EVENT evt );
#endif

View file

@ -89,14 +89,6 @@ getNetShort( unsigned char** bufpp )
return ntohs( tmp );
}
static void
putNetShort( unsigned char** bufpp, unsigned short s )
{
s = htons( s );
memcpy( *bufpp, &s, sizeof(s) );
*bufpp += sizeof(s);
}
static void
processHeartbeat( unsigned char* buf, int bufLen, int socket )
{
@ -104,7 +96,11 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket )
HostID hostID = getNetShort( &buf );
logf( "processHeartbeat: cookieID 0x%x, hostID 0x%x", cookieID, hostID );
CookieRef* cref = get_cookieRef( cookieID );
cref->HandleHeartbeat( hostID, socket );
if ( cref != NULL ) {
cref->HandleHeartbeat( hostID, socket );
} else {
killSocket( socket, "no cref for socket" );
}
} /* processHeartbeat */
/* A CONNECT message from a device gives us the hostID and socket we'll
@ -138,8 +134,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket )
if ( bufp == end ) {
cref = get_make_cookieRef( cookie, connID );
assert( cref != NULL );
cref->Associate( socket, srcID );
SocketMgr::Associate( socket, cref );
cref->Connect( socket, srcID );
}
}
}
@ -162,45 +157,10 @@ now()
return (unsigned long)time(NULL);
}
static void
send_with_length( int socket, unsigned char* buf, int bufLen )
{
SocketWriteLock slock( socket );
int ok = 0;
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
if ( nSent == 2 ) {
nSent = send( socket, buf, bufLen, 0 );
if ( nSent == bufLen ) {
logf( "sent %d bytes on socket %d", nSent, socket );
ok = 1;
}
}
if ( !ok ) {
killSocket( socket, "couldn't send" );
}
}
static void
sendConnResp( CookieRef* cref, int socket )
{
/* send cmd, heartbeat, connid */
unsigned char buf[5];
unsigned char* bufp = buf;
*bufp++ = XWRELAY_CONNECTRESP;
putNetShort( &bufp, cref->GetHeartbeat() );
putNetShort( &bufp, cref->GetCookieID() );
send_with_length( socket, buf, sizeof(buf) );
cref->RecordSent( sizeof(buf), socket );
logf( "sent CONNECTIONRSP" );
}
/* forward the message. Need only change the command after looking up the
* socket and it's ready to go. */
static int
forwardMessage( unsigned char* buf, int bufLen, int srcSocket )
forwardMessage( unsigned char* buf, int buflen, int srcSocket )
{
int success = 0;
unsigned char* bufp = buf + 1; /* skip cmd */
@ -208,25 +168,11 @@ forwardMessage( unsigned char* buf, int bufLen, int srcSocket )
logf( "cookieID = %d", cookieID );
CookieRef* cref = get_cookieRef( cookieID );
if ( cref != NULL ) {
HostID src = getNetShort( &bufp );
/* we heard from host: good as a heartbeat */
cref->HandleHeartbeat( src, srcSocket );
HostID dest = getNetShort( &bufp );
logf( "forwarding from %x to %x", src, dest );
int destSocket = cref->SocketForHost( dest );
logf( "got socket %d for dest %x", destSocket, dest );
if ( destSocket != -1 ) {
*buf = XWRELAY_MSG_FROMRELAY;
send_with_length( destSocket, buf, bufLen );
cref->RecordSent( bufLen, destSocket );
success = 1;
} else if ( dest == HOST_ID_SERVER ) {
logf( "server not connected yet; fail silently" );
success = 1;
}
cref->Forward( src, dest, buf, buflen );
success = 1;
}
return success;
} /* forwardMessage */
@ -241,11 +187,6 @@ processMessage( unsigned char* buf, int bufLen, int socket )
case XWRELAY_CONNECT:
logf( "processMessage got XWRELAY_CONNECT" );
cref = processConnect( buf+1, bufLen-1, socket );
if ( cref != NULL ) {
sendConnResp( cref, socket );
} else {
killSocket( socket, "no cref found" );
}
break;
case XWRELAY_CONNECTRESP:
logf( "bad: processMessage got XWRELAY_CONNECTRESP" );