API cleanup and all done

This should complete sending to multiple topics (for backwards
compatibility) and supporting combined messages in the future (sending
them is hard; receiving not so much.)
This commit is contained in:
Eric House 2022-12-20 11:55:30 -08:00
parent 6b029d5a85
commit 933da2de07
6 changed files with 145 additions and 159 deletions

View file

@ -548,7 +548,7 @@ public class MQTTUtils extends Thread
Log.d( TAG, "sendInvite(invitee: %s, nli: %s)", invitee, nli );
byte[][][] packets = {null};
String[][] topics = {null};
XwJNI.dvc_makeMQTTInvite( nli, invitee, topics, packets );
XwJNI.dvc_makeMQTTInvites( invitee, nli, topics, packets );
addToSendQueue( context, topics[0], packets[0] );
}
@ -565,7 +565,7 @@ public class MQTTUtils extends Thread
{
String[][] topics = {null};
byte[][][] packets = {null};
XwJNI.dvc_makeMQTTNoSuchGame( addressee, gameID, topics, packets );
XwJNI.dvc_makeMQTTNoSuchGames( addressee, gameID, topics, packets );
addToSendQueue( context, topics[0], packets[0] );
}
@ -593,7 +593,7 @@ public class MQTTUtils extends Thread
{
String[][] topics = {null};
byte[][][] packets = {null};
XwJNI.dvc_makeMQTTNoSuchGame( devID, gameID, topics, packets );
XwJNI.dvc_makeMQTTNoSuchGames( devID, gameID, topics, packets );
addToSendQueue( context, topics[0], packets[0] );
}

View file

