add timestamp to saved msgs, modify mqtt header to include later

I want receiver to know when message was originally created. This adds
timestamp to messages and passes it via send proc. Client needs to
send it where possible. So far, MQTT format can't include it without
change, so I'm adding a new proto version. This change can read the
new version. Once that's well-enough distributed I can start sending
using it. Other transmission types than MQTT are for later.
This commit is contained in:
Eric House 2022-06-29 12:29:32 -07:00
parent ec4822bbe9
commit 2dfc9129f3
22 changed files with 130 additions and 76 deletions

View file

@ -57,10 +57,10 @@ public class CommsTransport implements TransportProcs {
@Override @Override
public int transportSend( byte[] buf, String msgID, CommsAddrRec addr, public int transportSend( byte[] buf, String msgID, CommsAddrRec addr,
CommsConnType conType, int gameID ) CommsConnType conType, int gameID, int timestamp )
{ {
Log.d( TAG, "transportSend(len=%d, typ=%s)", buf.length, Log.d( TAG, "transportSend(len=%d, typ=%s, ts=%d)", buf.length,
conType.toString() ); conType.toString(), timestamp );
int nSent = -1; int nSent = -1;
Assert.assertNotNull( addr ); Assert.assertNotNull( addr );
Assert.assertTrueNR( addr.contains( conType ) ); // fired per google Assert.assertTrueNR( addr.contains( conType ) ); // fired per google
@ -76,7 +76,7 @@ public class CommsTransport implements TransportProcs {
} }
} else { } else {
nSent = sendForAddr( m_context, addr, conType, m_rowid, gameID, nSent = sendForAddr( m_context, addr, conType, m_rowid, gameID,
buf, msgID ); timestamp, buf, msgID );
} }
// Keep this while debugging why the resend_all that gets // Keep this while debugging why the resend_all that gets
@ -95,7 +95,8 @@ public class CommsTransport implements TransportProcs {
private int sendForAddr( Context context, CommsAddrRec addr, private int sendForAddr( Context context, CommsAddrRec addr,
CommsConnType conType, long rowID, CommsConnType conType, long rowID,
int gameID, byte[] buf, String msgID ) int gameID, int timestamp,
byte[] buf, String msgID )
{ {
int nSent = -1; int nSent = -1;
switch ( conType ) { switch ( conType ) {
@ -117,7 +118,7 @@ public class CommsTransport implements TransportProcs {
nSent = NFCUtils.addMsgFor( buf, gameID ); nSent = NFCUtils.addMsgFor( buf, gameID );
break; break;
case COMMS_CONN_MQTT: case COMMS_CONN_MQTT:
nSent = MQTTUtils.send( context, addr.mqtt_devID, gameID, buf ); nSent = MQTTUtils.send( context, addr.mqtt_devID, gameID, timestamp, buf );
break; break;
default: default:
Assert.failDbg(); Assert.failDbg();

View file

@ -525,12 +525,12 @@ public class MQTTUtils extends Thread
} }
public static int send( Context context, String addressee, int gameID, public static int send( Context context, String addressee, int gameID,
byte[] buf ) int timestamp, byte[] buf )
{ {
Log.d( TAG, "send(to:%s, len: %d)", addressee, buf.length ); Log.d( TAG, "send(to:%s, len: %d)", addressee, buf.length );
Assert.assertTrueNR( 16 == addressee.length() ); Assert.assertTrueNR( 16 == addressee.length() );
String[] topic = {addressee}; String[] topic = {addressee};
byte[] packet = XwJNI.dvc_makeMQTTMessage( gameID, buf, topic ); byte[] packet = XwJNI.dvc_makeMQTTMessage( gameID, timestamp, buf, topic );
addToSendQueue( context, topic[0], packet ); addToSendQueue( context, topic[0], packet );
return buf.length; return buf.length;
} }

View file

@ -82,9 +82,9 @@ public class MultiMsgSink implements TransportProcs {
return NFCUtils.addMsgFor( buf, gameID ); return NFCUtils.addMsgFor( buf, gameID );
} }
int sendViaMQTT( String addressee, byte[] buf, int gameID ) int sendViaMQTT( String addressee, byte[] buf, int gameID, int timestamp )
{ {
return MQTTUtils.send( m_context, addressee, gameID, buf ); return MQTTUtils.send( m_context, addressee, gameID, timestamp, buf );
} }
public int numSent() public int numSent()
@ -98,7 +98,7 @@ public class MultiMsgSink implements TransportProcs {
@Override @Override
public int transportSend( byte[] buf, String msgID, CommsAddrRec addr, public int transportSend( byte[] buf, String msgID, CommsAddrRec addr,
CommsConnType typ, int gameID ) CommsConnType typ, int gameID, int timestamp )
{ {
int nSent = -1; int nSent = -1;
switch ( typ ) { switch ( typ ) {
@ -118,7 +118,7 @@ public class MultiMsgSink implements TransportProcs {
nSent = sendViaNFC( buf, gameID ); nSent = sendViaNFC( buf, gameID );
break; break;
case COMMS_CONN_MQTT: case COMMS_CONN_MQTT:
nSent = sendViaMQTT( addr.mqtt_devID, buf, gameID ); nSent = sendViaMQTT( addr.mqtt_devID, buf, gameID, timestamp );
break; break;
default: default:
Assert.failDbg(); Assert.failDbg();

View file

@ -29,7 +29,7 @@ public interface TransportProcs {
int getFlags(); int getFlags();
int transportSend( byte[] buf, String msgNo, CommsAddrRec addr, int transportSend( byte[] buf, String msgNo, CommsAddrRec addr,
CommsConnType conType, int gameID ); CommsConnType conType, int gameID, int timestamp );
void countChanged( int newCount ); void countChanged( int newCount );

View file

@ -152,10 +152,11 @@ public class XwJNI {
return dvc_makeMQTTInvite( getJNI().m_ptrGlobals, nli, addrToTopic ); return dvc_makeMQTTInvite( getJNI().m_ptrGlobals, nli, addrToTopic );
} }
public static byte[] dvc_makeMQTTMessage( int gameID, byte[] buf, public static byte[] dvc_makeMQTTMessage( int gameID, int timestamp,
String[] addrToTopic ) byte[] buf, String[] addrToTopic )
{ {
return dvc_makeMQTTMessage( getJNI().m_ptrGlobals, gameID, buf, addrToTopic ); return dvc_makeMQTTMessage( getJNI().m_ptrGlobals, gameID, timestamp,
buf, addrToTopic );
} }
public static byte[] dvc_makeMQTTNoSuchGame( int gameID, String[] addrToTopic ) public static byte[] dvc_makeMQTTNoSuchGame( int gameID, String[] addrToTopic )
@ -716,8 +717,8 @@ public class XwJNI {
private static native void dvc_resetMQTTDevID( long jniState ); private static native void dvc_resetMQTTDevID( long jniState );
private static native byte[] dvc_makeMQTTInvite( long jniState, NetLaunchInfo nli, private static native byte[] dvc_makeMQTTInvite( long jniState, NetLaunchInfo nli,
String[] addrToTopic ); String[] addrToTopic );
private static native byte[] dvc_makeMQTTMessage( long jniState, int gameID, byte[] buf, private static native byte[] dvc_makeMQTTMessage( long jniState, int gameID, int timestamp,
String[] addrToTopic ); byte[] buf, String[] addrToTopic );
private static native byte[] dvc_makeMQTTNoSuchGame( long jniState, int gameID, private static native byte[] dvc_makeMQTTNoSuchGame( long jniState, int gameID,
String[] addrToTopic ); String[] addrToTopic );

View file

@ -48,6 +48,8 @@ LOCAL_DEFINES += \
-DRELAY_ROOM_DEFAULT=\"\" \ -DRELAY_ROOM_DEFAULT=\"\" \
-D__LITTLE_ENDIAN \ -D__LITTLE_ENDIAN \
# -DMQTT_USE_PROTO=2 \
# XWFEATURE_RAISETILE: first, fix to not use timer # XWFEATURE_RAISETILE: first, fix to not use timer
# -DXWFEATURE_RAISETILE \ # -DXWFEATURE_RAISETILE \

View file

@ -895,16 +895,12 @@ and_dutil_onInviteReceived( XW_DUtilCtxt* duc, XWEnv xwe, const NetLaunchInfo* n
static void static void
and_dutil_onMessageReceived( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID, and_dutil_onMessageReceived( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID,
const CommsAddrRec* from, XWStreamCtxt* stream ) const CommsAddrRec* from, const XP_U8* data, XP_U16 len )
{ {
LOG_FUNC(); LOG_FUNC();
DUTIL_CBK_HEADER( "onMessageReceived", DUTIL_CBK_HEADER( "onMessageReceived",
"(IL" PKG_PATH("jni/CommsAddrRec") ";[B)V" ); "(IL" PKG_PATH("jni/CommsAddrRec") ";[B)V" );
XP_U16 len = stream_getSize( stream );
XP_U8 data[len];
stream_getBytes( stream, data, len );
jbyteArray jmsg = makeByteArray( env, len, (jbyte*)data ); jbyteArray jmsg = makeByteArray( env, len, (jbyte*)data );
jobject jaddr = makeJAddr( env, from ); jobject jaddr = makeJAddr( env, from );

View file

@ -50,8 +50,9 @@ and_xport_getFlags( XWEnv xwe, void* closure )
static XP_S16 static XP_S16
and_xport_send( XWEnv xwe, const XP_U8* buf, XP_U16 len, and_xport_send( XWEnv xwe, const XP_U8* buf, XP_U16 len,
const XP_UCHAR* msgNo, const CommsAddrRec* addr, const XP_UCHAR* msgNo, XP_U32 timestamp,
CommsConnType conType, XP_U32 gameID, void* closure ) const CommsAddrRec* addr, CommsConnType conType,
XP_U32 gameID, void* closure )
{ {
jint result = -1; jint result = -1;
LOG_FUNC(); LOG_FUNC();
@ -60,7 +61,7 @@ and_xport_send( XWEnv xwe, const XP_U8* buf, XP_U16 len,
if ( NULL != aprocs->jxport ) { if ( NULL != aprocs->jxport ) {
JNIEnv* env = xwe; JNIEnv* env = xwe;
const char* sig = "([BLjava/lang/String;L" PKG_PATH("jni/CommsAddrRec") const char* sig = "([BLjava/lang/String;L" PKG_PATH("jni/CommsAddrRec")
";L" PKG_PATH("jni/CommsAddrRec$CommsConnType") ";I)I"; ";L" PKG_PATH("jni/CommsAddrRec$CommsConnType") ";II)I";
jmethodID mid = getMethodID( env, aprocs->jxport, "transportSend", sig ); jmethodID mid = getMethodID( env, aprocs->jxport, "transportSend", sig );
@ -70,7 +71,8 @@ and_xport_send( XWEnv xwe, const XP_U8* buf, XP_U16 len,
intToJEnum(env, conType, PKG_PATH("jni/CommsAddrRec$CommsConnType")); intToJEnum(env, conType, PKG_PATH("jni/CommsAddrRec$CommsConnType"));
jstring jMsgNo = !!msgNo ? (*env)->NewStringUTF( env, msgNo ) : NULL; jstring jMsgNo = !!msgNo ? (*env)->NewStringUTF( env, msgNo ) : NULL;
result = (*env)->CallIntMethod( env, aprocs->jxport, mid, result = (*env)->CallIntMethod( env, aprocs->jxport, mid,
jbytes, jMsgNo, jaddr, jConType, gameID ); jbytes, jMsgNo, jaddr, jConType,
gameID, timestamp );
deleteLocalRefs( env, jaddr, jbytes, jMsgNo, jConType, DELETE_NO_REF ); deleteLocalRefs( env, jaddr, jbytes, jMsgNo, jConType, DELETE_NO_REF );
} }

View file

@ -707,7 +707,7 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTInvite
XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool) XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool)
globalState->vtMgr, globalState->vtMgr,
NULL, 0, NULL ); NULL, 0, NULL );
dvc_makeMQTTInvite( globalState->dutil, env, stream, &nli ); dvc_makeMQTTInvite( globalState->dutil, env, stream, &nli, 0 );
result = streamToBArray( env, stream ); result = streamToBArray( env, stream );
stream_destroy( stream, env ); stream_destroy( stream, env );
@ -721,7 +721,7 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTInvite
JNIEXPORT jbyteArray JNICALL JNIEXPORT jbyteArray JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessage Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessage
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jint jGameID, ( JNIEnv* env, jclass C, jlong jniGlobalPtr, jint jGameID,
jbyteArray jmsg, jobjectArray jAddrToTopic ) jint jTimestamp, jbyteArray jmsg, jobjectArray jAddrToTopic )
{ {
jbyteArray result; jbyteArray result;
LOG_FUNC(); LOG_FUNC();
@ -734,7 +734,7 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTMessage
XP_U16 len = (*env)->GetArrayLength( env, jmsg ); XP_U16 len = (*env)->GetArrayLength( env, jmsg );
jbyte* buf = (*env)->GetByteArrayElements( env, jmsg, NULL ); jbyte* buf = (*env)->GetByteArrayElements( env, jmsg, NULL );
dvc_makeMQTTMessage( globalState->dutil, env, stream, jGameID, dvc_makeMQTTMessage( globalState->dutil, env, stream, jGameID,
(const XP_U8*)buf, len ); jTimestamp, (const XP_U8*)buf, len );
(*env)->ReleaseByteArrayElements( env, jmsg, buf, 0 ); (*env)->ReleaseByteArrayElements( env, jmsg, buf, 0 );
result = streamToBArray( env, stream ); result = streamToBArray( env, stream );
@ -756,7 +756,7 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1makeMQTTNoSuchGame
XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool) XWStreamCtxt* stream = mem_stream_make( MPPARM(globalState->mpool)
globalState->vtMgr, globalState->vtMgr,
NULL, 0, NULL ); NULL, 0, NULL );
dvc_makeMQTTNoSuchGame( globalState->dutil, env, stream, jgameid ); dvc_makeMQTTNoSuchGame( globalState->dutil, env, stream, jgameid, 0 );
result = streamToBArray( env, stream ); result = streamToBArray( env, stream );
stream_destroy( stream, env ); stream_destroy( stream, env );

View file

@ -76,6 +76,7 @@ typedef struct MsgQueueElem {
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
XP_UCHAR* checksum; XP_UCHAR* checksum;
#endif #endif
XP_U32 createdStamp;
} MsgQueueElem; } MsgQueueElem;
typedef struct AddressRecord { typedef struct AddressRecord {
@ -836,6 +837,9 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
msg->msgID = stream_getU32( stream ); msg->msgID = stream_getU32( stream );
msg->len = stream_getU16( stream ); msg->len = stream_getU16( stream );
} }
if ( version >= STREAM_VERS_MSGTIMESTAMP ) {
msg->createdStamp = stream_getU32( stream );
}
#ifdef DEBUG #ifdef DEBUG
msg->sendCount = 0; msg->sendCount = 0;
#endif #endif
@ -1075,6 +1079,7 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe),
stream_putU32VL( stream, msg->msgID ); stream_putU32VL( stream, msg->msgID );
stream_putU32VL( stream, msg->len ); stream_putU32VL( stream, msg->len );
stream_putU32( stream, msg->createdStamp );
stream_putBytes( stream, msg->msg, msg->len ); stream_putBytes( stream, msg->msg, msg->len );
} }
@ -1342,6 +1347,7 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
sizeof( *newMsgElem ) ); sizeof( *newMsgElem ) );
newMsgElem->channelNo = channelNo; newMsgElem->channelNo = channelNo;
newMsgElem->msgID = msgID; newMsgElem->msgID = msgID;
newMsgElem->createdStamp = dutil_getCurSeconds( comms->dutil, xwe );
XP_Bool useSmallHeader = !!rec && (COMMS_VERSION == rec->flags); XP_Bool useSmallHeader = !!rec && (COMMS_VERSION == rec->flags);
XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool) XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool)
@ -1723,7 +1729,8 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi
logAddr( comms, xwe, &addr, __func__ ); logAddr( comms, xwe, &addr, __func__ );
XP_UCHAR msgNo[16]; XP_UCHAR msgNo[16];
formatMsgNo( comms, elem, msgNo, sizeof(msgNo) ); formatMsgNo( comms, elem, msgNo, sizeof(msgNo) );
nSent = (*comms->procs.send)( xwe, elem->msg, elem->len, msgNo, &addr, nSent = (*comms->procs.send)( xwe, elem->msg, elem->len, msgNo,
elem->createdStamp, &addr,
typ, gameid, comms->procs.closure ); typ, gameid, comms->procs.closure );
break; break;
} }
@ -3648,8 +3655,8 @@ send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp, XP_PlayerAdd
XP_MEMCPY( &buf[1], data, dlen ); XP_MEMCPY( &buf[1], data, dlen );
} }
nSent = (*comms->procs.send)( xwe, buf, dlen+1, msgNo, addr, typ, gameID(comms), nSent = (*comms->procs.send)( xwe, buf, dlen+1, msgNo, 0,
comms->procs.closure ); addr, typ, gameID(comms), comms->procs.closure );
XP_FREE( comms->mpool, buf ); XP_FREE( comms->mpool, buf );
setHeartbeatTimer( comms ); setHeartbeatTimer( comms );

