store messages in channel rather than a single queue

Moving toward being able to send in groups per device for MQTT
optimization
This commit is contained in:
Eric House 2023-03-16 07:41:53 -07:00
parent b27384df63
commit daac5ca098

View file

@ -121,6 +121,8 @@ typedef struct _SD {
EXTERN_C_START EXTERN_C_START
#define MSGS_IN_CHANNEL 1
typedef struct MsgQueueElem { typedef struct MsgQueueElem {
struct MsgQueueElem* next; struct MsgQueueElem* next;
XP_U8* msg; /* ptr to NetLaunchInfo if isInvite is true */ XP_U8* msg; /* ptr to NetLaunchInfo if isInvite is true */
@ -138,6 +140,10 @@ typedef struct MsgQueueElem {
typedef struct AddressRecord { typedef struct AddressRecord {
struct AddressRecord* next; struct AddressRecord* next;
#ifdef MSGS_IN_CHANNEL
MsgQueueElem* _msgQueueHead;
#endif
CommsAddrRec addr; CommsAddrRec addr;
MsgID nextMsgID; /* on a per-channel basis */ MsgID nextMsgID; /* on a per-channel basis */
MsgID lastMsgAckd; /* on a per-channel basis */ MsgID lastMsgAckd; /* on a per-channel basis */
@ -183,8 +189,9 @@ struct CommsCtxt {
XP_U32 lastMsgRcd; XP_U32 lastMsgRcd;
#endif #endif
void* sendClosure; void* sendClosure;
#ifndef MSGS_IN_CHANNEL
MsgQueueElem* _msgQueueHead; MsgQueueElem* _msgQueueHead;
#endif
XP_U16 queueLen; XP_U16 queueLen;
XP_U16 channelSeed; /* tries to be unique per device to aid XP_U16 channelSeed; /* tries to be unique per device to aid
dupe elimination at start */ dupe elimination at start */
@ -457,7 +464,7 @@ init_relay( CommsCtxt* comms, XWEnv xwe, XP_U16 nPlayersHere, XP_U16 nPlayersTot
} }
#endif #endif
#ifdef DEBUG #if defined DEBUG && !defined MSGS_IN_CHANNEL
static ForEachAct static ForEachAct
rmTarget( MsgQueueElem* elem, void* closure ) rmTarget( MsgQueueElem* elem, void* closure )
@ -589,26 +596,48 @@ comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer,
return comms; return comms;
} /* comms_make */ } /* comms_make */
/* This can get folded in at call site once MSGS_IN_CHANNEL goes away */
static XP_Bool
forOneElem( CommsCtxt* comms, MsgQueueElem*** home,
EachMsgProc proc, void* closure )
{
MsgQueueElem* elem = **home;
ForEachAct fea = (*proc)( elem, closure );
if ( 0 != (FEA_REMOVE & fea) ) {
**home = elem->next;
#ifdef DEBUG
elem->next = NULL;
#endif
freeElem( MPPARM(comms->mpool) elem );
XP_ASSERT( 1 <= comms->queueLen );
--comms->queueLen;
} else {
*home = &elem->next;
}
XP_Bool done = 0 != (FEA_EXIT & fea);
return done;
}
static void static void
forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure ) forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure )
{ {
THREAD_CHECK_START(comms); THREAD_CHECK_START(comms);
for ( MsgQueueElem** home = &comms->_msgQueueHead; !!*home; ) { #ifdef MSGS_IN_CHANNEL
MsgQueueElem* elem = *home; for ( AddressRecord* recs = comms->recs; !!recs; recs = recs->next ) {
ForEachAct fea = (*proc)( elem, closure ); for ( MsgQueueElem** home = &recs->_msgQueueHead; !!*home; ) {
if ( 0 != (FEA_REMOVE & fea) ) { if ( forOneElem( comms, &home, proc, closure ) ) {
*home = elem->next; goto done;
freeElem( MPPARM(comms->mpool) elem ); }
XP_ASSERT( 1 <= comms->queueLen );
--comms->queueLen;
} else {
home = &elem->next;
}
if ( 0 != (FEA_EXIT & fea) ) {
break;
} }
} }
#else
for ( MsgQueueElem** home = &comms->_msgQueueHead; !!*home; ) {
if ( forOneElem( comms, &home, proc, closure ) ) {
goto done;
}
}
#endif
done:
assertQueueOk( comms ); assertQueueOk( comms );
THREAD_CHECK_END(); THREAD_CHECK_END();
} }
@ -624,7 +653,9 @@ cleanupInternal( CommsCtxt* comms )
{ {
forEachElem( comms, freeElemProc, NULL ); forEachElem( comms, freeElemProc, NULL );
XP_ASSERT( 0 == comms->queueLen ); XP_ASSERT( 0 == comms->queueLen );
#ifndef MSGS_IN_CHANNEL
XP_ASSERT( NULL == comms->_msgQueueHead ); XP_ASSERT( NULL == comms->_msgQueueHead );
#endif
} /* cleanupInternal */ } /* cleanupInternal */
static void static void
@ -635,6 +666,9 @@ cleanupAddrRecs( CommsCtxt* comms )
for ( recs = comms->recs; !!recs; recs = next ) { for ( recs = comms->recs; !!recs; recs = next ) {
next = recs->next; next = recs->next;
#ifdef MSGS_IN_CHANNEL
XP_ASSERT( !recs->_msgQueueHead );
#endif
XP_FREE( comms->mpool, recs ); XP_FREE( comms->mpool, recs );
} }
comms->recs = (AddressRecord*)NULL; comms->recs = (AddressRecord*)NULL;
@ -1628,6 +1662,7 @@ static void
nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo ) nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo )
{ {
XP_LOGFF( "(channelNo=0x%X)", channelNo ); XP_LOGFF( "(channelNo=0x%X)", channelNo );
assertQueueOk( comms );
channelNo &= CHANNEL_MASK; channelNo &= CHANNEL_MASK;
listRecs( comms, __func__ ); listRecs( comms, __func__ );
@ -1649,9 +1684,18 @@ nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo )
} }
if ( !!deadRec ) { if ( !!deadRec ) {
#ifdef MSGS_IN_CHANNEL
XP_ASSERT( !!deadRec->_msgQueueHead ); /* otherwise we'll leak */
freeElem( MPPARM(comms->mpool) deadRec->_msgQueueHead );
deadRec->_msgQueueHead = NULL;
--comms->queueLen;
#endif
removeFromQueue( comms, xwe, channelNo, 0 ); removeFromQueue( comms, xwe, channelNo, 0 );
CNO_FMT( cbuf, deadRec->channelNo ); CNO_FMT( cbuf, deadRec->channelNo );
XP_LOGFF( "removing rec for %s", cbuf ); XP_LOGFF( "removing rec for %s", cbuf );
#ifdef MSGS_IN_CHANNEL
XP_ASSERT( !deadRec->_msgQueueHead );
#endif
XP_FREEP( comms->mpool, &deadRec ); XP_FREEP( comms->mpool, &deadRec );
} }
@ -1795,20 +1839,30 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem, XP_Bool notify )
MsgQueueElem* asAdded = newElem; MsgQueueElem* asAdded = newElem;
THREAD_CHECK_START( comms ); THREAD_CHECK_START( comms );
newElem->next = (MsgQueueElem*)NULL; newElem->next = (MsgQueueElem*)NULL;
if ( !comms->_msgQueueHead ) {
comms->_msgQueueHead = newElem; MsgQueueElem** head;
#ifdef MSGS_IN_CHANNEL
AddressRecord* rec = getRecordFor( comms, newElem->channelNo );
head = &rec->_msgQueueHead;
#else
head = &comms->_msgQueueHead;
#endif
if ( !*head ) {
*head = newElem;
#ifndef MSGS_IN_CHANNEL
XP_ASSERT( comms->queueLen == 0 ); XP_ASSERT( comms->queueLen == 0 );
#endif
} else { } else {
MsgQueueElem* tail = comms->_msgQueueHead; while ( !!(*head)->next ) {
while ( !!tail->next ) { head = &(*head)->next;
tail = tail->next;
} }
if ( elems_same( tail, newElem ) ) { if ( elems_same( *head, newElem ) ) {
/* This does still happen! Not sure why. */ /* This does still happen! Not sure why. */
freeElem( MPPARM(comms->mpool) newElem ); freeElem( MPPARM(comms->mpool) newElem );
asAdded = tail; asAdded = *head;
} else { } else {
tail->next = newElem; (*head)->next = newElem;
} }
XP_ASSERT( comms->queueLen > 0 ); XP_ASSERT( comms->queueLen > 0 );
@ -1856,10 +1910,20 @@ _assertQueueOk( const CommsCtxt* comms, const char* func )
{ {
XP_LOGFF( "(func=%s)", func ); XP_LOGFF( "(func=%s)", func );
XP_U16 count = 0; XP_U16 count = 0;
#ifdef MSGS_IN_CHANNEL
for ( AddressRecord* recs = comms->recs; !!recs; recs = recs->next ) {
for ( MsgQueueElem* elem = recs->_msgQueueHead; !!elem; elem = elem->next ) {
++count;
}
}
#else
for ( MsgQueueElem* elem = comms->_msgQueueHead; for ( MsgQueueElem* elem = comms->_msgQueueHead;
!!elem; elem = elem->next ) { !!elem; elem = elem->next ) {
++count; ++count;
} }
#endif
XP_ASSERT( count == comms->queueLen ); XP_ASSERT( count == comms->queueLen );
if ( count >= 10 ) { if ( count >= 10 ) {
XP_LOGFF( "queueLen unexpectedly high: %d", count ); XP_LOGFF( "queueLen unexpectedly high: %d", count );
@ -1910,6 +1974,7 @@ elems_same( const MsgQueueElem* elem1, const MsgQueueElem* elem2 )
static void static void
freeElem( MPFORMAL MsgQueueElem* elem ) freeElem( MPFORMAL MsgQueueElem* elem )
{ {
XP_ASSERT( !elem->next );
XP_FREEP( mpool, &elem->msg ); XP_FREEP( mpool, &elem->msg );
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
/* XP_LOGFF( "freeing msg with len %d, sum %s", elem->len, elem->checksum ); */ /* XP_LOGFF( "freeing msg with len %d, sum %s", elem->len, elem->checksum ); */
@ -1964,6 +2029,7 @@ static void
removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID ) removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID )
{ {
THREAD_CHECK_START( comms ); THREAD_CHECK_START( comms );
assertQueueOk( comms );
CNO_FMT( cbuf, channelNo ); CNO_FMT( cbuf, channelNo );
XP_LOGFF( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)", XP_LOGFF( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)",
channelNo, msgID, cbuf, comms->queueLen ); channelNo, msgID, cbuf, comms->queueLen );
@ -3145,6 +3211,7 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
CommsMsgState* state, XP_Bool rejected ) CommsMsgState* state, XP_Bool rejected )
{ {
THREAD_CHECK_START(comms); THREAD_CHECK_START(comms);
assertQueueOk( comms );
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec, XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec,
state->len, state->sum, state->msgID, boolToStr(rejected) ); state->len, state->sum, state->msgID, boolToStr(rejected) );