diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 432279cbf..c5d5e9e4a 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -93,6 +93,7 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id ) m_locking_thread = 0; m_starttime = uptime(); m_gameFull = false; + m_nHostMsgs = 0; RelayConfigs::GetConfigs()->GetValueFor( "HEARTBEAT", &m_heatbeat ); logf( XW_LOGINFO, "initing cref for cookie %s, connName %s", @@ -133,7 +134,15 @@ CookieRef::Clear(void) m_connName = ""; m_cookieID = 0; m_eventQueue.clear(); -} + + if ( 0 < m_nHostMsgs ) { + unsigned int ii; + for ( ii = 0; ii < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]);++ii){ + m_hostMsgQueues[ii].clear(); + } + /* m_nHostMsgs will get cleared in ReInit */ + } +} /* Clear */ bool CookieRef::Lock( void ) @@ -506,10 +515,11 @@ CookieRef::handleEvents() case XWA_SEND_RERSP: increasePlayerCounts( &evt ); sendResponse( &evt, takeAction == XWA_SEND_RSP ); + sendAnyStored( &evt ); break; case XWA_FWD: - forward( &evt ); + forward_or_store( &evt ); break; case XWA_TIMERDISCONN: @@ -562,6 +572,14 @@ CookieRef::handleEvents() sendAllHere(); break; + case XWA_NOTE_EMPTY: + if ( 0 == count_msgs_stored() ) { + CRefEvent evt; + evt.type = XWE_NOMOREMSGS; + m_eventQueue.push_back( evt ); + } + break; + case XWA_POSTCLONE: moveSockets(); break; @@ -582,7 +600,7 @@ CookieRef::handleEvents() } } /* handleEvents */ -void +bool CookieRef::send_with_length( int socket, unsigned char* buf, int bufLen, bool cascade ) { @@ -597,6 +615,7 @@ CookieRef::send_with_length( int socket, unsigned char* buf, int bufLen, _Remove( socket ); XWThreadPool::GetTPool()->CloseSocket( socket ); } + return !failed; } /* send_with_length */ static void @@ -607,14 +626,48 @@ putNetShort( unsigned char** bufpp, unsigned short s ) *bufpp += sizeof(s); } -/* static bool */ -/* hostRecComp( const HostRec& aa, const HostRec& bb ) */ -/* { */ -/* /\* first order, hosts go before guests; second order, by m_nPlayersHere *\/ */ -/* /\* if ( aa.m_nPlayersS *\/ */ +void +CookieRef::store_message( HostID dest, const unsigned char* buf, + unsigned int len ) +{ + logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__, + len, dest ); + assert( dest > 0 ); + --dest; // 1-based + MsgBuffer* entry = new MsgBuffer( buf, buf+len ); + assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) ); + m_hostMsgQueues[dest].push_back( entry ); + ++m_nHostMsgs; +} -/* return aa.m_nPlayersH < bb.m_nPlayersH; */ -/* } */ +void +CookieRef::send_stored_messages( HostID dest, int socket ) +{ + assert( dest > 0 ); + logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest ); + --dest; // 0 is invalid value + assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) ); + assert( -1 != socket ); + + MsgBufQueue& mqueue = m_hostMsgQueues[dest]; + while ( mqueue.size() > 0 ) { + assert( m_nHostMsgs > 0 ); + // send_with_length will call _Remove if it fails to send. So + // need to check on presence of socket each time through! No, + // the break below takes care of that. + + MsgBufQueue::iterator iter = mqueue.begin(); + + logf( XW_LOGVERBOSE0, "%s: sending stored msg (len=%d)", + __func__, (*iter)->size() ); + if ( ! send_with_length( socket, &((**iter)[0]), (*iter)->size(), true ) ) { + break; + } + mqueue.erase( iter ); + delete *iter; + --m_nHostMsgs; + } +} /* send_stored_messages */ static void print_sockets( const char* caller, vector& sockets ) @@ -901,7 +954,16 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial ) } /* sendResponse */ void -CookieRef::forward( const CRefEvent* evt ) +CookieRef::sendAnyStored( const CRefEvent* evt ) +{ + HostID dest = evt->u.con.srcID; + if ( HOST_ID_NONE != dest ) { + send_stored_messages( dest, evt->u.con.socket ); + } +} + +void +CookieRef::forward_or_store( const CRefEvent* evt ) { unsigned char* buf = evt->u.fwd.buf; int buflen = evt->u.fwd.buflen; @@ -909,20 +971,20 @@ CookieRef::forward( const CRefEvent* evt ) int destSocket = SocketForHost( dest ); - if ( destSocket != -1 ) { - /* This is an ugly hack!!!! */ - *buf = XWRELAY_MSG_FROMRELAY; - send_with_length( destSocket, buf, buflen, true ); + /* This is an ugly hack!!!! */ + *buf = XWRELAY_MSG_FROMRELAY; - /* also note that we've heard from src recently */ -#ifdef RELAY_HEARTBEAT - HostID src = evt->u.fwd.src; - pushHeartbeatEvent( src, SocketForHost(src) ); -#endif - } else { - /* We're not really connected yet! */ + if ( (destSocket == -1) + || !send_with_length( destSocket, buf, buflen, true ) ) { + store_message( dest, buf, buflen ); } -} /* forward */ + + /* also note that we've heard from src recently */ +#ifdef RELAY_HEARTBEAT + HostID src = evt->u.fwd.src; + pushHeartbeatEvent( src, SocketForHost(src) ); +#endif +} /* forward_or_store */ void CookieRef::send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why, diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index c1b98f724..70c52dba6 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -31,6 +31,9 @@ #include "xwrelay.h" #include "states.h" +typedef vector MsgBuffer; +typedef deque MsgBufQueue; + using namespace std; class CookieMapIterator; /* forward */ @@ -157,7 +160,7 @@ class CookieRef { } u; } CRefEvent; - void send_with_length( int socket, unsigned char* buf, int bufLen, + bool send_with_length( int socket, unsigned char* buf, int bufLen, bool cascade ); void send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why, bool cascade ); @@ -185,6 +188,7 @@ class CookieRef { void handleEvents(); void sendResponse( const CRefEvent* evt, bool initial ); + void sendAnyStored( const CRefEvent* evt ); void populate( vector hosts ); void increasePlayerCounts( const CRefEvent* evt ); void reducePlayerCounts( int socket ); @@ -192,7 +196,7 @@ class CookieRef { void setAllConnectedTimer(); void cancelAllConnectedTimer(); - void forward( const CRefEvent* evt ); + void forward_or_store( const CRefEvent* evt ); void checkFromServer( const CRefEvent* evt ); void notifyOthers( int socket, XWRelayMsg msg, XWREASON why ); @@ -216,9 +220,16 @@ class CookieRef { bool tryMakeGame( vector& remaining ); void insertSorted( HostRec hr ); + void store_message( HostID dest, const unsigned char* buf, + unsigned int len ); + void send_stored_messages( HostID dest, int socket ); + unsigned int count_msgs_stored( void ) { return m_nHostMsgs; } + /* timer callback */ static void s_checkAllConnected( void* closure ); + unsigned int m_nHostMsgs; + MsgBufQueue m_hostMsgQueues[4]; vector m_sockets; bool m_gameFull; /* once we've filled up, no more *new* connections ever */ diff --git a/xwords4/relay/states.cpp b/xwords4/relay/states.cpp index f9bd44b03..4c3fa8919 100644 --- a/xwords4/relay/states.cpp +++ b/xwords4/relay/states.cpp @@ -78,7 +78,9 @@ StateTable g_stateTable[] = { /* I'm seeing this but not sure how to handle. Might disconnect be needed now */ -{ XWS_MISSING, XWE_FORWARDMSG, XWA_DISCONNECT, XWS_MISSING }, + +{ XWS_MISSING, XWE_NOMORESOCKETS, XWA_NOTE_EMPTY, XWS_MISSING }, +{ XWS_MISSING, XWE_NOMOREMSGS, XWA_NONE, XWS_DEAD }, { XWS_ANY, XWE_NOMORESOCKETS, XWA_NONE, XWS_DEAD }, { XWS_ANY, XWE_SHUTDOWN, XWA_SHUTDOWN, XWS_DEAD }, @@ -115,6 +117,7 @@ StateTable g_stateTable[] = { /* This is our bread-n-butter */ { XWS_ALLCONND, XWE_FORWARDMSG, XWA_FWD, XWS_ALLCONND }, +{ XWS_MISSING, XWE_FORWARDMSG, XWA_FWD, XWS_MISSING }, { XWS_DEAD, XWE_REMOVESOCKET, XWA_REMOVESOCKET, XWS_DEAD } @@ -198,6 +201,7 @@ eventString( XW_RELAY_EVENT evt ) CASESTR(XWE_ANY); CASESTR(XWE_REMOVESOCKET); CASESTR(XWE_NOMORESOCKETS); + CASESTR(XWE_NOMOREMSGS); CASESTR(XWE_NOTIFYDISCON); CASESTR(XWE_ALLHERE); CASESTR(XWE_SOMEMISSING); @@ -230,6 +234,7 @@ actString( XW_RELAY_ACTION act ) CASESTR(XWA_REMOVESOCKET); CASESTR(XWA_HEARTDISCONN); CASESTR(XWA_SHUTDOWN); + CASESTR(XWA_NOTE_EMPTY); CASESTR(XWA_POSTCLONE); default: assert(0); diff --git a/xwords4/relay/states.h b/xwords4/relay/states.h index 52c6f9c9a..eade98ffc 100644 --- a/xwords4/relay/states.h +++ b/xwords4/relay/states.h @@ -111,6 +111,8 @@ typedef enum { ,XWE_NOMORESOCKETS /* last socket's been removed */ + ,XWE_NOMOREMSGS /* No messages are stored here for disconnected + hosts */ ,XWE_SHUTDOWN /* shutdown this game */ ,XWE_ANY /* wildcard; matches all */ @@ -138,6 +140,8 @@ typedef enum { ,XWA_NOTEHEART /* Record heartbeat received */ + ,XWA_NOTE_EMPTY /* No sockets left; check if can delete */ + ,XWA_TIMERDISCONN /* disconnect all because of a timer */ ,XWA_DISCONNECT