From 9a7946de36cef63809c60634c972bd1def19e99b Mon Sep 17 00:00:00 2001 From: Eric House Date: Sat, 1 Oct 2022 08:43:10 -0700 Subject: [PATCH] fix stall showing up in curses test app Duplicate messages early on, which happened only in the test script but could have anywhere, broke connectivity. So don't kill address records when a duplicate shows up. Dupes only escape message ID checking early (before channel is established). I used to remove address records when a message was rejected, but don't understand why so removed that, though asserts show it's not mattering except for those early messages. --- xwords4/common/comms.c | 192 ++++++++++++++++++++---------------- xwords4/common/device.c | 2 +- xwords4/common/server.c | 13 +-- xwords4/linux/cursesboard.c | 4 +- xwords4/linux/gamesdb.c | 4 +- 5 files changed, 114 insertions(+), 101 deletions(-) diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index 1f6f27c81..8edc1b37e 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -271,8 +271,8 @@ static const char* relayCmdToStr( XWRELAY_Cmd cmd ); static void printQueue( const CommsCtxt* comms ); static void logAddr( const CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, const char* caller ); -static void logAddrs( const CommsCtxt* comms, XWEnv xwe, - const char* caller ); +/* static void logAddrs( const CommsCtxt* comms, XWEnv xwe, */ +/* const char* caller ); */ #else #define ASSERT_ADDR_OK(addr) @@ -488,29 +488,29 @@ cleanupAddrRecs( CommsCtxt* comms ) comms->recs = (AddressRecord*)NULL; } /* cleanupAddrRecs */ -static void -removeAddrRec( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), AddressRecord* rec ) -{ - XP_LOGFF( TAGFMT(%p), TAGPRMS, rec ); -#ifdef DEBUG - logAddrs( comms, xwe, "BEFORE" ); - XP_U16 nBefore = countAddrRecs( comms ); -#endif - AddressRecord** curp = &comms->recs; - while ( NULL != *curp ) { - if ( rec == *curp ) { - *curp = rec->next; - XP_FREE( comms->mpool, rec ); - break; - } - curp = &(*curp)->next; - } -#ifdef DEBUG - XP_U16 nAfter = countAddrRecs( comms ); - XP_ASSERT( (nAfter + 1) == nBefore ); - logAddrs( comms, xwe, "AFTER" ); -#endif -} +/* static void */ +/* removeAddrRec( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), AddressRecord* rec ) */ +/* { */ +/* XP_LOGFF( TAGFMT(%p), TAGPRMS, rec ); */ +/* #ifdef DEBUG */ +/* logAddrs( comms, xwe, "BEFORE" ); */ +/* XP_U16 nBefore = countAddrRecs( comms ); */ +/* #endif */ +/* AddressRecord** curp = &comms->recs; */ +/* while ( NULL != *curp ) { */ +/* if ( rec == *curp ) { */ +/* *curp = rec->next; */ +/* XP_FREE( comms->mpool, rec ); */ +/* break; */ +/* } */ +/* curp = &(*curp)->next; */ +/* } */ +/* #ifdef DEBUG */ +/* XP_U16 nAfter = countAddrRecs( comms ); */ +/* XP_ASSERT( (nAfter + 1) == nBefore ); */ +/* logAddrs( comms, xwe, "AFTER" ); */ +/* #endif */ +/* } */ void comms_resetSame( CommsCtxt* comms, XWEnv xwe ) @@ -826,7 +826,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, comms->channelSeed = 0; } else { comms->channelSeed = stream_getU16( stream ); - CNO_FMT( cbuf, comms->channelSeed ); + // CNO_FMT( cbuf, comms->channelSeed ); } if ( STREAM_VERS_COMMSBACKOFF <= version ) { comms->resendBackoff = stream_getU16( stream ); @@ -1138,6 +1138,7 @@ comms_writeToStream( CommsCtxt* comms, XWEnv xwe, stream_putU16( stream, (XP_U16)rec->lastMsgAckd ); stream_putU16( stream, rec->channelNo ); if ( addr_hasType( addr, COMMS_CONN_RELAY ) ) { + XP_ASSERT(0); stream_putU8( stream, rec->rr.hostID ); /* unneeded unless RELAY */ } } @@ -1554,37 +1555,44 @@ nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo ) } } /* nukeInvites */ +static XP_Bool +haveRealChannel( const CommsCtxt* comms, XP_PlayerAddr channelNo ) +{ + XP_ASSERT( (channelNo & CHANNEL_MASK) == channelNo ); + XP_Bool found = XP_FALSE; + + for ( AddressRecord* rec = comms->recs; !!rec && !found; rec = rec->next ) { + found = (channelNo == (CHANNEL_MASK & rec->channelNo)) + && (0 != (rec->channelNo & ~CHANNEL_MASK)); + } + + CNO_FMT( cbuf, channelNo ); + XP_LOGFF( "(%s) => %s", cbuf, boolToStr(found) ); + return found; +} + void comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli, const CommsAddrRec* destAddr ) { LOG_FUNC(); XP_PlayerAddr forceChannel = nli->forceChannel; - /* See if we have a channel for this address. Then see if we have an - invite matching this one, and if not add one. Then trigger a send of - it. */ + if ( !haveRealChannel( comms, forceChannel ) ) { + /* See if we have a channel for this address. Then see if we have an + invite matching this one, and if not add one. Then trigger a send of + it. */ - /* remove the old rec, if found */ - nukeInvites( comms, xwe, forceChannel ); + /* remove the old rec, if found */ + nukeInvites( comms, xwe, forceChannel ); - /* WTF is this doing? It's leaving msgQueueHead pointing at garbage */ - // const XP_PlayerAddr channelNo = 1; - /* for ( MsgQueueElem* elem = comms->msgQueueHead; !!elem; elem = elem-> next ) { */ - /* if ( forceChannel == elem->channelNo ) { */ - /* if ( 0 == elem->msgID && 0 != forceChannel ) { */ - /* freeElem( comms->mpool, elem ); */ - /* XP_LOGFF( "nuked old invite" ); */ - /* fix me */ - /* } */ - /* } */ - /* } */ + /*AddressRecord* rec = */rememberChannelAddress( comms, xwe, forceChannel, + 0, destAddr, 0 ); + MsgQueueElem* elem = makeInviteElem( comms, xwe, forceChannel, nli ); - /*AddressRecord* rec = */rememberChannelAddress( comms, xwe, forceChannel, - 0, destAddr, 0 ); - MsgQueueElem* elem = makeInviteElem( comms, xwe, forceChannel, nli ); - - elem = addToQueue( comms, xwe, elem ); - sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); + elem = addToQueue( comms, xwe, elem ); + sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); + } + LOG_RETURN_VOID(); } #endif @@ -1913,7 +1921,7 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi #endif break; #endif - default: { + default: XP_ASSERT( addr_hasType( &addr, typ ) ); /* A more general check that the address type has the settings @@ -1926,12 +1934,12 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi if ( 0 ) { #ifdef XWFEATURE_COMMS_INVITE - } else if ( isInvite ) { - if ( !!comms->procs.sendInvt ) { - NetLaunchInfo* nli = (NetLaunchInfo*)elem->msg; - nSent = (*comms->procs.sendInvt)( xwe, nli, elem->createdStamp, - &addr, comms->procs.closure ); - } + } else if ( isInvite ) { + if ( !!comms->procs.sendInvt ) { + NetLaunchInfo* nli = (NetLaunchInfo*)elem->msg; + nSent = (*comms->procs.sendInvt)( xwe, nli, elem->createdStamp, + &addr, comms->procs.closure ); + } #endif } else { XP_ASSERT( !!comms->procs.sendMsg ); @@ -1944,11 +1952,10 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi typ, gameid, comms->procs.closure ); } - } - break; + break; } /* switch */ - } + } if ( nSent > result ) { result = nSent; } @@ -2001,6 +2008,8 @@ resendImpl( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force, XP_S16 len = (*proc)( comms, xwe, msg, filter, closure ); if ( 0 > len ) { success = XP_FALSE; + /* might want to remove break! Otherwise one bad channel (old + invite?) spoils all */ break; } else { XP_ASSERT( 0 < len ); @@ -2029,7 +2038,9 @@ sendMsgWrapper( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* msg, CommsConnType fi XP_S16 comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force ) { - return resendImpl( comms, xwe, filter, force, sendMsgWrapper, NULL ); + XP_S16 result = resendImpl( comms, xwe, filter, force, sendMsgWrapper, NULL ); + // LOG_RETURNF( "%d", result ); + return result; } typedef struct _GetAllClosure{ @@ -2060,19 +2071,27 @@ ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force ) if ( CONN_ID_NONE == comms->connID ) { XP_LOGFF( "doing nothing because connID still unset" ); } else { - XP_U16 nSent = 0; +#ifdef DEBUG + int nSent = 0; + int nSeen = 0; +#endif AddressRecord* rec; for ( rec = comms->recs; !!rec; rec = rec->next ) { +#ifdef DEBUG + ++nSeen; +#endif if ( force || rec->lastMsgAckd < rec->lastMsgRcd ) { +#ifdef DEBUG ++nSent; CNO_FMT( cbuf, rec->channelNo ); XP_LOGFF( "%s; %d < %d (or force: %s): rec getting ack", cbuf, rec->lastMsgAckd, rec->lastMsgRcd, boolToStr(force) ); +#endif sendEmptyMsg( comms, xwe, rec ); } } - XP_LOGFF( "sent for %d channels", nSent ); + XP_LOGFF( "sent for %d channels (of %d)", nSent, nSent ); } } @@ -2508,15 +2527,15 @@ getRecordFor( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, #ifdef XWFEATURE_SMS { XW_DUtilCtxt* duc = util_getDevUtilCtxt( comms->util, xwe ); - if ( dutil_phoneNumbersSame( duc, xwe, addr->u.sms.phone, - rec->addr.u.sms.phone ) - && addr->u.sms.port == rec->addr.u.sms.port ) { - matched = XP_TRUE; - XP_ASSERT( 0 ); - } + matched = dutil_phoneNumbersSame( duc, xwe, addr->u.sms.phone, + rec->addr.u.sms.phone ) + && addr->u.sms.port == rec->addr.u.sms.port; } #endif break; + case COMMS_CONN_MQTT: + matched = addr->u.mqtt.devID == rec->addr.u.mqtt.devID; + break; case COMMS_CONN_NONE: matched = channelNo == (rec->channelNo & mask); break; @@ -2575,7 +2594,7 @@ static AddressRecord* validateInitialMessage( CommsCtxt* comms, XWEnv xwe, XP_Bool XP_UNUSED_HEARTBEAT(hasPayload), const CommsAddrRec* addr, XWHostID senderID, - XP_PlayerAddr* channelNo, XP_U16 flags ) + XP_PlayerAddr* channelNo, XP_U16 flags, MsgID msgID ) { CNO_FMT( cbuf, *channelNo ); XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf ); @@ -2633,14 +2652,12 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe, /* Used to be that the initial message was where the channel record got created, but now the client creates an address for the host on startup (comms_make()) */ - if ( comms->isServer ) { + if ( comms->isServer || 1 != msgID ) { XP_LOGFF( TAGFMT() "rejecting duplicate INIT message", TAGPRMS ); rec = NULL; } else { XP_LOGFF( "accepting duplicate (?) msg" ); } - /* reject: we've already seen init message on channel */ - // XP_ASSERT(0); } else { if ( comms->isServer ) { if ( checkChannelNo( comms, channelNo ) ) { @@ -2899,7 +2916,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, /* special case: initial message from client or server */ rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr, senderID, &stuff.channelNo, - stuff.flags ); + stuff.flags, stuff.msgID ); state->rec = rec; } else if ( comms->connID == stuff.connID ) { rec = validateChannelMessage( comms, xwe, retAddr, @@ -2943,15 +2960,18 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe, CommsMsgState* state, XP_Bool rejected ) { #ifdef COMMS_CHECKSUM - XP_LOGFF( "id: %d; len: %d; sum: %s; rejected: %s", state->msgID, state->len, state->sum, - boolToStr(rejected) ); + XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec, + state->len, state->sum, state->msgID, boolToStr(rejected) ); #endif XP_ASSERT( comms == state->comms ); XP_ASSERT( comms->processingMsg ); if ( rejected ) { if ( !!state->rec ) { - removeAddrRec( comms, xwe, state->rec ); + XP_LOGFF( "should I remove rec???; msgID: %d", state->msgID ); + XP_ASSERT( 1 >= state->msgID ); + /* this is likely a mistake!!! Why remove it??? */ + // removeAddrRec( comms, xwe, state->rec ); } #ifdef LOG_COMMS_MSGNOS XP_LOGFF( "msg rejected; NOT upping lastMsgRcd to %d", state->msgID ); @@ -3380,17 +3400,17 @@ logAddr( const CommsCtxt* comms, XWEnv xwe, } } -static void -logAddrs( const CommsCtxt* comms, XWEnv xwe, const char* caller ) -{ - const AddressRecord* rec = comms->recs; - while ( !!rec ) { - CNO_FMT( cbuf, rec->channelNo ); - XP_LOGFF( TAGFMT() "%s", TAGPRMS, cbuf ); - logAddr( comms, xwe, &rec->addr, caller ); - rec = rec->next; - } -} +/* static void */ +/* logAddrs( const CommsCtxt* comms, XWEnv xwe, const char* caller ) */ +/* { */ +/* const AddressRecord* rec = comms->recs; */ +/* while ( !!rec ) { */ +/* CNO_FMT( cbuf, rec->channelNo ); */ +/* XP_LOGFF( TAGFMT() "%s", TAGPRMS, cbuf ); */ +/* logAddr( comms, xwe, &rec->addr, caller ); */ +/* rec = rec->next; */ +/* } */ +/* } */ #endif static void diff --git a/xwords4/common/device.c b/xwords4/common/device.c index 66cdd03bc..a6d335438 100644 --- a/xwords4/common/device.c +++ b/xwords4/common/device.c @@ -259,7 +259,7 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, XP_LOGFF( "delivery took %ds", now - timestamp ); } #else - XW_USE( timestamp ); + XP_USE( timestamp ); #endif } MQTTCmd cmd = stream_getU8( stream ); diff --git a/xwords4/common/server.c b/xwords4/common/server.c index 0858ae158..027c5dce5 100644 --- a/xwords4/common/server.c +++ b/xwords4/common/server.c @@ -1911,6 +1911,7 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream ) { LOG_FUNC(); XP_Bool accepted = 0 == server->nv.addresses[0].channelNo; + XP_ASSERT( accepted ); /* We should never get this message a second time, but very rarely we do. Drop it in that case. */ @@ -2035,8 +2036,6 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream ) informMissing( server, xwe ); setTurn( server, xwe, 0 ); dupe_resetTimer( server, xwe ); - } else { - XP_LOGFF( "wanted 0; got %d", server->nv.addresses[0].channelNo ); } return accepted; } /* client_readInitialMessage */ @@ -2048,8 +2047,6 @@ client_readInitialMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* stream ) * that all must use for the game. Then for each player on the device give * the starting tray. */ -#ifndef XWFEATURE_STANDALONE_ONLY - static void makeSendableGICopy( ServerCtxt* server, CurGameInfo* giCopy, XP_U16 deviceIndex ) @@ -2136,7 +2133,6 @@ sendInitialMessage( ServerCtxt* server, XWEnv xwe ) dupe_resetTimer( server, xwe ); } /* sendInitialMessage */ -#endif static void freeBWI( MPFORMAL BadWordInfo* bwi ) @@ -4208,7 +4204,7 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming ) XP_Bool accepted = XP_FALSE; XP_Bool isServer = amServer( server ); const XW_Proto code = readProto( server, incoming ); - XP_LOGFF( "(code=%s)", codeToStr(code) ); + XP_LOGFF( "code=%s", codeToStr(code) ); switch ( code ) { case XWPROTO_DEVICE_REGISTRATION: @@ -4224,9 +4220,8 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming ) } break; case XWPROTO_CLIENT_SETUP: - accepted = !isServer; + accepted = XWSTATE_NONE == server->nv.gameState && !isServer; if ( accepted ) { - XP_STATUSF( "client got XWPROTO_CLIENT_SETUP" ); accepted = client_readInitialMessage( server, xwe, incoming ); } break; @@ -4303,7 +4298,7 @@ server_receiveMessage( ServerCtxt* server, XWEnv xwe, XWStreamCtxt* incoming ) XP_ASSERT( isServer == amServer( server ) ); /* caching value is ok? */ stream_close( incoming, xwe ); - XP_LOGFF( "=> %d (code=%s)", accepted, codeToStr(code) ); + XP_LOGFF( "=> %s (code=%s)", boolToStr(accepted), codeToStr(code) ); // XP_ASSERT( accepted ); /* do not commit!!! */ return accepted; } /* server_receiveMessage */ diff --git a/xwords4/linux/cursesboard.c b/xwords4/linux/cursesboard.c index 600c441ac..6cd8a1efe 100644 --- a/xwords4/linux/cursesboard.c +++ b/xwords4/linux/cursesboard.c @@ -639,12 +639,11 @@ void cb_feedGame( CursesBoardState* cbState, XP_U32 gameID, const XP_U8* buf, XP_U16 len, const CommsAddrRec* from ) { - LOG_FUNC(); sqlite3_int64 rowids[4]; int nRows = VSIZE( rowids ); LaunchParams* params = cbState->params; gdb_getRowsForGameID( params->pDb, gameID, rowids, &nRows ); - XP_LOGF( "%s(): found %d rows for gameID %d", __func__, nRows, gameID ); + XP_LOGFF( "found %d rows for gameID %d", nRows, gameID ); for ( int ii = 0; ii < nRows; ++ii ) { #ifdef DEBUG bool success = @@ -1273,6 +1272,7 @@ inviteList( CommonGlobals* cGlobals, CommsAddrRec* myAddr, GSList* invitees, static bool sendInvite( void* closure, int XP_UNUSED(key) ) { + LOG_FUNC(); CursesBoardGlobals* bGlobals = (CursesBoardGlobals*)closure; CommonGlobals* cGlobals = &bGlobals->cGlobals; LaunchParams* params = cGlobals->params; diff --git a/xwords4/linux/gamesdb.c b/xwords4/linux/gamesdb.c index 0d643ed17..112b861b8 100644 --- a/xwords4/linux/gamesdb.c +++ b/xwords4/linux/gamesdb.c @@ -285,9 +285,7 @@ gdb_write( XWStreamCtxt* stream, XWEnv XP_UNUSED(xwe), void* closure ) if ( newGame ) { /* new row; need to insert blob first */ cGlobals->rowid = selRow; - const CurGameInfo* gi = cGlobals->gi; - XP_U32 gameID = gi->gameID; - XP_LOGFF( "new game for id %d at row %lld", gameID, selRow ); + XP_LOGFF( "new game for id %d at row %lld", cGlobals->gi->gameID, selRow ); } else { assert( selRow == cGlobals->rowid ); }