fetch all stored messages at once to reduce number of DB calls made.

This commit is contained in:
Eric House 2013-07-25 06:37:53 -07:00
parent 6bb2efbbd5
commit 9951064439
4 changed files with 130 additions and 167 deletions

View file

@ -861,16 +861,22 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
DBMgr* dbmgr = DBMgr::Get();
const char* cname = ConnName();
while ( addr->isCurrent() ) {
unsigned char buf[MAX_MSG_LEN];
size_t buflen = sizeof(buf);
int msgID;
if ( !dbmgr->GetStoredMessage( cname, dest, buf, &buflen, &msgID )
|| ! send_with_length( addr, dest, buf, buflen, true ) ) {
break;
}
dbmgr->RemoveStoredMessages( &msgID, 1 );
vector<DBMgr::MsgInfo> msgs;
dbmgr->GetStoredMessages( cname, dest, msgs );
vector<int> sentIDs;
vector<DBMgr::MsgInfo>::const_iterator iter;
for ( iter = msgs.begin(); addr->isCurrent() && msgs.end() != iter;
++iter ) {
DBMgr::MsgInfo msg = *iter;
if ( ! send_with_length( addr, dest,
(const unsigned char*)msg.msg.c_str(),
msg.msg.length(), true ) ) {
break;
}
sentIDs.push_back( msg.msgID );
}
dbmgr->RemoveStoredMessages( sentIDs );
} /* send_stored_messages */
bool

View file

