toward changing APIs to allow combined messages for mqtt

This commit is contained in:
Eric House 2023-03-17 19:21:57 -07:00
parent f5f8ae9908
commit 6ec0e64657
15 changed files with 127 additions and 77 deletions

View file

@ -1655,6 +1655,7 @@ comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
elem = addToQueue( comms, xwe, elem, XP_TRUE ); elem = addToQueue( comms, xwe, elem, XP_TRUE );
if ( !!elem ) { if ( !!elem ) {
XP_ASSERT( !elem->next );
XP_LOGFF( "added invite on channel %d", elem->channelNo & CHANNEL_MASK ); XP_LOGFF( "added invite on channel %d", elem->channelNo & CHANNEL_MASK );
/* Let's let platform code decide whether to call sendMsg() . On /* Let's let platform code decide whether to call sendMsg() . On
Android creating a game with an invitation in its queue is always Android creating a game with an invitation in its queue is always
@ -2134,15 +2135,20 @@ sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
} }
#endif #endif
} else { } else {
XP_ASSERT( !!comms->procs.sendMsg ); XP_ASSERT( !!comms->procs.sendMsgs );
XP_U32 gameid = gameID( comms ); XP_U32 gameid = gameID( comms );
logAddrComms( comms, &addr, __func__ ); logAddrComms( comms, &addr, __func__ );
XP_UCHAR msgNo[16]; XP_UCHAR msgNo[16];
formatMsgNo( comms, elem, msgNo, sizeof(msgNo) ); formatMsgNo( comms, elem, msgNo, sizeof(msgNo) );
XP_ASSERT( 0 != elem->createdStamp ); XP_ASSERT( 0 != elem->createdStamp );
nSent = (*comms->procs.sendMsg)( xwe, elem->msg, elem->len, SendMsgsPacket packet = {
comms->streamVersion, msgNo, .msgNo = msgNo,
elem->createdStamp, &addr, .createdStamp = elem->createdStamp,
.buf = elem->msg,
.len = elem->len,
};
nSent = (*comms->procs.sendMsgs)( xwe, 1, &packet,
comms->streamVersion, &addr,
typ, gameid, typ, gameid,
comms->procs.closure ); comms->procs.closure );
checkForPrev( comms, elem, typ ); checkForPrev( comms, elem, typ );

View file

@ -79,9 +79,10 @@ typedef XP_S16 (*TransportSendInvt)( XWEnv xwe, const NetLaunchInfo* nli,
XP_U32 createdStamp, const CommsAddrRec* addr, XP_U32 createdStamp, const CommsAddrRec* addr,
CommsConnType conType, void* closure ); CommsConnType conType, void* closure );
#endif #endif
typedef XP_S16 (*TransportSendMsg)( XWEnv xwe, const XP_U8* buf, XP_U16 len,
XP_U16 streamVersion, const XP_UCHAR* msgNo, typedef XP_S16 (*TransportSendMsgs)( XWEnv xwe, XP_U16 count, SendMsgsPacket msgs[],
XP_U32 createdStamp, const CommsAddrRec* addr, XP_U16 streamVersion,
const CommsAddrRec* addr,
CommsConnType conType, XP_U32 gameID, CommsConnType conType, XP_U32 gameID,
void* closure ); void* closure );
@ -125,7 +126,7 @@ typedef struct _TransportProcs {
#else #else
XP_U32 flags; XP_U32 flags;
#endif #endif
TransportSendMsg sendMsg; TransportSendMsgs sendMsgs;
#ifdef XWFEATURE_COMMS_INVITE #ifdef XWFEATURE_COMMS_INVITE
TransportSendInvt sendInvt; TransportSendInvt sendInvt;
#endif #endif

View file

@ -161,6 +161,13 @@ typedef XP_U8 XWPhoniesChoice;
typedef XP_U8 XP_LangCode; typedef XP_U8 XP_LangCode;
typedef struct _SendMsgsPacket {
const XP_UCHAR* msgNo;
XP_U32 createdStamp;
const XP_U8* buf;
XP_U16 len;
} SendMsgsPacket;
/* I'm going to try putting all forward "class" decls in the same file */ /* I'm going to try putting all forward "class" decls in the same file */
typedef struct BoardCtxt BoardCtxt; typedef struct BoardCtxt BoardCtxt;
typedef struct CommMgrCtxt CommMgrCtxt; typedef struct CommMgrCtxt CommMgrCtxt;

View file

@ -337,17 +337,16 @@ dvc_makeMQTTNukeInvite( XW_DUtilCtxt* dutil, XWEnv xwe,
#endif #endif
} }
/* Ship with == 1, but increase to test */ XP_S16
#define MULTI_MSG_COUNT 1
void
dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe, dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure, MsgAndTopicProc proc, void* closure,
XP_U16 nBufs, SendMsgsPacket bufs[],
const MQTTDevID* addressee, const MQTTDevID* addressee,
XP_U32 gameID, const XP_U8* buf, XP_U16 len, XP_U32 gameID, XP_U16 streamVersion )
XP_U16 streamVersion )
{ {
XP_LOGFF( "(len: %d; streamVersion: %X)", len, streamVersion ); XP_S16 nSent0 = 0;
XP_S16 nSent1 = 0;
XP_LOGFF( "(nBufs: %d; streamVersion: %X)", nBufs, streamVersion );
XP_UCHAR devTopic[64]; /* used by two below */ XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) ); formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) );
@ -357,11 +356,14 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */ it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */
if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) { if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) {
for ( int ii = 0; ii < nBufs; ++ii ) {
XWStreamCtxt* stream = mkStream( dutil ); XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream ); addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream );
stream_putBytes( stream, buf, len ); stream_putBytes( stream, bufs[ii].buf, bufs[ii].len );
callProc( proc, closure, devTopic, stream ); callProc( proc, closure, devTopic, stream );
stream_destroy( stream ); stream_destroy( stream );
nSent0 += bufs[ii].len;
}
} }
if ( 0 == streamVersion || STREAM_VERS_NORELAY <= streamVersion ) { if ( 0 == streamVersion || STREAM_VERS_NORELAY <= streamVersion ) {
@ -370,12 +372,16 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
/* For now, we ship one message per packet. But the receiving code /* For now, we ship one message per packet. But the receiving code
should be ready */ should be ready */
stream_putU8( stream, MULTI_MSG_COUNT ); stream_putU8( stream, nBufs );
for ( int ii = 0; ii < MULTI_MSG_COUNT; ++ii ) { for ( int ii = 0; ii < nBufs; ++ii ) {
XP_U32 len = bufs[ii].len;
stream_putU32VL( stream, len ); stream_putU32VL( stream, len );
stream_putBytes( stream, buf, len ); stream_putBytes( stream, bufs[ii].buf, len );
nSent1 += len;
} }
XP_ASSERT( nSent0 == nSent1 || nSent0 == 0 || nSent1 == 0 );
XP_UCHAR gameTopic[64]; XP_UCHAR gameTopic[64];
size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic), size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic),
"%s/%X", devTopic, gameID ); "%s/%X", devTopic, gameID );
@ -385,6 +391,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
callProc( proc, closure, gameTopic, stream ); callProc( proc, closure, gameTopic, stream );
stream_destroy( stream ); stream_destroy( stream );
} }
return XP_MAX( nSent0, nSent1 );
} }
void void

