update with all relay/ changes from gtk_multigame. This is what the

live relay's running anyway.
This commit is contained in:
Eric House 2013-02-04 06:08:39 -08:00
parent a18fb62b0f
commit 5e0fd89c9e
24 changed files with 1157 additions and 259 deletions

View file

@ -31,6 +31,9 @@ SRC = \
tpool.cpp \
cidlock.cpp \
addrinfo.cpp \
devmgr.cpp \
udpqueue.cpp \
udpack.cpp \
xwrelay.cpp \
# STATIC ?= -static
@ -39,12 +42,14 @@ HASH=$(shell git describe)
OBJ = $(patsubst %.cpp,%.o,$(SRC))
#LDFLAGS += -pthread -g -lmcheck $(STATIC)
LDFLAGS += -pthread -g $(STATIC) \
-L$(shell pg_config --libdir)
LDFLAGS += -pthread -g $(STATIC)
LDFLAGS += -L$(shell pg_config --libdir)
LDFLAGS += $(shell pkg-config --libs glib-2.0)
CPPFLAGS += -DSPAWN_SELF -g -Wall \
-I $(shell pg_config --includedir) \
-DSVN_REV=\"$(shell cat $(GITINFO) 2>/dev/null || echo -n $(HASH) )\"
CPPFLAGS += -DSPAWN_SELF -g -Wall
CPPFLAGS += -I $(shell pg_config --includedir)
CPPFLAGS += -DSVN_REV=\"$(shell cat $(GITINFO) 2>/dev/null || echo -n $(HASH) )\"
CPPFLAGS += $(shell pkg-config --cflags glib-2.0)
# CPPFLAGS += -DDO_HTTP
# CPPFLAGS += -DHAVE_STIME
@ -56,7 +61,7 @@ memdebug all: xwrelay rq
# Manual config in order to place -lpq after the .obj files as
# required by something Ubuntu did upgrading natty to oneiric
xwrelay: $(OBJ)
$(CXX) $(CPPFLAGS) $(LDFLAGS) -o $@ $^ -lpq
$(CXX) $(CPPFLAGS) -o $@ $^ -lpq $(LDFLAGS)
rq: rq.c

View file

@ -32,7 +32,10 @@ AddrInfo::equals( const AddrInfo& other ) const
if ( isTCP() ) {
equal = m_socket == other.m_socket;
} else {
assert(0); /* later.... */
// assert( m_socket == other.m_socket ); /* both same UDP socket */
/* what does equal mean on udp addresses? Same host, or same host AND game */
equal = m_clientToken == other.m_clientToken
&& 0 == memcmp( &m_saddr, &other.m_saddr, sizeof(m_saddr) );
}
}
return equal;

View file

@ -24,39 +24,61 @@
#include <netinet/in.h>
#include <string.h>
#include <assert.h>
class AddrInfo {
public:
typedef uint32_t ClientToken;
typedef union _AddrUnion {
struct sockaddr addr;
struct sockaddr_in addr_in;
} AddrUnion;
/* Those constructed without params are only valid after another copied on
top of it */
AddrInfo() {
memset( this, 0, sizeof(*this) );
m_socket = -1;
m_isValid = false;
}
AddrInfo( bool isTCP, int socket, const AddrUnion* saddr ) {
m_isValid = true;
m_isTCP = isTCP;
m_socket = socket;
memcpy( &m_saddr, saddr, sizeof(m_saddr) );
AddrInfo( int socket, const AddrUnion* saddr, bool isTCP ) {
construct( socket, saddr, isTCP );
}
AddrInfo( int socket, ClientToken clientToken, const AddrUnion* saddr ) {
init( socket, clientToken, saddr );
}
void init( int socket, ClientToken clientToken, const AddrUnion* saddr ) {
construct( socket, saddr, false );
m_clientToken = clientToken;
}
void setIsTCP( bool val ) { m_isTCP = val; }
bool isTCP() const { return m_isTCP; } /* later UDP will be here too */
int socket() const { assert(m_isValid); return m_socket; }
ClientToken clientToken() const { assert(m_isValid); return m_clientToken; }
struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; }
const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; }
const AddrUnion* saddr() const { assert(m_isValid); return &m_saddr; }
bool equals( const AddrInfo& other ) const;
private:
void construct( int socket, const AddrUnion* saddr, bool isTCP ) {
memset( this, 0, sizeof(*this) );
m_socket = socket;
m_isTCP = isTCP;
memcpy( &m_saddr, saddr, sizeof(m_saddr) );
m_isValid = true;
}
// AddrInfo& operator=(const AddrInfo&); // Prevent assignment
int m_socket;
bool m_isTCP;
bool m_isValid;
ClientToken m_clientToken; /* must be 32 bit */
AddrUnion m_saddr;
};

View file

@ -52,9 +52,9 @@ CidLock::~CidLock()
void
CidLock::print_claimed( const char* caller )
{
char buf[512] = {0};
int unclaimed = 0;
int len = snprintf( buf, sizeof(buf), "after %s: ", caller );
string str;
string_printf( str, "after %s: ", caller );
// Assume we have the mutex!!!!
map< CookieID, CidInfo*>::iterator iter;
for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) {
@ -62,13 +62,11 @@ CidLock::print_claimed( const char* caller )
if ( 0 == info->GetOwner() ) {
++unclaimed;
} else {
len += snprintf( &buf[len], sizeof(buf)-len, "%d,",
info->GetCid() );
string_printf( str, "%d,", info->GetCid() );
}
}
len += snprintf( &buf[len], sizeof(buf)-len, " (plus %d unclaimed.)",
unclaimed );
logf( XW_LOGINFO, "%s: claimed: %s", __func__, buf );
string_printf( str, "%d,", " (plus %d unclaimed.)", unclaimed );
logf( XW_LOGINFO, "%s: claimed: %s", __func__, str.c_str() );
}
#else
# define PRINT_CLAIMED()

View file

@ -133,7 +133,9 @@ RelayConfigs::SetValueFor( const char* key, const char* value )
m_values.erase(iter);
}
m_values.insert( pair<const char*,const char*>(strdup(key),strdup(value) ) );
pair<map<const char*,const char*>::iterator,bool> result =
m_values.insert( pair<const char*,const char*>(strdup(key),strdup(value) ) );
assert( result.second );
}
ino_t

View file

