bring in changes from relay_via_http branch

It's time to make them live so client development can use a live relay,
and all the old tests pass, so why not.
This commit is contained in:
Eric House 2017-11-12 20:25:29 -08:00
parent 39deeeb089
commit f072c68bf9
9 changed files with 231 additions and 120 deletions

View file

@ -42,7 +42,7 @@ SRC = \
# STATIC ?= -static # STATIC ?= -static
GITINFO = gitversion.txt GITINFO = gitversion.txt
HASH=$(shell git describe) HASH=$(shell git rev-parse --verify HEAD)
OBJ = $(patsubst %.cpp,obj/%.o,$(SRC)) OBJ = $(patsubst %.cpp,obj/%.o,$(SRC))
#LDFLAGS += -pthread -g -lmcheck $(STATIC) #LDFLAGS += -pthread -g -lmcheck $(STATIC)
@ -70,7 +70,7 @@ endif
memdebug all: xwrelay rq memdebug all: xwrelay rq
REQUIRED_DEBS = libpq-dev \ REQUIRED_DEBS = libpq-dev g++ libglib2.0-dev postgresql \
.PHONY: debcheck debs_install .PHONY: debcheck debs_install

View file

@ -875,13 +875,13 @@ putNetShort( uint8_t** bufpp, unsigned short s )
*bufpp += sizeof(s); *bufpp += sizeof(s);
} }
void int
CookieRef::store_message( HostID dest, const uint8_t* buf, CookieRef::store_message( HostID dest, const uint8_t* buf,
unsigned int len ) unsigned int len )
{ {
logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__, logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__,
len, dest ); len, dest );
DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len ); return DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len );
} }
void void
@ -1044,6 +1044,7 @@ CookieRef::postCheckAllHere()
void void
CookieRef::postDropDevice( HostID hostID ) CookieRef::postDropDevice( HostID hostID )
{ {
logf( XW_LOGINFO, "%s(hostID=%d)", __func__, hostID );
CRefEvent evt( XWE_ACKTIMEOUT ); CRefEvent evt( XWE_ACKTIMEOUT );
evt.u.ack.srcID = hostID; evt.u.ack.srcID = hostID;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
@ -1192,21 +1193,16 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
} }
typedef struct _StoreData { typedef struct _StoreData {
string connName; int msgID;
HostID dest;
uint8_t* buf;
int buflen;
} StoreData; } StoreData;
void void
CookieRef::storeNoAck( bool acked, uint32_t packetID, void* data ) CookieRef::storeNoAck( bool acked, uint32_t packetID, void* data )
{ {
StoreData* sdata = (StoreData*)data; StoreData* sdata = (StoreData*)data;
if ( !acked ) { if ( acked ) {
DBMgr::Get()->StoreMessage( sdata->connName.c_str(), sdata->dest, DBMgr::Get()->RemoveStoredMessages( &sdata->msgID, 1 );
sdata->buf, sdata->buflen );
} }
free( sdata->buf );
delete sdata; delete sdata;
} }
@ -1237,17 +1233,13 @@ CookieRef::forward_or_store( const CRefEvent* evt )
} }
uint32_t packetID = 0; uint32_t packetID = 0;
int msgID = store_message( dest, buf, buflen );
if ( (NULL == destAddr) if ( (NULL == destAddr)
|| !send_with_length( destAddr, dest, buf, buflen, true, || !send_with_length( destAddr, dest, buf, buflen, true,
&packetID ) ) { &packetID ) ) {
store_message( dest, buf, buflen ); } else if ( 0 != msgID && 0 != packetID ) { // sent via UDP
} else if ( 0 != packetID ) { // sent via UDP
StoreData* data = new StoreData; StoreData* data = new StoreData;
data->connName = m_connName; data->msgID = msgID;
data->dest = dest;
data->buf = (uint8_t*)malloc( buflen );
memcpy( data->buf, buf, buflen );
data->buflen = buflen;
UDPAckTrack::setOnAck( storeNoAck, packetID, data ); UDPAckTrack::setOnAck( storeNoAck, packetID, data );
} }
@ -1376,20 +1368,16 @@ CookieRef::sendAllHere( bool initial )
through the vector each time. */ through the vector each time. */
HostID dest; HostID dest;
for ( dest = 1; dest <= m_nPlayersSought; ++dest ) { for ( dest = 1; dest <= m_nPlayersSought; ++dest ) {
bool sent = false;
*idLoc = dest; /* write in this target's hostId */ *idLoc = dest; /* write in this target's hostId */
{ {
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
HostRec* hr = m_sockets[dest-1]; HostRec* hr = m_sockets[dest-1];
if ( !!hr ) { if ( !!hr ) {
sent = send_with_length( &hr->m_addr, dest, buf, (void)send_with_length( &hr->m_addr, dest, buf, bufp-buf, true );
bufp-buf, true );
} }
} }
if ( !sent ) { (void)store_message( dest, buf, bufp-buf );
store_message( dest, buf, bufp-buf );
}
} }
} /* sendAllHere */ } /* sendAllHere */

