track what's been ack'd, and add method to ack what hasn't been.

This commit is contained in:
Eric House 2012-04-13 20:21:59 -07:00
parent 4ffdeefe3b
commit f22d9530b1
4 changed files with 60 additions and 12 deletions

View file

@ -66,7 +66,8 @@ typedef struct AddressRecord {
struct AddressRecord* next; struct AddressRecord* next;
CommsAddrRec addr; CommsAddrRec addr;
MsgID nextMsgID; /* on a per-channel basis */ MsgID nextMsgID; /* on a per-channel basis */
MsgID lastMsgRcd; /* on a per-channel basis */ MsgID lastMsgRcd; /* on a per-channel basis */
MsgID lastMsgAckd; /* on a per-channel basis */
/* only used if COMMS_HEARTBEAT set except for serialization (to_stream) */ /* only used if COMMS_HEARTBEAT set except for serialization (to_stream) */
XP_PlayerAddr channelNo; XP_PlayerAddr channelNo;
struct { struct {
@ -180,12 +181,17 @@ static void set_reset_timer( CommsCtxt* comms );
static const char* relayCmdToStr( XWRELAY_Cmd cmd ); static const char* relayCmdToStr( XWRELAY_Cmd cmd );
# endif # endif
#endif #endif
#if defined XWFEATURE_RELAY || defined COMMS_HEARTBEAT #if defined RELAY_HEARTBEAT || defined COMMS_HEARTBEAT
static void setHeartbeatTimer( CommsCtxt* comms ); static void setHeartbeatTimer( CommsCtxt* comms );
#else #else
# define setHeartbeatTimer( comms ) # define setHeartbeatTimer( comms )
#endif #endif
#if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void sendEmptyMsg( CommsCtxt* comms, AddressRecord* rec );
#endif
#if defined XWFEATURE_BLUETOOTH || defined XWFEATURE_IP_DIRECT #if defined XWFEATURE_BLUETOOTH || defined XWFEATURE_IP_DIRECT
static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, BTIPMsgType typ, static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, BTIPMsgType typ,
XP_PlayerAddr channelNo, XP_PlayerAddr channelNo,
@ -520,7 +526,7 @@ comms_makeFromStream( MPFORMAL XWStreamCtxt* stream, XW_UtilCtxt* util,
MsgQueueElem** prevsQueueNext; MsgQueueElem** prevsQueueNext;
XP_U16 version = stream_getVersion( stream ); XP_U16 version = stream_getVersion( stream );
CommsAddrRec addr; CommsAddrRec addr;
short i; short ii;
isServer = stream_getU8( stream ); isServer = stream_getU8( stream );
if ( version < STREAM_VERS_RELAY ) { if ( version < STREAM_VERS_RELAY ) {
@ -563,14 +569,16 @@ comms_makeFromStream( MPFORMAL XWStreamCtxt* stream, XW_UtilCtxt* util,
nAddrRecs = stream_getU8( stream ); nAddrRecs = stream_getU8( stream );
prevsAddrNext = &comms->recs; prevsAddrNext = &comms->recs;
for ( i = 0; i < nAddrRecs; ++i ) { for ( ii = 0; ii < nAddrRecs; ++ii ) {
AddressRecord* rec = (AddressRecord*)XP_MALLOC( mpool, sizeof(*rec)); AddressRecord* rec = (AddressRecord*)XP_CALLOC( mpool, sizeof(*rec));
XP_MEMSET( rec, 0, sizeof(*rec) );
addrFromStream( &rec->addr, stream ); addrFromStream( &rec->addr, stream );
rec->nextMsgID = stream_getU16( stream ); rec->nextMsgID = stream_getU16( stream );
rec->lastMsgRcd = stream_getU16( stream ); rec->lastMsgRcd = stream_getU16( stream );
if ( version >= STREAM_SAVE_LASTACK ) {
rec->lastMsgAckd = stream_getU16( stream );
}
rec->channelNo = stream_getU16( stream ); rec->channelNo = stream_getU16( stream );
if ( rec->addr.conType == COMMS_CONN_RELAY ) { if ( rec->addr.conType == COMMS_CONN_RELAY ) {
rec->r.hostID = stream_getU8( stream ); rec->r.hostID = stream_getU8( stream );
@ -581,8 +589,8 @@ comms_makeFromStream( MPFORMAL XWStreamCtxt* stream, XW_UtilCtxt* util,
} }
prevsQueueNext = &comms->msgQueueHead; prevsQueueNext = &comms->msgQueueHead;
for ( i = 0; i < comms->queueLen; ++i ) { for ( ii = 0; ii < comms->queueLen; ++ii ) {
MsgQueueElem* msg = (MsgQueueElem*)XP_MALLOC( mpool, sizeof(*msg) ); MsgQueueElem* msg = (MsgQueueElem*)XP_CALLOC( mpool, sizeof(*msg) );
msg->channelNo = stream_getU16( stream ); msg->channelNo = stream_getU16( stream );
msg->msgID = stream_getU32( stream ); msg->msgID = stream_getU32( stream );
@ -740,6 +748,7 @@ comms_writeToStream( const CommsCtxt* comms, XWStreamCtxt* stream )
stream_putU16( stream, (XP_U16)rec->nextMsgID ); stream_putU16( stream, (XP_U16)rec->nextMsgID );
stream_putU16( stream, (XP_U16)rec->lastMsgRcd ); stream_putU16( stream, (XP_U16)rec->lastMsgRcd );
stream_putU16( stream, (XP_U16)rec->lastMsgAckd );
stream_putU16( stream, rec->channelNo ); stream_putU16( stream, rec->channelNo );
if ( rec->addr.conType == COMMS_CONN_RELAY ) { if ( rec->addr.conType == COMMS_CONN_RELAY ) {
stream_putU8( stream, rec->r.hostID ); /* unneeded unless RELAY */ stream_putU8( stream, rec->r.hostID ); /* unneeded unless RELAY */
@ -905,6 +914,9 @@ makeElemWithID( CommsCtxt* comms, MsgID msgID, AddressRecord* rec,
stream_putU32( msgStream, msgID ); stream_putU32( msgStream, msgID );
XP_LOGF( "put lastMsgRcd: %ld", lastMsgRcd ); XP_LOGF( "put lastMsgRcd: %ld", lastMsgRcd );
stream_putU32( msgStream, lastMsgRcd ); stream_putU32( msgStream, lastMsgRcd );
if ( !!rec ) {
rec->lastMsgAckd = lastMsgRcd;
}
headerLen = stream_getSize( msgStream ); headerLen = stream_getSize( msgStream );
newMsgElem->len = streamSize + headerLen; newMsgElem->len = streamSize + headerLen;
@ -1169,6 +1181,21 @@ comms_resendAll( CommsCtxt* comms )
return success; return success;
} /* comms_resend */ } /* comms_resend */
#ifdef XWFEATURE_COMMSACK
void
comms_ackAny( CommsCtxt* comms )
{
AddressRecord* rec;
for ( rec = comms->recs; !!rec; rec = rec->next ) {
if ( rec->lastMsgAckd < rec->lastMsgRcd ) {
XP_LOGF( "%s: %ld < %ld: rec needs ack", __func__,
rec->lastMsgAckd, rec->lastMsgRcd );
sendEmptyMsg( comms, rec );
}
}
}
#endif
#ifdef XWFEATURE_RELAY #ifdef XWFEATURE_RELAY
# ifdef DEBUG # ifdef DEBUG
# define CASESTR(s) case s: return #s # define CASESTR(s) case s: return #s
@ -1733,7 +1760,7 @@ comms_isConnected( const CommsCtxt* const comms )
return result; return result;
} }
#ifdef COMMS_HEARTBEAT #if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void static void
sendEmptyMsg( CommsCtxt* comms, AddressRecord* rec ) sendEmptyMsg( CommsCtxt* comms, AddressRecord* rec )
{ {
@ -1744,7 +1771,9 @@ sendEmptyMsg( CommsCtxt* comms, AddressRecord* rec )
sendMsg( comms, elem ); sendMsg( comms, elem );
freeElem( comms, elem ); freeElem( comms, elem );
} /* sendEmptyMsg */ } /* sendEmptyMsg */
#endif
#ifdef COMMS_HEARTBEAT
/* Heartbeat. /* Heartbeat.
* *
* Goal is to allow all participants to detect when another is gone quickly. * Goal is to allow all participants to detect when another is gone quickly.
@ -1796,7 +1825,7 @@ heartbeat_checks( CommsCtxt* comms )
} /* heartbeat_checks */ } /* heartbeat_checks */
#endif #endif
#if defined XWFEATURE_RELAY || defined COMMS_HEARTBEAT #if defined RELAY_HEARTBEAT || defined COMMS_HEARTBEAT
static XP_Bool static XP_Bool
p_comms_timerFired( void* closure, XWTimerReason XP_UNUSED_DBG(why) ) p_comms_timerFired( void* closure, XWTimerReason XP_UNUSED_DBG(why) )
{ {

View file

@ -204,6 +204,9 @@ XP_S16 comms_send( CommsCtxt* comms, XWStreamCtxt* stream );
XP_Bool comms_resendAll( CommsCtxt* comms ); XP_Bool comms_resendAll( CommsCtxt* comms );
XP_U16 comms_getChannelSeed( CommsCtxt* comms ); XP_U16 comms_getChannelSeed( CommsCtxt* comms );
#ifdef XWFEATURE_COMMSACK
void comms_ackAny( CommsCtxt* comms );
#endif
XP_Bool comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_Bool comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
const CommsAddrRec* addr ); const CommsAddrRec* addr );

View file

@ -48,8 +48,9 @@
#define MAX_COLS MAX_ROWS #define MAX_COLS MAX_ROWS
#if MAX_COLS > 16 #if MAX_COLS > 16
# define STREAM_VERS_BIGBOARD 0x12 # define STREAM_VERS_BIGBOARD 0x13
#endif #endif
#define STREAM_SAVE_LASTACK 0x12
#define STREAM_SAVE_PREVWORDS 0x11 #define STREAM_SAVE_PREVWORDS 0x11
#define STREAM_VERS_SERVER_SAVES_TOSHOW 0x10 #define STREAM_VERS_SERVER_SAVES_TOSHOW 0x10
/* STREAM_VERS_PLAYERDICTS affects stream sent between devices. May not be /* STREAM_VERS_PLAYERDICTS affects stream sent between devices. May not be
@ -80,7 +81,7 @@
#if MAX_COLS > 16 #if MAX_COLS > 16
# define CUR_STREAM_VERS STREAM_VERS_BIGBOARD # define CUR_STREAM_VERS STREAM_VERS_BIGBOARD
#else #else
# define CUR_STREAM_VERS STREAM_SAVE_PREVWORDS # define CUR_STREAM_VERS STREAM_SAVE_LASTACK
#endif #endif
typedef struct XP_Rect { typedef struct XP_Rect {

View file

@ -917,6 +917,17 @@ handle_resend( GtkWidget* XP_UNUSED(widget), GtkAppGlobals* globals )
} }
} /* handle_resend */ } /* handle_resend */
#ifdef XWFEATURE_COMMSACK
static void
handle_ack( GtkWidget* XP_UNUSED(widget), GtkAppGlobals* globals )
{
CommsCtxt* comms = globals->cGlobals.game.comms;
if ( comms != NULL ) {
comms_ackAny( comms );
}
}
#endif
#ifdef DEBUG #ifdef DEBUG
static void static void
handle_commstats( GtkWidget* XP_UNUSED(widget), GtkAppGlobals* globals ) handle_commstats( GtkWidget* XP_UNUSED(widget), GtkAppGlobals* globals )
@ -1023,6 +1034,10 @@ makeMenus( GtkAppGlobals* globals, int XP_UNUSED(argc),
#ifndef XWFEATURE_STANDALONE_ONLY #ifndef XWFEATURE_STANDALONE_ONLY
(void)createAddItem( fileMenu, "Resend", (void)createAddItem( fileMenu, "Resend",
GTK_SIGNAL_FUNC(handle_resend), globals ); GTK_SIGNAL_FUNC(handle_resend), globals );
#ifdef XWFEATURE_COMMSACK
(void)createAddItem( fileMenu, "ack any",
GTK_SIGNAL_FUNC(handle_ack), globals );
#endif
# ifdef DEBUG # ifdef DEBUG
(void)createAddItem( fileMenu, "Stats", (void)createAddItem( fileMenu, "Stats",
GTK_SIGNAL_FUNC(handle_commstats), globals ); GTK_SIGNAL_FUNC(handle_commstats), globals );