View file

@ -75,7 +75,7 @@ typedef enum {
#endif #endif
typedef XP_S16 (*TransportSend)( XWEnv xwe, const XP_U8* buf, XP_U16 len, typedef XP_S16 (*TransportSend)( XWEnv xwe, const XP_U8* buf, XP_U16 len,
const XP_UCHAR* msgNo, const XP_UCHAR* msgNo, XP_U32 createdStamp,
const CommsAddrRec* addr, const CommsAddrRec* addr,
CommsConnType conType, CommsConnType conType,
XP_U32 gameID, void* closure ); XP_U32 gameID, void* closure );

View file

@ -48,6 +48,7 @@
#define MAX_COLS MAX_ROWS #define MAX_COLS MAX_ROWS
#define MIN_COLS 11 #define MIN_COLS 11
#define STREAM_VERS_MSGTIMESTAMP 0x21
#define STREAM_VERS_GI_ISO 0x20 #define STREAM_VERS_GI_ISO 0x20
#define STREAM_VERS_SMALLCOMMS 0x1F #define STREAM_VERS_SMALLCOMMS 0x1F
#define STREAM_VERS_NINETILES 0x1E #define STREAM_VERS_NINETILES 0x1E
@ -95,7 +96,7 @@
#define STREAM_VERS_405 0x01 #define STREAM_VERS_405 0x01
/* search for FIX_NEXT_VERSION_CHANGE next time this is changed */ /* search for FIX_NEXT_VERSION_CHANGE next time this is changed */
#define CUR_STREAM_VERS STREAM_VERS_GI_ISO #define CUR_STREAM_VERS STREAM_VERS_MSGTIMESTAMP
typedef struct XP_Rect { typedef struct XP_Rect {
XP_S16 left; XP_S16 left;

View file

@ -171,12 +171,16 @@ typedef enum { CMD_INVITE, CMD_MSG, CMD_DEVGONE, } MQTTCmd;
// #define PROTO_0 0 // #define PROTO_0 0
#define PROTO_1 1 /* moves gameID into "header" relay2 knows about */ #define PROTO_1 1 /* moves gameID into "header" relay2 knows about */
#define PROTO_2 2 /* adds timestamp to header */
#ifndef MQTT_USE_PROTO
# define MQTT_USE_PROTO PROTO_1
#endif
static void static void
addHeaderGameIDAndCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd, addHeaderGameIDAndCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
XP_U32 gameID, XWStreamCtxt* stream ) XP_U32 gameID, XP_U32 timestamp, XWStreamCtxt* stream )
{ {
stream_putU8( stream, PROTO_1 ); stream_putU8( stream, MQTT_USE_PROTO );
MQTTDevID myID; MQTTDevID myID;
dvc_getMQTTDevID( dutil, xwe, &myID ); dvc_getMQTTDevID( dutil, xwe, &myID );
@ -184,45 +188,57 @@ addHeaderGameIDAndCmd( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTCmd cmd,
stream_putBytes( stream, &myID, sizeof(myID) ); stream_putBytes( stream, &myID, sizeof(myID) );
stream_putU32( stream, gameID ); stream_putU32( stream, gameID );
if ( PROTO_2 <= MQTT_USE_PROTO ) {
if ( 0 == timestamp ) {
timestamp = dutil_getCurSeconds( dutil, xwe );
XP_LOGFF( "replacing timestamp of 0" );
}
stream_putU32( stream, timestamp );
}
stream_putU8( stream, cmd ); stream_putU8( stream, cmd );
} }
void void
dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream, dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
const NetLaunchInfo* nli ) const NetLaunchInfo* nli, XP_U32 timestamp )
{ {
addHeaderGameIDAndCmd( dutil, xwe, CMD_INVITE, nli->gameID, stream ); addHeaderGameIDAndCmd( dutil, xwe, CMD_INVITE, nli->gameID,
timestamp, stream );
nli_saveToStream( nli, stream ); nli_saveToStream( nli, stream );
} }
void void
dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream, dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
XP_U32 gameID, const XP_U8* buf, XP_U16 len ) XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len )
{ {
addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, stream ); addHeaderGameIDAndCmd( dutil, xwe, CMD_MSG, gameID, timestamp, stream );
if ( PROTO_2 <= MQTT_USE_PROTO ) {
stream_putU32VL( stream, len );
}
stream_putBytes( stream, buf, len ); stream_putBytes( stream, buf, len );
} }
void void
dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe, dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream, XP_U32 gameID ) XWStreamCtxt* stream, XP_U32 gameID,
XP_U32 timestamp )
{ {
addHeaderGameIDAndCmd( dutil, xwe, CMD_DEVGONE, gameID, stream ); addHeaderGameIDAndCmd( dutil, xwe, CMD_DEVGONE, gameID,
timestamp, stream );
} }
void void
dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 len ) dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe,
const XP_U8* buf, XP_U16 len )
{ {
LOG_FUNC(); LOG_FUNC();
XWStreamCtxt* stream = mkStream( dutil ); XWStreamCtxt* stream = mkStream( dutil );
stream_putBytes( stream, buf, len ); stream_putBytes( stream, buf, len );
XP_U8 proto = stream_getU8( stream ); XP_U8 proto = stream_getU8( stream );
if ( proto != PROTO_1 ) { if ( proto == PROTO_1 || proto == PROTO_2 ) {
XP_LOGFF( "read proto %d, expected %d; dropping packet",
proto, PROTO_1 );
} else {
MQTTDevID senderID; MQTTDevID senderID;
stream_getBytes( stream, &senderID, sizeof(senderID) ); stream_getBytes( stream, &senderID, sizeof(senderID) );
senderID = be64toh( senderID ); senderID = be64toh( senderID );
@ -232,11 +248,19 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 le
XP_LOGFF( "senderID: %s", tmp ); XP_LOGFF( "senderID: %s", tmp );
#endif #endif
MQTTCmd cmd; XP_U32 gameID = stream_getU32( stream );
XP_U32 gameID = 0;
gameID = stream_getU32( stream ); XP_U32 timestamp = 0;
cmd = stream_getU8( stream ); if ( PROTO_2 == proto ) {
timestamp = stream_getU32( stream );
#ifdef DEBUG
if ( 0 < timestamp ) {
XP_U32 now = dutil_getCurSeconds( dutil, xwe );
XP_LOGFF( "delivery took %ds", now - timestamp );
}
#endif
}
MQTTCmd cmd = stream_getU8( stream );
/* Need to ack even if discarded/malformed */ /* Need to ack even if discarded/malformed */
dutil_ackMQTTMsg( dutil, xwe, gameID, &senderID, buf, len ); dutil_ackMQTTMsg( dutil, xwe, gameID, &senderID, buf, len );
@ -255,7 +279,22 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 le
addr_addType( &from, COMMS_CONN_MQTT ); addr_addType( &from, COMMS_CONN_MQTT );
from.u.mqtt.devID = senderID; from.u.mqtt.devID = senderID;
if ( CMD_MSG == cmd ) { if ( CMD_MSG == cmd ) {
dutil_onMessageReceived( dutil, xwe, gameID, &from, stream ); XP_U32 msgLen;
if ( PROTO_2 == proto ) {
msgLen = stream_getU32VL( stream );
if ( msgLen > stream_getSize( stream ) ) {
XP_LOGFF( "msglen %d too large", msgLen );
msgLen = 0;
}
} else {
msgLen = stream_getSize( stream );
}
if ( 0 < msgLen ) {
XP_U8 msgBuf[msgLen];
stream_getBytes( stream, msgBuf, msgLen );
dutil_onMessageReceived( dutil, xwe, gameID,
&from, msgBuf, msgLen );
}
} else if ( CMD_DEVGONE == cmd ) { } else if ( CMD_DEVGONE == cmd ) {
dutil_onGameGoneReceived( dutil, xwe, gameID, &from ); dutil_onGameGoneReceived( dutil, xwe, gameID, &from );
} }
@ -265,6 +304,8 @@ dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 le
XP_LOGFF( "unknown command %d; dropping message", cmd ); XP_LOGFF( "unknown command %d; dropping message", cmd );
XP_ASSERT(0); XP_ASSERT(0);
} }
} else {
XP_LOGFF( "bad proto %d; dropping packet", proto );
} }
stream_destroy( stream, xwe ); stream_destroy( stream, xwe );
} }

