Add limited support for store-and-forward (where limitations include

that all devices must be connected initially and that it's all memory
based so a crash wipes stored messages.)  Accept messages for
forwarding when in the MISSING state, not just the ALLHERE state.
Store messages that can't be sent now, and send any that have
accumulated when a host reconnects.  When a cref loses its last
connection, keep it around unless it has no messages stored (as will
be the case when a game ends.)
This commit is contained in:
ehouse 2009-11-08 21:35:39 +00:00
parent 2d4ae3e44a
commit aae12428da
4 changed files with 108 additions and 26 deletions

View file

@ -93,6 +93,7 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id )
m_locking_thread = 0;
m_starttime = uptime();
m_gameFull = false;
m_nHostMsgs = 0;
RelayConfigs::GetConfigs()->GetValueFor( "HEARTBEAT", &m_heatbeat );
logf( XW_LOGINFO, "initing cref for cookie %s, connName %s",
@ -133,7 +134,15 @@ CookieRef::Clear(void)
m_connName = "";
m_cookieID = 0;
m_eventQueue.clear();
}
if ( 0 < m_nHostMsgs ) {
unsigned int ii;
for ( ii = 0; ii < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]);++ii){
m_hostMsgQueues[ii].clear();
}
/* m_nHostMsgs will get cleared in ReInit */
}
} /* Clear */
bool
CookieRef::Lock( void )
@ -506,10 +515,11 @@ CookieRef::handleEvents()
case XWA_SEND_RERSP:
increasePlayerCounts( &evt );
sendResponse( &evt, takeAction == XWA_SEND_RSP );
sendAnyStored( &evt );
break;
case XWA_FWD:
forward( &evt );
forward_or_store( &evt );
break;
case XWA_TIMERDISCONN:
@ -562,6 +572,14 @@ CookieRef::handleEvents()
sendAllHere();
break;
case XWA_NOTE_EMPTY:
if ( 0 == count_msgs_stored() ) {
CRefEvent evt;
evt.type = XWE_NOMOREMSGS;
m_eventQueue.push_back( evt );
}
break;
case XWA_POSTCLONE:
moveSockets();
break;
@ -582,7 +600,7 @@ CookieRef::handleEvents()
}
} /* handleEvents */
void
bool
CookieRef::send_with_length( int socket, unsigned char* buf, int bufLen,
bool cascade )
{
@ -597,6 +615,7 @@ CookieRef::send_with_length( int socket, unsigned char* buf, int bufLen,
_Remove( socket );
XWThreadPool::GetTPool()->CloseSocket( socket );
}
return !failed;
} /* send_with_length */
static void
@ -607,14 +626,48 @@ putNetShort( unsigned char** bufpp, unsigned short s )
*bufpp += sizeof(s);
}
/* static bool */
/* hostRecComp( const HostRec& aa, const HostRec& bb ) */
/* { */
/* /\* first order, hosts go before guests; second order, by m_nPlayersHere *\/ */
/* /\* if ( aa.m_nPlayersS *\/ */
void
CookieRef::store_message( HostID dest, const unsigned char* buf,
unsigned int len )
{
logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__,
len, dest );
assert( dest > 0 );
--dest; // 1-based
MsgBuffer* entry = new MsgBuffer( buf, buf+len );
assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) );
m_hostMsgQueues[dest].push_back( entry );
++m_nHostMsgs;
}
/* return aa.m_nPlayersH < bb.m_nPlayersH; */
/* } */
void
CookieRef::send_stored_messages( HostID dest, int socket )
{
assert( dest > 0 );
logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest );
--dest; // 0 is invalid value
assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) );
assert( -1 != socket );
MsgBufQueue& mqueue = m_hostMsgQueues[dest];
while ( mqueue.size() > 0 ) {
assert( m_nHostMsgs > 0 );
// send_with_length will call _Remove if it fails to send. So
// need to check on presence of socket each time through! No,
// the break below takes care of that.
MsgBufQueue::iterator iter = mqueue.begin();
logf( XW_LOGVERBOSE0, "%s: sending stored msg (len=%d)",
__func__, (*iter)->size() );
if ( ! send_with_length( socket, &((**iter)[0]), (*iter)->size(), true ) ) {
break;
}
mqueue.erase( iter );
delete *iter;
--m_nHostMsgs;
}
} /* send_stored_messages */
static void
print_sockets( const char* caller, vector<HostRec>& sockets )
@ -901,7 +954,16 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial )
} /* sendResponse */
void
CookieRef::forward( const CRefEvent* evt )
CookieRef::sendAnyStored( const CRefEvent* evt )
{
HostID dest = evt->u.con.srcID;
if ( HOST_ID_NONE != dest ) {
send_stored_messages( dest, evt->u.con.socket );
}
}
void
CookieRef::forward_or_store( const CRefEvent* evt )
{
unsigned char* buf = evt->u.fwd.buf;
int buflen = evt->u.fwd.buflen;
@ -909,20 +971,20 @@ CookieRef::forward( const CRefEvent* evt )
int destSocket = SocketForHost( dest );
if ( destSocket != -1 ) {
/* This is an ugly hack!!!! */
*buf = XWRELAY_MSG_FROMRELAY;
send_with_length( destSocket, buf, buflen, true );
/* This is an ugly hack!!!! */
*buf = XWRELAY_MSG_FROMRELAY;
/* also note that we've heard from src recently */
#ifdef RELAY_HEARTBEAT
HostID src = evt->u.fwd.src;
pushHeartbeatEvent( src, SocketForHost(src) );
#endif
} else {
/* We're not really connected yet! */
if ( (destSocket == -1)
|| !send_with_length( destSocket, buf, buflen, true ) ) {
store_message( dest, buf, buflen );
}
} /* forward */
/* also note that we've heard from src recently */
#ifdef RELAY_HEARTBEAT
HostID src = evt->u.fwd.src;
pushHeartbeatEvent( src, SocketForHost(src) );
#endif
} /* forward_or_store */
void
CookieRef::send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why,

