snapshot: mqtt invites for gtk work via comms

Merge that won't compile, but I want it to start fromp
This commit is contained in:
Eric House 2022-09-06 12:16:29 -07:00
parent 31777c3807
commit b4e197f8af
10 changed files with 295 additions and 74 deletions

View file

@ -34,6 +34,7 @@
#include "dbgutil.h" #include "dbgutil.h"
#include "knownplyr.h" #include "knownplyr.h"
#include "device.h" #include "device.h"
#include "nli.h"
#define HEARTBEAT_NONE 0 #define HEARTBEAT_NONE 0
@ -48,6 +49,10 @@
#define NO_MSGID_BIT 0x0020 #define NO_MSGID_BIT 0x0020
#define HEADER_LEN_OFFSET 9 #define HEADER_LEN_OFFSET 9
/* Low two bits treated as channel, third as short-term flag indicating
* sender's role; rest can be random to aid detection of duplicate packets. */
#define CHANNEL_MASK 0x0003
#ifndef XWFEATURE_STANDALONE_ONLY #ifndef XWFEATURE_STANDALONE_ONLY
#ifndef INITIAL_CLIENT_VERS #ifndef INITIAL_CLIENT_VERS
@ -67,8 +72,8 @@ EXTERN_C_START
typedef struct MsgQueueElem { typedef struct MsgQueueElem {
struct MsgQueueElem* next; struct MsgQueueElem* next;
XP_U8* msg; XP_U8* msg; /* ptr to NetLaunchInfo if isInvite is true */
XP_U16 len; XP_U16 len; /* stored as 0 if msg is a nli */
XP_PlayerAddr channelNo; XP_PlayerAddr channelNo;
#ifdef DEBUG #ifdef DEBUG
XP_U16 sendCount; /* how many times sent? */ XP_U16 sendCount; /* how many times sent? */
@ -202,8 +207,7 @@ typedef enum {
* prototypes * prototypes
****************************************************************************/ ****************************************************************************/
static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe,
XP_PlayerAddr channelNo, XP_PlayerAddr channelNo, XWHostID id,
XWHostID id,
const CommsAddrRec* addr, XP_U16 flags ); const CommsAddrRec* addr, XP_U16 flags );
static void augmentChannelAddr( CommsCtxt* comms, AddressRecord* rec, static void augmentChannelAddr( CommsCtxt* comms, AddressRecord* rec,
const CommsAddrRec* addr, XWHostID hostID ); const CommsAddrRec* addr, XWHostID hostID );
@ -217,7 +221,7 @@ static AddressRecord* getRecordFor( CommsCtxt* comms, XWEnv xwe,
static void augmentSelfAddr( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr ); static void augmentSelfAddr( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr );
static XP_S16 sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, static XP_S16 sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
CommsConnType filter ); CommsConnType filter );
static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newMsgElem ); static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem );
static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ; static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ;
static void freeElem( const CommsCtxt* comms, MsgQueueElem* elem ); static void freeElem( const CommsCtxt* comms, MsgQueueElem* elem );
@ -228,7 +232,8 @@ static void sendConnect( CommsCtxt* comms, XWEnv xwe
#endif #endif
); );
static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe ); static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe );
static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen, MsgID msgID ); static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen,
MsgID msgID );
static XP_Bool formatRelayID( const CommsCtxt* comms, XWHostID hostID, static XP_Bool formatRelayID( const CommsCtxt* comms, XWHostID hostID,
XP_UCHAR* buf, XP_U16* lenp ); XP_UCHAR* buf, XP_U16* lenp );
@ -872,10 +877,27 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
#ifdef DEBUG #ifdef DEBUG
msg->sendCount = 0; msg->sendCount = 0;
#endif #endif
msg->msg = (XP_U8*)XP_MALLOC( mpool, msg->len ); XP_U16 len = msg->len;
stream_getBytes( stream, msg->msg, msg->len ); if ( 0 == len ) {
XP_U32 nliLen = stream_getU32VL( stream );
XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
stream_getFromStream( nliStream, stream, nliLen );
NetLaunchInfo nli;
if ( nli_makeFromStream( &nli, nliStream ) ) {
msg->msg = (XP_U8*)XP_MALLOC( mpool, sizeof(nli) );
XP_MEMCPY( msg->msg, &nli, sizeof(nli) );
len = sizeof(nli); /* needed for checksum calc */
} else {
XP_ASSERT(0);
}
stream_destroy( nliStream, xwe );
} else {
msg->msg = (XP_U8*)XP_MALLOC( mpool, len );
stream_getBytes( stream, msg->msg, len );
}
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
msg->checksum = dutil_md5sum( comms->dutil, xwe, msg->msg, msg->len ); msg->checksum = dutil_md5sum( comms->dutil, xwe, msg->msg, len );
#endif #endif
msg->next = (MsgQueueElem*)NULL; msg->next = (MsgQueueElem*)NULL;
*prevsQueueNext = comms->msgQueueTail = msg; *prevsQueueNext = comms->msgQueueTail = msg;
@ -1110,7 +1132,18 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe),
stream_putU32VL( stream, msg->len ); stream_putU32VL( stream, msg->len );
stream_putU32( stream, msg->createdStamp ); stream_putU32( stream, msg->createdStamp );
stream_putBytes( stream, msg->msg, msg->len ); if ( 0 == msg->len ) {
XWStreamCtxt* nliStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
NetLaunchInfo* nli = (NetLaunchInfo*)msg->msg;
nli_saveToStream( nli, nliStream );
XP_U16 nliLen = stream_getSize( nliStream );
stream_putU32VL( stream, nliLen );
stream_getFromStream( stream, nliStream, nliLen );
stream_destroy( nliStream, xwe );
} else {
stream_putBytes( stream, msg->msg, msg->len );
}
} }
/* This writes 2 bytes instead of 1 if it were smarter. Not worth the work /* This writes 2 bytes instead of 1 if it were smarter. Not worth the work
@ -1336,6 +1369,18 @@ comms_getIsServer( const CommsCtxt* comms )
return comms->isServer; return comms->isServer;
} }
static MsgQueueElem*
makeNewElem( const CommsCtxt* comms, XWEnv xwe, MsgID msgID,
XP_PlayerAddr channelNo )
{
MsgQueueElem* newElem = (MsgQueueElem*)XP_CALLOC( comms->mpool,
sizeof( *newElem ) );
newElem->createdStamp = dutil_getCurSeconds( comms->dutil, xwe );
newElem->channelNo = channelNo;
newElem->msgID = msgID;
return newElem;
}
static MsgQueueElem* static MsgQueueElem*
makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec, makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
XP_PlayerAddr channelNo, XWStreamCtxt* stream ) XP_PlayerAddr channelNo, XWStreamCtxt* stream )
@ -1344,11 +1389,7 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf ); XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf );
XP_U16 streamSize = NULL == stream? 0 : stream_getSize( stream ); XP_U16 streamSize = NULL == stream? 0 : stream_getSize( stream );
MsgID lastMsgSaved = (!!rec)? rec->lastMsgSaved : 0; MsgID lastMsgSaved = (!!rec)? rec->lastMsgSaved : 0;
MsgQueueElem* newMsgElem = (MsgQueueElem*)XP_CALLOC( comms->mpool, MsgQueueElem* newElem = makeNewElem( comms, xwe, msgID, channelNo );
sizeof( *newMsgElem ) );
newMsgElem->channelNo = channelNo;
newMsgElem->msgID = msgID;
newMsgElem->createdStamp = dutil_getCurSeconds( comms->dutil, xwe );
XP_Bool useSmallHeader = !!rec && (COMMS_VERSION == rec->flags); XP_Bool useSmallHeader = !!rec && (COMMS_VERSION == rec->flags);
XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool) XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool)
@ -1400,18 +1441,38 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
stream_getFromStream( msgStream, stream, streamSize ); stream_getFromStream( msgStream, stream, streamSize );
} }
newMsgElem->len = stream_getSize( msgStream ); newElem->len = stream_getSize( msgStream );
newMsgElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newMsgElem->len ); XP_ASSERT( 0 < newElem->len );
stream_getBytes( msgStream, newMsgElem->msg, newMsgElem->len ); newElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newElem->len );
stream_getBytes( msgStream, newElem->msg, newElem->len );
stream_destroy( msgStream, xwe ); stream_destroy( msgStream, xwe );
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
newMsgElem->checksum = dutil_md5sum( comms->dutil, xwe, newMsgElem->msg, newElem->checksum = dutil_md5sum( comms->dutil, xwe, newElem->msg,
newMsgElem->len ); newElem->len );
#endif #endif
return newMsgElem; XP_ASSERT( 0 < newElem->len ); /* else NLI assumptions fail */
return newElem;
} /* makeElemWithID */ } /* makeElemWithID */
#ifdef XWFEATURE_COMMS_INVITE
static MsgQueueElem*
makeInviteElem( CommsCtxt* comms, XWEnv xwe,
XP_PlayerAddr channelNo, const NetLaunchInfo* nli )
{
MsgQueueElem* newElem = makeNewElem( comms, xwe, 0, channelNo );
XP_ASSERT( 0 == newElem->len ); /* len == 0 signals is NLI */
newElem->msg = XP_MALLOC( comms->mpool, sizeof(*nli) );
XP_MEMCPY( newElem->msg, nli, sizeof(*nli) );
# ifdef COMMS_CHECKSUM
newElem->checksum = dutil_md5sum( comms->dutil, xwe, newElem->msg,
sizeof(*nli) );
# endif
return newElem;
}
#endif
XP_U16 XP_U16
comms_getChannelSeed( CommsCtxt* comms ) comms_getChannelSeed( CommsCtxt* comms )
{ {
@ -1426,6 +1487,78 @@ comms_getChannelSeed( CommsCtxt* comms )
return result; return result;
} }
#ifdef XWFEATURE_COMMS_INVITE
/* PENDING: this needs to handle more than MQTT!!!! */
static XP_Bool
isSameAddr( const CommsAddrRec* rec1, const CommsAddrRec* rec2 )
{
XP_Bool result =
addr_hasType( rec1, COMMS_CONN_MQTT )
&& addr_hasType( rec2, COMMS_CONN_MQTT )
&& rec1->u.mqtt.devID == rec2->u.mqtt.devID;
LOG_RETURNF( "%s", boolToStr(result) );
return result;
}
/* We're adding invites to comms so they'll be persisted and resent etc. can
work in common code. Rule will be there's only one invitation present per
channel (remote device.) We'll add a channel if necessary. Then if there is
an invitation already there we'll replace it.
Interesting case is where I send two invitations only one of which can be
accepted (as e.g. when I invite one friend, then on not receiving a
response decide to invite a different friend instead. In the two-player
game case I can detect that there are too many invitations and delete/reuse
the previous channel when the new invitation is received. But what about
the two-remotes game? Would I delete the oldest that hadn't been responded
to? Or maybe keep them all, and when an invitation is responded to, meaning
that there's incoming traffic on that channel, I promote it somehow, and
when a game is complete (all players present) delete channels that have
only outgoing invitations pending.
*/
void
comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
const CommsAddrRec* destAddr )
{
LOG_FUNC();
/* See if we have a channel for this address. Then see if we have an
invite matching this one, and if not add one. Then trigger a send of
it. */
MsgQueueElem* elem = NULL;
XP_PlayerAddr channelNo = 0; /* = findOrMakeChannel( comms, destAddr ); */
for ( AddressRecord* rec = comms->recs; !!rec; rec = rec->next ) {
const CommsAddrRec* addr = &rec->addr;
if ( isSameAddr( addr, destAddr ) ) {
channelNo = rec->channelNo;
// found = XP_TRUE;
break;
}
}
for ( MsgQueueElem* elem = comms->msgQueueHead; !!elem; elem = elem-> next ) {
if ( channelNo == elem->channelNo ) {
XP_ASSERT( 0 == elem->msgID );
XP_ASSERT( 0 != channelNo );
freeElem( comms, elem );
XP_LOGFF( "nuked old invite" );
}
}
if ( 0 == channelNo ) {
channelNo = comms_getChannelSeed(comms) & ~CHANNEL_MASK;
}
/*AddressRecord* rec = */rememberChannelAddress( comms, xwe, channelNo,
0, destAddr, 0 );
elem = makeInviteElem( comms, xwe, channelNo, nli );
elem = addToQueue( comms, xwe, elem );
sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
}
#endif
/* Send a message using the sequentially next MsgID. Save the message so /* Send a message using the sequentially next MsgID. Save the message so
* resend can work. */ * resend can work. */
XP_S16 XP_S16
@ -1680,9 +1813,10 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi
XP_PlayerAddr channelNo = elem->channelNo; XP_PlayerAddr channelNo = elem->channelNo;
CNO_FMT( cbuf, channelNo ); CNO_FMT( cbuf, channelNo );
XP_Bool isInvite = 0 == elem->len;
#ifdef COMMS_CHECKSUM #ifdef COMMS_CHECKSUM
XP_LOGFF( TAGFMT() "sending message on %s: id: %d; len: %d; sum: %s", TAGPRMS, XP_LOGFF( TAGFMT() "sending message on %s: id: %d; len: %d; sum: %s; isInvite: %s",
cbuf, elem->msgID, elem->len, elem->checksum ); TAGPRMS, cbuf, elem->msgID, elem->len, elem->checksum, boolToStr(isInvite) );
#endif #endif
const CommsAddrRec* addrP = NULL; const CommsAddrRec* addrP = NULL;
@ -1771,12 +1905,36 @@ sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType fi
typ, gameid, comms->procs.closure ); typ, gameid, comms->procs.closure );
break; break;
} }
} /* switch */ /* <<<<<<< HEAD */
} /* } /\* switch *\/ */
XP_LOGFF( TAGFMT() "sent %d bytes using typ %s", TAGPRMS, nSent, /* } */
ConnType2Str(typ) ); /* XP_LOGFF( TAGFMT() "sent %d bytes using typ %s", TAGPRMS, nSent, */
if ( nSent > result ) { /* ConnType2Str(typ) ); */
result = nSent; /* if ( nSent > result ) { */
/* result = nSent; */
/* ======= */
/* if ( 0 ) { */
/* #ifdef XWFEATURE_COMMS_INVITE */
/* } else if ( isInvite ) { */
/* XP_ASSERT( !!comms->procs.sendInvt ); */
/* NetLaunchInfo* nli = (NetLaunchInfo*)elem->msg; */
/* nSent = (*comms->procs.sendInvt)( xwe, nli, elem->createdStamp, */
/* &addr, comms->procs.closure ); */
/* #endif */
/* } else { */
/* XP_ASSERT( !!comms->procs.sendMsg ); */
/* XP_U32 gameid = gameID( comms ); */
/* logAddr( comms, xwe, &addr, __func__ ); */
/* XP_UCHAR msgNo[16]; */
/* formatMsgNo( comms, elem, msgNo, sizeof(msgNo) ); */
/* nSent = (*comms->procs.sendMsg)( xwe, elem->msg, elem->len, msgNo, */
/* elem->createdStamp, &addr, */
/* typ, gameid, */
/* comms->procs.closure ); */
/* } */
/* break; */
/* >>>>>>> d9781d21e (snapshot: mqtt invites for gtk work via comms) */
} }
} }
@ -1823,9 +1981,7 @@ resendImpl( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool force,
success = XP_FALSE; success = XP_FALSE;
} else if ( !!comms->msgQueueHead ) { } else if ( !!comms->msgQueueHead ) {
MsgQueueElem* msg; for ( MsgQueueElem* msg = comms->msgQueueHead; !!msg; msg = msg->next ) {
for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) {
XP_S16 len = (*proc)( comms, xwe, msg, filter, closure ); XP_S16 len = (*proc)( comms, xwe, msg, filter, closure );
if ( 0 > len ) { if ( 0 > len ) {
success = XP_FALSE; success = XP_FALSE;
@ -2436,8 +2592,9 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
XP_ASSERT( (*channelNo & CHANNEL_MASK) == 0 ); XP_ASSERT( (*channelNo & CHANNEL_MASK) == 0 );
*channelNo |= ++comms->nextChannelNo; *channelNo |= ++comms->nextChannelNo;
CNO_FMT( cbuf1, *channelNo ); CNO_FMT( cbuf1, *channelNo );
XP_LOGFF( TAGFMT() "ORd channel onto channelNo: now %s", TAGPRMS, cbuf1 ); XP_LOGFF( TAGFMT() "ORd channel onto channelNo: now %s", TAGPRMS,
XP_ASSERT( comms->nextChannelNo <= CHANNEL_MASK ); cbuf1 );
XP_ASSERT( comms->nextChannelNo <= CHANNEL_ID_MASK );
} }
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr,
flags ); flags );
@ -2480,7 +2637,8 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
goto errExit; goto errExit;
} }
} }
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, flags ); rec = rememberChannelAddress( comms, xwe, *channelNo, senderID,
addr, flags );
} }
} }
errExit: errExit:
@ -2575,8 +2733,10 @@ getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff
} else if ( 0 == stuff->channelNo || 0 == channelSeed ) { } else if ( 0 == stuff->channelNo || 0 == channelSeed ) {
XP_LOGFF( TAGFMT() "one of channelNos still 0", TAGPRMS ); XP_LOGFF( TAGFMT() "one of channelNos still 0", TAGPRMS );
XP_ASSERT(0); XP_ASSERT(0);
} else if ( (stuff->channelNo & ~CHANNEL_MASK) != (channelSeed & ~CHANNEL_MASK) ) { } else if ( (stuff->channelNo & ~CHANNEL_MASK)
XP_LOGFF( "channelNos test fails: %x vs %x", stuff->channelNo, channelSeed ); != (channelSeed & ~CHANNEL_MASK) ) {
XP_LOGFF( "channelNos test fails: %x vs %x", stuff->channelNo,
channelSeed );
messageValid = XP_FALSE; messageValid = XP_FALSE;
} }
} }
@ -2610,8 +2770,9 @@ parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream,
XP_ASSERT( 0 < headerLen ); XP_ASSERT( 0 < headerLen );
XP_ASSERT( headerLen <= stream_getSize( msgStream ) ); XP_ASSERT( headerLen <= stream_getSize( msgStream ) );
if ( headerLen <= stream_getSize( msgStream ) ) { if ( headerLen <= stream_getSize( msgStream ) ) {
XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool) XWStreamCtxt* hdrStream =
dutil_getVTManager(comms->dutil)); mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
stream_getFromStream( hdrStream, msgStream, headerLen ); stream_getFromStream( hdrStream, msgStream, headerLen );
stuff->connID = 0 == (stuff->flags & NO_CONNID_BIT) stuff->connID = 0 == (stuff->flags & NO_CONNID_BIT)
? comms->util->gameInfo->gameID : CONN_ID_NONE; ? comms->util->gameInfo->gameID : CONN_ID_NONE;
@ -2714,14 +2875,17 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
if ( stuff.connID == CONN_ID_NONE ) { if ( stuff.connID == CONN_ID_NONE ) {
/* special case: initial message from client or server */ /* special case: initial message from client or server */
rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr, rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr,
senderID, &stuff.channelNo, stuff.flags ); senderID, &stuff.channelNo,
stuff.flags );
state->rec = rec; state->rec = rec;
} else if ( comms->connID == stuff.connID ) { } else if ( comms->connID == stuff.connID ) {
rec = validateChannelMessage( comms, xwe, retAddr, stuff.channelNo, rec = validateChannelMessage( comms, xwe, retAddr,
senderID, stuff.msgID, stuff.lastMsgRcd ); stuff.channelNo, senderID,
stuff.msgID, stuff.lastMsgRcd );
} else { } else {
XP_LOGFF( TAGFMT() "unexpected connID (%x vs %x) ; " XP_LOGFF( TAGFMT() "unexpected connID (%x vs %x) ; "
"dropping message", TAGPRMS, comms->connID, stuff.connID ); "dropping message", TAGPRMS, comms->connID,
stuff.connID );
} }
} }
@ -2881,9 +3045,8 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI
static void static void
sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ) sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
{ {
MsgQueueElem* elem = makeElemWithID( comms, xwe, 0, // msgID MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */,
rec, rec? rec->channelNo : 0, rec, rec? rec->channelNo : 0, NULL );
NULL );
(void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); (void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
freeElem( comms, elem ); freeElem( comms, elem );
} /* sendEmptyMsg */ } /* sendEmptyMsg */
@ -3712,8 +3875,8 @@ send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp, XP_PlayerAdd
XP_MEMCPY( &buf[1], data, dlen ); XP_MEMCPY( &buf[1], data, dlen );
} }
nSent = (*comms->procs.send)( xwe, buf, dlen+1, msgNo, 0, nSent = (*comms->procs.sendMsg)( xwe, buf, dlen+1, msgNo, 0, addr, typ,
addr, typ, gameID(comms), comms->procs.closure ); gameID(comms), comms->procs.closure );
XP_FREE( comms->mpool, buf ); XP_FREE( comms->mpool, buf );
setHeartbeatTimer( comms ); setHeartbeatTimer( comms );