View file

@ -33,10 +33,10 @@ void dvc_store( XW_DUtilCtxt* dctxt, XWEnv xwe );
void dvc_getMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTDevID* devID ); void dvc_getMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe, MQTTDevID* devID );
void dvc_resetMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe ); void dvc_resetMQTTDevID( XW_DUtilCtxt* dutil, XWEnv xwe );
void dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream, void dvc_makeMQTTInvite( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
const NetLaunchInfo* nli ); const NetLaunchInfo* nli, XP_U32 timestamp );
void dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream, void dvc_makeMQTTMessage( XW_DUtilCtxt* dutil, XWEnv xwe, XWStreamCtxt* stream,
XP_U32 gameID, const XP_U8* buf, XP_U16 len ); XP_U32 gameID, XP_U32 timestamp, const XP_U8* buf, XP_U16 len );
void dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe, void dvc_makeMQTTNoSuchGame( XW_DUtilCtxt* dutil, XWEnv xwe,
XWStreamCtxt* stream, XP_U32 gameID ); XWStreamCtxt* stream, XP_U32 gameID, XP_U32 timestamp );
void dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 len ); void dvc_parseMQTTPacket( XW_DUtilCtxt* dutil, XWEnv xwe, const XP_U8* buf, XP_U16 len );
#endif #endif

