diff --git a/xwords4/linux/relaycon.c b/xwords4/linux/relaycon.c index ebceddb6c..18eedac9b 100644 --- a/xwords4/linux/relaycon.c +++ b/xwords4/linux/relaycon.c @@ -243,6 +243,15 @@ relaycon_receive( void* closure, int socket ) XP_LOGF( "got ack for packetID %ld", packetID ); break; } + case XWPDEV_METAMSG: { + XP_U16 len = getNetShort( &ptr ); + unsigned char buf[len + 1]; + memcpy( buf, ptr, len ); + ptr += len; + buf[len] = '\0'; + XP_LOGF( "%s: got message: %s", __func__, buf ); + break; + } default: XP_LOGF( "%s: Unexpected cmd %d", __func__, header.cmd ); XP_ASSERT( 0 ); diff --git a/xwords4/relay/ctrl.cpp b/xwords4/relay/ctrl.cpp index b25daafb4..aec4a8737 100644 --- a/xwords4/relay/ctrl.cpp +++ b/xwords4/relay/ctrl.cpp @@ -68,6 +68,7 @@ pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER; static bool cmd_quit( int socket, const char** args ); static bool cmd_print( int socket, const char** args ); +static bool cmd_devs( int socket, const char** args ); /* static bool cmd_lock( int socket, const char** args ); */ static bool cmd_help( int socket, const char** args ); static bool cmd_start( int socket, const char** args ); @@ -130,6 +131,7 @@ static const FuncRec gFuncs[] = { /* { "kill", cmd_kill_eject }, */ /* { "lock", cmd_lock }, */ { "print", cmd_print }, + { "devs", cmd_devs }, { "quit", cmd_quit }, { "rev", cmd_rev }, { "set", cmd_set }, @@ -469,7 +471,6 @@ print_sockets( int out, int sought ) static bool cmd_print( int socket, const char** args ) { - logf( XW_LOGINFO, "cmd_print called" ); bool found = false; if ( 0 == strcmp( "cref", args[1] ) ) { if ( 0 == strcmp( "all", args[2] ) ) { @@ -496,7 +497,7 @@ cmd_print( int socket, const char** args ) } else if ( 0 == strcmp( "dev", args[1] ) ) { if ( 0 == strcmp( "all", args[2] ) ) { string str; - DevMgr::Get()->printDevices( str ); + DevMgr::Get()->printDevices( str, 0 ); send( socket, str.c_str(), str.size(), 0 ); found = true; } @@ -517,6 +518,53 @@ cmd_print( int socket, const char** args ) return false; } /* cmd_print */ +static bool +cmd_devs( int socket, const char** args ) +{ + bool found = false; + string result; + + if ( 0 == strcmp( "print", args[1] ) ) { + DevIDRelay devid = 0; + if ( NULL != args[3] ) { + devid = (DevIDRelay)strtoul( args[3], NULL, 10 ); + } + DevMgr::Get()->printDevices( result, devid ); + found = true; + } else if ( 0 == strcmp( "ping", args[1] ) ) { + } else if ( 0 == strcmp( "msg", args[1] ) ) { + DevIDRelay devid = (DevIDRelay)strtoul( args[2], NULL, 10 ); + const char* msg = args[3]; + if ( post_message( devid, msg ) ) { + string_printf( result, "posted message: %s\n", msg ); + } else { + string_printf( result, "unable to post; does dev %d exist\n", + devid ); + } + found = true; + } else if ( 0 == strcmp( "rm", args[1] ) ) { + } + + if ( found ) { + if ( 0 < result.size() ) { + send( socket, result.c_str(), result.size(), 0 ); + } + } else { + const char* strs[] = { + "* %s print []\n", + " %s ping\n", + " %s msg \n", + " %s rm \n", + }; + string help; + for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) { + string_printf( help, strs[ii], args[0] ); + } + send( socket, help.c_str(), help.size(), 0 ); + } + return false; +} + #if 0 static bool cmd_lock( int socket, const char** args ) @@ -571,10 +619,11 @@ ctrl_thread_main( void* arg ) g_ctrlSocks.push_back( sock ); } + string cmd; + const char* args[MAX_ARGS] = {0}; + string sargs[MAX_ARGS]; + for ( ; ; ) { - string cmd; - const char* args[MAX_ARGS] = {0}; - string sargs[MAX_ARGS]; print_prompt( sock ); diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index 15825c6ff..1c5ce67c3 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -869,6 +869,33 @@ DBMgr::CountStoredMessages( DevIDRelay relayID ) return getCountWhere( MSGS_TABLE, test ); } +void +DBMgr::StoreMessage( DevIDRelay devID, const uint8_t* const buf, + int len ) +{ + clearHasNoMessages( devID ); + + size_t newLen; + const char* fmt = "INSERT INTO " MSGS_TABLE " " + "(devid, %s, msglen) VALUES(%d, %s'%s', %d)"; + + string query; + if ( m_useB64 ) { + gchar* b64 = g_base64_encode( buf, len ); + string_printf( query, fmt, "msg64", devID, "", b64, len ); + g_free( b64 ); + } else { + unsigned char* bytes = PQescapeByteaConn( getThreadConn(), buf, + len, &newLen ); + assert( NULL != bytes ); + string_printf( query, fmt, "msg", devID, "E", bytes, len ); + PQfreemem( bytes ); + } + + logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + execSql( query ); +} + void DBMgr::StoreMessage( const char* const connName, int hid, const unsigned char* buf, int len ) diff --git a/xwords4/relay/dbmgr.h b/xwords4/relay/dbmgr.h index 3b629999a..692f507f8 100644 --- a/xwords4/relay/dbmgr.h +++ b/xwords4/relay/dbmgr.h @@ -115,6 +115,7 @@ class DBMgr { /* message storage -- different DB */ int CountStoredMessages( const char* const connName ); int CountStoredMessages( DevIDRelay relayID ); + void StoreMessage( DevIDRelay relayID, const uint8_t* const buf, int len ); void StoreMessage( const char* const connName, int hid, const unsigned char* const buf, int len ); void GetStoredMessages( DevIDRelay relayID, vector& msgs ); diff --git a/xwords4/relay/devmgr.cpp b/xwords4/relay/devmgr.cpp index acc86c631..0129b7732 100644 --- a/xwords4/relay/devmgr.cpp +++ b/xwords4/relay/devmgr.cpp @@ -107,16 +107,21 @@ DevMgr::get( DevIDRelay devid ) // it, so that as much work as possible is done on the calling thread without // holding up more important stuff. void -DevMgr::printDevices( string& str ) +DevMgr::printDevices( string& str, DevIDRelay devid ) { map agedDevs; { MutexLock ml( &m_mapLock ); - map::const_iterator iter; - for ( iter = m_devAddrMap.begin(); iter != m_devAddrMap.end(); ++iter ) { - DevIDRelay devid = iter->first; - uint32_t added = iter->second.m_added; - agedDevs.insert( pair(added, devid) ); + map::const_iterator iter; + if ( 0 != devid ) { + iter = m_devAddrMap.find( devid ); + if ( m_devAddrMap.end() != iter ) { + addDevice( agedDevs, iter ); + } + } else { + for ( iter = m_devAddrMap.begin(); iter != m_devAddrMap.end(); ++iter ) { + addDevice( agedDevs, iter ); + } } } @@ -142,3 +147,12 @@ DevMgr::printDevices( string& str ) } } + +void +DevMgr::addDevice( map& devs, + map::const_iterator iter ) +{ + DevIDRelay devid = iter->first; + uint32_t added = iter->second.m_added; + devs.insert( pair(added, devid) ); +} diff --git a/xwords4/relay/devmgr.h b/xwords4/relay/devmgr.h index 8dea06dc1..83fdd89e8 100644 --- a/xwords4/relay/devmgr.h +++ b/xwords4/relay/devmgr.h @@ -35,10 +35,11 @@ class DevMgr { void Remember( DevIDRelay devid, const AddrInfo* addr ); const AddrInfo::AddrUnion* get( DevIDRelay devid ); - void printDevices( string& str ); + /* Called from ctrl port */ + void printDevices( string& str, DevIDRelay devid /* 0 means all */ ); private: - DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); } + /* destructor's never called.... ~DevMgr() { pthread_mutex_destroy( &m_mapLock ); } */ @@ -52,6 +53,10 @@ class DevMgr { time_t m_added; }; + DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); } + void addDevice( map& devs, + map::const_iterator iter ); + map m_devAddrMap; map m_addrDevMap; pthread_mutex_t m_mapLock; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index ad57fe251..82bcad572 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -601,6 +601,62 @@ send_havemsgs( const AddrInfo* addr ) send_via_udp( addr, NULL, XWPDEV_HAVEMSGS, NULL ); } +class MsgClosure { +public: + MsgClosure( DevIDRelay devid, gpointer msg, uint16_t len ) + { + m_devid = devid; + m_len = len; + m_msg = g_malloc(len); + memcpy( m_msg, msg, len ); + } + ~MsgClosure() { + g_free( m_msg ); + } + DevIDRelay m_devid; + uint16_t m_len; + gpointer m_msg; +}; + +static void +onPostedMsgAcked( bool acked, uint32_t packetID, void* data ) +{ + MsgClosure* mc = (MsgClosure*)data; + if ( !acked ) { + DBMgr::Get()->StoreMessage( mc->m_devid, + (const unsigned char*)mc->m_msg, + mc->m_len ); + } + delete mc; +} + +bool +post_message( DevIDRelay devid, const char* message ) +{ + const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( devid ); + bool success = !!addru; + if ( success ) { + AddrInfo addr( addru ); + short len = strlen(message); + short netLen = htons(len); + uint32_t packetID; + uint8_t buf[len + sizeof(netLen) + sizeof(XWPDEV_METAMSG)]; + buf[0] = XWPDEV_METAMSG; + memcpy( &buf[sizeof(XWPDEV_METAMSG)], &netLen, sizeof(netLen) ); + memcpy( &buf[sizeof(XWPDEV_METAMSG) + sizeof(netLen)], message, len ); + + bool sent = send_via_udp( &addr, &packetID, XWPDEV_METAMSG, &buf[1], + VSIZE(buf)-1, NULL ); + if ( sent ) { + MsgClosure* mc = new MsgClosure( devid, buf, VSIZE(buf) ); + UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc ); + } else { + DBMgr::Get()->StoreMessage( devid, (const unsigned char*)buf, VSIZE(buf) ); + } + } + return success; +} + /* A CONNECT message from a device gives us the hostID and socket we'll * associate with one participant in a relayed session. We'll store this * information with the cookie where other participants can find it when they @@ -1355,8 +1411,11 @@ retrieveMessages( DevID& devID, const AddrInfo* addr ) for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) { DBMgr::MsgInfo msg = *iter; uint32_t packetID; - if ( !send_msg_via_udp( addr, msg.token, (unsigned char*)msg.msg.c_str(), + if ( !send_msg_via_udp( addr, msg.token, + (unsigned char*)msg.msg.c_str(), msg.msg.length(), &packetID ) ) { + logf( XW_LOGERROR, "%s: unable to send to devID %d", + __func__, devID.asRelayID() ); break; } UDPAckTrack::setOnAck( onMsgAcked, packetID, (void*)msg.msgID ); @@ -1471,7 +1530,13 @@ handle_udp_packet( UdpThreadClosure* utc ) DevID devID( ID_TYPE_RELAY ); devID.m_devIDString.append( (const char*)ptr, idLen ); ptr += idLen; - retrieveMessages( devID, utc->addr() ); + + const AddrInfo* addr = utc->addr(); + DevMgr::Get()->Remember( devID.asRelayID(), addr ); + + if ( XWPDEV_RQSTMSGS == header.cmd ) { + retrieveMessages( devID, addr ); + } break; } case XWPDEV_ACK: { diff --git a/xwords4/relay/xwrelay.h b/xwords4/relay/xwrelay.h index 2674f7c89..43266f8b2 100644 --- a/xwords4/relay/xwrelay.h +++ b/xwords4/relay/xwrelay.h @@ -94,6 +94,8 @@ enum { XWPDEV_NONE /* 0 is an illegal value */ ,XWPDEV_DELGAME /* dev->relay: game's been deleted. format: header, relayid: 4, clientToken: 4 */ + ,XWPDEV_METAMSG /* Message to be displayed to user */ + } #ifndef CANT_DO_TYPEDEF XWRelayReg diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 7c5bc0577..35b9fa57f 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -50,6 +50,8 @@ bool send_with_length_unsafe( const AddrInfo* addr, uint32_t* packetIDP ); void send_havemsgs( const AddrInfo* addr ); +bool post_message( DevIDRelay devid, const char* message ); + time_t uptime(void); void blockSignals( void ); /* call from all but main thread */