diff --git a/xwords4/relay/Makefile b/xwords4/relay/Makefile index 5cdf546c9..44db05f65 100644 --- a/xwords4/relay/Makefile +++ b/xwords4/relay/Makefile @@ -16,7 +16,7 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. CC = g++ -SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp +SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp states.cpp OBJ = $(patsubst %.cpp,%.o,$(SRC)) LDFLAGS += -lpthread -g CPPFLAGS += -g -Wall -DHEARTBEAT=10 diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index cc4375f28..76d1b862b 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -22,11 +22,13 @@ #include #include #include +#include #include "cref.h" #include "xwrelay.h" #include "mlock.h" #include "tpool.h" +#include "states.h" using namespace std; @@ -73,8 +75,7 @@ CookieIdForName( const char* name ) CookieMap::iterator iter = gCookieMap.begin(); while ( iter != gCookieMap.end() ) { ref = iter->second; - if ( ref->Name() == s ) { - ref = iter->second; + if ( ref->Name() == s && ref->NotFullyConnected() ) { return ref->GetCookieID(); } ++iter; @@ -211,7 +212,6 @@ SocketMgr::RemoveSocketRefs( int socket ) { CookieRef* cref = CookieRefForSocket( socket ); if ( cref != NULL ) { - MutexLock ml( &ms_SocketStuffMutex ); SocketMap::iterator iter = ms_SocketStuff.find( socket ); assert( iter != ms_SocketStuff.end() ); @@ -269,10 +269,12 @@ SocketsIterator::Next() *****************************************************************************/ CookieRef::CookieRef(string s) - : m_name(s), - m_totalSent(0) + : m_name(s) + , m_totalSent(0) + , m_curState(XW_ST_INITED) { pthread_rwlock_init( &m_sockets_rwlock, NULL ); + pthread_mutex_init( &m_EventsMutex, NULL ); m_connectionID = ms_nextConnectionID++; /* needs a mutex!!! */ } @@ -299,13 +301,13 @@ CookieRef::~CookieRef() } void -CookieRef::Associate( int socket, HostID srcID ) +CookieRef::Connect( int socket, HostID srcID ) { - assert( srcID != HOST_ID_NONE ); - logf( "remembering pair: hostid=%x, socket=%d", srcID, socket ); - RWWriteLock ml( &m_sockets_rwlock ); - HostRec hr(socket); - m_hostSockets.insert( pair(srcID,hr) ); + SocketMgr::Associate( socket, this ); + + MutexLock ml( &m_EventsMutex ); + pushConnectEvent( socket, srcID ); + handleEvents(); } int @@ -343,6 +345,9 @@ CookieRef::Remove( int socket ) } } + /* Does this belong here or at a higher level? */ + XWThreadPool::GetTPool()->CloseSocket( socket ); + if ( count == 0 ) { ForgetCref( this ); delete this; @@ -352,6 +357,204 @@ CookieRef::Remove( int socket ) void CookieRef::HandleHeartbeat( HostID id, int socket ) { + MutexLock ml( &m_EventsMutex ); + pushHeartbeatEvent( id, socket ); + handleEvents(); +} /* HandleHeartbeat */ + +void +CookieRef::CheckHeartbeats( time_t now, vector* victims ) +{ + logf( "CookieRef::CheckHeartbeats" ); + MutexLock ml( &m_EventsMutex ); + + pushHeartTimerEvent( now, victims ); + handleEvents(); +} /* CheckHeartbeats */ + +void +CookieRef::Forward( HostID src, HostID dest, unsigned char* buf, int buflen ) +{ + MutexLock ml( &m_EventsMutex ); + pushForwardEvent( src, dest, buf, buflen ); + handleEvents(); +} /* Forward */ + +void +CookieRef::pushConnectEvent( int socket, HostID srcID ) +{ + CRefEvent evt; + evt.type = XW_EVENT_CONNECTMSG; + evt.u.con.socket = socket; + evt.u.con.srcID = srcID; + m_eventQueue.push_front( evt ); +} /* pushConnectEvent */ + +void +CookieRef::pushHeartbeatEvent( HostID id, int socket ) +{ + CRefEvent evt; + evt.type = XW_EVENT_HEARTMSG; + evt.u.heart.id = id; + evt.u.heart.socket = socket; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::pushHeartTimerEvent( time_t now, vector* victims ) +{ + CRefEvent evt; + evt.type = XW_EVENT_HEARTTIMER; + evt.u.htime.now = now; + evt.u.htime.victims = victims; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::pushForwardEvent( HostID src, HostID dest, + unsigned char* buf, int buflen ) +{ + CRefEvent evt; + evt.type = XW_EVENT_FORWARDMSG; + evt.u.fwd.src = src; + evt.u.fwd.dest = dest; + evt.u.fwd.buf = buf; + evt.u.fwd.buflen = buflen; + m_eventQueue.push_front( evt ); +} + +void +CookieRef::handleEvents() +{ + XW_RELAY_ACTION takeAction; + + while ( m_eventQueue.size() > 0 ) { + CRefEvent evt = m_eventQueue.front(); + m_eventQueue.pop_front(); + + if ( getFromTable( m_curState, evt.type, &takeAction, &m_nextState ) ) { + + logf( "moving from state %s to state %s for event %s", + stateString(m_curState), stateString(m_nextState), + eventString(evt.type) ); + + switch( takeAction ) { + case XW_ACTION_SENDRSP: + sendResponse( &evt ); + break; + + case XW_ACTION_FWD: + forward( &evt ); + break; + + case XW_ACTION_DISCONNECTALL: + disconnectAll( &evt ); + break; + + case XW_ACTION_NOTEHEART: + noteHeartbeat( &evt ); + break; + + case XW_ACTION_CHECKHEART: + checkHeartbeats( &evt ); + break; + + case XW_ACTION_HEARTOK: + /* nothing to do for this */ + break; + + case XW_ACTION_NONE: + default: + assert(0); + break; + } + + m_curState = m_nextState; + } + } +} /* handleEvents */ + +static void +send_with_length( int socket, unsigned char* buf, int bufLen ) +{ + SocketWriteLock slock( socket ); + int ok = 0; + unsigned short len = htons( bufLen ); + ssize_t nSent = send( socket, &len, 2, 0 ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == bufLen ) { + logf( "sent %d bytes on socket %d", nSent, socket ); + ok = 1; + } + } + if ( !ok ) { + killSocket( socket, "couldn't send" ); + } +} + +static void +putNetShort( unsigned char** bufpp, unsigned short s ) +{ + s = htons( s ); + memcpy( *bufpp, &s, sizeof(s) ); + *bufpp += sizeof(s); +} + +void +CookieRef::sendResponse( const CRefEvent* evt ) +{ + int socket = evt->u.con.socket; + HostID id = evt->u.con.srcID; + + assert( id != HOST_ID_NONE ); + logf( "remembering pair: hostid=%x, socket=%d", id, socket ); + RWWriteLock ml( &m_sockets_rwlock ); + HostRec hr(socket); + m_hostSockets.insert( pair(id,hr) ); + + /* Now send the response */ + unsigned char buf[5]; + unsigned char* bufp = buf; + + *bufp++ = XWRELAY_CONNECTRESP; + putNetShort( &bufp, GetHeartbeat() ); + putNetShort( &bufp, GetCookieID() ); + + send_with_length( socket, buf, sizeof(buf) ); + RecordSent( sizeof(buf), socket ); + logf( "sent CONNECTIONRSP" ); +} /* sendResponse */ + +void +CookieRef::forward( const CRefEvent* evt ) +{ + unsigned char* buf = evt->u.fwd.buf; + int buflen = evt->u.fwd.buflen; + HostID src = evt->u.fwd.src; + HostID dest = evt->u.fwd.dest; + + int destSocket = SocketForHost( dest ); + + /* This is an ugly hack!!!! */ + *buf = XWRELAY_MSG_FROMRELAY; + send_with_length( destSocket, buf, buflen ); + + /* also note that we've heard from src recently */ + pushHeartbeatEvent( src, SocketForHost(src) ); +} /* forward */ + +void +CookieRef::disconnectAll( const CRefEvent* evt ) +{ +} + +void +CookieRef::noteHeartbeat( const CRefEvent* evt ) +{ + int socket = evt->u.heart.socket; + HostID id = evt->u.heart.id; + RWWriteLock rwl( &m_sockets_rwlock ); map::iterator iter = m_hostSockets.find(id); @@ -361,15 +564,17 @@ CookieRef::HandleHeartbeat( HostID id, int socket ) connection. An attack is the most likely explanation. */ assert( iter->second.m_socket == socket ); - logf( "upping m_lastHeartbeat from %d to %d", + logf( "upping m_lastHeartbeat from %d to %d", iter->second.m_lastHeartbeat, now() ); iter->second.m_lastHeartbeat = now(); -} /* HandleHeartbeat */ +} /* noteHeartbeat */ void -CookieRef::CheckHeartbeats( time_t now, vector* victims ) +CookieRef::checkHeartbeats( const CRefEvent* evt ) { - logf( "CookieRef::CheckHeartbeats" ); + int vcount = 0; + vector* victims = evt->u.htime.victims; + time_t now = evt->u.htime.now; RWWriteLock rwl( &m_sockets_rwlock ); @@ -378,11 +583,17 @@ CookieRef::CheckHeartbeats( time_t now, vector* victims ) time_t last = iter->second.m_lastHeartbeat; if ( (now - last) > HEARTBEAT * 2 ) { victims->push_back( iter->second.m_socket ); + ++vcount; } ++iter; } logf( "CookieRef::CheckHeartbeats done" ); -} /* CheckHeartbeats */ + + /* Post an event */ + CRefEvent newEvt; + newEvt.type = vcount > 0 ? XW_EVENT_HEARTFAILED : XW_EVENT_HEARTOK; + m_eventQueue.push_front( newEvt ); +} /* checkHeartbeats */ void CookieRef::PrintCookieInfo( string& out ) diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 89ba81f39..f562ded5f 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -6,8 +6,10 @@ #include #include #include +#include #include #include "xwrelay_priv.h" +#include "states.h" typedef unsigned short CookieID; @@ -35,7 +37,7 @@ class CookieRef { /* Within this cookie, remember that this hostID and socket go together. If the hostID is HOST_ID_SERVER, it's the server. */ - void Associate( int socket, HostID srcID ); + void Connect( int socket, HostID srcID ); short GetHeartbeat() { return HEARTBEAT; } CookieID GetCookieID() { return m_connectionID; } int SocketForHost( HostID dest ); @@ -43,13 +45,11 @@ class CookieRef { int CountSockets() { return m_hostSockets.size(); } string Name() { return m_name; } - void RecordSent( int nBytes, int socket ) { - /* This really needs a lock.... */ - m_totalSent += nBytes; - } + int NotFullyConnected() { return m_curState != XW_ST_ALLCONNECTED; } void HandleHeartbeat( HostID id, int socket ); void CheckHeartbeats( time_t now, vector* victims ); + void Forward( HostID src, HostID dest, unsigned char* buf, int buflen ); /* for console */ void PrintCookieInfo( string& out ); @@ -63,6 +63,51 @@ class CookieRef { private: CookieRef( string s ); + typedef struct CRefEvent { + XW_RELAY_EVENT type; + union { + struct { + HostID src; + HostID dest; + unsigned char* buf; + int buflen; + } fwd; + struct { + int socket; + HostID srcID; + } con; + struct { + + } recon; + struct { + HostID id; + int socket; + } heart; + struct { + time_t now; + vector* victims; + } htime; + } u; + } CRefEvent; + + void RecordSent( int nBytes, int socket ) { + /* This really needs a lock.... */ + m_totalSent += nBytes; + } + + void pushConnectEvent( int socket, HostID srcID ); + void pushHeartbeatEvent( HostID id, int socket ); + void pushHeartTimerEvent( time_t now, vector* victims ); + void pushForwardEvent( HostID src, HostID dest, unsigned char* buf, + int buflen ); + + void handleEvents(); + + void sendResponse(const CRefEvent* evt); + void forward(const CRefEvent* evt); + void disconnectAll(const CRefEvent* evt); + void noteHeartbeat(const CRefEvent* evt); + void checkHeartbeats(const CRefEvent* evt); map m_hostSockets; pthread_rwlock_t m_sockets_rwlock; @@ -70,8 +115,17 @@ class CookieRef { string m_name; int m_totalSent; + /* Guard the event queue. Only one thread at a time can post to the + queue, but once in a thread can post new events while processing + current ones. */ + pthread_mutex_t m_EventsMutex; + + XW_RELAY_STATE m_curState; + XW_RELAY_STATE m_nextState; + deque m_eventQueue; + static CookieID ms_nextConnectionID; -}; +}; /* CookieRef */ typedef map CookieMap; diff --git a/xwords4/relay/states.cpp b/xwords4/relay/states.cpp new file mode 100644 index 000000000..ae8cbae16 --- /dev/null +++ b/xwords4/relay/states.cpp @@ -0,0 +1,129 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2005 by Eric House (fixin@peak.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include "assert.h" +#include "states.h" +#include "xwrelay_priv.h" + +typedef struct StateTable { + XW_RELAY_STATE stateStart; + XW_RELAY_EVENT stateEvent; + XW_RELAY_ACTION stateAction; + XW_RELAY_STATE stateEnd; /* Do I need this? Or does the code that + performs the action determine the state? */ +} StateTable; + +StateTable g_stateTable[] = { + + /* Initial msg comes in. Managing object created in init state, sends response */ + { XW_ST_INITED, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING }, + + /* Another connect msg comes in */ + { XW_ST_CONNECTING, XW_EVENT_CONNECTMSG, XW_ACTION_SENDRSP, XW_ST_CONNECTING }, + /* Fwd message comes in, tells us we're now fully connected */ + { XW_ST_CONNECTING, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED }, + { XW_ST_CONNECTING, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_CONNECTING }, + /* Timeout before all connected */ + { XW_ST_CONNECTING, XW_EVENT_CONNTIMER, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + { XW_ST_CONNECTING, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTING }, + + /* This is the entry we'll use most of the time */ + { XW_ST_ALLCONNECTED, XW_EVENT_FORWARDMSG, XW_ACTION_FWD, XW_ST_ALLCONNECTED }, + /* Heartbeat arrived */ + { XW_ST_ALLCONNECTED, XW_EVENT_HEARTMSG, XW_ACTION_NOTEHEART, XW_ST_ALLCONNECTED }, + + /* Heartbeat timer means check for dead connections. Post event to self if there's a problem. */ + { XW_ST_ALLCONNECTED, XW_EVENT_HEARTTIMER, XW_ACTION_CHECKHEART, XW_ST_HEARTCHECK_CONNECTED }, + + { XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_CONNECTING }, + { XW_ST_HEARTCHECK_CONNECTING, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + { XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTOK, XW_ACTION_HEARTOK, XW_ST_ALLCONNECTED }, + { XW_ST_HEARTCHECK_CONNECTED, XW_EVENT_HEARTFAILED, XW_ACTION_DISCONNECTALL, XW_ST_DEAD }, + + /* Reconnect. Just like a connect but cookieID is supplied. Can it + happen in the middle of a game when state is XW_ST_ALLCONNECTED? */ + + + /* Marks end of table */ + { XW_ST_NONE, XW_EVENT_NONE, XW_ACTION_NONE, XW_ST_NONE } +}; + + +int +getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent, + XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState ) +{ + StateTable* stp = g_stateTable; + while ( stp->stateStart != XW_ST_NONE ) { + if ( stp->stateStart == curState && stp->stateEvent == curEvent ) { + *takeAction = stp->stateAction; + *nextState = stp->stateEnd; + return 1; + } + ++stp; + } + + logf( "unable to find transition from %s on event %s", + stateString(curState), eventString(curEvent) ); + + assert(0); + return 0; +} /* getFromTable */ + +#define CASESTR(s) case s: return #s + +char* +stateString( XW_RELAY_STATE state ) +{ + switch( state ) { + CASESTR(XW_ST_NONE); + CASESTR(XW_ST_INITED); + CASESTR(XW_ST_CONNECTING); + CASESTR(XW_ST_ALLCONNECTED); + CASESTR(XW_ST_WAITING_RECON); + CASESTR(XW_ST_SENDING_DISCON); + CASESTR(XW_ST_DISCONNECTED); + CASESTR(XW_ST_HEARTCHECK_CONNECTING); + CASESTR(XW_ST_HEARTCHECK_CONNECTED); + CASESTR(XW_ST_DEAD); + } + assert(0); + return ""; +} + +char* +eventString( XW_RELAY_EVENT evt ) +{ + switch( evt ) { + CASESTR(XW_EVENT_NONE); + CASESTR(XW_EVENT_CONNECTMSG); + CASESTR(XW_EVENT_RECONNECTMSG); + CASESTR(XW_EVENT_FORWARDMSG); + CASESTR(XW_EVENT_HEARTMSG); + CASESTR(XW_EVENT_HEARTTIMER); + CASESTR(XW_EVENT_DISCONTIMER); + CASESTR(XW_EVENT_CONNTIMER); + CASESTR(XW_EVENT_HEARTOK); + CASESTR(XW_EVENT_HEARTFAILED); + } + assert(0); + return ""; +} + +#undef CASESTR diff --git a/xwords4/relay/states.h b/xwords4/relay/states.h new file mode 100644 index 000000000..4e0680f2b --- /dev/null +++ b/xwords4/relay/states.h @@ -0,0 +1,116 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2005 by Eric House (fixin@peak.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _STATES_H_ +#define _STATES_H_ + + +/* states */ +typedef enum { + XW_ST_NONE + + ,XW_ST_INITED /* Relay's running and the object's been + created, but nobody's signed up yet. This + is a very short-lived state since an + incoming connection is why the object was + created. */ + + ,XW_ST_CONNECTING /* At least one device has connected, but no + packets have yet arrived to be + forwarded. */ + + ,XW_ST_ALLCONNECTED /* All devices are connected and ready for the + relay to do its work. This is the state + we're in most of the time. */ + + ,XW_ST_WAITING_RECON /* At least one device has been timed out or + sent a disconnect message. We can't flow + messages in this state, and will be killing + all connections if we don't hear back from + the missing guy soon. */ + + + ,XW_ST_HEARTCHECK_CONNECTING /* In the middle of checking heartbeat situation. */ + ,XW_ST_HEARTCHECK_CONNECTED /* Need two states so we know where to + go back to. */ + + ,XW_ST_SENDING_DISCON /* We're in the process of pulling the plug on + all devices that remain connected. */ + + ,XW_ST_DISCONNECTED /* We're about to kill this object. */ + + ,XW_ST_DEAD /* About to kill the object */ +} XW_RELAY_STATE; + + +/* events */ +typedef enum { + XW_EVENT_NONE + + ,XW_EVENT_CONNECTMSG /* A device is connecting using the cookie for + this object */ + + ,XW_EVENT_RECONNECTMSG /* A device is re-connecting using the + connID for this object */ + + ,XW_EVENT_FORWARDMSG /* A message needs forwarding */ + + ,XW_EVENT_HEARTMSG /* A heartbeat message arrived */ + + ,XW_EVENT_HEARTTIMER /* Time to check for missing heartbeats */ + + ,XW_EVENT_DISCONTIMER /* No reconnect received: time to kill the + remaining connections */ + + ,XW_EVENT_CONNTIMER /* timer for did we get all players hooked + up */ + + ,XW_EVENT_HEARTOK + + ,XW_EVENT_HEARTFAILED +} XW_RELAY_EVENT; + + +/* actions */ +typedef enum { + XW_ACTION_NONE + + ,XW_ACTION_SENDRSP /* Send a connection response */ + + ,XW_ACTION_FWD /* Forward a message */ + + ,XW_ACTION_NOTEHEART /* Record heartbeat received */ + + ,XW_ACTION_CHECKHEART /* Check for heartbeats */ + + ,XW_ACTION_DISCONNECTALL + + ,XW_ACTION_HEARTOK /* allows transition back to stationary state */ + +} XW_RELAY_ACTION; + + +int getFromTable( XW_RELAY_STATE curState, XW_RELAY_EVENT curEvent, + XW_RELAY_ACTION* takeAction, XW_RELAY_STATE* nextState ); + + +char* stateString( XW_RELAY_STATE state ); +char* eventString( XW_RELAY_EVENT evt ); + +#endif diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 620eb67a4..fb617ed92 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -89,14 +89,6 @@ getNetShort( unsigned char** bufpp ) return ntohs( tmp ); } -static void -putNetShort( unsigned char** bufpp, unsigned short s ) -{ - s = htons( s ); - memcpy( *bufpp, &s, sizeof(s) ); - *bufpp += sizeof(s); -} - static void processHeartbeat( unsigned char* buf, int bufLen, int socket ) { @@ -104,7 +96,11 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket ) HostID hostID = getNetShort( &buf ); logf( "processHeartbeat: cookieID 0x%x, hostID 0x%x", cookieID, hostID ); CookieRef* cref = get_cookieRef( cookieID ); - cref->HandleHeartbeat( hostID, socket ); + if ( cref != NULL ) { + cref->HandleHeartbeat( hostID, socket ); + } else { + killSocket( socket, "no cref for socket" ); + } } /* processHeartbeat */ /* A CONNECT message from a device gives us the hostID and socket we'll @@ -138,8 +134,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket ) if ( bufp == end ) { cref = get_make_cookieRef( cookie, connID ); assert( cref != NULL ); - cref->Associate( socket, srcID ); - SocketMgr::Associate( socket, cref ); + cref->Connect( socket, srcID ); } } } @@ -162,45 +157,10 @@ now() return (unsigned long)time(NULL); } -static void -send_with_length( int socket, unsigned char* buf, int bufLen ) -{ - SocketWriteLock slock( socket ); - int ok = 0; - unsigned short len = htons( bufLen ); - ssize_t nSent = send( socket, &len, 2, 0 ); - if ( nSent == 2 ) { - nSent = send( socket, buf, bufLen, 0 ); - if ( nSent == bufLen ) { - logf( "sent %d bytes on socket %d", nSent, socket ); - ok = 1; - } - } - if ( !ok ) { - killSocket( socket, "couldn't send" ); - } -} - -static void -sendConnResp( CookieRef* cref, int socket ) -{ - /* send cmd, heartbeat, connid */ - unsigned char buf[5]; - unsigned char* bufp = buf; - - *bufp++ = XWRELAY_CONNECTRESP; - putNetShort( &bufp, cref->GetHeartbeat() ); - putNetShort( &bufp, cref->GetCookieID() ); - - send_with_length( socket, buf, sizeof(buf) ); - cref->RecordSent( sizeof(buf), socket ); - logf( "sent CONNECTIONRSP" ); -} - /* forward the message. Need only change the command after looking up the * socket and it's ready to go. */ static int -forwardMessage( unsigned char* buf, int bufLen, int srcSocket ) +forwardMessage( unsigned char* buf, int buflen, int srcSocket ) { int success = 0; unsigned char* bufp = buf + 1; /* skip cmd */ @@ -208,25 +168,11 @@ forwardMessage( unsigned char* buf, int bufLen, int srcSocket ) logf( "cookieID = %d", cookieID ); CookieRef* cref = get_cookieRef( cookieID ); if ( cref != NULL ) { - HostID src = getNetShort( &bufp ); - /* we heard from host: good as a heartbeat */ - cref->HandleHeartbeat( src, srcSocket ); - HostID dest = getNetShort( &bufp ); - logf( "forwarding from %x to %x", src, dest ); - int destSocket = cref->SocketForHost( dest ); - logf( "got socket %d for dest %x", destSocket, dest ); - if ( destSocket != -1 ) { - *buf = XWRELAY_MSG_FROMRELAY; - send_with_length( destSocket, buf, bufLen ); - cref->RecordSent( bufLen, destSocket ); - success = 1; - } else if ( dest == HOST_ID_SERVER ) { - logf( "server not connected yet; fail silently" ); - success = 1; - } + cref->Forward( src, dest, buf, buflen ); + success = 1; } return success; } /* forwardMessage */ @@ -241,11 +187,6 @@ processMessage( unsigned char* buf, int bufLen, int socket ) case XWRELAY_CONNECT: logf( "processMessage got XWRELAY_CONNECT" ); cref = processConnect( buf+1, bufLen-1, socket ); - if ( cref != NULL ) { - sendConnResp( cref, socket ); - } else { - killSocket( socket, "no cref found" ); - } break; case XWRELAY_CONNECTRESP: logf( "bad: processMessage got XWRELAY_CONNECTRESP" );