get qos from server, and use unless overridden by local settings

This commit is contained in:
Eric House 2024-09-01 22:03:49 -07:00
parent 0323e06dea
commit c23904fbb0
15 changed files with 253 additions and 75 deletions

View file

@ -235,6 +235,18 @@ object MQTTUtils {
sWrapper[0] = null
}
}
private fun chooseQOS(context: Context, qosInt: Int): MqttQos
{
val asStr = XWPrefs.getPrefsString(context, R.string.key_mqtt_qos)
val qos = try {
MqttQos.valueOf(asStr)
} catch (ex: Exception) {
MqttQos.entries[qosInt]
}
Log.d(TAG, "chooseQOS($qosInt) => $qos")
return qos
}
private class Conn(val mContext: Context): MqttClientConnectedListener,
MqttClientDisconnectedListener,
@ -366,8 +378,9 @@ object MQTTUtils {
.putAnd("id", id)
.toString()
val qos = chooseQOS(mContext, XwJNI.dvc_getQOS())
val tap = TopicsAndPackets("xw4/ping/" + mDevID,
packet.toByteArray())
packet.toByteArray(), qos.ordinal)
add(SendTask(tap))
}
@ -452,9 +465,9 @@ object MQTTUtils {
private inner class SubscribeAllTask(): Task() {
override fun run() {
val tmp = XWPrefs.getPrefsInt(mContext, R.string.key_mqtt_qos, 2)
val qos = MqttQos.entries[tmp]
val topics = XwJNI.dvc_getMQTTSubTopics() + arrayOf(PONG_PREFIX + mDevID)
val qosArray = intArrayOf(0)
val topics = XwJNI.dvc_getMQTTSubTopics(qosArray) + arrayOf(PONG_PREFIX + mDevID)
val qos = chooseQOS(mContext, qosArray[0])
topics.map{ add(SubscribeTask(it, qos)) }
}
}
@ -477,12 +490,13 @@ object MQTTUtils {
private inner class SendTask(val tap: TopicsAndPackets): Task()
{
override fun run() {
val qos = chooseQOS(mContext, tap.qosInt())
for (pr in tap.iterator()) {
mClient.toAsync()
.publishWith()
.topic(pr.first)
.payload(pr.second)
.qos(MqttQos.AT_LEAST_ONCE)
.qos(qos)
.retain(true)
.send()
.whenComplete { mqtt3Publish, throwable ->

View file

@ -0,0 +1,58 @@
/* -*- compile-command: "find-and-gradle.sh inXw4dDeb"; -*- */ /*
* Copyright 2024 by Eric House (xwords@eehouse.org). All
* rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.eehouse.android.xw4
import android.content.Context
import android.util.AttributeSet
import androidx.preference.ListPreference
import com.hivemq.client.mqtt.datatypes.MqttQos
import org.eehouse.android.xw4.loc.LocUtils
class QOSListPreference(private val mContext: Context,
attrs: AttributeSet?) :
ListPreference(mContext, attrs)
{
private val TAG = QOSListPreference::class.java.simpleName
override fun onAttached() {
super.onAttached()
summary = getPersistedString("")
}
override fun getEntries(): Array<CharSequence> { return getEntriesImpl() }
override fun getEntryValues(): Array<CharSequence> { return getEntriesImpl() }
override fun persistString(value: String): Boolean {
summary = value
return super.persistString(value)
}
private var mEntries: Array<CharSequence>? = null
private fun getEntriesImpl() :Array<CharSequence>
{
if ( null == mEntries ) {
val enums = MqttQos.entries.map{it.toString()}.toMutableList()
enums.add(LocUtils.getString(mContext, R.string.qos_prefs_default_expl))
mEntries = enums.toTypedArray()
}
return mEntries!!
}
}

View file

@ -114,11 +114,12 @@ class XwJNI private constructor() {
// jni (javac -h and kotlin files are beyond me still) so I'm putting back
// the nullable fields and an empty constructor. PENDING()...
class TopicsAndPackets(val topics: Array<String>?,
val packets: Array<ByteArray>?)
val packets: Array<ByteArray>?,
val qos: Int)
{
constructor(topic: String, packet: ByteArray):
this(arrayOf(topic), arrayOf(packet))
constructor(): this(null, null) // PENDING Remove me later
constructor(topic: String, packet: ByteArray, qos: Int):
this(arrayOf(topic), arrayOf(packet), qos)
constructor(): this(null, null, 0) // PENDING Remove me later
fun iterator(): Iterator<Pair<String, ByteArray>>
{
@ -129,6 +130,8 @@ class XwJNI private constructor() {
return lst.iterator()
}
fun qosInt(): Int { return this.qos }
override fun hashCode(): Int {
val topCode = topics.contentDeepHashCode()
// Log.d(TAG, "hashCode(): topic: ${topics!!.get(0)}, code: $topCode")
@ -324,8 +327,8 @@ class XwJNI private constructor() {
dvc_resetMQTTDevID(jNI.m_ptrGlobals)
}
fun dvc_getMQTTSubTopics(): Array<String> {
return dvc_getMQTTSubTopics(jNI.m_ptrGlobals)
fun dvc_getMQTTSubTopics(qosout: IntArray): Array<String> {
return dvc_getMQTTSubTopics(jNI.m_ptrGlobals, qosout)
}
fun dvc_makeMQTTNukeInvite(nli: NetLaunchInfo): TopicsAndPackets {
@ -1033,7 +1036,9 @@ class XwJNI private constructor() {
@JvmStatic
private external fun dvc_resetMQTTDevID(jniState: Long)
@JvmStatic
private external fun dvc_getMQTTSubTopics(jniState: Long): Array<String>
private external fun dvc_getMQTTSubTopics(
jniState: Long, qosout: IntArray)
: Array<String>
@JvmStatic
private external fun dvc_makeMQTTNukeInvite(
jniState: Long,
@ -1169,5 +1174,9 @@ class XwJNI private constructor() {
// This always returns true on release builds now.
@JvmStatic
private external fun haveEnv(jniState: Long): Boolean
fun dvc_getQOS(): Int { return dvc_getQOS(jNI.m_ptrGlobals) }
@JvmStatic
private external fun dvc_getQOS(jniState: Long): Int
}
}

View file

@ -64,7 +64,7 @@
<string name="key_mqtt_url_path">key_mqtt_url_path</string>
<string name="key_mqtt_host">key_mqtt_host</string>
<string name="key_mqtt_port">key_mqtt_port</string>
<string name="key_mqtt_qos">key_mqtt_qos</string>
<string name="key_mqtt_qos">key_mqtt_qos2</string>
<string name="key_update_prerel">key_update_prerel</string>
<string name="key_log_prune_hours">key_log_prune_hours</string>
<string name="key_proxy_port">key_proxy_port</string>
@ -348,11 +348,7 @@
<item>@string/radio_name_cdma</item>
</string-array>
<string-array name="mqtt_qos_values">
<item>0</item>
<item>1</item>
<item>2</item>
</string-array>
<string-array name="empty_list"/>
<string-array name="force_tablet_names">
<item>@string/force_tablet_default</item>

View file

@ -50,4 +50,6 @@
<string name="gamel_menu_logstore_prune">Prune stored logs</string>
<string name="expl_log_prune_hours">Keep this many hours of logs</string>
<string name="qos_prefs_default_expl">Use what MQTT broker recommends</string>
</resources>

View file

@ -17,12 +17,12 @@
android:defaultValue="1883"
android:numeric="decimal"
/>
<org.eehouse.android.xw4.XWListPreference
<org.eehouse.android.xw4.QOSListPreference
android:key="@string/key_mqtt_qos"
android:title="@string/mqtt_qos"
android:entries="@array/mqtt_qos_values"
android:entryValues="@array/mqtt_qos_values"
android:defaultValue="2"
android:entries="@array/empty_list"
android:entryValues="@array/empty_list"
android:defaultValue="@string/qos_prefs_default_expl"
/>
<org.eehouse.android.xw4.XWEditTextPreference
android:key="@string/key_mqtt_url_path"

View file

@ -54,6 +54,7 @@ LOCAL_DEFINES += \
-DGITREV=\"${GITREV}\" \
LOCAL_DEFINES += -DDUTIL_TIMERS
LOCAL_DEFINES += -DXWFEATURE_DEVICE
# XWFEATURE_RAISETILE: first, fix to not use timer
# -DXWFEATURE_RAISETILE \

View file

@ -868,7 +868,7 @@ deleteLocalRefs( JNIEnv* env, ... )
/* Passed to device methods related to MQTT messages */
void
msgAndTopicProc( void* closure, const XP_UCHAR* topic,
const XP_U8* msgBuf, XP_U16 msgLen )
const XP_U8* msgBuf, XP_U16 msgLen, XP_U8 qos )
{
MTPData* mtp = (MTPData*)closure;
JNIEnv* env = mtp->env;
@ -889,6 +889,9 @@ msgAndTopicProc( void* closure, const XP_UCHAR* topic,
++mtp->count;
XP_LOGFFV( "mtp->count now: %d", mtp->count );
XP_ASSERT( mtp->qos == 0 || mtp->qos == qos );
mtp->qos = qos;
}
}
}
@ -900,6 +903,8 @@ wrapResults( MTPData* mtp )
jobject result =
makeObjectEmptyConstr( env, PKG_PATH("jni/XwJNI$TopicsAndPackets"));
setInt( env, result, "qos", mtp->qos );
jobjectArray jTopics = makeStringArray( env, mtp->count, mtp->topics );
setObjectField( env, result, "topics", "[Ljava/lang/String;", jTopics );

View file

@ -132,11 +132,12 @@ typedef struct _MTPData {
jbyteArray jPackets[N_DATA_PACKETS];
XP_UCHAR storage[N_DATA_PACKETS*128];
int offset;
XP_U8 qos;
} MTPData;
#undef N_DATA_PACKETS
void msgAndTopicProc( void* closure, const XP_UCHAR* topic,
const XP_U8* msgBuf, XP_U16 msgLen );
const XP_U8* msgBuf, XP_U16 msgLen, XP_U8 qos );
jobject wrapResults( MTPData* mtp );
void raw_log( const char* func, const char* fmt, ... );

View file

@ -703,7 +703,7 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1resetMQTTDevID
JNIEXPORT jobjectArray JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTSubTopics
( JNIEnv* env, jclass C, jlong jniGlobalPtr )
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jintArray jQOSOut )
{
jobjectArray result;
DVC_HEADER(jniGlobalPtr);
@ -712,12 +712,15 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTSubTopics
XP_UCHAR storage[256];
XP_UCHAR* topics[4];
XP_U16 nTopics = VSIZE(topics);
XP_U8 qos;
dvc_getMQTTSubTopics( globalState->dutil, env,
storage, VSIZE(storage),
&nTopics, topics );
&nTopics, topics, &qos );
result = makeStringArray( env, nTopics, (const XP_UCHAR* const*)topics );
XP_ASSERT( !!result );
setIntInArray( env, jQOSOut, 0, qos );
DVC_HEADER_END();
return result;
@ -1269,6 +1272,17 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1onTimerFired
DVC_HEADER_END();
}
JNIEXPORT jint JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getQOS
( JNIEnv* env, jclass C, jlong jniGlobalPtr )
{
jint result;
DVC_HEADER(jniGlobalPtr);
result = dvc_getQOS( globalState->dutil, env );
DVC_HEADER_END();
return result;
}
/* Dictionary methods: don't use gamePtr */
JNIEXPORT jboolean JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dict_1tilesAreSame

View file

@ -75,10 +75,17 @@ typedef struct _PhoniesDataCodes {
} PhoniesDataCodes;
typedef struct _DevCtxt {
XP_U16 devCount;
WSData* webSendData;
XP_U32 mWebSendKey;
MutexState webSendMutex;
XP_U8 mqttQOS;
XP_Bool dirty;
MutexState mutex;
struct {
WSData* data;
XP_U32 key;
MutexState mutex;
} webSend;
struct {
MutexState mutex;
@ -94,14 +101,18 @@ typedef struct _DevCtxt {
} DevCtxt;
// PENDING: Actually use a timer, or rename this function
static void setSaveDCTimer( XW_DUtilCtxt* dutil, XWEnv xwe,
DevCtxt* dc );
static DevCtxt*
load( XW_DUtilCtxt* dutil, XWEnv xwe )
{
DevCtxt* state = (DevCtxt*)dutil->devCtxt;
if ( NULL == state ) {
XP_ASSERT(0);
#ifdef XWFEATURE_DEVICE
dutil->devCtxt = state = XP_CALLOC( dutil->mpool, sizeof(*state) );
XWStreamCtxt* stream = mkStream( dutil );
const XP_UCHAR* keys[] = { KEY_DEVSTATE, NULL };
dutil_loadStream( dutil, xwe, keys, stream );
@ -109,28 +120,52 @@ load( XW_DUtilCtxt* dutil, XWEnv xwe )
if ( 0 < stream_getSize( stream ) ) {
state->devCount = stream_getU16( stream );
++state->devCount; /* for testing until something's there */
/* XP_LOGF( "%s(): read devCount: %d", __func__, state->devCount ); */
/* XP_LOGFF( "read devCount: %d", state->devCount ); */
if ( stream_gotU8( stream, &state->mqttQOS ) ) {
XP_LOGFF( "read qos: %d", state->mqttQOS );
} else {
state->mqttQOS = 1;
setSaveDCTimer( dutil, xwe, state );
}
} else {
XP_LOGF( "%s(): empty stream!!", __func__ );
XP_LOGFF( "empty stream!!" );
}
stream_destroy( stream );
#endif
}
// LOG_RETURNF( "%p", state );
return state;
}
XP_U8
dvc_getQOS( XW_DUtilCtxt* dutil, XWEnv xwe )
{
DevCtxt* state = load( dutil, xwe );
LOG_RETURNF("%d", state->mqttQOS);
return state->mqttQOS;
}
#ifdef XWFEATURE_DEVICE
static void
dvcStoreLocked( XW_DUtilCtxt* dutil, XWEnv xwe, DevCtxt* state )
{
XWStreamCtxt* stream = mkStream( dutil );
stream_putU16( stream, state->devCount );
stream_putU8( stream, state->mqttQOS );
const XP_UCHAR* keys[] = { KEY_DEVSTATE, NULL };
dutil_storeStream( dutil, xwe, keys, stream );
stream_destroy( stream );
}
void
dvc_store( XW_DUtilCtxt* dutil, XWEnv xwe )
{
ASSERT_MAGIC();
DevCtxt* state = load( dutil, xwe );
XWStreamCtxt* stream = mkStream( dutil );
stream_putU16( stream, state->devCount );
const XP_UCHAR* keys[] = { KEY_DEVSTATE, NULL };
dutil_storeStream( dutil, xwe, keys, stream );
stream_destroy( stream );
WITH_MUTEX( &state->mutex );
dvcStoreLocked( dutil, xwe, state );
END_WITH_MUTEX();
}
#endif
@ -254,7 +289,7 @@ logPtrs( const char* func, int nTopics, char* topics[] )
void
dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_UCHAR* storage, XP_U16 XP_UNUSED_DBG(storageLen),
XP_U16* nTopics, XP_UCHAR* topics[] )
XP_U16* nTopics, XP_UCHAR* topics[], XP_U8* qos )
{
ASSERT_MAGIC();
int offset = 0;
@ -292,6 +327,8 @@ dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
*nTopics = count;
XP_ASSERT( offset < storageLen );
*qos = dvc_getQOS( dutil, xwe );
logPtrs( __func__, *nTopics, topics );
}
@ -339,11 +376,11 @@ addProto3HeaderCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
static void
callProc( MsgAndTopicProc proc, void* closure, const XP_UCHAR* topic,
XWStreamCtxt* stream )
XWStreamCtxt* stream, XP_U8 qos )
{
const XP_U8* msgBuf = !!stream ? stream_getPtr(stream) : NULL;
XP_U16 msgLen = !!stream ? stream_getSize(stream) : 0;
(*proc)( closure, topic, msgBuf, msgLen );
(*proc)( closure, topic, msgBuf, msgLen, qos );
}
void
@ -360,8 +397,9 @@ dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
addHeaderGameIDAndCmd( dutil, xwe, CMD_INVITE, nli->gameID, stream );
nli_saveToStream( nli, stream );
XP_U8 qos = dvc_getQOS( dutil, xwe );
#ifdef MQTT_DEV_TOPICS
callProc( proc, closure, devTopic, stream );
callProc( proc, closure, devTopic, stream, qos );
#endif
#ifdef MQTT_GAMEID_TOPICS
@ -370,7 +408,7 @@ dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
"%s/%X", devTopic, nli->gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
XP_USE(siz);
callProc( proc, closure, gameTopic, stream );
callProc( proc, closure, gameTopic, stream, qos );
#endif
stream_destroy( stream );
@ -392,7 +430,7 @@ dvc_makeMQTTNukeInvite( XW_DUtilCtxt* dutil, XWEnv xwe,
"%s/%X", devTopic, nli->gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
XP_USE(siz);
callProc( proc, closure, gameTopic, NULL );
callProc( proc, closure, gameTopic, NULL, dvc_getQOS(dutil, xwe) );
#endif
}
@ -416,6 +454,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
more likely we just aren't in that point in the game, but send both. If
it's > 0 but < STREAM_VERS_NORELAY, no point sending PROTO_3 */
XP_U8 qos = dvc_getQOS( dutil, xwe );
for ( SendMsgsPacket* packet = (SendMsgsPacket*)msgs;
!!packet; packet = (SendMsgsPacket* const)packet->next ) {
++nBufs;
@ -423,7 +462,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream );
stream_putBytes( stream, packet->buf, packet->len );
callProc( proc, closure, devTopic, stream );
callProc( proc, closure, devTopic, stream, qos );
stream_destroy( stream );
nSent0 += packet->len;
}
@ -460,7 +499,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_ASSERT( siz < VSIZE(gameTopic) );
XP_USE(siz);
callProc( proc, closure, gameTopic, stream );
callProc( proc, closure, gameTopic, stream, qos );
stream_destroy( stream );
}
return XP_MAX( nSent0, nSent1 );
@ -477,10 +516,11 @@ dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) );
XP_U8 qos = dvc_getQOS( dutil, xwe );
XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_DEVGONE, gameID, stream );
#ifdef MQTT_DEV_TOPICS
callProc( proc, closure, devTopic, stream );
callProc( proc, closure, devTopic, stream, qos );
#endif
#ifdef MQTT_GAMEID_TOPICS
@ -489,7 +529,7 @@ dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe,
"%s/%X", devTopic, gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
XP_USE(siz);
callProc( proc, closure, gameTopic, stream );
callProc( proc, closure, gameTopic, stream, qos );
#endif
stream_destroy( stream );
@ -708,11 +748,11 @@ popForKey( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U32 key )
WSData* item = NULL;
DevCtxt* dc = load( dutil, xwe );
WITH_MUTEX(&dc->webSendMutex);
WITH_MUTEX(&dc->webSend.mutex);
GetByKeyData gbkd = { .resultKey = key, };
dc->webSendData = (WSData*)dll_map( &dc->webSendData->links,
getByKeyProc, NULL, &gbkd );
dc->webSend.data = (WSData*)dll_map( &dc->webSend.data->links,
getByKeyProc, NULL, &gbkd );
item = gbkd.found;
END_WITH_MUTEX();
@ -724,10 +764,10 @@ static XP_U32
addWithKey( XW_DUtilCtxt* dutil, XWEnv xwe, WSData* wsdp )
{
DevCtxt* dc = load( dutil, xwe );
WITH_MUTEX(&dc->webSendMutex);
wsdp->resultKey = ++dc->mWebSendKey;
dc->webSendData = (WSData*)
dll_insert( &dc->webSendData->links, &wsdp->links, NULL );
WITH_MUTEX(&dc->webSend.mutex);
wsdp->resultKey = ++dc->webSend.key;
dc->webSend.data = (WSData*)
dll_insert( &dc->webSend.data->links, &wsdp->links, NULL );
END_WITH_MUTEX();
XP_LOGFFV( "(%p) => %d", wsdp, wsdp->resultKey );
return wsdp->resultKey;
@ -741,6 +781,20 @@ delWSDatum( DLHead* elem, void* closure )
XP_FREEP( dutil->mpool, &elem );
}
static void
setSaveDCTimerLocked( XW_DUtilCtxt* dutil, XWEnv xwe, DevCtxt* dc )
{
dvcStoreLocked( dutil, xwe, dc );
}
static void
setSaveDCTimer( XW_DUtilCtxt* dutil, XWEnv xwe, DevCtxt* dc )
{
WITH_MUTEX( &dc->mutex );
setSaveDCTimerLocked( dutil, xwe, dc );
END_WITH_MUTEX();
}
void
dvc_onWebSendResult( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U32 resultKey,
XP_Bool succeeded, const XP_UCHAR* resultJson )
@ -770,6 +824,22 @@ dvc_onWebSendResult( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U32 resultKey,
XP_STRLEN(GITREV) );
}
}
tmp = cJSON_GetObjectItem( result, "qos" );
if ( !!tmp ) {
XP_U8 qos = (XP_U8)tmp->valueint;
XP_ASSERT( 0 <= qos && qos <= 2 );
if ( 0 <= qos && qos <= 2 ) {
DevCtxt* dc = load( dutil, xwe );
WITH_MUTEX( &dc->mutex );
if ( dc->mqttQOS != qos ) {
dc->dirty = XP_TRUE;
dc->mqttQOS = qos;
setSaveDCTimerLocked( dutil, xwe, dc );
}
END_WITH_MUTEX();
}
}
}
}
break;
@ -787,8 +857,8 @@ dvc_onWebSendResult( XW_DUtilCtxt* dutil, XWEnv xwe, XP_U32 resultKey,
static void
freeWSState( XW_DUtilCtxt* dutil, DevCtxt* dc )
{
WITH_MUTEX( &dc->webSendMutex );
dll_removeAll( &dc->webSendData->links, delWSDatum, dutil );
WITH_MUTEX( &dc->webSend.mutex );
dll_removeAll( &dc->webSend.data->links, delWSDatum, dutil );
END_WITH_MUTEX();
}
@ -1179,11 +1249,12 @@ dvc_init( XW_DUtilCtxt* dutil, XWEnv xwe )
XP_ASSERT( 0 == dutil->magic );
XP_ASSERT( !dutil->devCtxt );
DevCtxt* dc = dutil->devCtxt = XP_CALLOC( dutil->mpool, sizeof(*dc) );
dc->webSendData = NULL;
dc->mWebSendKey = 0;
DevCtxt* dc = dutil->devCtxt = load( dutil, xwe );
dc->webSend.data = NULL;
dc->webSend.key = 0;
MUTEX_INIT( &dc->webSendMutex, XP_FALSE );
MUTEX_INIT( &dc->mutex, XP_FALSE );
MUTEX_INIT( &dc->webSend.mutex, XP_FALSE );
MUTEX_INIT( &dc->ackTimer.mutex, XP_FALSE );
loadPhoniesData( dutil, xwe, dc );
@ -1200,8 +1271,9 @@ dvc_cleanup( XW_DUtilCtxt* dutil, XWEnv xwe )
DevCtxt* dc = freePhonyState( dutil, xwe );
freeWSState( dutil, dc );
MUTEX_DESTROY( &dc->webSendMutex );
MUTEX_DESTROY( &dc->webSend.mutex );
MUTEX_DESTROY( &dc->ackTimer.mutex );
MUTEX_DESTROY( &dc->mutex );
XP_FREEP( dutil->mpool, &dc );
}

