fix failure to deliver ACKs on MQTT

My combining messages logic failed for ACKs, sending the queue but not
the ACK unless the ACK was in the queue. Appending it at send time is
fraught, so I am instead adding ACKs to the queue, but not persisting
them, so they only last long enough to be sent after they're
added. Seems to fix a common problem of games failing to get ACKs for
their final messages after they finish.
This commit is contained in:
Eric House 2023-10-26 12:25:59 -07:00
parent dabb812cab
commit ee2a4aaf72

View file

@ -352,7 +352,7 @@ static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp
#endif
#if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec );
static void sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec );
#endif
static inline XP_Bool IS_INVITE(const MsgQueueElem* elem)
{
@ -947,6 +947,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
if ( queueLen != comms->queueLen ) {
XP_LOGFF( "Error: queueLen %d != comms->queueLen %d",
queueLen, comms->queueLen );
XP_ASSERT(0);
}
#endif
@ -1128,20 +1129,29 @@ addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addrP )
typedef struct _E2SData {
CommsCtxt* comms;
XWStreamCtxt* stream;
XP_U16 queueLen;
} E2SData;
static ForEachAct
elemToStream( MsgQueueElem* elem, void* closure )
{
if ( 0 == elem->msgID && 0 < elem->smp.len ) {
/* Maybe don't wrte non-invites with msgID of 0 */
XP_LOGFF( "skipping apparent ACK elem" );
} else {
E2SData* e2sp = (E2SData*)closure;
XWStreamCtxt* stream = e2sp->stream;
CommsCtxt* comms = e2sp->comms;
++e2sp->queueLen;
XWStreamCtxt* stream = e2sp->stream;
writeChannelNo( stream, elem->channelNo );
stream_putU32VL( stream, elem->msgID );
stream_putU32VL( stream, elem->smp.len );
stream_putU32( stream, elem->smp.createdStamp );
if ( 0 == elem->smp.len ) {
XP_ASSERT( 0 == elem->msgID );
XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
NetLaunchInfo nli;
@ -1153,6 +1163,8 @@ elemToStream( MsgQueueElem* elem, void* closure )
stream_destroy( nliStream );
} else {
stream_putBytes( stream, elem->smp.buf, elem->smp.len );
// XP_LOGFF( "wrote non-invite with id: %d", elem->msgID );
}
}
return FEA_OK;
}
@ -1186,11 +1198,14 @@ comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken )
stringToStream( stream, comms->rr.connName );
}
XP_ASSERT( comms->queueLen <= 255 );
stream_putU8( stream, (XP_U8)comms->queueLen );
/* Next field is queueLen, but we don't know that until after we write the
queue, since ACKs are not persisted. */
XWStreamCtxt* tmpStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
stream_setVersion( tmpStream, CUR_STREAM_VERS );
nAddrRecs = countAddrRecs(comms);
stream_putU8( stream, (XP_U8)nAddrRecs );
stream_putU8( tmpStream, (XP_U8)nAddrRecs );
#ifdef LOG_COMMS_MSGNOS
int ii = 0;
@ -1199,25 +1214,31 @@ comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken )
const CommsAddrRec* addr = &rec->addr;
logAddrComms( comms, addr, __func__ );
addrToStream( stream, addr );
addrToStream( tmpStream, addr );
stream_putU32VL( stream, rec->nextMsgID );
stream_putU32VL( stream, rec->lastMsgRcd );
stream_putU16( stream, rec->flags );
stream_putU32VL( tmpStream, rec->nextMsgID );
stream_putU32VL( tmpStream, rec->lastMsgRcd );
stream_putU16( tmpStream, rec->flags );
#ifdef LOG_COMMS_MSGNOS
XP_LOGFF( "wrote lastMsgRcd of %d for addr %d", rec->lastMsgRcd, ii++ );
#endif
stream_putU16( stream, (XP_U16)rec->lastMsgAckd );
writeChannelNo( stream, rec->channelNo );
stream_putU16( tmpStream, (XP_U16)rec->lastMsgAckd );
writeChannelNo( tmpStream, rec->channelNo );
if ( addr_hasType( addr, COMMS_CONN_RELAY ) ) {
XP_ASSERT(0);
stream_putU8( stream, rec->rr.hostID ); /* unneeded unless RELAY */
stream_putU8( tmpStream, rec->rr.hostID ); /* unneeded unless RELAY */
}
}
E2SData e2sd = { .comms = comms, .stream = stream, };
E2SData e2sd = { .comms = comms, .stream = tmpStream, };
forEachElem( comms, elemToStream, &e2sd );
XP_ASSERT( e2sd.queueLen <= 255 );
XP_ASSERT( e2sd.queueLen <= comms->queueLen );
stream_putU8( stream, (XP_U8)e2sd.queueLen );
stream_getFromStream( stream, tmpStream, stream_getSize(tmpStream) );
stream_destroy( tmpStream );
/* This writes 2 bytes instead of 1 if it were smarter. Not worth the work
* to fix. */
for ( CommsConnType typ = (CommsConnType)0; typ < VSIZE(comms->disableds); ++typ ) {
@ -2150,6 +2171,14 @@ sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
if ( COMMS_CONN_MQTT == typ ) {
AddressRecord* rec = getRecordFor( comms, channelNo);
head = &rec->_msgQueueHead->smp;
#ifdef DEBUG
/* Make sure our message is in there!!! */
XP_Bool found = XP_FALSE;
for ( SendMsgsPacket* tmp = head; !found && !!tmp; tmp = tmp->next ) {
found = tmp == &elem->smp;
}
XP_ASSERT( found );
#endif
} else {
XP_ASSERT( !elem->smp.next );
}
@ -2895,8 +2924,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
} else if ( msgID != rec->lastMsgRcd + 1 ) {
XP_LOGFF( TAGFMT() "expected %d, got %d", TAGPRMS,
rec->lastMsgRcd + 1, msgID );
/* Let's not do this yet. I need to understand why there are
messages with msgID==0 and how to get rid of them. */
// Add this if adding ACK to queue isn't enough
// ackAnyImpl( comms, xwe, XP_TRUE );
rec = NULL;
}
@ -3269,14 +3297,17 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI
#if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void
sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
{
// THREAD_CHECK_START(comms); -- doesn't work with const yet
THREAD_CHECK_START(comms);
MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */,
rec, rec? rec->channelNo : 0, NULL );
(void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
freeElem( MPPARM(comms->mpool) elem );
// THREAD_CHECK_END();
XP_ASSERT( !!elem );
elem = addToQueue( comms, xwe, elem, XP_FALSE );
if ( !!elem ) {
sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
}
THREAD_CHECK_END();
} /* sendEmptyMsg */
#endif