add comms_getPending(), and refactor to implement

This commit is contained in:
Eric House 2019-11-18 09:26:46 -08:00
parent 58c3ab4e4a
commit 8da3c5fc68
2 changed files with 46 additions and 7 deletions

View file

@ -1338,8 +1338,8 @@ static void
removeFromQueue( CommsCtxt* comms, XP_PlayerAddr channelNo, MsgID msgID )
{
CNO_FMT( cbuf, channelNo );
XP_LOGF( "%s: remove msgs <= " XP_LD " for %s (queueLen: %d)",
__func__, msgID, cbuf, comms->queueLen );
XP_LOGF( "%s(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)",
__func__, channelNo, msgID, cbuf, comms->queueLen );
if ((channelNo == 0) || !!getRecordFor( comms, NULL, channelNo, XP_FALSE)) {
@ -1469,7 +1469,7 @@ sendMsg( CommsCtxt* comms, MsgQueueElem* elem, const CommsConnType filter )
if ( NULL == addrP || !addr_hasType( addrP, typ ) ) {
XP_LOGF( TAGFMT() "no addr for channel or addr type %s"
" so using comms'", ConnType2Str(typ), TAGPRMS );
" so using comms'", TAGPRMS, ConnType2Str(typ) );
comms_getAddr( comms, &addr );
} else {
addr = *addrP;
@ -1516,8 +1516,10 @@ send_ack( CommsCtxt* comms )
NULL );
}
XP_S16
comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force )
typedef XP_S16 (*MsgProc)( CommsCtxt* comms, MsgQueueElem* msg, CommsConnType filter, void* closure );
static XP_S16
resendImpl( CommsCtxt* comms, CommsConnType filter, XP_Bool force, MsgProc proc, void* closure )
{
XP_S16 count = 0;
XP_Bool success = XP_TRUE;
@ -1533,7 +1535,7 @@ comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force )
MsgQueueElem* msg;
for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) {
XP_S16 len = sendMsg( comms, msg, filter );
XP_S16 len = (*proc)( comms, msg, filter, closure );
if ( 0 > len ) {
success = XP_FALSE;
break;
@ -1552,7 +1554,41 @@ comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force )
}
XP_LOGF( TAGFMT() "=> %d", TAGPRMS, count );
return count;
} /* comms_resendAll */
} /* resendImpl */
static XP_S16
sendMsgWrapper( CommsCtxt* comms, MsgQueueElem* msg, CommsConnType filter,
void* XP_UNUSED(closure) )
{
return sendMsg( comms, msg, filter );
}
XP_S16
comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force )
{
return resendImpl( comms, filter, force, sendMsgWrapper, NULL );
}
typedef struct _GetAllClosure{
PendingMsgProc proc;
void* closure;
} GetAllClosure;
static XP_S16
gatherMsgs( CommsCtxt* XP_UNUSED(comms), MsgQueueElem* msg,
CommsConnType XP_UNUSED(filter), void* closure )
{
GetAllClosure* gac = (GetAllClosure*)closure;
(*gac->proc)( gac->closure, msg->msg, msg->len, msg->msgID );
return 1; /* 0 gets an assert */
}
void
comms_getPending( CommsCtxt* comms, PendingMsgProc proc, void* closure )
{
GetAllClosure gac = { .proc = proc, .closure = closure };
(void)resendImpl( comms, COMMS_CONN_NONE, XP_TRUE, gatherMsgs, &gac );
}
#ifdef XWFEATURE_COMMSACK
void

View file

@ -237,6 +237,9 @@ void addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addr );
XP_S16 comms_send( CommsCtxt* comms, XWStreamCtxt* stream );
XP_S16 comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force );
typedef void (*PendingMsgProc)( void* closure, XP_U8* msg, XP_U16 len, MsgID msgID );
void comms_getPending( CommsCtxt* comms, PendingMsgProc proc, void* closure );
XP_U16 comms_getChannelSeed( CommsCtxt* comms );
#ifdef XWFEATURE_COMMSACK