Fix problems reconnecting hosts where some but not all received the

ALLHERE message and connName: change relay protocol so cookie is
included in RECONNECT message, and hostIDs are not assigned until
ALLHERE, and change host-to-game matching to use connName first but
fall back to cookie.  This fixes nearly all cases failing to reconnect
after relay goes down.
This commit is contained in:
ehouse 2009-08-21 12:00:09 +00:00
parent f5f5df990e
commit dbf9daf71b
8 changed files with 313 additions and 208 deletions

View file

@ -87,7 +87,8 @@ typedef enum {
struct CommsCtxt {
XW_UtilCtxt* util;
XP_U32 connID; /* 0 means ignore; otherwise must match */
XP_U32 connID; /* set from gameID: 0 means ignore; otherwise
must match. Set by server. */
XP_PlayerAddr nextChannelNo;
AddressRecord* recs; /* return addresses */
@ -136,6 +137,7 @@ struct CommsCtxt {
XP_U16 nPlayersHere;
XP_U16 nPlayersTotal;
XP_Bool connecting;
XP_Bool connNameValid; /* this can probably go. PENDING */
} r;
XP_Bool isServer;
@ -195,16 +197,47 @@ static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, BTIPMsgType typ,
* implementation
****************************************************************************/
#ifdef XWFEATURE_RELAY
#ifdef DEBUG
static const char*
CommsRelayState2Str( CommsRelayState state )
{
#define CASE_STR(s) case s: return #s
switch( state ) {
CASE_STR(COMMS_RELAYSTATE_UNCONNECTED);
CASE_STR(COMMS_RELAYSTATE_CONNECT_PENDING);
CASE_STR(COMMS_RELAYSTATE_CONNECTED);
CASE_STR(COMMS_RELAYSTATE_ALLCONNECTED);
default:
assert(0);
}
#undef CASE_STR
return NULL;
}
#endif
static void
set_relay_state( CommsCtxt* comms, CommsRelayState state )
{
if ( comms->r.relayState != state ) {
XP_LOGF( "%s: %s => %s", __func__,
CommsRelayState2Str(comms->r.relayState),
CommsRelayState2Str(state) );
comms->r.relayState = state;
}
}
static void
init_relay( CommsCtxt* comms, XP_U16 nPlayersHere, XP_U16 nPlayersTotal )
{
comms->r.myHostID = comms->isServer? HOST_ID_SERVER: HOST_ID_NONE;
XP_LOGF( "%s: set myHostID to %d", __func__, comms->r.myHostID );
comms->r.relayState = COMMS_RELAYSTATE_UNCONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_UNCONNECTED );
comms->r.nPlayersHere = nPlayersHere;
comms->r.nPlayersTotal = nPlayersTotal;
comms->r.cookieID = COOKIE_ID_NONE;
comms->r.connName[0] = '\0';
comms->r.connNameValid = XP_FALSE;
}
#endif
@ -350,7 +383,7 @@ comms_setConnID( CommsCtxt* comms, XP_U32 connID )
{
XP_ASSERT( CONN_ID_NONE != connID );
comms->connID = connID;
XP_STATUSF( "%s: set connID to %lx", __func__, connID );
XP_STATUSF( "%s: set connID (gameID) to %lx", __func__, connID );
} /* comms_setConnID */
static void
@ -437,8 +470,11 @@ comms_makeFromStream( MPFORMAL XWStreamCtxt* stream, XW_UtilCtxt* util,
comms->nextChannelNo = stream_getU16( stream );
if ( addr.conType == COMMS_CONN_RELAY ) {
comms->r.myHostID = stream_getU8( stream );
stringFromStreamHere( stream, comms->r.connName,
sizeof(comms->r.connName) );
comms->r.connNameValid = stream_getU8( stream );
if ( comms->r.connNameValid ) {
stringFromStreamHere( stream, comms->r.connName,
sizeof(comms->r.connName) );
}
}
#ifdef DEBUG
@ -528,7 +564,7 @@ sendConnect( CommsCtxt* comms, XP_Bool breakExisting )
case COMMS_CONN_RELAY:
if ( breakExisting
|| COMMS_RELAYSTATE_UNCONNECTED == comms->r.relayState ) {
comms->r.relayState = COMMS_RELAYSTATE_UNCONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_UNCONNECTED );
relayConnect( comms );
}
break;
@ -608,7 +644,10 @@ comms_writeToStream( const CommsCtxt* comms, XWStreamCtxt* stream )
stream_putU16( stream, comms->nextChannelNo );
if ( comms->addr.conType == COMMS_CONN_RELAY ) {
stream_putU8( stream, comms->r.myHostID );
stringToStream( stream, comms->r.connName );
stream_putU8( stream, comms->r.connNameValid );
if ( comms->r.connNameValid ) {
stringToStream( stream, comms->r.connName );
}
}
#ifdef DEBUG
@ -1043,17 +1082,20 @@ relayPreProcess( CommsCtxt* comms, XWStreamCtxt* stream, XWHostID* senderID )
case XWRELAY_CONNECT_RESP:
case XWRELAY_RECONNECT_RESP:
comms->r.relayState = COMMS_RELAYSTATE_CONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_CONNECTED );
comms->r.heartbeat = stream_getU16( stream );
comms->r.cookieID = stream_getU16( stream );
comms->r.myHostID = (XWHostID)stream_getU8( stream );
XP_LOGF( "set cookieID = %d; set hostid: %x",
comms->r.cookieID, comms->r.myHostID );
XP_LOGF( "set cookieID = %d", comms->r.cookieID );
setHeartbeatTimer( comms );
break;
case XWRELAY_ALLHERE:
comms->r.relayState = COMMS_RELAYSTATE_ALLCONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_ALLCONNECTED );
srcID = (XWHostID)stream_getU8( stream );
XP_ASSERT( comms->r.myHostID == HOST_ID_NONE
|| comms->r.myHostID == srcID );
comms->r.myHostID = srcID;
XP_LOGF( "set hostid: %x", comms->r.myHostID );
hasName = stream_getU8( stream );
if ( hasName ) {
stringFromStreamHere( stream, comms->r.connName,
@ -1081,6 +1123,10 @@ relayPreProcess( CommsCtxt* comms, XWStreamCtxt* stream, XWHostID* senderID )
XP_LOGF( "%s: rejecting data message", __func__ );
} else {
*senderID = srcID;
if ( !comms->r.connNameValid ) {
XP_LOGF( "%s: setting connNameValid", __func__ );
comms->r.connNameValid = XP_TRUE;
}
}
break;
@ -1096,7 +1142,7 @@ relayPreProcess( CommsCtxt* comms, XWStreamCtxt* stream, XWHostID* senderID )
case XWRELAY_CONNECTDENIED: /* Close socket for this? */
relayErr = stream_getU8( stream );
util_userError( comms->util, ERR_RELAY_BASE + relayErr );
comms->r.relayState = COMMS_RELAYSTATE_UNCONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_UNCONNECTED );
/* fallthru */
default:
XP_LOGF( "%s: dropping relay msg with cmd %d", __func__, (XP_U16)cmd );
@ -1202,7 +1248,7 @@ getRecordFor( CommsCtxt* comms, const CommsAddrRec* addr,
break;
case COMMS_CONN_IR: /* no way to test */
break;
case COMMS_CONN_SMS: /* no way to test */
case COMMS_CONN_SMS:
if ( ( 0 == XP_MEMCMP( &addr->u.sms.phone, &rec->addr.u.sms.phone,
sizeof(addr->u.sms.phone) ) )
&& addr->u.sms.port == rec->addr.u.sms.port ) {
@ -1326,7 +1372,7 @@ validateChannelMessage( CommsCtxt* comms, const CommsAddrRec* addr,
rec = NULL;
}
} else {
XP_LOGF( "%s: no rec for addr", __func__ );
XP_LOGF( "%s: no rec for channelNo %d", __func__, channelNo );
}
LOG_RETURNF( XP_P, rec );
@ -1337,7 +1383,7 @@ XP_Bool
comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
const CommsAddrRec* retAddr )
{
XP_Bool validMessage = XP_FALSE;
XP_Bool messageValid = XP_FALSE;
XWHostID senderID = 0; /* unset; default for non-relay cases */
XP_Bool usingRelay = XP_FALSE;
AddressRecord* rec = NULL;
@ -1357,15 +1403,15 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
XP_U16 payloadSize;
connID = stream_getU32( stream );
XP_STATUSF( "%s: read connID of %lx", __func__, connID );
XP_STATUSF( "%s: read connID (gameID) of %lx", __func__, connID );
channelNo = stream_getU16( stream );
XP_STATUSF( "read channelNo %d", channelNo );
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
XP_DEBUGF( "rcd: msgID=" XP_LD ",lastMsgRcd=" XP_LD " on chnl %d",
msgID, lastMsgRcd, channelNo );
XP_DEBUGF( "rcd on channelNo %d: msgID=%ld,lastMsgRcd=%ld ",
channelNo, msgID, lastMsgRcd );
payloadSize = stream_getSize( stream ) > 0; /* anything left? */
if ( connID == CONN_ID_NONE ) {
/* special case: initial message from client */
rec = validateInitialMessage( comms, payloadSize > 0, retAddr,
@ -1375,13 +1421,13 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
lastMsgRcd );
}
validMessage = NULL != rec;
if ( validMessage ) {
messageValid = NULL != rec;
if ( messageValid ) {
rec->lastMsgRcd = msgID;
XP_LOGF( "%s: set channel %d's lastMsgRcd to " XP_LD,
__func__, channelNo, msgID );
stream_setAddress( stream, channelNo );
validMessage = payloadSize > 0;
messageValid = payloadSize > 0;
}
} else {
XP_LOGF( "%s: message too small", __func__ );
@ -1391,8 +1437,8 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
/* Call after we've had a chance to create rec for addr */
noteHBReceived( comms/* , addr */ );
LOG_RETURNF( "%d", (XP_U16)validMessage );
return validMessage;
LOG_RETURNF( "%d", (XP_U16)messageValid );
return messageValid;
} /* comms_checkIncomingStream */
#ifdef COMMS_HEARTBEAT
@ -1680,17 +1726,18 @@ send_via_relay( CommsCtxt* comms, XWRELAY_Cmd cmd, XWHostID destID,
stream_putU8( tmpStream, comms->r.nPlayersHere );
stream_putU8( tmpStream, comms->r.nPlayersTotal );
comms->r.relayState = COMMS_RELAYSTATE_CONNECT_PENDING;
set_relay_state( comms,COMMS_RELAYSTATE_CONNECT_PENDING );
break;
case XWRELAY_GAME_RECONNECT:
stream_putU8( tmpStream, XWRELAY_PROTO_VERSION );
stringToStream( tmpStream, addr.u.ip_relay.cookie );
stream_putU8( tmpStream, comms->r.myHostID );
stream_putU8( tmpStream, comms->r.nPlayersHere );
stream_putU8( tmpStream, comms->r.nPlayersTotal );
stringToStream( tmpStream, comms->r.connName );
comms->r.relayState = COMMS_RELAYSTATE_CONNECT_PENDING;
set_relay_state( comms, COMMS_RELAYSTATE_CONNECT_PENDING );
break;
case XWRELAY_GAME_DISCONNECT:
@ -1741,9 +1788,8 @@ relayConnect( CommsCtxt* comms )
LOG_FUNC();
if ( comms->addr.conType == COMMS_CONN_RELAY && !comms->r.connecting ) {
comms->r.connecting = XP_TRUE;
success = send_via_relay( comms,
comms->r.connName[0] == '\0' ?
XWRELAY_GAME_CONNECT:XWRELAY_GAME_RECONNECT,
success = send_via_relay( comms, comms->r.connNameValid?
XWRELAY_GAME_RECONNECT : XWRELAY_GAME_CONNECT,
comms->r.myHostID, NULL, 0 );
comms->r.connecting = XP_FALSE;
}
@ -1788,7 +1834,7 @@ relayDisconnect( CommsCtxt* comms )
LOG_FUNC();
if ( comms->addr.conType == COMMS_CONN_RELAY ) {
if ( comms->r.relayState != COMMS_RELAYSTATE_UNCONNECTED ) {
comms->r.relayState = COMMS_RELAYSTATE_UNCONNECTED;
set_relay_state( comms, COMMS_RELAYSTATE_UNCONNECTED );
send_via_relay( comms, XWRELAY_GAME_DISCONNECT, HOST_ID_NONE,
NULL, 0 );
}

View file

@ -39,6 +39,7 @@
#include "timermgr.h"
#include "configs.h"
#include "crefmgr.h"
#include "permid.h"
using namespace std;
@ -82,7 +83,7 @@ void
CookieRef::ReInit( const char* cookie, const char* connName, CookieID id )
{
m_cookie = cookie==NULL?"":cookie;
m_connName = connName;
m_connName = connName==NULL?"":connName;
m_cookieID = id;
m_totalSent = 0;
m_curState = XWS_INITED;
@ -113,14 +114,9 @@ CookieRef::~CookieRef()
XWThreadPool* tPool = XWThreadPool::GetTPool();
ASSERT_LOCKED();
for ( ; ; ) {
map<HostID,HostRec>::iterator iter = m_sockets.begin();
if ( iter == m_sockets.end() ) {
break;
}
int socket = iter->second.m_socket;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int socket = iter->m_socket;
tPool->CloseSocket( socket );
m_sockets.erase( iter );
}
@ -175,12 +171,6 @@ void
CookieRef::_Connect( int socket, HostID hid, int nPlayersH, int nPlayersT )
{
if ( CRefMgr::Get()->Associate( socket, this ) ) {
if ( hid == HOST_ID_NONE ) {
hid = nextHostID();
logf( XW_LOGINFO, "assigned host id: %x", hid );
} else {
logf( XW_LOGINFO, "NOT assigned host id; why?" );
}
pushConnectEvent( socket, hid, nPlayersH, nPlayersT );
handleEvents();
} else {
@ -192,7 +182,6 @@ void
CookieRef::_Reconnect( int socket, HostID hid, int nPlayersH, int nPlayersT )
{
(void)CRefMgr::Get()->Associate( socket, this );
/* MutexLock ml( &m_EventsMutex ); */
pushReconnectEvent( socket, hid, nPlayersH, nPlayersT );
handleEvents();
}
@ -225,14 +214,16 @@ CookieRef::_Shutdown()
int
CookieRef::SocketForHost( HostID dest )
{
int socket;
int socket = -1;
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.find( dest );
if ( iter == m_sockets.end() ) {
socket = -1;
} else {
socket = iter->second.m_socket;
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == dest ) {
socket = iter->m_socket;
break;
}
}
logf( XW_LOGVERBOSE0, "returning socket=%d for hostid=%x", socket, dest );
return socket;
}
@ -250,7 +241,8 @@ CookieRef::NeverFullyConnected()
}
bool
CookieRef::AcceptingReconnections( HostID hid, int nPlayersH, int nPlayersT )
CookieRef::AcceptingReconnections( HostID hid, const char* cookie,
int nPlayersH )
{
bool accept = false;
/* First, do we have room. Second, are we missing this guy? */
@ -274,6 +266,13 @@ CookieRef::AcceptingReconnections( HostID hid, int nPlayersH, int nPlayersT )
}
}
/* Error to connect if cookie doesn't match. */
if ( accept && !!cookie && 0 != strcmp( cookie, Cookie() ) ) {
logf( XW_LOGERROR, "%s: not accepting b/c cookie mismatch: %s vs %s",
__func__, cookie, Cookie() );
accept = false;
}
return accept;
} /* AcceptingReconnections */
@ -295,19 +294,18 @@ CookieRef::removeSocket( int socket )
logf( XW_LOGINFO, "%s(socket=%d)", __func__, socket );
int count;
{
/* RWWriteLock rwl( &m_sockets_rwlock ); */
ASSERT_LOCKED();
count = m_sockets.size();
assert( count > 0 );
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
if ( iter->second.m_socket == socket ) {
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == socket ) {
m_sockets.erase(iter);
--count;
break;
}
++iter;
}
}
@ -333,13 +331,12 @@ CookieRef::HasSocket_locked( int socket )
bool found = false;
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
if ( iter->second.m_socket == socket ) {
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == socket ) {
found = true;
break;
}
++iter;
}
return found;
@ -349,7 +346,6 @@ CookieRef::HasSocket_locked( int socket )
void
CookieRef::_HandleHeartbeat( HostID id, int socket )
{
/* MutexLock ml( &m_EventsMutex ); */
pushHeartbeatEvent( id, socket );
handleEvents();
} /* HandleHeartbeat */
@ -357,15 +353,14 @@ CookieRef::_HandleHeartbeat( HostID id, int socket )
void
CookieRef::_CheckHeartbeats( time_t now )
{
logf( XW_LOGINFO, "CookieRef::_CheckHeartbeats" );
logf( XW_LOGINFO, "%s", __func__ );
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
time_t last = iter->second.m_lastHeartbeat;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
time_t last = iter->m_lastHeartbeat;
if ( (now - last) > GetHeartbeat() ) {
pushHeartFailedEvent( iter->second.m_socket );
pushHeartFailedEvent( iter->m_socket );
}
++iter;
}
handleEvents();
@ -387,7 +382,7 @@ CookieRef::_Remove( int socket )
} /* Forward */
void
CookieRef::pushConnectEvent( int socket, HostID srcID,
CookieRef::pushConnectEvent( int socket, HostID srcID,
int nPlayersH, int nPlayersT )
{
CRefEvent evt;
@ -400,7 +395,7 @@ CookieRef::pushConnectEvent( int socket, HostID srcID,
} /* pushConnectEvent */
void
CookieRef::pushReconnectEvent( int socket, HostID srcID,
CookieRef::pushReconnectEvent( int socket, HostID srcID,
int nPlayersH, int nPlayersT )
{
CRefEvent evt;
@ -426,6 +421,7 @@ CookieRef::pushHeartbeatEvent( HostID id, int socket )
void
CookieRef::pushHeartFailedEvent( int socket )
{
logf( XW_LOGINFO, "%s", __func__ );
CRefEvent evt;
evt.type = XWE_HEARTFAILED;
evt.u.heart.socket = socket;
@ -555,6 +551,8 @@ CookieRef::handleEvents()
case XWA_SENDALLHERE:
case XWA_SNDALLHERE_2:
cancelAllConnectedTimer();
assignConnName();
assignHostIds();
sendAllHere( takeAction == XWA_SENDALLHERE );
break;
@ -607,7 +605,7 @@ CookieRef::increasePlayerCounts( const CRefEvent* evt )
int nPlayersT = evt->u.con.nPlayersT;
HostID hid = evt->u.con.srcID;
logf( XW_LOGVERBOSE1, "increasePlayerCounts: hid=%d, nPlayersH=%d, "
logf( XW_LOGINFO, "increasePlayerCounts: hid=%d, nPlayersH=%d, "
"nPlayersT=%d", hid, nPlayersH, nPlayersT );
if ( hid == HOST_ID_SERVER ) {
@ -633,25 +631,21 @@ CookieRef::reducePlayerCounts( int socket )
{
logf( XW_LOGVERBOSE1, "reducePlayerCounts on socket %d", socket );
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
if ( iter->second.m_socket == socket ) {
assert( iter->first != 0 );
if ( iter->first == HOST_ID_SERVER ) {
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_socket == socket ) {
if ( iter->m_hostID == HOST_ID_SERVER ) {
m_nPlayersSought = 0;
} else {
assert( iter->second.m_nPlayersT == 0 );
assert( iter->m_nPlayersT == 0 );
}
m_nPlayersHere -= iter->second.m_nPlayersH;
m_nPlayersHere -= iter->m_nPlayersH;
logf( XW_LOGVERBOSE1, "reducePlayerCounts: m_nPlayersHere=%d; m_nPlayersSought=%d",
m_nPlayersHere, m_nPlayersSought );
break;
}
++iter;
}
} /* reducePlayerCounts */
@ -710,11 +704,12 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial )
int nPlayersH = evt->u.con.nPlayersH;
int nPlayersT = evt->u.con.nPlayersT;
assert( id != HOST_ID_NONE );
logf( XW_LOGINFO, "remembering pair: hostid=%x, socket=%d", id, socket );
HostRec hr(socket, nPlayersH, nPlayersT);
ASSERT_LOCKED();
m_sockets.insert( pair<HostID,HostRec>(id,hr) );
logf( XW_LOGINFO, "%s: remembering pair: hostid=%x, socket=%d (size=%d)",
__func__, id, socket, m_sockets.size());
HostRec hr(id, socket, nPlayersH, nPlayersT);
m_sockets.push_back( hr );
/* Now send the response */
unsigned char buf[1 + /* cmd */
@ -728,11 +723,9 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial )
*bufp++ = initial ? XWRELAY_CONNECT_RESP : XWRELAY_RECONNECT_RESP;
putNetShort( &bufp, GetHeartbeat() );
putNetShort( &bufp, GetCookieID() );
logf( XW_LOGVERBOSE0, "writing hostID of %d into msg", id );
*bufp++ = (char)id;
send_with_length( socket, buf, bufp - buf, true );
logf( XW_LOGVERBOSE0, "sent XWRELAY_CONNECTRESP" );
logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) );
} /* sendResponse */
void
@ -789,15 +782,13 @@ CookieRef::notifyOthers( int socket, XWRelayMsg msg, XWREASON why )
{
assert( socket != 0 );
/* RWReadLock ml( &m_sockets_rwlock ); */
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
int other = iter->second.m_socket;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int other = iter->m_socket;
if ( other != socket ) {
send_msg( other, iter->first, msg, why, false );
send_msg( other, iter->m_hostID, msg, why, false );
}
++iter;
}
} /* notifyOthers */
@ -806,38 +797,82 @@ CookieRef::sendAllHere( bool includeName )
{
unsigned char buf[1 + 1 + MAX_CONNNAME_LEN];
unsigned char* bufp = buf;
unsigned char* idLoc;
*bufp++ = XWRELAY_ALLHERE;
idLoc = bufp++; /* space for hostId, remembering address */
*bufp++ = includeName? 1 : 0;
if ( includeName ) {
const char* connName = ConnName();
assert( !!connName && connName[0] );
int len = strlen( connName );
assert( len < MAX_CONNNAME_LEN );
*bufp++ = (char)len;
memcpy( bufp, connName, len );
bufp += len;
}
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
vector<HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
send_with_length( iter->second.m_socket, buf, bufp-buf,
logf( XW_LOGINFO, "%s: sending to hostid %d", __func__,
iter->m_hostID );
*idLoc = iter->m_hostID; /* write in this target's hostId */
send_with_length( iter->m_socket, buf, bufp-buf,
true );
++iter;
}
} /* sendAllHere */
void
CookieRef::assignConnName( void )
{
if ( !ConnName()[0] ) {
m_connName = PermID::GetNextUniqueID();
logf( XW_LOGINFO, "%s: assigning name: %s", __func__, ConnName() );
assert( GetCookieID() != 0 );
} else {
logf( XW_LOGINFO, "%s: has name: %s", __func__, ConnName() );
}
} /* assignConnName */
void
CookieRef::assignHostIds( void )
{
ASSERT_LOCKED();
HostID nextId = HOST_ID_SERVER;
unsigned int bits = 0;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID != HOST_ID_NONE ) {
bits |= 1 << iter->m_hostID;
}
}
assert( (bits & (1 << HOST_ID_SERVER)) != 0 );
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == HOST_ID_NONE ) {
while ( ((1 << nextId) & bits) != 0 ) {
++nextId;
}
iter->m_hostID = nextId++; /* ++: don't reuse */
}
}
}
void
CookieRef::disconnectSockets( int socket, XWREASON why )
{
if ( socket == 0 ) {
/* RWReadLock ml( &m_sockets_rwlock ); */
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
assert( iter->second.m_socket != 0 );
disconnectSockets( iter->second.m_socket, why );
++iter;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
assert( iter->m_socket != 0 );
disconnectSockets( iter->m_socket, why );
}
} else {
pushNotifyDisconEvent( socket, why );
@ -851,21 +886,30 @@ CookieRef::noteHeartbeat( const CRefEvent* evt )
int socket = evt->u.heart.socket;
HostID id = evt->u.heart.id;
/* RWWriteLock rwl( &m_sockets_rwlock ); */
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter = m_sockets.find(id);
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == id ) {
break;
}
}
if ( iter == m_sockets.end() ) {
logf( XW_LOGERROR, "no socket for HostID %x", id );
} else {
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. */
assert( iter->second.m_socket == socket );
/* if see this again recover from it */
logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, uptime() );
iter->second.m_lastHeartbeat = uptime();
int second_socket = iter->m_socket;
if ( second_socket == socket ) {
logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d",
iter->m_lastHeartbeat, uptime() );
iter->m_lastHeartbeat = uptime();
} else {
/* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. But:
now it's happening after a crash and clients reconnect. */
logf( XW_LOGERROR, "wrong socket record for HostID %x; wanted %d, found %d",
id, socket, second_socket );
}
}
} /* noteHeartbeat */
@ -882,7 +926,7 @@ CookieRef::s_checkAllConnected( void* closure )
void
CookieRef::_CheckAllConnected()
{
logf( XW_LOGVERBOSE0, "checkAllConnected" );
logf( XW_LOGVERBOSE0, "%s", __func__ );
/* MutexLock ml( &m_EventsMutex ); */
CRefEvent newEvt;
newEvt.type = XWE_CONNTIMER;
@ -943,13 +987,12 @@ CookieRef::_PrintCookieInfo( string& out )
snprintf( buf, sizeof(buf), "Hosts connected=%d; cur time = %ld\n",
m_sockets.size(), uptime() );
out += buf;
map<HostID,HostRec>::iterator iter = m_sockets.begin();
while ( iter != m_sockets.end() ) {
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n",
iter->first, iter->second.m_socket,
iter->second.m_lastHeartbeat );
iter->m_hostID, iter->m_socket,
iter->m_lastHeartbeat );
out += buf;
++iter;
}
} /* PrintCookieInfo */
@ -957,20 +1000,18 @@ CookieRef::_PrintCookieInfo( string& out )
void
CookieRef::_FormatHostInfo( string* hostIds, string* addrs )
{
logf( XW_LOGINFO, "%s", __func__ );
ASSERT_LOCKED();
map<HostID,HostRec>::iterator iter;
vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( !!hostIds ) {
char buf[8];
snprintf( buf, sizeof(buf), "%d ", iter->first );
snprintf( buf, sizeof(buf), "%d ", iter->m_hostID );
*hostIds += buf;
}
if ( !!addrs ) {
int s = iter->second.m_socket;
int s = iter->m_socket;
struct sockaddr_in name;
socklen_t siz = sizeof(name);
if ( 0 == getpeername( s, (struct sockaddr*)&name, &siz) ) {

View file

@ -35,15 +35,16 @@ using namespace std;
class CookieMapIterator; /* forward */
class HostRec {
struct HostRec {
public:
HostRec(int socket, int nPlayersH, int nPlayersT)
: m_socket(socket)
HostRec(HostID hostID, int socket, int nPlayersH, int nPlayersT)
: m_hostID(hostID)
, m_socket(socket)
, m_nPlayersH(nPlayersH)
, m_nPlayersT(nPlayersT)
, m_lastHeartbeat(uptime())
{}
~HostRec() {}
HostID m_hostID;
int m_socket;
int m_nPlayersH;
int m_nPlayersT;
@ -87,7 +88,8 @@ class CookieRef {
int SocketForHost( HostID dest );
bool NeverFullyConnected();
bool AcceptingReconnections( HostID hid, int nPlayersH, int nPlayersT );
bool AcceptingReconnections( HostID hid, const char* cookie,
int nPlayersH );
/* for console */
void _PrintCookieInfo( string& out );
@ -160,9 +162,9 @@ class CookieRef {
m_totalSent += nBytes;
}
void pushConnectEvent( int socket, HostID srcID,
void pushConnectEvent( int socket, HostID srcID,
int nPlayersH, int nPlayersT );
void pushReconnectEvent( int socket, HostID srcID,
void pushReconnectEvent( int socket, HostID srcID,
int nPlayersH, int nPlayersT );
void pushHeartbeatEvent( HostID id, int socket );
void pushHeartFailedEvent( int socket );
@ -193,6 +195,8 @@ class CookieRef {
void notifyDisconn(const CRefEvent* evt);
void removeSocket( int socket );
void sendAllHere( bool includeName );
void assignConnName( void );
void assignHostIds( void );
HostID nextHostID() { return ++m_nextHostID; }
@ -203,8 +207,7 @@ class CookieRef {
/* timer callback */
static void s_checkAllConnected( void* closure );
map<HostID,HostRec> m_sockets;
/* pthread_rwlock_t m_sockets_rwlock; */
vector<HostRec> m_sockets;
int m_heatbeat; /* might change per carrier or something. */
string m_cookie; /* cookie used for initial connections */

View file

@ -28,7 +28,6 @@
#include "crefmgr.h"
#include "cref.h"
#include "mlock.h"
#include "permid.h"
#include "configs.h"
#include "timermgr.h"
@ -61,6 +60,7 @@ CRefMgr::CRefMgr()
{
/* should be using pthread_once() here */
pthread_mutex_init( &m_SocketStuffMutex, NULL );
pthread_mutex_init( &m_nextCIDMutex, NULL );
pthread_mutex_init( &m_freeList_mutex, NULL );
pthread_rwlock_init( &m_cookieMapRWLock, NULL );
}
@ -103,44 +103,57 @@ CRefMgr::CloseAll()
}
} /* CloseAll */
/* Matching hosts to games. If they have a connName, it's easy. That's the
* only thing we'll match on, and it must match, and the game must have room.
* If they have a cookie as well, it must match. If only a cookie is
* provided, we may be dealing with a new game *or* a reconnect from somebody
* who didn't get his connName yet (even if other participants did.)
*/
CookieRef*
CRefMgr::FindOpenGameFor( const char* cORn, bool isCookie,
CRefMgr::FindOpenGameFor( const char* cookie, const char* connName,
HostID hid, int socket, int nPlayersH, int nPlayersT )
{
logf( XW_LOGINFO, "%s(cORn=%s,hid=%d,socket=%d)", __func__, cORn, hid,
socket );
CookieRef* cref = NULL;
RWReadLock rwl( &m_cookieMapRWLock );
logf( XW_LOGINFO, "%s(cookie=%s,connName=%s,hid=%d,socket=%d)", __func__,
cookie, connName, hid, socket );
CookieRef* found = NULL;
CookieMap::iterator iter = m_cookieMap.begin();
while ( iter != m_cookieMap.end() ) {
cref = iter->second;
if ( isCookie ) {
if ( 0 == strcmp( cref->Cookie(), cORn ) ) {
if ( cref->NeverFullyConnected() ) {
break;
} else if ( cref->HasSocket(socket) ) {
logf( XW_LOGINFO, "%s: HasSocket case", __func__ );
break;
if ( !!cookie || !!connName ) { /* drop if both are null */
RWReadLock rwl( &m_cookieMapRWLock );
CookieMap::iterator iter;
for ( iter = m_cookieMap.begin();
NULL == found && iter != m_cookieMap.end();
++iter ) {
CookieRef* cref = iter->second;
if ( !!connName ) {
if ( 0 == strcmp( cref->ConnName(), connName ) ) {
if ( cref->Lock() ) {
assert( !cookie || 0 == strcmp( cookie, cref->Cookie() ) );
if ( cref->AcceptingReconnections( hid, cookie,
nPlayersH ) ) {
found = cref;
}
cref->Unlock();
}
}
}
} else {
if ( 0 == strcmp( cref->ConnName(), cORn ) ) {
bool found = false;
if ( cref->Lock() ) {
found = cref->AcceptingReconnections( hid, nPlayersH,
nPlayersH );
cref->Unlock();
}
if ( found ) {
break;
} else if ( !!cookie ) {
if ( 0 == strcmp( cref->Cookie(), cookie ) ) {
if ( cref->NeverFullyConnected() ) {
found = cref;
} else if ( cref->HasSocket(socket) ) {
logf( XW_LOGINFO, "%s: HasSocket case", __func__ );
found = cref;
}
}
}
}
++iter;
}
return (iter == m_cookieMap.end()) ? NULL : cref;
logf( XW_LOGINFO, "%s=>%p", __func__, found );
return found;
} /* FindOpenGameFor */
CookieID
@ -148,12 +161,14 @@ CRefMgr::nextCID( const char* connName )
{
/* Later may want to guarantee that wrap-around doesn't cause an overlap.
But that's really only a theoretical possibility. */
MutexLock ml(&m_nextCIDMutex);
return ++m_nextCID;
} /* nextCID */
int
CRefMgr::GetNumGamesSeen( void )
{
MutexLock ml(&m_nextCIDMutex);
return m_nextCID;
}
@ -207,36 +222,23 @@ CRefMgr::getFromFreeList( void )
CookieRef*
CRefMgr::getMakeCookieRef_locked( const char* cORn, bool isCookie, HostID hid,
int socket, int nPlayersH, int nPlayersT )
CRefMgr::getMakeCookieRef_locked( const char* cookie, const char* connName,
HostID hid, int socket, int nPlayersH,
int nPlayersT )
{
CookieRef* cref;
/* We have a cookie from a new connection. This may be the first time
it's been seen, or there may be a game currently in the
XW_ST_CONNECTING state, or it may be a dupe of a connect packet. So we
need to look up the cookie first, then generate new connName and
cookieIDs if it's not found. */
/* We have a cookie from a new connection or from a reconnect. This may
be the first time it's been seen, or there may be a game currently in
the XW_ST_CONNECTING state, or it may be a dupe of a connect packet.
If there's a game, cool. Otherwise add a new one. Pass the connName
which will be used if set, but if not set we'll be generating another
later when the game is complete.
*/
cref = FindOpenGameFor( cORn, isCookie, hid, socket, nPlayersH, nPlayersT );
cref = FindOpenGameFor( cookie, connName, hid, socket, nPlayersH, nPlayersT );
if ( cref == NULL ) {
string s;
const char* connName;
const char* cookie = NULL;
if ( isCookie ) {
cookie = cORn;
s = PermID::GetNextUniqueID();
connName = s.c_str();
} else {
connName = cORn;
}
CookieID cid = cookieIDForConnName( connName );
if ( cid == 0 ) {
cid = nextCID( connName );
}
cref = AddNew( cookie, connName, cid );
cref = AddNew( cookie, connName, nextCID( NULL ) );
}
return cref;
@ -369,8 +371,6 @@ CRefMgr::AddNew( const char* cookie, const char* connName, CookieID id )
{
logf( XW_LOGINFO, "%s( cookie=%s, connName=%s, id=%d", __func__,
cookie, connName, id );
CookieRef* exists = getCookieRef_impl( id );
assert( exists == NULL ); /* failed once */
CookieRef* ref = getFromFreeList();
@ -386,7 +386,7 @@ CRefMgr::AddNew( const char* cookie, const char* connName, CookieID id )
}
m_cookieMap.insert( pair<CookieID, CookieRef*>(ref->GetCookieID(), ref ) );
logf( XW_LOGINFO, "paired cookie %s/connName %s with id %d",
logf( XW_LOGINFO, "%s: paired cookie %s/connName %s with id %d", __func__,
(cookie?cookie:"NULL"), connName, ref->GetCookieID() );
#ifdef RELAY_HEARTBEAT
@ -529,15 +529,15 @@ CookieMapIterator::Next()
// SafeCref
//////////////////////////////////////////////////////////////////////////////
SafeCref::SafeCref( const char* cORn, bool isCookie, HostID hid, int socket,
int nPlayersH, int nPlayersT )
SafeCref::SafeCref( const char* cookie, const char* connName, HostID hid,
int socket, int nPlayersH, int nPlayersT )
: m_cref( NULL )
, m_mgr( CRefMgr::Get() )
, m_isValid( false )
{
CookieRef* cref;
cref = m_mgr->getMakeCookieRef_locked( cORn, isCookie, hid, socket,
cref = m_mgr->getMakeCookieRef_locked( cookie, connName, hid, socket,
nPlayersH, nPlayersT );
if ( cref != NULL ) {
m_locked = cref->Lock();

View file

@ -100,7 +100,7 @@ class CRefMgr {
void addToFreeList( CookieRef* cref );
CookieRef* getFromFreeList( void );
CookieRef* getMakeCookieRef_locked( const char* cORn, bool isCookie,
CookieRef* getMakeCookieRef_locked( const char* cookie, const char* connName,
HostID hid, int socket,
int nPlayersH, int nPlayersT );
CookieRef* getCookieRef( CookieID cookieID );
@ -108,8 +108,9 @@ class CRefMgr {
bool checkCookieRef_locked( CookieRef* cref );
CookieRef* getCookieRef_impl( CookieID cookieID );
CookieRef* AddNew( const char* cookie, const char* connName, CookieID id );
CookieRef* FindOpenGameFor( const char* cORn, bool isCookie,
HostID hid, int socket, int nPlayersH, int nPlayersT );
CookieRef* FindOpenGameFor( const char* cookie, const char* connName,
HostID hid, int socket, int nPlayersH,
int nPlayersT );
CookieID cookieIDForConnName( const char* connName );
CookieID nextCID( const char* connName );
@ -117,6 +118,7 @@ class CRefMgr {
static void heartbeatProc( void* closure );
void checkHeartbeats( time_t now );
pthread_mutex_t m_nextCIDMutex;
CookieID m_nextCID;
pthread_rwlock_t m_cookieMapRWLock;
@ -137,7 +139,7 @@ class SafeCref {
CookieRef instance at a time. */
public:
SafeCref( const char* cookieOrConnName, bool cookie, HostID hid,
SafeCref( const char* cookie, const char* connName, HostID hid,
int socket, int nPlayersH, int nPlayersT );
SafeCref( CookieID cid, bool failOk = false );
SafeCref( int socket );

View file

@ -137,7 +137,7 @@ logf( XW_LogLevel level, const char* format, ... )
}
} /* logf */
static const char*
const char*
cmdToStr( XWRELAY_Cmd cmd )
{
# define CASESTR(s) case s: return #s
@ -257,6 +257,11 @@ send_with_length_unsafe( int socket, unsigned char* buf, int bufLen )
ok = true;
}
}
if ( !ok ) {
logf( XW_LOGERROR, "%s(socket=%d) failed", __func__, socket );
}
return ok;
} /* send_with_length_unsafe */
@ -301,7 +306,7 @@ processConnect( unsigned char* bufp, int bufLen, int socket )
static pthread_mutex_t s_newCookieLock = PTHREAD_MUTEX_INITIALIZER;
MutexLock ml( &s_newCookieLock );
SafeCref scr( cookie, true, srcID, socket, nPlayersH, nPlayersT );
SafeCref scr( cookie, NULL, srcID, socket, nPlayersH, nPlayersT );
success = scr.Connect( socket, srcID, nPlayersH, nPlayersT );
}
@ -325,18 +330,22 @@ processReconnect( unsigned char* bufp, int bufLen, int socket )
if ( err != XWRELAY_ERROR_NONE ) {
denyConnection( socket, err );
} else {
char cookie[MAX_COOKIE_LEN+1];
char connName[MAX_CONNNAME_LEN+1];
HostID srcID;
unsigned char nPlayersH;
unsigned char nPlayersT;
connName[0] = '\0';
if ( getNetByte( &bufp, end, &srcID )
if ( readStr( &bufp, end, cookie, sizeof(cookie) )
&& getNetByte( &bufp, end, &srcID )
&& getNetByte( &bufp, end, &nPlayersH )
&& getNetByte( &bufp, end, &nPlayersT )
&& readStr( &bufp, end, connName, sizeof(connName) ) ) {
SafeCref scr( connName, false, srcID, socket, nPlayersH,
SafeCref scr( cookie[0]? cookie : NULL,
connName[0]? connName : NULL,
srcID, socket, nPlayersH,
nPlayersT );
success = scr.Reconnect( socket, srcID, nPlayersH, nPlayersT );
}

View file

@ -38,11 +38,11 @@ enum { XWRELAY_NONE /* 0 is an illegal value */
nPlayersTotal: 1 */
, XWRELAY_GAME_RECONNECT
/* Connect using connName rather than cookie. Used by a device that's
/* Connect using connName as well as cookie. Used by a device that's
lost its connection to a game in progress. Once a game is locked
this is the only way a host can get (back) in. Format: flags: 1;
hostID: 1; nPlayers: 1; nPlayersTotal: 1; connNameLen: 1;
connName<connNameLen>*/
cookieLen: 1; cookie: <cookieLen>; hostID: 1; nPlayers: 1;
nPlayersTotal: 1; connNameLen: 1; connName<connNameLen>*/
, XWRELAY_GAME_DISCONNECT
/* Tell the relay that we're gone for this game. After this message is
@ -51,18 +51,19 @@ enum { XWRELAY_NONE /* 0 is an illegal value */
, XWRELAY_CONNECT_RESP
/* Sent from relay to device in response to XWRELAY_CONNECT. Format:
heartbeat_seconds: 2; connectionID: 2; assignedHostID: 1 */
heartbeat_seconds: 2; connectionID: 2; */
, XWRELAY_RECONNECT_RESP
/* Sent from relay to device in response to XWRELAY_RECONNECT. Format:
heartbeat_seconds: 2; connectionID: 2; */
heartbeat_seconds: 2; */
, XWRELAY_ALLHERE
/* Sent from relay when it enters the state where all expected devices
are here (at start of new game or after having been gone for a
while). Devices should not attempt to forward messages before this
message is received or after XWRELAY_DISCONNECT_OTHER is received.
Format: hasName: 1; [nameLen: 1; connName: <nameLen> */
Format: hostID: 1; hasName: 1; [connNameLen: 1; connName:
<connNameLen>]; */
, XWRELAY_DISCONNECT_YOU
/* Sent from relay when existing connection is terminated.

View file

@ -5,6 +5,7 @@
#include <time.h>
#include "lstnrmgr.h"
#include "xwrelay.h"
typedef unsigned char HostID; /* see HOST_ID_SERVER */
@ -27,6 +28,8 @@ int GetNSpawns(void);
int make_socket( unsigned long addr, unsigned short port );
const char* cmdToStr( XWRELAY_Cmd cmd );
extern class ListenerMgr g_listeners;
#endif