View file

@ -31,14 +31,16 @@ void dvc_store( XW_DUtilCtxt* dctxt, XWEnv xwe );
# endif
typedef void (*MsgAndTopicProc)( void* closure, const XP_UCHAR* topic,
const XP_U8* msgBuf, XP_U16 msgLen );
const XP_U8* msgBuf, XP_U16 msgLen,
XP_U8 qos );
void dvc_getMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTDevID* devID );
void dvc_setMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe, const MQTTDevID* devID );
void dvc_resetMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe );
void dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_UCHAR* storage, XP_U16 storageLen,
XP_U16* nTopics, XP_UCHAR* topics[] );
XP_U16* nTopics, XP_UCHAR* topics[],
XP_U8* qos );
void dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
@ -81,6 +83,8 @@ void dvc_getPhoniesFor( XW_DUtilCtxt* dutil, XWEnv env, const XP_UCHAR* code,
void dvc_onTimerFired( XW_DUtilCtxt* dutil, XWEnv env, TimerKey key );
#endif
XP_U8 dvc_getQOS( XW_DUtilCtxt* dutil, XWEnv env );
/* All platforms need to call this shortly after setting up their XW_DUtilCtxt */
void dvc_init( XW_DUtilCtxt* dutil, XWEnv xwe );
void dvc_cleanup( XW_DUtilCtxt* dutil, XWEnv xwe );

View file

@ -55,7 +55,7 @@ checkListProcLocked( const DLHead* elem, void* closure )
{
CheckListData* cld = (CheckListData*)closure;
CheckThreadData* ctd = (CheckThreadData*)elem;
if( cld->currentTime > ctd->expiryTime ) {
if ( cld->currentTime > ctd->expiryTime ) {
XP_LOGFF( "FAIL: %s() on line %d in %s unable to lock mutex",
ctd->caller, ctd->lineNo, ctd->file );
XP_ASSERT(0);

View file

@ -2435,7 +2435,8 @@ initParams( LaunchParams* params )
// linux_util_vt_init( MPPARM(params->mpool) params->util );
params->dutil = linux_dutils_init( MPPARM(params->mpool) params->vtMgr, params );
params->dutil = linux_dutils_init( MPPARM(params->mpool) params->vtMgr,
params );
}
static void
@ -3390,8 +3391,6 @@ main( int argc, char** argv )
}
#endif
srandom( seed ); /* init linux random number generator */
XP_LOGFF( "seeded srandom with %d", seed );

View file

@ -36,12 +36,11 @@ typedef struct _MQTTConStorage {
GSList* queue;
} MQTTConStorage;
#define DEFAULT_QOS 2
typedef struct _QElem {
gchar* topic;
uint8_t* buf;
uint16_t len;
XP_U8 qos;
int mid;
} QElem;
@ -57,7 +56,7 @@ sendQueueHead( MQTTConStorage* storage )
int err =
#endif
mosquitto_publish( storage->mosq, &elem->mid, elem->topic,
elem->len, elem->buf, DEFAULT_QOS, true );
elem->len, elem->buf, elem->qos, true );
XP_LOGFF( "mosquitto_publish(topic=%s, msgLen=%d) => %s; mid=%d", elem->topic,
elem->len, mosquitto_strerror(err), elem->mid );
/* Remove this so all are resent together? */
@ -117,12 +116,13 @@ tickleQueue( MQTTConStorage* storage )
/* Add to queue if not already there */
static void
enqueue( MQTTConStorage* storage, const char* topic,
const XP_U8* buf, XP_U16 len )
const XP_U8* buf, XP_U16 len, XP_U8 qos )
{
FindState fs = {
.elem.buf = (uint8_t*)buf,
.elem.len = len,
.elem.topic = (gchar*)topic,
.elem.qos = qos,
};
g_slist_foreach( storage->queue, findMsg, &fs );
@ -133,6 +133,7 @@ enqueue( MQTTConStorage* storage, const char* topic,
elem->topic = g_strdup( topic );
elem->buf = G_MEMDUP( buf, len );
elem->len = len;
elem->qos = qos;
storage->queue = g_slist_append( storage->queue, elem );
XP_LOGFF( "added elem; len now %d", g_slist_length(storage->queue) );
@ -245,12 +246,13 @@ connect_callback( struct mosquitto* mosq, void* userdata,
XP_UCHAR topicStorage[256];
XP_UCHAR* topics[4];
XP_U16 nTopics = VSIZE(topics);
XP_U8 qos;
dvc_getMQTTSubTopics( storage->params->dutil, NULL_XWE,
topicStorage, VSIZE(topicStorage),
&nTopics, topics );
&nTopics, topics, &qos );
int mid;
int err = mosquitto_subscribe_multiple( mosq, &mid, nTopics, topics,
DEFAULT_QOS, 0, NULL );
qos, 0, NULL );
XP_LOGFF( "mosquitto_subscribe(topics[0]=%s, etc) => %s, mid=%d", topics[0],
mosquitto_strerror(err), mid );
XP_USE(err);
@ -423,10 +425,11 @@ mqttc_getDevIDStr( LaunchParams* params )
static void
msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf,
XP_U16 len, XP_U8 qos )
{
MQTTConStorage* storage = (MQTTConStorage*)closure;
(void)enqueue( storage, topic, buf, len );
(void)enqueue( storage, topic, buf, len, qos );
}
void