@ -153,10 +153,10 @@ public class XwJNI {
return dvc_getMQTTSubTopics( getJNI().m_ptrGlobals );
}
public static void dvc_makeMQTTInvite( NetLaunchInfo nli, String invitee,
public static void dvc_makeMQTTInvites( String invitee, NetLaunchInfo nli,
String[][] topics, byte[][][] packets )
{
dvc_makeMQTTInvite( getJNI().m_ptrGlobals, nli, invitee, topics, packets );
dvc_makeMQTTInvites( getJNI().m_ptrGlobals, invitee, nli, topics, packets );
}
public static void dvc_makeMQTTMessages( String addressee, int gameID,
@ -167,10 +167,10 @@ public class XwJNI {
timestamp, buf, topics, packets );
}
public static void dvc_makeMQTTNoSuchGame( String addressee, int gameID,
public static void dvc_makeMQTTNoSuchGames( String addressee, int gameID,
String[][] topics, byte[][][] packets )
{
dvc_makeMQTTNoSuchGame( getJNI().m_ptrGlobals, addressee, gameID,
dvc_makeMQTTNoSuchGames( getJNI().m_ptrGlobals, addressee, gameID,
topics, packets );
}
@ -768,9 +768,9 @@ public class XwJNI {
private static native String dvc_getMQTTDevID( long jniState );
private static native void dvc_resetMQTTDevID( long jniState );
private static native String[] dvc_getMQTTSubTopics( long jniState );
private static native void dvc_makeMQTTInvite( long jniState,
NetLaunchInfo nli,
private static native void dvc_makeMQTTInvites( long jniState,
String invitee,
NetLaunchInfo nli,
String[][] topics,
byte[][][] packets );
private static native void dvc_makeMQTTMessages( long jniState,
@ -780,7 +780,7 @@ public class XwJNI {
String[][] topics,
byte[][][] packets );
private static native void dvc_makeMQTTNoSuchGame( long jniState, String addressee,
private static native void dvc_makeMQTTNoSuchGames( long jniState, String addressee,
int gameID, String[][] topics,
byte[][][] packets );
private static native void dvc_parseMQTTPacket( long jniState, String topic,

View file

@ -686,27 +686,6 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTSubTopics
return result;
}
JNIEXPORT jbyteArray JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTInvite
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jobject jnli )
{
jbyteArray result;
DVC_HEADER(jniGlobalPtr);
NetLaunchInfo nli;
loadNLI( env, &nli, jnli );
LOGNLI( &nli );
XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool)
globalState->vtMgr,
NULL, 0, NULL );
dvc_makeMQTTInvite( globalState->dutil, env, stream, &nli, 0 );
result = streamToBArray( env, stream );
stream_destroy( stream, env );
DVC_HEADER_END();
return result;
}
typedef struct _MTPData {
@ -720,25 +699,63 @@ typedef struct _MTPData {
static void
msgAndTopicProc( void* closure, const XP_UCHAR* topic,
XWStreamCtxt* msg )
const XP_U8* msgBuf, XP_U16 msgLen )
{
MTPData* mtp = (MTPData*)closure;
XP_LOGFF( "(topic=%s); count=%d", topic, mtp->count );
JNIEnv* env = mtp->env;
const XP_UCHAR* ptr = mtp->topics[mtp->count] = &mtp->storage[mtp->offset];
size_t siz = XP_SNPRINTF( (char*)ptr, VSIZE(mtp->storage) - mtp->offset,
"%s", topic );
XP_ASSERT( siz < VSIZE(mtp->storage) - mtp->offset );
XP_LOGFF( "topic %s looks good", mtp->topics[mtp->count] );
mtp->offset += 1 + XP_STRLEN(ptr);
mtp->jPackets[mtp->count] = streamToBArray( env, msg );
mtp->jPackets[mtp->count] = makeByteArray( env, msgLen, (const jbyte*)msgBuf );
++mtp->count;
XP_ASSERT( mtp->count < VSIZE(mtp->topics) );
}
static void
wrapResults( jobjectArray jTopicsOut, jobjectArray jPacketsOut, MTPData* mtp )
{
JNIEnv* env = mtp->env;
jobjectArray jTopics = makeStringArray( env, mtp->count, mtp->topics );
(*env)->SetObjectArrayElement( env, jTopicsOut, 0, jTopics );
jobjectArray jPackets = makeByteArrayArray( env, mtp->count );
for ( int ii = 0; ii < mtp->count; ++ii ) {
(*env)->SetObjectArrayElement( env, jPackets, ii, mtp->jPackets[ii] );
deleteLocalRef( env, mtp->jPackets[ii] );
}
(*env)->SetObjectArrayElement( env, jPacketsOut, 0, jPackets );
deleteLocalRefs( env, jTopics, jPackets, DELETE_NO_REF );
}
JNIEXPORT void JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTInvites
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jstring jAddressee,
jobject jnli, jobjectArray jTopicsOut, jobjectArray jPacketsOut )
{
DVC_HEADER(jniGlobalPtr);
NetLaunchInfo nli;
loadNLI( env, &nli, jnli );
LOGNLI( &nli );
MTPData mtp = { .env = env, };
MQTTDevID addressee;
jstrToDevID( env, jAddressee, &addressee );
dvc_makeMQTTInvites( globalState->dutil, env, msgAndTopicProc, &mtp,
&addressee, &nli, 0 );
wrapResults( jTopicsOut, jPacketsOut, &mtp );
DVC_HEADER_END();
}
JNIEXPORT void JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessages
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jstring jAddressee,
@ -763,42 +780,29 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessages
(const XP_U8*)buf, len );
(*env)->ReleaseByteArrayElements( env, jmsg, buf, 0 );
XP_LOGFF( "making array of %d strings, e.g. %s", mtp.count, mtp.topics[0] );
jobjectArray jTopics = makeStringArray( env, mtp.count, mtp.topics );
(*env)->SetObjectArrayElement( env, jTopicsOut, 0, jTopics );
deleteLocalRef( env, jTopics );
XP_LOGFF( "making array of %d msgs", mtp.count );
jobjectArray jPackets = makeByteArrayArray( env, mtp.count );
for ( int ii = 0; ii < mtp.count; ++ii ) {
(*env)->SetObjectArrayElement( env, jPackets, ii, mtp.jPackets[ii] );
deleteLocalRef( env, mtp.jPackets[ii] );
}
(*env)->SetObjectArrayElement( env, jPacketsOut, 0, jPackets );
deleteLocalRef( env, jPackets );
wrapResults( jTopicsOut, jPacketsOut, &mtp );
DVC_HEADER_END();
LOG_RETURN_VOID();
}
JNIEXPORT jbyteArray JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTNoSuchGame
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jint jgameid )
JNIEXPORT void JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTNoSuchGames
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jstring jAddressee,
jint jgameid, jobjectArray jTopicsOut, jobjectArray jPacketsOut )
{
jbyteArray result;
DVC_HEADER(jniGlobalPtr);
XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool)
globalState->vtMgr,
NULL, 0, NULL );
dvc_makeMQTTNoSuchGame( globalState->dutil, env, stream, jgameid, 0 );
MTPData mtp = { .env = env, };
MQTTDevID addressee;
jstrToDevID( env, jAddressee, &addressee );
result = streamToBArray( env, stream );
stream_destroy( stream, env );
dvc_makeMQTTNoSuchGames( globalState->dutil, env, msgAndTopicProc, &mtp,
&addressee, jgameid, 0 );
wrapResults( jTopicsOut, jPacketsOut, &mtp );
DVC_HEADER_END();
LOG_RETURN_VOID();
return result;
}
JNIEXPORT void JNICALL

View file

