From 8da3c5fc6874c00d4fbb161dac5fc2e86cc68778 Mon Sep 17 00:00:00 2001 From: Eric House Date: Mon, 18 Nov 2019 09:26:46 -0800 Subject: [PATCH] add comms_getPending(), and refactor to implement --- xwords4/common/comms.c | 50 ++++++++++++++++++++++++++++++++++++------ xwords4/common/comms.h | 3 +++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index 9ed9db47b..6c73530cf 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -1338,8 +1338,8 @@ static void removeFromQueue( CommsCtxt* comms, XP_PlayerAddr channelNo, MsgID msgID ) { CNO_FMT( cbuf, channelNo ); - XP_LOGF( "%s: remove msgs <= " XP_LD " for %s (queueLen: %d)", - __func__, msgID, cbuf, comms->queueLen ); + XP_LOGF( "%s(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)", + __func__, channelNo, msgID, cbuf, comms->queueLen ); if ((channelNo == 0) || !!getRecordFor( comms, NULL, channelNo, XP_FALSE)) { @@ -1469,7 +1469,7 @@ sendMsg( CommsCtxt* comms, MsgQueueElem* elem, const CommsConnType filter ) if ( NULL == addrP || !addr_hasType( addrP, typ ) ) { XP_LOGF( TAGFMT() "no addr for channel or addr type %s" - " so using comms'", ConnType2Str(typ), TAGPRMS ); + " so using comms'", TAGPRMS, ConnType2Str(typ) ); comms_getAddr( comms, &addr ); } else { addr = *addrP; @@ -1516,8 +1516,10 @@ send_ack( CommsCtxt* comms ) NULL ); } -XP_S16 -comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force ) +typedef XP_S16 (*MsgProc)( CommsCtxt* comms, MsgQueueElem* msg, CommsConnType filter, void* closure ); + +static XP_S16 +resendImpl( CommsCtxt* comms, CommsConnType filter, XP_Bool force, MsgProc proc, void* closure ) { XP_S16 count = 0; XP_Bool success = XP_TRUE; @@ -1533,7 +1535,7 @@ comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force ) MsgQueueElem* msg; for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) { - XP_S16 len = sendMsg( comms, msg, filter ); + XP_S16 len = (*proc)( comms, msg, filter, closure ); if ( 0 > len ) { success = XP_FALSE; break; @@ -1552,7 +1554,41 @@ comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force ) } XP_LOGF( TAGFMT() "=> %d", TAGPRMS, count ); return count; -} /* comms_resendAll */ +} /* resendImpl */ + +static XP_S16 +sendMsgWrapper( CommsCtxt* comms, MsgQueueElem* msg, CommsConnType filter, + void* XP_UNUSED(closure) ) +{ + return sendMsg( comms, msg, filter ); +} + +XP_S16 +comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force ) +{ + return resendImpl( comms, filter, force, sendMsgWrapper, NULL ); +} + +typedef struct _GetAllClosure{ + PendingMsgProc proc; + void* closure; +} GetAllClosure; + +static XP_S16 +gatherMsgs( CommsCtxt* XP_UNUSED(comms), MsgQueueElem* msg, + CommsConnType XP_UNUSED(filter), void* closure ) +{ + GetAllClosure* gac = (GetAllClosure*)closure; + (*gac->proc)( gac->closure, msg->msg, msg->len, msg->msgID ); + return 1; /* 0 gets an assert */ +} + +void +comms_getPending( CommsCtxt* comms, PendingMsgProc proc, void* closure ) +{ + GetAllClosure gac = { .proc = proc, .closure = closure }; + (void)resendImpl( comms, COMMS_CONN_NONE, XP_TRUE, gatherMsgs, &gac ); +} #ifdef XWFEATURE_COMMSACK void diff --git a/xwords4/common/comms.h b/xwords4/common/comms.h index 1340691aa..8b5422b82 100644 --- a/xwords4/common/comms.h +++ b/xwords4/common/comms.h @@ -237,6 +237,9 @@ void addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addr ); XP_S16 comms_send( CommsCtxt* comms, XWStreamCtxt* stream ); XP_S16 comms_resendAll( CommsCtxt* comms, CommsConnType filter, XP_Bool force ); + +typedef void (*PendingMsgProc)( void* closure, XP_U8* msg, XP_U16 len, MsgID msgID ); +void comms_getPending( CommsCtxt* comms, PendingMsgProc proc, void* closure ); XP_U16 comms_getChannelSeed( CommsCtxt* comms ); #ifdef XWFEATURE_COMMSACK