diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 7aea5e051..d1928f46f 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -861,16 +861,22 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) DBMgr* dbmgr = DBMgr::Get(); const char* cname = ConnName(); - while ( addr->isCurrent() ) { - unsigned char buf[MAX_MSG_LEN]; - size_t buflen = sizeof(buf); - int msgID; - if ( !dbmgr->GetStoredMessage( cname, dest, buf, &buflen, &msgID ) - || ! send_with_length( addr, dest, buf, buflen, true ) ) { - break; - } - dbmgr->RemoveStoredMessages( &msgID, 1 ); + vector msgs; + dbmgr->GetStoredMessages( cname, dest, msgs ); + + vector sentIDs; + vector::const_iterator iter; + for ( iter = msgs.begin(); addr->isCurrent() && msgs.end() != iter; + ++iter ) { + DBMgr::MsgInfo msg = *iter; + if ( ! send_with_length( addr, dest, + (const unsigned char*)msg.msg.c_str(), + msg.msg.length(), true ) ) { + break; + } + sentIDs.push_back( msg.msgID ); } + dbmgr->RemoveStoredMessages( sentIDs ); } /* send_stored_messages */ bool diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index 4214b1f39..e0560f6b0 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -735,17 +735,6 @@ DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid, return found; } -int -DBMgr::PendingMsgCount( const char* connName, int hid ) -{ - string test; - string_printf( test, "connName = '%s' AND hid = %d ", connName, hid ); -#ifdef HAVE_STIME - string_printf( test, " AND stime IS NULL" ); -#endif - return getCountWhere( MSGS_TABLE, test ); -} - bool DBMgr::execSql( const string& query ) { @@ -882,74 +871,55 @@ DBMgr::CountStoredMessages( DevIDRelay relayID ) return getCountWhere( MSGS_TABLE, test ); } -void -DBMgr::GetStoredMessageIDs( DevIDRelay relayID, vector& ids ) -{ - const char* fmt = "SELECT id FROM " MSGS_TABLE " WHERE devid=%d " - "AND connname IN (SELECT connname FROM " GAMES_TABLE - " WHERE NOT " GAMES_TABLE ".dead)"; - string query; - string_printf( query, fmt, relayID ); - // logf( XW_LOGINFO, "%s: query=\"%s\"", __func__, query.c_str() ); - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - int nTuples = PQntuples( result ); - for ( int ii = 0; ii < nTuples; ++ii ) { - int id = atoi( PQgetvalue( result, ii, 0 ) ); - // logf( XW_LOGINFO, "%s: adding id %d", __func__, id ); - ids.push_back( id ); - } - PQclear( result ); - logf( XW_LOGINFO, "%s(relayID=%d)=>%d ids", __func__, relayID, ids.size() ); -} - void DBMgr::StoreMessage( const char* const connName, int hid, const unsigned char* buf, int len ) { DevIDRelay devID = getDevID( connName, hid ); if ( DEVID_NONE == devID ) { - logf( XW_LOGERROR, "%s: warning: devid not found for connName=%s, hid=%d", - __func__, connName, hid ); - } - - size_t newLen; - const char* fmt = "INSERT INTO " MSGS_TABLE " " - "(connname, hid, devid, token, %s, msglen) " - "VALUES( '%s', %d, %d, " - "(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), " - "%s'%s', %d)"; - - string query; - if ( m_useB64 ) { - gchar* b64 = g_base64_encode( buf, len ); - string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName, - "", b64, len ); - g_free( b64 ); + logf( XW_LOGINFO, "%s: devid not found for connName=%s, hid=%d; " + "dropping message", __func__, connName, hid ); } else { - unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, - len, &newLen ); - assert( NULL != bytes ); - - string_printf( query, fmt, "msg", connName, hid, devID, hid, connName, - "E", bytes, len ); - PQfreemem( bytes ); - } - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - execSql( query ); + size_t newLen; + const char* fmt = "INSERT INTO " MSGS_TABLE " " + "(connname, hid, devid, token, %s, msglen) " + "VALUES( '%s', %d, %d, " + "(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), " + "%s'%s', %d)"; + + string query; + if ( m_useB64 ) { + gchar* b64 = g_base64_encode( buf, len ); + string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName, + "", b64, len ); + g_free( b64 ); + } else { + unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, + len, &newLen ); + assert( NULL != bytes ); + + string_printf( query, fmt, "msg", connName, hid, devID, hid, connName, + "E", bytes, len ); + PQfreemem( bytes ); + } + + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + execSql( query ); + } } void -DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex, - unsigned char* buf, size_t* buflen ) +DBMgr::decodeMessage( PGresult* result, bool useB64, int rowIndx, int b64indx, + int byteaIndex, unsigned char* buf, size_t* buflen ) { const char* from = NULL; if ( useB64 ) { - from = PQgetvalue( result, 0, b64indx ); + from = PQgetvalue( result, rowIndx, b64indx ); } if ( NULL == from || '\0' == from[0] ) { useB64 = false; - from = PQgetvalue( result, 0, byteaIndex ); + from = PQgetvalue( result, rowIndx, byteaIndex ); } size_t to_length; @@ -970,71 +940,61 @@ DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex *buflen = to_length; } -bool -DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn, - unsigned char* buf, size_t* buflen, int* msgID ) +// storedMessagesImpl() assumes its callers' queries return the same results +#define STORED_SELECT "SELECT id, msg64, msg, msglen, token FROM " MSGS_TABLE " " + +void +DBMgr::storedMessagesImpl( string query, vector& msgs ) { - const char* fmt = "SELECT id, msg, msg64, msglen FROM " MSGS_TABLE - " WHERE connName = '%s' AND hid = %d " -#ifdef HAVE_STIME - "AND stime IS NULL " -#endif - "ORDER BY id LIMIT 1 OFFSET %d"; - string query; - string_printf( query, fmt, connName, hid, nn ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - int nTuples = PQntuples( result ); - assert( nTuples <= 1 ); - bool found = nTuples == 1; - if ( found ) { - if ( NULL != msgID ) { - *msgID = atoi( PQgetvalue( result, 0, 0 ) ); - } - size_t msglen = atoi( PQgetvalue( result, 0, 3 ) ); - decodeMessage( result, m_useB64, 2, 1, buf, buflen ); - assert( 0 == msglen || msglen == *buflen ); + int nTuples = PQntuples( result ); + for ( int ii = 0; ii < nTuples; ++ii ) { + int id = atoi( PQgetvalue( result, ii, 0 ) ); + size_t msglen = atoi( PQgetvalue( result, ii, 3 ) ); + uint8_t buf[1024]; + size_t buflen = sizeof(buf); + decodeMessage( result, m_useB64, ii, 1, 2, buf, &buflen ); + assert( 0 == msglen || buflen == msglen ); + string str( (char*)buf, buflen ); + AddrInfo::ClientToken token = atoi( PQgetvalue( result, ii, 4 ) ); + MsgInfo msg( str, id, token ); + msgs.push_back( msg ); } PQclear( result ); - return found; } -bool -DBMgr::GetStoredMessage( const char* const connName, int hid, - unsigned char* buf, size_t* buflen, int* msgID ) +void +DBMgr::GetStoredMessages( DevIDRelay relayID, vector& msgs ) { - return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID ); -} - -bool -DBMgr::GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, - AddrInfo::ClientToken* token ) -{ - const char* fmt = "SELECT token, msg, msg64, msglen FROM " MSGS_TABLE - " WHERE id = %d " + const char* fmt = STORED_SELECT " WHERE devid=%d " #ifdef HAVE_STIME - "AND stime IS NULL " + " AND stime IS NULL " +#endif + " AND connname IN (SELECT connname FROM " GAMES_TABLE + " WHERE NOT " GAMES_TABLE ".dead)" + " ORDER BY id"; + + string query; + string_printf( query, fmt, relayID ); + + storedMessagesImpl( query, msgs ); +} + +void +DBMgr::GetStoredMessages( const char* const connName, HostID hid, + vector& msgs ) +{ + const char* fmt = STORED_SELECT + " WHERE hid = %d AND connname = '%s'" +#ifdef HAVE_STIME + " AND stime IS NULL " #endif ; string query; - string_printf( query, fmt, msgID ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + string_printf( query, fmt, hid, connName ); - PGresult* result = PQexec( getThreadConn(), query.c_str() ); - int nTuples = PQntuples( result ); - assert( nTuples <= 1 ); - - bool found = nTuples == 1; - if ( found ) { - *token = atoi( PQgetvalue( result, 0, 0 ) ); - size_t msglen = atoi( PQgetvalue( result, 0, 3 ) ); - decodeMessage( result, m_useB64, 2, 1, buf, buflen ); - assert( 0 == msglen || *buflen == msglen ); - } - PQclear( result ); - return found; + storedMessagesImpl( query, msgs ); } void diff --git a/xwords4/relay/dbmgr.h b/xwords4/relay/dbmgr.h index bed8a96c6..2a1995c39 100644 --- a/xwords4/relay/dbmgr.h +++ b/xwords4/relay/dbmgr.h @@ -38,6 +38,16 @@ class DBMgr { them. */ static const DevIDRelay DEVID_NONE = 0; + class MsgInfo { + public: + MsgInfo( string m, int id, AddrInfo::ClientToken tok ) { + msg = m; msgID = id; token = tok; + } + string msg; + int msgID; + AddrInfo::ClientToken token; + }; + static DBMgr* Get(); ~DBMgr(); @@ -101,23 +111,14 @@ class DBMgr { bool TokenFor( const char* const connName, int hid, DevIDRelay* devid, AddrInfo::ClientToken* token ); - /* Return number of messages pending for connName:hostid pair passed in */ - int PendingMsgCount( const char* const connName, int hid ); - /* message storage -- different DB */ int CountStoredMessages( const char* const connName ); - int CountStoredMessages( const char* const connName, int hid ); int CountStoredMessages( DevIDRelay relayID ); void StoreMessage( const char* const connName, int hid, const unsigned char* const buf, int len ); - void GetStoredMessageIDs( DevIDRelay relayID, vector& ids ); - - bool GetStoredMessage( const char* const connName, int hid, - unsigned char* buf, size_t* buflen, int* msgID ); - bool GetNthStoredMessage( const char* const connName, int hid, int nn, - unsigned char* buf, size_t* buflen, int* msgID ); - bool GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen, - AddrInfo::ClientToken* token ); + void GetStoredMessages( DevIDRelay relayID, vector& msgs ); + void GetStoredMessages( const char* const connName, HostID hid, + vector& msgs ); void RemoveStoredMessages( const int* msgID, int nMsgIDs ); void RemoveStoredMessages( vector& ids ); @@ -131,8 +132,10 @@ class DBMgr { DevIDRelay getDevID( const DevID* devID ); int getCountWhere( const char* table, string& test ); void RemoveStoredMessages( string& msgIDs ); - void decodeMessage( PGresult* result, bool useB64, int b64indx, + void decodeMessage( PGresult* result, bool useB64, int rowIndx, int b64indx, int byteaIndex, unsigned char* buf, size_t* buflen ); + void storedMessagesImpl( string query, vector& msgs ); + int CountStoredMessages( const char* const connName, int hid ); PGconn* getThreadConn( void ); void clearThreadConn(); diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index ab57f121c..90bdb0518 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -982,21 +982,16 @@ pushShort( vector& out, unsigned short num ) static void pushMsgs( vector& out, DBMgr* dbmgr, const char* connName, - HostID hid, int msgCount, vector& msgIDs ) + HostID hid, vector& msgs, vector& msgIDs ) { - int ii; - for ( ii = 0; ii < msgCount; ++ii ) { - unsigned char buf[1024]; - size_t buflen = sizeof(buf); - int msgID; - if ( !dbmgr->GetNthStoredMessage( connName, hid, ii, buf, - &buflen, &msgID ) ) { - logf( XW_LOGERROR, "%s: %dth message not there", __func__, ii ); - break; - } - pushShort( out, buflen ); - out.insert( out.end(), buf, buf + buflen ); - msgIDs.push_back( msgID ); + vector::const_iterator iter; + for ( iter = msgs.begin(); msgs.end() != iter; ++iter ) { + DBMgr::MsgInfo msg = *iter; + int len = msg.msg.length(); + uint8_t* ptr = (uint8_t*)msg.msg.c_str(); + pushShort( out, len ); + out.insert( out.end(), ptr, ptr + len ); + msgIDs.push_back( msg.msgID ); } } @@ -1034,10 +1029,11 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull, /* For each relayID, write the number of messages and then each message (in the getmsg case) */ - int msgCount = dbmgr->PendingMsgCount( connName, hid ); - pushShort( out, msgCount ); + vector msgs; + dbmgr->GetStoredMessages( connName, hid, msgs ); + pushShort( out, msgs.size() ); if ( sendFull ) { - pushMsgs( out, dbmgr, connName, hid, msgCount, msgIDs ); + pushMsgs( out, dbmgr, connName, hid, msgs, msgIDs ); } } @@ -1049,7 +1045,7 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull, logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten ); if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); - dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() ); + dbmgr->RemoveStoredMessages( msgIDs ); } } } // handleMsgsMsg @@ -1319,21 +1315,19 @@ retrieveMessages( DevID& devID, const AddrInfo::AddrUnion* saddr ) { logf( XW_LOGINFO, "%s()", __func__ ); DBMgr* dbMgr = DBMgr::Get(); - vector ids; + vector msgs; + dbMgr->GetStoredMessages( devID.asRelayID(), msgs ); + + vector::const_iterator iter; vector sentIDs; - dbMgr->GetStoredMessageIDs( devID.asRelayID(), ids ); - vector::const_iterator iter; - for ( iter = ids.begin(); iter != ids.end(); ++iter ) { - unsigned char buf[MAX_MSG_LEN]; - size_t buflen = sizeof(buf); - AddrInfo::ClientToken clientToken; - if ( dbMgr->GetStoredMessage( *iter, buf, &buflen, &clientToken ) ) { - AddrInfo addr( clientToken, saddr ); - if ( ! send_with_length_unsafe( &addr, buf, buflen ) ) { - break; - } - sentIDs.push_back( *iter ); + for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) { + DBMgr::MsgInfo msg = *iter; + AddrInfo addr( msg.token, saddr ); + if ( ! send_with_length_unsafe( &addr, (unsigned char*)msg.msg.c_str(), + msg.msg.length() ) ) { + break; } + sentIDs.push_back( msg.msgID ); } dbMgr->RemoveStoredMessages( sentIDs ); }