@ -735,17 +735,6 @@ DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid,
return found;
}
int
DBMgr::PendingMsgCount( const char* connName, int hid )
{
string test;
string_printf( test, "connName = '%s' AND hid = %d ", connName, hid );
#ifdef HAVE_STIME
string_printf( test, " AND stime IS NULL" );
#endif
return getCountWhere( MSGS_TABLE, test );
}
bool
DBMgr::execSql( const string& query )
{
@ -882,74 +871,55 @@ DBMgr::CountStoredMessages( DevIDRelay relayID )
return getCountWhere( MSGS_TABLE, test );
}
void
DBMgr::GetStoredMessageIDs( DevIDRelay relayID, vector<int>& ids )
{
const char* fmt = "SELECT id FROM " MSGS_TABLE " WHERE devid=%d "
"AND connname IN (SELECT connname FROM " GAMES_TABLE
" WHERE NOT " GAMES_TABLE ".dead)";
string query;
string_printf( query, fmt, relayID );
// logf( XW_LOGINFO, "%s: query=\"%s\"", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
for ( int ii = 0; ii < nTuples; ++ii ) {
int id = atoi( PQgetvalue( result, ii, 0 ) );
// logf( XW_LOGINFO, "%s: adding id %d", __func__, id );
ids.push_back( id );
}
PQclear( result );
logf( XW_LOGINFO, "%s(relayID=%d)=>%d ids", __func__, relayID, ids.size() );
}
void
DBMgr::StoreMessage( const char* const connName, int hid,
const unsigned char* buf, int len )
{
DevIDRelay devID = getDevID( connName, hid );
if ( DEVID_NONE == devID ) {
logf( XW_LOGERROR, "%s: warning: devid not found for connName=%s, hid=%d",
__func__, connName, hid );
}
size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE " "
"(connname, hid, devid, token, %s, msglen) "
"VALUES( '%s', %d, %d, "
"(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), "
"%s'%s', %d)";
string query;
if ( m_useB64 ) {
gchar* b64 = g_base64_encode( buf, len );
string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName,
"", b64, len );
g_free( b64 );
logf( XW_LOGINFO, "%s: devid not found for connName=%s, hid=%d; "
"dropping message", __func__, connName, hid );
} else {
unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf,
len, &newLen );
assert( NULL != bytes );
string_printf( query, fmt, "msg", connName, hid, devID, hid, connName,
"E", bytes, len );
PQfreemem( bytes );
}
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE " "
"(connname, hid, devid, token, %s, msglen) "
"VALUES( '%s', %d, %d, "
"(SELECT tokens[%d] from " GAMES_TABLE " where connname='%s'), "
"%s'%s', %d)";
string query;
if ( m_useB64 ) {
gchar* b64 = g_base64_encode( buf, len );
string_printf( query, fmt, "msg64", connName, hid, devID, hid, connName,
"", b64, len );
g_free( b64 );
} else {
unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf,
len, &newLen );
assert( NULL != bytes );
string_printf( query, fmt, "msg", connName, hid, devID, hid, connName,
"E", bytes, len );
PQfreemem( bytes );
}
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
}
}
void
DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex,
unsigned char* buf, size_t* buflen )
DBMgr::decodeMessage( PGresult* result, bool useB64, int rowIndx, int b64indx,
int byteaIndex, unsigned char* buf, size_t* buflen )
{
const char* from = NULL;
if ( useB64 ) {
from = PQgetvalue( result, 0, b64indx );
from = PQgetvalue( result, rowIndx, b64indx );
}
if ( NULL == from || '\0' == from[0] ) {
useB64 = false;
from = PQgetvalue( result, 0, byteaIndex );
from = PQgetvalue( result, rowIndx, byteaIndex );
}
size_t to_length;
@ -970,71 +940,61 @@ DBMgr::decodeMessage( PGresult* result, bool useB64, int b64indx, int byteaIndex
*buflen = to_length;
}
bool
DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn,
unsigned char* buf, size_t* buflen, int* msgID )
// storedMessagesImpl() assumes its callers' queries return the same results
#define STORED_SELECT "SELECT id, msg64, msg, msglen, token FROM " MSGS_TABLE " "
void
DBMgr::storedMessagesImpl( string query, vector<DBMgr::MsgInfo>& msgs )
{
const char* fmt = "SELECT id, msg, msg64, msglen FROM " MSGS_TABLE
" WHERE connName = '%s' AND hid = %d "
#ifdef HAVE_STIME
"AND stime IS NULL "
#endif
"ORDER BY id LIMIT 1 OFFSET %d";
string query;
string_printf( query, fmt, connName, hid, nn );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
assert( nTuples <= 1 );
bool found = nTuples == 1;
if ( found ) {
if ( NULL != msgID ) {
*msgID = atoi( PQgetvalue( result, 0, 0 ) );
}
size_t msglen = atoi( PQgetvalue( result, 0, 3 ) );
decodeMessage( result, m_useB64, 2, 1, buf, buflen );
assert( 0 == msglen || msglen == *buflen );
int nTuples = PQntuples( result );
for ( int ii = 0; ii < nTuples; ++ii ) {
int id = atoi( PQgetvalue( result, ii, 0 ) );
size_t msglen = atoi( PQgetvalue( result, ii, 3 ) );
uint8_t buf[1024];
size_t buflen = sizeof(buf);
decodeMessage( result, m_useB64, ii, 1, 2, buf, &buflen );
assert( 0 == msglen || buflen == msglen );
string str( (char*)buf, buflen );
AddrInfo::ClientToken token = atoi( PQgetvalue( result, ii, 4 ) );
MsgInfo msg( str, id, token );
msgs.push_back( msg );
}
PQclear( result );
return found;
}
bool
DBMgr::GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID )
void
DBMgr::GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs )
{
return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID );
}
bool
DBMgr::GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen,
AddrInfo::ClientToken* token )
{
const char* fmt = "SELECT token, msg, msg64, msglen FROM " MSGS_TABLE
" WHERE id = %d "
const char* fmt = STORED_SELECT " WHERE devid=%d "
#ifdef HAVE_STIME
"AND stime IS NULL "
" AND stime IS NULL "
#endif
" AND connname IN (SELECT connname FROM " GAMES_TABLE
" WHERE NOT " GAMES_TABLE ".dead)"
" ORDER BY id";
string query;
string_printf( query, fmt, relayID );
storedMessagesImpl( query, msgs );
}
void
DBMgr::GetStoredMessages( const char* const connName, HostID hid,
vector<DBMgr::MsgInfo>& msgs )
{
const char* fmt = STORED_SELECT
" WHERE hid = %d AND connname = '%s'"
#ifdef HAVE_STIME
" AND stime IS NULL "
#endif
;
string query;
string_printf( query, fmt, msgID );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
string_printf( query, fmt, hid, connName );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
int nTuples = PQntuples( result );
assert( nTuples <= 1 );
bool found = nTuples == 1;
if ( found ) {
*token = atoi( PQgetvalue( result, 0, 0 ) );
size_t msglen = atoi( PQgetvalue( result, 0, 3 ) );
decodeMessage( result, m_useB64, 2, 1, buf, buflen );
assert( 0 == msglen || *buflen == msglen );
}
PQclear( result );
return found;
storedMessagesImpl( query, msgs );
}
void

View file

