From db6f6c258d52e9b33fbd8393f18583c34c3d93bd Mon Sep 17 00:00:00 2001 From: ehouse Date: Tue, 5 Jul 2005 22:05:37 +0000 Subject: [PATCH] move to using a formal table-driven state machine for each game (cref object). While it makes things more complex at this point, it should make it easier in the long run to add error handling, timeouts, etc. --- xwords4/relay/Makefile | 2 +- xwords4/relay/cref.cpp | 243 +++++++++++++++++++++++++++++++++++--- xwords4/relay/cref.h | 66 ++++++++++- xwords4/relay/states.cpp | 129 ++++++++++++++++++++ xwords4/relay/states.h | 116 ++++++++++++++++++ xwords4/relay/xwrelay.cpp | 77 ++---------- 6 files changed, 542 insertions(+), 91 deletions(-) create mode 100644 xwords4/relay/states.cpp create mode 100644 xwords4/relay/states.h diff --git a/xwords4/relay/Makefile b/xwords4/relay/Makefile index 5cdf546c9..44db05f65 100644 --- a/xwords4/relay/Makefile +++ b/xwords4/relay/Makefile @@ -16,7 +16,7 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. CC = g++ -SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp +SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp states.cpp OBJ = $(patsubst %.cpp,%.o,$(SRC)) LDFLAGS += -lpthread -g CPPFLAGS += -g -Wall -DHEARTBEAT=10 diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index cc4375f28..76d1b862b 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -22,11 +22,13 @@ #include #include #include +#include #include "cref.h" #include "xwrelay.h" #include "mlock.h" #include "tpool.h" +#include "states.h" using namespace std; @@ -73,8 +75,7 @@ CookieIdForName( const char* name ) CookieMap::iterator iter = gCookieMap.begin(); while ( iter != gCookieMap.end() ) { ref = iter->second; - if ( ref->Name() == s ) { - ref = iter->second; + if ( ref->Name() == s && ref->NotFullyConnected() ) { return ref->GetCookieID(); } ++iter; @@ -211,7 +212,6 @@ SocketMgr::RemoveSocketRefs( int socket ) { CookieRef* cref = CookieRefForSocket( socket ); if ( cref != NULL ) { - MutexLock ml( &ms_SocketStuffMutex ); SocketMap::iterator iter = ms_SocketStuff.find( socket ); assert( iter != ms_SocketStuff.end() ); @@ -269,10 +269,12 @@ SocketsIterator::Next() *****************************************************************************/ CookieRef::CookieRef(string s) - : m_name(s), - m_totalSent(0) + : m_name(s) + , m_totalSent(0) + , m_curState(XW_ST_INITED) { pthread_rwlock_init( &m_sockets_rwlock, NULL ); + pthread_mutex_init( &m_EventsMutex, NULL ); m_connectionID = ms_nextConnectionID++; /* needs a mutex!!! */ } @@ -299,13 +301,13 @@ CookieRef::~CookieRef() } void -CookieRef::Associate( int socket, HostID srcID ) +CookieRef::Connect( int socket, HostID srcID ) { - assert( srcID != HOST_ID_NONE ); - logf( "remembering pair: hostid=%x, socket=%d", srcID, socket ); - RWWriteLock ml( &m_sockets_rwlock ); - HostRec hr(socket); - m_hostSockets.insert( pair(srcID,hr) ); + SocketMgr::Associate( socket, this ); + + MutexLock ml( &m_EventsMutex ); + pushConnectEvent( socket, srcID ); + handleEvents(); } int @@ -343,6 +345,9 @@ CookieRef::Remove( int socket ) } } + /* Does this belong here or at a higher level? */ + XWThreadPool::GetTPool()->CloseSocket( socket ); + if ( count == 0 ) { ForgetCref( this ); delete this; @@ -352,6 +357,204 @@ CookieRef::Remove( int socket ) void CookieRef::HandleHeartbeat( HostID id, int socket ) { + MutexLock ml( &m_EventsMutex ); + pushHeartbeatEvent( id, socket ); + handleEvents(); +} /* HandleHeartbeat */ + +void +CookieRef::CheckHeartbeats( time_t now, vector* victims ) +{ + logf( "CookieRef::CheckHeartbeats" ); + MutexLock ml( &m_EventsMutex ); + + pushHeartTimerEvent( now, victims ); + handleEvents(); +} /* CheckHeartbeats */ + +void +CookieRef::Forward( HostID src, HostID dest, unsigned char* buf, int buflen ) +{ + MutexLock ml( &m_EventsMutex ); + pushForwardEvent( src, dest, buf, buflen ); + handleEvents(); +} /* Forward */ + +void +CookieRef::pushConnectEvent( int socket, HostID srcID ) +{ + CRefEvent evt; + evt.type = XW_EVENT_CONNECTMSG; + evt.u.con.socket = socket; + evt.u.con.srcID = srcID; + m_eventQueue.push_front( evt ); +} /* pushConnectEvent */ + +void +CookieRef::pushHeartbeatEvent( HostID id, int socket ) +{ + CRefEvent evt; + evt.type = XW_EVENT_HEARTMSG; + evt.u.heart.id = id; + evt.u.heart.socket = socket; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::pushHeartTimerEvent( time_t now, vector* victims ) +{ + CRefEvent evt; + evt.type = XW_EVENT_HEARTTIMER; + evt.u.htime.now = now; + evt.u.htime.victims = victims; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::pushForwardEvent( HostID src, HostID dest, + unsigned char* buf, int buflen ) +{ + CRefEvent evt; + evt.type = XW_EVENT_FORWARDMSG; + evt.u.fwd.src = src; + evt.u.fwd.dest = dest; + evt.u.fwd.buf = buf; + evt.u.fwd.buflen = buflen; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::handleEvents() +{ + XW_RELAY_ACTION takeAction; + + while ( m_eventQueue.size() > 0 ) { + CRefEvent evt = m_eventQueue.front(); + m_eventQueue.pop_front(); + + if ( getFromTable( m_curState, evt.type, &takeAction, &m_nextState ) ) { + + logf( "moving from state %s to state %s for event %s", + stateString(m_curState), stateString(m_nextState), + eventString(evt.type) ); + + switch( takeAction ) { + case XW_ACTION_SENDRSP: + sendResponse( &evt ); + break; + + case XW_ACTION_FWD: + forward( &evt ); + break; + + case XW_ACTION_DISCONNECTALL: + disconnectAll( &evt ); + break; + + case XW_ACTION_NOTEHEART: + noteHeartbeat( &evt ); + break; + + case XW_ACTION_CHECKHEART: + checkHeartbeats( &evt ); + break; + + case XW_ACTION_HEARTOK: + /* nothing to do for this */ + break; + + case XW_ACTION_NONE: + default: + assert(0); + break; + } + + m_curState = m_nextState; + } + } +} /* handleEvents */ + +static void +send_with_length( int socket, unsigned char* buf, int bufLen ) +{ + SocketWriteLock slock( socket ); + int ok = 0; + unsigned short len = htons( bufLen ); + ssize_t nSent = send( socket, &len, 2, 0 ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == bufLen ) { + logf( "sent %d bytes on socket %d", nSent, socket ); + ok = 1; + } + } + if ( !ok ) { + killSocket( socket, "couldn't send" ); + } +} + +static void +putNetShort( unsigned char** bufpp, unsigned short s ) +{ + s = htons( s ); + memcpy( *bufpp, &s, sizeof(s) ); + *bufpp += sizeof(s); +} + +void +CookieRef::sendResponse( const CRefEvent* evt ) +{ + int socket = evt->u.con.socket; + HostID id = evt->u.con.srcID; + + assert( id != HOST_ID_NONE ); + logf( "remembering pair: hostid=%x, socket=%d", id, socket ); + RWWriteLock ml( &m_sockets_rwlock ); + HostRec hr(socket); + m_hostSockets.insert( pair(id,hr) ); + + /* Now send the response */ + unsigned char buf[5]; + unsigned char* bufp = buf; + + *bufp++ = XWRELAY_CONNECTRESP; + putNetShort( &bufp, GetHeartbeat() ); + putNetShort( &bufp, GetCookieID() ); + + send_with_length( socket, buf, sizeof(buf) ); + RecordSent( sizeof(buf), socket ); + logf( "sent CONNECTIONRSP" ); +} /* sendResponse */ + +void +CookieRef::forward( const CRefEvent* evt ) +{ + unsigned char* buf = evt->u.fwd.buf; + int buflen = evt->u.fwd.buflen; + HostID src = evt->u.fwd.src; + HostID dest = evt->u.fwd.dest; + + int destSocket = SocketForHost( dest ); + + /* This is an ugly hack!!!! */ + *buf = XWRELAY_MSG_FROMRELAY; + send_with_length( destSocket, buf, buflen ); + + /* also note that we've heard from src recently */ + pushHeartbeatEvent( src, SocketForHost(src) ); +} /* forward */ + +void +CookieRef::disconnectAll( const CRefEvent* evt ) +{ +} + +void +CookieRef::noteHeartbeat( const CRefEvent* evt ) +{ + int socket = evt->u.heart.socket; + HostID id = evt->u.heart.id; + RWWriteLock rwl( &m_sockets_rwlock ); map::iterator iter = m_hostSockets.find(id); @@ -361,15 +564,17 @@ CookieRef::HandleHeartbeat( HostID id, int socket ) connection. An attack is the most likely explanation. */ assert( iter->second.m_socket == socket ); - logf( "upping m_lastHeartbeat from %d to %d", + logf( "upping m_lastHeartbeat from %d to %d", iter->second.m_lastHeartbeat, now() ); iter->second.m_lastHeartbeat = now(); -} /* HandleHeartbeat */ +} /* noteHeartbeat */ void -CookieRef::CheckHeartbeats( time_t now, vector* victims ) +CookieRef::checkHeartbeats( const CRefEvent* evt ) { - logf( "CookieRef::CheckHeartbeats" ); + int vcount = 0; + vector* victims = evt->u.htime.victims; + time_t now = evt->u.htime.now; RWWriteLock rwl( &m_sockets_rwlock ); @@ -378,11 +583,17 @@ CookieRef::CheckHeartbeats( time_t now, vector* victims ) time_t last = iter->second.m_lastHeartbeat; if ( (now - last) > HEARTBEAT * 2 ) { victims->push_back( iter->second.m_socket ); + ++vcount; } ++iter; } logf( "CookieRef::CheckHeartbeats done" ); -} /* CheckHeartbeats */ + + /* Post an event */ + CRefEvent newEvt; + newEvt.type = vcount > 0 ? XW_EVENT_HEARTFAILED : XW_EVENT_HEARTOK; + m_eventQueue.push_front( newEvt ); +} /* checkHeartbeats */ void CookieRef::PrintCookieInfo( string& out ) diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 89ba81f39..f562ded5f 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -6,8 +6,10 @@ #include #include #include +#include #include #include "xwrelay_priv.h" +#include "states.h" typedef unsigned short CookieID; @@ -35,7 +37,7 @@ class CookieRef { /* Within this cookie, remember that this hostID and socket go together. If the hostID is HOST_ID_SERVER, it's the server. */ - void Associate( int socket, HostID srcID ); + void Connect( int socket, HostID srcID ); short GetHeartbeat() { return HEARTBEAT; } CookieID GetCookieID() { return m_connectionID; } int SocketForHost( HostID dest ); @@ -43,13 +45,11 @@ class CookieRef { int CountSockets() { return m_hostSockets.size(); } string Name() { return m_name; } - void RecordSent( int nBytes, int socket ) { - /* This really needs a lock.... */ - m_totalSent += nBytes; - } + int NotFullyConnected() { return m_curState != XW_ST_ALLCONNECTED; } void HandleHeartbeat( HostID id, int socket ); void CheckHeartbeats( time_t now, vector* victims ); + void Forward( HostID src, HostID dest, unsigned char* buf, int buflen ); /* for console */ void PrintCookieInfo( string& out ); @@ -63,6 +63,51 @@ class CookieRef { private: CookieRef( string s ); + typedef struct CRefEvent { + XW_RELAY_EVENT type; + union { + struct { + HostID src; + HostID dest; + unsigned char* buf; + int buflen; + } fwd; + struct { + int socket; + HostID srcID; + } con; + struct { + + } recon; + struct { + HostID id; + int socket; + } heart; + struct { + time_t now; + vector* victims; + } htime; + } u; + } CRefEvent; + + void RecordSent( int nBytes, int socket ) { + /* This really needs a lock.... */ + m_totalSent += nBytes; + } + + void pushConnectEvent( int socket, HostID srcID ); + void pushHeartbeatEvent( HostID id, int socket ); + void pushHeartTimerEvent( time_t now, vector* victims ); + void pushForwardEvent( HostID src, HostID dest, unsigned char* buf, + int buflen ); + + void handleEvents(); + + void sendResponse(const CRefEvent* evt); + void forward(const CRefEvent* evt); + void disconnectAll(const CRefEvent* evt); + void noteHeartbeat(const CRefEvent* evt); + void checkHeartbeats(const CRefEvent* evt); map m_hostSockets; pthread_rwlock_t m_sockets_rwlock; @@ -70,8 +115,17 @@ class CookieRef { string m_name; int m_totalSent; + /* Guard the event queue. Only one thread at a time can post to the + queue, but once in a thread can post new events while processing + current ones. */ + pthread_mutex_t m_EventsMutex; + + XW_RELAY_STATE m_curState; + XW_RELAY_STATE m_nextState; + deque m_eventQueue; + static CookieID ms_nextConnectionID; -}; +}; /* CookieRef */ typedef map CookieMap; diff --git a/xwords4/relay/states.cpp b/xwords4/relay/states.cpp new file mode 100644 index 000000000..ae8cbae16 --- /dev/null +++ b/xwords4/relay/states.cpp @@ -0,0 +1,129 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2005 by Eric House (fixin@peak.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include "assert.h" +#include "states.h" +#include "xwrelay_priv.h" + +typedef struct StateTable { + XW_RELAY_STATE stateStart; + XW_RELAY_EVENT stateEvent; + XW_RELAY_ACTION stateAction; + XW_RELAY_STATE stateEnd; /* Do I need this? Or does the code that + performs the action determine the state? */ +} StateTable; + +StateTable g_stateTable[] = { + + /* Initial msg comes in. Managing object created in init state, sends response */ + { XW_ST_INITED, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING }, + + /* Another connect msg comes in */ + { XW_ST_CONNECTING, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING }, + /* Fwd message comes in, tells us we're now fully connected */ + { XW_ST_CONNECTING, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED }, + { XW_ST_CONNECTING, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_CONNECTING }, + /* Timeout before all connected */ + { XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + { XW_ST_CONNECTING, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTING }, + + /* This is the entry we'll use most of the time */ + { XW_ST_ALLCONNECTED, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED }, + /* Heartbeat arrived */ + { XW_ST_ALLCONNECTED, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_ALLCONNECTED }, + + /* Heartbeat timer means check for dead connections. Post event to self if there's a problem. */ + { XW_ST_ALLCONNECTED, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTED }, + + { XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_CONNECTING }, + { XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + { XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_ALLCONNECTED }, + { XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + + /* Reconnect. Just like a connect but cookieID is supplied. Can it + happen in the middle of a game when state is XW_ST_ALLCONNECTED? */ + + + /* Marks end of table */ + { XW_ST_NONE, XW_EVENT_NONE, XW_ACTION_NONE, XW_ST_NONE } +}; + + +int +getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent, + XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState ) +{ + StateTable* stp = g_stateTable; + while ( stp->stateStart != XW_ST_NONE ) { + if ( stp->stateStart == curState && stp->stateEvent == curEvent ) { + *takeAction = stp->stateAction; + *nextState = stp->stateEnd; + return 1; + } + ++stp; + } + + logf( "unable to find transition from %s on event %s", + stateString(curState), eventString(curEvent) ); + + assert(0); + return 0; +} /* getFromTable */ + +#define CASESTR(s) case s: return #s + +char* +stateString( XW_RELAY_STATE state ) +{ + switch( state ) { + CASESTR(XW_ST_NONE); + CASESTR(XW_ST_INITED); + CASESTR(XW_ST_CONNECTING); + CASESTR(XW_ST_ALLCONNECTED); + CASESTR(XW_ST_WAITING_RECON); + CASESTR(XW_ST_SENDING_DISCON); + CASESTR(XW_ST_DISCONNECTED); + CASESTR(XW_ST_HEARTCHECK_CONNECTING); + CASESTR(XW_ST_HEARTCHECK_CONNECTED); + CASESTR(XW_ST_DEAD); + } + assert(0); + return ""; +} + +char* +eventString( XW_RELAY_EVENT evt ) +{ + switch( evt ) { + CASESTR(XW_EVENT_NONE); + CASESTR(XW_EVENT_CONNECTMSG); + CASESTR(XW_EVENT_RECONNECTMSG); + CASESTR(XW_EVENT_FORWARDMSG); + CASESTR(XW_EVENT_HEARTMSG); + CASESTR(XW_EVENT_HEARTTIMER); + CASESTR(XW_EVENT_DISCONTIMER); + CASESTR(XW_EVENT_CONNTIMER); + CASESTR(XW_EVENT_HEARTOK); + CASESTR(XW_EVENT_HEARTFAILED); + } + assert(0); + return ""; +} + +#undef CASESTR diff --git a/xwords4/relay/states.h b/xwords4/relay/states.h new file mode 100644 index 000000000..4e0680f2b --- /dev/null +++ b/xwords4/relay/states.h @@ -0,0 +1,116 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2005 by Eric House (fixin@peak.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _STATES_H_ +#define _STATES_H_ + + +/* states */ +typedef enum { + XW_ST_NONE + + ,XW_ST_INITED /* Relay's running and the object's been + created, but nobody's signed up yet. This + is a very short-lived state since an + incoming connection is why the object was + created. */ + + ,XW_ST_CONNECTING /* At least one device has connected, but no + packets have yet arrived to be + forwarded. */ + + ,XW_ST_ALLCONNECTED /* All devices are connected and ready for the + relay to do its work. This is the state + we're in most of the time. */ + + ,XW_ST_WAITING_RECON /* At least one device has been timed out or + sent a disconnect message. We can't flow + messages in this state, and will be killing + all connections if we don't hear back from + the missing guy soon. */ + + + ,XW_ST_HEARTCHECK_CONNECTING /* In the middle of checking heartbeat situation. */ + ,XW_ST_HEARTCHECK_CONNECTED /* Need two states so we know where to + go back to. */ + + ,XW_ST_SENDING_DISCON /* We're in the process of pulling the plug on + all devices that remain connected. */ + + ,XW_ST_DISCONNECTED /* We're about to kill this object. */ + + ,XW_ST_DEAD /* About to kill the object */ +} XW_RELAY_STATE; + + +/* events */ +typedef enum { + XW_EVENT_NONE + + ,XW_EVENT_CONNECTMSG /* A device is connecting using the cookie for + this object */ + + ,XW_EVENT_RECONNECTMSG /* A device is re-connecting using the + connID for this object */ + + ,XW_EVENT_FORWARDMSG /* A message needs forwarding */ + + ,XW_EVENT_HEARTMSG /* A heartbeat message arrived */ + + ,XW_EVENT_HEARTTIMER /* Time to check for missing heartbeats */ + + ,XW_EVENT_DISCONTIMER /* No reconnect received: time to kill the + remaining connections */ + + ,XW_EVENT_CONNTIMER /* timer for did we get all players hooked + up */ + + ,XW_EVENT_HEARTOK + + ,XW_EVENT_HEARTFAILED +} XW_RELAY_EVENT; + + +/* actions */ +typedef enum { + XW_ACTION_NONE + + ,XW_ACTION_SENDRSP /* Send a connection response */ + + ,XW_ACTION_FWD /* Forward a message */ + + ,XW_ACTION_NOTEHEART /* Record heartbeat received */ + + ,XW_ACTION_CHECKHEART /* Check for heartbeats */ + + ,XW_ACTION_DISCONNECTALL + + ,XW_ACTION_HEARTOK /* allows transition back to stationary state */ + +} XW_RELAY_ACTION; + + +int getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent, + XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState ); + + +char* stateString( XW_RELAY_STATE state ); +char* eventString( XW_RELAY_EVENT evt ); + +#endif diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 620eb67a4..fb617ed92 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -89,14 +89,6 @@ getNetShort( unsigned char** bufpp ) return ntohs( tmp ); } -static void -putNetShort( unsigned char** bufpp, unsigned short s ) -{ - s = htons( s ); - memcpy( *bufpp, &s, sizeof(s) ); - *bufpp += sizeof(s); -} - static void processHeartbeat( unsigned char* buf, int bufLen, int socket ) { @@ -104,7 +96,11 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket ) HostID hostID = getNetShort( &buf ); logf( "processHeartbeat: cookieID 0x%x, hostID 0x%x", cookieID, hostID ); CookieRef* cref = get_cookieRef( cookieID ); - cref->HandleHeartbeat( hostID, socket ); + if ( cref != NULL ) { + cref->HandleHeartbeat( hostID, socket ); + } else { + killSocket( socket, "no cref for socket" ); + } } /* processHeartbeat */ /* A CONNECT message from a device gives us the hostID and socket we'll @@ -138,8 +134,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket ) if ( bufp == end ) { cref = get_make_cookieRef( cookie, connID ); assert( cref != NULL ); - cref->Associate( socket, srcID ); - SocketMgr::Associate( socket, cref ); + cref->Connect( socket, srcID ); } } } @@ -162,45 +157,10 @@ now() return (unsigned long)time(NULL); } -static void -send_with_length( int socket, unsigned char* buf, int bufLen ) -{ - SocketWriteLock slock( socket ); - int ok = 0; - unsigned short len = htons( bufLen ); - ssize_t nSent = send( socket, &len, 2, 0 ); - if ( nSent == 2 ) { - nSent = send( socket, buf, bufLen, 0 ); - if ( nSent == bufLen ) { - logf( "sent %d bytes on socket %d", nSent, socket ); - ok = 1; - } - } - if ( !ok ) { - killSocket( socket, "couldn't send" ); - } -} - -static void -sendConnResp( CookieRef* cref, int socket ) -{ - /* send cmd, heartbeat, connid */ - unsigned char buf[5]; - unsigned char* bufp = buf; - - *bufp++ = XWRELAY_CONNECTRESP; - putNetShort( &bufp, cref->GetHeartbeat() ); - putNetShort( &bufp, cref->GetCookieID() ); - - send_with_length( socket, buf, sizeof(buf) ); - cref->RecordSent( sizeof(buf), socket ); - logf( "sent CONNECTIONRSP" ); -} - /* forward the message. Need only change the command after looking up the * socket and it's ready to go. */ static int -forwardMessage( unsigned char* buf, int bufLen, int srcSocket ) +forwardMessage( unsigned char* buf, int buflen, int srcSocket ) { int success = 0; unsigned char* bufp = buf + 1; /* skip cmd */ @@ -208,25 +168,11 @@ forwardMessage( unsigned char* buf, int bufLen, int srcSocket ) logf( "cookieID = %d", cookieID ); CookieRef* cref = get_cookieRef( cookieID ); if ( cref != NULL ) { - HostID src = getNetShort( &bufp ); - /* we heard from host: good as a heartbeat */ - cref->HandleHeartbeat( src, srcSocket ); - HostID dest = getNetShort( &bufp ); - logf( "forwarding from %x to %x", src, dest ); - int destSocket = cref->SocketForHost( dest ); - logf( "got socket %d for dest %x", destSocket, dest ); - if ( destSocket != -1 ) { - *buf = XWRELAY_MSG_FROMRELAY; - send_with_length( destSocket, buf, bufLen ); - cref->RecordSent( bufLen, destSocket ); - success = 1; - } else if ( dest == HOST_ID_SERVER ) { - logf( "server not connected yet; fail silently" ); - success = 1; - } + cref->Forward( src, dest, buf, buflen ); + success = 1; } return success; } /* forwardMessage */ @@ -241,11 +187,6 @@ processMessage( unsigned char* buf, int bufLen, int socket ) case XWRELAY_CONNECT: logf( "processMessage got XWRELAY_CONNECT" ); cref = processConnect( buf+1, bufLen-1, socket ); - if ( cref != NULL ) { - sendConnResp( cref, socket ); - } else { - killSocket( socket, "no cref found" ); - } break; case XWRELAY_CONNECTRESP: logf( "bad: processMessage got XWRELAY_CONNECTRESP" );