diff --git a/xwords4/relay/Makefile b/xwords4/relay/Makefile index 4d6f95188..d465f3b65 100644 --- a/xwords4/relay/Makefile +++ b/xwords4/relay/Makefile @@ -31,6 +31,9 @@ SRC = \ tpool.cpp \ cidlock.cpp \ addrinfo.cpp \ + devmgr.cpp \ + udpqueue.cpp \ + udpack.cpp \ xwrelay.cpp \ # STATIC ?= -static @@ -39,12 +42,14 @@ HASH=$(shell git describe) OBJ = $(patsubst %.cpp,%.o,$(SRC)) #LDFLAGS += -pthread -g -lmcheck $(STATIC) -LDFLAGS += -pthread -g $(STATIC) \ - -L$(shell pg_config --libdir) +LDFLAGS += -pthread -g $(STATIC) +LDFLAGS += -L$(shell pg_config --libdir) +LDFLAGS += $(shell pkg-config --libs glib-2.0) -CPPFLAGS += -DSPAWN_SELF -g -Wall \ - -I $(shell pg_config --includedir) \ - -DSVN_REV=\"$(shell cat $(GITINFO) 2>/dev/null || echo -n $(HASH) )\" +CPPFLAGS += -DSPAWN_SELF -g -Wall +CPPFLAGS += -I $(shell pg_config --includedir) +CPPFLAGS += -DSVN_REV=\"$(shell cat $(GITINFO) 2>/dev/null || echo -n $(HASH) )\" +CPPFLAGS += $(shell pkg-config --cflags glib-2.0) # CPPFLAGS += -DDO_HTTP # CPPFLAGS += -DHAVE_STIME @@ -56,7 +61,7 @@ memdebug all: xwrelay rq # Manual config in order to place -lpq after the .obj files as # required by something Ubuntu did upgrading natty to oneiric xwrelay: $(OBJ) - $(CXX) $(CPPFLAGS) $(LDFLAGS) -o $@ $^ -lpq + $(CXX) $(CPPFLAGS) -o $@ $^ -lpq $(LDFLAGS) rq: rq.c diff --git a/xwords4/relay/addrinfo.cpp b/xwords4/relay/addrinfo.cpp index 6486517de..e61f3595d 100644 --- a/xwords4/relay/addrinfo.cpp +++ b/xwords4/relay/addrinfo.cpp @@ -32,7 +32,10 @@ AddrInfo::equals( const AddrInfo& other ) const if ( isTCP() ) { equal = m_socket == other.m_socket; } else { - assert(0); /* later.... */ + // assert( m_socket == other.m_socket ); /* both same UDP socket */ + /* what does equal mean on udp addresses? Same host, or same host AND game */ + equal = m_clientToken == other.m_clientToken + && 0 == memcmp( &m_saddr, &other.m_saddr, sizeof(m_saddr) ); } } return equal; diff --git a/xwords4/relay/addrinfo.h b/xwords4/relay/addrinfo.h index 84e3230b6..1c8d04d87 100644 --- a/xwords4/relay/addrinfo.h +++ b/xwords4/relay/addrinfo.h @@ -24,39 +24,61 @@ #include #include +#include class AddrInfo { public: + typedef uint32_t ClientToken; + typedef union _AddrUnion { struct sockaddr addr; struct sockaddr_in addr_in; } AddrUnion; + /* Those constructed without params are only valid after another copied on + top of it */ AddrInfo() { - memset( this, 0, sizeof(*this) ); - m_socket = -1; m_isValid = false; } - AddrInfo( bool isTCP, int socket, const AddrUnion* saddr ) { - m_isValid = true; - m_isTCP = isTCP; - m_socket = socket; - memcpy( &m_saddr, saddr, sizeof(m_saddr) ); + AddrInfo( int socket, const AddrUnion* saddr, bool isTCP ) { + construct( socket, saddr, isTCP ); + } + + AddrInfo( int socket, ClientToken clientToken, const AddrUnion* saddr ) { + init( socket, clientToken, saddr ); + } + + void init( int socket, ClientToken clientToken, const AddrUnion* saddr ) { + construct( socket, saddr, false ); + m_clientToken = clientToken; } void setIsTCP( bool val ) { m_isTCP = val; } bool isTCP() const { return m_isTCP; } /* later UDP will be here too */ int socket() const { assert(m_isValid); return m_socket; } + ClientToken clientToken() const { assert(m_isValid); return m_clientToken; } struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; } + const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; } + const AddrUnion* saddr() const { assert(m_isValid); return &m_saddr; } bool equals( const AddrInfo& other ) const; private: + void construct( int socket, const AddrUnion* saddr, bool isTCP ) { + memset( this, 0, sizeof(*this) ); + + m_socket = socket; + m_isTCP = isTCP; + memcpy( &m_saddr, saddr, sizeof(m_saddr) ); + m_isValid = true; + } + // AddrInfo& operator=(const AddrInfo&); // Prevent assignment int m_socket; bool m_isTCP; bool m_isValid; + ClientToken m_clientToken; /* must be 32 bit */ AddrUnion m_saddr; }; diff --git a/xwords4/relay/cidlock.cpp b/xwords4/relay/cidlock.cpp index 1a1ca9194..76c7df7bd 100644 --- a/xwords4/relay/cidlock.cpp +++ b/xwords4/relay/cidlock.cpp @@ -52,9 +52,9 @@ CidLock::~CidLock() void CidLock::print_claimed( const char* caller ) { - char buf[512] = {0}; int unclaimed = 0; - int len = snprintf( buf, sizeof(buf), "after %s: ", caller ); + string str; + string_printf( str, "after %s: ", caller ); // Assume we have the mutex!!!! map< CookieID, CidInfo*>::iterator iter; for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) { @@ -62,13 +62,11 @@ CidLock::print_claimed( const char* caller ) if ( 0 == info->GetOwner() ) { ++unclaimed; } else { - len += snprintf( &buf[len], sizeof(buf)-len, "%d,", - info->GetCid() ); + string_printf( str, "%d,", info->GetCid() ); } } - len += snprintf( &buf[len], sizeof(buf)-len, " (plus %d unclaimed.)", - unclaimed ); - logf( XW_LOGINFO, "%s: claimed: %s", __func__, buf ); + string_printf( str, "%d,", " (plus %d unclaimed.)", unclaimed ); + logf( XW_LOGINFO, "%s: claimed: %s", __func__, str.c_str() ); } #else # define PRINT_CLAIMED() diff --git a/xwords4/relay/configs.cpp b/xwords4/relay/configs.cpp index 3152f98a4..90b222149 100644 --- a/xwords4/relay/configs.cpp +++ b/xwords4/relay/configs.cpp @@ -133,7 +133,9 @@ RelayConfigs::SetValueFor( const char* key, const char* value ) m_values.erase(iter); } - m_values.insert( pair(strdup(key),strdup(value) ) ); + pair::iterator,bool> result = + m_values.insert( pair(strdup(key),strdup(value) ) ); + assert( result.second ); } ino_t diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 9ca339383..bf61d8a70 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -40,6 +40,7 @@ #include "timermgr.h" #include "configs.h" #include "crefmgr.h" +#include "devmgr.h" #include "permid.h" using namespace std; @@ -261,7 +262,7 @@ CookieRef::_HandleAck( HostID hostID ) void CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID, - unsigned char* buf, int buflen ) + const unsigned char* buf, int buflen ) { CRefEvent evt( XWE_PROXYMSG, addr ); evt.u.fwd.src = srcID; @@ -276,7 +277,7 @@ CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID, void CookieRef::_Disconnect( const AddrInfo* addr, HostID hostID ) { - logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID ); + logf( XW_LOGINFO, "%s(hostID=%d)", __func__, hostID ); CRefEvent evt( XWE_DISCONN, addr ); evt.u.discon.srcID = hostID; @@ -308,14 +309,14 @@ CookieRef::_Shutdown() HostID CookieRef::HostForSocket( const AddrInfo* addr ) { - HostID hid = -1; + HostID hid = HOST_ID_NONE; ASSERT_LOCKED(); RWReadLock rrl( &m_socketsRWLock ); vector::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { if ( iter->m_addr.equals( *addr ) ) { hid = iter->m_hostID; - logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid ); + assert( HOST_ID_NONE != hid ); break; } } @@ -346,7 +347,7 @@ bool CookieRef::AlreadyHere( unsigned short seed, const AddrInfo* addr, HostID* prevHostID ) { - logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket ); + logf( XW_LOGINFO, "%s(seed=%x(%d))", __func__, seed, seed ); bool here = false; RWReadLock rrl( &m_socketsRWLock ); @@ -372,7 +373,7 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, const AddrInfo* addr, bool* spotTaken ) { logf( XW_LOGINFO, "%s(hid=%d,seed=%x(%d),socket=%d)", __func__, - hid, seed, seed, socket ); + hid, seed, seed, addr->socket() ); bool here = false; RWWriteLock rwl( &m_socketsRWLock ); @@ -404,7 +405,7 @@ CookieRef::notifyDisconn( const CRefEvent* evt ) evt->u.disnote.why }; - send_with_length( &evt->addr, buf, sizeof(buf), true ); + send_with_length( &evt->addr, HOST_ID_NONE, buf, sizeof(buf), true ); } /* notifyDisconn */ void @@ -522,7 +523,7 @@ CookieRef::_CheckHeartbeats( time_t now ) void CookieRef::_Forward( HostID src, const AddrInfo* addr, - HostID dest, unsigned char* buf, int buflen ) + HostID dest, const unsigned char* buf, int buflen ) { pushForwardEvent( src, addr, dest, buf, buflen ); handleEvents(); @@ -587,9 +588,9 @@ CookieRef::pushHeartFailedEvent( int socket ) void CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest, - unsigned char* buf, int buflen ) + const unsigned char* buf, int buflen ) { - logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest ); + logf( XW_LOGVERBOSE1, "%s: %d -> %d", __func__, src, dest ); CRefEvent evt( XWE_FORWARDMSG, addr ); evt.u.fwd.src = src; evt.u.fwd.dest = dest; @@ -636,7 +637,7 @@ CookieRef::handleEvents() /* Assumption: has mutex!!!! */ while ( m_eventQueue.size () > 0 ) { XW_RELAY_STATE nextState; - DBMgr::DevIDRelay devID; + DevIDRelay devID; CRefEvent evt = m_eventQueue.front(); m_eventQueue.pop_front(); @@ -703,6 +704,10 @@ CookieRef::handleEvents() forward_or_store/*_proxy*/( &evt ); break; + case XWA_TRYTELL: + send_havemsgs( &evt.addr ); + break; + case XWA_TIMERDISCONN: disconnectSockets( XWRELAY_ERROR_TIMEOUT ); break; @@ -807,12 +812,19 @@ CookieRef::handleEvents() } /* handleEvents */ bool -CookieRef::send_with_length( const AddrInfo* addr, - unsigned char* buf, int bufLen, bool cascade ) +CookieRef::send_with_length( const AddrInfo* addr, HostID dest, + const unsigned char* buf, int bufLen, bool cascade ) { bool failed = false; if ( send_with_length_unsafe( addr, buf, bufLen ) ) { - DBMgr::Get()->RecordSent( ConnName(), HostForSocket(addr), bufLen ); + if ( HOST_ID_NONE == dest ) { + dest = HostForSocket(addr); + } + if ( HOST_ID_NONE != dest ) { + DBMgr::Get()->RecordSent( ConnName(), dest, bufLen ); + } else { + logf( XW_LOGERROR, "%s: no hid for addr", __func__ ); + } } else { failed = true; } @@ -848,7 +860,6 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) assert( dest > 0 && dest <= 4 ); assert( -1 != addr->socket() ); - assert( addr->isTCP() ); for ( ; ; ) { unsigned char buf[MAX_MSG_LEN]; @@ -858,7 +869,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) buf, &buflen, &msgID ) ) { break; } - if ( ! send_with_length( addr, buf, buflen, true ) ) { + if ( ! send_with_length( addr, dest, buf, buflen, true ) ) { break; } DBMgr::Get()->RemoveStoredMessages( &msgID, 1 ); @@ -867,9 +878,9 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) bool CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, - DBMgr::DevIDRelay* devIDp ) + DevIDRelay* devIDp ) { - DBMgr::DevIDRelay devID = DBMgr::DEVID_NONE; + DevIDRelay devID = DBMgr::DEVID_NONE; int nPlayersH = evt->u.con.nPlayersH; int seed = evt->u.con.seed; @@ -909,6 +920,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, evt->u.con.clientVersion, nPlayersH, seed, &evt->addr, devID, reconn ); + DevMgr::Get()->Remember( devID, &evt->addr ); + HostID hostid = evt->u.con.srcID; if ( NULL != hidp ) { *hidp = hostid; @@ -916,8 +929,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, /* first add the rec here, whether it'll get ack'd or not */ logf( XW_LOGINFO, "%s: remembering pair: hostid=%x, " - "socket=%d (size=%d)", - __func__, hostid, socket, m_sockets.size()); + "(size=%d)", __func__, hostid, m_sockets.size()); assert( m_sockets.size() < 4 ); @@ -985,6 +997,14 @@ CookieRef::postDropDevice( HostID hostID ) handleEvents(); } +void +CookieRef::postTellHaveMsgs( const AddrInfo* addr ) +{ + CRefEvent evt( XWE_TRYTELL, addr ); + m_eventQueue.push_back( evt ); + assert( m_in_handleEvents ); +} + void CookieRef::setAllConnectedTimer() { @@ -1038,7 +1058,7 @@ CookieRef::cancelAllConnectedTimer() void CookieRef::sendResponse( const CRefEvent* evt, bool initial, - const DBMgr::DevIDRelay* devID ) + const DevIDRelay* devID ) { /* Now send the response */ unsigned char buf[1 /* cmd */ @@ -1105,7 +1125,7 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial, } } - send_with_length( &evt->addr, buf, bufp - buf, true ); + send_with_length( &evt->addr, evt->u.con.srcID, buf, bufp - buf, true ); logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) ); } /* sendResponse */ @@ -1121,12 +1141,13 @@ CookieRef::sendAnyStored( const CRefEvent* evt ) void CookieRef::forward_or_store( const CRefEvent* evt ) { - unsigned char* buf = evt->u.fwd.buf; + const unsigned char* cbuf = evt->u.fwd.buf; do { - /* This is an ugly hack!!!! */ - if ( *buf == XWRELAY_MSG_TORELAY ) { - *buf = XWRELAY_MSG_FROMRELAY; - } else if ( *buf == XWRELAY_MSG_TORELAY_NOCONN ) { + int buflen = evt->u.fwd.buflen; + unsigned char buf[buflen]; + if ( *cbuf == XWRELAY_MSG_TORELAY ) { + buf[0] = XWRELAY_MSG_FROMRELAY; + } else if ( *cbuf == XWRELAY_MSG_TORELAY_NOCONN ) { *buf = XWRELAY_MSG_FROMRELAY_NOCONN; } else { logf( XW_LOGERROR, "%s: got XWRELAY type of %d", __func__, @@ -1134,7 +1155,8 @@ CookieRef::forward_or_store( const CRefEvent* evt ) break; } - int buflen = evt->u.fwd.buflen; + memcpy( &buf[1], &cbuf[1], buflen-1 ); + HostID dest = evt->u.fwd.dest; const AddrInfo* destAddr = SocketForHost( dest ); @@ -1143,10 +1165,24 @@ CookieRef::forward_or_store( const CRefEvent* evt ) } if ( (NULL == destAddr) - || !send_with_length( destAddr, buf, buflen, true ) ) { + || !send_with_length( destAddr, dest, buf, buflen, true ) ) { store_message( dest, buf, buflen ); } + // If recipient GAME isn't connected, see if owner device is and can + // receive + if ( NULL == destAddr) { + DevIDRelay devid; + AddrInfo::ClientToken token; + if ( DBMgr::Get()->TokenFor( ConnName(), dest, &devid, &token ) ) { + const AddrInfo::AddrUnion* saddr = DevMgr::Get()->get( devid ); + if ( !!saddr ) { + AddrInfo addr( -1, token, saddr ); + postTellHaveMsgs( &addr ); + } + } + } + /* also note that we've heard from src recently */ HostID src = evt->u.fwd.src; DBMgr::Get()->RecordAddress( ConnName(), src, &evt->addr ); @@ -1163,7 +1199,7 @@ CookieRef::send_denied( const CRefEvent* evt, XWREASON why ) } void -CookieRef::send_msg( const AddrInfo* addr, HostID id, +CookieRef::send_msg( const AddrInfo* addr, HostID hid, XWRelayMsg msg, XWREASON why, bool cascade ) { unsigned char buf[10]; @@ -1174,7 +1210,7 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id, switch ( msg ) { case XWRELAY_DISCONNECT_OTHER: buf[len++] = why; - tmp = htons( id ); + tmp = htons( hid ); memcpy( &buf[len], &tmp, 2 ); len += 2; break; @@ -1184,14 +1220,13 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id, } assert( len <= sizeof(buf) ); - send_with_length( addr, buf, len, cascade ); + send_with_length( addr, HOST_ID_NONE, buf, len, cascade ); } /* send_msg */ void CookieRef::notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why ) { assert( addr->socket() != 0 ); - assert( addr->isTCP() ); ASSERT_LOCKED(); RWReadLock rrl( &m_socketsRWLock ); @@ -1212,7 +1247,7 @@ CookieRef::notifyGameDead( const AddrInfo* addr ) ,XWRELAY_ERROR_DELETED }; - send_with_length( addr, buf, sizeof(buf), true ); + send_with_length( addr, HOST_ID_NONE, buf, sizeof(buf), true ); } /* void */ @@ -1265,7 +1300,7 @@ CookieRef::sendAllHere( bool initial ) vector::const_iterator iter; for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { if ( iter->m_hostID == dest ) { - sent = send_with_length( &iter->m_addr, buf, bufp-buf, true ); + sent = send_with_length( &iter->m_addr, dest, buf, bufp-buf, true ); break; } } diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 5ff36326d..48d1d2b80 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -33,6 +33,7 @@ #include "devid.h" #include "dbmgr.h" #include "states.h" +#include "addrinfo.h" typedef vector MsgBuffer; typedef deque MsgBufQueue; @@ -131,14 +132,14 @@ class CookieRef { int seed, const AddrInfo* addr, bool gameDead ); void _HandleAck( HostID hostID ); void _PutMsg( HostID srcID, const AddrInfo* addr, HostID destID, - unsigned char* buf, int buflen ); + const unsigned char* buf, int buflen ); void _Disconnect( const AddrInfo* addr, HostID hostID ); void _DeviceGone( HostID hostID, int seed ); void _Shutdown(); void _HandleHeartbeat( HostID id, const AddrInfo* addr ); void _CheckHeartbeats( time_t now ); void _Forward( HostID src, const AddrInfo* addr, HostID dest, - unsigned char* buf, int buflen ); + const unsigned char* buf, int buflen ); void _Remove( const AddrInfo* addr ); void _CheckAllConnected(); void _CheckNotAcked( HostID hid ); @@ -159,7 +160,7 @@ class CookieRef { struct { HostID src; HostID dest; - unsigned char* buf; + const unsigned char* buf; int buflen; } fwd; struct { @@ -194,8 +195,8 @@ class CookieRef { } u; }; - bool send_with_length( const AddrInfo* addr, - unsigned char* buf, int bufLen, bool cascade ); + bool send_with_length( const AddrInfo* addr, HostID hid, + const unsigned char* buf, int bufLen, bool cascade ); void send_msg( const AddrInfo* addr, HostID id, XWRelayMsg msg, XWREASON why, bool cascade ); void pushConnectEvent( int clientVersion, DevID* devID, @@ -208,7 +209,7 @@ class CookieRef { void pushHeartFailedEvent( const AddrInfo* addr ); void pushForwardEvent( HostID src, const AddrInfo* addr, - HostID dest, unsigned char* buf, int buflen ); + HostID dest, const unsigned char* buf, int buflen ); void pushDestBadEvent(); void pushLastSocketGoneEvent(); void pushGameDead( const AddrInfo* addr ); @@ -219,16 +220,17 @@ class CookieRef { void handleEvents(); void sendResponse( const CRefEvent* evt, bool initial, - const DBMgr::DevIDRelay* devID ); + const DevIDRelay* devID ); void sendAnyStored( const CRefEvent* evt ); void initPlayerCounts( const CRefEvent* evt ); bool increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, - DBMgr::DevIDRelay* devID ); + DevIDRelay* devID ); void updateAck( HostID hostID, bool keep ); void dropPending( int seed ); void postCheckAllHere(); void postDropDevice( HostID hostID ); + void postTellHaveMsgs( const AddrInfo* addr ); void setAllConnectedTimer(); void cancelAllConnectedTimer(); @@ -237,6 +239,7 @@ class CookieRef { void forward_or_store( const CRefEvent* evt ); void send_denied( const CRefEvent* evt, XWREASON why ); + void send_trytell( const CRefEvent* evt ); void checkFromServer( const CRefEvent* evt ); void notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why ); diff --git a/xwords4/relay/crefmgr.cpp b/xwords4/relay/crefmgr.cpp index 01c0fdc81..42075d39d 100644 --- a/xwords4/relay/crefmgr.cpp +++ b/xwords4/relay/crefmgr.cpp @@ -405,16 +405,18 @@ CidInfo* CRefMgr::getCookieRef( CookieID cid, bool failOk ) { CidInfo* cinfo = NULL; - for ( ; ; ) { + for ( int count = 0; ; ++count ) { cinfo = m_cidlock->Claim( cid ); if ( NULL != cinfo->GetRef() ) { break; - } else if ( failOk ) { + } else if ( failOk || count > 20 ) { break; } m_cidlock->Relinquish( cinfo, true ); - logf( XW_LOGINFO, "%s: sleeping after failing to get cinfo", __func__ ); + logf( XW_LOGINFO, "%s: (count=%d) sleeping after " + "failing to get cinfo", __func__, count ); usleep(200000); /* 2/10 second */ + cinfo = NULL; } return cinfo; } /* getCookieRef */ @@ -462,7 +464,9 @@ CRefMgr::AddNew( const char* cookie, const char* connName, CookieID cid, ref->assignConnName(); - m_cookieMap.insert( pair(ref->GetCid(), ref ) ); + pair result = + m_cookieMap.insert( pair(ref->GetCid(), ref ) ); + assert( result.second ); logf( XW_LOGINFO, "%s: paired cookie %s/connName %s with cid %d", __func__, (cookie?cookie:"NULL"), connName, ref->GetCid() ); diff --git a/xwords4/relay/crefmgr.h b/xwords4/relay/crefmgr.h index a4020e6dd..7f657c2ab 100644 --- a/xwords4/relay/crefmgr.h +++ b/xwords4/relay/crefmgr.h @@ -186,7 +186,7 @@ class SafeCref { ~SafeCref(); bool Forward( HostID src, const AddrInfo* addr, HostID dest, - unsigned char* buf, int buflen ) { + const unsigned char* buf, int buflen ) { if ( IsValid() ) { CookieRef* cref = m_cinfo->GetRef(); assert( 0 != cref->GetCid() ); @@ -198,7 +198,7 @@ class SafeCref { } void PutMsg( HostID srcID, const AddrInfo* addr, HostID destID, - unsigned char* buf, int buflen ) { + const unsigned char* buf, int buflen ) { if ( IsValid() ) { CookieRef* cref = m_cinfo->GetRef(); assert( 0 != cref->GetCid() ); diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index e1c853091..75246fef1 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -27,6 +27,8 @@ #include #include +#include + #include "dbmgr.h" #include "mlock.h" #include "configs.h" @@ -41,6 +43,7 @@ static DBMgr* s_instance = NULL; #define DELIM "\1" +#define MAX_NUM_PLAYERS 4 static void formatParams( char* paramValues[], int nParams, const char* fmt, char* buf, int bufLen, ... ); @@ -60,6 +63,7 @@ DBMgr::Get() DBMgr::DBMgr() { logf( XW_LOGINFO, "%s called", __func__ ); + m_useB64 = false; pthread_key_create( &m_conn_key, destr_function ); @@ -146,6 +150,54 @@ DBMgr::FindGame( const char* connName, char* cookieBuf, int bufLen, return cid; } /* FindGame */ +bool +DBMgr::FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken token, + string& connName, HostID* hidp, unsigned short* seed ) +{ + int nSuccesses = 0; + + const char* fmt = + "SELECT connName FROM %s WHERE %d = ANY(devids) AND %d = ANY(tokens)"; + string query; + string_printf( query, fmt, GAMES_TABLE, relayID, token ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + int nTuples = PQntuples( result ); + vector names(nTuples); + for ( int ii = 0; ii < nTuples; ++ii ) { + string name( PQgetvalue( result, ii, 0 ) ); + names.push_back( name ); + } + PQclear( result ); + + for ( vector::const_iterator iter = names.begin(); + iter != names.end(); ++iter ) { + const char* name = iter->c_str(); + for ( HostID hid = 1; hid <= MAX_NUM_PLAYERS; ++hid ) { + fmt = "SELECT seeds[%d] FROM %s WHERE connname = '%s' " + "AND devids[%d] = %d AND tokens[%d] = %d"; + string query; + string_printf( query, fmt, hid, GAMES_TABLE, name, + hid, relayID, hid, token ); + result = PQexec( getThreadConn(), query.c_str() ); + int nTuples2 = PQntuples( result ); + for ( int jj = 0; jj < nTuples2; ++jj ) { + connName = name; + *hidp = hid; + *seed = atoi( PQgetvalue( result, 0, 0 ) ); + ++nSuccesses; + } + PQclear( result ); + } + } + + if ( 1 < nSuccesses ) { + logf( XW_LOGERROR, "%s found %d matches!!!", __func__, nSuccesses ); + } + + return nSuccesses >= 1; +} // FindPlayer + bool DBMgr::SeenSeed( const char* cookie, unsigned short seed, int langCode, int nPlayersT, bool wantsPublic, @@ -249,10 +301,10 @@ DBMgr::AllDevsAckd( const char* const connName ) // Return DevIDRelay for device, adding it to devices table IFF it's not // already there. -DBMgr::DevIDRelay +DevIDRelay DBMgr::RegisterDevice( const DevID* host ) { - DBMgr::DevIDRelay devID; + DevIDRelay devID; assert( host->m_devIDType != ID_TYPE_NONE ); int ii; bool success; @@ -261,15 +313,17 @@ DBMgr::RegisterDevice( const DevID* host ) devID = getDevID( host ); // If it's not present *and* of type ID_TYPE_RELAY, we can do nothing. - // Fail. - if ( DEVID_NONE == devID && ID_TYPE_RELAY < host->m_devIDType ) { + // Otherwise proceed. + if ( DEVID_NONE != devID ) { + (void)updateDevice( devID, false ); + } else if ( ID_TYPE_RELAY < host->m_devIDType ) { // loop until we're successful inserting the unique key. Ship with this // coming from random, but test with increasing values initially to make // sure duplicates are detected. for ( success = false, ii = 0; !success; ++ii ) { assert( 10 > ii ); // better to check that we're looping BECAUSE // of uniqueness problem. - devID = (DBMgr::DevIDRelay)random(); + devID = (DevIDRelay)random(); if ( DEVID_NONE == devID ) { continue; } @@ -290,7 +344,8 @@ DBMgr::RegisterDevice( const DevID* host ) NULL, NULL, 0 ); success = PGRES_COMMAND_OK == PQresultStatus(result); if ( !success ) { - logf( XW_LOGERROR, "PQexec=>%s;%s", PQresStatus(PQresultStatus(result)), + logf( XW_LOGERROR, "PQexec=>%s;%s", + PQresStatus(PQresultStatus(result)), PQresultErrorMessage(result) ); } PQclear( result ); @@ -299,6 +354,26 @@ DBMgr::RegisterDevice( const DevID* host ) return devID; } +bool +DBMgr::updateDevice( DevIDRelay relayID, bool check ) +{ + bool exists = !check; + if ( !exists ) { + string test; + string_printf( test, "id = %d", relayID ); + exists = 1 == getCountWhere( DEVICES_TABLE, test ); + } + + if ( exists ) { + const char* fmt = + "UPDATE " DEVICES_TABLE " SET mtime='now' WHERE id = %d"; + string query; + string_printf( query, fmt, relayID ); + execSql( query ); + } + return exists; +} + HostID DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion, int nToAdd, unsigned short seed, const AddrInfo* addr, @@ -327,13 +402,14 @@ DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion, const char* fmt = "UPDATE " GAMES_TABLE " SET nPerDevice[%d] = %d," " clntVers[%d] = %d," " seeds[%d] = %d, addrs[%d] = \'%s\', %s" - " mtimes[%d]='now', ack[%d]=\'%c\'" + " tokens[%d] = %d, mtimes[%d]='now', ack[%d]=\'%c\'" " WHERE connName = '%s'"; string query; char* ntoa = inet_ntoa( addr->sin_addr() ); string_printf( query, fmt, newID, nToAdd, newID, clientVersion, - newID, seed, newID, ntoa, devIDBuf.c_str(), newID, - newID, ackd?'A':'a', connName ); + newID, seed, newID, ntoa, devIDBuf.c_str(), + newID, addr->clientToken(), newID, newID, ackd?'A':'a', + connName ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); execSql( query ); @@ -576,26 +652,41 @@ DBMgr::PublicRooms( int lang, int nPlayers, int* nNames, string& names ) *nNames = nTuples; } +bool +DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid, + AddrInfo::ClientToken* token ) +{ + bool found = false; + const char* fmt = "SELECT tokens[%d], devids[%d] FROM " GAMES_TABLE + " WHERE connName='%s'"; + string query; + string_printf( query, fmt, hid, hid, connName ); + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + if ( 1 == PQntuples( result ) ) { + AddrInfo::ClientToken token_tmp = atoi( PQgetvalue( result, 0, 0 ) ); + DevIDRelay devid_tmp = atoi( PQgetvalue( result, 0, 1 ) ); + if ( 0 != token_tmp // 0 is illegal (legacy/unset) value + && 0 != devid_tmp ) { + *token = token_tmp; + *devid = devid_tmp; + found = true; + } + } + PQclear( result ); + logf( XW_LOGINFO, "%s(%s,%d)=>%s (%d, %d)", __func__, connName, hid, + (found?"true":"false"), *devid, *token ); + return found; +} + int DBMgr::PendingMsgCount( const char* connName, int hid ) { - int count = 0; - const char* fmt = "SELECT COUNT(*) FROM " MSGS_TABLE - " WHERE connName = '%s' AND hid = %d " + string test; + string_printf( test, "connName = '%s' AND hid = %d ", connName, hid ); #ifdef HAVE_STIME - "AND stime IS NULL" + string_printf( test, " AND stime IS NULL" ); #endif - ; - string query; - string_printf( query, fmt, connName, hid ); - logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() ); - - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - if ( 1 == PQntuples( result ) ) { - count = atoi( PQgetvalue( result, 0, 0 ) ); - } - PQclear( result ); - return count; + return getCountWhere( MSGS_TABLE, test ); } bool @@ -632,10 +723,10 @@ DBMgr::readArray( const char* const connName, int arr[] ) /* len 4 */ PQclear( result ); } -DBMgr::DevIDRelay +DevIDRelay DBMgr::getDevID( const char* connName, int hid ) { - DBMgr::DevIDRelay devID; + DevIDRelay devID; const char* fmt = "SELECT devids[%d] FROM " GAMES_TABLE " WHERE connName='%s'"; string query; string_printf( query, fmt, hid, connName ); @@ -643,29 +734,28 @@ DBMgr::getDevID( const char* connName, int hid ) PGresult* result = PQexec( getThreadConn(), query.c_str() ); assert( 1 == PQntuples( result ) ); - devID = (DBMgr::DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 ); + devID = (DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 ); PQclear( result ); return devID; } -DBMgr::DevIDRelay +DevIDRelay DBMgr::getDevID( const DevID* devID ) { - DBMgr::DevIDRelay rDevID = DEVID_NONE; + DevIDRelay rDevID = DEVID_NONE; DevIDType devIDType = devID->m_devIDType; string query; assert( ID_TYPE_NONE < devIDType ); - const char* asStr = devID->m_devIDString.c_str(); if ( ID_TYPE_RELAY == devIDType ) { // confirm it's there - DBMgr::DevIDRelay cur = strtoul( asStr, NULL, 16 ); + DevIDRelay cur = devID->asRelayID(); if ( DEVID_NONE != cur ) { const char* fmt = "SELECT id FROM " DEVICES_TABLE " WHERE id=%d"; string_printf( query, fmt, cur ); } } else { const char* fmt = "SELECT id FROM " DEVICES_TABLE " WHERE devtype=%d and devid = '%s'"; - string_printf( query, fmt, devIDType, asStr ); + string_printf( query, fmt, devIDType, devID->m_devIDString.c_str() ); } if ( 0 < query.size() ) { @@ -673,10 +763,12 @@ DBMgr::getDevID( const DevID* devID ) PGresult* result = PQexec( getThreadConn(), query.c_str() ); assert( 1 >= PQntuples( result ) ); if ( 1 == PQntuples( result ) ) { - rDevID = (DBMgr::DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 ); + rDevID = (DevIDRelay)strtoul( PQgetvalue( result, 0, 0 ), NULL, 10 ); } PQclear( result ); } + logf( XW_LOGINFO, "%s(in=%s)=>%d (0x.8X)", __func__, + devID->m_devIDString.c_str(), rDevID, rDevID ); return rDevID; } @@ -691,25 +783,16 @@ DBMgr::getDevID( const DevID* devID ) int DBMgr::CountStoredMessages( const char* const connName, int hid ) { - const char* fmt = "SELECT count(*) FROM " MSGS_TABLE - " WHERE connname = '%s' " + string test; + string_printf( test, "connname = '%s'", connName ); #ifdef HAVE_STIME - "AND stime IS NULL" + string_printf( test, " AND stime IS NULL" ); #endif - ; - - string query; - string_printf( query, fmt, connName ); - if ( hid != -1 ) { - string_printf( query, "AND hid = %d", hid ); + string_printf( test, " AND hid = %d", hid ); } - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - assert( 1 == PQntuples( result ) ); - int count = atoi( PQgetvalue( result, 0, 0 ) ); - PQclear( result ); - return count; + return getCountWhere( MSGS_TABLE, test ); } int @@ -718,6 +801,38 @@ DBMgr::CountStoredMessages( const char* const connName ) return CountStoredMessages( connName, -1 ); } /* CountStoredMessages */ +int +DBMgr::CountStoredMessages( DevIDRelay relayID ) +{ + string test; + string_printf( test, "devid = %d", relayID ); +#ifdef HAVE_STIME + string_printf( test, "AND stime IS NULL" ); +#endif + + return getCountWhere( MSGS_TABLE, test ); +} + +void +DBMgr::GetStoredMessageIDs( DevIDRelay relayID, vector& ids ) +{ + const char* fmt = "SELECT id FROM " MSGS_TABLE " WHERE devid=%d " + "AND connname IN (SELECT connname FROM " GAMES_TABLE + " WHERE NOT " GAMES_TABLE ".dead)"; + string query; + string_printf( query, fmt, relayID ); + // logf( XW_LOGINFO, "%s: query=\"%s\"", __func__, query.c_str() ); + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + int nTuples = PQntuples( result ); + for ( int ii = 0; ii < nTuples; ++ii ) { + int id = atoi( PQgetvalue( result, ii, 0 ) ); + // logf( XW_LOGINFO, "%s: adding id %d", __func__, id ); + ids.push_back( id ); + } + PQclear( result ); + logf( XW_LOGINFO, "%s(relayID=%d)=>%d ids", __func__, relayID, ids.size() ); +} + void DBMgr::StoreMessage( const char* const connName, int hid, const unsigned char* buf, int len ) @@ -726,27 +841,67 @@ DBMgr::StoreMessage( const char* const connName, int hid, size_t newLen; const char* fmt = "INSERT INTO " MSGS_TABLE - " (connname, hid, devid, msg, msglen)" - " VALUES( '%s', %d, %d, E'%s', %d)"; - - unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, - len, &newLen ); - assert( NULL != bytes ); + " (connname, hid, devid, token, %s, msglen)" + " VALUES( '%s', %d, %d, " + "(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), " + "%s'%s', %d)"; string query; - string_printf( query, fmt, connName, hid, devID, bytes, len ); - - PQfreemem( bytes ); + if ( m_useB64 ) { + gchar* b64 = g_base64_encode( buf, len ); + string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName, + "", b64, len ); + g_free( b64 ); + } else { + unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, + len, &newLen ); + assert( NULL != bytes ); + + string_printf( query, fmt, "msg", connName, hid, devID, hid, connName, + "E", bytes, len ); + PQfreemem( bytes ); + } logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); execSql( query ); } +void +DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex, + unsigned char* buf, size_t* buflen ) +{ + const char* from = NULL; + if ( useB64 ) { + from = PQgetvalue( result, 0, b64indx ); + } + if ( NULL == from || '\0' == from[0] ) { + useB64 = false; + from = PQgetvalue( result, 0, byteaIndex ); + } + + size_t to_length; + if ( useB64 ) { + gsize out_len; + guchar* txt = g_base64_decode( (const gchar*)from, &out_len ); + to_length = out_len; + assert( to_length <= *buflen ); + memcpy( buf, txt, to_length ); + g_free( txt ); + } else { + unsigned char* bytes = PQunescapeBytea( (const unsigned char*)from, + &to_length ); + assert( to_length <= *buflen ); + memcpy( buf, bytes, to_length ); + PQfreemem( bytes ); + } + *buflen = to_length; +} + bool DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn, unsigned char* buf, size_t* buflen, int* msgID ) { - const char* fmt = "SELECT id, msg, msglen FROM " MSGS_TABLE + const char* fmt = "SELECT id, msg, msg64, msglen FROM " MSGS_TABLE " WHERE connName = '%s' AND hid = %d " #ifdef HAVE_STIME "AND stime IS NULL " @@ -765,18 +920,9 @@ DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn, if ( NULL != msgID ) { *msgID = atoi( PQgetvalue( result, 0, 0 ) ); } - size_t msglen = atoi( PQgetvalue( result, 0, 2 ) ); - - /* int len = PQgetlength( result, 0, 1 ); */ - const unsigned char* from = - (const unsigned char* )PQgetvalue( result, 0, 1 ); - size_t to_length; - unsigned char* bytes = PQunescapeBytea( from, &to_length ); - assert( to_length <= *buflen ); - memcpy( buf, bytes, to_length ); - PQfreemem( bytes ); - *buflen = to_length; - assert( 0 == msglen || to_length == msglen ); + size_t msglen = atoi( PQgetvalue( result, 0, 3 ) ); + decodeMessage( result, m_useB64, 2, 1, buf, buflen ); + assert( 0 == msglen || msglen == *buflen ); } PQclear( result ); return found; @@ -789,6 +935,51 @@ DBMgr::GetStoredMessage( const char* const connName, int hid, return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID ); } +bool +DBMgr::GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, + AddrInfo::ClientToken* token ) +{ + const char* fmt = "SELECT token, msg, msg64, msglen FROM " MSGS_TABLE + " WHERE id = %d " +#ifdef HAVE_STIME + "AND stime IS NULL " +#endif + ; + string query; + string_printf( query, fmt, msgID ); + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + int nTuples = PQntuples( result ); + assert( nTuples <= 1 ); + + bool found = nTuples == 1; + if ( found ) { + *token = atoi( PQgetvalue( result, 0, 0 ) ); + size_t msglen = atoi( PQgetvalue( result, 0, 3 ) ); + decodeMessage( result, m_useB64, 2, 1, buf, buflen ); + assert( 0 == msglen || *buflen == msglen ); + } + PQclear( result ); + return found; +} + +void +DBMgr::RemoveStoredMessages( string& msgids ) +{ + const char* fmt = +#ifdef HAVE_STIME + "UPDATE " MSGS_TABLE " SET stime='now' " +#else + "DELETE FROM " MSGS_TABLE +#endif + " WHERE id IN (%s)"; + string query; + string_printf( query, fmt, msgids.c_str() ); + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + execSql( query ); +} + void DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs ) { @@ -805,21 +996,41 @@ DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs ) ids.append( "," ); } } - - const char* fmt = -#ifdef HAVE_STIME - "UPDATE " MSGS_TABLE " SET stime='now' " -#else - "DELETE FROM " MSGS_TABLE -#endif - " WHERE id IN (%s)"; - string query; - string_printf( query, fmt, ids.c_str() ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - execSql( query ); + RemoveStoredMessages( ids ); } } +void +DBMgr::RemoveStoredMessages( vector& idv ) +{ + if ( 0 < idv.size() ) { + string ids; + vector::const_iterator iter = idv.begin(); + for ( ; ; ) { + string_printf( ids, "%d", *iter ); + if ( ++iter == idv.end() ) { + break; + } + string_printf( ids, "," ); + } + RemoveStoredMessages( ids ); + } +} + +int +DBMgr::getCountWhere( const char* table, string& test ) +{ + string query; + string_printf( query, "SELECT count(*) FROM %s WHERE %s", table, test.c_str() ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + assert( 1 == PQntuples( result ) ); + int count = atoi( PQgetvalue( result, 0, 0 ) ); + PQclear( result ); + logf( XW_LOGINFO, "%s(%s)=>%d", __func__, query.c_str(), count ); + return count; +} + static void formatParams( char* paramValues[], int nParams, const char* fmt, char* buf, int bufLen, ... ) diff --git a/xwords4/relay/dbmgr.h b/xwords4/relay/dbmgr.h index 2106ae101..6f3c0d2ef 100644 --- a/xwords4/relay/dbmgr.h +++ b/xwords4/relay/dbmgr.h @@ -36,7 +36,6 @@ class DBMgr { /* DevIDs on various platforms are stored in devices table. This is the key, and used in msgs and games tables as a shorter way to refer to them. */ - typedef unsigned int DevIDRelay; static const DevIDRelay DEVID_NONE = 0; static DBMgr* Get(); @@ -48,6 +47,9 @@ class DBMgr { void AddNew( const char* cookie, const char* connName, CookieID cid, int langCode, int nPlayersT, bool isPublic ); + bool FindPlayer( DevIDRelay relayID, AddrInfo::ClientToken, + string& connName, HostID* hid, unsigned short* seed ); + CookieID FindGame( const char* connName, char* cookieBuf, int bufLen, int* langP, int* nPlayersTP, int* nPlayersHP, bool* isDead ); @@ -62,7 +64,8 @@ class DBMgr { char* connNameBuf, int bufLen, int* nPlayersHP ); bool AllDevsAckd( const char* const connName ); - DevIDRelay RegisterDevice( const DevID* hosts ); + DevIDRelay RegisterDevice( const DevID* host ); + bool updateDevice( DevIDRelay relayID, bool check ); HostID AddDevice( const char* const connName, HostID curID, int clientVersion, int nToAdd, unsigned short seed, const AddrInfo* addr, @@ -89,19 +92,30 @@ class DBMgr { queries.*/ void PublicRooms( int lang, int nPlayers, int* nNames, string& names ); + /* Get stored address info, if available and valid */ + bool TokenFor( const char* const connName, int hid, DevIDRelay* devid, + AddrInfo::ClientToken* token ); + /* Return number of messages pending for connName:hostid pair passed in */ int PendingMsgCount( const char* const connName, int hid ); /* message storage -- different DB */ int CountStoredMessages( const char* const connName ); int CountStoredMessages( const char* const connName, int hid ); + int CountStoredMessages( DevIDRelay relayID ); void StoreMessage( const char* const connName, int hid, const unsigned char* const buf, int len ); + void GetStoredMessageIDs( DevIDRelay relayID, vector& ids ); + bool GetStoredMessage( const char* const connName, int hid, unsigned char* buf, size_t* buflen, int* msgID ); bool GetNthStoredMessage( const char* const connName, int hid, int nn, unsigned char* buf, size_t* buflen, int* msgID ); + bool GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, + AddrInfo::ClientToken* token ); + void RemoveStoredMessages( const int* msgID, int nMsgIDs ); + void RemoveStoredMessages( vector& ids ); private: DBMgr(); @@ -110,11 +124,16 @@ class DBMgr { void readArray( const char* const connName, int arr[] ); DevIDRelay getDevID( const char* connName, int hid ); DevIDRelay getDevID( const DevID* devID ); + int getCountWhere( const char* table, string& test ); + void RemoveStoredMessages( string& msgIDs ); + void decodeMessage( PGresult* result, bool useB64, int b64indx, + int byteaIndex, unsigned char* buf, size_t* buflen ); PGconn* getThreadConn( void ); void conn_key_alloc(); pthread_key_t m_conn_key; + bool m_useB64; }; /* DBMgr */ diff --git a/xwords4/relay/devid.h b/xwords4/relay/devid.h index a31744dd0..013a68fe0 100644 --- a/xwords4/relay/devid.h +++ b/xwords4/relay/devid.h @@ -22,6 +22,8 @@ #define _DEVID_H_ #include +#include + #include "xwrelay.h" /* DevID protocol. @@ -52,11 +54,18 @@ * */ +#include + using namespace std; class DevID { public: DevID() { m_devIDType = ID_TYPE_NONE; } + DevID(DevIDType typ) { m_devIDType = typ; } + DevIDRelay asRelayID() const { + assert( ID_TYPE_RELAY == m_devIDType ); + return strtoul( m_devIDString.c_str(), NULL, 16 ); + } string m_devIDString; DevIDType m_devIDType; }; diff --git a/xwords4/relay/lstnrmgr.cpp b/xwords4/relay/lstnrmgr.cpp index 098605930..d01118413 100644 --- a/xwords4/relay/lstnrmgr.cpp +++ b/xwords4/relay/lstnrmgr.cpp @@ -171,7 +171,9 @@ ListenerMgr::addOne( int port, bool perGame ) success = sock != -1; if ( success ) { pairentry(port, perGame); - m_socks_to_ports.insert( pair >(sock, entry ) ); + pair >::iterator, bool> result + = m_socks_to_ports.insert( pair >(sock, entry ) ); + assert( result.second ); } return success; } diff --git a/xwords4/relay/scripts/gcm_loop.py b/xwords4/relay/scripts/gcm_loop.py index c60dc6f29..11b9b59de 100755 --- a/xwords4/relay/scripts/gcm_loop.py +++ b/xwords4/relay/scripts/gcm_loop.py @@ -6,8 +6,9 @@ # # Depends on the gcm module -import getpass, sys, gcm, psycopg2, time, signal, shelve +import getpass, sys, psycopg2, time, signal, shelve, json, urllib2 from time import gmtime, strftime +from os import path # I'm not checking my key in... import mykey @@ -25,7 +26,7 @@ import mykey # contact list if it is the target of at least one message in the msgs # table. -k_shelfFile = "gcm_loop.shelf" +k_shelfFile = path.splitext( path.basename( sys.argv[0]) )[0] + ".shelf" k_SENT = 'SENT' g_con = None g_sent = None @@ -65,7 +66,7 @@ def getPendingMsgs( con, typ ): def unregister( gcmid ): global g_con print "unregister(", gcmid, ")" - query = "UPDATE devices SET unreg=TRUE WHERE devid = '%s'" % gcmid + query = "UPDATE devices SET unreg=TRUE WHERE devid = '%s' and devtype = 3" % gcmid g_con.cursor().execute( query ) def asGCMIds(con, devids, typ): @@ -77,20 +78,23 @@ def asGCMIds(con, devids, typ): def notifyGCM( devids, typ ): if typ == DEVTYPE_GCM: - instance = gcm.GCM( mykey.myKey ) - data = { 'getMoves': True, } - response = instance.json_request( registration_ids = devids, - data = data, - ) - if 'errors' in response: - response = response['errors'] - if 'NotRegistered' in response: - for gcmid in response['NotRegistered']: - unregister( gcmid ) - else: - print "got some kind of error" + values = { + 'data' : { 'getMoves': True, }, + 'registration_ids': devids, + } + params = json.dumps( values ) + req = urllib2.Request("https://android.googleapis.com/gcm/send", params ) + req.add_header( 'Content-Type' , 'application/x-www-form-urlencoded;charset=UTF-8' ) + req.add_header( 'Authorization' , 'key=' + mykey.myKey ) + req.add_header('Content-Type', 'application/json' ) + response = urllib2.urlopen( req ).read() + asJson = json.loads( response ) + + if 'success' in asJson and 'failure' in asJson and len(devids) == asJson['success'] and 0 == asJson['failure']: + print "OK" else: - if g_debug: print 'no errors:', response + print "Errors: " + print response else: print "not sending to", len(devids), "devices because typ ==", typ @@ -182,7 +186,7 @@ def main(): if 0 < emptyCount: print "" emptyCount = 0 print strftime("%Y-%m-%d %H:%M:%S", time.localtime()), - print "devices needing notification:", targets + print "devices needing notification:", targets, '=>', notifyGCM( asGCMIds( g_con, targets, typ ), typ ) pruneSent( devids ) elif g_debug: print "no targets after backoff" diff --git a/xwords4/relay/scripts/showinplay.sh b/xwords4/relay/scripts/showinplay.sh index ee0c5e19a..dd9e940dd 100755 --- a/xwords4/relay/scripts/showinplay.sh +++ b/xwords4/relay/scripts/showinplay.sh @@ -23,10 +23,11 @@ done QUERY="WHERE NOT -NTOTAL = sum_array(nperdevice)" -echo "Device (pid) count: $(pidof xwords | wc | awk '{print $2}')" +echo -n "Device (pid) count: $(pidof xwords | wc | awk '{print $2}')" +echo "; relay pid[s]: $(pidof xwrelay)" echo "Row count:" $(psql -t xwgames -c "select count(*) FROM games $QUERY;") -echo "SELECT dead,connname,cid,room,lang,clntVers,ntotal,nperdevice,seeds,devids,ack,nsent "\ +echo "SELECT dead,connname,cid,room,lang,clntVers as cv ,ntotal,nperdevice,seeds,addrs,tokens,devids,ack,nsent as snt "\ "FROM games $QUERY ORDER BY NOT dead, connname LIMIT $LIMIT;" \ | psql xwgames diff --git a/xwords4/relay/states.cpp b/xwords4/relay/states.cpp index f133edb10..ab1ffe97e 100644 --- a/xwords4/relay/states.cpp +++ b/xwords4/relay/states.cpp @@ -79,6 +79,11 @@ static StateTable g_stateTable[] = { { XWS_WAITMORE, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME }, { XWS_ALLCONND, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME }, +{ XWS_EMPTY, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, +{ XWS_WAITMORE, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, +{ XWS_ALLCONND, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, + + /* { XWS_WAITMORE, XWE_GAMEFULL, XWA_SENDALLHERE, XWS_ALLCONND }, */ /* { XWS_WAITMORE, XWE_CHECKFULL, XWA_, XWS_WAITMORE }, */ /* { XWS_INITED, XWE_DEVCONNECT, XWA_SEND_NO_ROOM, XWS_DEAD }, */ @@ -229,6 +234,7 @@ eventString( XW_RELAY_EVENT evt ) CASESTR(XWE_RECONNECT); CASESTR(XWE_GOTONEACK); CASESTR(XWE_PROXYMSG); + CASESTR(XWE_TRYTELL); CASESTR(XWE_GOTLASTACK); CASESTR(XWE_ACKTIMEOUT); CASESTR(XWE_DISCONN); @@ -276,6 +282,7 @@ actString( XW_RELAY_ACTION act ) CASESTR(XWA_SNDALLHERE_2); CASESTR(XWA_FWD); CASESTR(XWA_PROXYMSG); + CASESTR(XWA_TRYTELL); CASESTR(XWA_NOTEHEART); CASESTR(XWA_TIMERDISCONN); CASESTR(XWA_DISCONNECT); diff --git a/xwords4/relay/states.h b/xwords4/relay/states.h index 2171a54eb..7f86b1d89 100644 --- a/xwords4/relay/states.h +++ b/xwords4/relay/states.h @@ -86,6 +86,7 @@ typedef enum { this object */ ,XWE_PROXYMSG /* msg when game may not be connected */ + ,XWE_TRYTELL /* tell the addressee to check for stored messages */ ,XWE_GOTONEACK ,XWE_GOTLASTACK @@ -147,6 +148,8 @@ typedef enum { ,XWA_PROXYMSG /* out-of-band message */ + ,XWA_TRYTELL /* Tell the addresses to check for messages */ + ,XWA_NOTEHEART /* Record heartbeat received */ ,XWA_NOTE_EMPTY /* No sockets left; check if can delete */ diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 17d2b36b3..2aff5caab 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -78,11 +78,10 @@ XWThreadPool::~XWThreadPool() } /* ~XWThreadPool */ void -XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) +XWThreadPool::Setup( int nThreads, kill_func kFunc ) { m_nThreads = nThreads; m_threadInfos = (ThreadInfo*)malloc( nThreads * sizeof(*m_threadInfos) ); - m_pFunc = pFunc; m_kFunc = kFunc; for ( int ii = 0; ii < nThreads; ++ii ) { @@ -116,12 +115,13 @@ XWThreadPool::Stop() } void -XWThreadPool::AddSocket( SockType stype, const AddrInfo* from ) +XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from ) { { RWWriteLock ml( &m_activeSocketsRWLock ); SockInfo si; si.m_type = stype; + si.m_proc = proc; si.m_addr = *from; m_activeSockets.push_back( si ); logf( XW_LOGINFO, "%s: %d sockets active", __func__, @@ -173,7 +173,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) ++iter; } } - logf( XW_LOGINFO, "CLOSING socket %d", socket ); + logf( XW_LOGINFO, "CLOSING socket %d", addr->socket() ); close( addr->socket() ); /* if ( do_interrupt ) { */ /* We always need to interrupt the poll because the socket we're closing @@ -187,7 +187,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) void XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why ) { - logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, socket, why ); + logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, addr->socket(), why ); if ( addr->isTCP() ) { SockInfo si; si.m_type = STYPE_UNKNOWN; @@ -197,7 +197,7 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why ) } bool -XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr ) +XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr ) { bool success = false; short packetSize; @@ -208,13 +208,14 @@ XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr ) int nRead = read_packet( addr->socket(), buf, sizeof(buf) ); if ( nRead < 0 ) { EnqueueKill( addr, "bad packet" ); - } else if ( STYPE_GAME == stype ) { - logf( XW_LOGINFO, "calling m_pFunc" ); - success = (*m_pFunc)( buf, nRead, addr ); - } else { + } else if ( STYPE_PROXY == stype && NULL != proc ) { buf[nRead] = '\0'; - handle_proxy_packet( buf, nRead, addr ); - CloseSocket( addr ); + UdpQueue::get()->handle( addr, buf, nRead+1, proc ); + } else if ( STYPE_GAME == stype && NULL != proc ) { + UdpQueue::get()->handle( addr, buf, nRead, proc ); + success = true; + } else { + assert(0); } return success; } /* get_process_packet */ @@ -261,8 +262,8 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip ) switch ( pr.m_act ) { case Q_READ: assert( socket >= 0 ); - if ( get_process_packet( pr.m_info.m_type, &pr.m_info.m_addr ) ) { - AddSocket( pr.m_info.m_type, &pr.m_info.m_addr ); + if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) { + AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ); } break; case Q_KILL: diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 2c720628b..f8fa997ed 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -32,6 +32,7 @@ #include #include "addrinfo.h" +#include "udpqueue.h" using namespace std; @@ -41,6 +42,7 @@ class XWThreadPool { typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType; typedef struct _SockInfo { SockType m_type; + QueueCallback m_proc; AddrInfo m_addr; } SockInfo; @@ -51,18 +53,16 @@ class XWThreadPool { } ThreadInfo; static XWThreadPool* GetTPool(); - typedef bool (*packet_func)( unsigned char* buf, int bufLen, - const AddrInfo* from ); typedef void (*kill_func)( const AddrInfo* addr ); XWThreadPool(); ~XWThreadPool(); - void Setup( int nThreads, packet_func pFunc, kill_func kFunc ); + void Setup( int nThreads, kill_func kFunc ); void Stop(); /* Add to set being listened on */ - void AddSocket( SockType stype, const AddrInfo* from ); + void AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from ); /* remove from tpool altogether, and close */ void CloseSocket( const AddrInfo* addr ); @@ -82,7 +82,7 @@ class XWThreadPool { void print_in_use( void ); void log_hung_threads( void ); - bool get_process_packet( SockType stype, const AddrInfo* from ); + bool get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* from ); void interrupt_poll(); void* real_tpool_main( ThreadInfo* tsp ); @@ -107,7 +107,6 @@ class XWThreadPool { bool m_timeToDie; int m_nThreads; - packet_func m_pFunc; kill_func m_kFunc; ThreadInfo* m_threadInfos; diff --git a/xwords4/relay/xwrelay.conf_tmplate b/xwords4/relay/xwrelay.conf_tmplate index 4f80efb2c..27d79ec06 100644 --- a/xwords4/relay/xwrelay.conf_tmplate +++ b/xwords4/relay/xwrelay.conf_tmplate @@ -15,7 +15,7 @@ HEARTBEAT=60 # How many worker threads in the thread pool? Default is five. Let's # keep this at 1 until the race condition is fixed. All interaction # with crefs should be from this one thread, including proxy stuff. -NTHREADS=5 +NTHREADS=1 # How many seconds to wait for device to ack new connName DEVACK=3 @@ -26,12 +26,16 @@ GAME_PORTS=10997 # What ports do we listen on for per-device incoming connections? DEVICE_PORTS=10998 +# Port for per-device UDP interface (experimental) +UDPPORT=10997 + # default 5 SOCK_TIMEOUT_SECONDS=5 # And the control port is? CTLPORT=11000 + # port for web interface WWW_PORT=11001 #--- INADDR_ANY: 0x00000000 diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index ae74fc790..35615fa5c 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -76,9 +76,19 @@ #include "lstnrmgr.h" #include "dbmgr.h" #include "addrinfo.h" +#include "devmgr.h" +#include "udpqueue.h" +#include "udpack.h" + +typedef struct _UDPHeader { + uint32_t packetID; + unsigned char proto; + XWRelayReg cmd; +} UDPHeader; static int s_nSpawns = 0; static int g_maxsocks = -1; +static int g_udpsock = -1; void logf( XW_LogLevel level, const char* format, ... ) @@ -178,7 +188,7 @@ cmdToStr( XWRELAY_Cmd cmd ) } static bool -parseRelayID( unsigned char** const inp, const unsigned char* const end, +parseRelayID( const unsigned char** const inp, const unsigned char* const end, char* buf, int buflen, HostID* hid ) { const char* hidp = strchr( (char*)*inp, '/' ); @@ -201,11 +211,28 @@ parseRelayID( unsigned char** const inp, const unsigned char* const end, } *inp = (unsigned char*)endptr; } + if ( !ok ) { + logf( XW_LOGERROR, "%s failed", __func__ ); + } return ok; } static bool -getNetShort( unsigned char** bufpp, const unsigned char* end, +getNetLong( const unsigned char** bufpp, const unsigned char* end, + uint32_t* out ) +{ + uint32_t tmp; + bool ok = *bufpp + sizeof(tmp) <= end; + if ( ok ) { + memcpy( &tmp, *bufpp, sizeof(tmp) ); + *bufpp += sizeof(tmp); + *out = ntohl( tmp ); + } + return ok; +} /* getNetShort */ + +static bool +getNetShort( const unsigned char** bufpp, const unsigned char* end, unsigned short* out ) { unsigned short tmp; @@ -219,7 +246,7 @@ getNetShort( unsigned char** bufpp, const unsigned char* end, } /* getNetShort */ static bool -getNetByte( unsigned char** bufpp, const unsigned char* end, +getNetByte( const unsigned char** bufpp, const unsigned char* end, unsigned char* out ) { bool ok = *bufpp < end; @@ -231,7 +258,7 @@ getNetByte( unsigned char** bufpp, const unsigned char* end, } /* getNetByte */ static bool -getNetString( unsigned char** bufpp, const unsigned char* end, string& out ) +getNetString( const unsigned char** bufpp, const unsigned char* end, string& out ) { char* str = (char*)*bufpp; size_t len = 1 + strlen( str ); @@ -244,8 +271,43 @@ getNetString( unsigned char** bufpp, const unsigned char* end, string& out ) return success; } +static bool +getRelayDevID( const unsigned char** bufpp, const unsigned char* end, + DevID& devID ) +{ + bool success = false; + unsigned short idLen; + if ( getNetShort( bufpp, end, &idLen ) ) { + if ( end - *bufpp < idLen/* && ID_TYPE_ANON != typ*/ ) { + logf( XW_LOGERROR, "full devID not received" ); + } else { + devID.m_devIDString.append( (const char*)*bufpp, idLen ); + *bufpp += idLen; + success = true; + } + } + return success; +} + +static bool +getHeader( const unsigned char** bufpp, const unsigned char* end, + UDPHeader* header ) +{ + unsigned char byt; + bool success = getNetByte( bufpp, end, &header->proto ) + && getNetLong( bufpp, end, &header->packetID ) + && getNetByte( bufpp, end, &byt ) + && XWPDEV_PROTO_VERSION == header->proto; + if ( success ) { + header->cmd = (XWRelayReg)byt; + } else { + logf( XW_LOGERROR, "%s: bad packet header", __func__ ); + } + return success; +} + static void -getDevID( unsigned char** bufpp, const unsigned char* end, +getDevID( const unsigned char** bufpp, const unsigned char* end, unsigned short flags, DevID* devID ) { if ( XWRELAY_PROTO_VERSION_CLIENTID <= flags ) { @@ -283,7 +345,7 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket ) #endif static bool -readStr( unsigned char** bufp, const unsigned char* end, +readStr( const unsigned char** bufp, const unsigned char* end, char* outBuf, int bufLen ) { unsigned char clen = **bufp; @@ -298,7 +360,7 @@ readStr( unsigned char** bufp, const unsigned char* end, } /* readStr */ static XWREASON -flagsOK( unsigned char** bufp, unsigned char const* end, +flagsOK( const unsigned char** bufp, unsigned char const* end, unsigned short* clientVersion, unsigned short* flagsp ) { XWREASON err = XWRELAY_ERROR_OLDFLAGS; @@ -334,24 +396,83 @@ denyConnection( const AddrInfo* addr, XWREASON err ) send_with_length_unsafe( addr, buf, sizeof(buf) ); } +static ssize_t +send_via_udp( int socket, const struct sockaddr *dest_addr, + XWRelayReg cmd, ... ) +{ + uint32_t packetNum = UDPAckTrack::nextPacketID( cmd ); + struct iovec vec[10]; + int iocount = 0; + + unsigned char header[1 + 1 + sizeof(packetNum)]; + header[0] = XWPDEV_PROTO_VERSION; + packetNum = htonl( packetNum ); + memcpy( &header[1], &packetNum, sizeof(packetNum) ); + header[5] = cmd; + vec[iocount].iov_base = header; + vec[iocount].iov_len = sizeof(header); + ++iocount; + + va_list ap; + va_start( ap, cmd ); + for ( ; ; ) { + unsigned char* ptr = va_arg(ap, unsigned char*); + if ( !ptr ) { + break; + } + vec[iocount].iov_base = ptr; + vec[iocount].iov_len = va_arg(ap, int); + ++iocount; + } + va_end( ap ); + + struct msghdr mhdr = {0}; + mhdr.msg_iov = vec; + mhdr.msg_iovlen = iocount; + mhdr.msg_name = (void*)dest_addr; + mhdr.msg_namelen = sizeof(*dest_addr); + + ssize_t nSent = sendmsg( socket, &mhdr, 0 /* flags */); + if ( 0 > nSent ) { + logf( XW_LOGERROR, "sendmsg->errno %d (%s)", errno, strerror(errno) ); + } + logf( XW_LOGINFO, "%s()=>%d", __func__, nSent ); + return nSent; +} + /* No mutex here. Caller better be ensuring no other thread can access this * socket. */ bool -send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf, +send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, size_t bufLen ) { assert( !!addr ); bool ok = false; int socket = addr->socket(); - assert ( addr->isTCP() ); - unsigned short len = htons( bufLen ); - ssize_t nSent = send( socket, &len, 2, 0 ); - if ( nSent == 2 ) { - nSent = send( socket, buf, bufLen, 0 ); - if ( nSent == ssize_t(bufLen) ) { - logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); - ok = true; + + if ( addr->isTCP() ) { + unsigned short len = htons( bufLen ); + ssize_t nSent = send( socket, &len, 2, 0 ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == ssize_t(bufLen) ) { + logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); + ok = true; + } } + } else { + AddrInfo::ClientToken clientToken = addr->clientToken(); + assert( 0 != clientToken ); + clientToken = htonl(clientToken); + const struct sockaddr* saddr = addr->sockaddr(); + assert( g_udpsock == socket || socket == -1 ); + if ( -1 == socket ) { + socket = g_udpsock; + } + send_via_udp( socket, saddr, XWPDEV_MSG, &clientToken, + sizeof(clientToken), buf, bufLen, NULL ); + logf( XW_LOGINFO, "sent %d bytes on UDP socket %d", bufLen, socket ); + ok = true; } if ( !ok ) { @@ -361,6 +482,17 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf, return ok; } /* send_with_length_unsafe */ +void +send_havemsgs( const AddrInfo* addr ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + int socket = addr->socket(); + if ( -1 == socket ) { + socket = g_udpsock; + } + + send_via_udp( socket, addr->sockaddr(), XWPDEV_HAVEMSGS, NULL ); +} /* A CONNECT message from a device gives us the hostID and socket we'll * associate with one participant in a relayed session. We'll store this @@ -374,10 +506,10 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf, * game? */ static bool -processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) +processConnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr ) { char cookie[MAX_INVITE_LEN+1]; - unsigned char* end = bufp + bufLen; + const unsigned char* end = bufp + bufLen; bool success = false; cookie[0] = '\0'; @@ -431,9 +563,9 @@ processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) } /* processConnect */ static bool -processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) +processReconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr ) { - unsigned char* end = bufp + bufLen; + const unsigned char* end = bufp + bufLen; bool success = false; logf( XW_LOGINFO, "%s()", __func__ ); @@ -470,9 +602,9 @@ processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) wantsPublic, makePublic ); success = scr.Reconnect( srcID, nPlayersH, nPlayersT, gameSeed, &err ); - if ( !success ) { - assert( err != XWRELAY_ERROR_NONE ); - } + // if ( !success ) { + // assert( err != XWRELAY_ERROR_NONE ); + // } } else { err = XWRELAY_ERROR_BADPROTO; } @@ -486,10 +618,10 @@ processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) } /* processReconnect */ static bool -processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr ) +processAck( const unsigned char* bufp, int bufLen, const AddrInfo* addr ) { bool success = false; - unsigned char* end = bufp + bufLen; + const unsigned char* end = bufp + bufLen; HostID srcID; if ( getNetByte( &bufp, end, &srcID ) ) { SafeCref scr( addr ); @@ -499,9 +631,9 @@ processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr ) } static bool -processDisconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr ) +processDisconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr ) { - unsigned char* end = bufp + bufLen; + const unsigned char* end = bufp + bufLen; CookieID cookieID; HostID hostID; bool success = false; @@ -552,11 +684,11 @@ GetNSpawns(void) /* forward the message. Need only change the command after looking up the * socket and it's ready to go. */ static bool -forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr ) +forwardMessage( const unsigned char* buf, int buflen, const AddrInfo* addr ) { bool success = false; - unsigned char* bufp = buf + 1; /* skip cmd */ - unsigned char* end = buf + buflen; + const unsigned char* bufp = buf + 1; /* skip cmd */ + const unsigned char* end = buf + buflen; CookieID cookieID; HostID src; HostID dest; @@ -578,7 +710,7 @@ forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr ) } /* forwardMessage */ static bool -processMessage( unsigned char* buf, int bufLen, const AddrInfo* addr ) +processMessage( const unsigned char* buf, int bufLen, const AddrInfo* addr ) { bool success = false; /* default is failure */ XWRELAY_Cmd cmd = *buf; @@ -673,6 +805,7 @@ usage( char* arg0 ) "\t-h (print this help)\\\n" "\t-i (file where next global id stored)\\\n" "\t-l (write logs here, not stderr)\\\n" + "\t-M (Put in maintenance mode, and return this string to all callers)\\\n" "\t-m (max number of simultaneous sockets to have open)\\\n" "\t-n (used in permID generation)\\\n" "\t-p (port to listen on)\\\n" @@ -814,7 +947,7 @@ pushMsgs( vector& out, DBMgr* dbmgr, const char* connName, static void handleMsgsMsg( const AddrInfo* addr, bool sendFull, - unsigned char* bufp, const unsigned char* end ) + const unsigned char* bufp, const unsigned char* end ) { unsigned short nameCount; int ii; @@ -913,16 +1046,38 @@ log_hex( const unsigned char* memp, int len, const char* tag ) } } // log_hex +static bool +handlePutMessage( SafeCref& scr, HostID hid, const AddrInfo* addr, + unsigned short len, const unsigned char** bufp, + const unsigned char* end ) +{ + bool success = false; + const unsigned char* start = *bufp; + HostID src; + HostID dest; + XWRELAY_Cmd cmd; + // sanity check that cmd and hostids are there + if ( getNetByte( bufp, end, &cmd ) + && getNetByte( bufp, end, &src ) + && getNetByte( bufp, end, &dest ) + && ( cmd == XWRELAY_MSG_TORELAY_NOCONN ) + && ( hid == dest ) ) { + scr.PutMsg( src, addr, dest, start, len ); + *bufp = start + len; + success = true; + } + logf( XW_LOGINFO, "%s()=>%d", __func__, success ); + return success; +} + static void -handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp, - unsigned char* end ) +handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp, + const unsigned char* end ) { // log_hex( bufp, end-bufp, __func__ ); unsigned short nameCount; int ii; if ( getNetShort( &bufp, end, &nameCount ) ) { - vector out(4); /* space for len and n_msgs */ - assert( out.size() == 4 ); for ( ii = 0; ii < nameCount && bufp < end; ++ii ) { // See NetUtils.java for reply format @@ -944,20 +1099,10 @@ handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp, unsigned short nMsgs; if ( getNetShort( &bufp, end, &nMsgs ) ) { SafeCref scr( connName ); - while ( nMsgs-- > 0 ) { + while ( scr.IsValid() && nMsgs-- > 0 ) { unsigned short len; - HostID src; - HostID dest; - XWRELAY_Cmd cmd; if ( getNetShort( &bufp, end, &len ) ) { - unsigned char* start = bufp; - if ( getNetByte( &bufp, end, &cmd ) - && getNetByte( &bufp, end, &src ) - && getNetByte( &bufp, end, &dest ) ) { - assert( cmd == XWRELAY_MSG_TORELAY_NOCONN ); - assert( hid == dest ); - scr.PutMsg( src, addr, dest, start, len ); - bufp = start + len; + if ( handlePutMessage( scr, hid, addr, len, &bufp, end ) ) { continue; } } @@ -965,19 +1110,35 @@ handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp, } } } - assert( bufp == end ); // don't ship with this!!! + if ( end - bufp != 1 ) { + logf( XW_LOGERROR, "%s: buf != end: %p vs %p", __func__, bufp, end ); + } + // assert( bufp == end ); // don't ship with this!!! } } // handleProxyMsgs -void -handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr ) +static void +game_thread_proc( UdpThreadClosure* utc ) { + if ( !processMessage( utc->buf(), utc->len(), utc->addr() ) ) { + XWThreadPool::GetTPool()->CloseSocket( utc->addr() ); + } +} + +static void +proxy_thread_proc( UdpThreadClosure* utc ) +{ + int len = utc->len(); + const AddrInfo* addr = utc->addr(); + const unsigned char* buf = utc->buf(); + + logf( XW_LOGINFO, "%s called", __func__ ); logf( XW_LOGVERBOSE0, "%s()", __func__ ); if ( len > 0 ) { assert( addr->isTCP() ); int socket = addr->socket(); - unsigned char* bufp = buf; - unsigned char* end = bufp + len; + const unsigned char* bufp = buf; + const unsigned char* end = bufp + len; if ( (0 == *bufp++) ) { /* protocol */ XWPRXYCMD cmd = (XWPRXYCMD)*bufp++; switch( cmd ) { @@ -1044,7 +1205,245 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr ) } } } -} /* handle_proxy_packet */ + XWThreadPool::GetTPool()->CloseSocket( addr ); +} + +static short +addRegID( unsigned char* ptr, DevIDRelay relayID ) +{ + short used = 0; + char idbuf[9]; + int idLen = snprintf( idbuf, sizeof(idbuf), "%.8X", relayID ); + short lenNBO = htons(idLen); + memcpy( &ptr[used], &lenNBO, sizeof(lenNBO) ); + used += sizeof(lenNBO); + memcpy( &ptr[used], idbuf, idLen ); + used += idLen; + return used; +} + +static void +registerDevice( const DevID* devID, const AddrInfo::AddrUnion* saddr ) +{ + DevIDRelay relayID; + DBMgr* dbMgr = DBMgr::Get(); + short indx = 0; + unsigned char buf[32]; + + if ( ID_TYPE_RELAY == devID->m_devIDType ) { // known to us; just update the time + relayID = devID->asRelayID(); + if ( dbMgr->updateDevice( relayID, true ) ) { + int nMsgs = dbMgr->CountStoredMessages( relayID ); + if ( 0 < nMsgs ) { + AddrInfo addr( -1, -1, saddr ); + send_havemsgs( &addr ); + } + } else { + indx += addRegID( &buf[indx], relayID ); + send_via_udp( g_udpsock, &saddr->addr, XWPDEV_BADREG, buf, indx, + NULL ); + + relayID = DBMgr::DEVID_NONE; + } + } else { + relayID = dbMgr->RegisterDevice( devID ); + if ( DBMgr::DEVID_NONE != relayID ) { + // send it back to the device + indx += addRegID( &buf[indx], relayID ); + send_via_udp( g_udpsock, &saddr->addr, XWPDEV_REGRSP, buf, + indx, NULL ); + } + } + + // Now let's map the address to the devid for future sending purposes. + if ( DBMgr::DEVID_NONE != relayID ) { + DevMgr::Get()->Remember( relayID, saddr ); + } +} + +static void +retrieveMessages( DevID& devID, const AddrInfo::AddrUnion* saddr ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + DBMgr* dbMgr = DBMgr::Get(); + vector ids; + vector sentIDs; + dbMgr->GetStoredMessageIDs( devID.asRelayID(), ids ); + vector::const_iterator iter; + for ( iter = ids.begin(); iter != ids.end(); ++iter ) { + unsigned char buf[MAX_MSG_LEN]; + size_t buflen = sizeof(buf); + AddrInfo::ClientToken clientToken; + if ( dbMgr->GetStoredMessage( *iter, buf, &buflen, &clientToken ) ) { + AddrInfo addr( -1, clientToken, saddr ); + if ( ! send_with_length_unsafe( &addr, buf, buflen ) ) { + break; + } + sentIDs.push_back( *iter ); + } + } + dbMgr->RemoveStoredMessages( sentIDs ); +} + +static const char* +msgToStr( XWRelayReg msg ) +{ + const char* str; +# define CASE_STR(c) case c: str = #c; break + switch( msg ) { + CASE_STR(XWPDEV_REG); + CASE_STR(XWPDEV_REGRSP); + CASE_STR(XWPDEV_PING); + CASE_STR(XWPDEV_HAVEMSGS); + CASE_STR(XWPDEV_RQSTMSGS); + CASE_STR(XWPDEV_MSG); + CASE_STR(XWPDEV_MSGNOCONN); + CASE_STR(XWPDEV_MSGRSP); + CASE_STR(XWPDEV_BADREG); + CASE_STR(XWPDEV_ALERT); // should not receive this.... + CASE_STR(XWPDEV_ACK); + CASE_STR(XWPDEV_DELGAME); + default: + str = ""; + break; + } +# undef CASE_STR + return str; + +} + +static void +ackPacketIf( const UDPHeader* header, const AddrInfo* addr ) +{ + if ( UDPAckTrack::shouldAck( header->cmd ) ) { + uint32_t packetID = header->packetID; + logf( XW_LOGINFO, "acking packet %d", packetID ); + packetID = htonl( packetID ); + send_via_udp( addr->socket(), addr->sockaddr(), XWPDEV_ACK, + &packetID, sizeof(packetID), NULL ); + } +} + +static void +udp_thread_proc( UdpThreadClosure* utc ) +{ + const unsigned char* ptr = utc->buf(); + const unsigned char* end = ptr + utc->len(); + + UDPHeader header; + if ( getHeader( &ptr, end, &header ) ) { + logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( header.cmd ) ); + ackPacketIf( &header, utc->addr() ); + switch( header.cmd ) { + case XWPDEV_REG: { + DevIDType typ = (DevIDType)*ptr++; + DevID devID( typ ); + if ( getRelayDevID( &ptr, end, devID ) ) { + registerDevice( &devID, utc->saddr() ); + } + break; + } + case XWPDEV_MSG: { + AddrInfo::ClientToken clientToken; + memcpy( &clientToken, ptr, sizeof(clientToken) ); + ptr += sizeof(clientToken); + clientToken = ntohl( clientToken ); + if ( 0 != clientToken ) { + AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); + (void)processMessage( ptr, end - ptr, &addr ); + } else { + logf( XW_LOGERROR, "%s: dropping packet with token of 0" ); + } + break; + } + case XWPDEV_MSGNOCONN: { + AddrInfo::ClientToken clientToken; + if ( getNetLong( &ptr, end, &clientToken ) && 0 != clientToken ) { + HostID hid; + char connName[MAX_CONNNAME_LEN+1]; + if ( !parseRelayID( &ptr, end, connName, + sizeof( connName ), &hid ) ) { + logf( XW_LOGERROR, "parse failed!!!" ); + break; + } + SafeCref scr( connName ); + if ( scr.IsValid() ) { + AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); + handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end ); + assert( ptr == end ); // DON'T CHECK THIS IN!!! + } else { + logf( XW_LOGERROR, "%s: invalid scr for %s", __func__, + connName ); + } + } else { + logf( XW_LOGERROR, "no clientToken found!!!" ); + } + break; + } + + case XWPDEV_RQSTMSGS: { + unsigned short idLen; + if ( !getNetShort( &ptr, end, &idLen ) ) { + break; + } + if ( end - ptr > idLen ) { + logf( XW_LOGERROR, "full devID not received" ); + break; + } + DevID devID( ID_TYPE_RELAY ); + devID.m_devIDString.append( (const char*)ptr, idLen ); + ptr += idLen; + retrieveMessages( devID, utc->saddr() ); + break; + } + case XWPDEV_ACK: { + uint32_t packetID; + if ( getNetLong( &ptr, end, &packetID ) ) { + logf( XW_LOGINFO, "ack for packet %d", packetID ); + UDPAckTrack::recordAck( packetID ); + } + break; + } + case XWPDEV_DELGAME: { + DevID devID( ID_TYPE_RELAY ); + if ( !getRelayDevID( &ptr, end, devID ) ) { + break; + } + AddrInfo::ClientToken clientToken; + if ( getNetLong( &ptr, end, &clientToken ) && 0 != clientToken ) { + unsigned short seed; + HostID hid; + string connName; + if ( DBMgr::Get()->FindPlayer( devID.asRelayID(), clientToken, + connName, &hid, &seed ) ) { + SafeCref scr( connName.c_str() ); + scr.DeviceGone( hid, seed ); + } + } + break; + } + default: + logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, header.cmd ); + } + } +} + +static void +handle_udp_packet( int udpsock ) +{ + unsigned char buf[MAX_MSG_LEN]; + AddrInfo::AddrUnion saddr; + memset( &saddr, 0, sizeof(saddr) ); + socklen_t fromlen = sizeof(saddr.addr_in); + + ssize_t nRead = recvfrom( udpsock, buf, sizeof(buf), 0 /*flags*/, + &saddr.addr, &fromlen ); + logf( XW_LOGINFO, "%s: recvfrom=>%d", __func__, nRead ); + if ( 0 < nRead ) { + AddrInfo addr( udpsock, &saddr, false ); + UdpQueue::get()->handle( &addr, buf, nRead, udp_thread_proc ); + } +} /* From stack overflow, toward a snprintf with an expanding buffer. */ @@ -1129,11 +1528,55 @@ enable_keepalive( int sock ) */ } +static void +maint_str_loop( int udpsock, const char* str ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + assert( -1 != udpsock ); + short len = strlen(str); + unsigned char outbuf[sizeof(len) + len]; + short lenNS = htons( len ); + memcpy( &outbuf[0], &lenNS, sizeof(lenNS) ); + memcpy( &outbuf[0+sizeof(len)], str, len ); + + fd_set rfds; + for ( ; ; ) { + FD_ZERO(&rfds); + FD_SET( udpsock, &rfds ); + int retval = select( udpsock + 1, &rfds, NULL, NULL, NULL ); + if ( 0 > retval ) { + logf( XW_LOGERROR, "%s: select=>%d (errno=%d/%s)", __func__, retval, + errno, strerror(errno) ); + break; + } + if ( FD_ISSET( udpsock, &rfds ) ) { + unsigned char buf[512]; + AddrInfo::AddrUnion saddr; + memset( &saddr, 0, sizeof(saddr) ); + socklen_t fromlen = sizeof(saddr.addr_in); + + ssize_t nRead = recvfrom( udpsock, buf, sizeof(buf), 0 /*flags*/, + &saddr.addr, &fromlen ); + logf( XW_LOGINFO, "%s(); got %d bytes", __func__, nRead); + + UDPHeader header; + const unsigned char* ptr = buf; + if ( getHeader( &ptr, ptr + nRead, &header ) ) { + send_via_udp( udpsock, &saddr.addr, XWPDEV_ALERT, + outbuf, sizeof(outbuf), NULL ); + } else { + logf( XW_LOGERROR, "unexpected data" ); + } + } + } // for +} + int main( int argc, char** argv ) { int port = 0; int ctrlport = 0; + int udpport = -1; #ifdef DO_HTTP int httpport = 0; const char* cssFile = NULL; @@ -1143,6 +1586,7 @@ main( int argc, char** argv ) const char* serverName = NULL; // const char* idFileName = NULL; const char* logFile = NULL; + const char* maint_str = NULL; bool doDaemon = true; bool doFork = true; @@ -1156,7 +1600,7 @@ main( int argc, char** argv ) first. */ for ( ; ; ) { - int opt = getopt(argc, argv, "h?c:p:m:n:f:l:t:s:w:" + int opt = getopt(argc, argv, "h?c:p:M:m:n:f:l:t:s:u:w:" "DF" ); if ( opt == -1 ) { @@ -1198,6 +1642,9 @@ main( int argc, char** argv ) case 'l': logFile = optarg; break; + case 'M': + maint_str = optarg; + break; case 'm': g_maxsocks = atoi( optarg ); break; @@ -1210,6 +1657,9 @@ main( int argc, char** argv ) case 't': nWorkerThreads = atoi( optarg ); break; + case 'u': + udpport = atoi( optarg ); + break; default: usage( argv[0] ); exit( 1 ); @@ -1232,6 +1682,9 @@ main( int argc, char** argv ) if ( ctrlport == 0 ) { (void)cfg->GetValueFor( "CTLPORT", &ctrlport ); } + if ( -1 == udpport ) { + (void)cfg->GetValueFor( "UDPPORT", &udpport ); + } #ifdef DO_HTTP if ( httpport == 0 ) { (void)cfg->GetValueFor( "WWW_PORT", &httpport ); @@ -1240,11 +1693,9 @@ main( int argc, char** argv ) if ( nWorkerThreads == 0 ) { (void)cfg->GetValueFor( "NTHREADS", &nWorkerThreads ); } - if ( g_maxsocks == -1 && !cfg->GetValueFor( "MAXSOCKS", &g_maxsocks ) ) { g_maxsocks = 100; } - char serverNameBuf[128]; if ( serverName == NULL ) { if ( cfg->GetValueFor( "SERVERNAME", serverNameBuf, @@ -1288,7 +1739,7 @@ main( int argc, char** argv ) #ifdef SPAWN_SELF /* loop forever, relaunching children as they die. */ - while ( doFork ) { + while ( doFork && !maint_str ) { ++s_nSpawns; /* increment in parent *before* copy */ pid_t pid = fork(); if ( pid == 0 ) { /* child */ @@ -1310,6 +1761,26 @@ main( int argc, char** argv ) } #endif + if ( -1 != udpport ) { + struct sockaddr_in saddr; + g_udpsock = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); + saddr.sin_family = PF_INET; + saddr.sin_addr.s_addr = htonl(INADDR_ANY); + saddr.sin_port = htons(udpport); + int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) ); + if ( 0 == err ) { + err = fcntl( g_udpsock, F_SETFL, O_NONBLOCK ); + } else { + logf( XW_LOGERROR, "bind()=>%s", strerror(errno) ); + g_udpsock = -1; + } + } + + if ( !!maint_str ) { + maint_str_loop( g_udpsock, maint_str ); + exit( 1 ); // should never exit + } + /* Needs to be reset after a crash/respawn */ PermID::SetStartTime( time(NULL) ); @@ -1396,7 +1867,7 @@ main( int argc, char** argv ) (void)sigaction( SIGINT, &act, NULL ); XWThreadPool* tPool = XWThreadPool::GetTPool(); - tPool->Setup( nWorkerThreads, processMessage, killSocket ); + tPool->Setup( nWorkerThreads, killSocket ); /* set up select call */ fd_set rfds; @@ -1404,6 +1875,9 @@ main( int argc, char** argv ) FD_ZERO(&rfds); g_listeners.AddToFDSet( &rfds ); FD_SET( g_control, &rfds ); + if ( -1 != g_udpsock ) { + FD_SET( g_udpsock, &rfds ); + } #ifdef DO_HTTP if ( -1 != g_http ) { FD_SET( g_http, &rfds ); @@ -1413,6 +1887,9 @@ main( int argc, char** argv ) if ( g_control > highest ) { highest = g_control; } + if ( g_udpsock > highest ) { + highest = g_udpsock; + } #ifdef DO_HTTP if ( g_http > highest ) { highest = g_http; @@ -1458,9 +1935,11 @@ main( int argc, char** argv ) "%s: accepting connection from %s on socket %d", __func__, inet_ntoa(saddr.addr_in.sin_addr), newSock ); - AddrInfo addr( true, newSock, &saddr ); - tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME + AddrInfo addr( newSock, &saddr, true ); + tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME : XWThreadPool::STYPE_PROXY, + perGame ? game_thread_proc + : proxy_thread_proc, &addr ); } --retval; @@ -1471,6 +1950,12 @@ main( int argc, char** argv ) // run_ctrl_thread( g_control ); --retval; } + if ( FD_ISSET( g_udpsock, &rfds ) ) { + // This will need to be done in a separate thread, or pushed + // to the existing thread pool + handle_udp_packet( g_udpsock ); + --retval; + } #ifdef DO_HTTP if ( FD_ISSET( g_http, &rfds ) ) { FD_CLR( g_http, &rfds ); diff --git a/xwords4/relay/xwrelay.h b/xwords4/relay/xwrelay.h index d7227c9a8..f59b94ecd 100644 --- a/xwords4/relay/xwrelay.h +++ b/xwords4/relay/xwrelay.h @@ -27,6 +27,79 @@ /* Set if device is acting a server; cleared if as client */ #define FLAGS_SERVER_BIT 0x01 +/* message types for the udp-based per-device (not per-game) protocol + * + * A number of these rely on a "clientToken", which is a 32-bit value the + * client provides and that it guarantees uniquely identifies a game on the + * device. A database rowid works great as long as they aren't reused. + */ +#define XWPDEV_PROTO_VERSION 0 +#ifndef CANT_DO_TYPEDEF +typedef +#endif +enum { XWPDEV_NONE /* 0 is an illegal value */ + /* All messages have the following six-byte header + * proto: 1 byte + * msgID: 4 byte unsigned long, 0 an illegal value + * cmd: 1 byte, one of the values below. + */ + + ,XWPDEV_ALERT /* relay->device: provides a string message to + present to the user (with device allowed not + to present the same string more than once) + format: header, null-terminnated string: varies */ + ,XWPDEV_REG /* dev->relay: device registers self and + self-selected (e.g. gcm) or assigned devid + format: header, idType: 1, + idLen: 2, id: */ + + ,XWPDEV_REGRSP /* relay->device: if non-relay-assigned devid + type was given, this gives the + relay-assigned one to be used from now on. + format: header, idLen: 2, id: + */ + + ,XWPDEV_PING /* device->relay: keep the UDP connection + open. header. */ + + ,XWPDEV_HAVEMSGS /* Relay->device: check messages for this + game. format: header */ + + ,XWPDEV_RQSTMSGS /* device->relay: got any messages for me? + format: header, devID: 4 [, clientToken: 4] + */ + + ,XWPDEV_MSG /* dev->relay and relay->dev: norm: a message from a game to + the relay format: header, clientToken: 4, message*/ + + ,XWPDEV_MSGNOCONN /* dev->relay in the proxy format that + includes relayID (connname:hid) and seems + to be reserved for relay FWD messages. + format: header, clientToken: 4; -terminated-connname: + varies, message: varies */ + + ,XWPDEV_MSGRSP /* relay->dev: conveys error on receipt of XWPDEV_MSG */ + + ,XWPDEV_BADREG /* relay->dev. You sent me a relayID via + XWPDEV_REG but I've never heard of it */ + + ,XWPDEV_ACK /* relay->dev (maybe) and dev->relay + (definitely). Tells recipient its message + has been received. This is for debugging, + and maybe later for timing keepAlives based + on firewall timeouts. format: header, + msgID: 4 + */ + + ,XWPDEV_DELGAME /* dev->relay: game's been deleted. format: + header, relayid: 4, clientToken: 4 */ + +} +#ifndef CANT_DO_TYPEDEF + XWRelayReg +#endif +; + #ifndef CANT_DO_TYPEDEF typedef #endif @@ -110,6 +183,8 @@ typedef enum { ,ID_TYPE_RELAY /* assigned by relay as replacement for one of the below */ ,ID_TYPE_LINUX ,ID_TYPE_ANDROID_GCM + ,ID_TYPE_ANDROID_OTHER + ,ID_TYPE_ANON /* please assign me one based on nothing */ ,ID_TYPE_NTYPES } DevIDType; diff --git a/xwords4/relay/xwrelay.sh b/xwords4/relay/xwrelay.sh index 111797100..7e60ad072 100755 --- a/xwords4/relay/xwrelay.sh +++ b/xwords4/relay/xwrelay.sh @@ -58,6 +58,7 @@ cid integer ,mtimes TIMESTAMP(0)[] ,addrs INET[] ,devids INTEGER[] +,tokens INTEGER[] ); EOF @@ -66,10 +67,12 @@ CREATE TABLE msgs ( id SERIAL ,connName VARCHAR(64) ,hid INTEGER +,token INTEGER ,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,stime TIMESTAMP DEFAULT NULL ,devid INTEGER ,msg BYTEA +,msg64 TEXT ,msglen INTEGER ,UNIQUE ( connName, hid, msg ) ); @@ -81,6 +84,7 @@ id INTEGER UNIQUE PRIMARY KEY ,devType INTEGER ,devid TEXT ,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP +,mtime TIMESTAMP ,unreg BOOLEAN DEFAULT FALSE ); EOF diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 2e933eb89..c8fe20909 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -25,11 +25,14 @@ #include #include #include +#include + #include "lstnrmgr.h" #include "xwrelay.h" #include "addrinfo.h" typedef unsigned char HostID; /* see HOST_ID_SERVER */ +typedef uint32_t DevIDRelay; typedef enum { XW_LOGERROR @@ -42,7 +45,8 @@ void logf( XW_LogLevel level, const char* format, ... ); void denyConnection( const AddrInfo* addr, XWREASON err ); bool send_with_length_unsafe( const AddrInfo* addr, - unsigned char* buf, size_t bufLen ); + const unsigned char* buf, size_t bufLen ); +void send_havemsgs( const AddrInfo* addr ); time_t uptime(void); @@ -55,8 +59,6 @@ int make_socket( unsigned long addr, unsigned short port ); void string_printf( std::string& str, const char* fmt, ... ); int read_packet( int sock, unsigned char* buf, int buflen ); -void handle_proxy_packet( unsigned char* buf, int bufLen, - const AddrInfo* addr ); const char* cmdToStr( XWRELAY_Cmd cmd );