diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index 908e52944..ab1391e84 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -121,6 +121,8 @@ typedef struct _SD { EXTERN_C_START +#define MSGS_IN_CHANNEL 1 + typedef struct MsgQueueElem { struct MsgQueueElem* next; XP_U8* msg; /* ptr to NetLaunchInfo if isInvite is true */ @@ -138,6 +140,10 @@ typedef struct MsgQueueElem { typedef struct AddressRecord { struct AddressRecord* next; +#ifdef MSGS_IN_CHANNEL + MsgQueueElem* _msgQueueHead; +#endif + CommsAddrRec addr; MsgID nextMsgID; /* on a per-channel basis */ MsgID lastMsgAckd; /* on a per-channel basis */ @@ -183,8 +189,9 @@ struct CommsCtxt { XP_U32 lastMsgRcd; #endif void* sendClosure; - +#ifndef MSGS_IN_CHANNEL MsgQueueElem* _msgQueueHead; +#endif XP_U16 queueLen; XP_U16 channelSeed; /* tries to be unique per device to aid dupe elimination at start */ @@ -457,7 +464,7 @@ init_relay( CommsCtxt* comms, XWEnv xwe, XP_U16 nPlayersHere, XP_U16 nPlayersTot } #endif -#ifdef DEBUG +#if defined DEBUG && !defined MSGS_IN_CHANNEL static ForEachAct rmTarget( MsgQueueElem* elem, void* closure ) @@ -589,26 +596,48 @@ comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer, return comms; } /* comms_make */ +/* This can get folded in at call site once MSGS_IN_CHANNEL goes away */ +static XP_Bool +forOneElem( CommsCtxt* comms, MsgQueueElem*** home, + EachMsgProc proc, void* closure ) +{ + MsgQueueElem* elem = **home; + ForEachAct fea = (*proc)( elem, closure ); + if ( 0 != (FEA_REMOVE & fea) ) { + **home = elem->next; +#ifdef DEBUG + elem->next = NULL; +#endif + freeElem( MPPARM(comms->mpool) elem ); + XP_ASSERT( 1 <= comms->queueLen ); + --comms->queueLen; + } else { + *home = &elem->next; + } + XP_Bool done = 0 != (FEA_EXIT & fea); + return done; +} + static void forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure ) { THREAD_CHECK_START(comms); - for ( MsgQueueElem** home = &comms->_msgQueueHead; !!*home; ) { - MsgQueueElem* elem = *home; - ForEachAct fea = (*proc)( elem, closure ); - if ( 0 != (FEA_REMOVE & fea) ) { - *home = elem->next; - freeElem( MPPARM(comms->mpool) elem ); - XP_ASSERT( 1 <= comms->queueLen ); - --comms->queueLen; - } else { - home = &elem->next; - } - if ( 0 != (FEA_EXIT & fea) ) { - break; +#ifdef MSGS_IN_CHANNEL + for ( AddressRecord* recs = comms->recs; !!recs; recs = recs->next ) { + for ( MsgQueueElem** home = &recs->_msgQueueHead; !!*home; ) { + if ( forOneElem( comms, &home, proc, closure ) ) { + goto done; + } } } - +#else + for ( MsgQueueElem** home = &comms->_msgQueueHead; !!*home; ) { + if ( forOneElem( comms, &home, proc, closure ) ) { + goto done; + } + } +#endif + done: assertQueueOk( comms ); THREAD_CHECK_END(); } @@ -624,7 +653,9 @@ cleanupInternal( CommsCtxt* comms ) { forEachElem( comms, freeElemProc, NULL ); XP_ASSERT( 0 == comms->queueLen ); +#ifndef MSGS_IN_CHANNEL XP_ASSERT( NULL == comms->_msgQueueHead ); +#endif } /* cleanupInternal */ static void @@ -635,6 +666,9 @@ cleanupAddrRecs( CommsCtxt* comms ) for ( recs = comms->recs; !!recs; recs = next ) { next = recs->next; +#ifdef MSGS_IN_CHANNEL + XP_ASSERT( !recs->_msgQueueHead ); +#endif XP_FREE( comms->mpool, recs ); } comms->recs = (AddressRecord*)NULL; @@ -1628,6 +1662,7 @@ static void nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo ) { XP_LOGFF( "(channelNo=0x%X)", channelNo ); + assertQueueOk( comms ); channelNo &= CHANNEL_MASK; listRecs( comms, __func__ ); @@ -1649,9 +1684,18 @@ nukeInvites( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo ) } if ( !!deadRec ) { +#ifdef MSGS_IN_CHANNEL + XP_ASSERT( !!deadRec->_msgQueueHead ); /* otherwise we'll leak */ + freeElem( MPPARM(comms->mpool) deadRec->_msgQueueHead ); + deadRec->_msgQueueHead = NULL; + --comms->queueLen; +#endif removeFromQueue( comms, xwe, channelNo, 0 ); CNO_FMT( cbuf, deadRec->channelNo ); XP_LOGFF( "removing rec for %s", cbuf ); +#ifdef MSGS_IN_CHANNEL + XP_ASSERT( !deadRec->_msgQueueHead ); +#endif XP_FREEP( comms->mpool, &deadRec ); } @@ -1795,20 +1839,30 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem, XP_Bool notify ) MsgQueueElem* asAdded = newElem; THREAD_CHECK_START( comms ); newElem->next = (MsgQueueElem*)NULL; - if ( !comms->_msgQueueHead ) { - comms->_msgQueueHead = newElem; + + MsgQueueElem** head; +#ifdef MSGS_IN_CHANNEL + AddressRecord* rec = getRecordFor( comms, newElem->channelNo ); + head = &rec->_msgQueueHead; +#else + head = &comms->_msgQueueHead; +#endif + + if ( !*head ) { + *head = newElem; +#ifndef MSGS_IN_CHANNEL XP_ASSERT( comms->queueLen == 0 ); +#endif } else { - MsgQueueElem* tail = comms->_msgQueueHead; - while ( !!tail->next ) { - tail = tail->next; + while ( !!(*head)->next ) { + head = &(*head)->next; } - if ( elems_same( tail, newElem ) ) { + if ( elems_same( *head, newElem ) ) { /* This does still happen! Not sure why. */ freeElem( MPPARM(comms->mpool) newElem ); - asAdded = tail; + asAdded = *head; } else { - tail->next = newElem; + (*head)->next = newElem; } XP_ASSERT( comms->queueLen > 0 ); @@ -1856,10 +1910,20 @@ _assertQueueOk( const CommsCtxt* comms, const char* func ) { XP_LOGFF( "(func=%s)", func ); XP_U16 count = 0; + + +#ifdef MSGS_IN_CHANNEL + for ( AddressRecord* recs = comms->recs; !!recs; recs = recs->next ) { + for ( MsgQueueElem* elem = recs->_msgQueueHead; !!elem; elem = elem->next ) { + ++count; + } + } +#else for ( MsgQueueElem* elem = comms->_msgQueueHead; !!elem; elem = elem->next ) { ++count; } +#endif XP_ASSERT( count == comms->queueLen ); if ( count >= 10 ) { XP_LOGFF( "queueLen unexpectedly high: %d", count ); @@ -1910,6 +1974,7 @@ elems_same( const MsgQueueElem* elem1, const MsgQueueElem* elem2 ) static void freeElem( MPFORMAL MsgQueueElem* elem ) { + XP_ASSERT( !elem->next ); XP_FREEP( mpool, &elem->msg ); #ifdef COMMS_CHECKSUM /* XP_LOGFF( "freeing msg with len %d, sum %s", elem->len, elem->checksum ); */ @@ -1964,6 +2029,7 @@ static void removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID ) { THREAD_CHECK_START( comms ); + assertQueueOk( comms ); CNO_FMT( cbuf, channelNo ); XP_LOGFF( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)", channelNo, msgID, cbuf, comms->queueLen ); @@ -3145,6 +3211,7 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe, CommsMsgState* state, XP_Bool rejected ) { THREAD_CHECK_START(comms); + assertQueueOk( comms ); #ifdef COMMS_CHECKSUM XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec, state->len, state->sum, state->msgID, boolToStr(rejected) );