From 63d3f70cdfae51345fc70f290a84e768dbefe025 Mon Sep 17 00:00:00 2001 From: Eric House Date: Sat, 19 Jan 2013 14:34:04 -0800 Subject: [PATCH] more changes to storage and fetching of messages and to protocol so presence of messages is reported on connect (as are bad relayIDs). Now a game with a robot player in a "closed" game can continue. Once the next set of linux-side chances is committed. --- xwords4/relay/cref.cpp | 29 +++-- xwords4/relay/cref.h | 2 + xwords4/relay/dbmgr.cpp | 198 ++++++++++++++++++++++------- xwords4/relay/dbmgr.h | 10 ++ xwords4/relay/states.cpp | 7 ++ xwords4/relay/states.h | 3 + xwords4/relay/xwrelay.cpp | 233 +++++++++++++++++++++++++++++------ xwords4/relay/xwrelay.h | 32 +++-- xwords4/relay/xwrelay.sh | 2 + xwords4/relay/xwrelay_priv.h | 1 + 10 files changed, 421 insertions(+), 96 deletions(-) diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 5f3a47ab4..cbd7558c0 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -590,7 +590,7 @@ void CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest, const unsigned char* buf, int buflen ) { - logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest ); + logf( XW_LOGVERBOSE1, "%s: %d -> %d", __func__, src, dest ); CRefEvent evt( XWE_FORWARDMSG, addr ); evt.u.fwd.src = src; evt.u.fwd.dest = dest; @@ -704,6 +704,10 @@ CookieRef::handleEvents() forward_or_store/*_proxy*/( &evt ); break; + case XWA_TRYTELL: + send_havemsgs( &evt.addr ); + break; + case XWA_TIMERDISCONN: disconnectSockets( XWRELAY_ERROR_TIMEOUT ); break; @@ -993,6 +997,14 @@ CookieRef::postDropDevice( HostID hostID ) handleEvents(); } +void +CookieRef::postTellHaveMsgs( const AddrInfo* addr ) +{ + CRefEvent evt( XWE_TRYTELL, addr ); + m_eventQueue.push_back( evt ); + assert( m_in_handleEvents ); +} + void CookieRef::setAllConnectedTimer() { @@ -1129,7 +1141,6 @@ CookieRef::sendAnyStored( const CRefEvent* evt ) void CookieRef::forward_or_store( const CRefEvent* evt ) { - AddrInfo addr; // invalid unless assigned to const unsigned char* cbuf = evt->u.fwd.buf; do { int buflen = evt->u.fwd.buflen; @@ -1153,6 +1164,11 @@ CookieRef::forward_or_store( const CRefEvent* evt ) usleep( m_delayMicros ); } + if ( (NULL == destAddr) + || !send_with_length( destAddr, dest, buf, buflen, true ) ) { + store_message( dest, buf, buflen ); + } + // If recipient GAME isn't connected, see if owner device is and can // receive if ( NULL == destAddr) { @@ -1161,17 +1177,12 @@ CookieRef::forward_or_store( const CRefEvent* evt ) if ( DBMgr::Get()->TokenFor( ConnName(), dest, &devid, &token ) ) { const AddrInfo::AddrUnion* saddr = DevMgr::Get()->get( devid ); if ( !!saddr ) { - addr.init( -1, token, saddr ); - destAddr = &addr; + AddrInfo addr( -1, token, saddr ); + postTellHaveMsgs( &addr ); } } } - if ( (NULL == destAddr) - || !send_with_length( destAddr, dest, buf, buflen, true ) ) { - store_message( dest, buf, buflen ); - } - /* also note that we've heard from src recently */ HostID src = evt->u.fwd.src; DBMgr::Get()->RecordAddress( ConnName(), src, &evt->addr ); diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 45cd35863..48d1d2b80 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -230,6 +230,7 @@ class CookieRef { void postCheckAllHere(); void postDropDevice( HostID hostID ); + void postTellHaveMsgs( const AddrInfo* addr ); void setAllConnectedTimer(); void cancelAllConnectedTimer(); @@ -238,6 +239,7 @@ class CookieRef { void forward_or_store( const CRefEvent* evt ); void send_denied( const CRefEvent* evt, XWREASON why ); + void send_trytell( const CRefEvent* evt ); void checkFromServer( const CRefEvent* evt ); void notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why ); diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index a94fd3a2e..8f82b5514 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -262,7 +262,9 @@ DBMgr::RegisterDevice( const DevID* host ) // If it's not present *and* of type ID_TYPE_RELAY, we can do nothing. // Otherwise proceed. - if ( DEVID_NONE == devID && ID_TYPE_RELAY < host->m_devIDType ) { + if ( DEVID_NONE != devID ) { + (void)updateDevice( devID, false ); + } else if ( ID_TYPE_RELAY < host->m_devIDType ) { // loop until we're successful inserting the unique key. Ship with this // coming from random, but test with increasing values initially to make // sure duplicates are detected. @@ -299,6 +301,25 @@ DBMgr::RegisterDevice( const DevID* host ) return devID; } +bool +DBMgr::updateDevice( DevIDRelay relayID, bool check ) +{ + bool exists = !check; + if ( !exists ) { + string test; + string_printf( test, "id = %d", relayID ); + exists = 1 == getCountWhere( DEVICES_TABLE, test ); + } + + if ( exists ) { + const char* fmt = "UPDATE " DEVICES_TABLE " SET mtime='now' WHERE id = %d"; + string query; + string_printf( query, fmt, relayID ); + execSql( query ); + } + return exists; +} + HostID DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion, int nToAdd, unsigned short seed, const AddrInfo* addr, @@ -606,23 +627,12 @@ DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid, int DBMgr::PendingMsgCount( const char* connName, int hid ) { - int count = 0; - const char* fmt = "SELECT COUNT(*) FROM " MSGS_TABLE - " WHERE connName = '%s' AND hid = %d " + string test; + string_printf( test, "connName = '%s' AND hid = %d ", connName, hid ); #ifdef HAVE_STIME - "AND stime IS NULL" + string_printf( test, " AND stime IS NULL" ); #endif - ; - string query; - string_printf( query, fmt, connName, hid ); - logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() ); - - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - if ( 1 == PQntuples( result ) ) { - count = atoi( PQgetvalue( result, 0, 0 ) ); - } - PQclear( result ); - return count; + return getCountWhere( MSGS_TABLE, test ); } bool @@ -703,6 +713,8 @@ DBMgr::getDevID( const DevID* devID ) } PQclear( result ); } + logf( XW_LOGINFO, "%s(in=%s)=>%d (0x.8X)", __func__, + devID->m_devIDString.c_str(), rDevID, rDevID ); return rDevID; } @@ -717,25 +729,16 @@ DBMgr::getDevID( const DevID* devID ) int DBMgr::CountStoredMessages( const char* const connName, int hid ) { - const char* fmt = "SELECT count(*) FROM " MSGS_TABLE - " WHERE connname = '%s' " + string test; + string_printf( test, "connname = '%s'", connName ); #ifdef HAVE_STIME - "AND stime IS NULL" + string_printf( test, " AND stime IS NULL" ); #endif - ; - - string query; - string_printf( query, fmt, connName ); - if ( hid != -1 ) { - string_printf( query, "AND hid = %d", hid ); + string_printf( test, " AND hid = %d", hid ); } - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - assert( 1 == PQntuples( result ) ); - int count = atoi( PQgetvalue( result, 0, 0 ) ); - PQclear( result ); - return count; + return getCountWhere( MSGS_TABLE, test ); } int @@ -744,6 +747,36 @@ DBMgr::CountStoredMessages( const char* const connName ) return CountStoredMessages( connName, -1 ); } /* CountStoredMessages */ +int +DBMgr::CountStoredMessages( DevIDRelay relayID ) +{ + string test; + string_printf( test, "devid = %d", relayID ); +#ifdef HAVE_STIME + string_printf( test, "AND stime IS NULL" ); +#endif + + return getCountWhere( MSGS_TABLE, test ); +} + +void +DBMgr::GetStoredMessageIDs( DevIDRelay relayID, vector& ids ) +{ + const char* fmt = "SELECT id FROM " MSGS_TABLE " WHERE devid=%d"; + string query; + string_printf( query, fmt, relayID ); + // logf( XW_LOGINFO, "%s: query=\"%s\"", __func__, query.c_str() ); + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + int nTuples = PQntuples( result ); + for ( int ii = 0; ii < nTuples; ++ii ) { + int id = atoi( PQgetvalue( result, ii, 0 ) ); + // logf( XW_LOGINFO, "%s: adding id %d", __func__, id ); + ids.push_back( id ); + } + PQclear( result ); + logf( XW_LOGINFO, "%s(relayID=%d)=>%d ids", __func__, relayID, ids.size() ); +} + void DBMgr::StoreMessage( const char* const connName, int hid, const unsigned char* buf, int len ) @@ -752,15 +785,18 @@ DBMgr::StoreMessage( const char* const connName, int hid, size_t newLen; const char* fmt = "INSERT INTO " MSGS_TABLE - " (connname, hid, devid, msg, msglen)" - " VALUES( '%s', %d, %d, E'%s', %d)"; + " (connname, hid, devid, token, msg, msglen)" + " VALUES( '%s', %d, %d, " + "(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), " + "E'%s', %d)"; unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, len, &newLen ); assert( NULL != bytes ); string query; - string_printf( query, fmt, connName, hid, devID, bytes, len ); + string_printf( query, fmt, connName, hid, devID, hid, connName, + bytes, len ); PQfreemem( bytes ); @@ -815,6 +851,58 @@ DBMgr::GetStoredMessage( const char* const connName, int hid, return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID ); } +bool +DBMgr::GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, + AddrInfo::ClientToken* token ) +{ + const char* fmt = "SELECT token, msg, msglen FROM " MSGS_TABLE + " WHERE id = %d " +#ifdef HAVE_STIME + "AND stime IS NULL " +#endif + ; + string query; + string_printf( query, fmt, msgID ); + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + int nTuples = PQntuples( result ); + assert( nTuples <= 1 ); + + bool found = nTuples == 1; + if ( found ) { + *token = atoi( PQgetvalue( result, 0, 0 ) ); + size_t msglen = atoi( PQgetvalue( result, 0, 2 ) ); + const unsigned char* from = + (const unsigned char* )PQgetvalue( result, 0, 1 ); + size_t to_length; + unsigned char* bytes = PQunescapeBytea( from, &to_length ); + assert( to_length <= *buflen ); + memcpy( buf, bytes, to_length ); + PQfreemem( bytes ); + *buflen = to_length; + assert( 0 == msglen || to_length == msglen ); + } + PQclear( result ); + return found; +} + +void +DBMgr::RemoveStoredMessages( string& msgids ) +{ + const char* fmt = +#ifdef HAVE_STIME + "UPDATE " MSGS_TABLE " SET stime='now' " +#else + "DELETE FROM " MSGS_TABLE +#endif + " WHERE id IN (%s)"; + string query; + string_printf( query, fmt, msgids.c_str() ); + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + execSql( query ); +} + void DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs ) { @@ -831,21 +919,41 @@ DBMgr::RemoveStoredMessages( const int* msgIDs, int nMsgIDs ) ids.append( "," ); } } - - const char* fmt = -#ifdef HAVE_STIME - "UPDATE " MSGS_TABLE " SET stime='now' " -#else - "DELETE FROM " MSGS_TABLE -#endif - " WHERE id IN (%s)"; - string query; - string_printf( query, fmt, ids.c_str() ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - execSql( query ); + RemoveStoredMessages( ids ); } } +void +DBMgr::RemoveStoredMessages( vector& idv ) +{ + if ( 0 < idv.size() ) { + string ids; + vector::const_iterator iter = idv.begin(); + for ( ; ; ) { + string_printf( ids, "%d", *iter ); + if ( ++iter == idv.end() ) { + break; + } + string_printf( ids, "," ); + } + RemoveStoredMessages( ids ); + } +} + +int +DBMgr::getCountWhere( const char* table, string& test ) +{ + string query; + string_printf( query, "SELECT count(*) FROM %s WHERE %s", table, test.c_str() ); + + PGresult* result = PQexec( getThreadConn(), query.c_str() ); + assert( 1 == PQntuples( result ) ); + int count = atoi( PQgetvalue( result, 0, 0 ) ); + PQclear( result ); + logf( XW_LOGINFO, "%s(%s)=>%d", __func__, query.c_str(), count ); + return count; +} + static void formatParams( char* paramValues[], int nParams, const char* fmt, char* buf, int bufLen, ... ) diff --git a/xwords4/relay/dbmgr.h b/xwords4/relay/dbmgr.h index dbf1eb02d..ce3a0abc1 100644 --- a/xwords4/relay/dbmgr.h +++ b/xwords4/relay/dbmgr.h @@ -62,6 +62,7 @@ class DBMgr { bool AllDevsAckd( const char* const connName ); DevIDRelay RegisterDevice( const DevID* host ); + bool updateDevice( DevIDRelay relayID, bool check ); HostID AddDevice( const char* const connName, HostID curID, int clientVersion, int nToAdd, unsigned short seed, const AddrInfo* addr, @@ -98,13 +99,20 @@ class DBMgr { /* message storage -- different DB */ int CountStoredMessages( const char* const connName ); int CountStoredMessages( const char* const connName, int hid ); + int CountStoredMessages( DevIDRelay relayID ); void StoreMessage( const char* const connName, int hid, const unsigned char* const buf, int len ); + void GetStoredMessageIDs( DevIDRelay relayID, vector& ids ); + bool GetStoredMessage( const char* const connName, int hid, unsigned char* buf, size_t* buflen, int* msgID ); bool GetNthStoredMessage( const char* const connName, int hid, int nn, unsigned char* buf, size_t* buflen, int* msgID ); + bool GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, + AddrInfo::ClientToken* token ); + void RemoveStoredMessages( const int* msgID, int nMsgIDs ); + void RemoveStoredMessages( vector& ids ); private: DBMgr(); @@ -113,6 +121,8 @@ class DBMgr { void readArray( const char* const connName, int arr[] ); DevIDRelay getDevID( const char* connName, int hid ); DevIDRelay getDevID( const DevID* devID ); + int getCountWhere( const char* table, string& test ); + void RemoveStoredMessages( string& msgIDs ); PGconn* getThreadConn( void ); diff --git a/xwords4/relay/states.cpp b/xwords4/relay/states.cpp index f133edb10..ab1ffe97e 100644 --- a/xwords4/relay/states.cpp +++ b/xwords4/relay/states.cpp @@ -79,6 +79,11 @@ static StateTable g_stateTable[] = { { XWS_WAITMORE, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME }, { XWS_ALLCONND, XWE_PROXYMSG, XWA_PROXYMSG, XWS_SAME }, +{ XWS_EMPTY, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, +{ XWS_WAITMORE, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, +{ XWS_ALLCONND, XWE_TRYTELL, XWA_TRYTELL, XWS_SAME }, + + /* { XWS_WAITMORE, XWE_GAMEFULL, XWA_SENDALLHERE, XWS_ALLCONND }, */ /* { XWS_WAITMORE, XWE_CHECKFULL, XWA_, XWS_WAITMORE }, */ /* { XWS_INITED, XWE_DEVCONNECT, XWA_SEND_NO_ROOM, XWS_DEAD }, */ @@ -229,6 +234,7 @@ eventString( XW_RELAY_EVENT evt ) CASESTR(XWE_RECONNECT); CASESTR(XWE_GOTONEACK); CASESTR(XWE_PROXYMSG); + CASESTR(XWE_TRYTELL); CASESTR(XWE_GOTLASTACK); CASESTR(XWE_ACKTIMEOUT); CASESTR(XWE_DISCONN); @@ -276,6 +282,7 @@ actString( XW_RELAY_ACTION act ) CASESTR(XWA_SNDALLHERE_2); CASESTR(XWA_FWD); CASESTR(XWA_PROXYMSG); + CASESTR(XWA_TRYTELL); CASESTR(XWA_NOTEHEART); CASESTR(XWA_TIMERDISCONN); CASESTR(XWA_DISCONNECT); diff --git a/xwords4/relay/states.h b/xwords4/relay/states.h index 2171a54eb..7f86b1d89 100644 --- a/xwords4/relay/states.h +++ b/xwords4/relay/states.h @@ -86,6 +86,7 @@ typedef enum { this object */ ,XWE_PROXYMSG /* msg when game may not be connected */ + ,XWE_TRYTELL /* tell the addressee to check for stored messages */ ,XWE_GOTONEACK ,XWE_GOTLASTACK @@ -147,6 +148,8 @@ typedef enum { ,XWA_PROXYMSG /* out-of-band message */ + ,XWA_TRYTELL /* Tell the addresses to check for messages */ + ,XWA_NOTEHEART /* Record heartbeat received */ ,XWA_NOTE_EMPTY /* No sockets left; check if can delete */ diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index faf99ba15..aa98d48f0 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -207,6 +207,20 @@ parseRelayID( const unsigned char** const inp, const unsigned char* const end, return ok; } +static bool +getNetLong( const unsigned char** bufpp, const unsigned char* end, + uint32_t* out ) +{ + uint32_t tmp; + bool ok = *bufpp + sizeof(tmp) <= end; + if ( ok ) { + memcpy( &tmp, *bufpp, sizeof(tmp) ); + *bufpp += sizeof(tmp); + *out = ntohl( tmp ); + } + return ok; +} /* getNetShort */ + static bool getNetShort( const unsigned char** bufpp, const unsigned char* end, unsigned short* out ) @@ -343,6 +357,7 @@ send_via_udp( int socket, const struct sockaddr *dest_addr, { ssize_t nSent = sendto( socket, buf, buflen, 0, /* flags */ dest_addr, sizeof(*dest_addr) ); + logf( XW_LOGINFO, "%s()=>%d", __func__, nSent ); return nSent; } @@ -370,8 +385,8 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, AddrInfo::ClientToken clientToken = addr->clientToken(); assert( 0 != clientToken ); unsigned char tmpbuf[1 + 1 + sizeof(clientToken) + bufLen]; - tmpbuf[0] = XWREG_PROTO_VERSION; - tmpbuf[1] = XWRREG_MSG; + tmpbuf[0] = XWPDEV_PROTO_VERSION; + tmpbuf[1] = XWPDEV_MSG; clientToken = htonl(clientToken); memcpy( &tmpbuf[2], &clientToken, sizeof(clientToken) ); memcpy( &tmpbuf[2 + sizeof(clientToken)], buf, bufLen ); @@ -392,6 +407,23 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, return ok; } /* send_with_length_unsafe */ +void +send_havemsgs( const AddrInfo* addr ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + int socket = addr->socket(); + if ( -1 == socket ) { + socket = g_udpsock; + } + + AddrInfo::ClientToken clientToken = htonl(addr->clientToken()); + unsigned char tmpbuf[1 + 1 + sizeof(clientToken)]; + tmpbuf[0] = XWPDEV_PROTO_VERSION; + tmpbuf[1] = XWPDEV_HAVEMSGS; + memcpy( &tmpbuf[2], &clientToken, sizeof(clientToken) ); + + send_via_udp( socket, addr->sockaddr(), tmpbuf, sizeof(tmpbuf) ); +} /* A CONNECT message from a device gives us the hostID and socket we'll * associate with one participant in a relayed session. We'll store this @@ -944,6 +976,31 @@ log_hex( const unsigned char* memp, int len, const char* tag ) } } // log_hex +static bool +handlePutMessage( SafeCref& scr, HostID hid, const AddrInfo* addr, + unsigned short len, const unsigned char** bufp, + const unsigned char* end ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + bool success = false; + const unsigned char* start = *bufp; + HostID src; + HostID dest; + XWRELAY_Cmd cmd; + // sanity check that cmd and hostids are there + if ( getNetByte( bufp, end, &cmd ) + && getNetByte( bufp, end, &src ) + && getNetByte( bufp, end, &dest ) ) { + assert( cmd == XWRELAY_MSG_TORELAY_NOCONN ); + assert( hid == dest ); + scr.PutMsg( src, addr, dest, start, len ); + *bufp = start + len; + success = true; + } + logf( XW_LOGINFO, "%s()=>%d", __func__, success ); + return success; +} + static void handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp, const unsigned char* end ) @@ -975,20 +1032,10 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp, unsigned short nMsgs; if ( getNetShort( &bufp, end, &nMsgs ) ) { SafeCref scr( connName ); - while ( nMsgs-- > 0 ) { + while ( scr.IsValid() && nMsgs-- > 0 ) { unsigned short len; - HostID src; - HostID dest; - XWRELAY_Cmd cmd; if ( getNetShort( &bufp, end, &len ) ) { - const unsigned char* start = bufp; - if ( getNetByte( &bufp, end, &cmd ) - && getNetByte( &bufp, end, &src ) - && getNetByte( &bufp, end, &dest ) ) { - assert( cmd == XWRELAY_MSG_TORELAY_NOCONN ); - assert( hid == dest ); - scr.PutMsg( src, addr, dest, start, len ); - bufp = start + len; + if ( handlePutMessage( scr, hid, addr, len, &bufp, end ) ) { continue; } } @@ -1077,30 +1124,106 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr ) } } /* handle_proxy_packet */ +static short +addRegID( unsigned char* ptr, DevIDRelay relayID ) +{ + short used = 0; + char idbuf[9]; + int idLen = snprintf( idbuf, sizeof(idbuf), "%.8X", relayID ); + short lenNBO = htons(idLen); + memcpy( &ptr[used], &lenNBO, sizeof(lenNBO) ); + used += sizeof(lenNBO); + memcpy( &ptr[used], idbuf, idLen ); + used += idLen; + return used; +} + static void registerDevice( const DevID* devID, const AddrInfo::AddrUnion* saddr ) { DevIDRelay relayID; - if ( ID_TYPE_RELAY != devID->m_devIDType ) { // known to us; just update the time - relayID = DBMgr::Get()->RegisterDevice( devID ); - if ( ID_TYPE_NONE != relayID ) { + DBMgr* dbMgr = DBMgr::Get(); + short indx = 0; + unsigned char buf[32]; + if ( ID_TYPE_RELAY == devID->m_devIDType ) { // known to us; just update the time + relayID = devID->asRelayID(); + if ( dbMgr->updateDevice( relayID, true ) ) { + int nMsgs = dbMgr->CountStoredMessages( relayID ); + if ( 0 < nMsgs ) { + AddrInfo addr( -1, 0, saddr ); + send_havemsgs( &addr ); + } + } else { + relayID = DBMgr::DEVID_NONE; + + buf[indx++] = XWPDEV_PROTO_VERSION; + buf[indx++] = XWPDEV_BADREG; + indx += addRegID( &buf[indx], relayID ); + send_via_udp( g_udpsock, &saddr->addr, buf, indx ); + } + } else { + relayID = dbMgr->RegisterDevice( devID ); + if ( DBMgr::DEVID_NONE != relayID ) { // send it back to the device - char idbuf[9]; - int len = snprintf( idbuf, sizeof(idbuf), "%.8X", relayID ); - logf( XW_LOGERROR, "%s: len(%s) => %d", __func__, idbuf, len ); - unsigned char buf[1 + 1 + 2 + len]; - buf[0] = XWREG_PROTO_VERSION; - buf[1] = XWRREG_REGRSP; - short lenNBO = htons(len); - memcpy( &buf[2], &lenNBO, sizeof(lenNBO)); - memcpy( &buf[4], idbuf, len ); + buf[indx++] = XWPDEV_PROTO_VERSION; + buf[indx++] = XWPDEV_REGRSP; + indx += addRegID( &buf[indx], relayID ); send_via_udp( g_udpsock, &saddr->addr, buf, sizeof(buf) ); } - } else { - relayID = devID->asRelayID(); } + // Now let's map the address to the devid for future sending purposes. - DevMgr::Get()->Remember( relayID, saddr ); + if ( DBMgr::DEVID_NONE != relayID ) { + DevMgr::Get()->Remember( relayID, saddr ); + } +} + +static void +retrieveMessages( DevID& devID, const AddrInfo::AddrUnion* saddr ) +{ + logf( XW_LOGINFO, "%s()", __func__ ); + DBMgr* dbMgr = DBMgr::Get(); + vector ids; + vector sentIDs; + dbMgr->GetStoredMessageIDs( devID.asRelayID(), ids ); + vector::const_iterator iter; + for ( iter = ids.begin(); iter != ids.end(); ++iter ) { + unsigned char buf[MAX_MSG_LEN]; + size_t buflen = sizeof(buf); + AddrInfo::ClientToken clientToken; + if ( dbMgr->GetStoredMessage( *iter, buf, &buflen, &clientToken ) ) { + AddrInfo addr( -1, clientToken, saddr ); + if ( ! send_with_length_unsafe( &addr, buf, buflen ) ) { + break; + } + sentIDs.push_back( *iter ); + } + } + dbMgr->RemoveStoredMessages( sentIDs ); +} + +static const char* +msgToStr( XWRelayReg msg ) +{ + const char* str; +# define CASE_STR(c) case c: str = #c; break + switch( msg ) { + CASE_STR(XWPDEV_REG); + CASE_STR(XWPDEV_REGRSP); + CASE_STR(XWPDEV_PING); + CASE_STR(XWPDEV_HAVEMSGS); + CASE_STR(XWPDEV_RQSTMSGS); + CASE_STR(XWPDEV_MSG); + CASE_STR(XWPDEV_MSGNOCONN); + CASE_STR(XWPDEV_MSGRSP); + CASE_STR(XWPDEV_BADREG); + default: + assert(0); + break; + } +# undef CASE_STR + return str; + } static void @@ -1110,12 +1233,13 @@ udp_thread_proc( UdpThreadClosure* utc ) const unsigned char* end = ptr + utc->len(); unsigned char proto = *ptr++; - if ( XWREG_PROTO_VERSION != 0 ) { + if ( XWPDEV_PROTO_VERSION != 0 ) { logf( XW_LOGERROR, "unexpected proto %d", __func__, (int) proto ); } else { - int msg = *ptr++; + XWRelayReg msg = (XWRelayReg)*ptr++; + logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( msg ) ); switch( msg ) { - case XWRREG_REG: { + case XWPDEV_REG: { DevIDType typ = (DevIDType)*ptr++; unsigned short idLen; if ( !getNetShort( &ptr, end, &idLen ) ) { @@ -1131,7 +1255,7 @@ udp_thread_proc( UdpThreadClosure* utc ) registerDevice( &devID, utc->saddr() ); } break; - case XWRREG_MSG: { + case XWPDEV_MSG: { AddrInfo::ClientToken clientToken; memcpy( &clientToken, ptr, sizeof(clientToken) ); ptr += sizeof(clientToken); @@ -1142,8 +1266,48 @@ udp_thread_proc( UdpThreadClosure* utc ) } else { logf( XW_LOGERROR, "%s: dropping packet with token of 0" ); } - } break; + } + case XWPDEV_MSGNOCONN: { + AddrInfo::ClientToken clientToken; + if ( getNetLong( &ptr, end, &clientToken ) ) { + HostID hid; + char connName[MAX_CONNNAME_LEN+1]; + if ( !parseRelayID( &ptr, end, connName, + sizeof( connName ), &hid ) ) { + logf( XW_LOGERROR, "parse failed!!!" ); + break; + } + SafeCref scr( connName ); + if ( scr.IsValid() ) { + AddrInfo addr( g_udpsock, clientToken, utc->saddr() ); + handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end ); + assert( ptr == end ); // DON'T CHECK THIS IN!!! + } else { + logf( XW_LOGERROR, "%s: invalid scr for %s", __func__, connName ); + } + } else { + logf( XW_LOGERROR, "no clientToken found!!!" ); + } + break; + } + + case XWPDEV_RQSTMSGS: { + unsigned short idLen; + if ( !getNetShort( &ptr, end, &idLen ) ) { + break; + } + if ( end - ptr > idLen ) { + logf( XW_LOGERROR, "full devID not received" ); + break; + } + DevID devID( ID_TYPE_RELAY ); + devID.m_devIDString.append( (const char*)ptr, idLen ); + ptr += idLen; + retrieveMessages( devID, utc->saddr() ); + break; + } + default: logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, msg ); } @@ -1155,7 +1319,6 @@ udp_thread_proc( UdpThreadClosure* utc ) static void handle_udp_packet( int udpsock ) { - logf( XW_LOGINFO, "%s()", __func__ ); unsigned char buf[512]; AddrInfo::AddrUnion saddr; memset( &saddr, 0, sizeof(saddr) ); diff --git a/xwords4/relay/xwrelay.h b/xwords4/relay/xwrelay.h index 2f30c5068..c06682295 100644 --- a/xwords4/relay/xwrelay.h +++ b/xwords4/relay/xwrelay.h @@ -28,30 +28,48 @@ #define FLAGS_SERVER_BIT 0x01 /* message types for the udp-based per-device (not per-game) protocol */ -#define XWREG_PROTO_VERSION 0 +#define XWPDEV_PROTO_VERSION 0 #ifndef CANT_DO_TYPEDEF typedef #endif -enum { XWRREG_NONE /* 0 is an illegal value */ - ,XWRREG_REG /* dev->relay: device registers self and +enum { XWPDEV_NONE /* 0 is an illegal value */ + ,XWPDEV_REG /* dev->relay: device registers self and self-selected (e.g. gcm) or assigned devid format: proto: 1; this enum: 1; idType: 1, idLen: 2, id: */ - ,XWRREG_REGRSP /* relay->device: if non-relay-assigned devid + ,XWPDEV_REGRSP /* relay->device: if non-relay-assigned devid type was given, this gives the relay-assigned one to be used from now on. format: proto: 1, this enum: 1, idLen: 2, id: */ - ,XWRREG_PING /* device->relay: keep the UDP connection + ,XWPDEV_PING /* device->relay: keep the UDP connection open. format: proto: 1, this enum: 1. */ - ,XWRREG_MSG /* dev->relay and relay->dev: norm: a message from a game to + ,XWPDEV_HAVEMSGS /* Relay->device: check messages for this + game. format: proto: 1, this enum: 1; clientToken: 4 */ + + ,XWPDEV_RQSTMSGS /* device->relay: got any messages for me? + format: proto: 1, this enum 1, devID: 4 [, clientToken: 4] + */ + + ,XWPDEV_MSG /* dev->relay and relay->dev: norm: a message from a game to the relay format: proto: 1, this enum: 1, clientToken: 4, message*/ - ,XWRREG_MSGRSP /* relay->dev: conveys error on receipt of XWRREG_MSG */ + ,XWPDEV_MSGNOCONN /* dev->relay in the proxy format that + includes relayID (connname:hid) and seems + to be reserved for relay FWD messages. + format: proto: 1, this enum: 1, + clientToken: 4; -terminated-connname: + varies, message: varies */ + + ,XWPDEV_MSGRSP /* relay->dev: conveys error on receipt of XWPDEV_MSG */ + + ,XWPDEV_BADREG /* relay->dev. You sent me a relayID via + XWPDEV_REG but I've never heard of it */ + } #ifndef CANT_DO_TYPEDEF XWRelayReg diff --git a/xwords4/relay/xwrelay.sh b/xwords4/relay/xwrelay.sh index f39efbdac..52da03349 100755 --- a/xwords4/relay/xwrelay.sh +++ b/xwords4/relay/xwrelay.sh @@ -67,6 +67,7 @@ CREATE TABLE msgs ( id SERIAL ,connName VARCHAR(64) ,hid INTEGER +,token INTEGER ,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,stime TIMESTAMP DEFAULT NULL ,devid INTEGER @@ -82,6 +83,7 @@ id INTEGER UNIQUE PRIMARY KEY ,devType INTEGER ,devid TEXT ,ctime TIMESTAMP DEFAULT CURRENT_TIMESTAMP +,mtime TIMESTAMP ,unreg BOOLEAN DEFAULT FALSE ); EOF diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 51d8da001..84ae9d696 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -46,6 +46,7 @@ void logf( XW_LogLevel level, const char* format, ... ); void denyConnection( const AddrInfo* addr, XWREASON err ); bool send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, size_t bufLen ); +void send_havemsgs( const AddrInfo* addr ); time_t uptime(void);