@ -236,43 +236,6 @@ dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
LOG_RETURN_VOID();
}
void
dvc_getMQTTPubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
const MQTTDevID* devid, XP_U32 gameID,
XP_UCHAR* storage, XP_U16 storageLen,
XP_U16* nTopics, XP_UCHAR* topics[] )
{
/* Keep these in API in case we do cacheing or such later */
XP_USE( dutil );
XP_USE( xwe );
int offset = 0;
int count = 0;
XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( devid, devTopic, VSIZE(devTopic) );
#ifdef MQTT_DEV_TOPICS
/* device topic; eventually goes away; but invites? */
topics[count++] = appendToStorage( storage, &offset, devTopic );
#endif
#ifdef MQTT_GAMEID_TOPICS
XP_UCHAR buf[128];
/* gameid topic */
XP_SNPRINTF( buf, VSIZE(buf), "%s/%X", devTopic, gameID );
topics[count++] = appendToStorage( storage, &offset, buf );
#else
XP_USE(gameID);
#endif
XP_ASSERT( offset < storageLen );
XP_ASSERT( count <= *nTopics );
*nTopics = count;
logPtrs( __func__, *nTopics, topics );
LOG_RETURN_VOID();
}
typedef enum { CMD_INVITE, CMD_MSG, CMD_DEVGONE, } MQTTCmd;
// #define PROTO_0 0
@ -329,6 +292,12 @@ addProto3HeaderCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
}
#endif
static void
callProc( MsgAndTopicProc proc, void* closure, const XP_UCHAR* topic, XWStreamCtxt* stream )
{
(*proc)( closure, topic, stream_getPtr(stream), stream_getSize(stream) );
}
void
dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
@ -345,7 +314,7 @@ dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
nli_saveToStream( nli, stream );
#ifdef MQTT_DEV_TOPICS
(*proc)( closure, devTopic, stream );
callProc( proc, closure, devTopic, stream );
#endif
#ifdef MQTT_GAMEID_TOPICS
@ -353,7 +322,7 @@ dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic),
"%s/%X", devTopic, nli->gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
(*proc)(closure, gameTopic, stream );
callProc( proc, closure, devTopic, stream );
#endif
stream_destroy( stream, xwe );
@ -380,7 +349,7 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
stream_putU32VL( stream, len );
}
stream_putBytes( stream, buf, len );
(*proc)(closure, devTopic, stream );
callProc( proc, closure, devTopic, stream );
stream_destroy( stream, xwe );
}
#endif
@ -403,19 +372,37 @@ dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
"%s/%X", devTopic, gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
(*proc)( closure, gameTopic, stream );
callProc( proc, closure, gameTopic, stream );
stream_destroy( stream, xwe );
}
#endif
}
void
dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream, XP_U32 gameID,
XP_U32 timestamp )
dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
XP_U32 gameID, XP_U32 timestamp )
{
XP_UCHAR devTopic[64]; /* used by two below */
formatMQTTDevTopic( addressee, devTopic, VSIZE(devTopic) );
XWStreamCtxt* stream = mkStream( dutil );
addHeaderGameIDAndCmd( dutil, xwe, CMD_DEVGONE, gameID,
timestamp, stream );
#ifdef MQTT_DEV_TOPICS
callProc( proc, closure, devTopic, stream );
#endif
#ifdef MQTT_GAMEID_TOPICS
XP_UCHAR gameTopic[64];
size_t siz = XP_SNPRINTF( gameTopic, VSIZE(gameTopic),
"%s/%X", devTopic, gameID );
XP_ASSERT( siz < VSIZE(gameTopic) );
callProc( proc, closure, gameTopic, stream );
#endif
stream_destroy( stream, xwe );
}
static XP_Bool

View file

@ -31,17 +31,13 @@ void dvc_store( XW_DUtilCtxt* dctxt, XWEnv xwe );
# endif
typedef void (*MsgAndTopicProc)( void* closure, const XP_UCHAR* topic,
XWStreamCtxt* msg );
const XP_U8* msgBuf, XP_U16 msgLen );
void dvc_getMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTDevID* devID );
void dvc_resetMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe );
void dvc_getMQTTSubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
XP_UCHAR* storage, XP_U16 storageLen,
XP_U16* nTopics, XP_UCHAR* topics[] );
void dvc_getMQTTPubTopics( XW_DUtilCtxt* dutil, XWEnv xwe,
const MQTTDevID* devid, XP_U32 gameID,
XP_UCHAR* storage, XP_U16 storageLen,
XP_U16* nTopics, XP_UCHAR* topics[] );
void dvc_makeMQTTInvites( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
@ -53,8 +49,10 @@ void dvc_makeMQTTMessages( XW_DUtilCtxt* dutil, XWEnv xwe,
const MQTTDevID* addressee,
XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len );
void dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream, XP_U32 gameID, XP_U32 timestamp );
void dvc_makeMQTTNoSuchGames( XW_DUtilCtxt* dutil, XWEnv xwe,
MsgAndTopicProc proc, void* closure,
const MQTTDevID* addressee,
XP_U32 gameID, XP_U32 timestamp );
void dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_UCHAR* topic,
const XP_U8* buf, XP_U16 len );
#endif

