Rewrite of checkIncomingStream to be cleaner and fix bug with initial messages getting dropped

while heartbeat feature was enabled.  Fixed the problem on Linux, but experience on Treo650
is still better without HB feature, so turning it off on Palm.  This seems ready for beta,
but will test a bit more.
This commit is contained in:
ehouse 2007-12-05 06:33:37 +00:00
parent 036294a42e
commit 3a85db7b38
2 changed files with 290 additions and 241 deletions

View file

@ -1,6 +1,6 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2001-2005 by Eric House (xwords@eehouse.org). All rights reserved.
* Copyright 2001-2007 by Eric House (xwords@eehouse.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -43,7 +43,9 @@ compilation_error_here( "Choose one or the other or none." );
/* It might make sense for this to be a parameter or somehow tied to the
platform and transport. But in that case it'd have to be passed across
since all devices must agree. */
# define HB_INTERVAL 3
# ifndef HB_INTERVAL
# define HB_INTERVAL 5
# endif
#endif
EXTERN_C_START
@ -52,7 +54,7 @@ typedef struct MsgQueueElem {
struct MsgQueueElem* next;
XP_U8* msg;
XP_U16 len;
XP_U16 channelNo;
XP_PlayerAddr channelNo;
XP_U16 sendCount; /* how many times sent? */
MsgID msgID; /* saved for ease of deletion */
} MsgQueueElem;
@ -60,18 +62,20 @@ typedef struct MsgQueueElem {
typedef struct AddressRecord {
struct AddressRecord* next;
CommsAddrRec addr;
XP_U16 lastACK;
#ifdef DEBUG
XP_U16 lastACK;
XP_U16 nUniqueBytes;
#endif
MsgID nextMsgID; /* on a per-channel basis */
MsgID lastMsgReceived; /* on a per-channel basis */
MsgID lastMsgRcd; /* on a per-channel basis */
/* only used if COMMS_HEARTBEAT set except for serialization (to_stream) */
XP_U32 lastMsgRcvdTime; /* when did we set lastMsgReceived? */
XP_PlayerAddr channelNo;
struct {
XWHostID hostID; /* used for relay case */
} r;
#ifdef COMMS_HEARTBEAT
XP_Bool initialSeen;
#endif
} AddressRecord;
#define ADDRESSRECORD_SIZE_68K 20
@ -87,14 +91,14 @@ struct CommsCtxt {
XW_UtilCtxt* util;
XP_U32 connID; /* 0 means ignore; otherwise must match */
XP_U16 nextChannelNo;
XP_PlayerAddr nextChannelNo;
AddressRecord* recs; /* return addresses */
TransportSend sendproc;
#ifdef COMMS_HEARTBEAT
TransportReset resetproc;
XP_U32 hbStartTime;
XP_U32 lastMsgRcd;
#endif
void* sendClosure;
@ -104,6 +108,8 @@ struct CommsCtxt {
#ifdef COMMS_HEARTBEAT
XP_Bool doHeartbeat;
XP_Bool hbTimerPending;
XP_U32 lastMsgRcvdTime;
#endif
/* The following fields, down to isServer, are only used if
@ -143,6 +149,7 @@ typedef enum {
BTIPMSG_NONE = 0
,BTIPMSG_DATA
,BTIPMSG_RESET
,BTIPMSG_HB
} BTIPMsgType;
#endif
@ -153,6 +160,7 @@ static AddressRecord* rememberChannelAddress( CommsCtxt* comms,
XP_PlayerAddr channelNo,
XWHostID id,
const CommsAddrRec* addr );
static void updateChannelAddress( AddressRecord* rec, const CommsAddrRec* addr );
static XP_Bool channelToAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
const CommsAddrRec** addr );
static AddressRecord* getRecordFor( CommsCtxt* comms,
@ -161,6 +169,7 @@ static XP_S16 sendMsg( CommsCtxt* comms, MsgQueueElem* elem );
static void addToQueue( CommsCtxt* comms, MsgQueueElem* newMsgElem );
static XP_U16 countAddrRecs( const CommsCtxt* comms );
static void sendConnect( CommsCtxt* comms );
static AddressRecord* addrToRecord( CommsCtxt* comms, const CommsAddrRec* adr );
#ifdef XWFEATURE_RELAY
static void relayConnect( CommsCtxt* comms );
@ -169,7 +178,7 @@ static XP_Bool send_via_relay( CommsCtxt* comms, XWRELAY_Cmd cmd,
XWHostID destID, void* data, int dlen );
static XWHostID getDestID( CommsCtxt* comms, XP_PlayerAddr channelNo );
#endif
#if defined XWFEATURE_RELAY || defined COMMS_HEARTBEAT
#if defined RELAY_HEARTBEAT || defined COMMS_HEARTBEAT
static void setHeartbeatTimer( CommsCtxt* comms );
#else
# define setHeartbeatTimer( comms )
@ -282,7 +291,7 @@ void
comms_setConnID( CommsCtxt* comms, XP_U32 connID )
{
comms->connID = connID;
XP_STATUSF( "set connID to %lx", connID );
XP_STATUSF( "%s: set connID to %lx", __func__, connID );
} /* comms_setConnID */
static void
@ -383,7 +392,7 @@ comms_makeFromStream( MPFORMAL XWStreamCtxt* stream, XW_UtilCtxt* util,
addrFromStream( &rec->addr, stream );
rec->nextMsgID = stream_getU16( stream );
rec->lastMsgReceived = stream_getU16( stream );
rec->lastMsgRcd = stream_getU16( stream );
rec->channelNo = stream_getU16( stream );
if ( rec->addr.conType == COMMS_CONN_RELAY ) {
rec->r.hostID = stream_getU8( stream );
@ -429,7 +438,6 @@ comms_start( CommsCtxt* comms )
{
#ifdef COMMS_HEARTBEAT
comms->doHeartbeat = comms->addr.conType != COMMS_CONN_IR;
comms->hbStartTime = util_getCurSeconds( comms->util );
#endif
sendConnect( comms );
@ -448,8 +456,8 @@ sendConnect( CommsCtxt* comms )
#if defined XWFEATURE_BLUETOOTH || defined XWFEATURE_IP_DIRECT
case COMMS_CONN_BT:
case COMMS_CONN_IP_DIRECT:
send_via_bt_or_ip( comms, BTIPMSG_RESET,
CHANNEL_NONE, NULL, 0 );
/* This will only work on host side when there's a single guest! */
(void)send_via_bt_or_ip( comms, BTIPMSG_RESET, CHANNEL_NONE, NULL, 0 );
(void)comms_resendAll( comms );
break;
#endif
@ -535,7 +543,7 @@ comms_writeToStream( const CommsCtxt* comms, XWStreamCtxt* stream )
addrToStream( stream, addr );
stream_putU16( stream, (XP_U16)rec->nextMsgID );
stream_putU16( stream, (XP_U16)rec->lastMsgReceived );
stream_putU16( stream, (XP_U16)rec->lastMsgRcd );
stream_putU16( stream, rec->channelNo );
if ( rec->addr.conType == COMMS_CONN_RELAY ) {
stream_putU8( stream, rec->r.hostID ); /* unneeded unless RELAY */
@ -577,7 +585,6 @@ comms_setAddr( CommsCtxt* comms, const CommsAddrRec* addr )
#ifdef COMMS_HEARTBEAT
comms->doHeartbeat = comms->addr.conType != COMMS_CONN_IR;
comms->hbStartTime = util_getCurSeconds( comms->util );
#endif
sendConnect( comms );
@ -623,7 +630,7 @@ makeElemWithID( CommsCtxt* comms, MsgID msgID, AddressRecord* rec,
{
XP_U16 headerLen;
XP_U16 streamSize = NULL == stream? 0 : stream_getSize( stream );
MsgID lastMsgRcd = (!!rec)? rec->lastMsgReceived : 0;
MsgID lastMsgRcd = (!!rec)? rec->lastMsgRcd : 0;
MsgQueueElem* newMsgElem;
XWStreamCtxt* msgStream;
@ -648,10 +655,12 @@ makeElemWithID( CommsCtxt* comms, MsgID msgID, AddressRecord* rec,
NULL, 0,
(MemStreamCloseCallback)NULL );
stream_open( msgStream );
XP_LOGF( "%s: putting connID %ld", __func__, comms->connID );
stream_putU32( msgStream, comms->connID );
stream_putU16( msgStream, channelNo );
stream_putU32( msgStream, msgID );
XP_LOGF( "put lastMsgRcd: %ld", lastMsgRcd );
stream_putU32( msgStream, lastMsgRcd );
headerLen = stream_getSize( msgStream );
@ -679,7 +688,9 @@ comms_send( CommsCtxt* comms, XWStreamCtxt* stream )
MsgQueueElem* elem;
XP_S16 result = -1;
XP_DEBUGF( "assigning msgID=" XP_LD " on chnl %d", msgID, channelNo );
XP_DEBUGF( "%s: assigning msgID=" XP_LD " on chnl %d", __func__,
msgID, channelNo );
elem = makeElemWithID( comms, msgID, rec, channelNo, stream );
if ( NULL != elem ) {
@ -938,6 +949,17 @@ relayPreProcess( CommsCtxt* comms, XWStreamCtxt* stream, XWHostID* senderID )
} /* relayPreProcess */
#endif
#ifdef COMMS_HEARTBEAT
static void
noteHBReceived( CommsCtxt* comms/* , const CommsAddrRec* addr */ )
{
comms->lastMsgRcvdTime = util_getCurSeconds( comms->util );
setHeartbeatTimer( comms );
}
#else
# define noteHBReceived(a)
#endif
#if defined XWFEATURE_BLUETOOTH || defined XWFEATURE_IP_DIRECT
static XP_Bool
btIpPreProcess( CommsCtxt* comms, XWStreamCtxt* stream )
@ -947,8 +969,13 @@ btIpPreProcess( CommsCtxt* comms, XWStreamCtxt* stream )
if ( consumed ) {
/* This is all there is so far */
XP_ASSERT( typ == BTIPMSG_RESET );
(void)comms_resendAll( comms );
if ( typ == BTIPMSG_RESET ) {
(void)comms_resendAll( comms );
} else if ( typ == BTIPMSG_HB ) {
/* noteHBReceived( comms, addr ); */
} else {
XP_ASSERT( 0 );
}
}
return consumed;
@ -983,192 +1010,224 @@ preProcess( CommsCtxt* comms, XWStreamCtxt* stream,
return consumed;
} /* preProcess */
static XP_Bool
addressUnknown( CommsCtxt* comms, const CommsAddrRec* addr )
static AddressRecord*
addrToRecord( CommsCtxt* comms, const CommsAddrRec* addr )
{
XP_Bool unknown = XP_TRUE;
if ( !!addr ) {
CommsConnType conType = addr->conType;
const AddressRecord* rec;
for ( rec = comms->recs; !!rec && unknown ; rec = rec->next ) {
XP_ASSERT( conType == rec->addr.conType );
switch( conType ) {
case COMMS_CONN_RELAY:
if ( (addr->u.ip_relay.ipAddr == rec->addr.u.ip_relay.ipAddr)
&& (addr->u.ip_relay.port == rec->addr.u.ip_relay.port ) ) {
unknown = XP_FALSE;
}
break;
case COMMS_CONN_BT:
if ( 0 == XP_MEMCMP( &addr->u.bt.btAddr, &rec->addr.u.bt.btAddr,
sizeof(addr->u.bt.btAddr) ) ) {
unknown = XP_FALSE;
}
break;
case COMMS_CONN_IR: /* no way to test */
default:
break;
CommsConnType conType = addr->conType;
AddressRecord* rec;
XP_Bool matched = XP_FALSE;
for ( rec = comms->recs; !!rec; rec = rec->next ) {
XP_ASSERT( conType == rec->addr.conType );
switch( conType ) {
case COMMS_CONN_RELAY:
if ( (addr->u.ip_relay.ipAddr == rec->addr.u.ip_relay.ipAddr)
&& (addr->u.ip_relay.port == rec->addr.u.ip_relay.port ) ) {
matched = XP_TRUE;
}
break;
case COMMS_CONN_BT:
if ( 0 == XP_MEMCMP( &addr->u.bt.btAddr, &rec->addr.u.bt.btAddr,
sizeof(addr->u.bt.btAddr) ) ) {
matched = XP_TRUE;
}
break;
case COMMS_CONN_IP_DIRECT:
if ( (addr->u.ip.ipAddr_ip == rec->addr.u.ip.ipAddr_ip)
&& (addr->u.ip.port_ip == rec->addr.u.ip.port_ip) ) {
matched = XP_TRUE;
}
break;
case COMMS_CONN_IR: /* no way to test */
default:
break;
}
if ( matched ) {
break;
}
}
return rec;
} /* addrToRecord */
/* An initial message comes only from a client to a server, and from the
* server in response to that initial message. Once the inital messages are
* exchanged there's a connID associated. The greatest danger is that it's a
* dup, resent for whatever reason. To detect that we check that the address
* is unknown. But addresses can change, e.g. if a reset of a socket-based
* transport causes the local socket to change. How to deal with this?
* Likely a boolean set when we call comms->resetproc that causes us to accept
* changed addresses.
*
* But: before we're connected heartbeats will also come here, but with
* hasPayload false. We want to remember their address, but not give them a
* channel ID. So if we have a payload we insist that it's the first we've
* seen on this channel.
*
* If it's a HB, then we want to add a rec/channel if there's none, but mark
* it invalid
*/
static AddressRecord*
validateInitialMessage( CommsCtxt* comms, XP_Bool hasPayload,
const CommsAddrRec* addr, XWHostID senderID,
XP_PlayerAddr* channelNo )
{
#ifdef COMMS_HEARTBEAT
XP_Bool addRec = XP_FALSE;
AddressRecord* rec = addrToRecord( comms, addr );
LOG_FUNC();
if ( hasPayload ) {
if ( rec ) {
if ( rec->initialSeen ) {
rec = NULL; /* reject it! */
}
} else {
addRec = XP_TRUE;
}
} else {
/* This is a heartbeat */
if ( !rec && comms->isServer ) {
addRec = XP_TRUE;
}
}
return unknown;
} /* addressUnknown */
if ( addRec ) {
if ( comms->isServer ) {
XP_ASSERT( *channelNo == 0 );
*channelNo = ++comms->nextChannelNo;
}
rec = rememberChannelAddress( comms, *channelNo, senderID, addr );
if ( hasPayload ) {
rec->initialSeen = XP_TRUE;
} else {
rec = NULL;
}
}
LOG_RETURNF( "%lx", rec );
return rec;
#else
AddressRecord* rec = addrToRecord( comms, addr );
if ( !!rec ) {
rec = NULL; /* reject: we've already seen init message on channel */
} else {
if ( comms->isServer ) {
XP_ASSERT( *channelNo == 0 );
*channelNo = ++comms->nextChannelNo;
}
rec = rememberChannelAddress( comms, *channelNo, senderID, addr );
}
return rec;
#endif
} /* validateInitialMessage */
/* read a raw buffer into a stream, stripping off the headers and keeping
* any necessary stats.
*
* Keep track of return addresses by channel. If the message's channel number
* is 0, assign a new channel number and associate an address with it.
* Otherwise update the address, which may have changed since we last heard
* from this channel.
*
* There may be messages that are only about the comms 'protocol', that
* contain nothing to be passed to the server. In that case, return false
* indicating the caller that all processing is finished.
*
* conType tells both how to interpret the addr and whether to expect any
* special fields in the message itself. In the IP case, for example, the
* port component of a return address is in the message but the IP address
* component will be passed in.
/* Messages with established connIDs are valid only if they have the msgID
* that's expected on that channel. Their addresses need to match what we
* have for that channel, and in fact we'll overwrite what we have in case a
* reset has changed the address. The danger is that somebody might sneak in
* with a forged message, but this isn't internet banking.
*/
static AddressRecord*
validateChannelMessage( CommsCtxt* comms, const CommsAddrRec* addr,
XP_PlayerAddr channelNo, MsgID msgID, MsgID lastMsgRcd )
{
AddressRecord* rec;
LOG_FUNC();
rec = getRecordFor( comms, channelNo );
if ( !!rec ) {
removeFromQueue( comms, channelNo, lastMsgRcd );
if ( msgID == rec->lastMsgRcd + 1 ) {
updateChannelAddress( rec, addr );
#ifdef DEBUG
rec->lastACK = (XP_U16)lastMsgRcd;
#endif
} else {
XP_LOGF( "%s: expected %d, got %d", __func__,
rec->lastMsgRcd + 1, msgID );
rec = NULL;
}
} else {
XP_LOGF( "%s: no rec for addr", __func__ );
}
LOG_RETURNF( "%lx", rec );
return rec;
} /* validateChannelMessage */
XP_Bool
comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
const CommsAddrRec* addr )
{
XP_U16 channelNo;
XP_U32 connID;
MsgID msgID;
MsgID lastMsgRcd;
XP_Bool validMessage = XP_FALSE;
AddressRecord* recs = (AddressRecord*)NULL;
XWHostID senderID = 0; /* unset; default for non-relay cases */
XP_Bool usingRelay = XP_FALSE;
XP_Bool channelWas0 = XP_FALSE;
AddressRecord* rec = NULL;
XP_ASSERT( addr == NULL || comms->addr.conType == addr->conType );
if ( !preProcess( comms, stream, &usingRelay, &senderID ) ) {
if ( stream_getSize( stream ) >= sizeof(connID) ) {
XP_U32 connID;
XP_PlayerAddr channelNo;
MsgID msgID;
MsgID lastMsgRcd;
/* reject too-small message */
if ( stream_getSize( stream ) >=
(sizeof(connID) + sizeof(channelNo)
+ sizeof(msgID) + sizeof(lastMsgRcd)) ) {
XP_U16 payloadSize;
connID = stream_getU32( stream );
XP_STATUSF( "%s: read connID of %lx", __func__, connID );
channelNo = stream_getU16( stream );
XP_STATUSF( "read channelNo %d", channelNo );
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
XP_DEBUGF( "rcd: msgID=" XP_LD ",lastMsgRcd=" XP_LD " on chnl %d",
msgID, lastMsgRcd, channelNo );
if ( comms->connID == connID || comms->connID == CONN_ID_NONE ) {
if ( stream_getSize( stream ) >= sizeof(channelNo)
+ sizeof(msgID) + sizeof(lastMsgRcd) ) {
payloadSize = stream_getSize( stream ) > 0; /* anything left? */
if ( connID == CONN_ID_NONE ) {
/* special case: initial message from client */
rec = validateInitialMessage( comms, payloadSize > 0, addr, senderID,
&channelNo );
} else if ( comms->connID == connID ) {
rec = validateChannelMessage( comms, addr, channelNo, msgID,
lastMsgRcd );
}
channelNo = stream_getU16( stream );
XP_STATUSF( "read channelNo %d", channelNo );
msgID = stream_getU32( stream );
lastMsgRcd = stream_getU32( stream );
XP_DEBUGF( "rcd: msgID=" XP_LD " on chnl %d", msgID,
channelNo );
removeFromQueue( comms, channelNo, lastMsgRcd );
/* Problem: need to detect duplicate messages even before
the server's had a chance to assign channels.
Solution, which is a hack: since hostID does the same
thing, use it in the relay case. But in the relay-less
case, which still needs to work, do assign channels.
The dup message problem is far less common there. */
if ( channelNo != 0 ) {
validMessage = XP_TRUE;
} else {
/* If we've seen a channel0 msg from same addr, drop
it. It's most likely a dupe, and server can't
handle e.g. duplicate client reg messages.. */
validMessage = addressUnknown( comms, addr );
if ( validMessage ) {
XP_ASSERT( comms->isServer );
if ( usingRelay ) {
XP_ASSERT( senderID != 0 );
channelNo = senderID;
} else {
XP_ASSERT( msgID == 0 );
/* check that address isn't already associated
with an existing channel. */
channelNo = ++comms->nextChannelNo;
XP_LOGF( "%s: incrementled nextChannelNo "
"to %d", __func__,
comms->nextChannelNo );
channelWas0 = XP_TRUE;
}
XP_STATUSF( "assigning channelNo=%d", channelNo );
}
}
if ( validMessage && (usingRelay || !channelWas0) ) {
recs = getRecordFor( comms, channelNo );
/* messageID for an incoming message should be one
* greater than the id most recently used for that
* channel. */
if ( !!recs ) {
#ifdef COMMS_HEARTBEAT
/* Good for heartbeat even if not "valid." In
fact, all HB messages are invalid by design. */
recs->lastMsgRcvdTime = util_getCurSeconds( comms->util );
#endif
if ( msgID != recs->lastMsgReceived + 1 ) {
XP_DEBUGF( "on channel %d, msgID=" XP_LD
" (next should be " XP_LD ")",
channelNo, msgID,
recs->lastMsgReceived+1 );
validMessage = XP_FALSE;
}
} else if ( msgID > 1 ) {
validMessage = XP_FALSE;
}
#ifdef DEBUG
if ( !!recs ) {
/* XP_ASSERT( lastMsgRcd <= recs->nextMsgID ); */
if ( lastMsgRcd > recs->nextMsgID ) {
XP_LOGF( "bad: got lastMsgRcd of %ld, "
"nextMsgID is %ld",
lastMsgRcd, recs->nextMsgID );
validMessage = XP_FALSE;
} else {
XP_ASSERT( lastMsgRcd < 0x0000FFFF );
recs->lastACK = (XP_U16)lastMsgRcd;
}
}
#endif
}
if ( validMessage ) {
XP_LOGF( "remembering senderID %x for channel %d",
senderID, channelNo );
recs = rememberChannelAddress( comms, channelNo,
senderID, addr );
stream_setAddress( stream, channelNo );
if ( !!recs ) {
recs->lastMsgReceived = msgID;
XP_STATUSF( "set channel %d's lastMsgReceived to "
XP_LD, channelNo, msgID );
}
}
} else {
XP_LOGF( "%s: message too small", __func__ );
}
} else {
XP_STATUSF( "refusing non-matching connID; got %lx, "
"wanted %lx", connID, comms->connID );
validMessage = NULL != rec;
if ( validMessage ) {
rec->lastMsgRcd = msgID;
XP_LOGF( "%s: set channel %d's lastMsgRcd to " XP_LD,
__func__, channelNo, msgID );
stream_setAddress( stream, channelNo );
validMessage = payloadSize > 0;
}
} else {
XP_LOGF( "%s: message too small", __func__ );
}
}
/* Call after we've had a chance to create rec for addr */
noteHBReceived( comms/* , addr */ );
LOG_RETURNF( "%d", (XP_U16)validMessage );
return validMessage;
} /* comms_checkIncomingStream */
#ifdef COMMS_HEARTBEAT
static void
sendEmptyMsg( CommsCtxt* comms, AddressRecord* rec )
{
MsgQueueElem* elem = makeElemWithID( comms,
0 /*rec? rec->lastMsgRcd : 0*/,
rec,
rec? rec->channelNo : 0, NULL );
sendMsg( comms, elem );
freeElem( comms, elem );
} /* sendEmptyMsg */
/* Heartbeat.
*
* Goal is to allow all participants to detect when another is gone quickly.
@ -1190,56 +1249,33 @@ comms_checkIncomingStream( CommsCtxt* comms, XWStreamCtxt* stream,
static void
heartbeat_checks( CommsCtxt* comms )
{
XP_U32 now, tooLongAgo;
XP_U16 pendingPacks[MAX_NUM_PLAYERS] = { 0 };
AddressRecord* rec;
const MsgQueueElem* elem;
XP_Bool resetTimer = XP_FALSE;
LOG_FUNC();
for ( elem = comms->msgQueueHead; !!elem; elem = elem->next ) {
XP_ASSERT( elem->channelNo < MAX_NUM_PLAYERS );
if ( elem->sendCount == 0 ) { /* still waiting being sent? */
++pendingPacks[elem->channelNo];
}
}
now = util_getCurSeconds( comms->util );
tooLongAgo = now - (HB_INTERVAL * 2);
for ( rec = comms->recs; !!rec; rec = rec->next ) {
XP_U32 lastMsgRcvdTime = rec->lastMsgRcvdTime;
if ( lastMsgRcvdTime == 0 ) { /* nothing received yet */
XP_LOGF( "no last message" );
lastMsgRcvdTime = comms->hbStartTime;
}
XP_ASSERT( lastMsgRcvdTime > 0 );
if ( lastMsgRcvdTime < tooLongAgo ) {
XP_LOGF( "calling reset proc; last was %ld secs too long ago",
tooLongAgo-lastMsgRcvdTime );
(*comms->resetproc)(comms->sendClosure);
resetTimer = XP_FALSE;
break;
} else if ( 0 == pendingPacks[rec->channelNo] ) {
MsgQueueElem* hb;
XP_LOGF( "sending heartbeat on channel %d with msgID %d",
rec->channelNo, rec->lastMsgReceived );
hb = makeElemWithID( comms, rec->lastACK, rec, rec->channelNo, NULL );
if ( NULL != hb ) {
sendMsg( comms, hb );
freeElem( comms, hb );
} else {
XP_ASSERT( XP_FALSE );
do {
if ( comms->lastMsgRcvdTime > 0 ) {
XP_U32 now = util_getCurSeconds( comms->util );
XP_U32 tooLongAgo = now - (HB_INTERVAL * 2);
if ( comms->lastMsgRcvdTime < tooLongAgo ) {
XP_LOGF( "calling reset proc; last was %ld secs too long ago",
tooLongAgo - comms->lastMsgRcvdTime );
(*comms->resetproc)(comms->sendClosure);
comms->lastMsgRcvdTime = 0;
break; /* outta here */
}
} else {
XP_LOGF( "All's well (%d pending)", pendingPacks[rec->channelNo] );
resetTimer = XP_TRUE;
}
}
if ( resetTimer ) {
setHeartbeatTimer( comms );
}
if ( comms->recs ) {
AddressRecord* rec;
for ( rec = comms->recs; !!rec; rec = rec->next ) {
sendEmptyMsg( comms, rec );
}
} else if ( !comms->isServer ) {
/* Client still waiting for inital ALL_REG message */
sendEmptyMsg( comms, NULL );
}
} while ( XP_FALSE );
setHeartbeatTimer( comms );
} /* heartbeat_checks */
#endif
@ -1249,7 +1285,8 @@ p_comms_timerFired( void* closure, XWTimerReason XP_UNUSED_DBG(why) )
{
CommsCtxt* comms = (CommsCtxt*)closure;
XP_ASSERT( why == TIMER_HEARTBEAT );
XP_LOGF( "comms_timerFired" );
LOG_FUNC();
comms->hbTimerPending = XP_FALSE;
if (0 ) {
#ifdef RELAY_HEARTBEAT
} else if ( (comms->addr.conType == COMMS_CONN_RELAY )
@ -1268,17 +1305,28 @@ static void
setHeartbeatTimer( CommsCtxt* comms )
{
LOG_FUNC();
if ( !comms->hbTimerPending ) {
XP_U16 when = 0;
#ifdef RELAY_HEARTBEAT
if ( comms->addr.conType == COMMS_CONN_RELAY ) {
util_setTimer( comms->util, TIMER_HEARTBEAT, comms->r.heartbeat,
p_comms_timerFired, comms );
}
if ( comms->addr.conType == COMMS_CONN_RELAY ) {
when = comms->r.heartbeat;
}
#elif defined COMMS_HEARTBEAT
if ( comms->doHeartbeat ) {
util_setTimer( comms->util, TIMER_HEARTBEAT, HB_INTERVAL,
p_comms_timerFired, comms );
}
if ( comms->doHeartbeat ) {
XP_LOGF( "%s: calling util_setTimer", __func__ );
when = HB_INTERVAL;
} else {
XP_LOGF( "%s: doHeartbeat not set", __func__ );
}
#endif
if ( when != 0 ) {
util_setTimer( comms->util, TIMER_HEARTBEAT, when,
p_comms_timerFired, comms );
comms->hbTimerPending = XP_TRUE;
}
} else {
XP_LOGF( "%s: skipping b/c pending", __func__ );
}
} /* setHeartbeatTimer */
#endif
@ -1329,12 +1377,6 @@ comms_getStats( CommsCtxt* comms, XWStreamCtxt* stream )
rec->lastACK);
stream_putString( stream, buf );
#ifdef COMMS_HEARTBEAT
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
(XP_UCHAR*)"Last ack'd %ld secs ago\n",
now - rec->lastMsgRcvdTime );
stream_putString( stream, buf );
#endif
}
} /* comms_getStats */
#endif
@ -1348,13 +1390,10 @@ rememberChannelAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
if ( !recs ) {
/* not found; add a new entry */
recs = (AddressRecord*)XP_MALLOC( comms->mpool, sizeof(*recs) );
XP_MEMSET( recs, 0, sizeof(*recs) );
recs->nextMsgID = 0;
recs->channelNo = channelNo;
recs->r.hostID = hostID;
#ifdef DEBUG
recs->nUniqueBytes = 0;
#endif
recs->next = comms->recs;
comms->recs = recs;
}
@ -1373,6 +1412,13 @@ rememberChannelAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
return recs;
} /* rememberChannelAddress */
static void
updateChannelAddress( AddressRecord* rec, const CommsAddrRec* addr )
{
XP_ASSERT( !!rec );
XP_MEMCPY( &rec->addr, addr, sizeof(rec->addr) );
} /* updateChannelAddress */
static XP_Bool
channelToAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
const CommsAddrRec** addr )
@ -1537,9 +1583,10 @@ static XP_S16
send_via_bt_or_ip( CommsCtxt* comms, BTIPMsgType typ, XP_PlayerAddr channelNo,
void* data, int dlen )
{
XP_S16 nSent;
XP_U8* buf;
XP_S16 nSent = -1;
LOG_FUNC();
nSent = -1;
buf = XP_MALLOC( comms->mpool, dlen + 1 );
if ( !!buf ) {
const CommsAddrRec* addr;
@ -1555,6 +1602,7 @@ send_via_bt_or_ip( CommsCtxt* comms, BTIPMsgType typ, XP_PlayerAddr channelNo,
setHeartbeatTimer( comms );
}
LOG_RETURNF( "%d", nSent );
return nSent;
} /* send_via_bt_or_ip */

View file

@ -116,7 +116,8 @@ MYDEFS_COMMON += -DXWFEATURE_SEARCHLIMIT
# MYDEFS_COMMON += -DXWFEATURE_RELAY
# turn on bluetooth comms option for 68K and ARM
BLUETOOTH = -DXWFEATURE_BLUETOOTH -DBT_USE_L2CAP -DCOMMS_HEARTBEAT
BLUETOOTH = -DXWFEATURE_BLUETOOTH -DBT_USE_L2CAP
# -DCOMMS_HEARTBEAT
#BLUETOOTH = -DXWFEATURE_BLUETOOTH -DBT_USE_RFCOMM
MYDEFS_COMMON += $(BLUETOOTH)