View file

@ -31,6 +31,9 @@
#include "xwrelay.h"
#include "states.h"
typedef vector<unsigned char> MsgBuffer;
typedef deque<MsgBuffer*> MsgBufQueue;
using namespace std;
class CookieMapIterator; /* forward */
@ -157,7 +160,7 @@ class CookieRef {
} u;
} CRefEvent;
void send_with_length( int socket, unsigned char* buf, int bufLen,
bool send_with_length( int socket, unsigned char* buf, int bufLen,
bool cascade );
void send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why,
bool cascade );
@ -185,6 +188,7 @@ class CookieRef {
void handleEvents();
void sendResponse( const CRefEvent* evt, bool initial );
void sendAnyStored( const CRefEvent* evt );
void populate( vector<HostRec> hosts );
void increasePlayerCounts( const CRefEvent* evt );
void reducePlayerCounts( int socket );
@ -192,7 +196,7 @@ class CookieRef {
void setAllConnectedTimer();
void cancelAllConnectedTimer();
void forward( const CRefEvent* evt );
void forward_or_store( const CRefEvent* evt );
void checkFromServer( const CRefEvent* evt );
void notifyOthers( int socket, XWRelayMsg msg, XWREASON why );
@ -216,9 +220,16 @@ class CookieRef {
bool tryMakeGame( vector<HostRec>& remaining );
void insertSorted( HostRec hr );
void store_message( HostID dest, const unsigned char* buf,
unsigned int len );
void send_stored_messages( HostID dest, int socket );
unsigned int count_msgs_stored( void ) { return m_nHostMsgs; }
/* timer callback */
static void s_checkAllConnected( void* closure );
unsigned int m_nHostMsgs;
MsgBufQueue m_hostMsgQueues[4];
vector<HostRec> m_sockets;
bool m_gameFull; /* once we've filled up, no more *new*
connections ever */

View file

@ -78,7 +78,9 @@ StateTable g_stateTable[] = {
/* I'm seeing this but not sure how to handle. Might disconnect be
needed now */
{ XWS_MISSING, XWE_FORWARDMSG, XWA_DISCONNECT, XWS_MISSING },
{ XWS_MISSING, XWE_NOMORESOCKETS, XWA_NOTE_EMPTY, XWS_MISSING },
{ XWS_MISSING, XWE_NOMOREMSGS, XWA_NONE, XWS_DEAD },
{ XWS_ANY, XWE_NOMORESOCKETS, XWA_NONE, XWS_DEAD },
{ XWS_ANY, XWE_SHUTDOWN, XWA_SHUTDOWN, XWS_DEAD },
@ -115,6 +117,7 @@ StateTable g_stateTable[] = {
/* This is our bread-n-butter */
{ XWS_ALLCONND, XWE_FORWARDMSG, XWA_FWD, XWS_ALLCONND },
{ XWS_MISSING, XWE_FORWARDMSG, XWA_FWD, XWS_MISSING },
{ XWS_DEAD, XWE_REMOVESOCKET, XWA_REMOVESOCKET, XWS_DEAD }
@ -198,6 +201,7 @@ eventString( XW_RELAY_EVENT evt )
CASESTR(XWE_ANY);
CASESTR(XWE_REMOVESOCKET);
CASESTR(XWE_NOMORESOCKETS);
CASESTR(XWE_NOMOREMSGS);
CASESTR(XWE_NOTIFYDISCON);
CASESTR(XWE_ALLHERE);
CASESTR(XWE_SOMEMISSING);
@ -230,6 +234,7 @@ actString( XW_RELAY_ACTION act )
CASESTR(XWA_REMOVESOCKET);
CASESTR(XWA_HEARTDISCONN);
CASESTR(XWA_SHUTDOWN);
CASESTR(XWA_NOTE_EMPTY);
CASESTR(XWA_POSTCLONE);
default:
assert(0);

View file

@ -111,6 +111,8 @@ typedef enum {
,XWE_NOMORESOCKETS /* last socket's been removed */
,XWE_NOMOREMSGS /* No messages are stored here for disconnected
hosts */
,XWE_SHUTDOWN /* shutdown this game */
,XWE_ANY /* wildcard; matches all */
@ -138,6 +140,8 @@ typedef enum {
,XWA_NOTEHEART /* Record heartbeat received */
,XWA_NOTE_EMPTY /* No sockets left; check if can delete */
,XWA_TIMERDISCONN /* disconnect all because of a timer */
,XWA_DISCONNECT