use mutex in comms

I've long had logging to detect (and assert(0))when two methods access
comms at the same time on different threads. I'm getting that
assertion more often now due, I guess, to subtle changes introduced by
the kotlin port. I wanted to avoid a mutex in cross-platform code, but
I've been using them without problems in a couple of less critical
modules so let's just do the simple thing.
This commit is contained in:
Eric House 2024-07-13 18:25:15 -07:00
parent c5851207fa
commit 3ee909df7d

View file

@ -76,60 +76,6 @@
# define COMMS_LOGFFV(...)
#endif
#ifdef DEBUG
typedef struct StackData {
const char* func;
pthread_t prevThread;
struct StackData* prev;
int count;
} StackData;
static void
printStack( const StackData* data )
{
for ( ; !!data; data = data->prev ) {
XP_LOGFF( " proc[%d]: %s (thread: %lX)", data->count,
data->func, data->prevThread );
}
}
typedef struct _SD {
StackData* head;
pthread_t thread;
} _SD;
# define OBJ_DECL _SD _sd
# define THREAD_CHECK_START(OBJ) { \
_SD* _sdp = &(OBJ)->_sd; \
StackData _data = { .func = __func__, \
.prev = _sdp->head, \
.prevThread = _sdp->thread, \
.count = NULL == _data.prev ? 0 : _data.prev->count + 1, \
}; \
\
(OBJ)->_sd.head = &_data; \
pthread_t thread = pthread_self(); \
if ( 0 == (OBJ)->_sd.thread ) { \
(OBJ)->_sd.thread = thread; \
} else if ( thread != (OBJ)->_sd.thread ) { \
XP_LOGFF( "ERROR: from %s(); new thread: %lX; old thread: %lX", \
__func__, thread, (OBJ)->_sd.thread ); \
printStack( &_data ); \
XP_ASSERT(0); \
} \
# define THREAD_CHECK_END() \
_sdp->head = _data.prev; \
_sdp->thread = _data.prevThread; \
} \
#else
# define OBJ_DECL
# define THREAD_CHECK_START(OBJ)
# define THREAD_CHECK_END()
#endif
EXTERN_C_START
typedef struct MsgQueueElem {
@ -170,6 +116,11 @@ struct CommsCtxt {
XW_UtilCtxt* util;
XW_DUtilCtxt* dutil;
pthread_mutex_t mutex;
#ifdef DEBUG
pthread_t lockHolder;
#endif
XP_U32 connID; /* set from gameID: 0 means ignore; otherwise
must match. Set by server. */
XP_U16 streamVersion; /* negotiated by server */
@ -231,10 +182,64 @@ struct CommsCtxt {
XP_Bool processingMsg;
const XP_UCHAR* tag;
#endif
OBJ_DECL;
MPSLOT
};
static void
mutex_init(CommsCtxt* comms)
{
pthread_mutexattr_t attr;
int ret = pthread_mutexattr_init(&attr);
XP_ASSERT(0 == ret);
ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
XP_ASSERT(0 == ret);
pthread_mutex_init( &comms->mutex, &attr );
ret = pthread_mutexattr_destroy(&attr);
XP_ASSERT(0 == ret);
}
#define COMMS_MUTEX_LOCK_DEBUG(COMMS) { \
CommsCtxt* _comms = COMMS; \
time_t startTime = time(NULL); \
pthread_mutex_lock(&_comms->mutex); \
time_t gotItTime = time(NULL); \
time_t _elapsed = gotItTime-startTime; \
if ( 0 < _elapsed ) { \
XP_LOGFF("took %lds to get mutex", _elapsed); \
} \
pthread_t _oldHolder = comms->lockHolder; \
comms->lockHolder = pthread_self(); \
XP_ASSERT(0 == _oldHolder || _oldHolder == comms->lockHolder); \
if ( comms->lockHolder == _oldHolder ) { \
XP_LOGFF("recursive mutex"); \
}
#define COMMS_MUTEX_UNLOCK_DEBUG() \
time_t unlockTime = time(NULL); \
_elapsed = unlockTime-gotItTime; \
if ( 0 < _elapsed ) { \
XP_LOGFF("held mutex for %lds", _elapsed); \
} \
comms->lockHolder = _oldHolder; \
pthread_mutex_unlock(&_comms->mutex); \
} \
#define COMMS_MUTEX_LOCK_RELEASE(COMMS) { \
CommsCtxt* _comms = COMMS; \
pthread_mutex_lock(&_comms->mutex); \
#define COMMS_MUTEX_UNLOCK_RELEASE() \
pthread_mutex_unlock(&_comms->mutex); \
} \
#ifdef DEBUG
#define COMMS_MUTEX_LOCK COMMS_MUTEX_LOCK_DEBUG
#define COMMS_MUTEX_UNLOCK COMMS_MUTEX_UNLOCK_DEBUG
#else
#define COMMS_MUTEX_LOCK COMMS_MUTEX_LOCK_RELEASE
#define COMMS_MUTEX_UNLOCK COMMS_MUTEX_UNLOCK_RELEASE
#endif
#define FLAG_HARVEST_DONE 1
#define FLAG_QUASHED 2
@ -466,6 +471,7 @@ comms_make( MPFORMAL XWEnv xwe, XW_UtilCtxt* util, XP_Bool isServer,
XP_U16 forceChannel )
{
CommsCtxt* comms = (CommsCtxt*)XP_CALLOC( mpool, sizeof(*comms) );
mutex_init(comms);
comms->util = util;
comms->dutil = util_getDevUtilCtxt( util, xwe );
#ifdef DEBUG
@ -530,7 +536,7 @@ static void
forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure )
{
THREAD_CHECK_START(comms); /* firing */
COMMS_MUTEX_LOCK(comms);
for ( AddressRecord* recs = comms->recs; !!recs; recs = recs->next ) {
for ( MsgQueueElem** home = &recs->_msgQueueHead; !!*home; ) {
MsgQueueElem* elem = *home;
@ -553,7 +559,7 @@ forEachElem( CommsCtxt* comms, EachMsgProc proc, void* closure )
}
done:
assertQueueOk( comms );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
static ForEachAct
@ -624,7 +630,7 @@ set_reset_timer( CommsCtxt* comms, XWEnv xwe )
void
comms_destroy( CommsCtxt* comms, XWEnv xwe )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
/* did I call comms_stop()? */
XP_ASSERT( ! addr_hasType( &comms->selfAddr, COMMS_CONN_RELAY )
|| COMMS_RELAYSTATE_UNCONNECTED == comms->rr.relayState );
@ -634,14 +640,14 @@ comms_destroy( CommsCtxt* comms, XWEnv xwe )
util_clearTimer( comms->util, xwe, TIMER_COMMS );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
XP_FREE( comms->mpool, comms );
} /* comms_destroy */
void
comms_setConnID( CommsCtxt* comms, XP_U32 connID, XP_U16 streamVersion )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
XP_ASSERT( CONN_ID_NONE != connID );
XP_ASSERT( 0 == comms->connID || connID == comms->connID );
comms->connID = connID;
@ -650,7 +656,7 @@ comms_setConnID( CommsCtxt* comms, XP_U32 connID, XP_U16 streamVersion )
comms->streamVersion = streamVersion;
COMMS_LOGFF( "set connID (gameID) to %08X, streamVersion to 0x%X",
connID, streamVersion );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
} /* comms_setConnID */
static void
@ -1161,7 +1167,7 @@ elemToStream( MsgQueueElem* elem, void* closure )
void
comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
XP_U16 nAddrRecs;
AddressRecord* rec;
@ -1240,7 +1246,7 @@ comms_writeToStream( CommsCtxt* comms, XWStreamCtxt* stream, XP_U16 saveToken )
}
comms->lastSaveToken = saveToken;
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
} /* comms_writeToStream */
static void
@ -1254,7 +1260,7 @@ resetBackoff( CommsCtxt* comms )
void
comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
COMMS_LOGFF( "(saveToken=%d)", saveToken );
XP_ASSERT( !!comms );
if ( saveToken == comms->lastSaveToken ) {
@ -1268,7 +1274,7 @@ comms_saveSucceeded( CommsCtxt* comms, XWEnv xwe, XP_U16 saveToken )
comms_ackAny( comms, xwe ); /* might not want this for all transports */
#endif
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
void
@ -1296,7 +1302,7 @@ void
comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo,
const MQTTDevID* devID )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
#ifdef NO_ADD_MQTT_TO_ALL /* set for (usually) BT testing on Android */
COMMS_LOGFF("ifdef'd out");
XP_USE( comms );
@ -1331,7 +1337,7 @@ comms_addMQTTDevID( CommsCtxt* comms, XP_PlayerAddr channelNo,
XP_ASSERT(0);
}
#endif
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
void
@ -1423,7 +1429,7 @@ XP_U16
comms_countPendingPackets( RELCONST CommsCtxt* comms, XP_Bool* quashed )
{
NonAcks na = {0};
// THREAD_CHECK_START(comms); <-- this crashes
COMMS_MUTEX_LOCK(comms);
if ( !!quashed ) {
*quashed = QUASHED(comms);
}
@ -1431,7 +1437,7 @@ comms_countPendingPackets( RELCONST CommsCtxt* comms, XP_Bool* quashed )
forEachElem( (CommsCtxt*)comms, countNonAcks, &na );
// COMMS_LOGFF( "=> %d (queueLen = %d)", na.count, comms->queueLen );
// THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return na.count;
}
@ -1502,10 +1508,10 @@ comms_getConTypes( const CommsCtxt* comms )
void
comms_dropHostAddr( CommsCtxt* comms, CommsConnType typ )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
addr_rmType( &comms->selfAddr, typ );
ASSERT_ADDR_OK( &comms->selfAddr );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
XP_Bool
@ -1770,12 +1776,12 @@ getInvitedProc( MsgQueueElem* elem, void* closure )
void
comms_getInvited( RELCONST CommsCtxt* comms, XP_U16* nInvites )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
GetInvitedData gid = {0};
forEachElem( (CommsCtxt*)comms, getInvitedProc, &gid );
*nInvites = gid.count;
// LOG_RETURNF( "%d", *nInvites );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
#endif
@ -1785,7 +1791,7 @@ XP_S16
comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream )
{
XP_S16 result = -1;
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
if ( 0 == stream_getSize(stream) ) {
COMMS_LOGFF( "dropping 0-len message" );
} else {
@ -1812,7 +1818,7 @@ comms_send( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream )
}
}
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return result;
} /* comms_send */
@ -1834,7 +1840,7 @@ static MsgQueueElem*
addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem, XP_Bool notify )
{
MsgQueueElem* asAdded = newElem;
THREAD_CHECK_START( comms );
COMMS_MUTEX_LOCK( comms );
newElem->smp.next = NULL;
MsgQueueElem** head;
@ -1874,7 +1880,7 @@ addToQueue( CommsCtxt* comms, XWEnv xwe, MsgQueueElem* newElem, XP_Bool notify )
}
dropPacket:
XP_ASSERT( comms->queueLen <= 128 ); /* reasonable limit in testing */
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return asAdded;
} /* addToQueue */
@ -2009,7 +2015,7 @@ removeProc( MsgQueueElem* elem, void* closure )
static void
removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msgID )
{
THREAD_CHECK_START( comms );
COMMS_MUTEX_LOCK( comms );
assertQueueOk( comms );
CNO_FMT( cbuf, channelNo );
COMMS_LOGFFV( "(channelNo=%d): remove msgs <= " XP_LD " for %s (queueLen: %d)",
@ -2037,7 +2043,7 @@ removeFromQueue( CommsCtxt* comms, XWEnv xwe, XP_PlayerAddr channelNo, MsgID msg
assertQueueOk( comms );
printQueue( comms );
#endif
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
} /* removeFromQueue */
static XP_U32
@ -2318,7 +2324,7 @@ comms_resendAll( CommsCtxt* comms, XWEnv xwe, CommsConnType filter, XP_Bool forc
static void
ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
if ( CONN_ID_NONE == comms->connID ) {
COMMS_LOGFF( "doing nothing because connID still unset" );
} else {
@ -2343,7 +2349,7 @@ ackAnyImpl( CommsCtxt* comms, XWEnv xwe, XP_Bool force )
}
COMMS_LOGFF( "sent for %d channels (of %d)", nSent, nSeen );
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
void
@ -2916,7 +2922,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
{
AddressRecord* rec;
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
LOG_FUNC();
rec = getRecordFor( comms, channelNo );
@ -2943,7 +2949,7 @@ validateChannelMessage( CommsCtxt* comms, XWEnv xwe, const CommsAddrRec* addr,
}
LOG_RETURNF( XP_P, rec );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return rec;
} /* validateChannelMessage */
@ -2959,7 +2965,7 @@ static XP_Bool
getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff )
{
XP_Bool messageValid;
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
messageValid = stream_gotU16( stream, &stuff->channelNo );
if ( messageValid ) {
XP_U16 channelSeed = comms_getChannelSeed( comms );
@ -2983,7 +2989,7 @@ getCheckChannelSeed( CommsCtxt* comms, XWStreamCtxt* stream, HeaderStuff* stuff
}
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return messageValid;
}
@ -3009,7 +3015,7 @@ parseSmallHeader( CommsCtxt* comms, XWStreamCtxt* msgStream,
HeaderStuff* stuff )
{
XP_Bool messageValid = XP_FALSE;
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
XP_U16 headerLen = stuff->flags >> HEADER_LEN_OFFSET;
XP_ASSERT( 0 < headerLen );
XP_ASSERT( headerLen <= stream_getSize( msgStream ) );
@ -3033,7 +3039,7 @@ parseSmallHeader( CommsCtxt* comms, XWStreamCtxt* msgStream,
}
LOG_RETURNF( "%s", boolToStr(messageValid) );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return messageValid;
}
@ -3042,7 +3048,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
const CommsAddrRec* retAddr, CommsMsgState* state )
{
XP_Bool messageValid = XP_FALSE;
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
XP_ASSERT( !!retAddr ); /* for now */
XP_MEMSET( state, 0, sizeof(*state) );
#ifdef DEBUG
@ -3152,7 +3158,7 @@ comms_checkIncomingStream( CommsCtxt* comms, XWEnv xwe, XWStreamCtxt* stream,
}
LOG_RETURNF( "%s (len: %d; sum: %s)", boolToStr(messageValid), state->len, state->sum );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return messageValid;
} /* comms_checkIncomingStream */
@ -3160,7 +3166,7 @@ void
comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
CommsMsgState* state, XP_Bool rejected )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
assertQueueOk( comms );
COMMS_LOGFF( "rec: %p; len: %d; sum: %s; id: %d; rejected: %s", state->rec,
@ -3195,7 +3201,7 @@ comms_msgProcessed( CommsCtxt* comms, XWEnv xwe,
#ifdef DEBUG
comms->processingMsg = XP_FALSE;
#endif
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
XP_Bool
@ -3277,7 +3283,7 @@ comms_setQuashed( CommsCtxt* comms, XWEnv xwe, XP_Bool quashed )
void
comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
if ( 0 == (comms->flags & FLAG_HARVEST_DONE) ) {
CommsAddrRec addrs[4] = {{0}};
XP_U16 nRecs = VSIZE(addrs);
@ -3294,7 +3300,7 @@ comms_gatherPlayers( CommsCtxt* comms, XWEnv xwe, XP_U32 created )
/* } */
}
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
LOG_RETURN_VOID();
}
#endif
@ -3316,7 +3322,7 @@ comms_gameJoined( CommsCtxt* comms, XWEnv xwe, const XP_UCHAR* connname, XWHostI
static void
sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
MsgQueueElem* elem = makeElemWithID( comms, xwe, 0 /* msgID */,
rec, rec? rec->channelNo : 0, NULL );
XP_ASSERT( !!elem );
@ -3324,7 +3330,7 @@ sendEmptyMsg( CommsCtxt* comms, XWEnv xwe, AddressRecord* rec )
if ( !!elem ) {
sendMsg( comms, xwe, elem, COMMS_CONN_NONE );
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
} /* sendEmptyMsg */
#endif
@ -3410,7 +3416,7 @@ statsProc( MsgQueueElem* elem, void* closure )
void
comms_getStats( RELCONST CommsCtxt* comms, XWStreamCtxt* stream )
{
THREAD_CHECK_START(comms);
COMMS_MUTEX_LOCK(comms);
XP_UCHAR buf[100];
XP_SNPRINTF( (XP_UCHAR*)buf, sizeof(buf),
@ -3437,7 +3443,7 @@ comms_getStats( RELCONST CommsCtxt* comms, XWStreamCtxt* stream )
rec->lastMsgRcd );
stream_catString( stream, buf );
}
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
} /* comms_getStats */
void
@ -3464,7 +3470,7 @@ rememberChannelAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
XWHostID hostID, const CommsAddrRec* addr, XP_U16 flags )
{
AddressRecord* rec = NULL;
THREAD_CHECK_START( comms );
COMMS_MUTEX_LOCK( comms );
CNO_FMT( cbuf, channelNo );
COMMS_LOGFF( "(%s)", cbuf );
listRecs( comms, "entering rememberChannelAddress" );
@ -3502,7 +3508,7 @@ rememberChannelAddress( CommsCtxt* comms, XP_PlayerAddr channelNo,
}
}
listRecs( comms, "leaving rememberChannelAddress()" );
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
return rec;
} /* rememberChannelAddress */
@ -3590,7 +3596,7 @@ static void
augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec,
const CommsAddrRec* addr, XWHostID hostID )
{
THREAD_CHECK_START( comms );
COMMS_MUTEX_LOCK( comms );
augmentAddrIntrnl( comms, &rec->addr, addr, XP_TRUE );
#ifdef XWFEATURE_RELAY
if ( addr_hasType( &rec->addr, COMMS_CONN_RELAY ) ) {
@ -3611,7 +3617,7 @@ augmentChannelAddr( CommsCtxt* comms, AddressRecord* const rec,
}
}
#endif
THREAD_CHECK_END();
COMMS_MUTEX_UNLOCK();
}
static XP_Bool