View file

@ -88,7 +88,7 @@ typedef struct _DUtilVtable {
void (*m_dutil_onInviteReceived)( XW_DUtilCtxt* duc, XWEnv xwe, void (*m_dutil_onInviteReceived)( XW_DUtilCtxt* duc, XWEnv xwe,
const NetLaunchInfo* nli ); const NetLaunchInfo* nli );
void (*m_dutil_onMessageReceived)( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID, void (*m_dutil_onMessageReceived)( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID,
const CommsAddrRec* from, XWStreamCtxt* stream ); const CommsAddrRec* from, const XP_U8* buf, XP_U16 len );
void (*m_dutil_onGameGoneReceived)( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID, void (*m_dutil_onGameGoneReceived)( XW_DUtilCtxt* duc, XWEnv xwe, XP_U32 gameID,
const CommsAddrRec* from ); const CommsAddrRec* from );
@ -158,8 +158,8 @@ void dutil_super_init( MPFORMAL XW_DUtilCtxt* dutil );
#define dutil_onInviteReceived(duc, xwe, nli) \ #define dutil_onInviteReceived(duc, xwe, nli) \
(duc)->vtable.m_dutil_onInviteReceived( (duc), (xwe), (nli) ) (duc)->vtable.m_dutil_onInviteReceived( (duc), (xwe), (nli) )
#define dutil_onMessageReceived(duc, xwe, gameID, from, stream) \ #define dutil_onMessageReceived(duc, xwe, gameID, from, buf, len) \
(duc)->vtable.m_dutil_onMessageReceived((duc),(xwe),(gameID),(from),(stream)) (duc)->vtable.m_dutil_onMessageReceived((duc),(xwe),(gameID),(from),(buf),(len))
#define dutil_onGameGoneReceived(duc, xwe, gameID, from) \ #define dutil_onGameGoneReceived(duc, xwe, gameID, from) \
(duc)->vtable.m_dutil_onGameGoneReceived((duc),(xwe),(gameID),(from)) (duc)->vtable.m_dutil_onGameGoneReceived((duc),(xwe),(gameID),(from))

View file

@ -268,6 +268,10 @@ LIBS += `pkg-config --libs glib-2.0`
CFLAGS += $(POINTER_SUPPORT) CFLAGS += $(POINTER_SUPPORT)
CFLAGS += -DDROP_BITMAPS CFLAGS += -DDROP_BITMAPS
# Turn on for testing. Eventually, just turn on, but sends to old
# builds will then fail
# CFLAGS += -DMQTT_USE_PROTO=2
ifneq (,$(findstring DPLATFORM_NCURSES,$(DEFINES))) ifneq (,$(findstring DPLATFORM_NCURSES,$(DEFINES)))
LIBS += $(OE_LIBDIR) -lncursesw LIBS += $(OE_LIBDIR) -lncursesw
endif endif

View file

@ -390,6 +390,7 @@ make_rematch( GtkAppGlobals* apg, const CommonGlobals* cGlobals )
stream_putU8( stream, nRecs ); stream_putU8( stream, nRecs );
for ( int ii = 0; ii < nRecs; ++ii ) { for ( int ii = 0; ii < nRecs; ++ii ) {
XP_LOGFF( "MQTT rematch not implemented" );
XP_ASSERT(0); /* REWRITE TO USE MQTT */ XP_ASSERT(0); /* REWRITE TO USE MQTT */
/* XP_UCHAR relayID[32]; */ /* XP_UCHAR relayID[32]; */
/* XP_U16 len = sizeof(relayID); */ /* XP_U16 len = sizeof(relayID); */

View file

@ -102,15 +102,11 @@ linux_dutil_onInviteReceived( XW_DUtilCtxt* duc, XWEnv XP_UNUSED(xwe),
static void static void
linux_dutil_onMessageReceived( XW_DUtilCtxt* duc, XWEnv XP_UNUSED(xwe), linux_dutil_onMessageReceived( XW_DUtilCtxt* duc, XWEnv XP_UNUSED(xwe),
XP_U32 gameID, const CommsAddrRec* from, XP_U32 gameID, const CommsAddrRec* from,
XWStreamCtxt* stream ) XP_U8* buf, XP_U16 len )
{ {
XP_LOGFF( "(gameID=%d)", gameID ); XP_LOGFF( "(gameID=%d)", gameID );
LaunchParams* params = (LaunchParams*)duc->closure; LaunchParams* params = (LaunchParams*)duc->closure;
XP_U16 len = stream_getSize( stream );
XP_U8 buf[len];
stream_getBytes( stream, buf, len );
if ( params->useCurses ) { if ( params->useCurses ) {
mqttMsgReceivedCurses( params->appGlobals, from, gameID, buf, len ); mqttMsgReceivedCurses( params->appGlobals, from, gameID, buf, len );
} else { } else {

View file

@ -1515,10 +1515,10 @@ linux_reset( XWEnv xwe, void* closure )
XP_S16 XP_S16
linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen, linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen,
const XP_UCHAR* msgNo, const CommsAddrRec* addrRec, CommsConnType conType, const XP_UCHAR* msgNo, XP_U32 createdStamp,
const CommsAddrRec* addrRec, CommsConnType conType,
XP_U32 gameID, void* closure ) XP_U32 gameID, void* closure )
{ {
XP_LOGF( "%s(mid=%s)", __func__, msgNo );
XP_S16 nSent = -1; XP_S16 nSent = -1;
CommonGlobals* cGlobals = (CommonGlobals*)closure; CommonGlobals* cGlobals = (CommonGlobals*)closure;
@ -1578,7 +1578,8 @@ linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen,
#endif #endif
case COMMS_CONN_MQTT: case COMMS_CONN_MQTT:
nSent = mqttc_send( cGlobals->params, gameID, buf, buflen, &addrRec->u.mqtt.devID ); nSent = mqttc_send( cGlobals->params, gameID, createdStamp, buf, buflen,
&addrRec->u.mqtt.devID );
break; break;
case COMMS_CONN_NFC: case COMMS_CONN_NFC:

View file

@ -39,7 +39,8 @@ typedef struct LinuxBMStruct {
int initListenerSocket( int port ); int initListenerSocket( int port );
XP_S16 linux_send( XWEnv xwe, const XP_U8* buf, XP_U16 buflen, XP_S16 linux_send( XWEnv xwe, const XP_U8* buf, XP_U16 buflen,
const XP_UCHAR* msgNo, const CommsAddrRec* addrRec, const XP_UCHAR* msgNo, XP_U32 createdStamp,
const CommsAddrRec* addrRec,
CommsConnType conType, XP_U32 gameID, void* closure ); CommsConnType conType, XP_U32 gameID, void* closure );
#ifndef XWFEATURE_STANDALONE_ONLY #ifndef XWFEATURE_STANDALONE_ONLY
# define LINUX_SEND linux_send # define LINUX_SEND linux_send

View file

@ -261,14 +261,14 @@ mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* invitee
XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool) XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool)
params->vtMgr ); params->vtMgr );
dvc_makeMQTTInvite( params->dutil, NULL_XWE, stream, nli ); dvc_makeMQTTInvite( params->dutil, NULL_XWE, stream, nli, 0 );
postMsg( storage, stream, invitee ); postMsg( storage, stream, invitee );
} }
XP_S16 XP_S16
mqttc_send( LaunchParams* params, XP_U32 gameID, const XP_U8* buf, mqttc_send( LaunchParams* params, XP_U32 gameID, XP_U32 timestamp,
XP_U16 len, const MQTTDevID* addressee ) const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee )
{ {
XP_S16 result = -1; XP_S16 result = -1;
MQTTConStorage* storage = getStorage( params ); MQTTConStorage* storage = getStorage( params );
@ -276,7 +276,7 @@ mqttc_send( LaunchParams* params, XP_U32 gameID, const XP_U8* buf,
params->vtMgr ); params->vtMgr );
dvc_makeMQTTMessage( params->dutil, NULL_XWE, stream, dvc_makeMQTTMessage( params->dutil, NULL_XWE, stream,
gameID, buf, len ); gameID, timestamp, buf, len );
if ( postMsg( storage, stream, addressee ) ) { if ( postMsg( storage, stream, addressee ) ) {
result = len; result = len;
} }
@ -289,6 +289,6 @@ mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 g
MQTTConStorage* storage = getStorage( params ); MQTTConStorage* storage = getStorage( params );
XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool) XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool)
params->vtMgr ); params->vtMgr );
dvc_makeMQTTNoSuchGame( params->dutil, NULL_XWE, stream, gameID ); dvc_makeMQTTNoSuchGame( params->dutil, NULL_XWE, stream, gameID, 0 );
postMsg( storage, stream, addressee ); postMsg( storage, stream, addressee );
} }

View file

@ -29,8 +29,8 @@ void mqttc_cleanup( LaunchParams* params );
const MQTTDevID* mqttc_getDevID( LaunchParams* params ); const MQTTDevID* mqttc_getDevID( LaunchParams* params );
const gchar* mqttc_getDevIDStr( LaunchParams* params ); const gchar* mqttc_getDevIDStr( LaunchParams* params );
void mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* mqttInvitee ); void mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* mqttInvitee );
XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID, const XP_U8* buf, XP_U16 len, XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID, XP_U32 timestamp,
const MQTTDevID* addressee ); const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee );
void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID ); void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID );
bool mqttc_strToDevID( const gchar* str, MQTTDevID* result ); bool mqttc_strToDevID( const gchar* str, MQTTDevID* result );