register devices more often; add ctrl command to send text message to

device if possible, otherwise to post it to the db (mostly for testing).
This commit is contained in:
Eric House 2013-08-17 14:55:19 -07:00
parent 8ad549d2bd
commit 47203437cc
9 changed files with 189 additions and 15 deletions

View file

@ -243,6 +243,15 @@ relaycon_receive( void* closure, int socket )
XP_LOGF( "got ack for packetID %ld", packetID );
break;
}
case XWPDEV_METAMSG: {
XP_U16 len = getNetShort( &ptr );
unsigned char buf[len + 1];
memcpy( buf, ptr, len );
ptr += len;
buf[len] = '\0';
XP_LOGF( "%s: got message: %s", __func__, buf );
break;
}
default:
XP_LOGF( "%s: Unexpected cmd %d", __func__, header.cmd );
XP_ASSERT( 0 );

View file

@ -68,6 +68,7 @@ pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER;
static bool cmd_quit( int socket, const char** args );
static bool cmd_print( int socket, const char** args );
static bool cmd_devs( int socket, const char** args );
/* static bool cmd_lock( int socket, const char** args ); */
static bool cmd_help( int socket, const char** args );
static bool cmd_start( int socket, const char** args );
@ -130,6 +131,7 @@ static const FuncRec gFuncs[] = {
/* { "kill", cmd_kill_eject }, */
/* { "lock", cmd_lock }, */
{ "print", cmd_print },
{ "devs", cmd_devs },
{ "quit", cmd_quit },
{ "rev", cmd_rev },
{ "set", cmd_set },
@ -469,7 +471,6 @@ print_sockets( int out, int sought )
static bool
cmd_print( int socket, const char** args )
{
logf( XW_LOGINFO, "cmd_print called" );
bool found = false;
if ( 0 == strcmp( "cref", args[1] ) ) {
if ( 0 == strcmp( "all", args[2] ) ) {
@ -496,7 +497,7 @@ cmd_print( int socket, const char** args )
} else if ( 0 == strcmp( "dev", args[1] ) ) {
if ( 0 == strcmp( "all", args[2] ) ) {
string str;
DevMgr::Get()->printDevices( str );
DevMgr::Get()->printDevices( str, 0 );
send( socket, str.c_str(), str.size(), 0 );
found = true;
}
@ -517,6 +518,53 @@ cmd_print( int socket, const char** args )
return false;
} /* cmd_print */
static bool
cmd_devs( int socket, const char** args )
{
bool found = false;
string result;
if ( 0 == strcmp( "print", args[1] ) ) {
DevIDRelay devid = 0;
if ( NULL != args[3] ) {
devid = (DevIDRelay)strtoul( args[3], NULL, 10 );
}
DevMgr::Get()->printDevices( result, devid );
found = true;
} else if ( 0 == strcmp( "ping", args[1] ) ) {
} else if ( 0 == strcmp( "msg", args[1] ) ) {
DevIDRelay devid = (DevIDRelay)strtoul( args[2], NULL, 10 );
const char* msg = args[3];
if ( post_message( devid, msg ) ) {
string_printf( result, "posted message: %s\n", msg );
} else {
string_printf( result, "unable to post; does dev %d exist\n",
devid );
}
found = true;
} else if ( 0 == strcmp( "rm", args[1] ) ) {
}
if ( found ) {
if ( 0 < result.size() ) {
send( socket, result.c_str(), result.size(), 0 );
}
} else {
const char* strs[] = {
"* %s print [<id>]\n",
" %s ping\n",
" %s msg <devid> <msg_text>\n",
" %s rm <devid>\n",
};
string help;
for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) {
string_printf( help, strs[ii], args[0] );
}
send( socket, help.c_str(), help.size(), 0 );
}
return false;
}
#if 0
static bool
cmd_lock( int socket, const char** args )
@ -571,10 +619,11 @@ ctrl_thread_main( void* arg )
g_ctrlSocks.push_back( sock );
}
string cmd;
const char* args[MAX_ARGS] = {0};
string sargs[MAX_ARGS];
for ( ; ; ) {
string cmd;
const char* args[MAX_ARGS] = {0};
string sargs[MAX_ARGS];
print_prompt( sock );

View file

@ -869,6 +869,33 @@ DBMgr::CountStoredMessages( DevIDRelay relayID )
return getCountWhere( MSGS_TABLE, test );
}
void
DBMgr::StoreMessage( DevIDRelay devID, const uint8_t* const buf,
int len )
{
clearHasNoMessages( devID );
size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE " "
"(devid, %s, msglen) VALUES(%d, %s'%s', %d)";
string query;
if ( m_useB64 ) {
gchar* b64 = g_base64_encode( buf, len );
string_printf( query, fmt, "msg64", devID, "", b64, len );
g_free( b64 );
} else {
unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf,
len, &newLen );
assert( NULL != bytes );
string_printf( query, fmt, "msg", devID, "E", bytes, len );
PQfreemem( bytes );
}
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
}
void
DBMgr::StoreMessage( const char* const connName, int hid,
const unsigned char* buf, int len )