@ -38,6 +38,16 @@ class DBMgr {
them. */
static const DevIDRelay DEVID_NONE = 0;
class MsgInfo {
public:
MsgInfo( string m, int id, AddrInfo::ClientToken tok ) {
msg = m; msgID = id; token = tok;
}
string msg;
int msgID;
AddrInfo::ClientToken token;
};
static DBMgr* Get();
~DBMgr();
@ -101,23 +111,14 @@ class DBMgr {
bool TokenFor( const char* const connName, int hid, DevIDRelay* devid,
AddrInfo::ClientToken* token );
/* Return number of messages pending for connName:hostid pair passed in */
int PendingMsgCount( const char* const connName, int hid );
/* message storage -- different DB */
int CountStoredMessages( const char* const connName );
int CountStoredMessages( const char* const connName, int hid );
int CountStoredMessages( DevIDRelay relayID );
void StoreMessage( const char* const connName, int hid,
const unsigned char* const buf, int len );
void GetStoredMessageIDs( DevIDRelay relayID, vector<int>& ids );
bool GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID );
bool GetNthStoredMessage( const char* const connName, int hid, int nn,
unsigned char* buf, size_t* buflen, int* msgID );
bool GetStoredMessage( int msgID, unsigned char* buf, size_t* buflen,
AddrInfo::ClientToken* token );
void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs );
void GetStoredMessages( const char* const connName, HostID hid,
vector<DBMgr::MsgInfo>& msgs );
void RemoveStoredMessages( const int* msgID, int nMsgIDs );
void RemoveStoredMessages( vector<int>& ids );
@ -131,8 +132,10 @@ class DBMgr {
DevIDRelay getDevID( const DevID* devID );
int getCountWhere( const char* table, string& test );
void RemoveStoredMessages( string& msgIDs );
void decodeMessage( PGresult* result, bool useB64, int b64indx,
void decodeMessage( PGresult* result, bool useB64, int rowIndx, int b64indx,
int byteaIndex, unsigned char* buf, size_t* buflen );
void storedMessagesImpl( string query, vector<DBMgr::MsgInfo>& msgs );
int CountStoredMessages( const char* const connName, int hid );
PGconn* getThreadConn( void );
void clearThreadConn();

View file

@ -982,21 +982,16 @@ pushShort( vector<unsigned char>& out, unsigned short num )
static void
pushMsgs( vector<unsigned char>& out, DBMgr* dbmgr, const char* connName,
HostID hid, int msgCount, vector<int>& msgIDs )
HostID hid, vector<DBMgr::MsgInfo>& msgs, vector<int>& msgIDs )
{
int ii;
for ( ii = 0; ii < msgCount; ++ii ) {
unsigned char buf[1024];
size_t buflen = sizeof(buf);
int msgID;
if ( !dbmgr->GetNthStoredMessage( connName, hid, ii, buf,
&buflen, &msgID ) ) {
logf( XW_LOGERROR, "%s: %dth message not there", __func__, ii );
break;
}
pushShort( out, buflen );
out.insert( out.end(), buf, buf + buflen );
msgIDs.push_back( msgID );
vector<DBMgr::MsgInfo>::const_iterator iter;
for ( iter = msgs.begin(); msgs.end() != iter; ++iter ) {
DBMgr::MsgInfo msg = *iter;
int len = msg.msg.length();
uint8_t* ptr = (uint8_t*)msg.msg.c_str();
pushShort( out, len );
out.insert( out.end(), ptr, ptr + len );
msgIDs.push_back( msg.msgID );
}
}
@ -1034,10 +1029,11 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull,
/* For each relayID, write the number of messages and then
each message (in the getmsg case) */
int msgCount = dbmgr->PendingMsgCount( connName, hid );
pushShort( out, msgCount );
vector<DBMgr::MsgInfo> msgs;
dbmgr->GetStoredMessages( connName, hid, msgs );
pushShort( out, msgs.size() );
if ( sendFull ) {
pushMsgs( out, dbmgr, connName, hid, msgCount, msgIDs );
pushMsgs( out, dbmgr, connName, hid, msgs, msgIDs );
}
}
@ -1049,7 +1045,7 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull,
logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten );
if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) {
dbmgr->RecordSent( &msgIDs[0], msgIDs.size() );
dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() );
dbmgr->RemoveStoredMessages( msgIDs );
}
}
} // handleMsgsMsg
@ -1319,21 +1315,19 @@ retrieveMessages( DevID& devID, const AddrInfo::AddrUnion* saddr )
{
logf( XW_LOGINFO, "%s()", __func__ );
DBMgr* dbMgr = DBMgr::Get();
vector<int> ids;
vector<DBMgr::MsgInfo> msgs;
dbMgr->GetStoredMessages( devID.asRelayID(), msgs );
vector<DBMgr::MsgInfo>::const_iterator iter;
vector<int> sentIDs;
dbMgr->GetStoredMessageIDs( devID.asRelayID(), ids );
vector<int>::const_iterator iter;
for ( iter = ids.begin(); iter != ids.end(); ++iter ) {
unsigned char buf[MAX_MSG_LEN];
size_t buflen = sizeof(buf);
AddrInfo::ClientToken clientToken;
if ( dbMgr->GetStoredMessage( *iter, buf, &buflen, &clientToken ) ) {
AddrInfo addr( clientToken, saddr );
if ( ! send_with_length_unsafe( &addr, buf, buflen ) ) {
break;
}
sentIDs.push_back( *iter );
for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) {
DBMgr::MsgInfo msg = *iter;
AddrInfo addr( msg.token, saddr );
if ( ! send_with_length_unsafe( &addr, (unsigned char*)msg.msg.c_str(),
msg.msg.length() ) ) {
break;
}
sentIDs.push_back( msg.msgID );
}
dbMgr->RemoveStoredMessages( sentIDs );
}