From 326624e942a4a4ebc4fbb2bf039918747b2f89c5 Mon Sep 17 00:00:00 2001 From: Eric House Date: Fri, 24 Mar 2023 20:28:45 -0700 Subject: [PATCH] on linux, keep a queue of mqtt messages Was dropping invitations that were posted before I connected to the remote broker. Now they're kept, and sent until the broker says publish succeeded. Better would be to have comms resend when mqtt connects, but I'm not set up for that. --- xwords4/common/device.c | 20 +++--- xwords4/linux/mqttcon.c | 156 +++++++++++++++++++++++++++++++++++----- 2 files changed, 150 insertions(+), 26 deletions(-) diff --git a/xwords4/common/device.c b/xwords4/common/device.c index a89e98389..c118652b4 100644 --- a/xwords4/common/device.c +++ b/xwords4/common/device.c @@ -346,6 +346,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe, { XP_S16 nSent0 = 0; XP_S16 nSent1 = 0; + XP_U8 nBufs = 0; XP_LOGFF( "(streamVersion: %X)", streamVersion ); XP_UCHAR devTopic[64]; /* used by two below */ formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) ); @@ -355,9 +356,10 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe, more likely we just aren't in that point in the game, but send both. If it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */ - if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) { - for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; - !!packet; packet = (SendMsgsPacket* const)packet->next ) { + for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; + !!packet; packet = (SendMsgsPacket* const)packet->next ) { + ++nBufs; + if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) { XWStreamCtxt* stream = mkStream( dutil ); addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream ); stream_putBytes( stream, packet->buf, packet->len ); @@ -371,15 +373,12 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream = mkStream( dutil ); addProto3HeaderCmd( dutil, xwe, CMD_MSG, stream ); - XP_U8 nBufs = 0; - for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; - !!packet; packet = (SendMsgsPacket* const)packet->next ) { - ++nBufs; - } - /* For now, we ship one message per packet. But the receiving code should be ready */ stream_putU8( stream, nBufs ); + if ( 1 < nBufs ) { + XP_LOGFF( "nBufs > 1: %d", nBufs ); + } for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; !!packet; packet = (SendMsgsPacket* const)packet->next ) { XP_U32 len = packet->len; @@ -459,6 +458,9 @@ dispatchMsgs( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U8 proto, XWStreamCtxt* stream, XP_U32 gameID, const CommsAddrRec* from ) { int msgCount = proto >= PROTO_3 ? stream_getU8( stream ) : 1; + if ( 1 < msgCount ) { + XP_LOGFF( "nBufs > 1: %d", msgCount ); + } for ( int ii = 0; ii < msgCount; ++ii ) { XP_U32 msgLen; if ( PROTO_1 == proto ) { diff --git a/xwords4/linux/mqttcon.c b/xwords4/linux/mqttcon.c index ad110da1d..38cd7ce37 100644 --- a/xwords4/linux/mqttcon.c +++ b/xwords4/linux/mqttcon.c @@ -32,10 +32,137 @@ typedef struct _MQTTConStorage { gchar clientIDStr[32]; int msgPipe[2]; XP_Bool connected; + GSList* queue; } MQTTConStorage; #define DEFAULT_QOS 2 +typedef struct _QElem { + gchar* topic; + uint8_t* buf; + uint16_t len; + int mid; +} QElem; + +static void +sendQueueHead( MQTTConStorage* storage ) +{ + LOG_FUNC(); + if ( storage->connected ) { + for ( GSList* iter = storage->queue; !!iter; iter = iter->next ) { + QElem* elem = (QElem*)iter->data; + if ( 0 == elem->mid ) { + int err = mosquitto_publish( storage->mosq, &elem->mid, elem->topic, + elem->len, elem->buf, DEFAULT_QOS, true ); + XP_LOGFF( "mosquitto_publish(topic=%s, len=%d) => %s; mid=%d", elem->topic, + elem->len, mosquitto_strerror(err), elem->mid ); + break; + } + } + } + LOG_RETURN_VOID(); +} /* sendQueueHead */ + +typedef struct _FindState { + QElem elem; + XP_Bool found; +} FindState; + +static bool +elemsEqual( QElem* qe1, QElem* qe2 ) +{ + return qe1->len == qe2->len + && 0 == strcmp( qe1->topic, qe2->topic ) + && 0 == memcmp( qe1->buf, qe2->buf, qe1->len ); +} + +static void +findMsg( gpointer data, gpointer user_data ) +{ + QElem* qe = (QElem*)data; + FindState* fsp = (FindState*)user_data; + if ( !fsp->found && elemsEqual( qe, &fsp->elem ) ) { + fsp->found = XP_TRUE; + } +} + +static gint +queueIdle( gpointer data ) +{ + LOG_FUNC(); + MQTTConStorage* storage = (MQTTConStorage*)data; + sendQueueHead( storage ); + return FALSE; +} + +static void +tickleQueue( MQTTConStorage* storage ) +{ + ADD_ONETIME_IDLE( queueIdle, storage ); +} + +/* Add to queue if not already there */ +static void +enqueue( MQTTConStorage* storage, const char* topic, + const XP_U8* buf, XP_U16 len ) +{ + FindState fs = { + .elem.buf = (uint8_t*)buf, + .elem.len = len, + .elem.topic = (gchar*)topic, + }; + g_slist_foreach( storage->queue, findMsg, &fs ); + + if ( fs.found ) { + XP_LOGFF( "dropping duplicate message" ); + } else { + QElem* elem = g_malloc0( sizeof(*elem) ); + elem->topic = g_strdup( topic ); + elem->buf = g_memdup2( buf, len ); + elem->len = len; + storage->queue = g_slist_append( storage->queue, elem ); + XP_LOGFF( "added elem; len now %d", g_slist_length(storage->queue) ); + + tickleQueue( storage ); + } +} + +typedef struct _RemoveState { + MQTTConStorage* storage; + int mid; + XP_Bool found; +} RemoveState; + +static void +removeWithMid( gpointer data, gpointer user_data ) +{ + QElem* qe = (QElem*)data; + RemoveState* rsp = (RemoveState*)user_data; + if ( qe->mid == rsp->mid ) { + XP_ASSERT( !rsp->found ); + rsp->found = XP_TRUE; + MQTTConStorage* storage = rsp->storage; + storage->queue = g_slist_remove( storage->queue, qe ); + XP_LOGFF( "removed elem with mid %d; len now %d", rsp->mid, + g_slist_length(storage->queue) ); + + g_free( qe->topic ); + g_free( qe->buf ); + g_free( qe ); + } +} + +static void +dequeue( MQTTConStorage* storage, int mid ) +{ + XP_LOGFF("(mid: %d)", mid ); + RemoveState rs = { .mid = mid, .storage = storage, }; + g_slist_foreach( storage->queue, removeWithMid, &rs ); + if ( !rs.found ) { + XP_LOGFF( "failed to find mid %d", mid ); + } +} + static MQTTConStorage* getStorage( LaunchParams* params ) { @@ -99,6 +226,8 @@ connect_callback( struct mosquitto* mosq, void* userdata, XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0], mosquitto_strerror(err), mid ); XP_USE(err); + + tickleQueue( storage ); } static void @@ -116,6 +245,14 @@ subscribe_callback( struct mosquitto *mosq, void *userdata, int mid, } } +static void +publish_callback( struct mosquitto* XP_UNUSED(mosq), void* userdata, int mid ) +{ + XP_LOGFF( "publish of mid %d successful", mid ); + MQTTConStorage* storage = (MQTTConStorage*)userdata; + dequeue( storage, mid ); +} + static void log_callback( struct mosquitto *mosq, void *userdata, int level, const char* str ) @@ -172,22 +309,6 @@ handle_gotmsg( GIOChannel* source, GIOCondition XP_UNUSED(condition), gpointer d return TRUE; } /* handle_gotmsg */ -static bool -postOne( MQTTConStorage* storage, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len ) -{ - int err = -1; - if ( storage->connected ) { - int mid; - err = mosquitto_publish( storage->mosq, &mid, topic, - len, buf, DEFAULT_QOS, true ); - XP_LOGFF( "mosquitto_publish(topic=%s, len=%d) => %s; mid=%d", topic, - len, mosquitto_strerror(err), mid ); - } else { - XP_LOGFF( "not connected and so not sending" ); - } - return 0 == err; -} - void mqttc_init( LaunchParams* params ) { @@ -219,6 +340,7 @@ mqttc_init( LaunchParams* params ) mosquitto_connect_callback_set( mosq, connect_callback ); mosquitto_message_callback_set( mosq, onMessageReceived ); mosquitto_subscribe_callback_set( mosq, subscribe_callback ); + mosquitto_publish_callback_set( mosq, publish_callback ); int keepalive = 60; err = mosquitto_connect( mosq, params->connInfo.mqtt.hostName, @@ -273,7 +395,7 @@ static void msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len ) { MQTTConStorage* storage = (MQTTConStorage*)closure; - (void)postOne( storage, topic, buf, len ); + (void)enqueue( storage, topic, buf, len ); } void