From a95d620b0941c5e867a80fdc72b85b81ee17bc08 Mon Sep 17 00:00:00 2001 From: Eric House Date: Sat, 31 Aug 2013 08:30:25 -0700 Subject: [PATCH] Make ack timeout settable from ctrl port; add ctrl facility to print and nack outstanding acks. --- xwords4/relay/ctrl.cpp | 185 +++++++++++++++++++++++++-------------- xwords4/relay/udpack.cpp | 90 +++++++++++++++---- xwords4/relay/udpack.h | 9 +- 3 files changed, 200 insertions(+), 84 deletions(-) diff --git a/xwords4/relay/ctrl.cpp b/xwords4/relay/ctrl.cpp index a25972442..a8008aa68 100644 --- a/xwords4/relay/ctrl.cpp +++ b/xwords4/relay/ctrl.cpp @@ -49,6 +49,7 @@ #include "lstnrmgr.h" #include "tpool.h" #include "devmgr.h" +#include "udpack.h" /* this is *only* for testing. Don't abuse!!!! */ extern pthread_rwlock_t gCookieMapRWLock; @@ -65,6 +66,7 @@ vector g_ctrlSocks; pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER; +static bool cmd_acks( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_quit( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_print( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_devs( int socket, const char* cmd, int argc, gchar** argv ); @@ -126,6 +128,7 @@ print_to_sock( int sock, bool addCR, const char* what, ... ) static const FuncRec gFuncs[] = { { "?", cmd_help }, + { "acks", cmd_acks }, { "crash", cmd_crash }, /* { "eject", cmd_kill_eject }, */ { "get", cmd_get }, @@ -240,9 +243,11 @@ static bool cmd_get( int socket, const char* cmd, int argc, gchar** argv ) { bool needsHelp = true; - if ( 2 == argc ) { + if ( 2 >= argc ) { + int val; + RelayConfigs* rc = RelayConfigs::GetConfigs(); string attr(argv[1]); - const char* const attrs[] = { "help", "listeners", "loglevel" }; + const char* const attrs[] = { "help", "listeners", "loglevel" , "acklimit" }; int index = match( &attr, attrs, sizeof(attrs[0]), sizeof(attrs)/sizeof(attrs[0])); @@ -264,14 +269,12 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv ) needsHelp = false; } break; - case 2: { - RelayConfigs* rc = RelayConfigs::GetConfigs(); - int level; - if ( NULL != rc && rc->GetValueFor( "LOGLEVEL", &level ) ) { - print_to_sock( socket, true, "loglevel=%d\n", level ); + case 2: + case 3: { + const char* key = 2 == index? "LOGLEVEL" : "UDP_ACK_LIMIT"; + if ( NULL != rc && rc->GetValueFor( key, &val ) ) { + print_to_sock( socket, true, "%s=%d\n", attrs[index], val ); needsHelp = false; - } else { - logf( XW_LOGERROR, "RelayConfigs::GetConfigs() => NULL" ); } } break; @@ -280,7 +283,6 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv ) print_to_sock( socket, true, "unknown or ambiguous attribute: %s", attr.c_str() ); } - } if ( needsHelp ) { /* includes help */ @@ -288,7 +290,8 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv ) "* %s -- lists all attributes (unimplemented)\n" "* %s listener\n" "* %s loglevel\n" - , cmd, cmd, cmd ); + "* %s acklimit\n" + , cmd, cmd, cmd, cmd ); } return false; @@ -298,9 +301,10 @@ static bool cmd_set( int socket, const char* cmd, int argc, gchar** argv ) { bool needsHelp = true; - if ( 3 == argc ) { + if ( 3 >= argc ) { + RelayConfigs* rc; const char* val = argv[2]; - const char* const attrs[] = { "help", "listeners", "loglevel" }; + const char* const attrs[] = { "help", "listeners", "loglevel", "acklimit" }; string attr(argv[1]); int index = match( &attr, attrs, sizeof(attrs[0]), sizeof(attrs)/sizeof(attrs[0])); @@ -323,22 +327,30 @@ cmd_set( int socket, const char* cmd, int argc, gchar** argv ) break; case 2: if ( NULL != val && val[0] != '\0' ) { - RelayConfigs* rc = RelayConfigs::GetConfigs(); + rc = RelayConfigs::GetConfigs(); if ( rc != NULL ) { rc->SetValueFor( "LOGLEVEL", val ); needsHelp = false; } } break; + case 3: + rc = RelayConfigs::GetConfigs(); + if ( rc != NULL ) { + rc->SetValueFor( "UDP_ACK_LIMIT", val ); + needsHelp = false; + } + break; default: break; } } if ( needsHelp ) { print_to_sock( socket, true, - "* %s listeners ,[,..,]\n" - "* %s loglevel " - ,cmd, cmd ); + "* %s listeners ,[,..,]" + "\n* %s loglevel " + "\n* %s acklimit " + ,cmd, cmd, cmd ); } return false; } @@ -539,6 +551,42 @@ onAckProc( bool acked, DevIDRelay devid, uint32_t packetID, void* data ) print_prompt( socket ); } +static bool +cmd_acks( int socket, const char* cmd, int argc, gchar** argv ) +{ + bool found = false; + string result; + + if ( 1 >= argc ) { + /* missing param; let help print */ + } else if ( 0 == strcmp( "list", argv[1] ) ) { + UDPAckTrack::printAcks( result ); + found = true; + } else if ( 3 == argc && 0 == strcmp( "nack", argv[1] ) + && 0 == strcmp( "all", argv[2] ) ) { + vector packets; + UDPAckTrack::doNack( packets ); + found = true; + } + + if ( found ) { + if ( 0 < result.size() ) { + send( socket, result.c_str(), result.size(), 0 ); + } + } else { + const char* strs[] = { + "* %s list\n" + ,"* %s nack all\n" + }; + string help; + for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) { + string_printf( help, strs[ii], cmd ); + } + send( socket, help.c_str(), help.size(), 0 ); + } + return false; +} + static bool cmd_devs( int socket, const char* cmd, int argc, gchar** argv ) { @@ -546,60 +594,67 @@ cmd_devs( int socket, const char* cmd, int argc, gchar** argv ) string result; if ( 1 >= argc ) { /* missing param; let help print */ - } else if ( 0 == strcmp( "list", argv[1] ) ) { - vector devids; - if ( 3 == argc && 0 == strcmp( "all", argv[2] ) ) { - /* do nothing; empty vector means all */ - } else { - for ( int ii = 2; ii < argc; ++ii ) { - DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 ); - if ( 0 != devid ) { + } else { + const gchar* arg1 = argv[1]; + if ( 0 == strcmp( "list", arg1 ) || 0 == strcmp( "rm", arg1 ) ) { + if ( 3 <= argc ) { + found = true; + vector devids; + if ( 3 == argc && 0 == strcmp( "all", argv[2] ) ) { + /* do nothing; empty vector means all */ + } else { + found = false; /* if all are bogus numbers, drop */ + for ( int ii = 2; ii < argc; ++ii ) { + DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], + NULL, 10 ); + if ( 0 != devid ) { + devids.push_back( devid ); + found = true; + } + } + } + if ( !found ) { + /* do nothing */ + } else if ( 0 == strcmp( "list", arg1 ) ) { + DevMgr::Get()->printDevices( result, devids ); + } else { + int deleted = DevMgr::Get()->forgetDevices( devids ); + string_printf( result, "Deleted %d devices\n", deleted ); + } + } + } else if ( 0 == strcmp( "ping", arg1 ) ) { + } else if ( 0 == strcmp( "msg", arg1 ) && 3 < argc ) { + /* Get the list to send to */ + vector devids; + if ( 0 == strcmp( "all", argv[3] ) ) { + DevMgr::Get()->getKnownDevices( devids ); + } else { + for ( int ii = 3; ii < argc; ++ii ) { + DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 ); devids.push_back( devid ); } } - } - DevMgr::Get()->printDevices( result, devids ); - found = true; - } else if ( 0 == strcmp( "ping", argv[1] ) ) { - } else if ( 0 == strcmp( "msg", argv[1] ) && 3 < argc ) { - /* Get the list to send to */ - vector devids; - if ( 0 == strcmp( "all", argv[3] ) ) { - DevMgr::Get()->getKnownDevices( devids ); - } else { - for ( int ii = 3; ii < argc; ++ii ) { - DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 ); - devids.push_back( devid ); - } - } - /* Send to each in the list */ - const char* msg = argv[2]; - gchar* unesc = g_strcompress( msg ); - vector::const_iterator iter; - for ( iter = devids.begin(); devids.end() != iter; ++iter ) { - DevIDRelay devid = *iter; - if ( 0 != devid ) { - if ( post_message( devid, unesc, onAckProc, - (void*)socket ) ) { - string_printf( result, "posted message: %s\n", unesc ); - } else { - string_printf( result, "unable to post; does " - "dev %d exist\n", devid ); + /* Send to each in the list */ + const char* msg = argv[2]; + gchar* unesc = g_strcompress( msg ); + vector::const_iterator iter; + for ( iter = devids.begin(); devids.end() != iter; ++iter ) { + DevIDRelay devid = *iter; + if ( 0 != devid ) { + if ( post_message( devid, unesc, onAckProc, + (void*)socket ) ) { + string_printf( result, "posted message: %s\n", unesc ); + } else { + string_printf( result, "unable to post; does " + "dev %d exist\n", devid ); + } } } - } - g_free( unesc ); + g_free( unesc ); - found = true; - } else if ( 0 == strcmp( "rm", argv[1] ) && 2 < argc ) { - DevIDRelay devid = (DevIDRelay)strtoul( argv[2], NULL, 10 ); - if ( DevMgr::Get()->forgetDevice( devid ) ) { - string_printf( result, "dev %d removed\n", devid ); - } else { - string_printf( result, "dev %d unknown\n", devid ); + found = true; } - found = true; } if ( found ) { @@ -612,8 +667,10 @@ cmd_devs( int socket, const char* cmd, int argc, gchar** argv ) " %s ping\n", " %s msg +\n", " %s msg all\n", - " %s rm \n", + " %s rm [all | ]\n", + "* %s rm [all | +]\n" }; + string help; for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) { string_printf( help, strs[ii], cmd ); diff --git a/xwords4/relay/udpack.cpp b/xwords4/relay/udpack.cpp index 0a687126d..72cab2e23 100644 --- a/xwords4/relay/udpack.cpp +++ b/xwords4/relay/udpack.cpp @@ -53,6 +53,18 @@ UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data ) return get()->setOnAckImpl( proc, packetID, data ); } +/* static */ void +UDPAckTrack::printAcks( string& out ) +{ + get()->printAcksImpl( out ); +} + +/* static */ void +UDPAckTrack::doNack( vector ids ) +{ + get()->doNackImpl( ids ); +} + /* static */ UDPAckTrack* UDPAckTrack::get() { @@ -65,14 +77,6 @@ UDPAckTrack::get() UDPAckTrack::UDPAckTrack() { m_nextID = PACKETID_NONE; - int ackLimit; - if ( RelayConfigs::GetConfigs()-> - GetValueFor( "UDP_ACK_LIMIT", &ackLimit ) ) { - m_ackLimit = ackLimit; - } else { - assert( 0 ); - m_ackLimit = 60; - } pthread_mutex_init( &m_mutex, NULL ); @@ -81,12 +85,23 @@ UDPAckTrack::UDPAckTrack() pthread_detach( thread ); } +time_t +UDPAckTrack::ackLimit() +{ + time_t limit; + if ( !RelayConfigs::GetConfigs()->GetValueFor( "UDP_ACK_LIMIT", + &limit ) ) { + assert(0); + } + return limit; +} + uint32_t UDPAckTrack::nextPacketIDImpl() { MutexLock ml( &m_mutex ); - AckRecord record; uint32_t result = ++m_nextID; + AckRecord record; m_pendings.insert( pair(result, record) ); return result; } @@ -106,7 +121,7 @@ UDPAckTrack::recordAckImpl( uint32_t packetID ) __func__, packetID, took ); } - callProc( iter->first, true, &(iter->second) ); + callProc( iter, true ); m_pendings.erase( iter ); } } @@ -127,12 +142,49 @@ UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ) } void -UDPAckTrack::callProc( uint32_t packetID, bool acked, const AckRecord* record ) +UDPAckTrack::printAcksImpl( string& out ) { + time_t now = time( NULL ); + time_t limit = ackLimit(); + MutexLock ml( &m_mutex ); + map::const_iterator iter; + for ( iter = m_pendings.begin(); m_pendings.end() != iter; ++iter ) { + string_printf( out, "id: % 8d; stl: %04d\n", iter->first, + (iter->second.m_createTime + limit) - now ); + } +} + +void +UDPAckTrack::doNackImpl( vector& ids ) +{ + MutexLock ml( &m_mutex ); + map::iterator iter; + if ( 0 == ids.size() ) { + for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) { + callProc( iter, false ); + m_pendings.erase( iter++ ); + } + } else { + vector::const_iterator idsIter; + for ( idsIter = ids.begin(); ids.end() != idsIter; ++idsIter ) { + iter = m_pendings.find( *idsIter ); + if ( m_pendings.end() != iter ) { + callProc( iter, false ); + m_pendings.erase( iter ); + } + } + } +} + +void +UDPAckTrack::callProc( const map::iterator iter, bool acked ) +{ + const AckRecord* record = &(iter->second); OnAckProc proc = record->proc; if ( NULL != proc ) { - logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__, packetID, - acked, proc ); + uint32_t packetID = iter->first; + logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__, + packetID, acked, proc ); (*proc)( acked, packetID, record->data ); } } @@ -141,7 +193,8 @@ void* UDPAckTrack::threadProc() { for ( ; ; ) { - sleep( m_ackLimit / 2 ); + time_t limit = ackLimit(); + sleep( limit / 2 ); vector older; { MutexLock ml( &m_mutex ); @@ -149,9 +202,9 @@ UDPAckTrack::threadProc() map::iterator iter; for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) { time_t took = now - iter->second.m_createTime; - if ( m_ackLimit < took ) { + if ( limit < took ) { older.push_back( iter->first ); - callProc( iter->first, false, &(iter->second) ); + callProc( iter, false ); m_pendings.erase( iter++ ); } else { ++iter; @@ -168,8 +221,9 @@ UDPAckTrack::threadProc() } string_printf( leaked, ", " ); } - logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd within %d seconds): %s", __func__, - m_ackLimit, leaked.c_str() ); + logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd " + "within %d seconds): %s", __func__, + limit, leaked.c_str() ); } } return NULL; diff --git a/xwords4/relay/udpack.h b/xwords4/relay/udpack.h index a5653e8f7..ee946cd41 100644 --- a/xwords4/relay/udpack.h +++ b/xwords4/relay/udpack.h @@ -41,22 +41,27 @@ class UDPAckTrack { static void recordAck( uint32_t packetID ); static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data ); static bool shouldAck( XWRelayReg cmd ); + /* called from ctrl port */ + static void printAcks( string& out ); + static void doNack( vector ids ); private: static UDPAckTrack* get(); static void* thread_main( void* arg ); UDPAckTrack(); + time_t ackLimit(); uint32_t nextPacketIDImpl(); void recordAckImpl( uint32_t packetID ); bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); - void callProc( uint32_t packetID, bool acked, const AckRecord* record ); + void callProc( const map::iterator iter, bool acked ); + void printAcksImpl( string& out ); + void doNackImpl( vector& ids ); void* threadProc(); static UDPAckTrack* s_self; uint32_t m_nextID; pthread_mutex_t m_mutex; map m_pendings; - time_t m_ackLimit; }; #endif