diff --git a/xwords4/relay/Makefile b/xwords4/relay/Makefile index af2bbae6f..06fc29fa8 100644 --- a/xwords4/relay/Makefile +++ b/xwords4/relay/Makefile @@ -42,7 +42,7 @@ SRC = \ # STATIC ?= -static GITINFO = gitversion.txt -HASH=$(shell git describe) +HASH=$(shell git rev-parse --verify HEAD) OBJ = $(patsubst %.cpp,obj/%.o,$(SRC)) #LDFLAGS += -pthread -g -lmcheck $(STATIC) @@ -70,7 +70,7 @@ endif memdebug all: xwrelay rq -REQUIRED_DEBS = libpq-dev \ +REQUIRED_DEBS = libpq-dev g++ libglib2.0-dev postgresql \ .PHONY: debcheck debs_install diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 86c8c8e8f..07dfa354e 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -875,13 +875,13 @@ putNetShort( uint8_t** bufpp, unsigned short s ) *bufpp += sizeof(s); } -void +int CookieRef::store_message( HostID dest, const uint8_t* buf, unsigned int len ) { logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__, len, dest ); - DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len ); + return DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len ); } void @@ -1044,6 +1044,7 @@ CookieRef::postCheckAllHere() void CookieRef::postDropDevice( HostID hostID ) { + logf( XW_LOGINFO, "%s(hostID=%d)", __func__, hostID ); CRefEvent evt( XWE_ACKTIMEOUT ); evt.u.ack.srcID = hostID; m_eventQueue.push_back( evt ); @@ -1192,21 +1193,16 @@ CookieRef::sendAnyStored( const CRefEvent* evt ) } typedef struct _StoreData { - string connName; - HostID dest; - uint8_t* buf; - int buflen; + int msgID; } StoreData; void CookieRef::storeNoAck( bool acked, uint32_t packetID, void* data ) { StoreData* sdata = (StoreData*)data; - if ( !acked ) { - DBMgr::Get()->StoreMessage( sdata->connName.c_str(), sdata->dest, - sdata->buf, sdata->buflen ); + if ( acked ) { + DBMgr::Get()->RemoveStoredMessages( &sdata->msgID, 1 ); } - free( sdata->buf ); delete sdata; } @@ -1237,17 +1233,13 @@ CookieRef::forward_or_store( const CRefEvent* evt ) } uint32_t packetID = 0; + int msgID = store_message( dest, buf, buflen ); if ( (NULL == destAddr) || !send_with_length( destAddr, dest, buf, buflen, true, &packetID ) ) { - store_message( dest, buf, buflen ); - } else if ( 0 != packetID ) { // sent via UDP + } else if ( 0 != msgID && 0 != packetID ) { // sent via UDP StoreData* data = new StoreData; - data->connName = m_connName; - data->dest = dest; - data->buf = (uint8_t*)malloc( buflen ); - memcpy( data->buf, buf, buflen ); - data->buflen = buflen; + data->msgID = msgID; UDPAckTrack::setOnAck( storeNoAck, packetID, data ); } @@ -1376,20 +1368,16 @@ CookieRef::sendAllHere( bool initial ) through the vector each time. */ HostID dest; for ( dest = 1; dest <= m_nPlayersSought; ++dest ) { - bool sent = false; *idLoc = dest; /* write in this target's hostId */ { RWReadLock rrl( &m_socketsRWLock ); HostRec* hr = m_sockets[dest-1]; if ( !!hr ) { - sent = send_with_length( &hr->m_addr, dest, buf, - bufp-buf, true ); + (void)send_with_length( &hr->m_addr, dest, buf, bufp-buf, true ); } } - if ( !sent ) { - store_message( dest, buf, bufp-buf ); - } + (void)store_message( dest, buf, bufp-buf ); } } /* sendAllHere */ diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 508a4485c..61a677976 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -275,8 +275,7 @@ class CookieRef { bool notInUse(void) { return m_cid == 0; } - void store_message( HostID dest, const uint8_t* buf, - unsigned int len ); + int store_message( HostID dest, const uint8_t* buf, unsigned int len ); void send_stored_messages( HostID dest, const AddrInfo* addr ); void printSeeds( const char* caller ); diff --git a/xwords4/relay/crefmgr.cpp b/xwords4/relay/crefmgr.cpp index c08a0d2c9..882f0eb27 100644 --- a/xwords4/relay/crefmgr.cpp +++ b/xwords4/relay/crefmgr.cpp @@ -337,7 +337,7 @@ CRefMgr::getMakeCookieRef( const char* connName, const char* cookie, } /* getMakeCookieRef */ CidInfo* -CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead ) +CRefMgr::getMakeCookieRef( const char* const connName, HostID hid, bool* isDead ) { CookieRef* cref = NULL; CidInfo* cinfo = NULL; @@ -347,7 +347,7 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead ) int nAlreadyHere = 0; for ( ; ; ) { /* for: see comment above */ - CookieID cid = m_db->FindGame( connName, curCookie, sizeof(curCookie), + CookieID cid = m_db->FindGame( connName, hid, curCookie, sizeof(curCookie), &curLangCode, &nPlayersT, &nAlreadyHere, isDead ); if ( 0 != cid ) { /* already open */ @@ -375,6 +375,48 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead ) return cinfo; } +CidInfo* +CRefMgr::getMakeCookieRef( const AddrInfo::ClientToken clientToken, HostID srcID ) +{ + CookieRef* cref = NULL; + CidInfo* cinfo = NULL; + char curCookie[MAX_INVITE_LEN+1]; + int curLangCode; + int nPlayersT = 0; + int nAlreadyHere = 0; + + for ( ; ; ) { /* for: see comment above */ + char connName[MAX_CONNNAME_LEN+1] = {0}; + CookieID cid = m_db->FindGame( clientToken, srcID, + connName, sizeof(connName), + curCookie, sizeof(curCookie), + &curLangCode, &nPlayersT, &nAlreadyHere ); + // &seed ); + if ( 0 != cid ) { /* already open */ + cinfo = m_cidlock->Claim( cid ); + if ( NULL == cinfo->GetRef() ) { + m_cidlock->Relinquish( cinfo, true ); + continue; + } + } else if ( nPlayersT == 0 ) { /* wasn't in the DB */ + /* do nothing; insufficient info to fake it */ + } else { + cinfo = m_cidlock->Claim(); + if ( !m_db->AddCID( connName, cinfo->GetCid() ) ) { + m_cidlock->Relinquish( cinfo, true ); + continue; + } + logf( XW_LOGINFO, "%s(): added cid???", __func__ ); + cref = AddNew( curCookie, connName, cinfo->GetCid(), curLangCode, + nPlayersT, nAlreadyHere ); + cinfo->SetRef( cref ); + } + break; + } + logf( XW_LOGINFO, "%s() => %p", __func__, cinfo ); + return cinfo; +} + void CRefMgr::RemoveSocketRefs( const AddrInfo* addr ) { @@ -672,13 +714,13 @@ SafeCref::SafeCref( const char* connName, const char* cookie, HostID hid, } /* ConnName case -- must exist (unless DB record's been removed */ -SafeCref::SafeCref( const char* const connName ) +SafeCref::SafeCref( const char* const connName, HostID hid ) : m_cinfo( NULL ) , m_mgr( CRefMgr::Get() ) , m_isValid( false ) { bool isDead = false; - CidInfo* cinfo = m_mgr->getMakeCookieRef( connName, &isDead ); + CidInfo* cinfo = m_mgr->getMakeCookieRef( connName, hid, &isDead ); if ( NULL != cinfo && NULL != cinfo->GetRef() ) { assert( cinfo->GetCid() == cinfo->GetRef()->GetCid() ); m_locked = cinfo->GetRef()->Lock(); @@ -722,6 +764,19 @@ SafeCref::SafeCref( const AddrInfo* addr ) } } +SafeCref::SafeCref( const AddrInfo::ClientToken clientToken, HostID srcID ) + : m_cinfo( NULL ) + , m_mgr( CRefMgr::Get() ) + , m_isValid( false ) +{ + CidInfo* cinfo = m_mgr->getMakeCookieRef( clientToken, srcID ); + if ( NULL != cinfo && NULL != cinfo->GetRef() ) { + m_locked = cinfo->GetRef()->Lock(); + m_cinfo = cinfo; + m_isValid = true; + } +} + SafeCref::~SafeCref() { if ( m_cinfo != NULL ) { diff --git a/xwords4/relay/crefmgr.h b/xwords4/relay/crefmgr.h index 315bce9c8..7694759dd 100644 --- a/xwords4/relay/crefmgr.h +++ b/xwords4/relay/crefmgr.h @@ -128,7 +128,8 @@ class CRefMgr { int nPlayersS, int seed, int langCode, bool isPublic, bool* isDead ); - CidInfo* getMakeCookieRef( const char* const connName, bool* isDead ); + CidInfo* getMakeCookieRef( const char* const connName, HostID hid, bool* isDead ); + CidInfo* getMakeCookieRef( const AddrInfo::ClientToken clientToken, HostID srcID ); CidInfo* getCookieRef( CookieID cid, bool failOk = false ); CidInfo* getCookieRef( const AddrInfo* addr ); @@ -179,9 +180,10 @@ class SafeCref { const AddrInfo* addr, int clientVersion, DevID* devID, int nPlayersH, int nPlayersS, unsigned short gameSeed, int clientIndx, int langCode, bool wantsPublic, bool makePublic ); - SafeCref( const char* const connName ); + SafeCref( const char* const connName, HostID hid ); SafeCref( CookieID cid, bool failOk = false ); SafeCref( const AddrInfo* addr ); + SafeCref( const AddrInfo::ClientToken clientToken, HostID srcID ); /* SafeCref( CookieRef* cref ); */ ~SafeCref(); diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index 95625a9b6..99492a87a 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -70,20 +70,6 @@ DBMgr::DBMgr() pthread_mutex_init( &m_haveNoMessagesMutex, NULL ); - /* Now figure out what the largest cid currently is. There must be a way - to get postgres to do this for me.... */ - /* const char* query = "SELECT cid FROM games ORDER BY cid DESC LIMIT 1"; */ - /* PGresult* result = PQexec( m_pgconn, query ); */ - /* if ( 0 == PQntuples( result ) ) { */ - /* m_nextCID = 1; */ - /* } else { */ - /* char* value = PQgetvalue( result, 0, 0 ); */ - /* m_nextCID = 1 + atoi( value ); */ - /* } */ - /* PQclear(result); */ - /* logf( XW_LOGINFO, "%s: m_nextCID=%d", __func__, m_nextCID ); */ - - // I've seen rand returning the same series several times.... srand( time( NULL ) ); } @@ -107,7 +93,7 @@ DBMgr::AddNew( const char* cookie, const char* connName, CookieID cid, qb.appendQueryf( "INSERT INTO " GAMES_TABLE " (cid, room, connName, nTotal, lang, pub)" " VALUES( $$, $$, $$, $$, $$, $$ )" ) - .appendParam(cid) + .appendParam(cid) .appendParam(cookie) .appendParam(connName) .appendParam(nPlayersT) @@ -136,7 +122,7 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen, { bool found = false; - const char* fmt = "SELECT cid, room, lang, nPerDevice, dead FROM " + const char* fmt = "SELECT cid, room, lang, dead FROM " GAMES_TABLE " WHERE connName = '%s' AND nTotal = %d " "AND %d = seeds[%d] AND 'A' = ack[%d] " ; @@ -148,10 +134,11 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen, assert( 1 >= PQntuples( result ) ); found = 1 == PQntuples( result ); if ( found ) { - *cidp = atoi( PQgetvalue( result, 0, 0 ) ); - snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); - *langP = atoi( PQgetvalue( result, 0, 2 ) ); - *isDead = 't' == PQgetvalue( result, 0, 4 )[0]; + int col = 0; + *cidp = atoi( PQgetvalue( result, 0, col++ ) ); + snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) ); + *langP = atoi( PQgetvalue( result, 0, col++ ) ); + *isDead = 't' == PQgetvalue( result, 0, col++ )[0]; } PQclear( result ); @@ -160,28 +147,29 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen, } /* FindGameFor */ CookieID -DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen, +DBMgr::FindGame( const char* connName, HostID hid, char* roomBuf, int roomBufLen, int* langP, int* nPlayersTP, int* nPlayersHP, bool* isDead ) { CookieID cid = 0; - const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice, dead FROM " + const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice[%d], dead FROM " GAMES_TABLE " WHERE connName = '%s'" // " LIMIT 1" ; StrWPF query; - query.catf( fmt, connName ); + query.catf( fmt, hid, connName ); logf( XW_LOGINFO, "query: %s", query.c_str() ); PGresult* result = PQexec( getThreadConn(), query.c_str() ); assert( 1 >= PQntuples( result ) ); if ( 1 == PQntuples( result ) ) { - cid = atoi( PQgetvalue( result, 0, 0 ) ); - snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); - *langP = atoi( PQgetvalue( result, 0, 2 ) ); - *nPlayersTP = atoi( PQgetvalue( result, 0, 3 ) ); - *nPlayersHP = atoi( PQgetvalue( result, 0, 4 ) ); - *isDead = 't' == PQgetvalue( result, 0, 5 )[0]; + int col = 0; + cid = atoi( PQgetvalue( result, 0, col++ ) ); + snprintf( roomBuf, roomBufLen, "%s", PQgetvalue( result, 0, col++ ) ); + *langP = atoi( PQgetvalue( result, 0, col++ ) ); + *nPlayersTP = atoi( PQgetvalue( result, 0, col++ ) ); + *nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) ); + *isDead = 't' == PQgetvalue( result, 0, col++ )[0]; } PQclear( result ); @@ -189,6 +177,40 @@ DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen, return cid; } /* FindGame */ +CookieID +DBMgr::FindGame( const AddrInfo::ClientToken clientToken, HostID hid, + char* connNameBuf, int connNameBufLen, + char* roomBuf, int roomBufLen, + int* langP, int* nPlayersTP, int* nPlayersHP ) +{ + CookieID cid = 0; + const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice[%d], connname FROM " + GAMES_TABLE " WHERE tokens[%d] = %d and NOT dead"; + // " LIMIT 1" + ; + StrWPF query; + query.catf( fmt, hid, hid, clientToken ); + logf( XW_LOGINFO, "query: %s", query.c_str() ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + if ( 1 == PQntuples( result ) ) { + int col = 0; + cid = atoi( PQgetvalue( result, 0, col++ ) ); + // room + snprintf( roomBuf, roomBufLen, "%s", PQgetvalue( result, 0, col++ ) ); + // lang + *langP = atoi( PQgetvalue( result, 0, col++ ) ); + *nPlayersTP = atoi( PQgetvalue( result, 0, col++ ) ); + *nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) ); + snprintf( connNameBuf, connNameBufLen, "%s", PQgetvalue( result, 0, col++ ) ); + } + PQclear( result ); + + logf( XW_LOGINFO, "%s(ct=%d,hid=%d) => %d (connname=%s)", __func__, clientToken, + hid, cid, connNameBuf ); + return cid; +} + bool DBMgr::FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken token, string& connName, HostID* hidp, unsigned short* seed ) @@ -294,11 +316,13 @@ DBMgr::SeenSeed( const char* cookie, unsigned short seed, NULL, NULL, 0 ); bool found = 1 == PQntuples( result ); if ( found ) { - *cid = atoi( PQgetvalue( result, 0, 0 ) ); - *nPlayersHP = here_less_seed( PQgetvalue( result, 0, 2 ), - atoi( PQgetvalue( result, 0, 3 ) ), - seed ); - snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); + int col = 0; + *cid = atoi( PQgetvalue( result, 0, col++ ) ); + snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) ); + + const char* seeds = PQgetvalue( result, 0, col++ ); + int perDeviceSum = atoi( PQgetvalue( result, 0, col++ ) ); + *nPlayersHP = here_less_seed( seeds, perDeviceSum, seed ); } PQclear( result ); logf( XW_LOGINFO, "%s(%4X)=>%s", __func__, seed, found?"true":"false" ); @@ -333,9 +357,10 @@ DBMgr::FindOpen( const char* cookie, int lang, int nPlayersT, int nPlayersH, NULL, NULL, 0 ); CookieID cid = 0; if ( 1 == PQntuples( result ) ) { - cid = atoi( PQgetvalue( result, 0, 0 ) ); - snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); - *nPlayersHP = atoi( PQgetvalue( result, 0, 2 ) ); + int col = 0; + cid = atoi( PQgetvalue( result, 0, col++ ) ); + snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) ); + *nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) ); /* cid may be 0, but should use game anyway */ } PQclear( result ); @@ -699,9 +724,11 @@ DBMgr::RecordSent( const int* msgIDs, int nMsgIDs ) if ( PGRES_TUPLES_OK == PQresultStatus( result ) ) { int ntuples = PQntuples( result ); for ( int ii = 0; ii < ntuples; ++ii ) { - RecordSent( PQgetvalue( result, ii, 0 ), - atoi( PQgetvalue( result, ii, 1 ) ), - atoi( PQgetvalue( result, ii, 2 ) ) ); + int col = 0; + const char* const connName = PQgetvalue( result, ii, col++ ); + HostID hid = atoi( PQgetvalue( result, ii, col++ ) ); + int nBytes = atoi( PQgetvalue( result, ii, col++ ) ); + RecordSent( connName, hid, nBytes ); } } PQclear( result ); @@ -1014,15 +1041,16 @@ DBMgr::CountStoredMessages( DevIDRelay relayID ) return getCountWhere( MSGS_TABLE, test ); } -void +int DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf, int len ) { + int msgID = 0; clearHasNoMessages( destDevID ); size_t newLen; const char* fmt = "INSERT INTO " MSGS_TABLE " " - "(devid, %s, msglen) VALUES(%d, %s'%s', %d)"; + "(devid, %s, msglen) VALUES(%d, %s'%s', %d) RETURNING id"; StrWPF query; if ( m_useB64 ) { @@ -1038,13 +1066,20 @@ DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf, } logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - execSql( query ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + if ( 1 == PQntuples( result ) ) { + msgID = atoi( PQgetvalue( result, 0, 0 ) ); + } + PQclear( result ); + return msgID; } -void +int DBMgr::StoreMessage( const char* const connName, int destHid, const uint8_t* buf, int len ) { + int msgID = 0; clearHasNoMessages( connName, destHid ); DevIDRelay devID = getDevID( connName, destHid ); @@ -1074,7 +1109,7 @@ DBMgr::StoreMessage( const char* const connName, int destHid, #ifdef HAVE_STIME " AND stime='epoch'" #endif - " );", connName, destHid, b64 ); + " )", connName, destHid, b64 ); g_free( b64 ); } else { uint8_t* bytes = PQescapeByteaConn( getThreadConn(), buf, @@ -1085,9 +1120,17 @@ DBMgr::StoreMessage( const char* const connName, int destHid, "E", bytes, len ); PQfreemem( bytes ); } + query.catf(" RETURNING id;"); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - execSql( query ); + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + if ( 1 == PQntuples( result ) ) { + msgID = atoi( PQgetvalue( result, 0, 0 ) ); + } else { + logf( XW_LOGINFO, "Not stored; duplicate?" ); + } + PQclear( result ); + return msgID; } void diff --git a/xwords4/relay/dbmgr.h b/xwords4/relay/dbmgr.h index 690ca5c39..d23622c7e 100644 --- a/xwords4/relay/dbmgr.h +++ b/xwords4/relay/dbmgr.h @@ -75,9 +75,13 @@ class DBMgr { bool FindRelayIDFor( const char* connName, HostID hid, unsigned short seed, const DevID* host, DevIDRelay* devID ); - CookieID FindGame( const char* connName, char* cookieBuf, int bufLen, + CookieID FindGame( const char* connName, HostID hid, char* cookieBuf, int bufLen, int* langP, int* nPlayersTP, int* nPlayersHP, bool* isDead ); + CookieID FindGame( const AddrInfo::ClientToken clientToken, HostID hid, + char* connNameBuf, int connNameBufLen, + char* cookieBuf, int cookieBufLen, + int* langP, int* nPlayersTP, int* nPlayersHP ); bool FindGameFor( const char* connName, char* cookieBuf, int bufLen, unsigned short seed, HostID hid, @@ -137,10 +141,10 @@ class DBMgr { /* message storage -- different DB */ int CountStoredMessages( const char* const connName ); int CountStoredMessages( DevIDRelay relayID ); - void StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf, - int len ); - void StoreMessage( const char* const connName, int destHid, - const uint8_t* const buf, int len ); + int StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf, + int len ); + int StoreMessage( const char* const connName, int destHid, + const uint8_t* const buf, int len ); void GetStoredMessages( DevIDRelay relayID, vector& msgs ); void GetStoredMessages( const char* const connName, HostID hid, vector& msgs ); @@ -171,6 +175,7 @@ class DBMgr { int clientVersion, const char* const model, const char* const osVers, DevIDRelay relayID ); + PGconn* getThreadConn( void ); void clearThreadConn(); diff --git a/xwords4/relay/scripts/showinplay.sh b/xwords4/relay/scripts/showinplay.sh index f4c7eeec7..8a593ea02 100755 --- a/xwords4/relay/scripts/showinplay.sh +++ b/xwords4/relay/scripts/showinplay.sh @@ -54,13 +54,14 @@ echo "; relay pid[s]: $(pidof xwrelay)" echo "Row count:" $(psql -t xwgames -c "select count(*) FROM games $QUERY;") # Games -echo "SELECT dead as d,connname,cid,room,lang as lg,clntVers as cv ,ntotal as t,nperdevice as nPerDev,nsents as snts, seeds,devids,tokens,ack, mtimes "\ +echo "SELECT dead as d,connname,cid,room,lang as lg,clntVers as cv ,ntotal as t,nperdevice as npd,nsents as snts, seeds,devids,tokens,ack, mtimes "\ "FROM games $QUERY ORDER BY NOT dead, ctime DESC LIMIT $LIMIT;" \ | psql xwgames # Messages -echo "SELECT * "\ - "FROM msgs WHERE connname IN (SELECT connname from games $QUERY) "\ +echo "Unack'd msgs count:" $(psql -t xwgames -c "select count(*) FROM msgs where stime = 'epoch' AND connname IN (SELECT connname from games $QUERY);") +echo "SELECT id,connName,hid as h,token,ctime,stime,devid,msg64 "\ + "FROM msgs WHERE stime = 'epoch' AND connname IN (SELECT connname from games $QUERY) "\ "ORDER BY ctime DESC, connname LIMIT $LIMIT;" \ | psql xwgames diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 666bee3f3..def2b44f1 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -550,18 +550,18 @@ assemble_packet( vector& packet, uint32_t* packetIDP, XWRelayReg cmd, } #ifdef LOG_UDP_PACKETS - gsize size = 0; - gint state = 0; - gint save = 0; - gchar out[1024]; - for ( unsigned int ii = 0; ii < iocount; ++ii ) { - size += g_base64_encode_step( (const guchar*)vec[ii].iov_base, - vec[ii].iov_len, - FALSE, &out[size], &state, &save ); - } - size += g_base64_encode_close( FALSE, &out[size], &state, &save ); - assert( size < sizeof(out) ); - out[size] = '\0'; + // gsize size = 0; + // gint state = 0; + // gint save = 0; + // gchar out[1024]; + // for ( unsigned int ii = 0; ii < iocount; ++ii ) { + // size += g_base64_encode_step( (const guchar*)vec[ii].iov_base, + // vec[ii].iov_len, + // FALSE, &out[size], &state, &save ); + // } + // size += g_base64_encode_close( FALSE, &out[size], &state, &save ); + // assert( size < sizeof(out) ); + // out[size] = '\0'; #endif } @@ -640,8 +640,10 @@ send_via_udp_impl( int sock, const struct sockaddr* dest_addr, #ifdef LOG_UDP_PACKETS gchar* b64 = g_base64_encode( (uint8_t*)dest_addr, sizeof(*dest_addr) ); + gchar* out = g_base64_encode( packet.data(), packet.size() ); logf( XW_LOGINFO, "%s()=>%d; addr='%s'; msg='%s'", __func__, nSent, b64, out ); + g_free( out ); g_free( b64 ); #else logf( XW_LOGINFO, "%s()=>%d", __func__, nSent ); @@ -761,13 +763,17 @@ send_havemsgs( const AddrInfo* addr ) class MsgClosure { public: MsgClosure( DevIDRelay dest, const vector* packet, - OnMsgAckProc proc, void* procClosure ) + int msgID, OnMsgAckProc proc, void* procClosure ) { + assert(m_msgID != 0); m_destDevID = dest; m_packet = *packet; m_proc = proc; m_procClosure = procClosure; + m_msgID = msgID; } + int getMsgID() { return m_msgID; } + int m_msgID; DevIDRelay m_destDevID; vector m_packet; OnMsgAckProc m_proc; @@ -778,9 +784,14 @@ static void onPostedMsgAcked( bool acked, uint32_t packetID, void* data ) { MsgClosure* mc = (MsgClosure*)data; - if ( !acked ) { - DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(), - mc->m_packet.size() ); + int msgID = mc->getMsgID(); + if ( acked ) { + DBMgr::Get()->RemoveStoredMessages( &msgID, 1 ); + } else { + assert( msgID != 0 ); + // So we only store after ack fails? Change that!!! + // DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(), + // mc->m_packet.size() ); } if ( NULL != mc->m_proc ) { (*mc->m_proc)( acked, mc->m_destDevID, packetID, mc->m_procClosure ); @@ -793,6 +804,8 @@ static bool post_or_store( DevIDRelay destDevID, vector& packet, uint32_t packetID, OnMsgAckProc proc, void* procClosure ) { + int msgID = DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() ); + const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( destDevID ); bool canSendNow = !!addru; @@ -804,16 +817,13 @@ post_or_store( DevIDRelay destDevID, vector& packet, uint32_t packetID, if ( get_addr_info_if( &addr, &sock, &dest_addr ) ) { sent = 0 < send_packet_via_udp_impl( packet, sock, dest_addr ); - if ( sent ) { - MsgClosure* mc = new MsgClosure( destDevID, &packet, + if ( sent && msgID != 0 ) { + MsgClosure* mc = new MsgClosure( destDevID, &packet, msgID, proc, procClosure ); UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc ); } } } - if ( !sent ) { - DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() ); - } return sent; } @@ -988,13 +998,13 @@ processReconnect( const uint8_t* bufp, int bufLen, const AddrInfo* addr ) } /* processReconnect */ static bool -processAck( const uint8_t* bufp, int bufLen, const AddrInfo* addr ) +processAck( const uint8_t* bufp, int bufLen, AddrInfo::ClientToken clientToken ) { bool success = false; const uint8_t* end = bufp + bufLen; HostID srcID; if ( getNetByte( &bufp, end, &srcID ) ) { - SafeCref scr( addr ); + SafeCref scr( clientToken, srcID ); success = scr.HandleAck( srcID ); } return success; @@ -1084,7 +1094,8 @@ forwardMessage( const uint8_t* buf, int buflen, const AddrInfo* addr ) } /* forwardMessage */ static bool -processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr ) +processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr, + AddrInfo::ClientToken clientToken ) { bool success = false; /* default is failure */ XWRELAY_Cmd cmd = *buf; @@ -1099,7 +1110,11 @@ processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr ) success = processReconnect( buf+1, bufLen-1, addr ); break; case XWRELAY_ACK: - success = processAck( buf+1, bufLen-1, addr ); + if ( clientToken != 0 ) { + success = processAck( buf+1, bufLen-1, clientToken ); + } else { + logf( XW_LOGERROR, "%s(): null client token", __func__ ); + } break; case XWRELAY_GAME_DISCONNECT: success = processDisconnect( buf+1, bufLen-1, addr ); @@ -1334,6 +1349,9 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull, logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten ); if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); + // This is wrong: should be removed when ACK returns and not + // before. But for some reason if I make that change apps wind up + // stalling. dbmgr->RemoveStoredMessages( msgIDs ); } } @@ -1438,7 +1456,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const uint8_t* bufp, } unsigned short nMsgs; if ( getNetShort( &bufp, end, &nMsgs ) ) { - SafeCref scr( connName ); + SafeCref scr( connName, hid ); while ( scr.IsValid() && nMsgs-- > 0 ) { unsigned short len; if ( getNetShort( &bufp, end, &len ) ) { @@ -1460,7 +1478,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const uint8_t* bufp, static void game_thread_proc( UdpThreadClosure* utc ) { - if ( !processMessage( utc->buf(), utc->len(), utc->addr() ) ) { + if ( !processMessage( utc->buf(), utc->len(), utc->addr(), 0 ) ) { XWThreadPool::GetTPool()->CloseSocket( utc->addr() ); } } @@ -1528,7 +1546,7 @@ proxy_thread_proc( UdpThreadClosure* utc ) sizeof( connName ), &hid ) ) { break; } - SafeCref scr( connName ); + SafeCref scr( connName, hid ); scr.DeviceGone( hid, seed ); } } @@ -1748,7 +1766,7 @@ handle_udp_packet( UdpThreadClosure* utc ) clientToken = ntohl( clientToken ); if ( AddrInfo::NULL_TOKEN != clientToken ) { AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); - (void)processMessage( ptr, end - ptr, &addr ); + (void)processMessage( ptr, end - ptr, &addr, clientToken ); } else { logf( XW_LOGERROR, "%s: dropping packet with token of 0", __func__ ); @@ -1766,7 +1784,7 @@ handle_udp_packet( UdpThreadClosure* utc ) logf( XW_LOGERROR, "parse failed!!!" ); break; } - SafeCref scr( connName ); + SafeCref scr( connName, hid ); if ( scr.IsValid() ) { AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end ); @@ -1833,7 +1851,7 @@ handle_udp_packet( UdpThreadClosure* utc ) string connName; if ( DBMgr::Get()->FindPlayer( devID.asRelayID(), clientToken, connName, &hid, &seed ) ) { - SafeCref scr( connName.c_str() ); + SafeCref scr( connName.c_str(), hid ); scr.DeviceGone( hid, seed ); } } @@ -1980,7 +1998,7 @@ maint_str_loop( int udpsock, const char* str ) } // maint_str_loop static uint32_t -getIPAddr( void ) +getUDPIPAddr( void ) { uint32_t result = INADDR_ANY; char iface[16] = {0}; @@ -2215,7 +2233,7 @@ main( int argc, char** argv ) struct sockaddr_in saddr; g_udpsock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); saddr.sin_family = PF_INET; - saddr.sin_addr.s_addr = getIPAddr(); + saddr.sin_addr.s_addr = getUDPIPAddr(); saddr.sin_port = htons(udpport); int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) ); if ( 0 == err ) {