add debug-only code to check for thread-safety

This commit is contained in:
Eric House 2023-01-06 07:52:41 -08:00
parent df5da2c4aa
commit a5b0f3dd20

View file

@ -59,6 +59,57 @@
# define INITIAL_CLIENT_VERS 2
#endif
#ifdef DEBUG
typedef struct StackData {
const char* func;
pthread_t prevThread;
struct StackData* prev;
int count;
} StackData;
static void
printStack( const StackData* data )
{
for ( ; !!data; data = data->prev ) {
XP_LOGFF( " proc[%d]: %s", data->count, data->func );
}
}
typedef struct _SD {
StackData* head;
pthread_t thread;
} _SD;
# define OBJ_DECL _SD _sd
# define THREAD_CHECK_START(OBJ) { \
_SD* _sdp = &(OBJ)->_sd; \
StackData _data = { .func = __func__, \
.prev = _sdp->head, \
.prevThread = _sdp->thread, \
}; \
_data.count = NULL == _data.prev ? 0 : _data.prev->count + 1; \
\
(OBJ)->_sd.head = &_data; \
pthread_t thread = pthread_self(); \
if ( 0 == (OBJ)->_sd.thread ) { \
(OBJ)->_sd.thread = thread; \
} else if ( thread != (OBJ)->_sd.thread ) { \
printStack( &_data ); \
XP_ASSERT(0); \
} \
# define THREAD_CHECK_END() \
_sdp->head = _data.prev; \
_sdp->thread = _data.prevThread; \
} \
#else
# define OBJ_DECL
# define THREAD_CHECK_START(OBJ)
# define THREAD_CHECK_END()
#endif
#ifdef COMMS_HEARTBEAT
/* It might make sense for this to be a parameter or somehow tied to the
platform and transport. But in that case it'd have to be passed across
@ -185,7 +236,7 @@ struct CommsCtxt {
XP_Bool processingMsg;
const XP_UCHAR* tag;
#endif
OBJ_DECL;
MPSLOT
};
@ -218,7 +269,7 @@ static XP_Bool channelToAddress( const CommsCtxt* comms, XWEnv xwe,
static AddressRecord* getRecordFor( const CommsCtxt* comms, XWEnv xwe,
const CommsAddrRec* addr, XP_PlayerAddr channelNo );
static void augmentSelfAddr( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr );
static XP_S16 sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
static XP_S16 sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
CommsConnType filter );
static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem );
static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ;
@ -291,7 +342,7 @@ static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp
#endif
#if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec );
static void sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec );
#endif
static inline XP_Bool IS_INVITE(const MsgQueueElem* elem)
{
@ -613,10 +664,12 @@ comms_destroy( CommsCtxt* comms, XWEnv xwe )
void
comms_setConnID( CommsCtxt* comms, XP_U32 connID )
{
THREAD_CHECK_START(comms);
XP_ASSERT( CONN_ID_NONE != connID );
XP_ASSERT( 0 == comms->connID || connID == comms->connID );
comms->connID = connID;
XP_LOGFF( "set connID (gameID) to %x", connID );
THREAD_CHECK_END();
} /* comms_setConnID */
static void
@ -1177,6 +1230,7 @@ resetBackoff( CommsCtxt* comms )
void
comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken )
{
THREAD_CHECK_START(comms);
XP_LOGFF( "(saveToken=%d)", saveToken );
XP_ASSERT( !!comms );
if ( saveToken == comms->lastSaveToken ) {
@ -1190,6 +1244,7 @@ comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken )
comms_ackAny( comms, xwe ); /* might not want this for all transports */
#endif
}
THREAD_CHECK_END();
}
void
@ -1247,6 +1302,7 @@ void
comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo,
const MQTTDevID* devID )
{
THREAD_CHECK_START(comms);
#ifdef NO_ADD_MQTT_TO_ALL /* set for (usually) BT testing on Android */
XP_LOGFF("ifdef'd out");
XP_USE( comms );
@ -1279,6 +1335,7 @@ comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo,
XP_ASSERT(0);
}
#endif
THREAD_CHECK_END();
}
void
@ -1370,8 +1427,10 @@ comms_getConTypes( const CommsCtxt* comms )
void
comms_dropHostAddr( CommsCtxt* comms, CommsConnType typ )
{
THREAD_CHECK_START(comms);
addr_rmType( &comms->selfAddr, typ );
ASSERT_ADDR_OK( &comms->selfAddr );
THREAD_CHECK_END();
}
XP_Bool
@ -1394,7 +1453,7 @@ makeNewElem( const CommsCtxt* comms, XWEnv xwe, MsgID msgID,
}
static MsgQueueElem*
makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
makeElemWithID( const CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec,
XP_PlayerAddr channelNo, XWStreamCtxt* stream )
{
CNO_FMT( cbuf, channelNo );
@ -1640,6 +1699,7 @@ XP_S16
comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream )
{
XP_S16 result = -1;
THREAD_CHECK_START(comms);
if ( 0 == stream_getSize(stream) ) {
XP_LOGFF( "dropping 0-len message" );
} else {
@ -1664,6 +1724,7 @@ comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream )
result = sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
}
}
THREAD_CHECK_END();
return result;
} /* comms_send */
@ -1684,6 +1745,7 @@ static MsgQueueElem*
addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem )
{
MsgQueueElem* asAdded = newElem;
THREAD_CHECK_START( comms );
newElem->next = (MsgQueueElem*)NULL;
if ( !comms->msgQueueHead ) {
comms->msgQueueHead = comms->msgQueueTail = newElem;
@ -1707,6 +1769,7 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem )
notifyQueueChanged( comms, xwe );
}
XP_ASSERT( comms->queueLen <= 128 ); /* reasonable limit in testing */
THREAD_CHECK_END();
return asAdded;
} /* addToQueue */
@ -1814,6 +1877,7 @@ freeElem( MPFORMAL MsgQueueElem* elem )
static void
removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID )
{
THREAD_CHECK_START( comms );
CNO_FMT( cbuf, channelNo );
XP_LOGFF( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)",
channelNo, msgID, cbuf, comms->queueLen );
@ -1866,6 +1930,7 @@ removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msg
assertQueueOk( comms );
printQueue( comms );
#endif
THREAD_CHECK_END();
} /* removeFromQueue */
static XP_U32
@ -1889,7 +1954,8 @@ gameID( const CommsCtxt* comms )
}
static XP_S16
sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType filter )
sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem,
const CommsConnType filter )
{
XP_S16 result = -1;
XP_PlayerAddr channelNo = elem->channelNo;
@ -2091,6 +2157,7 @@ comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool forc
static void
ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force )
{
THREAD_CHECK_START(comms);
if ( CONN_ID_NONE == comms->connID ) {
XP_LOGFF( "doing nothing because connID still unset" );
} else {
@ -2115,7 +2182,8 @@ ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force )
}
}
XP_LOGFF( "sent for %d channels (of %d)", nSent, nSent );
}
}
THREAD_CHECK_END();
}
void
@ -2771,6 +2839,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
{
AddressRecord* rec;
THREAD_CHECK_START(comms);
LOG_FUNC();
rec = getRecordFor( comms, xwe, NULL, channelNo );
@ -2796,6 +2865,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
}
LOG_RETURNF( XP_P, rec );
THREAD_CHECK_END();
return rec;
} /* validateChannelMessage */
@ -2810,7 +2880,9 @@ typedef struct _HeaderStuff {
static XP_Bool
getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff )
{
XP_Bool messageValid = stream_gotU16( stream, &stuff->channelNo );
XP_Bool messageValid;
THREAD_CHECK_START(comms);
messageValid = stream_gotU16( stream, &stuff->channelNo );
if ( messageValid ) {
XP_U16 channelSeed = comms_getChannelSeed( comms );
XP_U16 flags = stuff->flags;
@ -2833,6 +2905,7 @@ getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff
}
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
THREAD_CHECK_END();
return messageValid;
}
@ -2858,6 +2931,7 @@ parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream,
HeaderStuff* stuff )
{
XP_Bool messageValid = XP_FALSE;
THREAD_CHECK_START(comms);
XP_U16 headerLen = stuff->flags >> HEADER_LEN_OFFSET;
XP_ASSERT( 0 < headerLen );
XP_ASSERT( headerLen <= stream_getSize( msgStream ) );
@ -2881,6 +2955,7 @@ parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream,
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
THREAD_CHECK_END();
return messageValid;
}
@ -2888,6 +2963,8 @@ XP_Bool
comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
const CommsAddrRec* retAddr, CommsMsgState* state )
{
XP_Bool messageValid = XP_FALSE;
THREAD_CHECK_START(comms);
XP_ASSERT( !!retAddr ); /* for now */
XP_MEMSET( state, 0, sizeof(*state) );
#ifdef DEBUG
@ -2901,7 +2978,6 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
CommsConnType addrType = addr_getType( retAddr );
#endif
XP_Bool messageValid = XP_FALSE;
XP_LOGFF( TAGFMT(retAddr.typ=%s), TAGPRMS, ConnType2Str(addrType ) );
if ( comms_getAddrDisabled( comms, addrType, XP_FALSE ) ) {
XP_LOGFF( "dropping message because %s disabled",
@ -3009,6 +3085,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
#else
LOG_RETURNF( "%s (len: %d)", boolToStr(messageValid), state->len );
#endif
THREAD_CHECK_END();
return messageValid;
} /* comms_checkIncomingStream */
@ -3016,6 +3093,7 @@ void
comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
CommsMsgState* state, XP_Bool rejected )
{
THREAD_CHECK_START(comms);
#ifdef COMMS_CHECKSUM
XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec,
state->len, state->sum, state->msgID, boolToStr(rejected) );
@ -3049,6 +3127,7 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
#ifdef DEBUG
comms->processingMsg = XP_FALSE;
#endif
THREAD_CHECK_END();
}
XP_Bool
@ -3112,6 +3191,7 @@ comms_isConnected( const CommsCtxt* const comms )
void
comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created )
{
THREAD_CHECK_START(comms);
LOG_FUNC();
if ( 0 == (comms->flags & FLAG_HARVEST_DONE) ) {
CommsAddrRec addrs[4] = {{0}};
@ -3129,6 +3209,7 @@ comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created )
}
}
}
THREAD_CHECK_END();
}
#endif
@ -3147,12 +3228,14 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI
#if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK
static void
sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
{
// THREAD_CHECK_START(comms); -- doesn't work with const yet
MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */,
rec, rec? rec->channelNo : 0, NULL );
(void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
freeElem( MPPARM(comms->mpool) elem );
// THREAD_CHECK_END();
} /* sendEmptyMsg */
#endif
@ -3344,12 +3427,13 @@ static AddressRecord*
rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo,
XWHostID hostID, const CommsAddrRec* addr, XP_U16 flags )
{
AddressRecord* rec = NULL;
THREAD_CHECK_START( comms );
CNO_FMT( cbuf, channelNo );
XP_LOGFF( "(%s)", cbuf );
listRecs( comms, "entering rememberChannelAddress" );
logAddr( comms, xwe, addr, __func__ );
AddressRecord* rec = NULL;
rec = getRecordFor( comms, xwe, NULL, channelNo );
if ( !rec ) {
/* not found; add a new entry */
@ -3382,6 +3466,7 @@ rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo,
}
}
listRecs( comms, "leaving rememberChannelAddress" );
THREAD_CHECK_END();
return rec;
} /* rememberChannelAddress */
@ -3459,6 +3544,7 @@ static void
augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec,
const CommsAddrRec* addr, XWHostID hostID )
{
THREAD_CHECK_START( comms );
augmentAddrIntrnl( comms, &rec->addr, addr, XP_TRUE );
#ifdef XWFEATURE_RELAY
if ( addr_hasType( &rec->addr, COMMS_CONN_RELAY ) ) {
@ -3479,14 +3565,15 @@ augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec,
}
}
#endif
THREAD_CHECK_END();
}
static XP_Bool
augmentAddrIntrnl( CommsCtxt* comms, CommsAddrRec* destAddr,
const CommsAddrRec* srcAddr, XP_Bool isNewer )
{
ASSERT_ADDR_OK( srcAddr );
XP_Bool changed = XP_FALSE;
ASSERT_ADDR_OK( srcAddr );
const CommsAddrRec empty = {0};
if ( !!srcAddr ) {
CommsConnType typ;