on linux, keep a queue of mqtt messages

Was dropping invitations that were posted before I connected to the
remote broker. Now they're kept, and sent until the broker says
publish succeeded. Better would be to have comms resend when mqtt
connects, but I'm not set up for that.
This commit is contained in:
Eric House 2023-03-24 20:28:45 -07:00
parent 9144fbae59
commit 326624e942
2 changed files with 150 additions and 26 deletions

View file

@ -346,6 +346,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
{ {
XP_S16 nSent0 = 0; XP_S16 nSent0 = 0;
XP_S16 nSent1 = 0; XP_S16 nSent1 = 0;
XP_U8 nBufs = 0;
XP_LOGFF( "(streamVersion: %X)", streamVersion ); XP_LOGFF( "(streamVersion: %X)", streamVersion );
XP_UCHAR devTopic[64]; /* used by two below */ XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) ); formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) );
@ -355,9 +356,10 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
more likely we just aren't in that point in the game, but send both. If more likely we just aren't in that point in the game, but send both. If
it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */ it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */
if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) {
for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs;
!!packet; packet = (SendMsgsPacket* const)packet->next ) { !!packet; packet = (SendMsgsPacket* const)packet->next ) {
++nBufs;
if ( 0 == streamVersion || STREAM_VERS_NORELAY > streamVersion ) {
XWStreamCtxt* stream = mkStream( dutil ); XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream ); addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream );
stream_putBytes( stream, packet->buf, packet->len ); stream_putBytes( stream, packet->buf, packet->len );
@ -371,15 +373,12 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream = mkStream( dutil ); XWStreamCtxt* stream = mkStream( dutil );
addProto3HeaderCmd( dutil, xwe, CMD_MSG, stream ); addProto3HeaderCmd( dutil, xwe, CMD_MSG, stream );
XP_U8 nBufs = 0;
for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs;
!!packet; packet = (SendMsgsPacket* const)packet->next ) {
++nBufs;
}
/* For now, we ship one message per packet. But the receiving code /* For now, we ship one message per packet. But the receiving code
should be ready */ should be ready */
stream_putU8( stream, nBufs ); stream_putU8( stream, nBufs );
if ( 1 < nBufs ) {
XP_LOGFF( "nBufs > 1: %d", nBufs );
}
for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs; for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs;
!!packet; packet = (SendMsgsPacket* const)packet->next ) { !!packet; packet = (SendMsgsPacket* const)packet->next ) {
XP_U32 len = packet->len; XP_U32 len = packet->len;
@ -459,6 +458,9 @@ dispatchMsgs( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U8 proto, XWStreamCtxt* stream,
XP_U32 gameID, const CommsAddrRec* from ) XP_U32 gameID, const CommsAddrRec* from )
{ {
int msgCount = proto >= PROTO_3 ? stream_getU8( stream ) : 1; int msgCount = proto >= PROTO_3 ? stream_getU8( stream ) : 1;
if ( 1 < msgCount ) {
XP_LOGFF( "nBufs > 1: %d", msgCount );
}
for ( int ii = 0; ii < msgCount; ++ii ) { for ( int ii = 0; ii < msgCount; ++ii ) {
XP_U32 msgLen; XP_U32 msgLen;
if ( PROTO_1 == proto ) { if ( PROTO_1 == proto ) {

View file

@ -32,10 +32,137 @@ typedef struct _MQTTConStorage {
gchar clientIDStr[32]; gchar clientIDStr[32];
int msgPipe[2]; int msgPipe[2];
XP_Bool connected; XP_Bool connected;
GSList* queue;
} MQTTConStorage; } MQTTConStorage;
#define DEFAULT_QOS 2 #define DEFAULT_QOS 2
typedef struct _QElem {
gchar* topic;
uint8_t* buf;
uint16_t len;
int mid;
} QElem;
static void
sendQueueHead( MQTTConStorage* storage )
{
LOG_FUNC();
if ( storage->connected ) {
for ( GSList* iter = storage->queue; !!iter; iter = iter->next ) {
QElem* elem = (QElem*)iter->data;
if ( 0 == elem->mid ) {
int err = mosquitto_publish( storage->mosq, &elem->mid, elem->topic,
elem->len, elem->buf, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s, len=%d) => %s; mid=%d", elem->topic,
elem->len, mosquitto_strerror(err), elem->mid );
break;
}
}
}
LOG_RETURN_VOID();
} /* sendQueueHead */
typedef struct _FindState {
QElem elem;
XP_Bool found;
} FindState;
static bool
elemsEqual( QElem* qe1, QElem* qe2 )
{
return qe1->len == qe2->len
&& 0 == strcmp( qe1->topic, qe2->topic )
&& 0 == memcmp( qe1->buf, qe2->buf, qe1->len );
}
static void
findMsg( gpointer data, gpointer user_data )
{
QElem* qe = (QElem*)data;
FindState* fsp = (FindState*)user_data;
if ( !fsp->found && elemsEqual( qe, &fsp->elem ) ) {
fsp->found = XP_TRUE;
}
}
static gint
queueIdle( gpointer data )
{
LOG_FUNC();
MQTTConStorage* storage = (MQTTConStorage*)data;
sendQueueHead( storage );
return FALSE;
}
static void
tickleQueue( MQTTConStorage* storage )
{
ADD_ONETIME_IDLE( queueIdle, storage );
}
/* Add to queue if not already there */
static void
enqueue( MQTTConStorage* storage, const char* topic,
const XP_U8* buf, XP_U16 len )
{
FindState fs = {
.elem.buf = (uint8_t*)buf,
.elem.len = len,
.elem.topic = (gchar*)topic,
};
g_slist_foreach( storage->queue, findMsg, &fs );
if ( fs.found ) {
XP_LOGFF( "dropping duplicate message" );
} else {
QElem* elem = g_malloc0( sizeof(*elem) );
elem->topic = g_strdup( topic );
elem->buf = g_memdup2( buf, len );
elem->len = len;
storage->queue = g_slist_append( storage->queue, elem );
XP_LOGFF( "added elem; len now %d", g_slist_length(storage->queue) );
tickleQueue( storage );
}
}
typedef struct _RemoveState {
MQTTConStorage* storage;
int mid;
XP_Bool found;
} RemoveState;
static void
removeWithMid( gpointer data, gpointer user_data )
{
QElem* qe = (QElem*)data;
RemoveState* rsp = (RemoveState*)user_data;
if ( qe->mid == rsp->mid ) {
XP_ASSERT( !rsp->found );
rsp->found = XP_TRUE;
MQTTConStorage* storage = rsp->storage;
storage->queue = g_slist_remove( storage->queue, qe );
XP_LOGFF( "removed elem with mid %d; len now %d", rsp->mid,
g_slist_length(storage->queue) );
g_free( qe->topic );
g_free( qe->buf );
g_free( qe );
}
}
static void
dequeue( MQTTConStorage* storage, int mid )
{
XP_LOGFF("(mid: %d)", mid );
RemoveState rs = { .mid = mid, .storage = storage, };
g_slist_foreach( storage->queue, removeWithMid, &rs );
if ( !rs.found ) {
XP_LOGFF( "failed to find mid %d", mid );
}
}
static MQTTConStorage* static MQTTConStorage*
getStorage( LaunchParams* params ) getStorage( LaunchParams* params )
{ {
@ -99,6 +226,8 @@ connect_callback( struct mosquitto* mosq, void* userdata,
XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0], XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0],
mosquitto_strerror(err), mid ); mosquitto_strerror(err), mid );
XP_USE(err); XP_USE(err);
tickleQueue( storage );
} }
static void static void
@ -116,6 +245,14 @@ subscribe_callback( struct mosquitto *mosq, void *userdata, int mid,
} }
} }
static void
publish_callback( struct mosquitto* XP_UNUSED(mosq), void* userdata, int mid )
{
XP_LOGFF( "publish of mid %d successful", mid );
MQTTConStorage* storage = (MQTTConStorage*)userdata;
dequeue( storage, mid );
}
static void static void
log_callback( struct mosquitto *mosq, void *userdata, int level, log_callback( struct mosquitto *mosq, void *userdata, int level,
const char* str ) const char* str )
@ -172,22 +309,6 @@ handle_gotmsg( GIOChannel* source, GIOCondition XP_UNUSED(condition), gpointer d
return TRUE; return TRUE;
} /* handle_gotmsg */ } /* handle_gotmsg */
static bool
postOne( MQTTConStorage* storage, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
{
int err = -1;
if ( storage->connected ) {
int mid;
err = mosquitto_publish( storage->mosq, &mid, topic,
len, buf, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s, len=%d) => %s; mid=%d", topic,
len, mosquitto_strerror(err), mid );
} else {
XP_LOGFF( "not connected and so not sending" );
}
return 0 == err;
}
void void
mqttc_init( LaunchParams* params ) mqttc_init( LaunchParams* params )
{ {
@ -219,6 +340,7 @@ mqttc_init( LaunchParams* params )
mosquitto_connect_callback_set( mosq, connect_callback ); mosquitto_connect_callback_set( mosq, connect_callback );
mosquitto_message_callback_set( mosq, onMessageReceived ); mosquitto_message_callback_set( mosq, onMessageReceived );
mosquitto_subscribe_callback_set( mosq, subscribe_callback ); mosquitto_subscribe_callback_set( mosq, subscribe_callback );
mosquitto_publish_callback_set( mosq, publish_callback );
int keepalive = 60; int keepalive = 60;
err = mosquitto_connect( mosq, params->connInfo.mqtt.hostName, err = mosquitto_connect( mosq, params->connInfo.mqtt.hostName,
@ -273,7 +395,7 @@ static void
msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len ) msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
{ {
MQTTConStorage* storage = (MQTTConStorage*)closure; MQTTConStorage* storage = (MQTTConStorage*)closure;
(void)postOne( storage, topic, buf, len ); (void)enqueue( storage, topic, buf, len );
} }
void void