View file

@ -275,8 +275,7 @@ class CookieRef {
bool notInUse(void) { return m_cid == 0; } bool notInUse(void) { return m_cid == 0; }
void store_message( HostID dest, const uint8_t* buf, int store_message( HostID dest, const uint8_t* buf, unsigned int len );
unsigned int len );
void send_stored_messages( HostID dest, const AddrInfo* addr ); void send_stored_messages( HostID dest, const AddrInfo* addr );
void printSeeds( const char* caller ); void printSeeds( const char* caller );

View file

@ -337,7 +337,7 @@ CRefMgr::getMakeCookieRef( const char* connName, const char* cookie,
} /* getMakeCookieRef */ } /* getMakeCookieRef */
CidInfo* CidInfo*
CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead ) CRefMgr::getMakeCookieRef( const char* const connName, HostID hid, bool* isDead )
{ {
CookieRef* cref = NULL; CookieRef* cref = NULL;
CidInfo* cinfo = NULL; CidInfo* cinfo = NULL;
@ -347,7 +347,7 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead )
int nAlreadyHere = 0; int nAlreadyHere = 0;
for ( ; ; ) { /* for: see comment above */ for ( ; ; ) { /* for: see comment above */
CookieID cid = m_db->FindGame( connName, curCookie, sizeof(curCookie), CookieID cid = m_db->FindGame( connName, hid, curCookie, sizeof(curCookie),
&curLangCode, &nPlayersT, &nAlreadyHere, &curLangCode, &nPlayersT, &nAlreadyHere,
isDead ); isDead );
if ( 0 != cid ) { /* already open */ if ( 0 != cid ) { /* already open */
@ -375,6 +375,48 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead )
return cinfo; return cinfo;
} }
CidInfo*
CRefMgr::getMakeCookieRef( const AddrInfo::ClientToken clientToken, HostID srcID )
{
CookieRef* cref = NULL;
CidInfo* cinfo = NULL;
char curCookie[MAX_INVITE_LEN+1];
int curLangCode;
int nPlayersT = 0;
int nAlreadyHere = 0;
for ( ; ; ) { /* for: see comment above */
char connName[MAX_CONNNAME_LEN+1] = {0};
CookieID cid = m_db->FindGame( clientToken, srcID,
connName, sizeof(connName),
curCookie, sizeof(curCookie),
&curLangCode, &nPlayersT, &nAlreadyHere );
// &seed );
if ( 0 != cid ) { /* already open */
cinfo = m_cidlock->Claim( cid );
if ( NULL == cinfo->GetRef() ) {
m_cidlock->Relinquish( cinfo, true );
continue;
}
} else if ( nPlayersT == 0 ) { /* wasn't in the DB */
/* do nothing; insufficient info to fake it */
} else {
cinfo = m_cidlock->Claim();
if ( !m_db->AddCID( connName, cinfo->GetCid() ) ) {
m_cidlock->Relinquish( cinfo, true );
continue;
}
logf( XW_LOGINFO, "%s(): added cid???", __func__ );
cref = AddNew( curCookie, connName, cinfo->GetCid(), curLangCode,
nPlayersT, nAlreadyHere );
cinfo->SetRef( cref );
}
break;
}
logf( XW_LOGINFO, "%s() => %p", __func__, cinfo );
return cinfo;
}
void void
CRefMgr::RemoveSocketRefs( const AddrInfo* addr ) CRefMgr::RemoveSocketRefs( const AddrInfo* addr )
{ {
@ -672,13 +714,13 @@ SafeCref::SafeCref( const char* connName, const char* cookie, HostID hid,
} }
/* ConnName case -- must exist (unless DB record's been removed */ /* ConnName case -- must exist (unless DB record's been removed */
SafeCref::SafeCref( const char* const connName ) SafeCref::SafeCref( const char* const connName, HostID hid )
: m_cinfo( NULL ) : m_cinfo( NULL )
, m_mgr( CRefMgr::Get() ) , m_mgr( CRefMgr::Get() )
, m_isValid( false ) , m_isValid( false )
{ {
bool isDead = false; bool isDead = false;
CidInfo* cinfo = m_mgr->getMakeCookieRef( connName, &isDead ); CidInfo* cinfo = m_mgr->getMakeCookieRef( connName, hid, &isDead );
if ( NULL != cinfo && NULL != cinfo->GetRef() ) { if ( NULL != cinfo && NULL != cinfo->GetRef() ) {
assert( cinfo->GetCid() == cinfo->GetRef()->GetCid() ); assert( cinfo->GetCid() == cinfo->GetRef()->GetCid() );
m_locked = cinfo->GetRef()->Lock(); m_locked = cinfo->GetRef()->Lock();
@ -722,6 +764,19 @@ SafeCref::SafeCref( const AddrInfo* addr )
} }
} }
SafeCref::SafeCref( const AddrInfo::ClientToken clientToken, HostID srcID )
: m_cinfo( NULL )
, m_mgr( CRefMgr::Get() )
, m_isValid( false )
{
CidInfo* cinfo = m_mgr->getMakeCookieRef( clientToken, srcID );
if ( NULL != cinfo && NULL != cinfo->GetRef() ) {
m_locked = cinfo->GetRef()->Lock();
m_cinfo = cinfo;
m_isValid = true;
}
}
SafeCref::~SafeCref() SafeCref::~SafeCref()
{ {
if ( m_cinfo != NULL ) { if ( m_cinfo != NULL ) {

View file

@ -128,7 +128,8 @@ class CRefMgr {
int nPlayersS, int seed, int langCode, int nPlayersS, int seed, int langCode,
bool isPublic, bool* isDead ); bool isPublic, bool* isDead );
CidInfo* getMakeCookieRef( const char* const connName, bool* isDead ); CidInfo* getMakeCookieRef( const char* const connName, HostID hid, bool* isDead );
CidInfo* getMakeCookieRef( const AddrInfo::ClientToken clientToken, HostID srcID );
CidInfo* getCookieRef( CookieID cid, bool failOk = false ); CidInfo* getCookieRef( CookieID cid, bool failOk = false );
CidInfo* getCookieRef( const AddrInfo* addr ); CidInfo* getCookieRef( const AddrInfo* addr );
@ -179,9 +180,10 @@ class SafeCref {
const AddrInfo* addr, int clientVersion, DevID* devID, const AddrInfo* addr, int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS, unsigned short gameSeed, int nPlayersH, int nPlayersS, unsigned short gameSeed,
int clientIndx, int langCode, bool wantsPublic, bool makePublic ); int clientIndx, int langCode, bool wantsPublic, bool makePublic );
SafeCref( const char* const connName ); SafeCref( const char* const connName, HostID hid );
SafeCref( CookieID cid, bool failOk = false ); SafeCref( CookieID cid, bool failOk = false );
SafeCref( const AddrInfo* addr ); SafeCref( const AddrInfo* addr );
SafeCref( const AddrInfo::ClientToken clientToken, HostID srcID );
/* SafeCref( CookieRef* cref ); */ /* SafeCref( CookieRef* cref ); */
~SafeCref(); ~SafeCref();

View file

@ -70,20 +70,6 @@ DBMgr::DBMgr()
pthread_mutex_init( &m_haveNoMessagesMutex, NULL ); pthread_mutex_init( &m_haveNoMessagesMutex, NULL );
/* Now figure out what the largest cid currently is. There must be a way
to get postgres to do this for me.... */
/* const char* query = "SELECT cid FROM games ORDER BY cid DESC LIMIT 1"; */
/* PGresult* result = PQexec( m_pgconn, query ); */
/* if ( 0 == PQntuples( result ) ) { */
/* m_nextCID = 1; */
/* } else { */
/* char* value = PQgetvalue( result, 0, 0 ); */
/* m_nextCID = 1 + atoi( value ); */
/* } */
/* PQclear(result); */
/* logf( XW_LOGINFO, "%s: m_nextCID=%d", __func__, m_nextCID ); */
// I've seen rand returning the same series several times....
srand( time( NULL ) ); srand( time( NULL ) );
} }
@ -136,7 +122,7 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen,
{ {
bool found = false; bool found = false;
const char* fmt = "SELECT cid, room, lang, nPerDevice, dead FROM " const char* fmt = "SELECT cid, room, lang, dead FROM "
GAMES_TABLE " WHERE connName = '%s' AND nTotal = %d " GAMES_TABLE " WHERE connName = '%s' AND nTotal = %d "
"AND %d = seeds[%d] AND 'A' = ack[%d] " "AND %d = seeds[%d] AND 'A' = ack[%d] "
; ;
@ -148,10 +134,11 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen,
assert( 1 >= PQntuples( result ) ); assert( 1 >= PQntuples( result ) );
found = 1 == PQntuples( result ); found = 1 == PQntuples( result );
if ( found ) { if ( found ) {
*cidp = atoi( PQgetvalue( result, 0, 0 ) ); int col = 0;
snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); *cidp = atoi( PQgetvalue( result, 0, col++ ) );
*langP = atoi( PQgetvalue( result, 0, 2 ) ); snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) );
*isDead = 't' == PQgetvalue( result, 0, 4 )[0]; *langP = atoi( PQgetvalue( result, 0, col++ ) );
*isDead = 't' == PQgetvalue( result, 0, col++ )[0];
} }
PQclear( result ); PQclear( result );
@ -160,28 +147,29 @@ DBMgr::FindGameFor( const char* connName, char* cookieBuf, int bufLen,
} /* FindGameFor */ } /* FindGameFor */
CookieID CookieID
DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen, DBMgr::FindGame( const char* connName, HostID hid, char* roomBuf, int roomBufLen,
int* langP, int* nPlayersTP, int* nPlayersHP, bool* isDead ) int* langP, int* nPlayersTP, int* nPlayersHP, bool* isDead )
{ {
CookieID cid = 0; CookieID cid = 0;
const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice, dead FROM " const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice[%d], dead FROM "
GAMES_TABLE " WHERE connName = '%s'" GAMES_TABLE " WHERE connName = '%s'"
// " LIMIT 1" // " LIMIT 1"
; ;
StrWPF query; StrWPF query;
query.catf( fmt, connName ); query.catf( fmt, hid, connName );
logf( XW_LOGINFO, "query: %s", query.c_str() ); logf( XW_LOGINFO, "query: %s", query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() ); PGresult* result = PQexec( getThreadConn(), query.c_str() );
assert( 1 >= PQntuples( result ) ); assert( 1 >= PQntuples( result ) );
if ( 1 == PQntuples( result ) ) { if ( 1 == PQntuples( result ) ) {
cid = atoi( PQgetvalue( result, 0, 0 ) ); int col = 0;
snprintf( cookieBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); cid = atoi( PQgetvalue( result, 0, col++ ) );
*langP = atoi( PQgetvalue( result, 0, 2 ) ); snprintf( roomBuf, roomBufLen, "%s", PQgetvalue( result, 0, col++ ) );
*nPlayersTP = atoi( PQgetvalue( result, 0, 3 ) ); *langP = atoi( PQgetvalue( result, 0, col++ ) );
*nPlayersHP = atoi( PQgetvalue( result, 0, 4 ) ); *nPlayersTP = atoi( PQgetvalue( result, 0, col++ ) );
*isDead = 't' == PQgetvalue( result, 0, 5 )[0]; *nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) );
*isDead = 't' == PQgetvalue( result, 0, col++ )[0];
} }
PQclear( result ); PQclear( result );
@ -189,6 +177,40 @@ DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen,
return cid; return cid;
} /* FindGame */ } /* FindGame */
CookieID
DBMgr::FindGame( const AddrInfo::ClientToken clientToken, HostID hid,
char* connNameBuf, int connNameBufLen,
char* roomBuf, int roomBufLen,
int* langP, int* nPlayersTP, int* nPlayersHP )
{
CookieID cid = 0;
const char* fmt = "SELECT cid, room, lang, nTotal, nPerDevice[%d], connname FROM "
GAMES_TABLE " WHERE tokens[%d] = %d and NOT dead";
// " LIMIT 1"
;
StrWPF query;
query.catf( fmt, hid, hid, clientToken );
logf( XW_LOGINFO, "query: %s", query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
int col = 0;
cid = atoi( PQgetvalue( result, 0, col++ ) );
// room
snprintf( roomBuf, roomBufLen, "%s", PQgetvalue( result, 0, col++ ) );
// lang
*langP = atoi( PQgetvalue( result, 0, col++ ) );
*nPlayersTP = atoi( PQgetvalue( result, 0, col++ ) );
*nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) );
snprintf( connNameBuf, connNameBufLen, "%s", PQgetvalue( result, 0, col++ ) );
}
PQclear( result );
logf( XW_LOGINFO, "%s(ct=%d,hid=%d) => %d (connname=%s)", __func__, clientToken,
hid, cid, connNameBuf );
return cid;
}
bool bool
DBMgr::FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken token, DBMgr::FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken token,
string& connName, HostID* hidp, unsigned short* seed ) string& connName, HostID* hidp, unsigned short* seed )
@ -294,11 +316,13 @@ DBMgr::SeenSeed( const char* cookie, unsigned short seed,
NULL, NULL, 0 ); NULL, NULL, 0 );
bool found = 1 == PQntuples( result ); bool found = 1 == PQntuples( result );
if ( found ) { if ( found ) {
*cid = atoi( PQgetvalue( result, 0, 0 ) ); int col = 0;
*nPlayersHP = here_less_seed( PQgetvalue( result, 0, 2 ), *cid = atoi( PQgetvalue( result, 0, col++ ) );
atoi( PQgetvalue( result, 0, 3 ) ), snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) );
seed );
snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); const char* seeds = PQgetvalue( result, 0, col++ );
int perDeviceSum = atoi( PQgetvalue( result, 0, col++ ) );
*nPlayersHP = here_less_seed( seeds, perDeviceSum, seed );
} }
PQclear( result ); PQclear( result );
logf( XW_LOGINFO, "%s(%4X)=>%s", __func__, seed, found?"true":"false" ); logf( XW_LOGINFO, "%s(%4X)=>%s", __func__, seed, found?"true":"false" );
@ -333,9 +357,10 @@ DBMgr::FindOpen( const char* cookie, int lang, int nPlayersT, int nPlayersH,
NULL, NULL, 0 ); NULL, NULL, 0 );
CookieID cid = 0; CookieID cid = 0;
if ( 1 == PQntuples( result ) ) { if ( 1 == PQntuples( result ) ) {
cid = atoi( PQgetvalue( result, 0, 0 ) ); int col = 0;
snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, 1 ) ); cid = atoi( PQgetvalue( result, 0, col++ ) );
*nPlayersHP = atoi( PQgetvalue( result, 0, 2 ) ); snprintf( connNameBuf, bufLen, "%s", PQgetvalue( result, 0, col++ ) );
*nPlayersHP = atoi( PQgetvalue( result, 0, col++ ) );
/* cid may be 0, but should use game anyway */ /* cid may be 0, but should use game anyway */
} }
PQclear( result ); PQclear( result );
@ -699,9 +724,11 @@ DBMgr::RecordSent( const int* msgIDs, int nMsgIDs )
if ( PGRES_TUPLES_OK == PQresultStatus( result ) ) { if ( PGRES_TUPLES_OK == PQresultStatus( result ) ) {
int ntuples = PQntuples( result ); int ntuples = PQntuples( result );
for ( int ii = 0; ii < ntuples; ++ii ) { for ( int ii = 0; ii < ntuples; ++ii ) {
RecordSent( PQgetvalue( result, ii, 0 ), int col = 0;
atoi( PQgetvalue( result, ii, 1 ) ), const char* const connName = PQgetvalue( result, ii, col++ );
atoi( PQgetvalue( result, ii, 2 ) ) ); HostID hid = atoi( PQgetvalue( result, ii, col++ ) );
int nBytes = atoi( PQgetvalue( result, ii, col++ ) );
RecordSent( connName, hid, nBytes );
} }
} }
PQclear( result ); PQclear( result );
@ -1014,15 +1041,16 @@ DBMgr::CountStoredMessages( DevIDRelay relayID )
return getCountWhere( MSGS_TABLE, test ); return getCountWhere( MSGS_TABLE, test );
} }
void int
DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf, DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf,
int len ) int len )
{ {
int msgID = 0;
clearHasNoMessages( destDevID ); clearHasNoMessages( destDevID );
size_t newLen; size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE " " const char* fmt = "INSERT INTO " MSGS_TABLE " "
"(devid, %s, msglen) VALUES(%d, %s'%s', %d)"; "(devid, %s, msglen) VALUES(%d, %s'%s', %d) RETURNING id";
StrWPF query; StrWPF query;
if ( m_useB64 ) { if ( m_useB64 ) {
@ -1038,13 +1066,20 @@ DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf,
} }
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
msgID = atoi( PQgetvalue( result, 0, 0 ) );
}
PQclear( result );
return msgID;
} }
void int
DBMgr::StoreMessage( const char* const connName, int destHid, DBMgr::StoreMessage( const char* const connName, int destHid,
const uint8_t* buf, int len ) const uint8_t* buf, int len )
{ {
int msgID = 0;
clearHasNoMessages( connName, destHid ); clearHasNoMessages( connName, destHid );
DevIDRelay devID = getDevID( connName, destHid ); DevIDRelay devID = getDevID( connName, destHid );
@ -1074,7 +1109,7 @@ DBMgr::StoreMessage( const char* const connName, int destHid,
#ifdef HAVE_STIME #ifdef HAVE_STIME
" AND stime='epoch'" " AND stime='epoch'"
#endif #endif
" );", connName, destHid, b64 ); " )", connName, destHid, b64 );
g_free( b64 ); g_free( b64 );
} else { } else {
uint8_t* bytes = PQescapeByteaConn( getThreadConn(), buf, uint8_t* bytes = PQescapeByteaConn( getThreadConn(), buf,
@ -1085,9 +1120,17 @@ DBMgr::StoreMessage( const char* const connName, int destHid,
"E", bytes, len ); "E", bytes, len );
PQfreemem( bytes ); PQfreemem( bytes );
} }
query.catf(" RETURNING id;");
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query ); PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
msgID = atoi( PQgetvalue( result, 0, 0 ) );
} else {
logf( XW_LOGINFO, "Not stored; duplicate?" );
}
PQclear( result );
return msgID;
} }
void void

