refactor, and make msgids variable-length

This commit is contained in:
Eric House 2021-12-28 18:49:56 -08:00
parent 863ef88c22
commit b67ab9e32a

View file

@ -1,6 +1,6 @@
/* -*- compile-command: "cd ../linux && make MEMDEBUG=TRUE -j3"; -*- */
/*
* Copyright 2001 - 2020 by Eric House (xwords@eehouse.org). All rights
* Copyright 2001 - 2021 by Eric House (xwords@eehouse.org). All rights
* reserved.
*
* This program is free software; you can redistribute it and/or
@ -41,11 +41,12 @@
# define COMMS_VERSION 0
#endif
/* Flags: 7 bits of header len, 5 bits of flags (2 used so far), and 4 bits of
/* Flags: 7 bits of header len, 6 bits of flags (4 used so far), and 3 bits of
comms version */
#define VERSION_BITS 0x000F
#define VERSION_BITS 0x0007
#define CAN_SMALLHEADER_BIT 0x0008
#define IS_SERVER_BIT 0x0010
#define CAN_SMALLHEADER_BIT 0x0020
#define NO_MSGID_BIT 0x0020
#define NO_CONNID_BIT 0x0040
#define HEADER_LEN_OFFSET 9
@ -105,7 +106,6 @@ typedef struct AddressRecord {
} AddressRecord;
#define ADDRESSRECORD_SIZE_68K 20
#define MSGID_NBITS 12
struct CommsCtxt {
XW_UtilCtxt* util;
@ -225,7 +225,7 @@ static XP_U16 countAddrRecs( const CommsCtxt* comms );
static void sendConnect( CommsCtxt* comms, XWEnv xwe, XP_Bool breakExisting );
static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe );
#if 0 < COMMS_VERSION
static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen );
static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen, MsgID msgID );
#endif
static XP_Bool sendNoConn( CommsCtxt* comms, XWEnv xwe,
@ -1276,8 +1276,10 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
stream_putU16( hdrStream, channelNo );
if ( useSmallHeader ) {
stream_putBits( hdrStream, MSGID_NBITS, msgID );
stream_putBits( hdrStream, MSGID_NBITS, lastMsgSaved );
if ( 0 != msgID ) { /* bit in flags says not there */
stream_putU32VL( hdrStream, msgID );
}
stream_putU32VL( hdrStream, lastMsgSaved );
#if 0 && defined DEBUG
/* Test receiver's ability to skip unexpected header fields */
stream_putU8( hdrStream, 0x01 );
@ -1295,7 +1297,7 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
/* Now we'll use a third stream to combine them all */
XP_U16 headerLen = stream_getSize( hdrStream );
XP_U16 flags = makeFlags( comms, headerLen );
XP_U16 flags = makeFlags( comms, headerLen, msgID );
XWStreamCtxt* msgStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
if ( useSmallHeader ) {
@ -2356,7 +2358,7 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe,
#if 0 < COMMS_VERSION
static XP_U16
makeFlags( const CommsCtxt* comms, XP_U16 headerLen )
makeFlags( const CommsCtxt* comms, XP_U16 headerLen, MsgID msgID )
{
XP_U16 flags = COMMS_VERSION;
if ( comms->isServer ) {
@ -2366,6 +2368,9 @@ makeFlags( const CommsCtxt* comms, XP_U16 headerLen )
if ( CONN_ID_NONE == comms->connID ) {
flags |= NO_CONNID_BIT;
}
if ( 0 == msgID ) {
flags |= NO_MSGID_BIT;
}
XP_ASSERT( headerLen == ((headerLen << HEADER_LEN_OFFSET) >> HEADER_LEN_OFFSET) );
flags |= headerLen << HEADER_LEN_OFFSET;
@ -2417,6 +2422,89 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
return rec;
} /* validateChannelMessage */
typedef struct _HeaderStuff {
XP_U16 flags;
XP_U32 connID;
XP_PlayerAddr channelNo;
MsgID msgID;
MsgID lastMsgRcd;
} HeaderStuff;
static XP_Bool
getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff )
{
XP_Bool messageValid = stream_gotU16( stream, &stuff->channelNo );
if ( messageValid ) {
XP_U16 channelSeed = comms_getChannelSeed( comms );
XP_U16 flags = stuff->flags;
/* First test isn't valid if we haven't passed the bit explicitly */
if ( 0 != flags && (comms->isServer == (0 != (flags & IS_SERVER_BIT))) ) {
XP_LOGFF( TAGFMT() "server bits mismatch; isServer: %d; flags: %x",
TAGPRMS, comms->isServer, flags );
messageValid = XP_FALSE;
} else if ( comms->isServer ) {
/* channelNo comparison invalid */
} else if ( 0 == stuff->channelNo || 0 == channelSeed ) {
XP_LOGFF( TAGFMT() "one of channelNos still 0", TAGPRMS );
XP_ASSERT(0);
} else if ( (stuff->channelNo & ~CHANNEL_MASK) != (channelSeed & ~CHANNEL_MASK) ) {
XP_LOGFF( "channelNos test fails: %x vs %x", stuff->channelNo, channelSeed );
messageValid = XP_FALSE;
}
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
return messageValid;
}
static XP_Bool
parseBeefHeader( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff )
{
XP_Bool messageValid =
stream_gotU16( stream, &stuff->flags ) /* flags are the next short */
&& stream_gotU32( stream, &stuff->connID );
XP_ASSERT((VERSION_BITS & stuff->flags) == COMMS_VERSION); /* changed? deal with this */
XP_LOGFF( TAGFMT() "read connID (gameID) of %x", TAGPRMS, stuff->connID );
messageValid = messageValid
&& getCheckChannelSeed( comms, stream, stuff )
&& stream_gotU32( stream, &stuff->msgID )
&& stream_gotU32( stream, &stuff->lastMsgRcd );
LOG_RETURNF( "%s", boolToStr(messageValid) );
return messageValid;
}
static XP_Bool
parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream,
HeaderStuff* stuff )
{
XP_Bool messageValid = XP_FALSE;
XP_U16 headerLen = stuff->flags >> HEADER_LEN_OFFSET;
XP_ASSERT( 0 < headerLen );
XP_ASSERT( headerLen <= stream_getSize( msgStream ) );
if ( headerLen <= stream_getSize( msgStream ) ) {
XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool)
dutil_getVTManager(comms->dutil));
stream_getFromStream( hdrStream, msgStream, headerLen );
stuff->connID = 0 == (stuff->flags & NO_CONNID_BIT)
? comms->util->gameInfo->gameID : CONN_ID_NONE;
if ( getCheckChannelSeed( comms, hdrStream, stuff ) ) {
XP_ASSERT( stuff->msgID == 0 );
if ( 0 == (stuff->flags & NO_MSGID_BIT) ) {
stuff->msgID = stream_getU32VL( hdrStream );
}
stuff->lastMsgRcd = stream_getU32VL( hdrStream );
messageValid = XP_TRUE;
}
stream_destroy( hdrStream, xwe );
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
return messageValid;
}
XP_Bool
comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
const CommsAddrRec* retAddr, CommsMsgState* state )
@ -2455,11 +2543,6 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
const CommsAddrRec* useAddr = !!retAddr ? retAddr : &comms->addr;
if ( !preProcess( comms, xwe, useAddr, stream, &usingRelay, &senderID ) ) {
XP_U32 connID;
XP_PlayerAddr channelNo;
MsgID msgID = 0; /* shut up compiler */
MsgID lastMsgRcd = 0;
#ifdef COMMS_CHECKSUM
state->len = stream_getSize( stream );
// stream_getPtr pts at base, but sum excludes relay header
@ -2470,137 +2553,58 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
TAGPRMS, state->len, state->sum );
XP_FREE( comms->mpool, tmpsum );
#endif
XP_U16 sizeAtHeaderStart; /* stream len at beginning of comms header */
XP_U16 headerLen = 0;
XP_U16 flags;
XP_Bool usingSmallHeader = XP_FALSE;
XP_U16 streamSize = stream_getSize( stream );
/* reject too-small message */
messageValid = streamSize >= sizeof(channelNo) + sizeof(flags);
HeaderStuff stuff = {0};
messageValid = stream_gotU16( stream, &stuff.flags );
if ( messageValid ) {
/* If BEEF is next sender is using old format. Otherwise
assume the bits are flags and BEEF is skipped by newer
code. Should work with anything newer then six years
ago. */
flags = stream_getU16( stream );
usingSmallHeader = HAS_VERSION_FLAG != flags;
if ( usingSmallHeader ) {
headerLen = flags >> HEADER_LEN_OFFSET;
if ( HAS_VERSION_FLAG == stuff.flags ) {
messageValid = parseBeefHeader( comms, stream, &stuff );
} else {
flags = stream_getU16( stream ); /* flags are the next short */
XP_ASSERT( COMMS_VERSION == (stuff.flags & VERSION_BITS) );
messageValid = parseSmallHeader( comms, xwe, stream, &stuff );
}
sizeAtHeaderStart = stream_getSize(stream);
XP_ASSERT( headerLen <= sizeAtHeaderStart );
messageValid = headerLen <= sizeAtHeaderStart;
}
if ( messageValid ) {
state->msgID = stuff.msgID;
CNO_FMT( cbuf, stuff.channelNo );
XP_LOGFF( TAGFMT() "rcd on %s: msgID=%d, lastMsgRcd=%d ",
TAGPRMS, cbuf, stuff.msgID, stuff.lastMsgRcd );
} else {
XP_LOGFF( TAGFMT() "got message to self?", TAGPRMS );
}
AddressRecord* rec = NULL;
XP_U16 streamSize = stream_getSize( stream ); /* anything left? */
if ( messageValid ) {
if ( usingSmallHeader ) {
if ( 0 == (flags & NO_CONNID_BIT) ) {
connID = comms->util->gameInfo->gameID;
} else {
connID = CONN_ID_NONE;
}
if ( stuff.connID == CONN_ID_NONE ) {
/* special case: initial message from client or server */
rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr,
senderID, &stuff.channelNo, stuff.flags );
state->rec = rec;
} else if ( comms->connID == stuff.connID ) {
rec = validateChannelMessage( comms, xwe, retAddr, stuff.channelNo,
senderID, stuff.msgID, stuff.lastMsgRcd );
} else {
connID = stream_getU32( stream );
XP_LOGFF( TAGFMT() "unexpected connID (%x vs %x) ; "
"dropping message", TAGPRMS, comms->connID, stuff.connID );
}
}
XP_LOGFF( TAGFMT() "read connID (gameID) of %x", TAGPRMS, connID );
channelNo = stream_getU16( stream );
XP_U16 channelSeed = comms_getChannelSeed( comms );
CNO_FMT( cbufX, channelSeed );
CNO_FMT( cbufY, channelNo );
XP_LOGFF( TAGFMT() "my seed %s vs %s!!!", TAGPRMS, cbufX, cbufY );
/* First test isn't valid if we haven't passed the bit explicitly */
if ( 0 != flags && (comms->isServer == (0 != (flags & IS_SERVER_BIT))) ) {
XP_LOGFF( TAGFMT() "server bits mismatch; isServer: %d; flags: %x",
TAGPRMS, comms->isServer, flags );
messageValid = XP_FALSE;
} else if ( comms->isServer ) {
/* channelNo comparison invalid */
} else if ( 0 == channelNo || 0 == channelSeed ) {
XP_LOGFF( TAGFMT() "one of channelNos still 0", TAGPRMS );
XP_ASSERT(0);
} else if ( (channelNo & ~CHANNEL_MASK) != (channelSeed & ~CHANNEL_MASK) ) {
XP_LOGFF( TAGFMT() "channelNos test fails", TAGPRMS );
messageValid = XP_FALSE;
}
if ( messageValid ) {
streamSize = stream_getSize( stream );
if ( usingSmallHeader ) {
messageValid = streamSize >= ((MSGID_NBITS*2)+7)/8;
} else {
messageValid = streamSize >= sizeof(msgID) + sizeof(lastMsgRcd);
}
}
if ( messageValid ) {
if ( usingSmallHeader ) {
msgID = stream_getBits( stream, MSGID_NBITS );
lastMsgRcd = stream_getBits( stream, MSGID_NBITS );
} else {
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
}
state->msgID = msgID;
CNO_FMT( cbuf, channelNo );
XP_LOGFF( TAGFMT() "rcd on %s: msgID=%d, lastMsgRcd=%d ",
TAGPRMS, cbuf, msgID, lastMsgRcd );
/* Now consume anything of the header we don't
recognize. We have the size at start of header. We want
the size to be that plus header len. */
if ( 0 < headerLen ) {
XP_U16 goal = sizeAtHeaderStart - headerLen;
for ( ; ; ) {
XP_ASSERT( goal <= stream_getSize( stream ) );
if ( goal == stream_getSize( stream ) ) {
break;
}
XP_LOGFF( "discarding one byte of header" );
(void)stream_getU8( stream );
}
}
streamSize = stream_getSize( stream ); /* anything left? */
} else {
XP_LOGFF( TAGFMT() "got message to self?", TAGPRMS );
}
AddressRecord* rec = NULL;
if ( messageValid ) {
if ( connID == CONN_ID_NONE ) {
/* special case: initial message from client or server */
rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr,
senderID, &channelNo, flags );
state->rec = rec;
} else if ( comms->connID == connID ) {
rec = validateChannelMessage( comms, xwe, retAddr, channelNo,
senderID, msgID, lastMsgRcd );
} else {
XP_LOGFF( TAGFMT() "unexpected connID (%x vs %x) ; "
"dropping message", TAGPRMS, comms->connID, connID );
}
}
messageValid = messageValid && (NULL != rec)
&& (0 == rec->lastMsgRcd || rec->lastMsgRcd <= msgID);
if ( messageValid ) {
CNO_FMT( cbuf, channelNo );
XP_LOGFF( TAGFMT() "got %s; msgID=%d; len=%d", TAGPRMS, cbuf,
msgID, streamSize );
state->channelNo = channelNo;
comms->lastSaveToken = 0; /* lastMsgRcd no longer valid */
stream_setAddress( stream, channelNo );
messageValid = streamSize > 0;
resetBackoff( comms );
}
} else {
XP_LOGF( "%s: message too small", __func__ );
messageValid = messageValid && (NULL != rec)
&& (0 == rec->lastMsgRcd || rec->lastMsgRcd <= stuff.msgID);
if ( messageValid ) {
CNO_FMT( cbuf, stuff.channelNo );
XP_LOGFF( TAGFMT() "got %s; msgID=%d; len=%d", TAGPRMS, cbuf,
stuff.msgID, streamSize );
state->channelNo = stuff.channelNo;
comms->lastSaveToken = 0; /* lastMsgRcd no longer valid */
stream_setAddress( stream, stuff.channelNo );
messageValid = streamSize > 0;
resetBackoff( comms );
}
}