snapshot: new mqtt msg format allowing multiple messages

This commit is contained in:
Eric House 2022-12-19 19:53:57 -08:00
parent b179a0bade
commit af4e39d921
4 changed files with 161 additions and 48 deletions

View file

@ -204,15 +204,19 @@ dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
MQTTDevID devid;
getMQTTDevID( dutil, xwe, XP_FALSE, &devid );
XP_UCHAR buf[64];
/* First, the main device topic */
formatMQTTDevTopic( &devid, buf, VSIZE(buf) );
#ifdef MQTT_DEV_TOPICS
/* First, the main device topic */
topics[count++] = appendToStorage( storage, &offset, buf );
#endif
#ifdef MQTT_GAMEID_TOPICS
/* Then the pattern that includes gameIDs */
XP_SNPRINTF( buf, VSIZE(buf), "%s/+", topics[0] );
topics[count++] = appendToStorage( storage, &offset, buf );
XP_UCHAR buf2[64];
size_t siz = XP_SNPRINTF( buf2, VSIZE(buf2), "%s/+", buf );
XP_ASSERT( siz < VSIZE(buf) );
topics[count++] = appendToStorage( storage, &offset, buf2 );
#endif
/* Finally, the control pattern */
@ -248,8 +252,10 @@ dvc_getMQTTPubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( devid, devTopic, VSIZE(devTopic) );
#ifdef MQTT_DEV_TOPICS
/* device topic; eventually goes away; but invites? */
topics[count++] = appendToStorage( storage, &offset, devTopic );
#endif
#ifdef MQTT_GAMEID_TOPICS
XP_UCHAR buf[128];
@ -272,6 +278,7 @@ typedef enum { CMD_INVITE, CMD_MSG, CMD_DEVGONE, } MQTTCmd;
// #define PROTO_0 0
#define PROTO_1 1 /* moves gameID into "header" relay2 knows about */
#define PROTO_2 2 /* adds timestamp to header */
#define PROTO_3 3 /* adds multi-message, removes gameID */
#ifndef MQTT_USE_PROTO
# define MQTT_USE_PROTO PROTO_1
#endif
@ -299,6 +306,29 @@ addHeaderGameIDAndCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
stream_putU8( stream, cmd );
}
#ifdef MQTT_GAMEID_TOPICS
static void
addProto3HeaderCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
XP_U32 timestamp, XWStreamCtxt* stream )
{
stream_putU8( stream, PROTO_3 );
MQTTDevID myID;
dvc_getMQTTDevID( dutil, xwe, &myID );
myID = htobe64( myID );
stream_putBytes( stream, &myID, sizeof(myID) );
if ( 0 == timestamp ) {
timestamp = dutil_getCurSeconds( dutil, xwe );
XP_LOGFF( "replacing timestamp of 0" );
}
stream_putU32( stream, timestamp );
stream_putU8( stream, cmd );
}
#endif
void
dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
const NetLaunchInfo* nli, XP_U32 timestamp )
@ -308,16 +338,54 @@ dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
nli_saveToStream( nli, stream );
}
/* Ship with == 1, but increase to test */
#define MULTI_MSG_COUNT 1
void
dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len )
dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len )
{
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, timestamp, stream );
if ( PROTO_2 <= MQTT_USE_PROTO ) {
stream_putU32VL( stream, len );
XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) );
#ifdef MQTT_DEV_TOPICS
{
XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, timestamp, stream );
if ( PROTO_2 <= MQTT_USE_PROTO ) {
stream_putU32VL( stream, len );
}
stream_putBytes( stream, buf, len );
(*proc)(closure, devTopic, stream );
stream_destroy( stream, xwe );
}
stream_putBytes( stream, buf, len );
#endif
#ifdef MQTT_GAMEID_TOPICS
{
XWStreamCtxt* stream = mkStream( dutil );
addProto3HeaderCmd( dutil, xwe, CMD_MSG, timestamp, stream );
/* For now, we ship one message per packet. But the receiving code
should be ready */
stream_putU8( stream, MULTI_MSG_COUNT );
for ( int ii = 0; ii < MULTI_MSG_COUNT; ++ii ) {
stream_putU32VL( stream, len );
stream_putBytes( stream, buf, len );
}
XP_UCHAR gameTopic[64];
size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic),
"%s/%X", devTopic, gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
(*proc)( closure, gameTopic, stream );
stream_destroy( stream, xwe );
}
#endif
}
void
@ -330,12 +398,17 @@ dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
}
static XP_Bool
isDevMsg( const MQTTDevID* myID, const XP_UCHAR* topic )
isDevMsg( const MQTTDevID* myID, const XP_UCHAR* topic, XP_U32* gameID )
{
XP_UCHAR buf[64];
formatMQTTDevTopic( myID, buf, VSIZE(buf) );
XP_Bool success = 0 == strncmp( buf, topic, XP_STRLEN(buf) );
XP_LOGFF( "(%s) => %s", topic, boolToStr(success) );
size_t topicLen = XP_STRLEN(buf);
XP_Bool success = 0 == strncmp( buf, topic, topicLen );
if ( success ) {
const XP_UCHAR* gameIDPart = topic + topicLen;
sscanf( gameIDPart, "/%X", gameID );
}
// XP_LOGFF( "(%s) => %s (gameID=%X)", topic, boolToStr(success), *gameID );
return success;
}
@ -349,6 +422,32 @@ isCtrlMsg( const MQTTDevID* myID, const XP_UCHAR* topic )
return success;
}
static void
dispatchMsgs( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U8 proto, XWStreamCtxt* stream,
XP_U32 gameID, const CommsAddrRec* from )
{
int msgCount = proto >= PROTO_3 ? stream_getU8( stream ) : 1;
for ( int ii = 0; ii < msgCount; ++ii ) {
XP_U32 msgLen;
if ( PROTO_1 == proto ) {
msgLen = stream_getSize( stream );
} else {
msgLen = stream_getU32VL( stream );
}
if ( msgLen > stream_getSize( stream ) ) {
XP_LOGFF( "msglen %d too large", msgLen );
msgLen = 0;
XP_ASSERT(0);
}
if ( 0 < msgLen ) {
XP_U8 msgBuf[msgLen];
stream_getBytes( stream, msgBuf, msgLen );
dutil_onMessageReceived( dutil, xwe, gameID,
from, msgBuf, msgLen );
}
}
}
void
dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
const XP_U8* buf, XP_U16 len )
@ -358,12 +457,13 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
MQTTDevID myID;
dvc_getMQTTDevID( dutil, xwe, &myID );
if ( isDevMsg( &myID, topic ) ) {
XP_U32 gameID = 0;
if ( isDevMsg( &myID, topic, &gameID ) ) {
XWStreamCtxt* stream = mkStream( dutil );
stream_putBytes( stream, buf, len );
XP_U8 proto = stream_getU8( stream );
if ( proto == PROTO_1 || proto == PROTO_2 ) {
if ( proto == PROTO_1 || proto == PROTO_2 || proto == PROTO_3 ) {
MQTTDevID senderID;
stream_getBytes( stream, &senderID, sizeof(senderID) );
senderID = be64toh( senderID );
@ -372,11 +472,14 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
formatMQTTDevID( &senderID, tmp, VSIZE(tmp) );
XP_LOGFF( "senderID: %s", tmp );
#endif
XP_U32 gameID = stream_getU32( stream );
if ( proto < PROTO_3 ) {
gameID = stream_getU32( stream );
} else {
XP_ASSERT( 0 != gameID );
}
XP_U32 timestamp = 0;
if ( PROTO_2 == proto ) {
if ( PROTO_2 <= proto ) {
timestamp = stream_getU32( stream );
#ifdef DEBUG
if ( 0 < timestamp ) {
@ -406,22 +509,7 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
addr_addType( &from, COMMS_CONN_MQTT );
from.u.mqtt.devID = senderID;
if ( CMD_MSG == cmd ) {
XP_U32 msgLen;
if ( PROTO_2 == proto ) {
msgLen = stream_getU32VL( stream );
if ( msgLen > stream_getSize( stream ) ) {
XP_LOGFF( "msglen %d too large", msgLen );
msgLen = 0;
}
} else {
msgLen = stream_getSize( stream );
}
if ( 0 < msgLen ) {
XP_U8 msgBuf[msgLen];
stream_getBytes( stream, msgBuf, msgLen );
dutil_onMessageReceived( dutil, xwe, gameID,
&from, msgBuf, msgLen );
}
dispatchMsgs( dutil, xwe, proto, stream, gameID, &from );
} else if ( CMD_DEVGONE == cmd ) {
dutil_onGameGoneReceived( dutil, xwe, gameID, &from );
}

View file

@ -41,8 +41,14 @@ void dvc_getMQTTPubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_U16* nTopics, XP_UCHAR* topics[] );
void dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
const NetLaunchInfo* nli, XP_U32 timestamp );
void dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
XP_U32 gameID, XP_U32 timestamp, const XP_U8* buf, XP_U16 len );
typedef void (*MsgAndTopicProc)( void* closure, const XP_UCHAR* topic,
XWStreamCtxt* msg );
void dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len );
void dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream, XP_U32 gameID, XP_U32 timestamp );
void dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,

View file

@ -167,7 +167,8 @@ DEFINES += -DXWFEATURE_SLOW_ROBOT -DXWFEATURE_ROBOTPHONIES
DEFINES += -DXWFEATURE_DEVICE
DEFINES += -DXWFEATURE_KNOWNPLAYERS
# DEFINES += -DMQTT_GAMEID_TOPICS
DEFINES += -DMQTT_DEV_TOPICS
DEFINES += -DMQTT_GAMEID_TOPICS
# Support device-to-device connection via UDP, e.g. using wifi on a
# LAN or where the host/server isn't behind a firewall.

View file

@ -1,6 +1,7 @@
/* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */
/*
* Copyright 2020 by Eric House (xwords@eehouse.org). All rights reserved.
* Copyright 2020 - 2022 by Eric House (xwords@eehouse.org). All rights
* reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -167,6 +168,7 @@ static bool
postMsg( MQTTConStorage* storage, XWStreamCtxt* stream, XP_U32 gameID,
const MQTTDevID* invitee )
{
XP_ASSERT(0); /* I need to go away! */
const XP_U8* bytes = stream_getPtr( stream );
XP_U16 len = stream_getSize( stream );
@ -198,6 +200,20 @@ postMsg( MQTTConStorage* storage, XWStreamCtxt* stream, XP_U32 gameID,
return success;
}
static bool
postOne( MQTTConStorage* storage, const XP_UCHAR* topic, XWStreamCtxt* stream )
{
const XP_U8* bytes = stream_getPtr( stream );
XP_U16 len = stream_getSize( stream );
int mid;
int err = mosquitto_publish( storage->mosq, &mid, topic,
len, bytes, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s) => %s; mid=%d", topic,
mosquitto_strerror(err), mid );
XP_ASSERT( 0 == err );
return 0 == err;
}
void
mqttc_init( LaunchParams* params )
{
@ -312,21 +328,23 @@ mqttc_invite( LaunchParams* params, XP_U32 timestamp, const NetLaunchInfo* nli,
postMsg( storage, stream, nli->gameID, invitee );
}
static void
msgAndTopicProc( void* closure, const XP_UCHAR* topic, XWStreamCtxt* stream )
{
MQTTConStorage* storage = (MQTTConStorage*)closure;
(void)postOne( storage, topic, stream );
}
XP_S16
mqttc_send( LaunchParams* params, XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee )
{
XP_S16 result = -1;
MQTTConStorage* storage = getStorage( params );
XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool)
params->vtMgr );
dvc_makeMQTTMessage( params->dutil, NULL_XWE, stream,
gameID, timestamp, buf, len );
if ( postMsg( storage, stream, gameID, addressee ) ) {
result = len;
}
return result;
dvc_makeMQTTMessages( params->dutil, NULL_XWE,
msgAndTopicProc, storage,
addressee, gameID, timestamp, buf, len );
return len;
}
void