linux: subscribe only after connect succeeds

This commit is contained in:
Eric House 2023-01-11 11:10:45 -08:00
parent baf02f6800
commit 8c8561e2ea

View file

@ -31,6 +31,7 @@ typedef struct _MQTTConStorage {
MQTTDevID clientID;
gchar clientIDStr[32];
int msgPipe[2];
XP_Bool connected;
} MQTTConStorage;
#define DEFAULT_QOS 2
@ -79,18 +80,24 @@ onMessageReceived( struct mosquitto* XP_UNUSED_DBG(mosq), void *userdata,
}
static void
connect_callback( struct mosquitto *mosq, void *userdata, int XP_UNUSED_DBG(err) )
connect_callback( struct mosquitto* mosq, void* userdata,
int XP_UNUSED_DBG(connErr) )
{
XP_LOGFF( "(err=%s)", mosquitto_strerror(err) );
XP_USE(mosq);
XP_USE(userdata);
/* int i; */
/* if(!result){ */
/* /\* Subscribe to broker information topics on successful connect. *\/ */
/* mosquitto_subscribe(mosq, NULL, "$SYS/#", 2); */
/* }else{ */
/* fprintf(stderr, "Connect failed\n"); */
/* } */
XP_LOGFF( "(err=%s)", mosquitto_strerror(connErr) );
MQTTConStorage* storage = (MQTTConStorage*)userdata;
storage->connected = XP_TRUE;
XP_UCHAR topicStorage[256];
XP_UCHAR* topics[4];
XP_U16 nTopics = VSIZE(topics);
dvc_getMQTTSubTopics( storage->params->dutil, NULL_XWE,
topicStorage, VSIZE(topicStorage),
&nTopics, topics );
int mid;
int err = mosquitto_subscribe_multiple( mosq, &mid, nTopics, topics,
DEFAULT_QOS, 0, NULL );
XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0],
mosquitto_strerror(err), mid );
}
static void
@ -167,19 +174,22 @@ handle_gotmsg( GIOChannel* source, GIOCondition XP_UNUSED(condition), gpointer d
static bool
postOne( MQTTConStorage* storage, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
{
int mid;
int err = mosquitto_publish( storage->mosq, &mid, topic,
int err = -1;
if ( storage->connected ) {
int mid;
err = mosquitto_publish( storage->mosq, &mid, topic,
len, buf, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s) => %s; mid=%d", topic,
mosquitto_strerror(err), mid );
XP_ASSERT( 0 == err );
XP_LOGFF( "mosquitto_publish(topic=%s, len=%d) => %s; mid=%d", topic,
len, mosquitto_strerror(err), mid );
} else {
XP_LOGFF( "not connected and so not sending" );
}
return 0 == err;
}
void
mqttc_init( LaunchParams* params )
{
LOG_FUNC();
if ( types_hasType( params->conTypes, COMMS_CONN_MQTT ) ) {
XP_ASSERT( !params->mqttConStorage );
MQTTConStorage* storage = getStorage( params );
@ -215,18 +225,6 @@ mqttc_init( LaunchParams* params )
XP_LOGFF( "mosquitto_connect(host=%s) => %s", params->connInfo.mqtt.hostName,
mosquitto_strerror(err) );
if ( MOSQ_ERR_SUCCESS == err ) {
XP_UCHAR topicStorage[256];
XP_UCHAR* topics[4];
XP_U16 nTopics = VSIZE(topics);
dvc_getMQTTSubTopics( storage->params->dutil, NULL_XWE,
topicStorage, VSIZE(topicStorage),
&nTopics, topics );
int mid;
err = mosquitto_subscribe_multiple( mosq, &mid, nTopics, topics, DEFAULT_QOS, 0, NULL );
XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0],
mosquitto_strerror(err), mid );
err = mosquitto_loop_start( mosq );
XP_ASSERT( !err );
} else {
@ -235,7 +233,6 @@ mqttc_init( LaunchParams* params )
} else {
XP_LOGFF( "MQTT disabled; doing nothing" );
}
LOG_RETURN_VOID();
}
void