add new proxy command that fetches actual messages stored for a device.

This commit is contained in:
Andy2 2011-01-22 12:52:26 -08:00
parent d98a3fe232
commit f96f4a040b
4 changed files with 96 additions and 41 deletions

View file

@ -451,13 +451,14 @@ DBMgr::StoreMessage( const char* const connName, int hid,
}
bool
DBMgr::GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID )
DBMgr::GetNthStoredMessage( const char* const connName, int hid,
int nn, unsigned char* buf, size_t* buflen,
int* msgID )
{
const char* fmt = "SELECT id, msg, msglen FROM " MSGS_TABLE
" WHERE connName = '%s' AND hid = %d ORDER BY id LIMIT 1";
" WHERE connName = '%s' AND hid = %d ORDER BY id LIMIT 1 OFFSET %d";
char query[256];
snprintf( query, sizeof(query), fmt, connName, hid );
snprintf( query, sizeof(query), fmt, connName, hid, nn );
logf( XW_LOGINFO, "%s: query: %s", __func__, query );
PGresult* result = PQexec( getThreadConn(), query );
@ -466,7 +467,9 @@ DBMgr::GetStoredMessage( const char* const connName, int hid,
bool found = nTuples == 1;
if ( found ) {
*msgID = atoi( PQgetvalue( result, 0, 0 ) );
if ( NULL != msgID ) {
*msgID = atoi( PQgetvalue( result, 0, 0 ) );
}
size_t msglen = atoi( PQgetvalue( result, 0, 2 ) );
/* int len = PQgetlength( result, 0, 1 ); */
@ -478,12 +481,19 @@ DBMgr::GetStoredMessage( const char* const connName, int hid,
memcpy( buf, bytes, to_length );
PQfreemem( bytes );
*buflen = to_length;
assert( to_length == msglen );
assert( 0 == msglen || to_length == msglen );
}
PQclear( result );
return found;
}
bool
DBMgr::GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID )
{
return GetNthStoredMessage( connName, hid, 0, buf, buflen, msgID );
}
void
DBMgr::RemoveStoredMessage( int msgID )
{

View file

@ -76,6 +76,8 @@ class DBMgr {
const unsigned char* const buf, int len );
bool GetStoredMessage( const char* const connName, int hid,
unsigned char* buf, size_t* buflen, int* msgID );
bool GetNthStoredMessage( const char* const connName, int hid, int nn,
unsigned char* buf, size_t* buflen, int* msgID );
void RemoveStoredMessage( int msgID );
private:

View file

@ -193,7 +193,8 @@ parseRelayID( const char* const in, char* buf, HostID* hid )
}
static bool
getNetShort( unsigned char** bufpp, unsigned char* end, unsigned short* out )
getNetShort( unsigned char** bufpp, const unsigned char* end,
unsigned short* out )
{
bool ok = *bufpp + 2 <= end;
if ( ok ) {
@ -206,7 +207,8 @@ getNetShort( unsigned char** bufpp, unsigned char* end, unsigned short* out )
} /* getNetShort */
static bool
getNetByte( unsigned char** bufpp, unsigned char* end, unsigned char* out )
getNetByte( unsigned char** bufpp, const unsigned char* end,
unsigned char* out )
{
bool ok = *bufpp < end;
if ( ok ) {
@ -693,6 +695,76 @@ read_packet( int sock, unsigned char* buf, int buflen )
return result;
}
static void
pushShort( vector<unsigned char>& out, unsigned short num )
{
num = htons( num );
out.insert( out.end(), (unsigned char*)&num, ((unsigned char*)&num) + 2 );
}
static void
pushMsgs( vector<unsigned char>& out, DBMgr* dbmgr, const char* connName,
HostID hid, int msgCount )
{
int ii;
for ( ii = 0; ii < msgCount; ++ii ) {
unsigned char buf[1024];
size_t buflen = sizeof(buf);
if ( !dbmgr->GetNthStoredMessage( connName, hid, ii, buf,
&buflen, NULL ) ) {
logf( XW_LOGERROR, "%s: %dth message not there", __func__, ii );
break;
}
pushShort( out, buflen );
out.insert( out.end(), buf, buf + buflen );
}
}
static void
handleMsgsMsg( int sock, bool sendFull,
unsigned char* bufp, const unsigned char* end )
{
unsigned short nameCount;
if ( getNetShort( &bufp, end, &nameCount ) ) {
char* in = (char*)bufp;
DBMgr* dbmgr = DBMgr::Get();
unsigned short count;
/* reply format: PRX_GET_MSGS case: <message len><n_msgs>[<len><msg>]*
* PRX_HAS_MSGS case: <message len><n_msgs><count>*
*/
vector<unsigned char> out(4); /* space for len and n_msgs */
assert( out.size() == 4 );
char* saveptr;
for ( count = 0; ; ++count ) {
char* name = strtok_r( in, "\n", &saveptr );
if ( NULL == name ) {
break;
}
HostID hid;
char connName[MAX_CONNNAME_LEN+1];
if ( !parseRelayID( name, connName, &hid ) ) {
break;
}
int msgCount = dbmgr->PendingMsgCount( connName, hid );
if ( sendFull ) {
pushMsgs( out, dbmgr, connName, hid, msgCount );
} else {
pushShort( out, msgCount );
}
in = NULL;
}
unsigned short tmp = htons( out.size() - sizeof(tmp) );
memcpy( &out[0], &tmp, sizeof(tmp) );
tmp = htons( count );
memcpy( &out[2], &tmp, sizeof(tmp) );
write( sock, &out[0], out.size() );
}
}
void
handle_proxy_packet( unsigned char* buf, int len, int sock )
{
@ -723,42 +795,12 @@ handle_proxy_packet( unsigned char* buf, int len, int sock )
}
break;
case PRX_HAS_MSGS:
case PRX_GET_MSGS:
if ( len >= 2 ) {
unsigned short nameCount;
if ( getNetShort( &bufp, end, &nameCount ) ) {
char* in = (char*)bufp;
char* saveptr;
vector<int> ids;
for ( ; ; ) {
char* name = strtok_r( in, "\n", &saveptr );
if ( NULL == name ) {
break;
}
HostID hid;
char connName[MAX_CONNNAME_LEN+1];
if ( parseRelayID( name, connName, &hid ) ) {
ids.push_back( DBMgr::Get()->
PendingMsgCount( connName, hid ) );
}
in = NULL;
}
unsigned short len =
(ids.size() * sizeof(unsigned short))
+ sizeof( unsigned short );
len = htons( len );
write( sock, &len, sizeof(len) );
len = htons( nameCount );
write( sock, &len, sizeof(len) );
vector<int>::const_iterator iter;
for ( iter = ids.begin(); iter != ids.end(); ++iter ) {
unsigned short num = *iter;
num = htons( num );
write( sock, &num, sizeof(num) );
}
}
handleMsgsMsg( sock, PRX_GET_MSGS == cmd, bufp, end );
}
break; /* PRX_HAS_MSGS */
case PRX_DEVICE_GONE:
logf( XW_LOGINFO, "%s: got PRX_DEVICE_GONE", __func__ );
if ( len >= 2 ) {

View file

@ -154,6 +154,7 @@ enum { PRX_NONE /* 0 is an illegal value */
,PRX_PUB_ROOMS /* list all public rooms for lang/nPlayers */
,PRX_HAS_MSGS /* return message counts for connName/devid array */
,PRX_DEVICE_GONE /* return message counts for connName/devid array */
,PRX_GET_MSGS /* return full messages for connName/devid array */
}
#ifndef CANT_DO_TYPEDEF
XWPRXYCMD