View file

@ -46,11 +46,12 @@ void dvc_makeMQTTNukeInvite( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure, MsgAndTopicProc proc, void* closure,
const NetLaunchInfo* nli ); const NetLaunchInfo* nli );
void dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe, XP_S16 dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure, MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee, XP_U16 nBufs, SendMsgsPacket bufs[],
XP_U32 gameID, const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee, XP_U32 gameID,
XP_U16 streamVersion ); XP_U16 streamVersion );
void dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe, void dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure, MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee, const MQTTDevID* addressee,

View file

@ -365,7 +365,7 @@ static void
initTProcsCurses( CommonGlobals* cGlobals ) initTProcsCurses( CommonGlobals* cGlobals )
{ {
cGlobals->procs.closure = cGlobals; cGlobals->procs.closure = cGlobals;
cGlobals->procs.sendMsg = linux_send; cGlobals->procs.sendMsgs = linux_send;
#ifdef XWFEATURE_COMMS_INVITE #ifdef XWFEATURE_COMMS_INVITE
cGlobals->procs.sendInvt = linux_send_invt; cGlobals->procs.sendInvt = linux_send_invt;
#endif #endif

View file

@ -485,7 +485,7 @@ setTransportProcs( TransportProcs* procs, GtkGameGlobals* globals )
{ {
XP_ASSERT( !procs->closure ); XP_ASSERT( !procs->closure );
procs->closure = globals; procs->closure = globals;
procs->sendMsg = linux_send; procs->sendMsgs = linux_send;
#ifdef XWFEATURE_COMMS_INVITE #ifdef XWFEATURE_COMMS_INVITE
procs->sendInvt = linux_send_invt; procs->sendInvt = linux_send_invt;
#endif #endif

View file

@ -416,8 +416,8 @@ linux_bt_close( CommonGlobals* globals )
} }
} /* linux_bt_close */ } /* linux_bt_close */
XP_S16 static XP_S16
linux_bt_send( const XP_U8* buf, XP_U16 buflen, linux_bt_send_impl( const XP_U8* buf, XP_U16 buflen,
const CommsAddrRec* addrP, const CommsAddrRec* addrP,
CommonGlobals* globals ) CommonGlobals* globals )
{ {
@ -460,7 +460,26 @@ linux_bt_send( const XP_U8* buf, XP_U16 buflen,
} }
LOG_RETURNF( "%d", nSent ); LOG_RETURNF( "%d", nSent );
return nSent; return nSent;
} /* linux_bt_send */ } /* linux_bt_send_impl */
XP_S16
linux_bt_send( XP_U16 count, SendMsgsPacket msgs[],
const CommsAddrRec* addrRec, CommonGlobals* globals )
{
XP_S16 result = 0;
for ( int ii = 0; ii < count; ++ii ) {
const SendMsgsPacket* packet = &msgs[ii];
XP_S16 tmp = linux_bt_send_impl( packet->buf, packet->len,
addrRec, globals );
if ( tmp > 0 ) {
result += tmp;
} else {
result = -1;
break;
}
}
return result;
}
#if defined BT_USE_RFCOMM #if defined BT_USE_RFCOMM
static void static void