View file

@ -74,11 +74,17 @@ typedef enum {
# define IF_CH(a) # define IF_CH(a)
#endif #endif
typedef XP_S16 (*TransportSend)( XWEnv xwe, const XP_U8* buf, XP_U16 len, #ifdef XWFEATURE_COMMS_INVITE
const XP_UCHAR* msgNo, XP_U32 createdStamp, typedef XP_S16 (*TransportSendInvt)( XWEnv xwe, const NetLaunchInfo* nli,
const CommsAddrRec* addr, XP_U32 createdStamp,
CommsConnType conType, const CommsAddrRec* addr, void* closure );
XP_U32 gameID, void* closure ); #endif
typedef XP_S16 (*TransportSendMsg)( XWEnv xwe, const XP_U8* buf, XP_U16 len,
const XP_UCHAR* msgNo, XP_U32 createdStamp,
const CommsAddrRec* addr,
CommsConnType conType, XP_U32 gameID,
void* closure );
#ifdef COMMS_HEARTBEAT #ifdef COMMS_HEARTBEAT
typedef void (*TransportReset)( XWEnv xwe, void* closure ); typedef void (*TransportReset)( XWEnv xwe, void* closure );
#endif #endif
@ -119,7 +125,10 @@ typedef struct _TransportProcs {
#else #else
XP_U32 flags; XP_U32 flags;
#endif #endif
TransportSend send; TransportSendMsg sendMsg;
#ifdef XWFEATURE_COMMS_INVITE
TransportSendInvt sendInvt;
#endif
MsgCountChange countChanged; MsgCountChange countChanged;
#ifdef COMMS_HEARTBEAT #ifdef COMMS_HEARTBEAT
TransportReset reset; TransportReset reset;
@ -205,7 +214,10 @@ void comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken );
void addrFromStream( CommsAddrRec* addr, XWStreamCtxt* stream ); void addrFromStream( CommsAddrRec* addr, XWStreamCtxt* stream );
void addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addr ); void addrToStream( XWStreamCtxt* stream, const CommsAddrRec* addr );
#ifdef XWFEATURE_COMMS_INVITE
void comms_invite( CommsCtxt* comms, XWEnv xwe, const NetLaunchInfo* nli,
const CommsAddrRec* destAddr );
#endif
XP_S16 comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream ); XP_S16 comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream );
XP_S16 comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_S16 comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter,
XP_Bool force ); XP_Bool force );