View file

@ -75,9 +75,13 @@ class DBMgr {
bool FindRelayIDFor( const char* connName, HostID hid, unsigned short seed, bool FindRelayIDFor( const char* connName, HostID hid, unsigned short seed,
const DevID* host, DevIDRelay* devID ); const DevID* host, DevIDRelay* devID );
CookieID FindGame( const char* connName, char* cookieBuf, int bufLen, CookieID FindGame( const char* connName, HostID hid, char* cookieBuf, int bufLen,
int* langP, int* nPlayersTP, int* nPlayersHP, int* langP, int* nPlayersTP, int* nPlayersHP,
bool* isDead ); bool* isDead );
CookieID FindGame( const AddrInfo::ClientToken clientToken, HostID hid,
char* connNameBuf, int connNameBufLen,
char* cookieBuf, int cookieBufLen,
int* langP, int* nPlayersTP, int* nPlayersHP );
bool FindGameFor( const char* connName, char* cookieBuf, int bufLen, bool FindGameFor( const char* connName, char* cookieBuf, int bufLen,
unsigned short seed, HostID hid, unsigned short seed, HostID hid,
@ -137,10 +141,10 @@ class DBMgr {
/* message storage -- different DB */ /* message storage -- different DB */
int CountStoredMessages( const char* const connName ); int CountStoredMessages( const char* const connName );
int CountStoredMessages( DevIDRelay relayID ); int CountStoredMessages( DevIDRelay relayID );
void StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf, int StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf,
int len ); int len );
void StoreMessage( const char* const connName, int destHid, int StoreMessage( const char* const connName, int destHid,
const uint8_t* const buf, int len ); const uint8_t* const buf, int len );
void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs ); void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs );
void GetStoredMessages( const char* const connName, HostID hid, void GetStoredMessages( const char* const connName, HostID hid,
vector<DBMgr::MsgInfo>& msgs ); vector<DBMgr::MsgInfo>& msgs );
@ -171,6 +175,7 @@ class DBMgr {
int clientVersion, const char* const model, int clientVersion, const char* const model,
const char* const osVers, DevIDRelay relayID ); const char* const osVers, DevIDRelay relayID );
PGconn* getThreadConn( void ); PGconn* getThreadConn( void );
void clearThreadConn(); void clearThreadConn();

