Once received, nuke invite on mqtt broker

To prevent deleted games' ghost invitations from remaining as
persisted on a topic and so being received over and over, have
recipient of invitation send an empty message on the same topic to
remove them. Any message sent once the game start would have replaced
the invitation, but sometimes if the sender is deleted at the right
time there's none.
This commit is contained in:
Eric House 2023-01-24 17:23:30 -08:00
parent 6bb14548c9
commit 863a54bfe1
8 changed files with 79 additions and 4 deletions

View file

@ -223,6 +223,8 @@ public class MQTTUtils extends Thread
MqttMessage message = new MqttMessage( pair.mPackets[ii] );
message.setRetained( true );
mClient.publish( pair.mTopics[ii], message );
Log.d( TAG, "%H: published msg of len %d to topic %s", MQTTUtils.this,
pair.mPackets[ii].length, pair.mTopics[ii] );
}
} catch ( MqttException me ) {
me.printStackTrace();
@ -635,8 +637,12 @@ public class MQTTUtils extends Thread
public void messageArrived( String topic, MqttMessage message )
throws Exception
{
Log.d( TAG, "%H.messageArrived(topic=%s)", this, topic );
mRxMsgThread.add( topic, message.getPayload() );
byte[] payload = message.getPayload();
Log.d( TAG, "%H.messageArrived(topic=%s, len=%d)", this, topic,
payload.length );
if ( 0 < payload.length ) {
mRxMsgThread.add( topic, payload );
}
ConnStatusHandler
.updateStatusIn( mContext, CommsConnType.COMMS_CONN_MQTT, true );
@ -706,6 +712,8 @@ public class MQTTUtils extends Thread
= new LinkedBlockingQueue<>();
void add( String topic, byte[] msg ) {
Log.d( TAG, "%H.RxMsgThread.add(topic: %s, len: %d)", MQTTUtils.this,
topic, msg.length );
mQueue.add( new MessagePair( topic, msg ) );
}
@ -803,11 +811,15 @@ public class MQTTUtils extends Thread
private void handleInvitation( NetLaunchInfo nli )
{
handleInvitation( nli, null, MultiService.DictFetchOwner.OWNER_MQTT );
// Now nuke the invitation so we don't keep getting it, e.g. if
// the sender deletes the game
TopicsAndPackets tap = XwJNI.dvc_makeMQTTNukeInvite( nli );
addToSendQueue( getContext(), tap );
}
private void receiveMessage( long rowid, MultiMsgSink sink, byte[] msg )
{
Log.d( TAG, "receiveMessage(rowid=%d, len=%d)", rowid, msg.length );
// Log.d( TAG, "receiveMessage(rowid=%d, len=%d)", rowid, msg.length );
receiveMessage( rowid, sink, msg, mReturnAddr );
}
}

View file

@ -169,6 +169,11 @@ public class XwJNI {
return dvc_makeMQTTInvites( getJNI().m_ptrGlobals, invitee, nli );
}
public static TopicsAndPackets dvc_makeMQTTNukeInvite( NetLaunchInfo nli )
{
return dvc_makeMQTTNukeInvite( getJNI().m_ptrGlobals, nli );
}
public static TopicsAndPackets dvc_makeMQTTMessages( String addressee, int gameID, byte[] buf )
{
return dvc_makeMQTTMessages( getJNI().m_ptrGlobals, addressee, gameID, buf );
@ -777,6 +782,8 @@ public class XwJNI {
dvc_makeMQTTInvites( long jniState,
String invitee,
NetLaunchInfo nli );
private static native TopicsAndPackets dvc_makeMQTTNukeInvite( long jniState,
NetLaunchInfo nli );
private static native TopicsAndPackets
dvc_makeMQTTMessages( long jniState,
String addressee,

View file

@ -758,6 +758,25 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTInvites
return result;
}
JNIEXPORT jobject JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTNukeInvite
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jobject jnli )
{
jobject result;
DVC_HEADER(jniGlobalPtr);
NetLaunchInfo nli;
loadNLI( env, &nli, jnli );
MTPData mtp = { .env = env, };
dvc_makeMQTTNukeInvite( globalState->dutil, env,
msgAndTopicProc, &mtp, &nli );
result = wrapResults( &mtp );
DVC_HEADER_END();
return result;
}
JNIEXPORT jobject JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessages
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jstring jAddressee,

View file

@ -282,7 +282,9 @@ addProto3HeaderCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
static void
callProc( MsgAndTopicProc proc, void* closure, const XP_UCHAR* topic, XWStreamCtxt* stream )
{
(*proc)( closure, topic, stream_getPtr(stream), stream_getSize(stream) );
const XP_U8* msgBuf = !!stream ? stream_getPtr(stream) : NULL;
XP_U16 msgLen = !!stream ? stream_getSize(stream) : 0;
(*proc)( closure, topic, msgBuf, msgLen );
}
void
@ -314,6 +316,26 @@ dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
stream_destroy( stream, xwe );
}
void
dvc_makeMQTTNukeInvite( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const NetLaunchInfo* nli )
{
LOG_FUNC();
#ifdef MQTT_GAMEID_TOPICS
MQTTDevID myID;
dvc_getMQTTDevID( dutil, xwe, &myID );
XP_UCHAR devTopic[32];
formatMQTTDevTopic( &myID, devTopic, VSIZE(devTopic) );
XP_UCHAR gameTopic[64];
size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic),
"%s/%X", devTopic, nli->gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
XP_USE(siz);
callProc( proc, closure, gameTopic, NULL );
#endif
}
/* Ship with == 1, but increase to test */
#define MULTI_MSG_COUNT 1

View file

@ -42,6 +42,9 @@ void dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
const NetLaunchInfo* nli );
void dvc_makeMQTTNukeInvite( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const NetLaunchInfo* nli );
void dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,

View file

@ -31,6 +31,7 @@
#include "linuxdict.h"
#include "cursesmain.h"
#include "gtkmain.h"
#include "mqttcon.h"
static XP_U32 linux_dutil_getCurSeconds( XW_DUtilCtxt* duc, XWEnv xwe );
static const XP_UCHAR* linux_dutil_getUserString( XW_DUtilCtxt* duc, XWEnv xwe, XP_U16 code );
@ -129,6 +130,7 @@ linux_dutil_onInviteReceived( XW_DUtilCtxt* duc, XWEnv XP_UNUSED(xwe),
} else {
inviteReceivedGTK( params->appGlobals, nli );
}
mqttc_onInviteHandled( params, nli );
}
static void

View file

@ -302,6 +302,15 @@ mqttc_send( LaunchParams* params, XP_U32 gameID,
return len;
}
void
mqttc_onInviteHandled( LaunchParams* params, const NetLaunchInfo* nli )
{
LOG_FUNC();
MQTTConStorage* storage = getStorage( params );
dvc_makeMQTTNukeInvite( params->dutil, NULL_XWE,
msgAndTopicProc, storage, nli );
}
void
mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID )
{

View file

@ -30,6 +30,7 @@ const MQTTDevID* mqttc_getDevID( LaunchParams* params );
const gchar* mqttc_getDevIDStr( LaunchParams* params );
void mqttc_invite( LaunchParams* params, const NetLaunchInfo* nli,
const MQTTDevID* mqttInvitee );
void mqttc_onInviteHandled( LaunchParams* params, const NetLaunchInfo* nli );
XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID,
const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee );
void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID );