fix stall showing up in curses test app

Duplicate messages early on, which happened only in the test script
but could have anywhere, broke connectivity. So don't kill address
records when a duplicate shows up. Dupes only escape message ID
checking early (before channel is established). I used to remove
address records when a message was rejected, but don't understand why
so removed that, though asserts show it's not mattering except for
those early messages.
This commit is contained in:
Eric House 2022-10-01 08:43:10 -07:00
parent 0ed20391c3
commit 9a7946de36
5 changed files with 114 additions and 101 deletions

View file

@ -271,8 +271,8 @@ static const char* relayCmdToStr( XWRELAY_Cmd cmd );
static void printQueue( const CommsCtxt* comms );
static void logAddr( const CommsCtxt* comms, XWEnv xwe,
const CommsAddrRec* addr, const char* caller );
static void logAddrs( const CommsCtxt* comms, XWEnv xwe,
const char* caller );
/* static void logAddrs( const CommsCtxt* comms, XWEnv xwe, */
/* const char* caller ); */
#else
#define ASSERT_ADDR_OK(addr)
@ -488,29 +488,29 @@ cleanupAddrRecs( CommsCtxt* comms )
comms->recs = (AddressRecord*)NULL;
} /* cleanupAddrRecs */
static void
removeAddrRec( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), AddressRecord* rec )
{
XP_LOGFF( TAGFMT(%p), TAGPRMS, rec );
#ifdef DEBUG
logAddrs( comms, xwe, "BEFORE" );
XP_U16 nBefore = countAddrRecs( comms );
#endif
AddressRecord** curp = &comms->recs;
while ( NULL != *curp ) {
if ( rec == *curp ) {
*curp = rec->next;
XP_FREE( comms->mpool, rec );
break;
}
curp = &(*curp)->next;
}
#ifdef DEBUG
XP_U16 nAfter = countAddrRecs( comms );
XP_ASSERT( (nAfter + 1) == nBefore );
logAddrs( comms, xwe, "AFTER" );
#endif
}
/* static void */
/* removeAddrRec( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), AddressRecord* rec ) */
/* { */
/* XP_LOGFF( TAGFMT(%p), TAGPRMS, rec ); */
/* #ifdef DEBUG */
/* logAddrs( comms, xwe, "BEFORE" ); */
/* XP_U16 nBefore = countAddrRecs( comms ); */
/* #endif */
/* AddressRecord** curp = &comms->recs; */
/* while ( NULL != *curp ) { */
/* if ( rec == *curp ) { */
/* *curp = rec->next; */
/* XP_FREE( comms->mpool, rec ); */
/* break; */
/* } */
/* curp = &(*curp)->next; */
/* } */
/* #ifdef DEBUG */
/* XP_U16 nAfter = countAddrRecs( comms ); */
/* XP_ASSERT( (nAfter + 1) == nBefore ); */
/* logAddrs( comms, xwe, "AFTER" ); */
/* #endif */
/* } */
void
comms_resetSame( CommsCtxt* comms, XWEnv xwe )
@ -826,7 +826,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
comms->channelSeed = 0;
} else {
comms->channelSeed = stream_getU16( stream );
CNO_FMT( cbuf, comms->channelSeed );
// CNO_FMT( cbuf, comms->channelSeed );
}
if ( STREAM_VERS_COMMSBACKOFF <= version ) {
comms->resendBackoff = stream_getU16( stream );
@ -1138,6 +1138,7 @@ comms_writeToStream( CommsCtxt* comms, XWEnv xwe,
stream_putU16( stream, (XP_U16)rec->lastMsgAckd );
stream_putU16( stream, rec->channelNo );
if ( addr_hasType( addr, COMMS_CONN_RELAY ) ) {
XP_ASSERT(0);
stream_putU8( stream, rec->rr.hostID ); /* unneeded unless RELAY */
}
}
@ -1554,12 +1555,29 @@ nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo )
}
} /* nukeInvites */
static XP_Bool
haveRealChannel( const CommsCtxt* comms, XP_PlayerAddr channelNo )
{
XP_ASSERT( (channelNo & CHANNEL_MASK) == channelNo );
XP_Bool found = XP_FALSE;
for ( AddressRecord* rec = comms->recs; !!rec && !found; rec = rec->next ) {
found = (channelNo == (CHANNEL_MASK & rec->channelNo))
&& (0 != (rec->channelNo & ~CHANNEL_MASK));
}
CNO_FMT( cbuf, channelNo );
XP_LOGFF( "(%s) => %s", cbuf, boolToStr(found) );
return found;
}
void
comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
const CommsAddrRec* destAddr )
{
LOG_FUNC();
XP_PlayerAddr forceChannel = nli->forceChannel;
if ( !haveRealChannel( comms, forceChannel ) ) {
/* See if we have a channel for this address. Then see if we have an
invite matching this one, and if not add one. Then trigger a send of
it. */
@ -1567,18 +1585,6 @@ comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
/* remove the old rec, if found */
nukeInvites( comms, xwe, forceChannel );
/* WTF is this doing? It's leaving msgQueueHead pointing at garbage */
// const XP_PlayerAddr channelNo = 1;
/* for ( MsgQueueElem* elem = comms->msgQueueHead; !!elem; elem = elem-> next ) { */
/* if ( forceChannel == elem->channelNo ) { */
/* if ( 0 == elem->msgID && 0 != forceChannel ) { */
/* freeElem( comms->mpool, elem ); */
/* XP_LOGFF( "nuked old invite" ); */
/* fix me */
/* } */
/* } */
/* } */
/*AddressRecord* rec = */rememberChannelAddress( comms, xwe, forceChannel,
0, destAddr, 0 );
MsgQueueElem* elem = makeInviteElem( comms, xwe, forceChannel, nli );
@ -1586,6 +1592,8 @@ comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
elem = addToQueue( comms, xwe, elem );
sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
}
LOG_RETURN_VOID();
}
#endif
/* Send a message using the sequentially next MsgID. Save the message so
@ -1913,7 +1921,7 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi
#endif
break;
#endif
default: {
default:
XP_ASSERT( addr_hasType( &addr, typ ) );
/* A more general check that the address type has the settings
@ -1944,7 +1952,6 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi
typ, gameid,
comms->procs.closure );
}
}
break;
} /* switch */
@ -2001,6 +2008,8 @@ resendImpl( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force,
XP_S16 len = (*proc)( comms, xwe, msg, filter, closure );
if ( 0 > len ) {
success = XP_FALSE;
/* might want to remove break! Otherwise one bad channel (old
invite?) spoils all */
break;
} else {
XP_ASSERT( 0 < len );
@ -2029,7 +2038,9 @@ sendMsgWrapper( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* msg, CommsConnType fi
XP_S16
comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force )
{
return resendImpl( comms, xwe, filter, force, sendMsgWrapper, NULL );
XP_S16 result = resendImpl( comms, xwe, filter, force, sendMsgWrapper, NULL );
// LOG_RETURNF( "%d", result );
return result;
}
typedef struct _GetAllClosure{
@ -2060,19 +2071,27 @@ ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force )
if ( CONN_ID_NONE == comms->connID ) {
XP_LOGFF( "doing nothing because connID still unset" );
} else {
XP_U16 nSent = 0;
#ifdef DEBUG
int nSent = 0;
int nSeen = 0;
#endif
AddressRecord* rec;
for ( rec = comms->recs; !!rec; rec = rec->next ) {
#ifdef DEBUG
++nSeen;
#endif
if ( force || rec->lastMsgAckd < rec->lastMsgRcd ) {
#ifdef DEBUG
++nSent;
CNO_FMT( cbuf, rec->channelNo );
XP_LOGFF( "%s; %d < %d (or force: %s): rec getting ack",
cbuf, rec->lastMsgAckd, rec->lastMsgRcd,
boolToStr(force) );
#endif
sendEmptyMsg( comms, xwe, rec );
}
}
XP_LOGFF( "sent for %d channels", nSent );
XP_LOGFF( "sent for %d channels (of %d)", nSent, nSent );
}
}
@ -2508,15 +2527,15 @@ getRecordFor( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
#ifdef XWFEATURE_SMS
{
XW_DUtilCtxt* duc = util_getDevUtilCtxt( comms->util, xwe );
if ( dutil_phoneNumbersSame( duc, xwe, addr->u.sms.phone,
matched = dutil_phoneNumbersSame( duc, xwe, addr->u.sms.phone,
rec->addr.u.sms.phone )
&& addr->u.sms.port == rec->addr.u.sms.port ) {
matched = XP_TRUE;
XP_ASSERT( 0 );
}
&& addr->u.sms.port == rec->addr.u.sms.port;
}
#endif
break;
case COMMS_CONN_MQTT:
matched = addr->u.mqtt.devID == rec->addr.u.mqtt.devID;
break;
case COMMS_CONN_NONE:
matched = channelNo == (rec->channelNo & mask);
break;
@ -2575,7 +2594,7 @@ static AddressRecord*
validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
XP_Bool XP_UNUSED_HEARTBEAT(hasPayload),
const CommsAddrRec* addr, XWHostID senderID,
XP_PlayerAddr* channelNo, XP_U16 flags )
XP_PlayerAddr* channelNo, XP_U16 flags, MsgID msgID )
{
CNO_FMT( cbuf, *channelNo );
XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf );
@ -2633,14 +2652,12 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
/* Used to be that the initial message was where the channel
record got created, but now the client creates an address for
the host on startup (comms_make()) */
if ( comms->isServer ) {
if ( comms->isServer || 1 != msgID ) {
XP_LOGFF( TAGFMT() "rejecting duplicate INIT message", TAGPRMS );
rec = NULL;
} else {
XP_LOGFF( "accepting duplicate (?) msg" );
}
/* reject: we've already seen init message on channel */
// XP_ASSERT(0);
} else {
if ( comms->isServer ) {
if ( checkChannelNo( comms, channelNo ) ) {
@ -2899,7 +2916,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
/* special case: initial message from client or server */
rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr,
senderID, &stuff.channelNo,
stuff.flags );
stuff.flags, stuff.msgID );
state->rec = rec;
} else if ( comms->connID == stuff.connID ) {
rec = validateChannelMessage( comms, xwe, retAddr,
@ -2943,15 +2960,18 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
CommsMsgState* state, XP_Bool rejected )
{
#ifdef COMMS_CHECKSUM
XP_LOGFF( "id: %d; len: %d; sum: %s; rejected: %s", state->msgID, state->len, state->sum,
boolToStr(rejected) );
XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec,
state->len, state->sum, state->msgID, boolToStr(rejected) );
#endif
XP_ASSERT( comms == state->comms );
XP_ASSERT( comms->processingMsg );
if ( rejected ) {
if ( !!state->rec ) {
removeAddrRec( comms, xwe, state->rec );
XP_LOGFF( "should I remove rec???; msgID: %d", state->msgID );
XP_ASSERT( 1 >= state->msgID );
/* this is likely a mistake!!! Why remove it??? */
// removeAddrRec( comms, xwe, state->rec );
}
#ifdef LOG_COMMS_MSGNOS
XP_LOGFF( "msg rejected; NOT upping lastMsgRcd to %d", state->msgID );
@ -3380,17 +3400,17 @@ logAddr( const CommsCtxt* comms, XWEnv xwe,
}
}
static void
logAddrs( const CommsCtxt* comms, XWEnv xwe, const char* caller )
{
const AddressRecord* rec = comms->recs;
while ( !!rec ) {
CNO_FMT( cbuf, rec->channelNo );
XP_LOGFF( TAGFMT() "%s", TAGPRMS, cbuf );
logAddr( comms, xwe, &rec->addr, caller );
rec = rec->next;
}
}
/* static void */
/* logAddrs( const CommsCtxt* comms, XWEnv xwe, const char* caller ) */
/* { */
/* const AddressRecord* rec = comms->recs; */
/* while ( !!rec ) { */
/* CNO_FMT( cbuf, rec->channelNo ); */
/* XP_LOGFF( TAGFMT() "%s", TAGPRMS, cbuf ); */
/* logAddr( comms, xwe, &rec->addr, caller ); */
/* rec = rec->next; */
/* } */
/* } */
#endif
static void

View file

@ -259,7 +259,7 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_LOGFF( "delivery took %ds", now - timestamp );
}
#else
XW_USE( timestamp );
XP_USE( timestamp );
#endif
}
MQTTCmd cmd = stream_getU8( stream );

