refactor message queue access

Cleanup to reduce changes when queue storage changes, which is coming
to optimize mqtt sends.
This commit is contained in:
Eric House 2023-02-01 19:08:04 -08:00
parent c820abb514
commit e79450ab4c
3 changed files with 296 additions and 145 deletions

View file

@ -183,8 +183,7 @@ struct CommsCtxt {
#endif #endif
void* sendClosure; void* sendClosure;
MsgQueueElem* msgQueueHead; MsgQueueElem* _msgQueueHead;
MsgQueueElem* msgQueueTail;
XP_U16 queueLen; XP_U16 queueLen;
XP_U16 channelSeed; /* tries to be unique per device to aid XP_U16 channelSeed; /* tries to be unique per device to aid
dupe elimination at start */ dupe elimination at start */
@ -270,7 +269,8 @@ static AddressRecord* getRecordFor( const CommsCtxt* comms, XWEnv xwe,
const CommsAddrRec* addr, XP_PlayerAddr channelNo ); const CommsAddrRec* addr, XP_PlayerAddr channelNo );
static XP_S16 sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, static XP_S16 sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
CommsConnType filter ); CommsConnType filter );
static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem ); static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe,
MsgQueueElem* newElem, XP_Bool notify );
static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ; static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ;
static void freeElem( MPFORMAL MsgQueueElem* elem ); static void freeElem( MPFORMAL MsgQueueElem* elem );
static void removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, static void removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo,
@ -281,6 +281,13 @@ static void sendConnect( CommsCtxt* comms, XWEnv xwe
, XP_Bool breakExisting , XP_Bool breakExisting
#endif #endif
); );
typedef enum {FEA_OK = 0x00, FEA_REMOVE = 0x01, FEA_EXIT = 0x02} ForEachAct;
typedef ForEachAct (*EachMsgProc)( MsgQueueElem* elem, void* closure );
static void forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure );
static MsgQueueElem* makeNewElem( const CommsCtxt* comms, XWEnv xwe, MsgID msgID,
XP_PlayerAddr channelNo );
static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe ); static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe );
static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen, static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen,
MsgID msgID ); MsgID msgID );
@ -312,6 +319,8 @@ static void putDevID( const CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream );
#ifdef DEBUG #ifdef DEBUG
static void assertAddrOk( const CommsAddrRec* addr ); static void assertAddrOk( const CommsAddrRec* addr );
static void listRecs( const CommsCtxt* comms, const char* msg ); static void listRecs( const CommsCtxt* comms, const char* msg );
static void assertQueueOk( const CommsCtxt* comms );
#define ASSERT_ADDR_OK(addr) assertAddrOk( addr ) #define ASSERT_ADDR_OK(addr) assertAddrOk( addr )
# ifdef XWFEATURE_RELAY # ifdef XWFEATURE_RELAY
@ -322,6 +331,7 @@ static void logAddr( const CommsCtxt* comms, XWEnv xwe,
const CommsAddrRec* addr, const char* caller ); const CommsAddrRec* addr, const char* caller );
#else #else
# define ASSERT_ADDR_OK(addr) # define ASSERT_ADDR_OK(addr)
# define assertQueueOk(comms)
# define printQueue( comms ) # define printQueue( comms )
# define logAddr( comms, xwe, addr, caller) # define logAddr( comms, xwe, addr, caller)
# define listRecs( comms, caller ) # define listRecs( comms, caller )
@ -444,6 +454,58 @@ init_relay( CommsCtxt* comms, XWEnv xwe, XP_U16 nPlayersHere, XP_U16 nPlayersTot
} }
#endif #endif
#ifdef DEBUG
static ForEachAct
rmTarget( MsgQueueElem* elem, void* closure )
{
ForEachAct result = FEA_OK;
MsgQueueElem* target = (MsgQueueElem*)closure;
if ( target == elem ) {
result = FEA_REMOVE;
if ( 1 == (elem->msgID & 1) ) {
result |= FEA_EXIT;
}
}
return result;
}
static void
testQueues( CommsCtxt* comms, XWEnv xwe )
{
LOG_FUNC();
XP_U16 startLen = comms->queueLen;
MsgQueueElem* elems[5] = {0};
for ( int ii = 0; ii < VSIZE(elems); ++ii ) {
XP_PlayerAddr channelNo = 0;
MsgQueueElem* elem = makeNewElem( comms, xwe, ii + 1, channelNo );
addToQueue( comms, xwe, elem, XP_FALSE );
elems[ii] = elem;
}
XP_ASSERT( comms->queueLen == startLen + VSIZE(elems) );
for ( int ii = 0; ii < VSIZE(elems); ++ii ) {
XP_U16 indx = XP_RANDOM() % VSIZE(elems);
while ( !elems[(indx+VSIZE(elems)) % VSIZE(elems)] ) {
++indx;
}
indx = (indx+VSIZE(elems)) % VSIZE(elems);
XP_LOGFF( "removing elem %d", indx );
MsgQueueElem* elem = elems[indx];
elems[indx] = NULL; /* mark for next time */
forEachElem( comms, rmTarget, elem );
assertQueueOk( comms );
}
XP_ASSERT( comms->queueLen == startLen );
LOG_RETURN_VOID();
}
#else
# define testQueues(c,x)
#endif
CommsCtxt* CommsCtxt*
comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer, comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer,
const CommsAddrRec* selfAddr, const CommsAddrRec* hostAddr, const CommsAddrRec* selfAddr, const CommsAddrRec* hostAddr,
@ -519,21 +581,47 @@ comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer,
#endif #endif
} }
testQueues( comms, xwe );
return comms; return comms;
} /* comms_make */ } /* comms_make */
static void
forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure )
{
THREAD_CHECK_START(comms);
for ( MsgQueueElem** home = &comms->_msgQueueHead; !!*home; ) {
MsgQueueElem* elem = *home;
ForEachAct fea = (*proc)( elem, closure );
if ( 0 != (FEA_REMOVE & fea) ) {
*home = elem->next;
freeElem( MPPARM(comms->mpool) elem );
XP_ASSERT( 1 <= comms->queueLen );
--comms->queueLen;
} else {
home = &elem->next;
}
if ( 0 != (FEA_EXIT & fea) ) {
break;
}
}
assertQueueOk( comms );
THREAD_CHECK_END();
}
static ForEachAct
freeElemProc( MsgQueueElem* XP_UNUSED(elem), void* XP_UNUSED(closure) )
{
return FEA_REMOVE;
}
static void static void
cleanupInternal( CommsCtxt* comms ) cleanupInternal( CommsCtxt* comms )
{ {
MsgQueueElem* msg; forEachElem( comms, freeElemProc, NULL );
MsgQueueElem* next; XP_ASSERT( 0 == comms->queueLen );
XP_ASSERT( NULL == comms->_msgQueueHead );
for ( msg = comms->msgQueueHead; !!msg; msg = next ) {
next = msg->next;
freeElem( MPPARM(comms->mpool) msg );
}
comms->queueLen = 0;
comms->msgQueueHead = comms->msgQueueTail = (MsgQueueElem*)NULL;
} /* cleanupInternal */ } /* cleanupInternal */
static void static void
@ -806,7 +894,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
sizeof(comms->rr.connName) ); sizeof(comms->rr.connName) );
} }
comms->queueLen = stream_getU8( stream ); XP_U16 queueLen = stream_getU8( stream );
XP_U16 nAddrRecs = stream_getU8( stream ); XP_U16 nAddrRecs = stream_getU8( stream );
XP_LOGFF( "nAddrRecs: %d", nAddrRecs ); XP_LOGFF( "nAddrRecs: %d", nAddrRecs );
@ -843,8 +931,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
prevsAddrNext = &rec->next; prevsAddrNext = &rec->next;
} }
MsgQueueElem** prevsQueueNext = &comms->msgQueueHead; for ( int ii = 0; ii < queueLen; ++ii ) {
for ( int ii = 0; ii < comms->queueLen; ++ii ) {
MsgQueueElem* msg = (MsgQueueElem*)XP_CALLOC( mpool, sizeof(*msg) ); MsgQueueElem* msg = (MsgQueueElem*)XP_CALLOC( mpool, sizeof(*msg) );
msg->channelNo = readChannelNo( stream ); msg->channelNo = readChannelNo( stream );
@ -889,9 +976,9 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
msg->checksum = dutil_md5sum( comms->dutil, xwe, msg->msg, len ); msg->checksum = dutil_md5sum( comms->dutil, xwe, msg->msg, len );
#endif #endif
XP_ASSERT( NULL == msg->next ); XP_ASSERT( NULL == msg->next );
*prevsQueueNext = comms->msgQueueTail = msg; addToQueue( comms, xwe, msg, XP_FALSE );
prevsQueueNext = &msg->next;
} }
XP_ASSERT( queueLen == comms->queueLen );
if ( STREAM_VERS_DISABLEDS <= version ) { if ( STREAM_VERS_DISABLEDS <= version ) {
for ( CommsConnType typ = (CommsConnType)0; typ < VSIZE(comms->disableds); ++typ ) { for ( CommsConnType typ = (CommsConnType)0; typ < VSIZE(comms->disableds); ++typ ) {
@ -1069,13 +1156,44 @@ addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addrP )
} }
} }
typedef struct _E2SData {
CommsCtxt* comms;
XWStreamCtxt* stream;
XWEnv xwe;
} E2SData;
static ForEachAct
elemToStream( MsgQueueElem* elem, void* closure )
{
E2SData* e2sp = (E2SData*)closure;
XWStreamCtxt* stream = e2sp->stream;
CommsCtxt* comms = e2sp->comms;
writeChannelNo( stream, elem->channelNo );
stream_putU32VL( stream, elem->msgID );
stream_putU32VL( stream, elem->len );
stream_putU32( stream, elem->createdStamp );
if ( 0 == elem->len ) {
XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
NetLaunchInfo* nli = (NetLaunchInfo*)elem->msg;
nli_saveToStream( nli, nliStream );
XP_U16 nliLen = stream_getSize( nliStream );
stream_putU32VL( stream, nliLen );
stream_getFromStream( stream, nliStream, nliLen );
stream_destroy( nliStream, e2sp->xwe );
} else {
stream_putBytes( stream, elem->msg, elem->len );
}
return FEA_OK;
}
void void
comms_writeToStream( CommsCtxt* comms, XWEnv xwe, comms_writeToStream( CommsCtxt* comms, XWEnv xwe,
XWStreamCtxt* stream, XP_U16 saveToken ) XWStreamCtxt* stream, XP_U16 saveToken )
{ {
XP_U16 nAddrRecs; XP_U16 nAddrRecs;
AddressRecord* rec; AddressRecord* rec;
MsgQueueElem* msg;
listRecs( comms, __func__ ); listRecs( comms, __func__ );
@ -1128,25 +1246,8 @@ comms_writeToStream( CommsCtxt* comms, XWEnv xwe,
} }
} }
for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) { E2SData e2sd = { .comms = comms, .stream = stream, .xwe = xwe, };
writeChannelNo( stream, msg->channelNo ); forEachElem( comms, elemToStream, &e2sd );
stream_putU32VL( stream, msg->msgID );
stream_putU32VL( stream, msg->len );
stream_putU32( stream, msg->createdStamp );
if ( 0 == msg->len ) {
XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
NetLaunchInfo* nli = (NetLaunchInfo*)msg->msg;
nli_saveToStream( nli, nliStream );
XP_U16 nliLen = stream_getSize( nliStream );
stream_putU32VL( stream, nliLen );
stream_getFromStream( stream, nliStream, nliLen );
stream_destroy( nliStream, xwe );
} else {
stream_putBytes( stream, msg->msg, msg->len );
}
}
/* This writes 2 bytes instead of 1 if it were smarter. Not worth the work /* This writes 2 bytes instead of 1 if it were smarter. Not worth the work
* to fix. */ * to fix. */
@ -1565,7 +1666,7 @@ comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
0, destAddr, flags ); 0, destAddr, flags );
MsgQueueElem* elem = makeInviteElem( comms, xwe, forceChannel, nli ); MsgQueueElem* elem = makeInviteElem( comms, xwe, forceChannel, nli );
elem = addToQueue( comms, xwe, elem ); elem = addToQueue( comms, xwe, elem, XP_TRUE );
XP_LOGFF( "added invite on channel %d", elem->channelNo & CHANNEL_MASK ); XP_LOGFF( "added invite on channel %d", elem->channelNo & CHANNEL_MASK );
/* Let's let platform code decide whether to call sendMsg() . On /* Let's let platform code decide whether to call sendMsg() . On
Android creating a game with an invitation in its queue is always Android creating a game with an invitation in its queue is always
@ -1579,27 +1680,34 @@ comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
LOG_RETURN_VOID(); LOG_RETURN_VOID();
} }
typedef struct _GetInvitedData {
XP_U16 allBits;
XP_U16 count;
} GetInvitedData;
static ForEachAct
getInvitedProc( MsgQueueElem* elem, void* closure )
{
if ( IS_INVITE( elem ) ) {
GetInvitedData* gidp = (GetInvitedData*)closure;
XP_PlayerAddr channelNo = elem->channelNo & CHANNEL_MASK;
XP_LOGFF( "got invite on channel %d", channelNo );
XP_U16 thisBit = 1 << channelNo;
XP_ASSERT( 0 == (thisBit & gidp->allBits) ); /* should be no dupes */
if ( 0 == (thisBit & gidp->allBits) ) {
++gidp->count;
}
gidp->allBits |= thisBit;
}
return FEA_OK;
}
void void
comms_getInvited( const CommsCtxt* comms, XP_U16* nInvites ) comms_getInvited( const CommsCtxt* comms, XP_U16* nInvites )
{ {
XP_U16 count = 0; GetInvitedData gid = {0};
forEachElem( (CommsCtxt*)comms, getInvitedProc, &gid );
XP_U16 allBits = 0; *nInvites = gid.count;
for ( const MsgQueueElem* elem = comms->msgQueueHead; !!elem;
elem = elem->next ) {
if ( IS_INVITE( elem ) ) {
XP_PlayerAddr channelNo = elem->channelNo & CHANNEL_MASK;
XP_LOGFF( "got invite on channel %d", channelNo );
XP_U16 thisBit = 1 << channelNo;
XP_ASSERT( 0 == (thisBit & allBits) ); /* should be no dupes */
if ( 0 == (thisBit & allBits) ) {
++count;
}
allBits |= thisBit;
}
}
*nInvites = count;
// LOG_RETURNF( "%d", *nInvites ); // LOG_RETURNF( "%d", *nInvites );
} }
#endif #endif
@ -1630,7 +1738,7 @@ comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream )
elem = makeElemWithID( comms, xwe, msgID, rec, channelNo, stream ); elem = makeElemWithID( comms, xwe, msgID, rec, channelNo, stream );
if ( NULL != elem ) { if ( NULL != elem ) {
elem = addToQueue( comms, xwe, elem ); elem = addToQueue( comms, xwe, elem, XP_TRUE );
printQueue( comms ); printQueue( comms );
result = sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); result = sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
} }
@ -1653,23 +1761,25 @@ notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe )
* the order in which they need to be sent. * the order in which they need to be sent.
*/ */
static MsgQueueElem* static MsgQueueElem*
addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem ) addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem, XP_Bool notify )
{ {
MsgQueueElem* asAdded = newElem; MsgQueueElem* asAdded = newElem;
THREAD_CHECK_START( comms ); THREAD_CHECK_START( comms );
newElem->next = (MsgQueueElem*)NULL; newElem->next = (MsgQueueElem*)NULL;
if ( !comms->msgQueueHead ) { if ( !comms->_msgQueueHead ) {
comms->msgQueueHead = comms->msgQueueTail = newElem; comms->_msgQueueHead = newElem;
XP_ASSERT( comms->queueLen == 0 ); XP_ASSERT( comms->queueLen == 0 );
} else { } else {
XP_ASSERT( !!comms->msgQueueTail ); MsgQueueElem* tail = comms->_msgQueueHead;
XP_ASSERT( !comms->msgQueueTail->next ); while ( !!tail->next ) {
if ( elems_same( comms->msgQueueTail, newElem ) ) { tail = tail->next;
}
if ( elems_same( tail, newElem ) ) {
/* This does still happen! Not sure why. */
freeElem( MPPARM(comms->mpool) newElem ); freeElem( MPPARM(comms->mpool) newElem );
asAdded = comms->msgQueueTail; asAdded = tail;
} else { } else {
comms->msgQueueTail->next = newElem; tail->next = newElem;
comms->msgQueueTail = newElem;
} }
XP_ASSERT( comms->queueLen > 0 ); XP_ASSERT( comms->queueLen > 0 );
@ -1677,7 +1787,9 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem )
if ( newElem == asAdded ) { if ( newElem == asAdded ) {
++comms->queueLen; ++comms->queueLen;
notifyQueueChanged( comms, xwe ); if ( notify ) {
notifyQueueChanged( comms, xwe );
}
} }
XP_ASSERT( comms->queueLen <= 128 ); /* reasonable limit in testing */ XP_ASSERT( comms->queueLen <= 128 ); /* reasonable limit in testing */
THREAD_CHECK_END(); THREAD_CHECK_END();
@ -1685,39 +1797,38 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem )
} /* addToQueue */ } /* addToQueue */
#ifdef DEBUG #ifdef DEBUG
static ForEachAct
printElem( MsgQueueElem* elem, void* closure )
{
int* iip = (int*)closure;
CNO_FMT( cbuf, elem->channelNo );
XP_LOGFF( "%d: %s; msgID=" XP_LD
#ifdef COMMS_CHECKSUM
"; sum=%s"
#endif
, *iip, cbuf, elem->msgID
#ifdef COMMS_CHECKSUM
, elem->checksum
#endif
);
++*iip;
return FEA_OK;
}
static void static void
printQueue( const CommsCtxt* comms ) printQueue( const CommsCtxt* comms )
{ {
MsgQueueElem* elem; int ii = 0;
short ii; forEachElem( (CommsCtxt*)comms, printElem, &ii );
for ( elem = comms->msgQueueHead, ii = 0; ii < comms->queueLen;
elem = elem->next, ++ii ) {
CNO_FMT( cbuf, elem->channelNo );
XP_LOGFF( "%d: %s; msgID=" XP_LD
#ifdef COMMS_CHECKSUM
"; sum=%s"
#endif
, ii+1, cbuf, elem->msgID
#ifdef COMMS_CHECKSUM
, elem->checksum
#endif
);
}
} }
static void static void
assertQueueOk( const CommsCtxt* comms ) assertQueueOk( const CommsCtxt* comms )
{ {
XP_U16 count = 0; XP_U16 count = 0;
MsgQueueElem* elem; for ( MsgQueueElem* elem = comms->_msgQueueHead;
!!elem; elem = elem->next ) {
for ( elem = comms->msgQueueHead; !!elem; elem = elem->next ) {
++count; ++count;
if ( elem == comms->msgQueueTail ) {
XP_ASSERT( !elem->next );
break;
}
} }
XP_ASSERT( count == comms->queueLen ); XP_ASSERT( count == comms->queueLen );
if ( count >= 10 ) { if ( count >= 10 ) {
@ -1769,10 +1880,10 @@ elems_same( const MsgQueueElem* elem1, const MsgQueueElem* elem2 )
static void static void
freeElem( MPFORMAL MsgQueueElem* elem ) freeElem( MPFORMAL MsgQueueElem* elem )
{ {
XP_FREE( mpool, elem->msg ); XP_FREEP( mpool, &elem->msg );
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
XP_LOGFF( "freeing msg with len %d, sum %s", elem->len, elem->checksum ); XP_LOGFF( "freeing msg with len %d, sum %s", elem->len, elem->checksum );
XP_FREE( mpool, elem->checksum ); XP_FREEP( mpool, &elem->checksum );
#else #else
XP_LOGFF( "freeing msg with len %d", elem->len ); XP_LOGFF( "freeing msg with len %d", elem->len );
#endif #endif
@ -1785,6 +1896,40 @@ freeElem( MPFORMAL MsgQueueElem* elem )
* we've sent, don't remove. We may be starting a new game but have a server * we've sent, don't remove. We may be starting a new game but have a server
* that's still on the old one. * that's still on the old one.
*/ */
typedef struct _RemoveData {
const CommsCtxt* comms;
XP_PlayerAddr channelNo;
MsgID msgID;
} RemoveData;
static ForEachAct
removeProc( MsgQueueElem* elem, void* closure )
{
ForEachAct result = FEA_OK;
RemoveData* rdp = (RemoveData*)closure;
XP_PlayerAddr maskedChannelNo = ~CHANNEL_MASK & rdp->channelNo;
XP_Bool knownGood = XP_FALSE;
/* remove the 0-channel message if we've established a channel number.
Only clients should have any 0-channel messages in the queue, and
receiving something from the server is an implicit ACK -- IFF it isn't
left over from the last game. */
XP_PlayerAddr maskedElemChannelNo = ~CHANNEL_MASK & elem->channelNo;
if ( (maskedElemChannelNo == 0) && (rdp->channelNo != 0) ) {
XP_ASSERT( !rdp->comms->isServer || IS_INVITE(elem) );
XP_ASSERT( elem->msgID == 0 );
} else if ( maskedElemChannelNo != maskedChannelNo ) {
knownGood = XP_TRUE;
}
if ( !knownGood && (elem->msgID <= rdp->msgID) ) {
result = FEA_REMOVE;
}
return result;
}
static void static void
removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID ) removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID )
{ {
@ -1798,39 +1943,13 @@ removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msg
if ((channelNo == 0) || !!getRecordFor( comms, xwe, NULL, channelNo)) { if ((channelNo == 0) || !!getRecordFor( comms, xwe, NULL, channelNo)) {
MsgQueueElem* elem = comms->msgQueueHead; RemoveData rd = {
MsgQueueElem* next; .comms = comms,
.msgID = msgID,
.channelNo = channelNo,
};
forEachElem( comms, removeProc, &rd );
/* empty the queue so we can add all back again */
comms->msgQueueHead = comms->msgQueueTail = NULL;
comms->queueLen = 0;
XP_PlayerAddr maskedChannelNo = ~CHANNEL_MASK & channelNo;
for ( ; !!elem; elem = next ) {
XP_Bool knownGood = XP_FALSE;
next = elem->next;
/* remove the 0-channel message if we've established a channel
number. Only clients should have any 0-channel messages in the
queue, and receiving something from the server is an implicit
ACK -- IFF it isn't left over from the last game. */
XP_PlayerAddr maskedElemChannelNo = ~CHANNEL_MASK & elem->channelNo;
if ( (maskedElemChannelNo == 0) && (channelNo != 0) ) {
XP_ASSERT( !comms->isServer || IS_INVITE(elem) );
XP_ASSERT( elem->msgID == 0 );
} else if ( maskedElemChannelNo != maskedChannelNo ) {
knownGood = XP_TRUE;
}
if ( !knownGood && (elem->msgID <= msgID) ) {
freeElem( MPPARM(comms->mpool) elem );
} else {
MsgQueueElem* asAdded = addToQueue( comms, xwe, elem );
XP_ASSERT( asAdded == elem );
elem = asAdded; /* for non-assert case */
}
}
notifyQueueChanged( comms, xwe ); notifyQueueChanged( comms, xwe );
} }
@ -2018,33 +2137,55 @@ send_relay_ack( CommsCtxt* comms, XWEnv xwe )
typedef XP_S16 (*MsgProc)( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* msg, typedef XP_S16 (*MsgProc)( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* msg,
CommsConnType filter, void* closure ); CommsConnType filter, void* closure );
typedef struct _SendElemData {
CommsCtxt* comms;
XWEnv xwe;
MsgProc msgProc;
CommsConnType filter;
void* msgClosure;
XP_U16 count;
XP_Bool success;
} SendElemData;
static ForEachAct
sendElemProc( MsgQueueElem* elem, void* closure )
{
SendElemData* sedp = (SendElemData*)closure;
XP_S16 len = (*sedp->msgProc)( sedp->comms, sedp->xwe, elem,
sedp->filter, sedp->msgClosure );
if ( 0 > len ) {
sedp->success = XP_FALSE;
} else {
XP_ASSERT( 0 < len );
++sedp->count;
}
return FEA_OK;
}
static XP_S16 static XP_S16
resendImpl( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force, resendImpl( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force,
MsgProc proc, void* closure ) MsgProc proc, void* closure )
{ {
XP_S16 count = 0; XP_S16 count = 0;
XP_Bool success = XP_TRUE;
XP_ASSERT( !!comms ); XP_ASSERT( !!comms );
XP_U32 now = dutil_getCurSeconds( comms->dutil, xwe ); XP_U32 now = dutil_getCurSeconds( comms->dutil, xwe );
if ( !force && (now < comms->nextResend) ) { if ( !force && (now < comms->nextResend) ) {
XP_LOGFF( "aborting: %d seconds left in backoff", XP_LOGFF( "aborting: %d seconds left in backoff",
comms->nextResend - now ); comms->nextResend - now );
success = XP_FALSE; } else {
SendElemData sed = {
} else if ( !!comms->msgQueueHead ) { .success = XP_TRUE,
for ( MsgQueueElem* msg = comms->msgQueueHead; !!msg; msg = msg->next ) { .comms = comms,
XP_S16 len = (*proc)( comms, xwe, msg, filter, closure ); .xwe = xwe,
if ( 0 > len ) { .msgProc = proc,
success = XP_FALSE; .msgClosure = closure,
} else { .filter = filter,
XP_ASSERT( 0 < len ); };
++count; forEachElem( comms, sendElemProc, &sed );
}
}
/* Now set resend values */ /* Now set resend values */
if ( success && !force ) { if ( 0 < sed.count && sed.success && !force ) {
comms->resendBackoff = 2 * (1 + comms->resendBackoff); comms->resendBackoff = 2 * (1 + comms->resendBackoff);
XP_LOGFF( "backoff now %d", comms->resendBackoff ); XP_LOGFF( "backoff now %d", comms->resendBackoff );
comms->nextResend = now + comms->resendBackoff; comms->nextResend = now + comms->resendBackoff;
@ -3283,6 +3424,19 @@ ConnType2Str( CommsConnType typ )
} /* ConnType2Str */ } /* ConnType2Str */
#ifdef DEBUG #ifdef DEBUG
static ForEachAct
statsProc( MsgQueueElem* elem, void* closure )
{
XWStreamCtxt* stream = (XWStreamCtxt*)closure;
XP_UCHAR buf[100];
XP_SNPRINTF( buf, sizeof(buf),
"msgID: " XP_LD ": channelNo=%.4X; len=%d\n",
elem->msgID, elem->channelNo, elem->len );
stream_catString( stream, buf );
return FEA_OK;
}
void void
comms_getStats( const CommsCtxt* comms, XWStreamCtxt* stream ) comms_getStats( const CommsCtxt* comms, XWStreamCtxt* stream )
{ {
@ -3294,12 +3448,7 @@ comms_getStats( const CommsCtxt* comms, XWStreamCtxt* stream )
comms->queueLen ); comms->queueLen );
stream_catString( stream, buf ); stream_catString( stream, buf );
for ( MsgQueueElem* elem = comms->msgQueueHead; !!elem; elem = elem->next ) { forEachElem( (CommsCtxt*)comms, statsProc, stream );
XP_SNPRINTF( buf, sizeof(buf),
"msgID: " XP_LD ": channelNo=%.4X; len=%d\n",
elem->msgID, elem->channelNo, elem->len );
stream_catString( stream, buf );
}
for ( AddressRecord* rec = comms->recs; !!rec; rec = rec->next ) { for ( AddressRecord* rec = comms->recs; !!rec; rec = rec->next ) {
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf), XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),

View file

@ -528,4 +528,5 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
} else if ( isCtrlMsg( &myID, topic ) ) { } else if ( isCtrlMsg( &myID, topic ) ) {
dutil_onCtrlReceived( dutil, xwe, buf, len ); dutil_onCtrlReceived( dutil, xwe, buf, len );
} }
LOG_RETURN_VOID();
} }

View file

@ -98,6 +98,7 @@ connect_callback( struct mosquitto* mosq, void* userdata,
DEFAULT_QOS, 0, NULL ); DEFAULT_QOS, 0, NULL );
XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0], XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0],
mosquitto_strerror(err), mid ); mosquitto_strerror(err), mid );
XP_USE(err);
} }
static void static void
@ -167,7 +168,7 @@ handle_gotmsg( GIOChannel* source, GIOCondition XP_UNUSED(condition), gpointer d
dvc_parseMQTTPacket( storage->params->dutil, NULL_XWE, dvc_parseMQTTPacket( storage->params->dutil, NULL_XWE,
(XP_UCHAR*)topicBuf, msgBuf, msgLen ); (XP_UCHAR*)topicBuf, msgBuf, msgLen );
LOG_RETURN_VOID();
return TRUE; return TRUE;
} /* handle_gotmsg */ } /* handle_gotmsg */