View file

@ -54,13 +54,14 @@ echo "; relay pid[s]: $(pidof xwrelay)"
echo "Row count:" $(psql -t xwgames -c "select count(*) FROM games $QUERY;") echo "Row count:" $(psql -t xwgames -c "select count(*) FROM games $QUERY;")
# Games # Games
echo "SELECT dead as d,connname,cid,room,lang as lg,clntVers as cv ,ntotal as t,nperdevice as nPerDev,nsents as snts, seeds,devids,tokens,ack, mtimes "\ echo "SELECT dead as d,connname,cid,room,lang as lg,clntVers as cv ,ntotal as t,nperdevice as npd,nsents as snts, seeds,devids,tokens,ack, mtimes "\
"FROM games $QUERY ORDER BY NOT dead, ctime DESC LIMIT $LIMIT;" \ "FROM games $QUERY ORDER BY NOT dead, ctime DESC LIMIT $LIMIT;" \
| psql xwgames | psql xwgames
# Messages # Messages
echo "SELECT * "\ echo "Unack'd msgs count:" $(psql -t xwgames -c "select count(*) FROM msgs where stime = 'epoch' AND connname IN (SELECT connname from games $QUERY);")
"FROM msgs WHERE connname IN (SELECT connname from games $QUERY) "\ echo "SELECT id,connName,hid as h,token,ctime,stime,devid,msg64 "\
"FROM msgs WHERE stime = 'epoch' AND connname IN (SELECT connname from games $QUERY) "\
"ORDER BY ctime DESC, connname LIMIT $LIMIT;" \ "ORDER BY ctime DESC, connname LIMIT $LIMIT;" \
| psql xwgames | psql xwgames