View file

@ -164,50 +164,48 @@ handle_gotmsg( GIOChannel* source, GIOCondition XP_UNUSED(condition), gpointer d
return TRUE;
} /* handle_gotmsg */
static bool
postMsg( MQTTConStorage* storage, XWStreamCtxt* stream, XP_U32 gameID,
const MQTTDevID* invitee )
{
XP_ASSERT(0); /* I need to go away! */
const XP_U8* bytes = stream_getPtr( stream );
XP_U16 len = stream_getSize( stream );
/* static bool */
/* postMsg( MQTTConStorage* storage, XWStreamCtxt* stream, XP_U32 gameID, */
/* const MQTTDevID* invitee ) */
/* { */
/* XP_ASSERT(0); /\* I need to go away! *\/ */
/* const XP_U8* bytes = stream_getPtr( stream ); */
/* XP_U16 len = stream_getSize( stream ); */
int mid;
/* int mid; */
#ifdef DEBUG
XP_UCHAR* sum = dutil_md5sum( storage->params->dutil, NULL_XWE, bytes, len );
XP_LOGFF( "sending %d bytes with sum %s", len, sum );
XP_FREEP( storage->params->mpool, &sum );
#endif
/* #ifdef DEBUG */
/* XP_UCHAR* sum = dutil_md5sum( storage->params->dutil, NULL_XWE, bytes, len ); */
/* XP_LOGFF( "sending %d bytes with sum %s", len, sum ); */
/* XP_FREEP( storage->params->mpool, &sum ); */
/* #endif */
XP_UCHAR topicStorage[128];
XP_UCHAR* topics[4];
XP_U16 nTopics = VSIZE(topics);
dvc_getMQTTPubTopics( storage->params->dutil, NULL_XWE,
invitee, gameID, topicStorage, VSIZE(topicStorage),
&nTopics, topics );
/* XP_UCHAR topicStorage[128]; */
/* XP_UCHAR* topics[4]; */
/* XP_U16 nTopics = VSIZE(topics); */
/* dvc_getMQTTPubTopics( storage->params->dutil, NULL_XWE, */
/* invitee, gameID, topicStorage, VSIZE(topicStorage), */
/* &nTopics, topics ); */
bool success = XP_TRUE;
for ( int ii = 0; success && ii < nTopics; ++ii ) {
int err = mosquitto_publish( storage->mosq, &mid, topics[ii],
len, bytes, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s) => %s; mid=%d", topics[ii],
mosquitto_strerror(err), mid );
success = 0 == err;
}
/* bool success = XP_TRUE; */
/* for ( int ii = 0; success && ii < nTopics; ++ii ) { */
/* int err = mosquitto_publish( storage->mosq, &mid, topics[ii], */
/* len, bytes, DEFAULT_QOS, true ); */
/* XP_LOGFF( "mosquitto_publish(topic=%s) => %s; mid=%d", topics[ii], */
/* mosquitto_strerror(err), mid ); */
/* success = 0 == err; */
/* } */
stream_destroy( stream, NULL_XWE );
return success;
}
/* stream_destroy( stream, NULL_XWE ); */
/* return success; */
/* } */
static bool
postOne( MQTTConStorage* storage, const XP_UCHAR* topic, XWStreamCtxt* stream )
postOne( MQTTConStorage* storage, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
{
const XP_U8* bytes = stream_getPtr( stream );
XP_U16 len = stream_getSize( stream );
int mid;
int err = mosquitto_publish( storage->mosq, &mid, topic,
len, bytes, DEFAULT_QOS, true );
len, buf, DEFAULT_QOS, true );
XP_LOGFF( "mosquitto_publish(topic=%s) => %s; mid=%d", topic,
mosquitto_strerror(err), mid );
XP_ASSERT( 0 == err );
@ -310,10 +308,10 @@ mqttc_getDevIDStr( LaunchParams* params )
static void
msgAndTopicProc( void* closure, const XP_UCHAR* topic, XWStreamCtxt* stream )
msgAndTopicProc( void* closure, const XP_UCHAR* topic, const XP_U8* buf, XP_U16 len )
{
MQTTConStorage* storage = (MQTTConStorage*)closure;
(void)postOne( storage, topic, stream );
(void)postOne( storage, topic, buf, len );
}
void
@ -348,8 +346,7 @@ void
mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID )
{
MQTTConStorage* storage = getStorage( params );
XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool)
params->vtMgr );
dvc_makeMQTTNoSuchGame( params->dutil, NULL_XWE, stream, gameID, 0 );
postMsg( storage, stream, gameID, addressee );
dvc_makeMQTTNoSuchGames( params->dutil, NULL_XWE,
msgAndTopicProc, storage,
addressee, gameID, 0 );
}