View file

@ -115,6 +115,7 @@ class DBMgr {
/* message storage -- different DB */
int CountStoredMessages( const char* const connName );
int CountStoredMessages( DevIDRelay relayID );
void StoreMessage( DevIDRelay relayID, const uint8_t* const buf, int len );
void StoreMessage( const char* const connName, int hid,
const unsigned char* const buf, int len );
void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs );

View file

@ -107,16 +107,21 @@ DevMgr::get( DevIDRelay devid )
// it, so that as much work as possible is done on the calling thread without
// holding up more important stuff.
void
DevMgr::printDevices( string& str )
DevMgr::printDevices( string& str, DevIDRelay devid )
{
map<uint32_t, DevIDRelay> agedDevs;
{
MutexLock ml( &m_mapLock );
map<DevIDRelay,UDPAddrRec>::const_iterator iter;
for ( iter = m_devAddrMap.begin(); iter != m_devAddrMap.end(); ++iter ) {
DevIDRelay devid = iter->first;
uint32_t added = iter->second.m_added;
agedDevs.insert( pair<uint32_t, DevIDRelay>(added, devid) );
map<DevIDRelay, UDPAddrRec>::const_iterator iter;
if ( 0 != devid ) {
iter = m_devAddrMap.find( devid );
if ( m_devAddrMap.end() != iter ) {
addDevice( agedDevs, iter );
}
} else {
for ( iter = m_devAddrMap.begin(); iter != m_devAddrMap.end(); ++iter ) {
addDevice( agedDevs, iter );
}
}
}
@ -142,3 +147,12 @@ DevMgr::printDevices( string& str )
}
}
void
DevMgr::addDevice( map<uint32_t, DevIDRelay>& devs,
map<DevIDRelay,UDPAddrRec>::const_iterator iter )
{
DevIDRelay devid = iter->first;
uint32_t added = iter->second.m_added;
devs.insert( pair<uint32_t, DevIDRelay>(added, devid) );
}

View file

@ -35,10 +35,11 @@ class DevMgr {
void Remember( DevIDRelay devid, const AddrInfo* addr );
const AddrInfo::AddrUnion* get( DevIDRelay devid );
void printDevices( string& str );
/* Called from ctrl port */
void printDevices( string& str, DevIDRelay devid /* 0 means all */ );
private:
DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); }
/* destructor's never called....
~DevMgr() { pthread_mutex_destroy( &m_mapLock ); }
*/
@ -52,6 +53,10 @@ class DevMgr {
time_t m_added;
};
DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); }
void addDevice( map<uint32_t, DevIDRelay>& devs,
map<DevIDRelay,UDPAddrRec>::const_iterator iter );
map<DevIDRelay,UDPAddrRec> m_devAddrMap;
map<AddrInfo::AddrUnion, DevIDRelay> m_addrDevMap;
pthread_mutex_t m_mapLock;

View file