@ -40,6 +40,7 @@
#include "timermgr.h"
#include "configs.h"
#include "crefmgr.h"
#include "devmgr.h"
#include "permid.h"
using namespace std;
@ -261,7 +262,7 @@ CookieRef::_HandleAck( HostID hostID )
void
CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen )
const unsigned char* buf, int buflen )
{
CRefEvent evt( XWE_PROXYMSG, addr );
evt.u.fwd.src = srcID;
@ -276,7 +277,7 @@ CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
void
CookieRef::_Disconnect( const AddrInfo* addr, HostID hostID )
{
logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID );
logf( XW_LOGINFO, "%s(hostID=%d)", __func__, hostID );
CRefEvent evt( XWE_DISCONN, addr );
evt.u.discon.srcID = hostID;
@ -308,14 +309,14 @@ CookieRef::_Shutdown()
HostID
CookieRef::HostForSocket( const AddrInfo* addr )
{
HostID hid = -1;
HostID hid = HOST_ID_NONE;
ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock );
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_addr.equals( *addr ) ) {
hid = iter->m_hostID;
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid );
assert( HOST_ID_NONE != hid );
break;
}
}
@ -346,7 +347,7 @@ bool
CookieRef::AlreadyHere( unsigned short seed, const AddrInfo* addr,
HostID* prevHostID )
{
logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket );
logf( XW_LOGINFO, "%s(seed=%x(%d))", __func__, seed, seed );
bool here = false;
RWReadLock rrl( &m_socketsRWLock );
@ -372,7 +373,7 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, const AddrInfo* addr,
bool* spotTaken )
{
logf( XW_LOGINFO, "%s(hid=%d,seed=%x(%d),socket=%d)", __func__,
hid, seed, seed, socket );
hid, seed, seed, addr->socket() );
bool here = false;
RWWriteLock rwl( &m_socketsRWLock );
@ -404,7 +405,7 @@ CookieRef::notifyDisconn( const CRefEvent* evt )
evt->u.disnote.why
};
send_with_length( &evt->addr, buf, sizeof(buf), true );
send_with_length( &evt->addr, HOST_ID_NONE, buf, sizeof(buf), true );
} /* notifyDisconn */
void
@ -522,7 +523,7 @@ CookieRef::_CheckHeartbeats( time_t now )
void
CookieRef::_Forward( HostID src, const AddrInfo* addr,
HostID dest, unsigned char* buf, int buflen )
HostID dest, const unsigned char* buf, int buflen )
{
pushForwardEvent( src, addr, dest, buf, buflen );
handleEvents();
@ -587,9 +588,9 @@ CookieRef::pushHeartFailedEvent( int socket )
void
CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen )
const unsigned char* buf, int buflen )
{
logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest );
logf( XW_LOGVERBOSE1, "%s: %d -> %d", __func__, src, dest );
CRefEvent evt( XWE_FORWARDMSG, addr );
evt.u.fwd.src = src;
evt.u.fwd.dest = dest;
@ -636,7 +637,7 @@ CookieRef::handleEvents()
/* Assumption: has mutex!!!! */
while ( m_eventQueue.size () > 0 ) {
XW_RELAY_STATE nextState;
DBMgr::DevIDRelay devID;
DevIDRelay devID;
CRefEvent evt = m_eventQueue.front();
m_eventQueue.pop_front();
@ -703,6 +704,10 @@ CookieRef::handleEvents()
forward_or_store/*_proxy*/( &evt );
break;
case XWA_TRYTELL:
send_havemsgs( &evt.addr );
break;
case XWA_TIMERDISCONN:
disconnectSockets( XWRELAY_ERROR_TIMEOUT );
break;
@ -807,12 +812,19 @@ CookieRef::handleEvents()
} /* handleEvents */
bool
CookieRef::send_with_length( const AddrInfo* addr,
unsigned char* buf, int bufLen, bool cascade )
CookieRef::send_with_length( const AddrInfo* addr, HostID dest,
const unsigned char* buf, int bufLen, bool cascade )
{
bool failed = false;
if ( send_with_length_unsafe( addr, buf, bufLen ) ) {
DBMgr::Get()->RecordSent( ConnName(), HostForSocket(addr), bufLen );
if ( HOST_ID_NONE == dest ) {
dest = HostForSocket(addr);
}
if ( HOST_ID_NONE != dest ) {
DBMgr::Get()->RecordSent( ConnName(), dest, bufLen );
} else {
logf( XW_LOGERROR, "%s: no hid for addr", __func__ );
}
} else {
failed = true;
}
@ -848,7 +860,6 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
assert( dest > 0 && dest <= 4 );
assert( -1 != addr->socket() );
assert( addr->isTCP() );
for ( ; ; ) {
unsigned char buf[MAX_MSG_LEN];
@ -858,7 +869,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
buf, &buflen, &msgID ) ) {
break;
}
if ( ! send_with_length( addr, buf, buflen, true ) ) {
if ( ! send_with_length( addr, dest, buf, buflen, true ) ) {
break;
}
DBMgr::Get()->RemoveStoredMessages( &msgID, 1 );
@ -867,9 +878,9 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
bool
CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
DBMgr::DevIDRelay* devIDp )
DevIDRelay* devIDp )
{
DBMgr::DevIDRelay devID = DBMgr::DEVID_NONE;
DevIDRelay devID = DBMgr::DEVID_NONE;
int nPlayersH = evt->u.con.nPlayersH;
int seed = evt->u.con.seed;
@ -909,6 +920,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
evt->u.con.clientVersion, nPlayersH, seed,
&evt->addr, devID, reconn );
DevMgr::Get()->Remember( devID, &evt->addr );
HostID hostid = evt->u.con.srcID;
if ( NULL != hidp ) {
*hidp = hostid;
@ -916,8 +929,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
/* first add the rec here, whether it'll get ack'd or not */
logf( XW_LOGINFO, "%s: remembering pair: hostid=%x, "
"socket=%d (size=%d)",
__func__, hostid, socket, m_sockets.size());
"(size=%d)", __func__, hostid, m_sockets.size());
assert( m_sockets.size() < 4 );
@ -985,6 +997,14 @@ CookieRef::postDropDevice( HostID hostID )
handleEvents();
}
void
CookieRef::postTellHaveMsgs( const AddrInfo* addr )
{
CRefEvent evt( XWE_TRYTELL, addr );
m_eventQueue.push_back( evt );
assert( m_in_handleEvents );
}
void
CookieRef::setAllConnectedTimer()
{
@ -1038,7 +1058,7 @@ CookieRef::cancelAllConnectedTimer()
void
CookieRef::sendResponse( const CRefEvent* evt, bool initial,
const DBMgr::DevIDRelay* devID )
const DevIDRelay* devID )
{
/* Now send the response */
unsigned char buf[1 /* cmd */
@ -1105,7 +1125,7 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial,
}
}
send_with_length( &evt->addr, buf, bufp - buf, true );
send_with_length( &evt->addr, evt->u.con.srcID, buf, bufp - buf, true );
logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) );
} /* sendResponse */
@ -1121,12 +1141,13 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
void
CookieRef::forward_or_store( const CRefEvent* evt )
{
unsigned char* buf = evt->u.fwd.buf;
const unsigned char* cbuf = evt->u.fwd.buf;
do {
/* This is an ugly hack!!!! */
if ( *buf == XWRELAY_MSG_TORELAY ) {
*buf = XWRELAY_MSG_FROMRELAY;
} else if ( *buf == XWRELAY_MSG_TORELAY_NOCONN ) {
int buflen = evt->u.fwd.buflen;
unsigned char buf[buflen];
if ( *cbuf == XWRELAY_MSG_TORELAY ) {
buf[0] = XWRELAY_MSG_FROMRELAY;
} else if ( *cbuf == XWRELAY_MSG_TORELAY_NOCONN ) {
*buf = XWRELAY_MSG_FROMRELAY_NOCONN;
} else {
logf( XW_LOGERROR, "%s: got XWRELAY type of %d", __func__,
@ -1134,7 +1155,8 @@ CookieRef::forward_or_store( const CRefEvent* evt )
break;
}
int buflen = evt->u.fwd.buflen;
memcpy( &buf[1], &cbuf[1], buflen-1 );
HostID dest = evt->u.fwd.dest;
const AddrInfo* destAddr = SocketForHost( dest );
@ -1143,10 +1165,24 @@ CookieRef::forward_or_store( const CRefEvent* evt )
}
if ( (NULL == destAddr)
|| !send_with_length( destAddr, buf, buflen, true ) ) {
|| !send_with_length( destAddr, dest, buf, buflen, true ) ) {
store_message( dest, buf, buflen );
}
// If recipient GAME isn't connected, see if owner device is and can
// receive
if ( NULL == destAddr) {
DevIDRelay devid;
AddrInfo::ClientToken token;
if ( DBMgr::Get()->TokenFor( ConnName(), dest, &devid, &token ) ) {
const AddrInfo::AddrUnion* saddr = DevMgr::Get()->get( devid );
if ( !!saddr ) {
AddrInfo addr( -1, token, saddr );
postTellHaveMsgs( &addr );
}
}
}
/* also note that we've heard from src recently */
HostID src = evt->u.fwd.src;
DBMgr::Get()->RecordAddress( ConnName(), src, &evt->addr );
@ -1163,7 +1199,7 @@ CookieRef::send_denied( const CRefEvent* evt, XWREASON why )
}
void
CookieRef::send_msg( const AddrInfo* addr, HostID id,
CookieRef::send_msg( const AddrInfo* addr, HostID hid,
XWRelayMsg msg, XWREASON why, bool cascade )
{
unsigned char buf[10];
@ -1174,7 +1210,7 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id,
switch ( msg ) {
case XWRELAY_DISCONNECT_OTHER:
buf[len++] = why;
tmp = htons( id );
tmp = htons( hid );
memcpy( &buf[len], &tmp, 2 );
len += 2;
break;
@ -1184,14 +1220,13 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id,
}
assert( len <= sizeof(buf) );
send_with_length( addr, buf, len, cascade );
send_with_length( addr, HOST_ID_NONE, buf, len, cascade );
} /* send_msg */
void
CookieRef::notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why )
{
assert( addr->socket() != 0 );
assert( addr->isTCP() );
ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock );
@ -1212,7 +1247,7 @@ CookieRef::notifyGameDead( const AddrInfo* addr )
,XWRELAY_ERROR_DELETED
};
send_with_length( addr, buf, sizeof(buf), true );
send_with_length( addr, HOST_ID_NONE, buf, sizeof(buf), true );
}
/* void */
@ -1265,7 +1300,7 @@ CookieRef::sendAllHere( bool initial )
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == dest ) {
sent = send_with_length( &iter->m_addr, buf, bufp-buf, true );
sent = send_with_length( &iter->m_addr, dest, buf, bufp-buf, true );
break;
}
}

View file

@ -33,6 +33,7 @@
#include "devid.h"
#include "dbmgr.h"
#include "states.h"
#include "addrinfo.h"
typedef vector<unsigned char> MsgBuffer;
typedef deque<MsgBuffer*> MsgBufQueue;
@ -131,14 +132,14 @@ class CookieRef {
int seed, const AddrInfo* addr, bool gameDead );
void _HandleAck( HostID hostID );
void _PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen );
const unsigned char* buf, int buflen );
void _Disconnect( const AddrInfo* addr, HostID hostID );
void _DeviceGone( HostID hostID, int seed );
void _Shutdown();
void _HandleHeartbeat( HostID id, const AddrInfo* addr );
void _CheckHeartbeats( time_t now );
void _Forward( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen );
const unsigned char* buf, int buflen );
void _Remove( const AddrInfo* addr );
void _CheckAllConnected();
void _CheckNotAcked( HostID hid );
@ -159,7 +160,7 @@ class CookieRef {
struct {
HostID src;
HostID dest;
unsigned char* buf;
const unsigned char* buf;
int buflen;
} fwd;
struct {
@ -194,8 +195,8 @@ class CookieRef {
} u;
};
bool send_with_length( const AddrInfo* addr,
unsigned char* buf, int bufLen, bool cascade );
bool send_with_length( const AddrInfo* addr, HostID hid,
const unsigned char* buf, int bufLen, bool cascade );
void send_msg( const AddrInfo* addr, HostID id,
XWRelayMsg msg, XWREASON why, bool cascade );
void pushConnectEvent( int clientVersion, DevID* devID,
@ -208,7 +209,7 @@ class CookieRef {
void pushHeartFailedEvent( const AddrInfo* addr );
void pushForwardEvent( HostID src, const AddrInfo* addr,
HostID dest, unsigned char* buf, int buflen );
HostID dest, const unsigned char* buf, int buflen );
void pushDestBadEvent();
void pushLastSocketGoneEvent();
void pushGameDead( const AddrInfo* addr );
@ -219,16 +220,17 @@ class CookieRef {
void handleEvents();
void sendResponse( const CRefEvent* evt, bool initial,
const DBMgr::DevIDRelay* devID );
const DevIDRelay* devID );
void sendAnyStored( const CRefEvent* evt );
void initPlayerCounts( const CRefEvent* evt );
bool increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
DBMgr::DevIDRelay* devID );
DevIDRelay* devID );
void updateAck( HostID hostID, bool keep );
void dropPending( int seed );
void postCheckAllHere();
void postDropDevice( HostID hostID );
void postTellHaveMsgs( const AddrInfo* addr );
void setAllConnectedTimer();
void cancelAllConnectedTimer();
@ -237,6 +239,7 @@ class CookieRef {
void forward_or_store( const CRefEvent* evt );
void send_denied( const CRefEvent* evt, XWREASON why );
void send_trytell( const CRefEvent* evt );
void checkFromServer( const CRefEvent* evt );
void notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why );

View file