View file

@ -1911,6 +1911,7 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream )
{
LOG_FUNC();
XP_Bool accepted = 0 == server->nv.addresses[0].channelNo;
XP_ASSERT( accepted );
/* We should never get this message a second time, but very rarely we do.
Drop it in that case. */
@ -2035,8 +2036,6 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream )
informMissing( server, xwe );
setTurn( server, xwe, 0 );
dupe_resetTimer( server, xwe );
} else {
XP_LOGFF( "wanted 0; got %d", server->nv.addresses[0].channelNo );
}
return accepted;
} /* client_readInitialMessage */
@ -2048,8 +2047,6 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream )
* that all must use for the game. Then for each player on the device give
* the starting tray.
*/
#ifndef XWFEATURE_STANDALONE_ONLY
static void
makeSendableGICopy( ServerCtxt* server, CurGameInfo* giCopy,
XP_U16 deviceIndex )
@ -2136,7 +2133,6 @@ sendInitialMessage( ServerCtxt* server, XWEnv xwe )
dupe_resetTimer( server, xwe );
} /* sendInitialMessage */
#endif
static void
freeBWI( MPFORMAL BadWordInfo* bwi )
@ -4208,7 +4204,7 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming )
XP_Bool accepted = XP_FALSE;
XP_Bool isServer = amServer( server );
const XW_Proto code = readProto( server, incoming );
XP_LOGFF( "(code=%s)", codeToStr(code) );
XP_LOGFF( "code=%s", codeToStr(code) );
switch ( code ) {
case XWPROTO_DEVICE_REGISTRATION:
@ -4224,9 +4220,8 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming )
}
break;
case XWPROTO_CLIENT_SETUP:
accepted = !isServer;
accepted = XWSTATE_NONE == server->nv.gameState && !isServer;
if ( accepted ) {
XP_STATUSF( "client got XWPROTO_CLIENT_SETUP" );
accepted = client_readInitialMessage( server, xwe, incoming );
}
break;
@ -4303,7 +4298,7 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming )
XP_ASSERT( isServer == amServer( server ) ); /* caching value is ok? */
stream_close( incoming, xwe );
XP_LOGFF( "=> %d (code=%s)", accepted, codeToStr(code) );
XP_LOGFF( "=> %s (code=%s)", boolToStr(accepted), codeToStr(code) );
// XP_ASSERT( accepted ); /* do not commit!!! */
return accepted;
} /* server_receiveMessage */

