diff --git a/xwords4/common/comms.c b/xwords4/common/comms.c index a4172b1dd..b92f4f8c9 100644 --- a/xwords4/common/comms.c +++ b/xwords4/common/comms.c @@ -59,6 +59,57 @@ # define INITIAL_CLIENT_VERS 2 #endif +#ifdef DEBUG +typedef struct StackData { + const char* func; + pthread_t prevThread; + struct StackData* prev; + int count; +} StackData; + +static void +printStack( const StackData* data ) +{ + for ( ; !!data; data = data->prev ) { + XP_LOGFF( " proc[%d]: %s", data->count, data->func ); + } +} + +typedef struct _SD { + StackData* head; + pthread_t thread; +} _SD; + +# define OBJ_DECL _SD _sd + +# define THREAD_CHECK_START(OBJ) { \ + _SD* _sdp = &(OBJ)->_sd; \ + StackData _data = { .func = __func__, \ + .prev = _sdp->head, \ + .prevThread = _sdp->thread, \ + }; \ + _data.count = NULL == _data.prev ? 0 : _data.prev->count + 1; \ + \ + (OBJ)->_sd.head = &_data; \ + pthread_t thread = pthread_self(); \ + if ( 0 == (OBJ)->_sd.thread ) { \ + (OBJ)->_sd.thread = thread; \ + } else if ( thread != (OBJ)->_sd.thread ) { \ + printStack( &_data ); \ + XP_ASSERT(0); \ + } \ + +# define THREAD_CHECK_END() \ + _sdp->head = _data.prev; \ + _sdp->thread = _data.prevThread; \ + } \ + +#else +# define OBJ_DECL +# define THREAD_CHECK_START(OBJ) +# define THREAD_CHECK_END() +#endif + #ifdef COMMS_HEARTBEAT /* It might make sense for this to be a parameter or somehow tied to the platform and transport. But in that case it'd have to be passed across @@ -185,7 +236,7 @@ struct CommsCtxt { XP_Bool processingMsg; const XP_UCHAR* tag; #endif - + OBJ_DECL; MPSLOT }; @@ -218,7 +269,7 @@ static XP_Bool channelToAddress( const CommsCtxt* comms, XWEnv xwe, static AddressRecord* getRecordFor( const CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, XP_PlayerAddr channelNo ); static void augmentSelfAddr( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr ); -static XP_S16 sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, +static XP_S16 sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, CommsConnType filter ); static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem ); static XP_Bool elems_same( const MsgQueueElem* e1, const MsgQueueElem* e2 ) ; @@ -291,7 +342,7 @@ static XP_S16 send_via_bt_or_ip( CommsCtxt* comms, XWEnv xwe, BTIPMsgType msgTyp #endif #if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK -static void sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ); +static void sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ); #endif static inline XP_Bool IS_INVITE(const MsgQueueElem* elem) { @@ -613,10 +664,12 @@ comms_destroy( CommsCtxt* comms, XWEnv xwe ) void comms_setConnID( CommsCtxt* comms, XP_U32 connID ) { + THREAD_CHECK_START(comms); XP_ASSERT( CONN_ID_NONE != connID ); XP_ASSERT( 0 == comms->connID || connID == comms->connID ); comms->connID = connID; XP_LOGFF( "set connID (gameID) to %x", connID ); + THREAD_CHECK_END(); } /* comms_setConnID */ static void @@ -1177,6 +1230,7 @@ resetBackoff( CommsCtxt* comms ) void comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken ) { + THREAD_CHECK_START(comms); XP_LOGFF( "(saveToken=%d)", saveToken ); XP_ASSERT( !!comms ); if ( saveToken == comms->lastSaveToken ) { @@ -1190,6 +1244,7 @@ comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken ) comms_ackAny( comms, xwe ); /* might not want this for all transports */ #endif } + THREAD_CHECK_END(); } void @@ -1247,6 +1302,7 @@ void comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo, const MQTTDevID* devID ) { + THREAD_CHECK_START(comms); #ifdef NO_ADD_MQTT_TO_ALL /* set for (usually) BT testing on Android */ XP_LOGFF("ifdef'd out"); XP_USE( comms ); @@ -1279,6 +1335,7 @@ comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo, XP_ASSERT(0); } #endif + THREAD_CHECK_END(); } void @@ -1370,8 +1427,10 @@ comms_getConTypes( const CommsCtxt* comms ) void comms_dropHostAddr( CommsCtxt* comms, CommsConnType typ ) { + THREAD_CHECK_START(comms); addr_rmType( &comms->selfAddr, typ ); ASSERT_ADDR_OK( &comms->selfAddr ); + THREAD_CHECK_END(); } XP_Bool @@ -1394,7 +1453,7 @@ makeNewElem( const CommsCtxt* comms, XWEnv xwe, MsgID msgID, } static MsgQueueElem* -makeElemWithID( CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec, +makeElemWithID( const CommsCtxt* comms, XWEnv xwe, MsgID msgID, AddressRecord* rec, XP_PlayerAddr channelNo, XWStreamCtxt* stream ) { CNO_FMT( cbuf, channelNo ); @@ -1640,6 +1699,7 @@ XP_S16 comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream ) { XP_S16 result = -1; + THREAD_CHECK_START(comms); if ( 0 == stream_getSize(stream) ) { XP_LOGFF( "dropping 0-len message" ); } else { @@ -1664,6 +1724,7 @@ comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream ) result = sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); } } + THREAD_CHECK_END(); return result; } /* comms_send */ @@ -1684,6 +1745,7 @@ static MsgQueueElem* addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem ) { MsgQueueElem* asAdded = newElem; + THREAD_CHECK_START( comms ); newElem->next = (MsgQueueElem*)NULL; if ( !comms->msgQueueHead ) { comms->msgQueueHead = comms->msgQueueTail = newElem; @@ -1707,6 +1769,7 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem ) notifyQueueChanged( comms, xwe ); } XP_ASSERT( comms->queueLen <= 128 ); /* reasonable limit in testing */ + THREAD_CHECK_END(); return asAdded; } /* addToQueue */ @@ -1814,6 +1877,7 @@ freeElem( MPFORMAL MsgQueueElem* elem ) static void removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID ) { + THREAD_CHECK_START( comms ); CNO_FMT( cbuf, channelNo ); XP_LOGFF( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)", channelNo, msgID, cbuf, comms->queueLen ); @@ -1866,6 +1930,7 @@ removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msg assertQueueOk( comms ); printQueue( comms ); #endif + THREAD_CHECK_END(); } /* removeFromQueue */ static XP_U32 @@ -1889,7 +1954,8 @@ gameID( const CommsCtxt* comms ) } static XP_S16 -sendMsg( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, const CommsConnType filter ) +sendMsg( const CommsCtxt* comms, XWEnv xwe, MsgQueueElem* elem, + const CommsConnType filter ) { XP_S16 result = -1; XP_PlayerAddr channelNo = elem->channelNo; @@ -2091,6 +2157,7 @@ comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool forc static void ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force ) { + THREAD_CHECK_START(comms); if ( CONN_ID_NONE == comms->connID ) { XP_LOGFF( "doing nothing because connID still unset" ); } else { @@ -2115,7 +2182,8 @@ ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force ) } } XP_LOGFF( "sent for %d channels (of %d)", nSent, nSent ); - } + } + THREAD_CHECK_END(); } void @@ -2771,6 +2839,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, { AddressRecord* rec; + THREAD_CHECK_START(comms); LOG_FUNC(); rec = getRecordFor( comms, xwe, NULL, channelNo ); @@ -2796,6 +2865,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr, } LOG_RETURNF( XP_P, rec ); + THREAD_CHECK_END(); return rec; } /* validateChannelMessage */ @@ -2810,7 +2880,9 @@ typedef struct _HeaderStuff { static XP_Bool getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff ) { - XP_Bool messageValid = stream_gotU16( stream, &stuff->channelNo ); + XP_Bool messageValid; + THREAD_CHECK_START(comms); + messageValid = stream_gotU16( stream, &stuff->channelNo ); if ( messageValid ) { XP_U16 channelSeed = comms_getChannelSeed( comms ); XP_U16 flags = stuff->flags; @@ -2833,6 +2905,7 @@ getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff } } LOG_RETURNF( "%s", boolToStr(messageValid) ); + THREAD_CHECK_END(); return messageValid; } @@ -2858,6 +2931,7 @@ parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream, HeaderStuff* stuff ) { XP_Bool messageValid = XP_FALSE; + THREAD_CHECK_START(comms); XP_U16 headerLen = stuff->flags >> HEADER_LEN_OFFSET; XP_ASSERT( 0 < headerLen ); XP_ASSERT( headerLen <= stream_getSize( msgStream ) ); @@ -2881,6 +2955,7 @@ parseSmallHeader( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* msgStream, } LOG_RETURNF( "%s", boolToStr(messageValid) ); + THREAD_CHECK_END(); return messageValid; } @@ -2888,6 +2963,8 @@ XP_Bool comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, const CommsAddrRec* retAddr, CommsMsgState* state ) { + XP_Bool messageValid = XP_FALSE; + THREAD_CHECK_START(comms); XP_ASSERT( !!retAddr ); /* for now */ XP_MEMSET( state, 0, sizeof(*state) ); #ifdef DEBUG @@ -2901,7 +2978,6 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, CommsConnType addrType = addr_getType( retAddr ); #endif - XP_Bool messageValid = XP_FALSE; XP_LOGFF( TAGFMT(retAddr.typ=%s), TAGPRMS, ConnType2Str(addrType ) ); if ( comms_getAddrDisabled( comms, addrType, XP_FALSE ) ) { XP_LOGFF( "dropping message because %s disabled", @@ -3009,6 +3085,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream, #else LOG_RETURNF( "%s (len: %d)", boolToStr(messageValid), state->len ); #endif + THREAD_CHECK_END(); return messageValid; } /* comms_checkIncomingStream */ @@ -3016,6 +3093,7 @@ void comms_msgProcessed( CommsCtxt* comms, XWEnv xwe, CommsMsgState* state, XP_Bool rejected ) { + THREAD_CHECK_START(comms); #ifdef COMMS_CHECKSUM XP_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec, state->len, state->sum, state->msgID, boolToStr(rejected) ); @@ -3049,6 +3127,7 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe, #ifdef DEBUG comms->processingMsg = XP_FALSE; #endif + THREAD_CHECK_END(); } XP_Bool @@ -3112,6 +3191,7 @@ comms_isConnected( const CommsCtxt* const comms ) void comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created ) { + THREAD_CHECK_START(comms); LOG_FUNC(); if ( 0 == (comms->flags & FLAG_HARVEST_DONE) ) { CommsAddrRec addrs[4] = {{0}}; @@ -3129,6 +3209,7 @@ comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created ) } } } + THREAD_CHECK_END(); } #endif @@ -3147,12 +3228,14 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI #if defined COMMS_HEARTBEAT || defined XWFEATURE_COMMSACK static void -sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ) +sendEmptyMsg( const CommsCtxt* comms, XWEnv xwe, AddressRecord* rec ) { + // THREAD_CHECK_START(comms); -- doesn't work with const yet MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */, rec, rec? rec->channelNo : 0, NULL ); (void)sendMsg( comms, xwe, elem, COMMS_CONN_NONE ); freeElem( MPPARM(comms->mpool) elem ); + // THREAD_CHECK_END(); } /* sendEmptyMsg */ #endif @@ -3344,12 +3427,13 @@ static AddressRecord* rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, XWHostID hostID, const CommsAddrRec* addr, XP_U16 flags ) { + AddressRecord* rec = NULL; + THREAD_CHECK_START( comms ); CNO_FMT( cbuf, channelNo ); XP_LOGFF( "(%s)", cbuf ); listRecs( comms, "entering rememberChannelAddress" ); logAddr( comms, xwe, addr, __func__ ); - AddressRecord* rec = NULL; rec = getRecordFor( comms, xwe, NULL, channelNo ); if ( !rec ) { /* not found; add a new entry */ @@ -3382,6 +3466,7 @@ rememberChannelAddress( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, } } listRecs( comms, "leaving rememberChannelAddress" ); + THREAD_CHECK_END(); return rec; } /* rememberChannelAddress */ @@ -3459,6 +3544,7 @@ static void augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec, const CommsAddrRec* addr, XWHostID hostID ) { + THREAD_CHECK_START( comms ); augmentAddrIntrnl( comms, &rec->addr, addr, XP_TRUE ); #ifdef XWFEATURE_RELAY if ( addr_hasType( &rec->addr, COMMS_CONN_RELAY ) ) { @@ -3479,14 +3565,15 @@ augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec, } } #endif + THREAD_CHECK_END(); } static XP_Bool augmentAddrIntrnl( CommsCtxt* comms, CommsAddrRec* destAddr, const CommsAddrRec* srcAddr, XP_Bool isNewer ) { - ASSERT_ADDR_OK( srcAddr ); XP_Bool changed = XP_FALSE; + ASSERT_ADDR_OK( srcAddr ); const CommsAddrRec empty = {0}; if ( !!srcAddr ) { CommsConnType typ;