diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index 142fe432d..28d8b70ac 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -40,9 +40,14 @@ #ifndef COMMS_VERSION # define COMMS_VERSION 0 #endif + +/* Flags: 7 bits of header len, 5 bits of flags (2 used so far), and 4 bits of + comms version */ #define VERSION_BITS 0x000F #define IS_SERVER_BIT 0x0010 - +#define CAN_SMALLHEADER_BIT 0x0020 +#define NO_CONNID_BIT 0x0040 +#define HEADER_LEN_OFFSET 9 #ifndef XWFEATURE_STANDALONE_ONLY @@ -90,6 +95,7 @@ typedef struct AddressRecord { MsgID lastMsgSaved; /* only used if COMMS_HEARTBEAT set except for serialization (to_stream) */ XP_PlayerAddr channelNo; + XP_U16 flags; /* only using CAN_SMALLHEADER_BIT */ struct { XWHostID hostID; /* used for relay case */ } rr; @@ -99,6 +105,7 @@ typedef struct AddressRecord { } AddressRecord; #define ADDRESSRECORD_SIZE_68K 20 +#define MSGID_NBITS 12 struct CommsCtxt { XW_UtilCtxt* util; @@ -126,7 +133,7 @@ struct CommsCtxt { XP_U16 queueLen; XP_U16 channelSeed; /* tries to be unique per device to aid dupe elimination at start */ - XP_U32 nextResend; + XP_U32 nextResend; /* timestamp */ XP_U16 resendBackoff; #ifdef COMMS_HEARTBEAT @@ -197,8 +204,8 @@ typedef enum { ****************************************************************************/ static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, - XWHostID id, - const CommsAddrRec* addr ); + XWHostID id, + const CommsAddrRec* addr, XP_U16 flags ); static void augmentChannelAddr( CommsCtxt* comms, AddressRecord* rec, const CommsAddrRec* addr, XWHostID hostID ); static XP_Bool augmentAddrIntrnl( CommsCtxt* comms, CommsAddrRec* dest, @@ -218,7 +225,7 @@ static XP_U16 countAddrRecs( const CommsCtxt* comms ); static void sendConnect( CommsCtxt* comms, XWEnv xwe, XP_Bool breakExisting ); static void notifyQueueChanged( const CommsCtxt* comms, XWEnv xwe ); #if 0 < COMMS_VERSION -static XP_U16 makeFlags( const CommsCtxt* comms ); +static XP_U16 makeFlags( const CommsCtxt* comms, XP_U16 headerLen ); #endif static XP_Bool sendNoConn( CommsCtxt* comms, XWEnv xwe, @@ -680,7 +687,6 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, MsgQueueElem** prevsQueueNext; XP_U16 version = stream_getVersion( stream ); CommsAddrRec addr; - short ii; XP_U8 flags = stream_getU8( stream ); if ( version < STREAM_VERS_GICREATED ) { @@ -730,7 +736,7 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, XP_U16 nAddrRecs = stream_getU8( stream ); prevsAddrNext = &comms->recs; - for ( ii = 0; ii < nAddrRecs; ++ii ) { + for ( int ii = 0; ii < nAddrRecs; ++ii ) { AddressRecord* rec = (AddressRecord*)XP_CALLOC( mpool, sizeof(*rec)); addrFromStream( &rec->addr, stream ); @@ -738,6 +744,9 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, rec->nextMsgID = stream_getU16( stream ); rec->lastMsgSaved = rec->lastMsgRcd = stream_getU16( stream ); + if ( STREAM_VERS_SMALLCOMMS <= version ) { + rec->flags = stream_getU16( stream ); + } #ifdef LOG_COMMS_MSGNOS XP_LOGF( "%s(): read lastMsgRcd of %d for addr %d", __func__, rec->lastMsgRcd, ii ); #endif @@ -754,11 +763,15 @@ comms_makeFromStream( MPFORMAL XWEnv xwe, XWStreamCtxt* stream, } prevsQueueNext = &comms->msgQueueHead; - for ( ii = 0; ii < comms->queueLen; ++ii ) { + for ( int ii = 0; ii < comms->queueLen; ++ii ) { MsgQueueElem* msg = (MsgQueueElem*)XP_CALLOC( mpool, sizeof(*msg) ); msg->channelNo = stream_getU16( stream ); - msg->msgID = stream_getU32( stream ); + if ( version >= STREAM_VERS_SMALLCOMMS ) { + msg->msgID = stream_getU16( stream ); + } else { + msg->msgID = stream_getU32( stream ); + } #ifdef DEBUG msg->sendCount = 0; #endif @@ -970,8 +983,9 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), stream_putU16( stream, (XP_U16)rec->nextMsgID ); stream_putU16( stream, (XP_U16)rec->lastMsgRcd ); + stream_putU16( stream, rec->flags ); #ifdef LOG_COMMS_MSGNOS - XP_LOGF( "%s(): wrote lastMsgRcd of %d for addr %d", __func__, rec->lastMsgRcd, ii++ ); + XP_LOGFF( "wrote lastMsgRcd of %d for addr %d", rec->lastMsgRcd, ii++ ); #endif stream_putU16( stream, (XP_U16)rec->lastMsgAckd ); stream_putU16( stream, rec->channelNo ); @@ -982,7 +996,7 @@ comms_writeToStream( CommsCtxt* comms, XWEnv XP_UNUSED_DBG(xwe), for ( msg = comms->msgQueueHead; !!msg; msg = msg->next ) { stream_putU16( stream, msg->channelNo ); - stream_putU32( stream, msg->msgID ); + stream_putU16( stream, msg->msgID ); stream_putU16( stream, msg->len ); stream_putBytes( stream, msg->msg, msg->len ); @@ -1243,48 +1257,67 @@ makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec, { CNO_FMT( cbuf, channelNo ); XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf ); - XP_U16 headerLen; XP_U16 streamSize = NULL == stream? 0 : stream_getSize( stream ); MsgID lastMsgSaved = (!!rec)? rec->lastMsgSaved : 0; - MsgQueueElem* newMsgElem; - XWStreamCtxt* hdrStream; - - newMsgElem = (MsgQueueElem*)XP_MALLOC( comms->mpool, - sizeof( *newMsgElem ) ); + MsgQueueElem* newMsgElem = (MsgQueueElem*)XP_CALLOC( comms->mpool, + sizeof( *newMsgElem ) ); newMsgElem->channelNo = channelNo; newMsgElem->msgID = msgID; -#ifdef DEBUG - newMsgElem->sendCount = 0; -#endif - hdrStream = mem_stream_make_raw( MPPARM(comms->mpool) - dutil_getVTManager(comms->dutil)); - stream_open( hdrStream ); -#if 0 < COMMS_VERSION - stream_putU16( hdrStream, HAS_VERSION_FLAG ); - stream_putU16( hdrStream, makeFlags( comms ) ); -#endif - XP_LOGFF( TAGFMT() "putting connID %x", TAGPRMS, comms->connID ); - stream_putU32( hdrStream, comms->connID ); + XP_Bool useSmallHeader = !!rec && 0 != (rec->flags & CAN_SMALLHEADER_BIT); + XWStreamCtxt* hdrStream = mem_stream_make_raw( MPPARM(comms->mpool) + dutil_getVTManager(comms->dutil)); + XP_ASSERT( 0 < COMMS_VERSION ); + + XP_ASSERT( 0L == comms->connID || comms->connID == comms->util->gameInfo->gameID ); + if ( !useSmallHeader ) { + XP_LOGFF( TAGFMT() "putting connID %x", TAGPRMS, comms->connID ); + stream_putU32( hdrStream, comms->connID ); + } stream_putU16( hdrStream, channelNo ); - stream_putU32( hdrStream, msgID ); + + if ( useSmallHeader ) { + stream_putBits( hdrStream, MSGID_NBITS, msgID ); + stream_putBits( hdrStream, MSGID_NBITS, lastMsgSaved ); +#if 0 && defined DEBUG + /* Test receiver's ability to skip unexpected header fields */ + stream_putU8( hdrStream, 0x01 ); + stream_putU8( hdrStream, 0x02 ); + stream_putU8( hdrStream, 0x03 ); +#endif + } else { + stream_putU32( hdrStream, msgID ); + stream_putU32( hdrStream, lastMsgSaved ); + } XP_LOGFF( TAGFMT() "put lastMsgSaved: %d", TAGPRMS, lastMsgSaved ); - stream_putU32( hdrStream, lastMsgSaved ); if ( !!rec ) { rec->lastMsgAckd = lastMsgSaved; } - headerLen = stream_getSize( hdrStream ); - newMsgElem->len = streamSize + headerLen; - newMsgElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newMsgElem->len ); - - stream_getBytes( hdrStream, newMsgElem->msg, headerLen ); - stream_destroy( hdrStream, xwe ); - - if ( 0 < streamSize ) { - stream_getBytes( stream, newMsgElem->msg + headerLen, streamSize ); + /* Now we'll use a third stream to combine them all */ + XP_U16 headerLen = stream_getSize( hdrStream ); + XP_U16 flags = makeFlags( comms, headerLen ); + XWStreamCtxt* msgStream = mem_stream_make_raw( MPPARM(comms->mpool) + dutil_getVTManager(comms->dutil)); + if ( useSmallHeader ) { + XP_ASSERT( HAS_VERSION_FLAG != flags ); + } else { + stream_putU16( msgStream, HAS_VERSION_FLAG ); } + stream_putU16( msgStream, flags ); + + stream_getFromStream( msgStream, hdrStream, stream_getSize(hdrStream) ); + stream_destroy( hdrStream, xwe ); + + if ( 0 < streamSize ) { + stream_getFromStream( msgStream, stream, streamSize ); + } + + newMsgElem->len = stream_getSize( msgStream ); + newMsgElem->msg = (XP_U8*)XP_MALLOC( comms->mpool, newMsgElem->len ); + stream_getBytes( msgStream, newMsgElem->msg, newMsgElem->len ); + stream_destroy( msgStream, xwe ); #ifdef COMMS_CHECKSUM newMsgElem->checksum = dutil_md5sum( comms->dutil, xwe, newMsgElem->msg, @@ -2246,7 +2279,7 @@ static AddressRecord* validateInitialMessage( CommsCtxt* comms, XWEnv xwe, XP_Bool XP_UNUSED_HEARTBEAT(hasPayload), const CommsAddrRec* addr, XWHostID senderID, - XP_PlayerAddr* channelNo ) + XP_PlayerAddr* channelNo, XP_U16 flags ) { CNO_FMT( cbuf, *channelNo ); XP_LOGFF( TAGFMT(%s), TAGPRMS, cbuf ); @@ -2284,7 +2317,7 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe, XP_LOGFF( TAGFMT() "ORd channel onto channelNo: now %s", TAGPRMS, cbuf1 ); XP_ASSERT( comms->nextChannelNo <= CHANNEL_MASK ); } - rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr ); + rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, flags ); if ( hasPayload ) { rec->initialSeen = XP_TRUE; } else { @@ -2315,7 +2348,7 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe, goto errExit; } } - rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr ); + rec = rememberChannelAddress( comms, xwe, *channelNo, senderID, addr, flags ); } } errExit: @@ -2325,34 +2358,26 @@ validateInitialMessage( CommsCtxt* comms, XWEnv xwe, #if 0 < COMMS_VERSION static XP_U16 -makeFlags( const CommsCtxt* comms ) +makeFlags( const CommsCtxt* comms, XP_U16 headerLen ) { XP_U16 flags = COMMS_VERSION; if ( comms->isServer ) { flags |= IS_SERVER_BIT; } - XP_LOGFF( TAGFMT() "=>%x", TAGPRMS, flags ); + flags |= CAN_SMALLHEADER_BIT; + if ( CONN_ID_NONE == comms->connID ) { + flags |= NO_CONNID_BIT; + } + + XP_ASSERT( headerLen == ((headerLen << HEADER_LEN_OFFSET) >> HEADER_LEN_OFFSET) ); + flags |= headerLen << HEADER_LEN_OFFSET; + return flags; } +#else +error( "I don't support that case now!!!" ) /* syntax error too :-) */ #endif -static XP_Bool -getFlags( XWStreamCtxt* stream, XP_U32* connIDP, XP_U16* flagsP ) -{ - XP_U16 flags = 0; - XWStreamPos pos = stream_getPos( stream, POS_READ ); - XP_U16 marker = stream_getU16( stream ); - if ( HAS_VERSION_FLAG == marker ) { - flags = stream_getU16( stream ); - XP_LOGF( "%s: found marker; read flags %x", __func__, flags ); - } else { - stream_setPos( stream, POS_READ, pos ); - } - *connIDP = stream_getU32( stream ); - *flagsP = flags; - return XP_TRUE; -} - /* Messages with established connIDs are valid only if they have the msgID * that's expected on that channel. Their addresses need to match what we * have for that channel, and in fact we'll overwrite what we have in case a @@ -2447,15 +2472,42 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, TAGPRMS, state->len, state->sum ); XP_FREE( comms->mpool, tmpsum ); #endif + XP_U16 sizeAtHeaderStart; /* stream len at beginning of comms header */ + XP_U16 headerLen = 0; + XP_U16 flags; + XP_Bool usingSmallHeader = XP_FALSE; + XP_U16 streamSize = stream_getSize( stream ); /* reject too-small message */ - messageValid = stream_getSize( stream ) - >= (sizeof(connID) + sizeof(channelNo) - + sizeof(msgID) + sizeof(lastMsgRcd)); - if ( messageValid ) { - XP_U16 payloadSize = 0; + messageValid = streamSize >= sizeof(channelNo) + sizeof(flags); - XP_U16 flags; - (void)getFlags( stream, &connID, &flags ); + if ( messageValid ) { + /* If BEEF is next sender is using old format. Otherwise + assume the bits are flags and BEEF is skipped by newer + code. Should work with anything newer then six years + ago. */ + flags = stream_getU16( stream ); + usingSmallHeader = HAS_VERSION_FLAG != flags; + if ( usingSmallHeader ) { + headerLen = flags >> HEADER_LEN_OFFSET; + } else { + flags = stream_getU16( stream ); /* flags are the next short */ + } + + sizeAtHeaderStart = stream_getSize(stream); + XP_ASSERT( headerLen <= sizeAtHeaderStart ); + messageValid = headerLen <= sizeAtHeaderStart; + } + + if ( messageValid ) { + if ( usingSmallHeader ) { + if ( 0 == (flags & NO_CONNID_BIT) ) { + connID = comms->util->gameInfo->gameID; + } else { + connID = CONN_ID_NONE; + } + } else { + connID = stream_getU32( stream ); + } XP_LOGFF( TAGFMT() "read connID (gameID) of %x", TAGPRMS, connID ); channelNo = stream_getU16( stream ); @@ -2481,12 +2533,42 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, } if ( messageValid ) { - state->msgID = msgID = stream_getU32( stream ); - lastMsgRcd = stream_getU32( stream ); + streamSize = stream_getSize( stream ); + if ( usingSmallHeader ) { + messageValid = streamSize >= ((MSGID_NBITS*2)+7)/8; + } else { + messageValid = streamSize >= sizeof(msgID) + sizeof(lastMsgRcd); + } + } + + if ( messageValid ) { + if ( usingSmallHeader ) { + msgID = stream_getBits( stream, MSGID_NBITS ); + lastMsgRcd = stream_getBits( stream, MSGID_NBITS ); + } else { + msgID = stream_getU32( stream ); + lastMsgRcd = stream_getU32( stream ); + } + state->msgID = msgID; CNO_FMT( cbuf, channelNo ); XP_LOGFF( TAGFMT() "rcd on %s: msgID=%d, lastMsgRcd=%d ", TAGPRMS, cbuf, msgID, lastMsgRcd ); - payloadSize = stream_getSize( stream ); /* anything left? */ + + /* Now consume anything of the header we don't + recognize. We have the size at start of header. We want + the size to be that plus header len. */ + if ( 0 < headerLen ) { + XP_U16 goal = sizeAtHeaderStart - headerLen; + for ( ; ; ) { + XP_ASSERT( goal <= stream_getSize( stream ) ); + if ( goal == stream_getSize( stream ) ) { + break; + } + XP_LOGFF( "discarding one byte of header" ); + (void)stream_getU8( stream ); + } + } + streamSize = stream_getSize( stream ); /* anything left? */ } else { XP_LOGFF( TAGFMT() "got message to self?", TAGPRMS ); } @@ -2495,8 +2577,8 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, if ( messageValid ) { if ( connID == CONN_ID_NONE ) { /* special case: initial message from client or server */ - rec = validateInitialMessage( comms, xwe, payloadSize > 0, retAddr, - senderID, &channelNo ); + rec = validateInitialMessage( comms, xwe, streamSize > 0, retAddr, + senderID, &channelNo, flags ); state->rec = rec; } else if ( comms->connID == connID ) { rec = validateChannelMessage( comms, xwe, retAddr, channelNo, @@ -2512,11 +2594,11 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, if ( messageValid ) { CNO_FMT( cbuf, channelNo ); XP_LOGFF( TAGFMT() "got %s; msgID=%d; len=%d", TAGPRMS, cbuf, - msgID, payloadSize ); + msgID, streamSize ); state->channelNo = channelNo; comms->lastSaveToken = 0; /* lastMsgRcd no longer valid */ stream_setAddress( stream, channelNo ); - messageValid = payloadSize > 0; + messageValid = streamSize > 0; resetBackoff( comms ); } } else { @@ -2866,7 +2948,7 @@ comms_getAddrDisabled( const CommsCtxt* comms, CommsConnType typ, static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, - XWHostID hostID, const CommsAddrRec* addr ) + XWHostID hostID, const CommsAddrRec* addr, XP_U16 flags ) { CNO_FMT( cbuf, channelNo ); XP_LOGF( "%s(%s)", __func__, cbuf ); @@ -2880,10 +2962,12 @@ rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, rec->channelNo = channelNo; rec->rr.hostID = hostID; + rec->flags = flags & CAN_SMALLHEADER_BIT; + rec->next = comms->recs; comms->recs = rec; - XP_LOGF( "%s() creating rec %p for %s, hostID = %d", __func__, - rec, cbuf, hostID ); + XP_LOGFF( "creating rec %p for %s, hostID = %d, flags=0x%x", + rec, cbuf, hostID, flags ); } /* overwrite existing address with new one. I assume that's the right diff --git a/xwords4/common/comtypes.h b/xwords4/common/comtypes.h index 57fd823a0..e4ce74ce9 100644 --- a/xwords4/common/comtypes.h +++ b/xwords4/common/comtypes.h @@ -47,6 +47,7 @@ #endif #define MAX_COLS MAX_ROWS +#define STREAM_VERS_SMALLCOMMS 0x1F #define STREAM_VERS_NINETILES 0x1E #define STREAM_VERS_NOEMPTYDICT 0x1D #define STREAM_VERS_GICREATED 0x1C /* game struct gets created timestamp */ @@ -92,7 +93,7 @@ #define STREAM_VERS_405 0x01 /* search for FIX_NEXT_VERSION_CHANGE next time this is changed */ -#define CUR_STREAM_VERS STREAM_VERS_NINETILES +#define CUR_STREAM_VERS STREAM_VERS_SMALLCOMMS typedef struct XP_Rect { XP_S16 left;