shrink the comms header

Sending in 32 bits what can probably fit in 8 -- twice -- is dumb. As is
sending the gameID (as connID) when it's already known. In 2016 I added
a fixed marker, so I can count on all current code sending that. If I
fail to find that, I know the sender is new (current release or better.)
In that case, use 12 bits instead of 32 for two fields, drop the marker
and the connID. Result is that an ACK's size, including MQTT headers,
drops from 28 bytes to 21.
This commit is contained in:
Eric House 2021-12-21 16:23:24 -08:00
parent 483a49ad67
commit 2b7daf067d
2 changed files with 165 additions and 80 deletions

View file

@ -40,9 +40,14 @@
#ifndef COMMS_VERSION
# define COMMS_VERSION 0
#endif
/* Flags: 7 bits of header len, 5 bits of flags (2 used so far), and 4 bits of
comms version */
#define VERSION_BITS 0x000F
#define IS_SERVER_BIT 0x0010
#define CAN_SMALLHEADER_BIT 0x0020
#define NO_CONNID_BIT 0x0040
#define HEADER_LEN_OFFSET 9
#ifndef XWFEATURE_STANDALONE_ONLY
@ -90,6 +95,7 @@ typedef struct AddressRecord {
MsgID lastMsgSaved;
/* only used if COMMS_HEARTBEAT set except for serialization (to_stream) */
XP_PlayerAddr channelNo;
XP_U16 flags; /* only using CAN_SMALLHEADER_BIT */
struct {
XWHostID hostID; /* used for relay case */
} rr;
@ -99,6 +105,7 @@ typedef struct AddressRecord {
} AddressRecord;
#define ADDRESSRECORD_SIZE_68K 20
#define MSGID_NBITS 12
struct CommsCtxt {
XW_UtilCtxt* util;
@ -126,7 +133,7 @@ struct CommsCtxt {
XP_U16 queueLen;
XP_U16 channelSeed; /* tries to be unique per device to aid
dupe elimination at start */
XP_U32 nextResend;
XP_U32 nextResend; /* timestamp */
XP_U16 resendBackoff;
#ifdef COMMS_HEARTBEAT
@ -197,8 +204,8 @@ typedef enum {
****************************************************************************/
static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe,
XP_PlayerAddr channelNo,
XWHostID id,
const CommsAddrRec* addr );
XWHostID id,
const CommsAddrRec* addr, XP_U16 flags );
static void augmentChannelAddr( CommsCtxt* comms, AddressRecord* rec,
const CommsAddrRec* addr, XWHostID hostID );
static XP_Bool augmentAddrIntrnl( CommsCtxt* comms, CommsAddrRec* dest,
@ -218,7 +225,7 @@ static XP_U16 countAddrRecs( const CommsCtxt* comms );
static void sendConnect( CommsCtxt* comms, XWEnv xwe, XP_Bool breakExisting );
static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe );
#if 0 < COMMS_VERSION
static XP_U16 makeFlags( const CommsCtxt* comms );
static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen );
#endif
static XP_Bool sendNoConn( CommsCtxt* comms, XWEnv xwe,
@ -680,7 +687,6 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
MsgQueueElem** prevsQueueNext;
XP_U16 version = stream_getVersion( stream );
CommsAddrRec addr;
short ii;
XP_U8 flags = stream_getU8( stream );
if ( version < STREAM_VERS_GICREATED ) {
@ -730,7 +736,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
XP_U16 nAddrRecs = stream_getU8( stream );
prevsAddrNext = &comms->recs;
for ( ii = 0; ii < nAddrRecs; ++ii ) {
for ( int ii = 0; ii < nAddrRecs; ++ii ) {
AddressRecord* rec = (AddressRecord*)XP_CALLOC( mpool, sizeof(*rec));
addrFromStream( &rec->addr, stream );
@ -738,6 +744,9 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
rec->nextMsgID = stream_getU16( stream );
rec->lastMsgSaved = rec->lastMsgRcd = stream_getU16( stream );
if ( STREAM_VERS_SMALLCOMMS <= version ) {
rec->flags = stream_getU16( stream );
}
#ifdef LOG_COMMS_MSGNOS
XP_LOGF( "%s(): read lastMsgRcd of %d for addr %d", __func__, rec->lastMsgRcd, ii );
#endif
@ -754,11 +763,15 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream,
}
prevsQueueNext = &comms->msgQueueHead;
for ( ii = 0; ii < comms->queueLen; ++ii ) {
for ( int ii = 0; ii < comms->queueLen; ++ii ) {
MsgQueueElem* msg = (MsgQueueElem*)XP_CALLOC( mpool, sizeof(*msg) );
msg->channelNo = stream_getU16( stream );
msg->msgID = stream_getU32( stream );
if ( version >= STREAM_VERS_SMALLCOMMS ) {
msg->msgID = stream_getU16( stream );
} else {
msg->msgID = stream_getU32( stream );
}
#ifdef DEBUG
msg->sendCount = 0;
#endif
@ -970,8 +983,9 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe),
stream_putU16( stream, (XP_U16)rec->nextMsgID );
stream_putU16( stream, (XP_U16)rec->lastMsgRcd );
stream_putU16( stream, rec->flags );
#ifdef LOG_COMMS_MSGNOS
XP_LOGF( "%s(): wrote lastMsgRcd of %d for addr %d", __func__, rec->lastMsgRcd, ii++ );
XP_LOGFF( "wrote lastMsgRcd of %d for addr %d", rec->lastMsgRcd, ii++ );
#endif
stream_putU16( stream, (XP_U16)rec->lastMsgAckd );
stream_putU16( stream, rec->channelNo );
@ -982,7 +996,7 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe),
for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) {
stream_putU16( stream, msg->channelNo );
stream_putU32( stream, msg->msgID );
stream_putU16( stream, msg->msgID );
stream_putU16( stream, msg->len );
stream_putBytes( stream, msg->msg, msg->len );
@ -1243,48 +1257,67 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
{
CNO_FMT( cbuf, channelNo );
XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf );
XP_U16 headerLen;
XP_U16 streamSize = NULL == stream? 0 : stream_getSize( stream );
MsgID lastMsgSaved = (!!rec)? rec->lastMsgSaved : 0;
MsgQueueElem* newMsgElem;
XWStreamCtxt* hdrStream;
newMsgElem = (MsgQueueElem*)XP_MALLOC( comms->mpool,
sizeof( *newMsgElem ) );
MsgQueueElem* newMsgElem = (MsgQueueElem*)XP_CALLOC( comms->mpool,
sizeof( *newMsgElem ) );
newMsgElem->channelNo = channelNo;
newMsgElem->msgID = msgID;
#ifdef DEBUG
newMsgElem->sendCount = 0;
#endif
hdrStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
stream_open( hdrStream );
#if 0 < COMMS_VERSION
stream_putU16( hdrStream, HAS_VERSION_FLAG );
stream_putU16( hdrStream, makeFlags( comms ) );
#endif
XP_LOGFF( TAGFMT() "putting connID %x", TAGPRMS, comms->connID );
stream_putU32( hdrStream, comms->connID );
XP_Bool useSmallHeader = !!rec && 0 != (rec->flags & CAN_SMALLHEADER_BIT);
XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
XP_ASSERT( 0 < COMMS_VERSION );
XP_ASSERT( 0L == comms->connID || comms->connID == comms->util->gameInfo->gameID );
if ( !useSmallHeader ) {
XP_LOGFF( TAGFMT() "putting connID %x", TAGPRMS, comms->connID );
stream_putU32( hdrStream, comms->connID );
}
stream_putU16( hdrStream, channelNo );
stream_putU32( hdrStream, msgID );
if ( useSmallHeader ) {
stream_putBits( hdrStream, MSGID_NBITS, msgID );
stream_putBits( hdrStream, MSGID_NBITS, lastMsgSaved );
#if 0 && defined DEBUG
/* Test receiver's ability to skip unexpected header fields */
stream_putU8( hdrStream, 0x01 );
stream_putU8( hdrStream, 0x02 );
stream_putU8( hdrStream, 0x03 );
#endif
} else {
stream_putU32( hdrStream, msgID );
stream_putU32( hdrStream, lastMsgSaved );
}
XP_LOGFF( TAGFMT() "put lastMsgSaved: %d", TAGPRMS, lastMsgSaved );
stream_putU32( hdrStream, lastMsgSaved );
if ( !!rec ) {
rec->lastMsgAckd = lastMsgSaved;
}
headerLen = stream_getSize( hdrStream );
newMsgElem->len = streamSize + headerLen;
newMsgElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newMsgElem->len );
stream_getBytes( hdrStream, newMsgElem->msg, headerLen );
stream_destroy( hdrStream, xwe );
if ( 0 < streamSize ) {
stream_getBytes( stream, newMsgElem->msg + headerLen, streamSize );
/* Now we'll use a third stream to combine them all */
XP_U16 headerLen = stream_getSize( hdrStream );
XP_U16 flags = makeFlags( comms, headerLen );
XWStreamCtxt* msgStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
if ( useSmallHeader ) {
XP_ASSERT( HAS_VERSION_FLAG != flags );
} else {
stream_putU16( msgStream, HAS_VERSION_FLAG );
}
stream_putU16( msgStream, flags );
stream_getFromStream( msgStream, hdrStream, stream_getSize(hdrStream) );
stream_destroy( hdrStream, xwe );
if ( 0 < streamSize ) {
stream_getFromStream( msgStream, stream, streamSize );
}
newMsgElem->len = stream_getSize( msgStream );
newMsgElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newMsgElem->len );
stream_getBytes( msgStream, newMsgElem->msg, newMsgElem->len );
stream_destroy( msgStream, xwe );
#ifdef COMMS_CHECKSUM
newMsgElem->checksum = dutil_md5sum( comms->dutil, xwe, newMsgElem->msg,
@ -2246,7 +2279,7 @@ static AddressRecord*
validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
XP_Bool XP_UNUSED_HEARTBEAT(hasPayload),
const CommsAddrRec* addr, XWHostID senderID,
XP_PlayerAddr* channelNo )
XP_PlayerAddr* channelNo, XP_U16 flags )
{
CNO_FMT( cbuf, *channelNo );
XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf );
@ -2284,7 +2317,7 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
XP_LOGFF( TAGFMT() "ORd channel onto channelNo: now %s", TAGPRMS, cbuf1 );
XP_ASSERT( comms->nextChannelNo <= CHANNEL_MASK );
}
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr );
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, flags );
if ( hasPayload ) {
rec->initialSeen = XP_TRUE;
} else {
@ -2315,7 +2348,7 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
goto errExit;
}
}
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr );
rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, flags );
}
}
errExit:
@ -2325,34 +2358,26 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
#if 0 < COMMS_VERSION
static XP_U16
makeFlags( const CommsCtxt* comms )
makeFlags( const CommsCtxt* comms, XP_U16 headerLen )
{
XP_U16 flags = COMMS_VERSION;
if ( comms->isServer ) {
flags |= IS_SERVER_BIT;
}
XP_LOGFF( TAGFMT() "=>%x", TAGPRMS, flags );
flags |= CAN_SMALLHEADER_BIT;
if ( CONN_ID_NONE == comms->connID ) {
flags |= NO_CONNID_BIT;
}
XP_ASSERT( headerLen == ((headerLen << HEADER_LEN_OFFSET) >> HEADER_LEN_OFFSET) );
flags |= headerLen << HEADER_LEN_OFFSET;
return flags;
}
#else
error( "I don't support that case now!!!" ) /* syntax error too :-) */
#endif
static XP_Bool
getFlags( XWStreamCtxt* stream, XP_U32* connIDP, XP_U16* flagsP )
{
XP_U16 flags = 0;
XWStreamPos pos = stream_getPos( stream, POS_READ );
XP_U16 marker = stream_getU16( stream );
if ( HAS_VERSION_FLAG == marker ) {
flags = stream_getU16( stream );
XP_LOGF( "%s: found marker; read flags %x", __func__, flags );
} else {
stream_setPos( stream, POS_READ, pos );
}
*connIDP = stream_getU32( stream );
*flagsP = flags;
return XP_TRUE;
}
/* Messages with established connIDs are valid only if they have the msgID
* that's expected on that channel. Their addresses need to match what we
* have for that channel, and in fact we'll overwrite what we have in case a
@ -2447,15 +2472,42 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
TAGPRMS, state->len, state->sum );
XP_FREE( comms->mpool, tmpsum );
#endif
XP_U16 sizeAtHeaderStart; /* stream len at beginning of comms header */
XP_U16 headerLen = 0;
XP_U16 flags;
XP_Bool usingSmallHeader = XP_FALSE;
XP_U16 streamSize = stream_getSize( stream );
/* reject too-small message */
messageValid = stream_getSize( stream )
>= (sizeof(connID) + sizeof(channelNo)
+ sizeof(msgID) + sizeof(lastMsgRcd));
if ( messageValid ) {
XP_U16 payloadSize = 0;
messageValid = streamSize >= sizeof(channelNo) + sizeof(flags);
XP_U16 flags;
(void)getFlags( stream, &connID, &flags );
if ( messageValid ) {
/* If BEEF is next sender is using old format. Otherwise
assume the bits are flags and BEEF is skipped by newer
code. Should work with anything newer then six years
ago. */
flags = stream_getU16( stream );
usingSmallHeader = HAS_VERSION_FLAG != flags;
if ( usingSmallHeader ) {
headerLen = flags >> HEADER_LEN_OFFSET;
} else {
flags = stream_getU16( stream ); /* flags are the next short */
}
sizeAtHeaderStart = stream_getSize(stream);
XP_ASSERT( headerLen <= sizeAtHeaderStart );
messageValid = headerLen <= sizeAtHeaderStart;
}
if ( messageValid ) {
if ( usingSmallHeader ) {
if ( 0 == (flags & NO_CONNID_BIT) ) {
connID = comms->util->gameInfo->gameID;
} else {
connID = CONN_ID_NONE;
}
} else {
connID = stream_getU32( stream );
}
XP_LOGFF( TAGFMT() "read connID (gameID) of %x", TAGPRMS, connID );
channelNo = stream_getU16( stream );
@ -2481,12 +2533,42 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
}
if ( messageValid ) {
state->msgID = msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
streamSize = stream_getSize( stream );
if ( usingSmallHeader ) {
messageValid = streamSize >= ((MSGID_NBITS*2)+7)/8;
} else {
messageValid = streamSize >= sizeof(msgID) + sizeof(lastMsgRcd);
}
}
if ( messageValid ) {
if ( usingSmallHeader ) {
msgID = stream_getBits( stream, MSGID_NBITS );
lastMsgRcd = stream_getBits( stream, MSGID_NBITS );
} else {
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
}
state->msgID = msgID;
CNO_FMT( cbuf, channelNo );
XP_LOGFF( TAGFMT() "rcd on %s: msgID=%d, lastMsgRcd=%d ",
TAGPRMS, cbuf, msgID, lastMsgRcd );
payloadSize = stream_getSize( stream ); /* anything left? */
/* Now consume anything of the header we don't
recognize. We have the size at start of header. We want
the size to be that plus header len. */
if ( 0 < headerLen ) {
XP_U16 goal = sizeAtHeaderStart - headerLen;
for ( ; ; ) {
XP_ASSERT( goal <= stream_getSize( stream ) );
if ( goal == stream_getSize( stream ) ) {
break;
}
XP_LOGFF( "discarding one byte of header" );
(void)stream_getU8( stream );
}
}
streamSize = stream_getSize( stream ); /* anything left? */
} else {
XP_LOGFF( TAGFMT() "got message to self?", TAGPRMS );
}
@ -2495,8 +2577,8 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
if ( messageValid ) {
if ( connID == CONN_ID_NONE ) {
/* special case: initial message from client or server */
rec = validateInitialMessage( comms, xwe, payloadSize > 0, retAddr,
senderID, &channelNo );
rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr,
senderID, &channelNo, flags );
state->rec = rec;
} else if ( comms->connID == connID ) {
rec = validateChannelMessage( comms, xwe, retAddr, channelNo,
@ -2512,11 +2594,11 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
if ( messageValid ) {
CNO_FMT( cbuf, channelNo );
XP_LOGFF( TAGFMT() "got %s; msgID=%d; len=%d", TAGPRMS, cbuf,
msgID, payloadSize );
msgID, streamSize );
state->channelNo = channelNo;
comms->lastSaveToken = 0; /* lastMsgRcd no longer valid */
stream_setAddress( stream, channelNo );
messageValid = payloadSize > 0;
messageValid = streamSize > 0;
resetBackoff( comms );
}
} else {
@ -2866,7 +2948,7 @@ comms_getAddrDisabled( const CommsCtxt* comms, CommsConnType typ,
static AddressRecord*
rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo,
XWHostID hostID, const CommsAddrRec* addr )
XWHostID hostID, const CommsAddrRec* addr, XP_U16 flags )
{
CNO_FMT( cbuf, channelNo );
XP_LOGF( "%s(%s)", __func__, cbuf );
@ -2880,10 +2962,12 @@ rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo,
rec->channelNo = channelNo;
rec->rr.hostID = hostID;
rec->flags = flags & CAN_SMALLHEADER_BIT;
rec->next = comms->recs;
comms->recs = rec;
XP_LOGF( "%s() creating rec %p for %s, hostID = %d", __func__,
rec, cbuf, hostID );
XP_LOGFF( "creating rec %p for %s, hostID = %d, flags=0x%x",
rec, cbuf, hostID, flags );
}
/* overwrite existing address with new one. I assume that's the right

View file

@ -47,6 +47,7 @@
#endif
#define MAX_COLS MAX_ROWS
#define STREAM_VERS_SMALLCOMMS 0x1F
#define STREAM_VERS_NINETILES 0x1E
#define STREAM_VERS_NOEMPTYDICT 0x1D
#define STREAM_VERS_GICREATED 0x1C /* game struct gets created timestamp */
@ -92,7 +93,7 @@
#define STREAM_VERS_405 0x01
/* search for FIX_NEXT_VERSION_CHANGE next time this is changed */
#define CUR_STREAM_VERS STREAM_VERS_NINETILES
#define CUR_STREAM_VERS STREAM_VERS_SMALLCOMMS
typedef struct XP_Rect {
XP_S16 left;