@ -405,16 +405,18 @@ CidInfo*
CRefMgr::getCookieRef( CookieID cid, bool failOk )
{
CidInfo* cinfo = NULL;
for ( ; ; ) {
for ( int count = 0; ; ++count ) {
cinfo = m_cidlock->Claim( cid );
if ( NULL != cinfo->GetRef() ) {
break;
} else if ( failOk ) {
} else if ( failOk || count > 20 ) {
break;
}
m_cidlock->Relinquish( cinfo, true );
logf( XW_LOGINFO, "%s: sleeping after failing to get cinfo", __func__ );
logf( XW_LOGINFO, "%s: (count=%d) sleeping after "
"failing to get cinfo", __func__, count );
usleep(200000); /* 2/10 second */
cinfo = NULL;
}
return cinfo;
} /* getCookieRef */
@ -462,7 +464,9 @@ CRefMgr::AddNew( const char* cookie, const char* connName, CookieID cid,
ref->assignConnName();
m_cookieMap.insert( pair<CookieID, CookieRef*>(ref->GetCid(), ref ) );
pair<CookieMap::iterator,bool> result =
m_cookieMap.insert( pair<CookieID, CookieRef*>(ref->GetCid(), ref ) );
assert( result.second );
logf( XW_LOGINFO, "%s: paired cookie %s/connName %s with cid %d", __func__,
(cookie?cookie:"NULL"), connName, ref->GetCid() );

View file

@ -186,7 +186,7 @@ class SafeCref {
~SafeCref();
bool Forward( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen ) {
const unsigned char* buf, int buflen ) {
if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() );
@ -198,7 +198,7 @@ class SafeCref {
}
void PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen ) {
const unsigned char* buf, int buflen ) {
if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() );

View file

@ -27,6 +27,8 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <glib.h>
#include "dbmgr.h"
#include "mlock.h"
#include "configs.h"
@ -41,6 +43,7 @@
static DBMgr* s_instance = NULL;
#define DELIM "\1"
#define MAX_NUM_PLAYERS 4
static void formatParams( char* paramValues[], int nParams, const char* fmt,
char* buf, int bufLen, ... );
@ -60,6 +63,7 @@ DBMgr::Get()
DBMgr::DBMgr()
{
logf( XW_LOGINFO, "%s called", __func__ );
m_useB64 = false;
pthread_key_create( &m_conn_key, destr_function );
@ -146,6 +150,54 @@ DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen,
return cid;
} /* FindGame */
bool
DBMgr::FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken token,
string& connName, HostID* hidp, unsigned short* seed )
{
int nSuccesses = 0;
const char* fmt =
"SELECT connName FROM %s WHERE %d = ANY(devids) AND %d = ANY(tokens)";
string query;
string_printf( query, fmt, GAMES_TABLE, relayID, token );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
vector<string> names(nTuples);
for ( int ii = 0; ii < nTuples; ++ii ) {
string name( PQgetvalue( result, ii, 0 ) );
names.push_back( name );
}
PQclear( result );
for ( vector<string>::const_iterator iter = names.begin();
iter != names.end(); ++iter ) {
const char* name = iter->c_str();
for ( HostID hid = 1; hid <= MAX_NUM_PLAYERS; ++hid ) {
fmt = "SELECT seeds[%d] FROM %s WHERE connname = '%s' "
"AND devids[%d] = %d AND tokens[%d] = %d";
string query;
string_printf( query, fmt, hid, GAMES_TABLE, name,
hid, relayID, hid, token );
result = PQexec( getThreadConn(), query.c_str() );
int nTuples2 = PQntuples( result );
for ( int jj = 0; jj < nTuples2; ++jj ) {
connName = name;
*hidp = hid;
*seed = atoi( PQgetvalue( result, 0, 0 ) );
++nSuccesses;
}
PQclear( result );
}
}
if ( 1 < nSuccesses ) {
logf( XW_LOGERROR, "%s found %d matches!!!", __func__, nSuccesses );
}
return nSuccesses >= 1;
} // FindPlayer
bool
DBMgr::SeenSeed( const char* cookie, unsigned short seed,
int langCode, int nPlayersT, bool wantsPublic,
@ -249,10 +301,10 @@ DBMgr::AllDevsAckd( const char* const connName )
// Return DevIDRelay for device, adding it to devices table IFF it's not
// already there.
DBMgr::DevIDRelay
DevIDRelay
DBMgr::RegisterDevice( const DevID* host )
{
DBMgr::DevIDRelay devID;
DevIDRelay devID;
assert( host->m_devIDType != ID_TYPE_NONE );
int ii;
bool success;
@ -261,15 +313,17 @@ DBMgr::RegisterDevice( const DevID* host )
devID = getDevID( host );
// If it's not present *and* of type ID_TYPE_RELAY, we can do nothing.
// Fail.
if ( DEVID_NONE == devID && ID_TYPE_RELAY < host->m_devIDType ) {
// Otherwise proceed.
if ( DEVID_NONE != devID ) {
(void)updateDevice( devID, false );
} else if ( ID_TYPE_RELAY < host->m_devIDType ) {
// loop until we're successful inserting the unique key. Ship with this
// coming from random, but test with increasing values initially to make
// sure duplicates are detected.
for ( success = false, ii = 0; !success; ++ii ) {
assert( 10 > ii ); // better to check that we're looping BECAUSE
// of uniqueness problem.
devID = (DBMgr::DevIDRelay)random();
devID = (DevIDRelay)random();
if ( DEVID_NONE == devID ) {
continue;
}
@ -290,7 +344,8 @@ DBMgr::RegisterDevice( const DevID* host )
NULL, NULL, 0 );
success = PGRES_COMMAND_OK == PQresultStatus(result);
if ( !success ) {
logf( XW_LOGERROR, "PQexec=>%s;%s", PQresStatus(PQresultStatus(result)),
logf( XW_LOGERROR, "PQexec=>%s;%s",
PQresStatus(PQresultStatus(result)),
PQresultErrorMessage(result) );
}
PQclear( result );
@ -299,6 +354,26 @@ DBMgr::RegisterDevice( const DevID* host )
return devID;
}
bool
DBMgr::updateDevice( DevIDRelay relayID, bool check )
{
bool exists = !check;
if ( !exists ) {
string test;
string_printf( test, "id = %d", relayID );
exists = 1 == getCountWhere( DEVICES_TABLE, test );
}
if ( exists ) {
const char* fmt =
"UPDATE " DEVICES_TABLE " SET mtime='now' WHERE id = %d";
string query;
string_printf( query, fmt, relayID );
execSql( query );
}
return exists;
}
HostID
DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion,
int nToAdd, unsigned short seed, const AddrInfo* addr,
@ -327,13 +402,14 @@ DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion,
const char* fmt = "UPDATE " GAMES_TABLE " SET nPerDevice[%d] = %d,"
" clntVers[%d] = %d,"
" seeds[%d] = %d, addrs[%d] = \'%s\', %s"
" mtimes[%d]='now', ack[%d]=\'%c\'"
" tokens[%d] = %d, mtimes[%d]='now', ack[%d]=\'%c\'"
" WHERE connName = '%s'";
string query;
char* ntoa = inet_ntoa( addr->sin_addr() );
string_printf( query, fmt, newID, nToAdd, newID, clientVersion,
newID, seed, newID, ntoa, devIDBuf.c_str(), newID,
newID, ackd?'A':'a', connName );
newID, seed, newID, ntoa, devIDBuf.c_str(),
newID, addr->clientToken(), newID, newID, ackd?'A':'a',
connName );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
@ -576,26 +652,41 @@ DBMgr::PublicRooms( int lang, int nPlayers, int* nNames, string& names )
*nNames = nTuples;
}
bool
DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid,
AddrInfo::ClientToken* token )
{
bool found = false;
const char* fmt = "SELECT tokens[%d], devids[%d] FROM " GAMES_TABLE
" WHERE connName='%s'";
string query;
string_printf( query, fmt, hid, hid, connName );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
AddrInfo::ClientToken token_tmp = atoi( PQgetvalue( result, 0, 0 ) );
DevIDRelay devid_tmp = atoi( PQgetvalue( result, 0, 1 ) );
if ( 0 != token_tmp // 0 is illegal (legacy/unset) value
&& 0 != devid_tmp ) {
*token = token_tmp;
*devid = devid_tmp;
found = true;
}
}
PQclear( result );
logf( XW_LOGINFO, "%s(%s,%d)=>%s (%d, %d)", __func__, connName, hid,
(found?"true":"false"), *devid, *token );
return found;
}
int
DBMgr::PendingMsgCount( const char* connName, int hid )
{
int count = 0;
const char* fmt = "SELECT COUNT(*) FROM " MSGS_TABLE
" WHERE connName = '%s' AND hid = %d "
string test;
string_printf( test, "connName = '%s' AND hid = %d ", connName, hid );
#ifdef HAVE_STIME
"AND stime IS NULL"
string_printf( test, " AND stime IS NULL" );
#endif
;
string query;
string_printf( query, fmt, connName, hid );
logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
count = atoi( PQgetvalue( result, 0, 0 ) );
}
PQclear( result );
return count;
return getCountWhere( MSGS_TABLE, test );
}
bool
@ -632,10 +723,10 @@ DBMgr::readArray( const char* const connName, int arr[] ) /* len 4 */
PQclear( result );
}
DBMgr::DevIDRelay
DevIDRelay
DBMgr::getDevID( const char* connName, int hid )
{
DBMgr::DevIDRelay devID;
DevIDRelay devID;
const char* fmt = "SELECT devids[%d] FROM " GAMES_TABLE " WHERE connName='%s'";
string query;
string_printf( query, fmt, hid, connName );
@ -643,29 +734,28 @@ DBMgr::getDevID( const char* connName, int hid )
PGresult* result = PQexec( getThreadConn(), query.c_str() );
assert( 1 == PQntuples( result ) );
devID = (DBMgr::DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 );
devID = (DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 );
PQclear( result );
return devID;
}
DBMgr::DevIDRelay
DevIDRelay
DBMgr::getDevID( const DevID* devID )
{
DBMgr::DevIDRelay rDevID = DEVID_NONE;
DevIDRelay rDevID = DEVID_NONE;
DevIDType devIDType = devID->m_devIDType;
string query;
assert( ID_TYPE_NONE < devIDType );
const char* asStr = devID->m_devIDString.c_str();
if ( ID_TYPE_RELAY == devIDType ) {
// confirm it's there
DBMgr::DevIDRelay cur = strtoul( asStr, NULL, 16 );
DevIDRelay cur = devID->asRelayID();
if ( DEVID_NONE != cur ) {
const char* fmt = "SELECT id FROM " DEVICES_TABLE " WHERE id=%d";
string_printf( query, fmt, cur );
}
} else {
const char* fmt = "SELECT id FROM " DEVICES_TABLE " WHERE devtype=%d and devid = '%s'";
string_printf( query, fmt, devIDType, asStr );
string_printf( query, fmt, devIDType, devID->m_devIDString.c_str() );
}
if ( 0 < query.size() ) {
@ -673,10 +763,12 @@ DBMgr::getDevID( const DevID* devID )
PGresult* result = PQexec( getThreadConn(), query.c_str() );
assert( 1 >= PQntuples( result ) );
if ( 1 == PQntuples( result ) ) {
rDevID = (DBMgr::DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 );
rDevID = (DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 );
}
PQclear( result );
}
logf( XW_LOGINFO, "%s(in=%s)=>%d (0x.8X)", __func__,
devID->m_devIDString.c_str(), rDevID, rDevID );
return rDevID;
}
@ -691,25 +783,16 @@ DBMgr::getDevID( const DevID* devID )
int
DBMgr::CountStoredMessages( const char* const connName, int hid )
{
const char* fmt = "SELECT count(*) FROM " MSGS_TABLE
" WHERE connname = '%s' "
string test;
string_printf( test, "connname = '%s'", connName );
#ifdef HAVE_STIME
"AND stime IS NULL"
string_printf( test, " AND stime IS NULL" );
#endif
;
string query;
string_printf( query, fmt, connName );
if ( hid != -1 ) {
string_printf( query, "AND hid = %d", hid );
string_printf( test, " AND hid = %d", hid );
}
PGresult* result = PQexec( getThreadConn(), query.c_str() );
assert( 1 == PQntuples( result ) );
int count = atoi( PQgetvalue( result, 0, 0 ) );
PQclear( result );
return count;
return getCountWhere( MSGS_TABLE, test );
}
int
@ -718,6 +801,38 @@ DBMgr::CountStoredMessages( const char* const connName )
return CountStoredMessages( connName, -1 );
} /* CountStoredMessages */
int
DBMgr::CountStoredMessages( DevIDRelay relayID )
{
string test;
string_printf( test, "devid = %d", relayID );
#ifdef HAVE_STIME
string_printf( test, "AND stime IS NULL" );
#endif
return getCountWhere( MSGS_TABLE, test );
}
void
DBMgr::GetStoredMessageIDs( DevIDRelay relayID, vector<int>& ids )
{
const char* fmt = "SELECT id FROM " MSGS_TABLE " WHERE devid=%d "
"AND connname IN (SELECT connname FROM " GAMES_TABLE
" WHERE NOT " GAMES_TABLE ".dead)";
string query;
string_printf( query, fmt, relayID );
// logf( XW_LOGINFO, "%s: query=\"%s\"", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
for ( int ii = 0; ii < nTuples; ++ii ) {
int id = atoi( PQgetvalue( result, ii, 0 ) );
// logf( XW_LOGINFO, "%s: adding id %d", __func__, id );
ids.push_back( id );
}
PQclear( result );
logf( XW_LOGINFO, "%s(relayID=%d)=>%d ids", __func__, relayID, ids.size() );
}
void
DBMgr::StoreMessage( const char* const connName, int hid,
const unsigned char* buf, int len )
@ -726,27 +841,67 @@ DBMgr::StoreMessage( const char* const connName, int hid,
size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE
" (connname, hid, devid, msg, msglen)"
" VALUES( '%s', %d, %d, E'%s', %d)";
unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf,
len, &newLen );
assert( NULL != bytes );
" (connname, hid, devid, token, %s, msglen)"
" VALUES( '%s', %d, %d, "
"(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), "
"%s'%s', %d)";
string query;
string_printf( query, fmt, connName, hid, devID, bytes, len );
PQfreemem( bytes );
if ( m_useB64 ) {
gchar* b64 = g_base64_encode( buf, len );
string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName,
"", b64, len );
g_free( b64 );
} else {
unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf,
len, &newLen );
assert( NULL != bytes );
string_printf( query, fmt, "msg", connName, hid, devID, hid, connName,
"E", bytes, len );
PQfreemem( bytes );
}
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
}
void
DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex,
unsigned char* buf, size_t* buflen )
{
const char* from = NULL;
if ( useB64 ) {
from = PQgetvalue( result, 0, b64indx );
}
if ( NULL == from || '\0' == from[0] ) {
useB64 = false;
from = PQgetvalue( result, 0, byteaIndex );
}
size_t to_length;
if ( useB64 ) {
gsize out_len;
guchar* txt = g_base64_decode( (const gchar*)from, &out_len );
to_length = out_len;
assert( to_length <= *buflen );
memcpy( buf, txt, to_length );
g_free( txt );
} else {
unsigned char* bytes = PQunescapeBytea( (const unsigned char*)from,
&to_length );
assert( to_length <= *buflen );
memcpy( buf, bytes, to_length );
PQfreemem( bytes );
}
*buflen = to_length;
}
bool
DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn,
unsigned char* buf, size_t* buflen, int* msgID )
{
const char* fmt = "SELECT id, msg, msglen FROM " MSGS_TABLE
const char* fmt = "SELECT id, msg, msg64, msglen FROM " MSGS_TABLE
" WHERE connName = '%s' AND hid = %d "
#ifdef HAVE_STIME
"AND stime IS NULL "
@ -765,18 +920,9 @@ DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn,
if ( NULL != msgID ) {
*msgID = atoi( PQgetvalue( result, 0, 0 ) );
}
size_t msglen = atoi( PQgetvalue( result, 0, 2 ) );
/* int len = PQgetlength( result, 0, 1 ); */
const unsigned char* from =
(const unsigned char* )PQgetvalue( result, 0, 1 );
size_t to_length;
unsigned char* bytes = PQunescapeBytea( from, &to_length );
assert( to_length <= *buflen );
memcpy( buf, bytes, to_length );
PQfreemem( bytes );
*buflen = to_length;
assert( 0 == msglen || to_length == msglen );
size_t msglen = atoi( PQgetvalue( result, 0, 3 ) );
decodeMessage( result, m_useB64, 2, 1, buf, buflen );
assert( 0 == msglen || msglen == *buflen );
}
PQclear( result );
return found;
@ -789,6 +935,51 @@ DBMgr::GetStoredMessage( const char* const connName, int hid,
return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID );
}
bool
DBMgr::GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen,
AddrInfo::ClientToken* token )
{
const char* fmt = "SELECT token, msg, msg64, msglen FROM " MSGS_TABLE
" WHERE id = %d "
#ifdef HAVE_STIME
"AND stime IS NULL "
#endif
;
string query;
string_printf( query, fmt, msgID );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
assert( nTuples <= 1 );
bool found = nTuples == 1;
if ( found ) {
*token = atoi( PQgetvalue( result, 0, 0 ) );
size_t msglen = atoi( PQgetvalue( result, 0, 3 ) );
decodeMessage( result, m_useB64, 2, 1, buf, buflen );
assert( 0 == msglen || *buflen == msglen );
}
PQclear( result );
return found;
}
void
DBMgr::RemoveStoredMessages( string& msgids )
{
const char* fmt =
#ifdef HAVE_STIME
"UPDATE " MSGS_TABLE " SET stime='now' "
#else
"DELETE FROM " MSGS_TABLE
#endif
" WHERE id IN (%s)";
string query;
string_printf( query, fmt, msgids.c_str() );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
}
void
DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs )
{
@ -805,21 +996,41 @@ DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs )
ids.append( "," );
}
}
const char* fmt =
#ifdef HAVE_STIME
"UPDATE " MSGS_TABLE " SET stime='now' "
#else
"DELETE FROM " MSGS_TABLE
#endif
" WHERE id IN (%s)";
string query;
string_printf( query, fmt, ids.c_str() );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
RemoveStoredMessages( ids );
}
}
void
DBMgr::RemoveStoredMessages( vector<int>& idv )
{
if ( 0 < idv.size() ) {
string ids;
vector<int>::const_iterator iter = idv.begin();
for ( ; ; ) {
string_printf( ids, "%d", *iter );
if ( ++iter == idv.end() ) {
break;
}
string_printf( ids, "," );
}
RemoveStoredMessages( ids );
}
}
int
DBMgr::getCountWhere( const char* table, string& test )
{
string query;
string_printf( query, "SELECT count(*) FROM %s WHERE %s", table, test.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
assert( 1 == PQntuples( result ) );
int count = atoi( PQgetvalue( result, 0, 0 ) );
PQclear( result );
logf( XW_LOGINFO, "%s(%s)=>%d", __func__, query.c_str(), count );
return count;
}
static void
formatParams( char* paramValues[], int nParams, const char* fmt, char* buf,
int bufLen, ... )

View file

@ -36,7 +36,6 @@ class DBMgr {
/* DevIDs on various platforms are stored in devices table. This is the
key, and used in msgs and games tables as a shorter way to refer to
them. */
typedef unsigned int DevIDRelay;
static const DevIDRelay DEVID_NONE = 0;
static DBMgr* Get();
@ -48,6 +47,9 @@ class DBMgr {
void AddNew( const char* cookie, const char* connName, CookieID cid,
int langCode, int nPlayersT, bool isPublic );
bool FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken,
string& connName, HostID* hid, unsigned short* seed );
CookieID FindGame( const char* connName, char* cookieBuf, int bufLen,
int* langP, int* nPlayersTP, int* nPlayersHP,
bool* isDead );
@ -62,7 +64,8 @@ class DBMgr {
char* connNameBuf, int bufLen, int* nPlayersHP );
bool AllDevsAckd( const char* const connName );
DevIDRelay RegisterDevice( const DevID* hosts );
DevIDRelay RegisterDevice( const DevID* host );
bool updateDevice( DevIDRelay relayID, bool check );
HostID AddDevice( const char* const connName, HostID curID, int clientVersion,
int nToAdd, unsigned short seed, const AddrInfo* addr,
@ -89,19 +92,30 @@ class DBMgr {
queries.*/
void PublicRooms( int lang, int nPlayers, int* nNames, string& names );
/* Get stored address info, if available and valid */
bool TokenFor( const char* const connName, int hid, DevIDRelay* devid,
AddrInfo::ClientToken* token );
/* Return number of messages pending for connName:hostid pair passed in */
int PendingMsgCount( const char* const connName, int hid );
/* message storage -- different DB */
int CountStoredMessages( const char* const connName );
int CountStoredMessages( const char* const connName, int hid );
int CountStoredMessages( DevIDRelay relayID );
void StoreMessage( const char* const connName, int hid,
const unsigned char* const buf, int len );
void GetStoredMessageIDs( DevIDRelay relayID, vector<int>& ids );
bool GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID );
bool GetNthStoredMessage( const char* const connName, int hid, int nn,
unsigned char* buf, size_t* buflen, int* msgID );
bool GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen,
AddrInfo::ClientToken* token );
void RemoveStoredMessages( const int* msgID, int nMsgIDs );
void RemoveStoredMessages( vector<int>& ids );
private:
DBMgr();
@ -110,11 +124,16 @@ class DBMgr {
void readArray( const char* const connName, int arr[] );
DevIDRelay getDevID( const char* connName, int hid );
DevIDRelay getDevID( const DevID* devID );
int getCountWhere( const char* table, string& test );
void RemoveStoredMessages( string& msgIDs );
void decodeMessage( PGresult* result, bool useB64, int b64indx,
int byteaIndex, unsigned char* buf, size_t* buflen );
PGconn* getThreadConn( void );
void conn_key_alloc();
pthread_key_t m_conn_key;
bool m_useB64;
}; /* DBMgr */

View file

@ -22,6 +22,8 @@
#define _DEVID_H_
#include <string>
#include <stdlib.h>
#include "xwrelay.h"
/* DevID protocol.
@ -52,11 +54,18 @@
*
*/
#include <assert.h>
using namespace std;
class DevID {
public:
DevID() { m_devIDType = ID_TYPE_NONE; }
DevID(DevIDType typ) { m_devIDType = typ; }
DevIDRelay asRelayID() const {
assert( ID_TYPE_RELAY == m_devIDType );
return strtoul( m_devIDString.c_str(), NULL, 16 );
}
string m_devIDString;
DevIDType m_devIDType;
};

View file

@ -171,7 +171,9 @@ ListenerMgr::addOne( int port, bool perGame )
success = sock != -1;
if ( success ) {
pair<int,bool>entry(port, perGame);
m_socks_to_ports.insert( pair<int,pair<int,bool> >(sock, entry ) );
pair<map<int,pair<int,bool> >::iterator, bool> result
= m_socks_to_ports.insert( pair<int,pair<int,bool> >(sock, entry ) );
assert( result.second );
}
return success;
}

View file

@ -6,8 +6,9 @@
#
# Depends on the gcm module
import getpass, sys, gcm, psycopg2, time, signal, shelve
import getpass, sys, psycopg2, time, signal, shelve, json, urllib2
from time import gmtime, strftime
from os import path
# I'm not checking my key in...
import mykey
@ -25,7 +26,7 @@ import mykey
# contact list if it is the target of at least one message in the msgs
# table.
k_shelfFile = "gcm_loop.shelf"
k_shelfFile = path.splitext( path.basename( sys.argv[0]) )[0] + ".shelf"
k_SENT = 'SENT'
g_con = None
g_sent = None
@ -65,7 +66,7 @@ def getPendingMsgs( con, typ ):
def unregister( gcmid ):
global g_con
print "unregister(", gcmid, ")"
query = "UPDATE devices SET unreg=TRUE WHERE devid = '%s'" % gcmid
query = "UPDATE devices SET unreg=TRUE WHERE devid = '%s' and devtype = 3" % gcmid
g_con.cursor().execute( query )
def asGCMIds(con, devids, typ):
@ -77,20 +78,23 @@ def asGCMIds(con, devids, typ):
def notifyGCM( devids, typ ):
if typ == DEVTYPE_GCM:
instance = gcm.GCM( mykey.myKey )
data = { 'getMoves': True, }
response = instance.json_request( registration_ids = devids,
data = data,
)
if 'errors' in response:
response = response['errors']
if 'NotRegistered' in response:
for gcmid in response['NotRegistered']:
unregister( gcmid )
else:
print "got some kind of error"
values = {
'data' : { 'getMoves': True, },
'registration_ids': devids,
}
params = json.dumps( values )
req = urllib2.Request("https://android.googleapis.com/gcm/send", params )
req.add_header( 'Content-Type' , 'application/x-www-form-urlencoded;charset=UTF-8' )
req.add_header( 'Authorization' , 'key=' + mykey.myKey )
req.add_header('Content-Type', 'application/json' )
response = urllib2.urlopen( req ).read()
asJson = json.loads( response )
if 'success' in asJson and 'failure' in asJson and len(devids) == asJson['success'] and 0 == asJson['failure']:
print "OK"
else:
if g_debug: print 'no errors:', response
print "Errors: "
print response
else:
print "not sending to", len(devids), "devices because typ ==", typ
@ -182,7 +186,7 @@ def main():
if 0 < emptyCount: print ""
emptyCount = 0
print strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
print "devices needing notification:", targets
print "devices needing notification:", targets, '=>',
notifyGCM( asGCMIds( g_con, targets, typ ), typ )
pruneSent( devids )
elif g_debug: print "no targets after backoff"

View file

@ -23,10 +23,11 @@ done
QUERY="WHERE NOT -NTOTAL = sum_array(nperdevice)"
echo "Device (pid) count: $(pidof xwords | wc | awk '{print $2}')"
echo -n "Device (pid) count: $(pidof xwords | wc | awk '{print $2}')"
echo "; relay pid[s]: $(pidof xwrelay)"
echo "Row count:" $(psql -t xwgames -c "select count(*) FROM games $QUERY;")
echo "SELECT dead,connname,cid,room,lang,clntVers,ntotal,nperdevice,seeds,devids,ack,nsent "\
echo "SELECT dead,connname,cid,room,lang,clntVers as cv ,ntotal,nperdevice,seeds,addrs,tokens,devids,ack,nsent as snt "\
"FROM games $QUERY ORDER BY NOT dead, connname LIMIT $LIMIT;" \
| psql xwgames

View file

@ -79,6 +79,11 @@ static StateTable g_stateTable[] = {
{ XWS_WAITMORE, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME },
{ XWS_ALLCONND, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME },
{ XWS_EMPTY, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME },
{ XWS_WAITMORE, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME },
{ XWS_ALLCONND, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME },
/* { XWS_WAITMORE, XWE_GAMEFULL, XWA_SENDALLHERE, XWS_ALLCONND }, */
/* { XWS_WAITMORE, XWE_CHECKFULL, XWA_, XWS_WAITMORE }, */
/* { XWS_INITED, XWE_DEVCONNECT, XWA_SEND_NO_ROOM, XWS_DEAD }, */
@ -229,6 +234,7 @@ eventString( XW_RELAY_EVENT evt )
CASESTR(XWE_RECONNECT);
CASESTR(XWE_GOTONEACK);
CASESTR(XWE_PROXYMSG);
CASESTR(XWE_TRYTELL);
CASESTR(XWE_GOTLASTACK);
CASESTR(XWE_ACKTIMEOUT);
CASESTR(XWE_DISCONN);
@ -276,6 +282,7 @@ actString( XW_RELAY_ACTION act )
CASESTR(XWA_SNDALLHERE_2);
CASESTR(XWA_FWD);
CASESTR(XWA_PROXYMSG);
CASESTR(XWA_TRYTELL);
CASESTR(XWA_NOTEHEART);
CASESTR(XWA_TIMERDISCONN);
CASESTR(XWA_DISCONNECT);

View file

@ -86,6 +86,7 @@ typedef enum {
this object */
,XWE_PROXYMSG /* msg when game may not be connected */
,XWE_TRYTELL /* tell the addressee to check for stored messages */
,XWE_GOTONEACK
,XWE_GOTLASTACK
@ -147,6 +148,8 @@ typedef enum {
,XWA_PROXYMSG /* out-of-band message */
,XWA_TRYTELL /* Tell the addresses to check for messages */
,XWA_NOTEHEART /* Record heartbeat received */
,XWA_NOTE_EMPTY /* No sockets left; check if can delete */

View file

@ -78,11 +78,10 @@ XWThreadPool::~XWThreadPool()
} /* ~XWThreadPool */
void
XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
XWThreadPool::Setup( int nThreads, kill_func kFunc )
{
m_nThreads = nThreads;
m_threadInfos = (ThreadInfo*)malloc( nThreads * sizeof(*m_threadInfos) );
m_pFunc = pFunc;
m_kFunc = kFunc;
for ( int ii = 0; ii < nThreads; ++ii ) {
@ -116,12 +115,13 @@ XWThreadPool::Stop()
}
void
XWThreadPool::AddSocket( SockType stype, const AddrInfo* from )
XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from )
{
{
RWWriteLock ml( &m_activeSocketsRWLock );
SockInfo si;
si.m_type = stype;
si.m_proc = proc;
si.m_addr = *from;
m_activeSockets.push_back( si );
logf( XW_LOGINFO, "%s: %d sockets active", __func__,
@ -173,7 +173,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
++iter;
}
}
logf( XW_LOGINFO, "CLOSING socket %d", socket );
logf( XW_LOGINFO, "CLOSING socket %d", addr->socket() );
close( addr->socket() );
/* if ( do_interrupt ) { */
/* We always need to interrupt the poll because the socket we're closing
@ -187,7 +187,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
void
XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
{
logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, socket, why );
logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, addr->socket(), why );
if ( addr->isTCP() ) {
SockInfo si;
si.m_type = STYPE_UNKNOWN;
@ -197,7 +197,7 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
}
bool
XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
{
bool success = false;
short packetSize;
@ -208,13 +208,14 @@ XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
if ( nRead < 0 ) {
EnqueueKill( addr, "bad packet" );
} else if ( STYPE_GAME == stype ) {
logf( XW_LOGINFO, "calling m_pFunc" );
success = (*m_pFunc)( buf, nRead, addr );
} else {
} else if ( STYPE_PROXY == stype && NULL != proc ) {
buf[nRead] = '\0';
handle_proxy_packet( buf, nRead, addr );
CloseSocket( addr );
UdpQueue::get()->handle( addr, buf, nRead+1, proc );
} else if ( STYPE_GAME == stype && NULL != proc ) {
UdpQueue::get()->handle( addr, buf, nRead, proc );
success = true;
} else {
assert(0);
}
return success;
} /* get_process_packet */
@ -261,8 +262,8 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
switch ( pr.m_act ) {
case Q_READ:
assert( socket >= 0 );
if ( get_process_packet( pr.m_info.m_type, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, &pr.m_info.m_addr );
if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
}
break;
case Q_KILL:

View file

@ -32,6 +32,7 @@
#include <set>
#include "addrinfo.h"
#include "udpqueue.h"
using namespace std;
@ -41,6 +42,7 @@ class XWThreadPool {
typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType;
typedef struct _SockInfo {
SockType m_type;
QueueCallback m_proc;
AddrInfo m_addr;
} SockInfo;
@ -51,18 +53,16 @@ class XWThreadPool {
} ThreadInfo;
static XWThreadPool* GetTPool();
typedef bool (*packet_func)( unsigned char* buf, int bufLen,
const AddrInfo* from );
typedef void (*kill_func)( const AddrInfo* addr );
XWThreadPool();
~XWThreadPool();
void Setup( int nThreads, packet_func pFunc, kill_func kFunc );
void Setup( int nThreads, kill_func kFunc );
void Stop();
/* Add to set being listened on */
void AddSocket( SockType stype, const AddrInfo* from );
void AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from );
/* remove from tpool altogether, and close */
void CloseSocket( const AddrInfo* addr );
@ -82,7 +82,7 @@ class XWThreadPool {
void print_in_use( void );
void log_hung_threads( void );
bool get_process_packet( SockType stype, const AddrInfo* from );
bool get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* from );
void interrupt_poll();
void* real_tpool_main( ThreadInfo* tsp );
@ -107,7 +107,6 @@ class XWThreadPool {
bool m_timeToDie;
int m_nThreads;
packet_func m_pFunc;
kill_func m_kFunc;
ThreadInfo* m_threadInfos;

View file

@ -15,7 +15,7 @@ HEARTBEAT=60
# How many worker threads in the thread pool? Default is five. Let's
# keep this at 1 until the race condition is fixed. All interaction
# with crefs should be from this one thread, including proxy stuff.
NTHREADS=5
NTHREADS=1
# How many seconds to wait for device to ack new connName
DEVACK=3
@ -26,12 +26,16 @@ GAME_PORTS=10997
# What ports do we listen on for per-device incoming connections?
DEVICE_PORTS=10998
# Port for per-device UDP interface (experimental)
UDPPORT=10997
# default 5
SOCK_TIMEOUT_SECONDS=5
# And the control port is?
CTLPORT=11000
# port for web interface
WWW_PORT=11001
#--- INADDR_ANY: 0x00000000

View file

@ -76,9 +76,19 @@
#include "lstnrmgr.h"
#include "dbmgr.h"
#include "addrinfo.h"
#include "devmgr.h"
#include "udpqueue.h"
#include "udpack.h"
typedef struct _UDPHeader {
uint32_t packetID;
unsigned char proto;
XWRelayReg cmd;
} UDPHeader;
static int s_nSpawns = 0;
static int g_maxsocks = -1;
static int g_udpsock = -1;
void
logf( XW_LogLevel level, const char* format, ... )
@ -178,7 +188,7 @@ cmdToStr( XWRELAY_Cmd cmd )
}
static bool
parseRelayID( unsigned char** const inp, const unsigned char* const end,
parseRelayID( const unsigned char** const inp, const unsigned char* const end,
char* buf, int buflen, HostID* hid )
{
const char* hidp = strchr( (char*)*inp, '/' );
@ -201,11 +211,28 @@ parseRelayID( unsigned char** const inp, const unsigned char* const end,
}
*inp = (unsigned char*)endptr;
}
if ( !ok ) {
logf( XW_LOGERROR, "%s failed", __func__ );
}
return ok;
}
static bool
getNetShort( unsigned char** bufpp, const unsigned char* end,
getNetLong( const unsigned char** bufpp, const unsigned char* end,
uint32_t* out )
{
uint32_t tmp;
bool ok = *bufpp + sizeof(tmp) <= end;
if ( ok ) {
memcpy( &tmp, *bufpp, sizeof(tmp) );
*bufpp += sizeof(tmp);
*out = ntohl( tmp );
}
return ok;
} /* getNetShort */
static bool
getNetShort( const unsigned char** bufpp, const unsigned char* end,
unsigned short* out )
{
unsigned short tmp;
@ -219,7 +246,7 @@ getNetShort( unsigned char** bufpp, const unsigned char* end,
} /* getNetShort */
static bool
getNetByte( unsigned char** bufpp, const unsigned char* end,
getNetByte( const unsigned char** bufpp, const unsigned char* end,
unsigned char* out )
{
bool ok = *bufpp < end;
@ -231,7 +258,7 @@ getNetByte( unsigned char** bufpp, const unsigned char* end,
} /* getNetByte */
static bool
getNetString( unsigned char** bufpp, const unsigned char* end, string& out )
getNetString( const unsigned char** bufpp, const unsigned char* end, string& out )
{
char* str = (char*)*bufpp;
size_t len = 1 + strlen( str );
@ -244,8 +271,43 @@ getNetString( unsigned char** bufpp, const unsigned char* end, string& out )
return success;
}
static bool
getRelayDevID( const unsigned char** bufpp, const unsigned char* end,
DevID& devID )
{
bool success = false;
unsigned short idLen;
if ( getNetShort( bufpp, end, &idLen ) ) {
if ( end - *bufpp < idLen/* && ID_TYPE_ANON != typ*/ ) {
logf( XW_LOGERROR, "full devID not received" );
} else {
devID.m_devIDString.append( (const char*)*bufpp, idLen );
*bufpp += idLen;
success = true;
}
}
return success;
}
static bool
getHeader( const unsigned char** bufpp, const unsigned char* end,
UDPHeader* header )
{
unsigned char byt;
bool success = getNetByte( bufpp, end, &header->proto )
&& getNetLong( bufpp, end, &header->packetID )
&& getNetByte( bufpp, end, &byt )
&& XWPDEV_PROTO_VERSION == header->proto;
if ( success ) {
header->cmd = (XWRelayReg)byt;
} else {
logf( XW_LOGERROR, "%s: bad packet header", __func__ );
}
return success;
}
static void
getDevID( unsigned char** bufpp, const unsigned char* end,
getDevID( const unsigned char** bufpp, const unsigned char* end,
unsigned short flags, DevID* devID )
{
if ( XWRELAY_PROTO_VERSION_CLIENTID <= flags ) {
@ -283,7 +345,7 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket )
#endif
static bool
readStr( unsigned char** bufp, const unsigned char* end,
readStr( const unsigned char** bufp, const unsigned char* end,
char* outBuf, int bufLen )
{
unsigned char clen = **bufp;
@ -298,7 +360,7 @@ readStr( unsigned char** bufp, const unsigned char* end,
} /* readStr */
static XWREASON
flagsOK( unsigned char** bufp, unsigned char const* end,
flagsOK( const unsigned char** bufp, unsigned char const* end,
unsigned short* clientVersion, unsigned short* flagsp )
{
XWREASON err = XWRELAY_ERROR_OLDFLAGS;
@ -334,24 +396,83 @@ denyConnection( const AddrInfo* addr, XWREASON err )
send_with_length_unsafe( addr, buf, sizeof(buf) );
}
static ssize_t
send_via_udp( int socket, const struct sockaddr *dest_addr,
XWRelayReg cmd, ... )
{
uint32_t packetNum = UDPAckTrack::nextPacketID( cmd );
struct iovec vec[10];
int iocount = 0;
unsigned char header[1 + 1 + sizeof(packetNum)];
header[0] = XWPDEV_PROTO_VERSION;
packetNum = htonl( packetNum );
memcpy( &header[1], &packetNum, sizeof(packetNum) );
header[5] = cmd;
vec[iocount].iov_base = header;
vec[iocount].iov_len = sizeof(header);
++iocount;
va_list ap;
va_start( ap, cmd );
for ( ; ; ) {
unsigned char* ptr = va_arg(ap, unsigned char*);
if ( !ptr ) {
break;
}
vec[iocount].iov_base = ptr;
vec[iocount].iov_len = va_arg(ap, int);
++iocount;
}
va_end( ap );
struct msghdr mhdr = {0};
mhdr.msg_iov = vec;
mhdr.msg_iovlen = iocount;
mhdr.msg_name = (void*)dest_addr;
mhdr.msg_namelen = sizeof(*dest_addr);
ssize_t nSent = sendmsg( socket, &mhdr, 0 /* flags */);
if ( 0 > nSent ) {
logf( XW_LOGERROR, "sendmsg->errno %d (%s)", errno, strerror(errno) );
}
logf( XW_LOGINFO, "%s()=>%d", __func__, nSent );
return nSent;
}
/* No mutex here. Caller better be ensuring no other thread can access this
* socket. */
bool
send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
size_t bufLen )
{
assert( !!addr );
bool ok = false;
int socket = addr->socket();
assert ( addr->isTCP() );
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
if ( nSent == 2 ) {
nSent = send( socket, buf, bufLen, 0 );
if ( nSent == ssize_t(bufLen) ) {
logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket );
ok = true;
if ( addr->isTCP() ) {
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
if ( nSent == 2 ) {
nSent = send( socket, buf, bufLen, 0 );
if ( nSent == ssize_t(bufLen) ) {
logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket );
ok = true;
}
}
} else {
AddrInfo::ClientToken clientToken = addr->clientToken();
assert( 0 != clientToken );
clientToken = htonl(clientToken);
const struct sockaddr* saddr = addr->sockaddr();
assert( g_udpsock == socket || socket == -1 );
if ( -1 == socket ) {
socket = g_udpsock;
}
send_via_udp( socket, saddr, XWPDEV_MSG, &clientToken,
sizeof(clientToken), buf, bufLen, NULL );
logf( XW_LOGINFO, "sent %d bytes on UDP socket %d", bufLen, socket );
ok = true;
}
if ( !ok ) {
@ -361,6 +482,17 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
return ok;
} /* send_with_length_unsafe */
void
send_havemsgs( const AddrInfo* addr )
{
logf( XW_LOGINFO, "%s()", __func__ );
int socket = addr->socket();
if ( -1 == socket ) {
socket = g_udpsock;
}
send_via_udp( socket, addr->sockaddr(), XWPDEV_HAVEMSGS, NULL );
}
/* A CONNECT message from a device gives us the hostID and socket we'll
* associate with one participant in a relayed session. We'll store this
@ -374,10 +506,10 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
* game?
*/
static bool
processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processConnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
char cookie[MAX_INVITE_LEN+1];
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
bool success = false;
cookie[0] = '\0';
@ -431,9 +563,9 @@ processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
} /* processConnect */
static bool
processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processReconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
bool success = false;
logf( XW_LOGINFO, "%s()", __func__ );
@ -470,9 +602,9 @@ processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
wantsPublic, makePublic );
success = scr.Reconnect( srcID, nPlayersH, nPlayersT, gameSeed,
&err );
if ( !success ) {
assert( err != XWRELAY_ERROR_NONE );
}
// if ( !success ) {
// assert( err != XWRELAY_ERROR_NONE );
// }
} else {
err = XWRELAY_ERROR_BADPROTO;
}
@ -486,10 +618,10 @@ processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
} /* processReconnect */
static bool
processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processAck( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
bool success = false;
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
HostID srcID;
if ( getNetByte( &bufp, end, &srcID ) ) {
SafeCref scr( addr );
@ -499,9 +631,9 @@ processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr )
}
static bool
processDisconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processDisconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
CookieID cookieID;
HostID hostID;
bool success = false;
@ -552,11 +684,11 @@ GetNSpawns(void)
/* forward the message. Need only change the command after looking up the
* socket and it's ready to go. */
static bool
forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr )
forwardMessage( const unsigned char* buf, int buflen, const AddrInfo* addr )
{
bool success = false;
unsigned char* bufp = buf + 1; /* skip cmd */
unsigned char* end = buf + buflen;
const unsigned char* bufp = buf + 1; /* skip cmd */
const unsigned char* end = buf + buflen;
CookieID cookieID;
HostID src;
HostID dest;
@ -578,7 +710,7 @@ forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr )
} /* forwardMessage */
static bool
processMessage( unsigned char* buf, int bufLen, const AddrInfo* addr )
processMessage( const unsigned char* buf, int bufLen, const AddrInfo* addr )
{
bool success = false; /* default is failure */
XWRELAY_Cmd cmd = *buf;
@ -673,6 +805,7 @@ usage( char* arg0 )
"\t-h (print this help)\\\n"
"\t-i <idfile> (file where next global id stored)\\\n"
"\t-l <logfile> (write logs here, not stderr)\\\n"
"\t-M <message> (Put in maintenance mode, and return this string to all callers)\\\n"
"\t-m <num_sockets> (max number of simultaneous sockets to have open)\\\n"
"\t-n <serverName> (used in permID generation)\\\n"
"\t-p <port> (port to listen on)\\\n"
@ -814,7 +947,7 @@ pushMsgs( vector<unsigned char>& out, DBMgr* dbmgr, const char* connName,
static void
handleMsgsMsg( const AddrInfo* addr, bool sendFull,
unsigned char* bufp, const unsigned char* end )
const unsigned char* bufp, const unsigned char* end )
{
unsigned short nameCount;
int ii;
@ -913,16 +1046,38 @@ log_hex( const unsigned char* memp, int len, const char* tag )
}
} // log_hex
static bool
handlePutMessage( SafeCref& scr, HostID hid, const AddrInfo* addr,
unsigned short len, const unsigned char** bufp,
const unsigned char* end )
{
bool success = false;
const unsigned char* start = *bufp;
HostID src;
HostID dest;
XWRELAY_Cmd cmd;
// sanity check that cmd and hostids are there
if ( getNetByte( bufp, end, &cmd )
&& getNetByte( bufp, end, &src )
&& getNetByte( bufp, end, &dest )
&& ( cmd == XWRELAY_MSG_TORELAY_NOCONN )
&& ( hid == dest ) ) {
scr.PutMsg( src, addr, dest, start, len );
*bufp = start + len;
success = true;
}
logf( XW_LOGINFO, "%s()=>%d", __func__, success );
return success;
}
static void
handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
unsigned char* end )
handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp,
const unsigned char* end )
{
// log_hex( bufp, end-bufp, __func__ );
unsigned short nameCount;
int ii;
if ( getNetShort( &bufp, end, &nameCount ) ) {
vector<unsigned char> out(4); /* space for len and n_msgs */
assert( out.size() == 4 );
for ( ii = 0; ii < nameCount && bufp < end; ++ii ) {
// See NetUtils.java for reply format
@ -944,20 +1099,10 @@ handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
unsigned short nMsgs;
if ( getNetShort( &bufp, end, &nMsgs ) ) {
SafeCref scr( connName );
while ( nMsgs-- > 0 ) {
while ( scr.IsValid() && nMsgs-- > 0 ) {
unsigned short len;
HostID src;
HostID dest;
XWRELAY_Cmd cmd;
if ( getNetShort( &bufp, end, &len ) ) {
unsigned char* start = bufp;
if ( getNetByte( &bufp, end, &cmd )
&& getNetByte( &bufp, end, &src )
&& getNetByte( &bufp, end, &dest ) ) {
assert( cmd == XWRELAY_MSG_TORELAY_NOCONN );
assert( hid == dest );
scr.PutMsg( src, addr, dest, start, len );
bufp = start + len;
if ( handlePutMessage( scr, hid, addr, len, &bufp, end ) ) {
continue;
}
}
@ -965,19 +1110,35 @@ handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
}
}
}
assert( bufp == end ); // don't ship with this!!!
if ( end - bufp != 1 ) {
logf( XW_LOGERROR, "%s: buf != end: %p vs %p", __func__, bufp, end );
}
// assert( bufp == end ); // don't ship with this!!!
}
} // handleProxyMsgs
void
handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
static void
game_thread_proc( UdpThreadClosure* utc )
{
if ( !processMessage( utc->buf(), utc->len(), utc->addr() ) ) {
XWThreadPool::GetTPool()->CloseSocket( utc->addr() );
}
}
static void
proxy_thread_proc( UdpThreadClosure* utc )
{
int len = utc->len();
const AddrInfo* addr = utc->addr();
const unsigned char* buf = utc->buf();
logf( XW_LOGINFO, "%s called", __func__ );
logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( len > 0 ) {
assert( addr->isTCP() );
int socket = addr->socket();
unsigned char* bufp = buf;
unsigned char* end = bufp + len;
const unsigned char* bufp = buf;
const unsigned char* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */
XWPRXYCMD cmd = (XWPRXYCMD)*bufp++;
switch( cmd ) {
@ -1044,7 +1205,245 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
}
}
}
} /* handle_proxy_packet */
XWThreadPool::GetTPool()->CloseSocket( addr );
}
static short
addRegID( unsigned char* ptr, DevIDRelay relayID )
{
short used = 0;
char idbuf[9];
int idLen = snprintf( idbuf, sizeof(idbuf), "%.8X", relayID );
short lenNBO = htons(idLen);
memcpy( &ptr[used], &lenNBO, sizeof(lenNBO) );
used += sizeof(lenNBO);
memcpy( &ptr[used], idbuf, idLen );
used += idLen;
return used;
}
static void
registerDevice( const DevID* devID, const AddrInfo::AddrUnion* saddr )
{
DevIDRelay relayID;
DBMgr* dbMgr = DBMgr::Get();
short indx = 0;
unsigned char buf[32];
if ( ID_TYPE_RELAY == devID->m_devIDType ) { // known to us; just update the time
relayID = devID->asRelayID();
if ( dbMgr->updateDevice( relayID, true ) ) {
int nMsgs = dbMgr->CountStoredMessages( relayID );
if ( 0 < nMsgs ) {
AddrInfo addr( -1, -1, saddr );
send_havemsgs( &addr );
}
} else {
indx += addRegID( &buf[indx], relayID );
send_via_udp( g_udpsock, &saddr->addr, XWPDEV_BADREG, buf, indx,
NULL );
relayID = DBMgr::DEVID_NONE;
}
} else {
relayID = dbMgr->RegisterDevice( devID );
if ( DBMgr::DEVID_NONE != relayID ) {
// send it back to the device
indx += addRegID( &buf[indx], relayID );
send_via_udp( g_udpsock, &saddr->addr, XWPDEV_REGRSP, buf,
indx, NULL );
}
}
// Now let's map the address to the devid for future sending purposes.
if ( DBMgr::DEVID_NONE != relayID ) {
DevMgr::Get()->Remember( relayID, saddr );
}
}
static void
retrieveMessages( DevID& devID, const AddrInfo::AddrUnion* saddr )
{
logf( XW_LOGINFO, "%s()", __func__ );
DBMgr* dbMgr = DBMgr::Get();
vector<int> ids;
vector<int> sentIDs;
dbMgr->GetStoredMessageIDs( devID.asRelayID(), ids );
vector<int>::const_iterator iter;
for ( iter = ids.begin(); iter != ids.end(); ++iter ) {
unsigned char buf[MAX_MSG_LEN];
size_t buflen = sizeof(buf);
AddrInfo::ClientToken clientToken;
if ( dbMgr->GetStoredMessage( *iter, buf, &buflen, &clientToken ) ) {
AddrInfo addr( -1, clientToken, saddr );
if ( ! send_with_length_unsafe( &addr, buf, buflen ) ) {
break;
}
sentIDs.push_back( *iter );
}
}
dbMgr->RemoveStoredMessages( sentIDs );
}
static const char*
msgToStr( XWRelayReg msg )
{
const char* str;
# define CASE_STR(c) case c: str = #c; break
switch( msg ) {
CASE_STR(XWPDEV_REG);
CASE_STR(XWPDEV_REGRSP);
CASE_STR(XWPDEV_PING);
CASE_STR(XWPDEV_HAVEMSGS);
CASE_STR(XWPDEV_RQSTMSGS);
CASE_STR(XWPDEV_MSG);
CASE_STR(XWPDEV_MSGNOCONN);
CASE_STR(XWPDEV_MSGRSP);
CASE_STR(XWPDEV_BADREG);
CASE_STR(XWPDEV_ALERT); // should not receive this....
CASE_STR(XWPDEV_ACK);
CASE_STR(XWPDEV_DELGAME);
default:
str = "<unknown>";
break;
}
# undef CASE_STR
return str;
}
static void
ackPacketIf( const UDPHeader* header, const AddrInfo* addr )
{
if ( UDPAckTrack::shouldAck( header->cmd ) ) {
uint32_t packetID = header->packetID;
logf( XW_LOGINFO, "acking packet %d", packetID );
packetID = htonl( packetID );
send_via_udp( addr->socket(), addr->sockaddr(), XWPDEV_ACK,
&packetID, sizeof(packetID), NULL );
}
}
static void
udp_thread_proc( UdpThreadClosure* utc )
{
const unsigned char* ptr = utc->buf();
const unsigned char* end = ptr + utc->len();
UDPHeader header;
if ( getHeader( &ptr, end, &header ) ) {
logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( header.cmd ) );
ackPacketIf( &header, utc->addr() );
switch( header.cmd ) {
case XWPDEV_REG: {
DevIDType typ = (DevIDType)*ptr++;
DevID devID( typ );
if ( getRelayDevID( &ptr, end, devID ) ) {
registerDevice( &devID, utc->saddr() );
}
break;
}
case XWPDEV_MSG: {
AddrInfo::ClientToken clientToken;
memcpy( &clientToken, ptr, sizeof(clientToken) );
ptr += sizeof(clientToken);
clientToken = ntohl( clientToken );
if ( 0 != clientToken ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
(void)processMessage( ptr, end - ptr, &addr );
} else {
logf( XW_LOGERROR, "%s: dropping packet with token of 0" );
}
break;
}
case XWPDEV_MSGNOCONN: {
AddrInfo::ClientToken clientToken;
if ( getNetLong( &ptr, end, &clientToken ) && 0 != clientToken ) {
HostID hid;
char connName[MAX_CONNNAME_LEN+1];
if ( !parseRelayID( &ptr, end, connName,
sizeof( connName ), &hid ) ) {
logf( XW_LOGERROR, "parse failed!!!" );
break;
}
SafeCref scr( connName );
if ( scr.IsValid() ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end );
assert( ptr == end ); // DON'T CHECK THIS IN!!!
} else {
logf( XW_LOGERROR, "%s: invalid scr for %s", __func__,
connName );
}
} else {
logf( XW_LOGERROR, "no clientToken found!!!" );
}
break;
}
case XWPDEV_RQSTMSGS: {
unsigned short idLen;
if ( !getNetShort( &ptr, end, &idLen ) ) {
break;
}
if ( end - ptr > idLen ) {
logf( XW_LOGERROR, "full devID not received" );
break;
}
DevID devID( ID_TYPE_RELAY );
devID.m_devIDString.append( (const char*)ptr, idLen );
ptr += idLen;
retrieveMessages( devID, utc->saddr() );
break;
}
case XWPDEV_ACK: {
uint32_t packetID;
if ( getNetLong( &ptr, end, &packetID ) ) {
logf( XW_LOGINFO, "ack for packet %d", packetID );
UDPAckTrack::recordAck( packetID );
}
break;
}
case XWPDEV_DELGAME: {
DevID devID( ID_TYPE_RELAY );
if ( !getRelayDevID( &ptr, end, devID ) ) {
break;
}
AddrInfo::ClientToken clientToken;
if ( getNetLong( &ptr, end, &clientToken ) && 0 != clientToken ) {
unsigned short seed;
HostID hid;
string connName;
if ( DBMgr::Get()->FindPlayer( devID.asRelayID(), clientToken,
connName, &hid, &seed ) ) {
SafeCref scr( connName.c_str() );
scr.DeviceGone( hid, seed );
}
}
break;
}
default:
logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, header.cmd );
}
}
}
static void
handle_udp_packet( int udpsock )
{
unsigned char buf[MAX_MSG_LEN];
AddrInfo::AddrUnion saddr;
memset( &saddr, 0, sizeof(saddr) );
socklen_t fromlen = sizeof(saddr.addr_in);
ssize_t nRead = recvfrom( udpsock, buf, sizeof(buf), 0 /*flags*/,
&saddr.addr, &fromlen );
logf( XW_LOGINFO, "%s: recvfrom=>%d", __func__, nRead );
if ( 0 < nRead ) {
AddrInfo addr( udpsock, &saddr, false );
UdpQueue::get()->handle( &addr, buf, nRead, udp_thread_proc );
}
}
/* From stack overflow, toward a snprintf with an expanding buffer.
*/
@ -1129,11 +1528,55 @@ enable_keepalive( int sock )
*/
}
static void
maint_str_loop( int udpsock, const char* str )
{
logf( XW_LOGINFO, "%s()", __func__ );
assert( -1 != udpsock );
short len = strlen(str);
unsigned char outbuf[sizeof(len) + len];
short lenNS = htons( len );
memcpy( &outbuf[0], &lenNS, sizeof(lenNS) );
memcpy( &outbuf[0+sizeof(len)], str, len );
fd_set rfds;
for ( ; ; ) {
FD_ZERO(&rfds);
FD_SET( udpsock, &rfds );
int retval = select( udpsock + 1, &rfds, NULL, NULL, NULL );
if ( 0 > retval ) {
logf( XW_LOGERROR, "%s: select=>%d (errno=%d/%s)", __func__, retval,
errno, strerror(errno) );
break;
}
if ( FD_ISSET( udpsock, &rfds ) ) {
unsigned char buf[512];
AddrInfo::AddrUnion saddr;
memset( &saddr, 0, sizeof(saddr) );
socklen_t fromlen = sizeof(saddr.addr_in);
ssize_t nRead = recvfrom( udpsock, buf, sizeof(buf), 0 /*flags*/,
&saddr.addr, &fromlen );
logf( XW_LOGINFO, "%s(); got %d bytes", __func__, nRead);
UDPHeader header;
const unsigned char* ptr = buf;
if ( getHeader( &ptr, ptr + nRead, &header ) ) {
send_via_udp( udpsock, &saddr.addr, XWPDEV_ALERT,
outbuf, sizeof(outbuf), NULL );
} else {
logf( XW_LOGERROR, "unexpected data" );
}
}
} // for
}
int
main( int argc, char** argv )
{
int port = 0;
int ctrlport = 0;
int udpport = -1;
#ifdef DO_HTTP
int httpport = 0;
const char* cssFile = NULL;
@ -1143,6 +1586,7 @@ main( int argc, char** argv )
const char* serverName = NULL;
// const char* idFileName = NULL;
const char* logFile = NULL;
const char* maint_str = NULL;
bool doDaemon = true;
bool doFork = true;
@ -1156,7 +1600,7 @@ main( int argc, char** argv )
first. */
for ( ; ; ) {
int opt = getopt(argc, argv, "h?c:p:m:n:f:l:t:s:w:"
int opt = getopt(argc, argv, "h?c:p:M:m:n:f:l:t:s:u:w:"
"DF" );
if ( opt == -1 ) {
@ -1198,6 +1642,9 @@ main( int argc, char** argv )
case 'l':
logFile = optarg;
break;
case 'M':
maint_str = optarg;
break;
case 'm':
g_maxsocks = atoi( optarg );
break;
@ -1210,6 +1657,9 @@ main( int argc, char** argv )
case 't':
nWorkerThreads = atoi( optarg );
break;
case 'u':
udpport = atoi( optarg );
break;
default:
usage( argv[0] );
exit( 1 );
@ -1232,6 +1682,9 @@ main( int argc, char** argv )
if ( ctrlport == 0 ) {
(void)cfg->GetValueFor( "CTLPORT", &ctrlport );
}
if ( -1 == udpport ) {
(void)cfg->GetValueFor( "UDPPORT", &udpport );
}
#ifdef DO_HTTP
if ( httpport == 0 ) {
(void)cfg->GetValueFor( "WWW_PORT", &httpport );
@ -1240,11 +1693,9 @@ main( int argc, char** argv )
if ( nWorkerThreads == 0 ) {
(void)cfg->GetValueFor( "NTHREADS", &nWorkerThreads );
}
if ( g_maxsocks == -1 && !cfg->GetValueFor( "MAXSOCKS", &g_maxsocks ) ) {
g_maxsocks = 100;
}
char serverNameBuf[128];
if ( serverName == NULL ) {
if ( cfg->GetValueFor( "SERVERNAME", serverNameBuf,
@ -1288,7 +1739,7 @@ main( int argc, char** argv )
#ifdef SPAWN_SELF
/* loop forever, relaunching children as they die. */
while ( doFork ) {
while ( doFork && !maint_str ) {
++s_nSpawns; /* increment in parent *before* copy */
pid_t pid = fork();
if ( pid == 0 ) { /* child */
@ -1310,6 +1761,26 @@ main( int argc, char** argv )
}
#endif
if ( -1 != udpport ) {
struct sockaddr_in saddr;
g_udpsock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
saddr.sin_family = PF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(udpport);
int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) );
if ( 0 == err ) {
err = fcntl( g_udpsock, F_SETFL, O_NONBLOCK );
} else {
logf( XW_LOGERROR, "bind()=>%s", strerror(errno) );
g_udpsock = -1;
}
}
if ( !!maint_str ) {
maint_str_loop( g_udpsock, maint_str );
exit( 1 ); // should never exit
}
/* Needs to be reset after a crash/respawn */
PermID::SetStartTime( time(NULL) );
@ -1396,7 +1867,7 @@ main( int argc, char** argv )
(void)sigaction( SIGINT, &act, NULL );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( nWorkerThreads, processMessage, killSocket );
tPool->Setup( nWorkerThreads, killSocket );
/* set up select call */
fd_set rfds;
@ -1404,6 +1875,9 @@ main( int argc, char** argv )
FD_ZERO(&rfds);
g_listeners.AddToFDSet( &rfds );
FD_SET( g_control, &rfds );
if ( -1 != g_udpsock ) {
FD_SET( g_udpsock, &rfds );
}
#ifdef DO_HTTP
if ( -1 != g_http ) {
FD_SET( g_http, &rfds );
@ -1413,6 +1887,9 @@ main( int argc, char** argv )
if ( g_control > highest ) {
highest = g_control;
}
if ( g_udpsock > highest ) {
highest = g_udpsock;
}
#ifdef DO_HTTP
if ( g_http > highest ) {
highest = g_http;
@ -1458,9 +1935,11 @@ main( int argc, char** argv )
"%s: accepting connection from %s on socket %d",
__func__, inet_ntoa(saddr.addr_in.sin_addr), newSock );
AddrInfo addr( true, newSock, &saddr );
tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
AddrInfo addr( newSock, &saddr, true );
tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
: XWThreadPool::STYPE_PROXY,
perGame ? game_thread_proc
: proxy_thread_proc,
&addr );
}
--retval;
@ -1471,6 +1950,12 @@ main( int argc, char** argv )
// run_ctrl_thread( g_control );
--retval;
}
if ( FD_ISSET( g_udpsock, &rfds ) ) {
// This will need to be done in a separate thread, or pushed
// to the existing thread pool
handle_udp_packet( g_udpsock );
--retval;
}
#ifdef DO_HTTP
if ( FD_ISSET( g_http, &rfds ) ) {
FD_CLR( g_http, &rfds );

View file

@ -27,6 +27,79 @@
/* Set if device is acting a server; cleared if as client */
#define FLAGS_SERVER_BIT 0x01
/* message types for the udp-based per-device (not per-game) protocol
*
* A number of these rely on a "clientToken", which is a 32-bit value the
* client provides and that it guarantees uniquely identifies a game on the
* device. A database rowid works great as long as they aren't reused.
*/
#define XWPDEV_PROTO_VERSION 0
#ifndef CANT_DO_TYPEDEF
typedef
#endif
enum { XWPDEV_NONE /* 0 is an illegal value */
/* All messages have the following six-byte header
* proto: 1 byte
* msgID: 4 byte unsigned long, 0 an illegal value
* cmd: 1 byte, one of the values below.
*/
,XWPDEV_ALERT /* relay->device: provides a string message to
present to the user (with device allowed not
to present the same string more than once)
format: header, null-terminnated string: varies */
,XWPDEV_REG /* dev->relay: device registers self and
self-selected (e.g. gcm) or assigned devid
format: header, idType: 1,
idLen: 2, id: <idLen> */
,XWPDEV_REGRSP /* relay->device: if non-relay-assigned devid
type was given, this gives the
relay-assigned one to be used from now on.
format: header, idLen: 2, id: <idLen>
*/
,XWPDEV_PING /* device->relay: keep the UDP connection
open. header. */
,XWPDEV_HAVEMSGS /* Relay->device: check messages for this
game. format: header */
,XWPDEV_RQSTMSGS /* device->relay: got any messages for me?
format: header, devID: 4 [, clientToken: 4]
*/
,XWPDEV_MSG /* dev->relay and relay->dev: norm: a message from a game to
the relay format: header, clientToken: 4, message<varies>*/
,XWPDEV_MSGNOCONN /* dev->relay in the proxy format that
includes relayID (connname:hid) and seems
to be reserved for relay FWD messages.
format: header, clientToken: 4; <cr>-terminated-connname:
varies, message: varies */
,XWPDEV_MSGRSP /* relay->dev: conveys error on receipt of XWPDEV_MSG */
,XWPDEV_BADREG /* relay->dev. You sent me a relayID via
XWPDEV_REG but I've never heard of it */
,XWPDEV_ACK /* relay->dev (maybe) and dev->relay
(definitely). Tells recipient its message
has been received. This is for debugging,
and maybe later for timing keepAlives based
on firewall timeouts. format: header,
msgID: 4
*/
,XWPDEV_DELGAME /* dev->relay: game's been deleted. format:
header, relayid: 4, clientToken: 4 */
}
#ifndef CANT_DO_TYPEDEF
XWRelayReg
#endif
;
#ifndef CANT_DO_TYPEDEF
typedef
#endif
@ -110,6 +183,8 @@ typedef enum {
,ID_TYPE_RELAY /* assigned by relay as replacement for one of the below */
,ID_TYPE_LINUX
,ID_TYPE_ANDROID_GCM
,ID_TYPE_ANDROID_OTHER
,ID_TYPE_ANON /* please assign me one based on nothing */
,ID_TYPE_NTYPES
} DevIDType;

View file

@ -58,6 +58,7 @@ cid integer
,mtimes TIMESTAMP(0)[]
,addrs INET[]
,devids INTEGER[]
,tokens INTEGER[]
);
EOF
@ -66,10 +67,12 @@ CREATE TABLE msgs (
id SERIAL
,connName VARCHAR(64)
,hid INTEGER
,token INTEGER
,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
,stime TIMESTAMP DEFAULT NULL
,devid INTEGER
,msg BYTEA
,msg64 TEXT
,msglen INTEGER
,UNIQUE ( connName, hid, msg )
);
@ -81,6 +84,7 @@ id INTEGER UNIQUE PRIMARY KEY
,devType INTEGER
,devid TEXT
,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
,mtime TIMESTAMP
,unreg BOOLEAN DEFAULT FALSE
);
EOF

View file

@ -25,11 +25,14 @@
#include <string>
#include <time.h>
#include <netinet/in.h>
#include <stdint.h>
#include "lstnrmgr.h"
#include "xwrelay.h"
#include "addrinfo.h"
typedef unsigned char HostID; /* see HOST_ID_SERVER */
typedef uint32_t DevIDRelay;
typedef enum {
XW_LOGERROR
@ -42,7 +45,8 @@ void logf( XW_LogLevel level, const char* format, ... );
void denyConnection( const AddrInfo* addr, XWREASON err );
bool send_with_length_unsafe( const AddrInfo* addr,
unsigned char* buf, size_t bufLen );
const unsigned char* buf, size_t bufLen );
void send_havemsgs( const AddrInfo* addr );
time_t uptime(void);
@ -55,8 +59,6 @@ int make_socket( unsigned long addr, unsigned short port );
void string_printf( std::string& str, const char* fmt, ... );
int read_packet( int sock, unsigned char* buf, int buflen );
void handle_proxy_packet( unsigned char* buf, int bufLen,
const AddrInfo* addr );
const char* cmdToStr( XWRELAY_Cmd cmd );