View file

@ -639,12 +639,11 @@ void
cb_feedGame( CursesBoardState* cbState, XP_U32 gameID,
const XP_U8* buf, XP_U16 len, const CommsAddrRec* from )
{
LOG_FUNC();
sqlite3_int64 rowids[4];
int nRows = VSIZE( rowids );
LaunchParams* params = cbState->params;
gdb_getRowsForGameID( params->pDb, gameID, rowids, &nRows );
XP_LOGF( "%s(): found %d rows for gameID %d", __func__, nRows, gameID );
XP_LOGFF( "found %d rows for gameID %d", nRows, gameID );
for ( int ii = 0; ii < nRows; ++ii ) {
#ifdef DEBUG
bool success =
@ -1273,6 +1272,7 @@ inviteList( CommonGlobals* cGlobals, CommsAddrRec* myAddr, GSList* invitees,
static bool
sendInvite( void* closure, int XP_UNUSED(key) )
{
LOG_FUNC();
CursesBoardGlobals* bGlobals = (CursesBoardGlobals*)closure;
CommonGlobals* cGlobals = &bGlobals->cGlobals;
LaunchParams* params = cGlobals->params;

View file

@ -285,9 +285,7 @@ gdb_write( XWStreamCtxt* stream, XWEnv XP_UNUSED(xwe), void* closure )
if ( newGame ) { /* new row; need to insert blob first */
cGlobals->rowid = selRow;
const CurGameInfo* gi = cGlobals->gi;
XP_U32 gameID = gi->gameID;
XP_LOGFF( "new game for id %d at row %lld", gameID, selRow );
XP_LOGFF( "new game for id %d at row %lld", cGlobals->gi->gameID, selRow );
} else {
assert( selRow == cGlobals->rowid );
}