View file

@ -28,7 +28,7 @@ void linux_bt_open( CommonGlobals* globals, XP_Bool amMaster );
void linux_bt_reset( CommonGlobals* globals ); void linux_bt_reset( CommonGlobals* globals );
void linux_bt_close( CommonGlobals* globals ); void linux_bt_close( CommonGlobals* globals );
XP_S16 linux_bt_send( const XP_U8* buf, XP_U16 buflen, XP_S16 linux_bt_send( XP_U16 count, SendMsgsPacket msgs[],
const CommsAddrRec* addrRec, const CommsAddrRec* addrRec,
CommonGlobals* globals ); CommonGlobals* globals );
XP_S16 linux_bt_receive( int sock, XP_U8* buf, XP_U16 buflen ); XP_S16 linux_bt_receive( int sock, XP_U8* buf, XP_U16 buflen );

View file

@ -1441,22 +1441,14 @@ linux_reset( XWEnv xwe, void* closure )
#endif #endif
XP_S16 XP_S16
linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen, linux_send( XWEnv XP_UNUSED(xwe), XP_U16 count, SendMsgsPacket msgs[],
XP_U16 streamVersion, const XP_UCHAR* msgNo, XP_U16 streamVersion, const CommsAddrRec* addrRec,
XP_U32 XP_UNUSED(createdStamp), CommsConnType conType, XP_U32 gameID, void* closure )
const CommsAddrRec* addrRec, CommsConnType conType,
XP_U32 gameID, void* closure )
{ {
XP_LOGFF( "(streamVersion: %X, len: %d)", streamVersion, buflen ); XP_LOGFF( "(streamVersion: %X)", streamVersion );
XP_S16 nSent = -1; XP_S16 nSent = -1;
CommonGlobals* cGlobals = (CommonGlobals*)closure; CommonGlobals* cGlobals = (CommonGlobals*)closure;
/* if ( !!addrRec ) { */
/* conType = addr_getType( addrRec ); */
/* } else { */
/* conType = addr_getType( &cGlobals->params->addr ); */
/* } */
switch ( conType ) { switch ( conType ) {
#ifdef XWFEATURE_RELAY #ifdef XWFEATURE_RELAY
case COMMS_CONN_RELAY: case COMMS_CONN_RELAY:
@ -1474,7 +1466,7 @@ linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen,
case COMMS_CONN_BT: { case COMMS_CONN_BT: {
XP_Bool isServer = game_getIsServer( &cGlobals->game ); XP_Bool isServer = game_getIsServer( &cGlobals->game );
linux_bt_open( cGlobals, isServer ); linux_bt_open( cGlobals, isServer );
nSent = linux_bt_send( buf, buflen, addrRec, cGlobals ); nSent = linux_bt_send( count, msgs, addrRec, cGlobals );
} }
break; break;
#endif #endif
@ -1499,15 +1491,14 @@ linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen,
// use serverphone if I'm a client, else hope one's provided (this is // use serverphone if I'm a client, else hope one's provided (this is
// a reply) // a reply)
nSent = linux_sms_send( cGlobals->params, buf, buflen, msgNo, nSent = linux_sms_send( cGlobals->params, count, msgs, addrRec->u.sms.phone,
addrRec->u.sms.phone, addrRec->u.sms.port, addrRec->u.sms.port, gameID );
gameID );
} }
break; break;
#endif #endif
case COMMS_CONN_MQTT: case COMMS_CONN_MQTT:
nSent = mqttc_send( cGlobals->params, gameID, buf, buflen, nSent = mqttc_send( cGlobals->params, gameID, count, msgs,
streamVersion, &addrRec->u.mqtt.devID ); streamVersion, &addrRec->u.mqtt.devID );
break; break;

View file

@ -38,11 +38,9 @@ typedef struct LinuxBMStruct {
} LinuxBMStruct; } LinuxBMStruct;
int initListenerSocket( int port ); int initListenerSocket( int port );
XP_S16 linux_send( XWEnv xwe, const XP_U8* buf, XP_U16 buflen, XP_S16 linux_send( XWEnv xwe, XP_U16 count, SendMsgsPacket msgs[],
XP_U16 streamVersion, const XP_UCHAR* msgNo, XP_U16 streamVersion, const CommsAddrRec* addrRec,
XP_U32 createdStamp, const CommsAddrRec* addrRec, CommsConnType conType, XP_U32 gameID, void* closure );
CommsConnType conType, XP_U32 gameID,
void* closure );
XP_S16 linux_send_invt( XWEnv xwe, const NetLaunchInfo* nli, XP_S16 linux_send_invt( XWEnv xwe, const NetLaunchInfo* nli,
XP_U32 createdStamp, const CommsAddrRec* addr, XP_U32 createdStamp, const CommsAddrRec* addr,
CommsConnType conType, void* closure ); CommsConnType conType, void* closure );

View file

@ -324,8 +324,8 @@ linux_sms_invite( LaunchParams* params, const NetLaunchInfo* nli,
stream_destroy( stream ); stream_destroy( stream );
} }
XP_S16 static XP_S16
linux_sms_send( LaunchParams* params, const XP_U8* buf, linux_sms_send_impl( LaunchParams* params, const XP_U8* buf,
XP_U16 buflen, const XP_UCHAR* msgNo, const XP_UCHAR* phone, XP_U16 buflen, const XP_UCHAR* msgNo, const XP_UCHAR* phone,
XP_U16 port, XP_U32 gameID ) XP_U16 port, XP_U32 gameID )
{ {
@ -344,6 +344,25 @@ linux_sms_send( LaunchParams* params, const XP_U8* buf,
return nSent; return nSent;
} }
XP_S16
linux_sms_send( LaunchParams* params, XP_U16 count, SendMsgsPacket msgs[],
const XP_UCHAR* phone, XP_U16 port, XP_U32 gameID )
{
XP_S16 result = 0;
for ( int ii = 0; ii < count; ++ii ) {
const SendMsgsPacket* packet = &msgs[ii];
XP_S16 tmp = linux_sms_send_impl( params, packet->buf,
packet->len, packet->msgNo, phone, port, gameID );
if ( tmp > 0 ) {
result += tmp;
} else {
result = -1;
break;
}
}
return result;
}
typedef struct _RetryClosure { typedef struct _RetryClosure {
LaunchParams* params; LaunchParams* params;
SMS_CMD cmd; SMS_CMD cmd;

View file

@ -38,8 +38,8 @@ typedef struct _SMSProcs {
void linux_sms_init( LaunchParams* params, const gchar* phone, void linux_sms_init( LaunchParams* params, const gchar* phone,
XP_U16 port, const SMSProcs* procs, void* procClosure ); XP_U16 port, const SMSProcs* procs, void* procClosure );
XP_S16 linux_sms_send( LaunchParams* params, const XP_U8* buf, XP_U16 buflen, XP_S16 linux_sms_send( LaunchParams* params, XP_U16 count, SendMsgsPacket msgs[],
const XP_UCHAR* msgNo, const XP_UCHAR* phone, XP_U16 port, const XP_UCHAR* phone, XP_U16 port,
XP_U32 gameID ); XP_U32 gameID );
void linux_sms_invite( LaunchParams* params, const NetLaunchInfo* nli, void linux_sms_invite( LaunchParams* params, const NetLaunchInfo* nli,
const gchar* phone, int port ); const gchar* phone, int port );

View file

@ -293,15 +293,15 @@ mqttc_invite( LaunchParams* params, const NetLaunchInfo* nli,
XP_S16 XP_S16
mqttc_send( LaunchParams* params, XP_U32 gameID, mqttc_send( LaunchParams* params, XP_U32 gameID,
const XP_U8* buf, XP_U16 len, XP_U16 streamVersion, XP_U16 nMsgs, SendMsgsPacket msgs[],
const MQTTDevID* addressee ) XP_U16 streamVersion, const MQTTDevID* addressee )
{ {
MQTTConStorage* storage = getStorage( params ); MQTTConStorage* storage = getStorage( params );
XP_S16 nSent = dvc_makeMQTTMessages( params->dutil, NULL_XWE,
dvc_makeMQTTMessages( params->dutil, NULL_XWE,
msgAndTopicProc, storage, msgAndTopicProc, storage,
addressee, gameID, buf, len, streamVersion ); nMsgs, msgs, addressee,
return len; gameID, streamVersion );
return nSent;
} }
void void

View file

@ -1,6 +1,7 @@
/* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */ /* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */
/* /*
* Copyright 2020 by Eric House (xwords@eehouse.org). All rights reserved. * Copyright 2020 - 2023 by Eric House (xwords@eehouse.org). All rights
* reserved.
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -32,7 +33,7 @@ void mqttc_invite( LaunchParams* params, const NetLaunchInfo* nli,
const MQTTDevID* mqttInvitee ); const MQTTDevID* mqttInvitee );
void mqttc_onInviteHandled( LaunchParams* params, const NetLaunchInfo* nli ); void mqttc_onInviteHandled( LaunchParams* params, const NetLaunchInfo* nli );
XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID, XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID,
const XP_U8* buf, XP_U16 len, XP_U16 streamVersion, XP_U16 count, SendMsgsPacket msgs[], XP_U16 streamVersion,
const MQTTDevID* addressee ); const MQTTDevID* addressee );
void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID ); void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID );