From ee2a4aaf72fbc574945a438cc17f760614122896 Mon Sep 17 00:00:00 2001 From: Eric House Date: Thu, 26 Oct 2023 12:25:59 -0700 Subject: [PATCH] fix failure to deliver ACKs on MQTT My combining messages logic failed for ACKs, sending the queue but not the ACK unless the ACK was in the queue. Appending it at send time is fraught, so I am instead adding ACKs to the queue, but not persisting them, so they only last long enough to be sent after they're added. Seems to fix a common problem of games failing to get ACKs for their final messages after they finish. --- xwords4/common/comms.c | 109 ++++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 39 deletions(-) diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index a32f6e0b4..ed212f33e 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -352,7 +352,7 @@ static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp #endif #if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK -static void sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ); +static void sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ); #endif static inline XP_Bool IS_INVITE(const MsgQueueElem* elem) { @@ -947,6 +947,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, if ( queueLen != comms->queueLen ) { XP_LOGFF( "Error: queueLen %d != comms->queueLen %d", queueLen, comms->queueLen ); + XP_ASSERT(0); } #endif @@ -1128,31 +1129,42 @@ addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addrP ) typedef struct _E2SData { CommsCtxt* comms; XWStreamCtxt* stream; + XP_U16 queueLen; } E2SData; static ForEachAct elemToStream( MsgQueueElem* elem, void* closure ) { - E2SData* e2sp = (E2SData*)closure; - XWStreamCtxt* stream = e2sp->stream; - CommsCtxt* comms = e2sp->comms; - writeChannelNo( stream, elem->channelNo ); - stream_putU32VL( stream, elem->msgID ); - - stream_putU32VL( stream, elem->smp.len ); - stream_putU32( stream, elem->smp.createdStamp ); - if ( 0 == elem->smp.len ) { - XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool) - dutil_getVTManager(comms->dutil)); - NetLaunchInfo nli; - XP_MEMCPY( &nli, elem->smp.buf, sizeof(nli) ); - nli_saveToStream( &nli, nliStream ); - XP_U16 nliLen = stream_getSize( nliStream ); - stream_putU32VL( stream, nliLen ); - stream_getFromStream( stream, nliStream, nliLen ); - stream_destroy( nliStream ); + if ( 0 == elem->msgID && 0 < elem->smp.len ) { + /* Maybe don't wrte non-invites with msgID of 0 */ + XP_LOGFF( "skipping apparent ACK elem" ); } else { - stream_putBytes( stream, elem->smp.buf, elem->smp.len ); + E2SData* e2sp = (E2SData*)closure; + CommsCtxt* comms = e2sp->comms; + + ++e2sp->queueLen; + + XWStreamCtxt* stream = e2sp->stream; + writeChannelNo( stream, elem->channelNo ); + stream_putU32VL( stream, elem->msgID ); + + stream_putU32VL( stream, elem->smp.len ); + stream_putU32( stream, elem->smp.createdStamp ); + if ( 0 == elem->smp.len ) { + XP_ASSERT( 0 == elem->msgID ); + XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool) + dutil_getVTManager(comms->dutil)); + NetLaunchInfo nli; + XP_MEMCPY( &nli, elem->smp.buf, sizeof(nli) ); + nli_saveToStream( &nli, nliStream ); + XP_U16 nliLen = stream_getSize( nliStream ); + stream_putU32VL( stream, nliLen ); + stream_getFromStream( stream, nliStream, nliLen ); + stream_destroy( nliStream ); + } else { + stream_putBytes( stream, elem->smp.buf, elem->smp.len ); + // XP_LOGFF( "wrote non-invite with id: %d", elem->msgID ); + } } return FEA_OK; } @@ -1186,11 +1198,14 @@ comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken ) stringToStream( stream, comms->rr.connName ); } - XP_ASSERT( comms->queueLen <= 255 ); - stream_putU8( stream, (XP_U8)comms->queueLen ); + /* Next field is queueLen, but we don't know that until after we write the + queue, since ACKs are not persisted. */ + XWStreamCtxt* tmpStream = mem_stream_make_raw( MPPARM(comms->mpool) + dutil_getVTManager(comms->dutil)); + stream_setVersion( tmpStream, CUR_STREAM_VERS ); nAddrRecs = countAddrRecs(comms); - stream_putU8( stream, (XP_U8)nAddrRecs ); + stream_putU8( tmpStream, (XP_U8)nAddrRecs ); #ifdef LOG_COMMS_MSGNOS int ii = 0; @@ -1199,25 +1214,31 @@ comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken ) const CommsAddrRec* addr = &rec->addr; logAddrComms( comms, addr, __func__ ); - addrToStream( stream, addr ); + addrToStream( tmpStream, addr ); - stream_putU32VL( stream, rec->nextMsgID ); - stream_putU32VL( stream, rec->lastMsgRcd ); - stream_putU16( stream, rec->flags ); + stream_putU32VL( tmpStream, rec->nextMsgID ); + stream_putU32VL( tmpStream, rec->lastMsgRcd ); + stream_putU16( tmpStream, rec->flags ); #ifdef LOG_COMMS_MSGNOS XP_LOGFF( "wrote lastMsgRcd of %d for addr %d", rec->lastMsgRcd, ii++ ); #endif - stream_putU16( stream, (XP_U16)rec->lastMsgAckd ); - writeChannelNo( stream, rec->channelNo ); + stream_putU16( tmpStream, (XP_U16)rec->lastMsgAckd ); + writeChannelNo( tmpStream, rec->channelNo ); if ( addr_hasType( addr, COMMS_CONN_RELAY ) ) { XP_ASSERT(0); - stream_putU8( stream, rec->rr.hostID ); /* unneeded unless RELAY */ + stream_putU8( tmpStream, rec->rr.hostID ); /* unneeded unless RELAY */ } } - E2SData e2sd = { .comms = comms, .stream = stream, }; + E2SData e2sd = { .comms = comms, .stream = tmpStream, }; forEachElem( comms, elemToStream, &e2sd ); + XP_ASSERT( e2sd.queueLen <= 255 ); + XP_ASSERT( e2sd.queueLen <= comms->queueLen ); + stream_putU8( stream, (XP_U8)e2sd.queueLen ); + stream_getFromStream( stream, tmpStream, stream_getSize(tmpStream) ); + stream_destroy( tmpStream ); + /* This writes 2 bytes instead of 1 if it were smarter. Not worth the work * to fix. */ for ( CommsConnType typ = (CommsConnType)0; typ < VSIZE(comms->disableds); ++typ ) { @@ -2028,7 +2049,7 @@ checkForPrev( const CommsCtxt* comms, MsgQueueElem* elem, CommsConnType typ ) { if ( COMMS_CONN_MQTT == typ ) { CheckPrevState cps = { .comms = comms, - .elem = elem, + .elem = elem, }; forEachElem( (CommsCtxt*)comms, checkPrevProc, &cps ); } @@ -2150,6 +2171,14 @@ sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, if ( COMMS_CONN_MQTT == typ ) { AddressRecord* rec = getRecordFor( comms, channelNo); head = &rec->_msgQueueHead->smp; +#ifdef DEBUG + /* Make sure our message is in there!!! */ + XP_Bool found = XP_FALSE; + for ( SendMsgsPacket* tmp = head; !found && !!tmp; tmp = tmp->next ) { + found = tmp == &elem->smp; + } + XP_ASSERT( found ); +#endif } else { XP_ASSERT( !elem->smp.next ); } @@ -2895,8 +2924,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, } else if ( msgID != rec->lastMsgRcd + 1 ) { XP_LOGFF( TAGFMT() "expected %d, got %d", TAGPRMS, rec->lastMsgRcd + 1, msgID ); - /* Let's not do this yet. I need to understand why there are - messages with msgID==0 and how to get rid of them. */ + // Add this if adding ACK to queue isn't enough // ackAnyImpl( comms, xwe, XP_TRUE ); rec = NULL; } @@ -3269,14 +3297,17 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI #if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK static void -sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ) +sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ) { - // THREAD_CHECK_START(comms); -- doesn't work with const yet + THREAD_CHECK_START(comms); MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */, rec, rec? rec->channelNo : 0, NULL ); - (void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); - freeElem( MPPARM(comms->mpool) elem ); - // THREAD_CHECK_END(); + XP_ASSERT( !!elem ); + elem = addToQueue( comms, xwe, elem, XP_FALSE ); + if ( !!elem ) { + sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); + } + THREAD_CHECK_END(); } /* sendEmptyMsg */ #endif