View file

@ -175,9 +175,7 @@ typedef struct PoolContext PoolContext;
typedef struct XW_UtilCtxt XW_UtilCtxt; typedef struct XW_UtilCtxt XW_UtilCtxt;
typedef struct XW_DUtilCtxt XW_DUtilCtxt; typedef struct XW_DUtilCtxt XW_DUtilCtxt;
/* Low two bits treated as channel, third as short-term flag indicating /* Opaque bitfield type meant to be parsed only inside comms.c */
* sender's role; rest can be random to aid detection of duplicate packets. */
#define CHANNEL_MASK 0x0003
typedef XP_U16 XP_PlayerAddr; typedef XP_U16 XP_PlayerAddr;
typedef enum { typedef enum {

View file

@ -132,6 +132,7 @@ DEFINES += -DCOMMS_XPORT_FLAGSPROC
DEFINES += -DINITIAL_CLIENT_VERS=3 DEFINES += -DINITIAL_CLIENT_VERS=3
DEFINES += -DCOMMON_LAYOUT DEFINES += -DCOMMON_LAYOUT
DEFINES += -DNATIVE_NLI DEFINES += -DNATIVE_NLI
DEFINES += -DXWFEATURE_COMMS_INVITE
# DEFINES += -DRELAY_VIA_HTTP # DEFINES += -DRELAY_VIA_HTTP
# MAX_ROWS controls STREAM_VERS_BIGBOARD and with it move hashing # MAX_ROWS controls STREAM_VERS_BIGBOARD and with it move hashing

View file

@ -381,7 +381,10 @@ commonInit( CursesBoardState* cbState, sqlite3_int64 rowid,
cGlobals->addAcceptor = curses_socket_acceptor; cGlobals->addAcceptor = curses_socket_acceptor;
bGlobals->procs.closure = cGlobals; bGlobals->procs.closure = cGlobals;
bGlobals->procs.send = LINUX_SEND; bGlobals->procs.sendMsg = linux_send;
#ifdef XWFEATURE_COMMS_INVITE
bGlobals->procs.sendInvt = linux_send_invt;
#endif
#ifdef COMMS_HEARTBEAT #ifdef COMMS_HEARTBEAT
bGlobals->procs.reset = linux_reset; bGlobals->procs.reset = linux_reset;
#endif #endif
@ -1215,7 +1218,7 @@ inviteList( CommonGlobals* cGlobals, CommsAddrRec* addr, GSList* invitees,
MQTTDevID devID; MQTTDevID devID;
const gchar* str = g_slist_nth_data( invitees, ii ); const gchar* str = g_slist_nth_data( invitees, ii );
if ( strToMQTTCDevID( str, &devID ) ) { if ( strToMQTTCDevID( str, &devID ) ) {
mqttc_invite( params, &nli, &devID ); mqttc_invite( params, 0, &nli, &devID );
} else { } else {
XP_LOGFF( "unable to convert devid %s", str ); XP_LOGFF( "unable to convert devid %s", str );
} }
@ -1261,11 +1264,19 @@ handleInvite( void* closure, int XP_UNUSED(key) )
} else if ( inviteList( cGlobals, &selfAddr, params->connInfo.mqtt.inviteeDevIDs, COMMS_CONN_MQTT ) ) { } else if ( inviteList( cGlobals, &selfAddr, params->connInfo.mqtt.inviteeDevIDs, COMMS_CONN_MQTT ) ) {
/* do nothing */ /* do nothing */
/* Try sending to self, using the phone number or relayID of this device */ /* Try sending to self, using the phone number or relayID of this device */
/* <<<<<<< HEAD */
} else if ( addr_hasType( &selfAddr, COMMS_CONN_SMS ) ) { } else if ( addr_hasType( &selfAddr, COMMS_CONN_SMS ) ) {
linux_sms_invite( params, &nli, selfAddr.u.sms.phone, selfAddr.u.sms.port ); linux_sms_invite( params, &nli, selfAddr.u.sms.phone, selfAddr.u.sms.port );
} else if ( addr_hasType( &selfAddr, COMMS_CONN_MQTT ) ) { } else if ( addr_hasType( &selfAddr, COMMS_CONN_MQTT ) ) {
mqttc_invite( params, &nli, mqttc_getDevID( params ) ); mqttc_invite( params, &nli, mqttc_getDevID( params ) );
} else if ( addr_hasType( &selfAddr, COMMS_CONN_RELAY ) ) { } else if ( addr_hasType( &selfAddr, COMMS_CONN_RELAY ) ) {
/* ======= */
/* } else if ( addr_hasType( &addr, COMMS_CONN_SMS ) ) { */
/* linux_sms_invite( params, &nli, addr.u.sms.phone, addr.u.sms.port ); */
/* } else if ( addr_hasType( &addr, COMMS_CONN_MQTT ) ) { */
/* mqttc_invite( params, 0, &nli, mqttc_getDevID( params ) ); */
/* } else if ( addr_hasType( &addr, COMMS_CONN_RELAY ) ) { */
/* >>>>>>> d9781d21e (snapshot: mqtt invites for gtk work via comms) */
XP_U32 relayID = linux_getDevIDRelay( params ); XP_U32 relayID = linux_getDevIDRelay( params );
if ( 0 != relayID ) { if ( 0 != relayID ) {
relaycon_invite( params, relayID, NULL, &nli ); relaycon_invite( params, relayID, NULL, &nli );

View file

@ -487,7 +487,10 @@ static void
setTransportProcs( TransportProcs* procs, GtkGameGlobals* globals ) setTransportProcs( TransportProcs* procs, GtkGameGlobals* globals )
{ {
procs->closure = globals; procs->closure = globals;
procs->send = LINUX_SEND; procs->sendMsg = linux_send;
#ifdef XWFEATURE_COMMS_INVITE
procs->sendInvt = linux_send_invt;
#endif
#ifdef COMMS_XPORT_FLAGSPROC #ifdef COMMS_XPORT_FLAGSPROC
procs->getFlags = gtk_getFlags; procs->getFlags = gtk_getFlags;
#endif #endif
@ -844,7 +847,10 @@ new_game_impl( GtkGameGlobals* globals, XP_Bool fireConnDlg )
#endif #endif
TransportProcs procs = { TransportProcs procs = {
.closure = globals, .closure = globals,
.send = LINUX_SEND, .sendMsg = linux_send,
#ifdef XWFEATURE_COMMS_INVITE
.sendInvt = linux_send_invt,
#endif
#ifdef COMMS_HEARTBEAT #ifdef COMMS_HEARTBEAT
.reset = linux_reset, .reset = linux_reset,
#endif #endif
@ -1424,8 +1430,8 @@ static void
send_invites( CommonGlobals* cGlobals, XP_U16 nPlayers, send_invites( CommonGlobals* cGlobals, XP_U16 nPlayers,
const CommsAddrRec* destAddr ) const CommsAddrRec* destAddr )
{ {
CommsAddrRec myAddr = {0};
CommsCtxt* comms = cGlobals->game.comms; CommsCtxt* comms = cGlobals->game.comms;
CommsAddrRec myAddr = {0};
XP_ASSERT( comms ); XP_ASSERT( comms );
comms_getSelfAddr( comms, &myAddr ); comms_getSelfAddr( comms, &myAddr );
@ -1433,11 +1439,13 @@ send_invites( CommonGlobals* cGlobals, XP_U16 nPlayers,
NetLaunchInfo nli = {0}; /* include everything!!! */ NetLaunchInfo nli = {0}; /* include everything!!! */
nli_init( &nli, cGlobals->gi, &myAddr, nPlayers, forceChannel ); nli_init( &nli, cGlobals->gi, &myAddr, nPlayers, forceChannel );
#ifdef XWFEATURE_RELAY
if ( addr_hasType( &myAddr, COMMS_CONN_RELAY ) ) { if ( addr_hasType( &myAddr, COMMS_CONN_RELAY ) ) {
XP_UCHAR buf[32]; XP_UCHAR buf[32];
snprintf( buf, sizeof(buf), "%X", makeRandomInt() ); snprintf( buf, sizeof(buf), "%X", makeRandomInt() );
nli_setInviteID( &nli, buf ); /* PENDING: should not be relay only!!! */ nli_setInviteID( &nli, buf ); /* PENDING: should not be relay only!!! */
} }
#endif
// nli_setDevID( &nli, linux_getDevIDRelay( cGlobals->params ) ); // nli_setDevID( &nli, linux_getDevIDRelay( cGlobals->params ) );
if ( addr_hasType( &myAddr, COMMS_CONN_MQTT ) ) { if ( addr_hasType( &myAddr, COMMS_CONN_MQTT ) ) {
@ -1457,6 +1465,9 @@ send_invites( CommonGlobals* cGlobals, XP_U16 nPlayers,
} }
#endif #endif
#ifdef XWFEATURE_COMMS_INVITE
comms_invite( comms, NULL_XWE, &nli, destAddr );
#else
if ( !!destAddr && '\0' != destAddr->u.sms.phone[0] && 0 < destAddr->u.sms.port ) { if ( !!destAddr && '\0' != destAddr->u.sms.phone[0] && 0 < destAddr->u.sms.port ) {
gchar gameName[64]; gchar gameName[64];
snprintf( gameName, VSIZE(gameName), "Game %d", cGlobals->gi->gameID ); snprintf( gameName, VSIZE(gameName), "Game %d", cGlobals->gi->gameID );
@ -1464,16 +1475,17 @@ send_invites( CommonGlobals* cGlobals, XP_U16 nPlayers,
linux_sms_invite( cGlobals->params, &nli, linux_sms_invite( cGlobals->params, &nli,
destAddr->u.sms.phone, destAddr->u.sms.port ); destAddr->u.sms.phone, destAddr->u.sms.port );
} }
#ifdef XWFEATURE_RELAY # ifdef XWFEATURE_RELAY
if ( 0 != relayDevID || !!relayID ) { if ( 0 != relayDevID || !!relayID ) {
XP_ASSERT( 0 != relayDevID || (!!relayID && !!relayID[0]) ); XP_ASSERT( 0 != relayDevID || (!!relayID && !!relayID[0]) );
relaycon_invite( cGlobals->params, relayDevID, relayID, &nli ); relaycon_invite( cGlobals->params, relayDevID, relayID, &nli );
} }
#endif # endif
if ( addr_hasType( destAddr, COMMS_CONN_MQTT ) ) { if ( addr_hasType( destAddr, COMMS_CONN_MQTT ) ) {
mqttc_invite( cGlobals->params, &nli, &destAddr->u.mqtt.devID ); mqttc_invite( cGlobals->params, 0, &nli, &destAddr->u.mqtt.devID );
} }
#endif
/* while ( gtkaskm( "Invite how many and how?", infos, VSIZE(infos) ) ) { */ /* while ( gtkaskm( "Invite how many and how?", infos, VSIZE(infos) ) ) { */
/* int nPlayers = atoi( countStr ); */ /* int nPlayers = atoi( countStr ); */

View file

@ -1494,6 +1494,28 @@ linux_send( XWEnv XP_UNUSED(xwe), const XP_U8* buf, XP_U16 buflen,
return nSent; return nSent;
} /* linux_send */ } /* linux_send */
XP_S16
linux_send_invt( XWEnv XP_UNUSED(xwe), const NetLaunchInfo* nli,
XP_U32 createdStamp,
const CommsAddrRec* destAddr, void* closure )
{
XP_S16 nSent = -1;
CommonGlobals* cGlobals = (CommonGlobals*)closure;
CommsConnType typ;
for ( XP_U32 st = 0; addr_iter( destAddr, &typ, &st ); ) {
switch ( typ ) {
case COMMS_CONN_MQTT:
mqttc_invite( cGlobals->params, createdStamp, nli,
&destAddr->u.mqtt.devID );
break;
default:
XP_ASSERT(0);
}
}
return nSent;
}
static int static int
blocking_read( int fd, unsigned char* buf, const int len ) blocking_read( int fd, unsigned char* buf, const int len )
{ {

View file

@ -41,12 +41,11 @@ int initListenerSocket( int port );
XP_S16 linux_send( XWEnv xwe, const XP_U8* buf, XP_U16 buflen, XP_S16 linux_send( XWEnv xwe, const XP_U8* buf, XP_U16 buflen,
const XP_UCHAR* msgNo, XP_U32 createdStamp, const XP_UCHAR* msgNo, XP_U32 createdStamp,
const CommsAddrRec* addrRec, const CommsAddrRec* addrRec,
CommsConnType conType, XP_U32 gameID, void* closure ); CommsConnType conType, XP_U32 gameID,
#ifndef XWFEATURE_STANDALONE_ONLY void* closure );
# define LINUX_SEND linux_send XP_S16 linux_send_invt( XWEnv xwe, const NetLaunchInfo* nli,
#else XP_U32 createdStamp,
# define LINUX_SEND NULL const CommsAddrRec* addr, void* closure );
#endif
#ifdef COMMS_HEARTBEAT #ifdef COMMS_HEARTBEAT
void linux_reset( void* closure ); void linux_reset( void* closure );

View file

@ -249,7 +249,8 @@ mqttc_getDevIDStr( LaunchParams* params )
} }
void void
mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* invitee ) mqttc_invite( LaunchParams* params, XP_U32 timestamp, const NetLaunchInfo* nli,
const MQTTDevID* invitee )
{ {
MQTTConStorage* storage = getStorage( params ); MQTTConStorage* storage = getStorage( params );
#ifdef DEBUG #ifdef DEBUG
@ -257,6 +258,7 @@ mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* invitee
XP_LOGFF( "need to send to %s", formatMQTTDevID(invitee, buf, sizeof(buf) ) ); XP_LOGFF( "need to send to %s", formatMQTTDevID(invitee, buf, sizeof(buf) ) );
XP_ASSERT( 16 == strlen(buf) ); XP_ASSERT( 16 == strlen(buf) );
#endif #endif
XP_USE( timestamp );
XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool) XWStreamCtxt* stream = mem_stream_make_raw( MPPARM(params->mpool)
params->vtMgr ); params->vtMgr );

View file

@ -28,7 +28,8 @@ void mqttc_cleanup( LaunchParams* params );
const MQTTDevID* mqttc_getDevID( LaunchParams* params ); const MQTTDevID* mqttc_getDevID( LaunchParams* params );
const gchar* mqttc_getDevIDStr( LaunchParams* params ); const gchar* mqttc_getDevIDStr( LaunchParams* params );
void mqttc_invite( LaunchParams* params, NetLaunchInfo* nli, const MQTTDevID* mqttInvitee ); void mqttc_invite( LaunchParams* params, XP_U32 timestamp,
const NetLaunchInfo* nli, const MQTTDevID* mqttInvitee );
XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID, XP_U32 timestamp, XP_S16 mqttc_send( LaunchParams* params, XP_U32 gameID, XP_U32 timestamp,
const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee ); const XP_U8* buf, XP_U16 len, const MQTTDevID* addressee );
void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID ); void mqttc_notifyGameGone( LaunchParams* params, const MQTTDevID* addressee, XP_U32 gameID );