View file

@ -550,18 +550,18 @@ assemble_packet( vector<uint8_t>& packet, uint32_t* packetIDP, XWRelayReg cmd,
} }
#ifdef LOG_UDP_PACKETS #ifdef LOG_UDP_PACKETS
gsize size = 0; // gsize size = 0;
gint state = 0; // gint state = 0;
gint save = 0; // gint save = 0;
gchar out[1024]; // gchar out[1024];
for ( unsigned int ii = 0; ii < iocount; ++ii ) { // for ( unsigned int ii = 0; ii < iocount; ++ii ) {
size += g_base64_encode_step( (const guchar*)vec[ii].iov_base, // size += g_base64_encode_step( (const guchar*)vec[ii].iov_base,
vec[ii].iov_len, // vec[ii].iov_len,
FALSE, &out[size], &state, &save ); // FALSE, &out[size], &state, &save );
} // }
size += g_base64_encode_close( FALSE, &out[size], &state, &save ); // size += g_base64_encode_close( FALSE, &out[size], &state, &save );
assert( size < sizeof(out) ); // assert( size < sizeof(out) );
out[size] = '\0'; // out[size] = '\0';
#endif #endif
} }
@ -640,8 +640,10 @@ send_via_udp_impl( int sock, const struct sockaddr* dest_addr,
#ifdef LOG_UDP_PACKETS #ifdef LOG_UDP_PACKETS
gchar* b64 = g_base64_encode( (uint8_t*)dest_addr, gchar* b64 = g_base64_encode( (uint8_t*)dest_addr,
sizeof(*dest_addr) ); sizeof(*dest_addr) );
gchar* out = g_base64_encode( packet.data(), packet.size() );
logf( XW_LOGINFO, "%s()=>%d; addr='%s'; msg='%s'", __func__, nSent, logf( XW_LOGINFO, "%s()=>%d; addr='%s'; msg='%s'", __func__, nSent,
b64, out ); b64, out );
g_free( out );
g_free( b64 ); g_free( b64 );
#else #else
logf( XW_LOGINFO, "%s()=>%d", __func__, nSent ); logf( XW_LOGINFO, "%s()=>%d", __func__, nSent );
@ -761,13 +763,17 @@ send_havemsgs( const AddrInfo* addr )
class MsgClosure { class MsgClosure {
public: public:
MsgClosure( DevIDRelay dest, const vector<uint8_t>* packet, MsgClosure( DevIDRelay dest, const vector<uint8_t>* packet,
OnMsgAckProc proc, void* procClosure ) int msgID, OnMsgAckProc proc, void* procClosure )
{ {
assert(m_msgID != 0);
m_destDevID = dest; m_destDevID = dest;
m_packet = *packet; m_packet = *packet;
m_proc = proc; m_proc = proc;
m_procClosure = procClosure; m_procClosure = procClosure;
m_msgID = msgID;
} }
int getMsgID() { return m_msgID; }
int m_msgID;
DevIDRelay m_destDevID; DevIDRelay m_destDevID;
vector<uint8_t> m_packet; vector<uint8_t> m_packet;
OnMsgAckProc m_proc; OnMsgAckProc m_proc;
@ -778,9 +784,14 @@ static void
onPostedMsgAcked( bool acked, uint32_t packetID, void* data ) onPostedMsgAcked( bool acked, uint32_t packetID, void* data )
{ {
MsgClosure* mc = (MsgClosure*)data; MsgClosure* mc = (MsgClosure*)data;
if ( !acked ) { int msgID = mc->getMsgID();
DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(), if ( acked ) {
mc->m_packet.size() ); DBMgr::Get()->RemoveStoredMessages( &msgID, 1 );
} else {
assert( msgID != 0 );
// So we only store after ack fails? Change that!!!
// DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(),
// mc->m_packet.size() );
} }
if ( NULL != mc->m_proc ) { if ( NULL != mc->m_proc ) {
(*mc->m_proc)( acked, mc->m_destDevID, packetID, mc->m_procClosure ); (*mc->m_proc)( acked, mc->m_destDevID, packetID, mc->m_procClosure );
@ -793,6 +804,8 @@ static bool
post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID, post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID,
OnMsgAckProc proc, void* procClosure ) OnMsgAckProc proc, void* procClosure )
{ {
int msgID = DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() );
const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( destDevID ); const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( destDevID );
bool canSendNow = !!addru; bool canSendNow = !!addru;
@ -804,16 +817,13 @@ post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID,
if ( get_addr_info_if( &addr, &sock, &dest_addr ) ) { if ( get_addr_info_if( &addr, &sock, &dest_addr ) ) {
sent = 0 < send_packet_via_udp_impl( packet, sock, dest_addr ); sent = 0 < send_packet_via_udp_impl( packet, sock, dest_addr );
if ( sent ) { if ( sent && msgID != 0 ) {
MsgClosure* mc = new MsgClosure( destDevID, &packet, MsgClosure* mc = new MsgClosure( destDevID, &packet, msgID,
proc, procClosure ); proc, procClosure );
UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc ); UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc );
} }
} }
} }
if ( !sent ) {
DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() );
}
return sent; return sent;
} }
@ -988,13 +998,13 @@ processReconnect( const uint8_t* bufp, int bufLen, const AddrInfo* addr )
} /* processReconnect */ } /* processReconnect */
static bool static bool
processAck( const uint8_t* bufp, int bufLen, const AddrInfo* addr ) processAck( const uint8_t* bufp, int bufLen, AddrInfo::ClientToken clientToken )
{ {
bool success = false; bool success = false;
const uint8_t* end = bufp + bufLen; const uint8_t* end = bufp + bufLen;
HostID srcID; HostID srcID;
if ( getNetByte( &bufp, end, &srcID ) ) { if ( getNetByte( &bufp, end, &srcID ) ) {
SafeCref scr( addr ); SafeCref scr( clientToken, srcID );
success = scr.HandleAck( srcID ); success = scr.HandleAck( srcID );
} }
return success; return success;
@ -1084,7 +1094,8 @@ forwardMessage( const uint8_t* buf, int buflen, const AddrInfo* addr )
} /* forwardMessage */ } /* forwardMessage */
static bool static bool
processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr ) processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr,
AddrInfo::ClientToken clientToken )
{ {
bool success = false; /* default is failure */ bool success = false; /* default is failure */
XWRELAY_Cmd cmd = *buf; XWRELAY_Cmd cmd = *buf;
@ -1099,7 +1110,11 @@ processMessage( const uint8_t* buf, int bufLen, const AddrInfo* addr )
success = processReconnect( buf+1, bufLen-1, addr ); success = processReconnect( buf+1, bufLen-1, addr );
break; break;
case XWRELAY_ACK: case XWRELAY_ACK:
success = processAck( buf+1, bufLen-1, addr ); if ( clientToken != 0 ) {
success = processAck( buf+1, bufLen-1, clientToken );
} else {
logf( XW_LOGERROR, "%s(): null client token", __func__ );
}
break; break;
case XWRELAY_GAME_DISCONNECT: case XWRELAY_GAME_DISCONNECT:
success = processDisconnect( buf+1, bufLen-1, addr ); success = processDisconnect( buf+1, bufLen-1, addr );
@ -1334,6 +1349,9 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull,
logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten ); logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten );
if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) {
dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); dbmgr->RecordSent( &msgIDs[0], msgIDs.size() );
// This is wrong: should be removed when ACK returns and not
// before. But for some reason if I make that change apps wind up
// stalling.
dbmgr->RemoveStoredMessages( msgIDs ); dbmgr->RemoveStoredMessages( msgIDs );
} }
} }
@ -1438,7 +1456,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const uint8_t* bufp,
} }
unsigned short nMsgs; unsigned short nMsgs;
if ( getNetShort( &bufp, end, &nMsgs ) ) { if ( getNetShort( &bufp, end, &nMsgs ) ) {
SafeCref scr( connName ); SafeCref scr( connName, hid );
while ( scr.IsValid() && nMsgs-- > 0 ) { while ( scr.IsValid() && nMsgs-- > 0 ) {
unsigned short len; unsigned short len;
if ( getNetShort( &bufp, end, &len ) ) { if ( getNetShort( &bufp, end, &len ) ) {
@ -1460,7 +1478,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const uint8_t* bufp,
static void static void
game_thread_proc( UdpThreadClosure* utc ) game_thread_proc( UdpThreadClosure* utc )
{ {
if ( !processMessage( utc->buf(), utc->len(), utc->addr() ) ) { if ( !processMessage( utc->buf(), utc->len(), utc->addr(), 0 ) ) {
XWThreadPool::GetTPool()->CloseSocket( utc->addr() ); XWThreadPool::GetTPool()->CloseSocket( utc->addr() );
} }
} }
@ -1528,7 +1546,7 @@ proxy_thread_proc( UdpThreadClosure* utc )
sizeof( connName ), &hid ) ) { sizeof( connName ), &hid ) ) {
break; break;
} }
SafeCref scr( connName ); SafeCref scr( connName, hid );
scr.DeviceGone( hid, seed ); scr.DeviceGone( hid, seed );
} }
} }
@ -1748,7 +1766,7 @@ handle_udp_packet( UdpThreadClosure* utc )
clientToken = ntohl( clientToken ); clientToken = ntohl( clientToken );
if ( AddrInfo::NULL_TOKEN != clientToken ) { if ( AddrInfo::NULL_TOKEN != clientToken ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
(void)processMessage( ptr, end - ptr, &addr ); (void)processMessage( ptr, end - ptr, &addr, clientToken );
} else { } else {
logf( XW_LOGERROR, "%s: dropping packet with token of 0", logf( XW_LOGERROR, "%s: dropping packet with token of 0",
__func__ ); __func__ );
@ -1766,7 +1784,7 @@ handle_udp_packet( UdpThreadClosure* utc )
logf( XW_LOGERROR, "parse failed!!!" ); logf( XW_LOGERROR, "parse failed!!!" );
break; break;
} }
SafeCref scr( connName ); SafeCref scr( connName, hid );
if ( scr.IsValid() ) { if ( scr.IsValid() ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end ); handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end );
@ -1833,7 +1851,7 @@ handle_udp_packet( UdpThreadClosure* utc )
string connName; string connName;
if ( DBMgr::Get()->FindPlayer( devID.asRelayID(), clientToken, if ( DBMgr::Get()->FindPlayer( devID.asRelayID(), clientToken,
connName, &hid, &seed ) ) { connName, &hid, &seed ) ) {
SafeCref scr( connName.c_str() ); SafeCref scr( connName.c_str(), hid );
scr.DeviceGone( hid, seed ); scr.DeviceGone( hid, seed );
} }
} }
@ -1980,7 +1998,7 @@ maint_str_loop( int udpsock, const char* str )
} // maint_str_loop } // maint_str_loop
static uint32_t static uint32_t
getIPAddr( void ) getUDPIPAddr( void )
{ {
uint32_t result = INADDR_ANY; uint32_t result = INADDR_ANY;
char iface[16] = {0}; char iface[16] = {0};
@ -2215,7 +2233,7 @@ main( int argc, char** argv )
struct sockaddr_in saddr; struct sockaddr_in saddr;
g_udpsock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); g_udpsock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
saddr.sin_family = PF_INET; saddr.sin_family = PF_INET;
saddr.sin_addr.s_addr = getIPAddr(); saddr.sin_addr.s_addr = getUDPIPAddr();
saddr.sin_port = htons(udpport); saddr.sin_port = htons(udpport);
int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) ); int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) );
if ( 0 == err ) { if ( 0 == err ) {