@ -601,6 +601,62 @@ send_havemsgs( const AddrInfo* addr )
send_via_udp( addr, NULL, XWPDEV_HAVEMSGS, NULL );
}
class MsgClosure {
public:
MsgClosure( DevIDRelay devid, gpointer msg, uint16_t len )
{
m_devid = devid;
m_len = len;
m_msg = g_malloc(len);
memcpy( m_msg, msg, len );
}
~MsgClosure() {
g_free( m_msg );
}
DevIDRelay m_devid;
uint16_t m_len;
gpointer m_msg;
};
static void
onPostedMsgAcked( bool acked, uint32_t packetID, void* data )
{
MsgClosure* mc = (MsgClosure*)data;
if ( !acked ) {
DBMgr::Get()->StoreMessage( mc->m_devid,
(const unsigned char*)mc->m_msg,
mc->m_len );
}
delete mc;
}
bool
post_message( DevIDRelay devid, const char* message )
{
const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( devid );
bool success = !!addru;
if ( success ) {
AddrInfo addr( addru );
short len = strlen(message);
short netLen = htons(len);
uint32_t packetID;
uint8_t buf[len + sizeof(netLen) + sizeof(XWPDEV_METAMSG)];
buf[0] = XWPDEV_METAMSG;
memcpy( &buf[sizeof(XWPDEV_METAMSG)], &netLen, sizeof(netLen) );
memcpy( &buf[sizeof(XWPDEV_METAMSG) + sizeof(netLen)], message, len );
bool sent = send_via_udp( &addr, &packetID, XWPDEV_METAMSG, &buf[1],
VSIZE(buf)-1, NULL );
if ( sent ) {
MsgClosure* mc = new MsgClosure( devid, buf, VSIZE(buf) );
UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc );
} else {
DBMgr::Get()->StoreMessage( devid, (const unsigned char*)buf, VSIZE(buf) );
}
}
return success;
}
/* A CONNECT message from a device gives us the hostID and socket we'll
* associate with one participant in a relayed session. We'll store this
* information with the cookie where other participants can find it when they
@ -1355,8 +1411,11 @@ retrieveMessages( DevID& devID, const AddrInfo* addr )
for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) {
DBMgr::MsgInfo msg = *iter;
uint32_t packetID;
if ( !send_msg_via_udp( addr, msg.token, (unsigned char*)msg.msg.c_str(),
if ( !send_msg_via_udp( addr, msg.token,
(unsigned char*)msg.msg.c_str(),
msg.msg.length(), &packetID ) ) {
logf( XW_LOGERROR, "%s: unable to send to devID %d",
__func__, devID.asRelayID() );
break;
}
UDPAckTrack::setOnAck( onMsgAcked, packetID, (void*)msg.msgID );
@ -1471,7 +1530,13 @@ handle_udp_packet( UdpThreadClosure* utc )
DevID devID( ID_TYPE_RELAY );
devID.m_devIDString.append( (const char*)ptr, idLen );
ptr += idLen;
retrieveMessages( devID, utc->addr() );
const AddrInfo* addr = utc->addr();
DevMgr::Get()->Remember( devID.asRelayID(), addr );
if ( XWPDEV_RQSTMSGS == header.cmd ) {
retrieveMessages( devID, addr );
}
break;
}
case XWPDEV_ACK: {

View file

@ -94,6 +94,8 @@ enum { XWPDEV_NONE /* 0 is an illegal value */
,XWPDEV_DELGAME /* dev->relay: game's been deleted. format:
header, relayid: 4, clientToken: 4 */
,XWPDEV_METAMSG /* Message to be displayed to user */
}
#ifndef CANT_DO_TYPEDEF
XWRelayReg

View file

@ -50,6 +50,8 @@ bool send_with_length_unsafe( const AddrInfo* addr,
uint32_t* packetIDP );
void send_havemsgs( const AddrInfo* addr );
bool post_message( DevIDRelay devid, const char* message );
time_t uptime(void);
void blockSignals( void ); /* call from all but main thread */