Make ack timeout settable from ctrl port; add ctrl facility to print

and nack outstanding acks.
This commit is contained in:
Eric House 2013-08-31 08:30:25 -07:00
parent 07c9722433
commit a95d620b09
3 changed files with 200 additions and 84 deletions

View file

@ -49,6 +49,7 @@
#include "lstnrmgr.h" #include "lstnrmgr.h"
#include "tpool.h" #include "tpool.h"
#include "devmgr.h" #include "devmgr.h"
#include "udpack.h"
/* this is *only* for testing. Don't abuse!!!! */ /* this is *only* for testing. Don't abuse!!!! */
extern pthread_rwlock_t gCookieMapRWLock; extern pthread_rwlock_t gCookieMapRWLock;
@ -65,6 +66,7 @@ vector<int> g_ctrlSocks;
pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER;
static bool cmd_acks( int socket, const char* cmd, int argc, gchar** argv );
static bool cmd_quit( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_quit( int socket, const char* cmd, int argc, gchar** argv );
static bool cmd_print( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_print( int socket, const char* cmd, int argc, gchar** argv );
static bool cmd_devs( int socket, const char* cmd, int argc, gchar** argv ); static bool cmd_devs( int socket, const char* cmd, int argc, gchar** argv );
@ -126,6 +128,7 @@ print_to_sock( int sock, bool addCR, const char* what, ... )
static const FuncRec gFuncs[] = { static const FuncRec gFuncs[] = {
{ "?", cmd_help }, { "?", cmd_help },
{ "acks", cmd_acks },
{ "crash", cmd_crash }, { "crash", cmd_crash },
/* { "eject", cmd_kill_eject }, */ /* { "eject", cmd_kill_eject }, */
{ "get", cmd_get }, { "get", cmd_get },
@ -240,9 +243,11 @@ static bool
cmd_get( int socket, const char* cmd, int argc, gchar** argv ) cmd_get( int socket, const char* cmd, int argc, gchar** argv )
{ {
bool needsHelp = true; bool needsHelp = true;
if ( 2 == argc ) { if ( 2 >= argc ) {
int val;
RelayConfigs* rc = RelayConfigs::GetConfigs();
string attr(argv[1]); string attr(argv[1]);
const char* const attrs[] = { "help", "listeners", "loglevel" }; const char* const attrs[] = { "help", "listeners", "loglevel" , "acklimit" };
int index = match( &attr, attrs, sizeof(attrs[0]), int index = match( &attr, attrs, sizeof(attrs[0]),
sizeof(attrs)/sizeof(attrs[0])); sizeof(attrs)/sizeof(attrs[0]));
@ -264,14 +269,12 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv )
needsHelp = false; needsHelp = false;
} }
break; break;
case 2: { case 2:
RelayConfigs* rc = RelayConfigs::GetConfigs(); case 3: {
int level; const char* key = 2 == index? "LOGLEVEL" : "UDP_ACK_LIMIT";
if ( NULL != rc && rc->GetValueFor( "LOGLEVEL", &level ) ) { if ( NULL != rc && rc->GetValueFor( key, &val ) ) {
print_to_sock( socket, true, "loglevel=%d\n", level ); print_to_sock( socket, true, "%s=%d\n", attrs[index], val );
needsHelp = false; needsHelp = false;
} else {
logf( XW_LOGERROR, "RelayConfigs::GetConfigs() => NULL" );
} }
} }
break; break;
@ -280,7 +283,6 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv )
print_to_sock( socket, true, "unknown or ambiguous attribute: %s", print_to_sock( socket, true, "unknown or ambiguous attribute: %s",
attr.c_str() ); attr.c_str() );
} }
} }
if ( needsHelp ) { if ( needsHelp ) {
/* includes help */ /* includes help */
@ -288,7 +290,8 @@ cmd_get( int socket, const char* cmd, int argc, gchar** argv )
"* %s -- lists all attributes (unimplemented)\n" "* %s -- lists all attributes (unimplemented)\n"
"* %s listener\n" "* %s listener\n"
"* %s loglevel\n" "* %s loglevel\n"
, cmd, cmd, cmd ); "* %s acklimit\n"
, cmd, cmd, cmd, cmd );
} }
return false; return false;
@ -298,9 +301,10 @@ static bool
cmd_set( int socket, const char* cmd, int argc, gchar** argv ) cmd_set( int socket, const char* cmd, int argc, gchar** argv )
{ {
bool needsHelp = true; bool needsHelp = true;
if ( 3 == argc ) { if ( 3 >= argc ) {
RelayConfigs* rc;
const char* val = argv[2]; const char* val = argv[2];
const char* const attrs[] = { "help", "listeners", "loglevel" }; const char* const attrs[] = { "help", "listeners", "loglevel", "acklimit" };
string attr(argv[1]); string attr(argv[1]);
int index = match( &attr, attrs, sizeof(attrs[0]), int index = match( &attr, attrs, sizeof(attrs[0]),
sizeof(attrs)/sizeof(attrs[0])); sizeof(attrs)/sizeof(attrs[0]));
@ -323,22 +327,30 @@ cmd_set( int socket, const char* cmd, int argc, gchar** argv )
break; break;
case 2: case 2:
if ( NULL != val && val[0] != '\0' ) { if ( NULL != val && val[0] != '\0' ) {
RelayConfigs* rc = RelayConfigs::GetConfigs(); rc = RelayConfigs::GetConfigs();
if ( rc != NULL ) { if ( rc != NULL ) {
rc->SetValueFor( "LOGLEVEL", val ); rc->SetValueFor( "LOGLEVEL", val );
needsHelp = false; needsHelp = false;
} }
} }
break; break;
case 3:
rc = RelayConfigs::GetConfigs();
if ( rc != NULL ) {
rc->SetValueFor( "UDP_ACK_LIMIT", val );
needsHelp = false;
}
break;
default: default:
break; break;
} }
} }
if ( needsHelp ) { if ( needsHelp ) {
print_to_sock( socket, true, print_to_sock( socket, true,
"* %s listeners <n>,[<n>,..<n>,]\n" "* %s listeners <n>,[<n>,..<n>,]"
"* %s loglevel <n>" "\n* %s loglevel <n>"
,cmd, cmd ); "\n* %s acklimit <n>"
,cmd, cmd, cmd );
} }
return false; return false;
} }
@ -539,6 +551,42 @@ onAckProc( bool acked, DevIDRelay devid, uint32_t packetID, void* data )
print_prompt( socket ); print_prompt( socket );
} }
static bool
cmd_acks( int socket, const char* cmd, int argc, gchar** argv )
{
bool found = false;
string result;
if ( 1 >= argc ) {
/* missing param; let help print */
} else if ( 0 == strcmp( "list", argv[1] ) ) {
UDPAckTrack::printAcks( result );
found = true;
} else if ( 3 == argc && 0 == strcmp( "nack", argv[1] )
&& 0 == strcmp( "all", argv[2] ) ) {
vector<uint32_t> packets;
UDPAckTrack::doNack( packets );
found = true;
}
if ( found ) {
if ( 0 < result.size() ) {
send( socket, result.c_str(), result.size(), 0 );
}
} else {
const char* strs[] = {
"* %s list\n"
,"* %s nack all\n"
};
string help;
for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) {
string_printf( help, strs[ii], cmd );
}
send( socket, help.c_str(), help.size(), 0 );
}
return false;
}
static bool static bool
cmd_devs( int socket, const char* cmd, int argc, gchar** argv ) cmd_devs( int socket, const char* cmd, int argc, gchar** argv )
{ {
@ -546,60 +594,67 @@ cmd_devs( int socket, const char* cmd, int argc, gchar** argv )
string result; string result;
if ( 1 >= argc ) { if ( 1 >= argc ) {
/* missing param; let help print */ /* missing param; let help print */
} else if ( 0 == strcmp( "list", argv[1] ) ) { } else {
vector<DevIDRelay> devids; const gchar* arg1 = argv[1];
if ( 3 == argc && 0 == strcmp( "all", argv[2] ) ) { if ( 0 == strcmp( "list", arg1 ) || 0 == strcmp( "rm", arg1 ) ) {
/* do nothing; empty vector means all */ if ( 3 <= argc ) {
} else { found = true;
for ( int ii = 2; ii < argc; ++ii ) { vector<DevIDRelay> devids;
DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 ); if ( 3 == argc && 0 == strcmp( "all", argv[2] ) ) {
if ( 0 != devid ) { /* do nothing; empty vector means all */
} else {
found = false; /* if all are bogus numbers, drop */
for ( int ii = 2; ii < argc; ++ii ) {
DevIDRelay devid = (DevIDRelay)strtoul( argv[ii],
NULL, 10 );
if ( 0 != devid ) {
devids.push_back( devid );
found = true;
}
}
}
if ( !found ) {
/* do nothing */
} else if ( 0 == strcmp( "list", arg1 ) ) {
DevMgr::Get()->printDevices( result, devids );
} else {
int deleted = DevMgr::Get()->forgetDevices( devids );
string_printf( result, "Deleted %d devices\n", deleted );
}
}
} else if ( 0 == strcmp( "ping", arg1 ) ) {
} else if ( 0 == strcmp( "msg", arg1 ) && 3 < argc ) {
/* Get the list to send to */
vector<DevIDRelay> devids;
if ( 0 == strcmp( "all", argv[3] ) ) {
DevMgr::Get()->getKnownDevices( devids );
} else {
for ( int ii = 3; ii < argc; ++ii ) {
DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 );
devids.push_back( devid ); devids.push_back( devid );
} }
} }
}
DevMgr::Get()->printDevices( result, devids );
found = true;
} else if ( 0 == strcmp( "ping", argv[1] ) ) {
} else if ( 0 == strcmp( "msg", argv[1] ) && 3 < argc ) {
/* Get the list to send to */
vector<DevIDRelay> devids;
if ( 0 == strcmp( "all", argv[3] ) ) {
DevMgr::Get()->getKnownDevices( devids );
} else {
for ( int ii = 3; ii < argc; ++ii ) {
DevIDRelay devid = (DevIDRelay)strtoul( argv[ii], NULL, 10 );
devids.push_back( devid );
}
}
/* Send to each in the list */ /* Send to each in the list */
const char* msg = argv[2]; const char* msg = argv[2];
gchar* unesc = g_strcompress( msg ); gchar* unesc = g_strcompress( msg );
vector<DevIDRelay>::const_iterator iter; vector<DevIDRelay>::const_iterator iter;
for ( iter = devids.begin(); devids.end() != iter; ++iter ) { for ( iter = devids.begin(); devids.end() != iter; ++iter ) {
DevIDRelay devid = *iter; DevIDRelay devid = *iter;
if ( 0 != devid ) { if ( 0 != devid ) {
if ( post_message( devid, unesc, onAckProc, if ( post_message( devid, unesc, onAckProc,
(void*)socket ) ) { (void*)socket ) ) {
string_printf( result, "posted message: %s\n", unesc ); string_printf( result, "posted message: %s\n", unesc );
} else { } else {
string_printf( result, "unable to post; does " string_printf( result, "unable to post; does "
"dev %d exist\n", devid ); "dev %d exist\n", devid );
}
} }
} }
} g_free( unesc );
g_free( unesc );
found = true; found = true;
} else if ( 0 == strcmp( "rm", argv[1] ) && 2 < argc ) {
DevIDRelay devid = (DevIDRelay)strtoul( argv[2], NULL, 10 );
if ( DevMgr::Get()->forgetDevice( devid ) ) {
string_printf( result, "dev %d removed\n", devid );
} else {
string_printf( result, "dev %d unknown\n", devid );
} }
found = true;
} }
if ( found ) { if ( found ) {
@ -612,8 +667,10 @@ cmd_devs( int socket, const char* cmd, int argc, gchar** argv )
" %s ping\n", " %s ping\n",
" %s msg <msg_text> <devid>+\n", " %s msg <msg_text> <devid>+\n",
" %s msg <msg_text> all\n", " %s msg <msg_text> all\n",
" %s rm <devid>\n", " %s rm [all | <devid>]\n",
"* %s rm [all | <id>+]\n"
}; };
string help; string help;
for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) { for ( size_t ii = 0; ii < VSIZE(strs); ++ii ) {
string_printf( help, strs[ii], cmd ); string_printf( help, strs[ii], cmd );

View file

@ -53,6 +53,18 @@ UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data )
return get()->setOnAckImpl( proc, packetID, data ); return get()->setOnAckImpl( proc, packetID, data );
} }
/* static */ void
UDPAckTrack::printAcks( string& out )
{
get()->printAcksImpl( out );
}
/* static */ void
UDPAckTrack::doNack( vector<uint32_t> ids )
{
get()->doNackImpl( ids );
}
/* static */ UDPAckTrack* /* static */ UDPAckTrack*
UDPAckTrack::get() UDPAckTrack::get()
{ {
@ -65,14 +77,6 @@ UDPAckTrack::get()
UDPAckTrack::UDPAckTrack() UDPAckTrack::UDPAckTrack()
{ {
m_nextID = PACKETID_NONE; m_nextID = PACKETID_NONE;
int ackLimit;
if ( RelayConfigs::GetConfigs()->
GetValueFor( "UDP_ACK_LIMIT", &ackLimit ) ) {
m_ackLimit = ackLimit;
} else {
assert( 0 );
m_ackLimit = 60;
}
pthread_mutex_init( &m_mutex, NULL ); pthread_mutex_init( &m_mutex, NULL );
@ -81,12 +85,23 @@ UDPAckTrack::UDPAckTrack()
pthread_detach( thread ); pthread_detach( thread );
} }
time_t
UDPAckTrack::ackLimit()
{
time_t limit;
if ( !RelayConfigs::GetConfigs()->GetValueFor( "UDP_ACK_LIMIT",
&limit ) ) {
assert(0);
}
return limit;
}
uint32_t uint32_t
UDPAckTrack::nextPacketIDImpl() UDPAckTrack::nextPacketIDImpl()
{ {
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
AckRecord record;
uint32_t result = ++m_nextID; uint32_t result = ++m_nextID;
AckRecord record;
m_pendings.insert( pair<uint32_t,AckRecord>(result, record) ); m_pendings.insert( pair<uint32_t,AckRecord>(result, record) );
return result; return result;
} }
@ -106,7 +121,7 @@ UDPAckTrack::recordAckImpl( uint32_t packetID )
__func__, packetID, took ); __func__, packetID, took );
} }
callProc( iter->first, true, &(iter->second) ); callProc( iter, true );
m_pendings.erase( iter ); m_pendings.erase( iter );
} }
} }
@ -127,12 +142,49 @@ UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data )
} }
void void
UDPAckTrack::callProc( uint32_t packetID, bool acked, const AckRecord* record ) UDPAckTrack::printAcksImpl( string& out )
{ {
time_t now = time( NULL );
time_t limit = ackLimit();
MutexLock ml( &m_mutex );
map<uint32_t, AckRecord>::const_iterator iter;
for ( iter = m_pendings.begin(); m_pendings.end() != iter; ++iter ) {
string_printf( out, "id: % 8d; stl: %04d\n", iter->first,
(iter->second.m_createTime + limit) - now );
}
}
void
UDPAckTrack::doNackImpl( vector<uint32_t>& ids )
{
MutexLock ml( &m_mutex );
map<uint32_t, AckRecord>::iterator iter;
if ( 0 == ids.size() ) {
for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) {
callProc( iter, false );
m_pendings.erase( iter++ );
}
} else {
vector<uint32_t>::const_iterator idsIter;
for ( idsIter = ids.begin(); ids.end() != idsIter; ++idsIter ) {
iter = m_pendings.find( *idsIter );
if ( m_pendings.end() != iter ) {
callProc( iter, false );
m_pendings.erase( iter );
}
}
}
}
void
UDPAckTrack::callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked )
{
const AckRecord* record = &(iter->second);
OnAckProc proc = record->proc; OnAckProc proc = record->proc;
if ( NULL != proc ) { if ( NULL != proc ) {
logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__, packetID, uint32_t packetID = iter->first;
acked, proc ); logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__,
packetID, acked, proc );
(*proc)( acked, packetID, record->data ); (*proc)( acked, packetID, record->data );
} }
} }
@ -141,7 +193,8 @@ void*
UDPAckTrack::threadProc() UDPAckTrack::threadProc()
{ {
for ( ; ; ) { for ( ; ; ) {
sleep( m_ackLimit / 2 ); time_t limit = ackLimit();
sleep( limit / 2 );
vector<uint32_t> older; vector<uint32_t> older;
{ {
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
@ -149,9 +202,9 @@ UDPAckTrack::threadProc()
map<uint32_t, AckRecord>::iterator iter; map<uint32_t, AckRecord>::iterator iter;
for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) { for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) {
time_t took = now - iter->second.m_createTime; time_t took = now - iter->second.m_createTime;
if ( m_ackLimit < took ) { if ( limit < took ) {
older.push_back( iter->first ); older.push_back( iter->first );
callProc( iter->first, false, &(iter->second) ); callProc( iter, false );
m_pendings.erase( iter++ ); m_pendings.erase( iter++ );
} else { } else {
++iter; ++iter;
@ -168,8 +221,9 @@ UDPAckTrack::threadProc()
} }
string_printf( leaked, ", " ); string_printf( leaked, ", " );
} }
logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd within %d seconds): %s", __func__, logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd "
m_ackLimit, leaked.c_str() ); "within %d seconds): %s", __func__,
limit, leaked.c_str() );
} }
} }
return NULL; return NULL;

View file

@ -41,22 +41,27 @@ class UDPAckTrack {
static void recordAck( uint32_t packetID ); static void recordAck( uint32_t packetID );
static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data ); static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data );
static bool shouldAck( XWRelayReg cmd ); static bool shouldAck( XWRelayReg cmd );
/* called from ctrl port */
static void printAcks( string& out );
static void doNack( vector<uint32_t> ids );
private: private:
static UDPAckTrack* get(); static UDPAckTrack* get();
static void* thread_main( void* arg ); static void* thread_main( void* arg );
UDPAckTrack(); UDPAckTrack();
time_t ackLimit();
uint32_t nextPacketIDImpl(); uint32_t nextPacketIDImpl();
void recordAckImpl( uint32_t packetID ); void recordAckImpl( uint32_t packetID );
bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data );
void callProc( uint32_t packetID, bool acked, const AckRecord* record ); void callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked );
void printAcksImpl( string& out );
void doNackImpl( vector<uint32_t>& ids );
void* threadProc(); void* threadProc();
static UDPAckTrack* s_self; static UDPAckTrack* s_self;
uint32_t m_nextID; uint32_t m_nextID;
pthread_mutex_t m_mutex; pthread_mutex_t m_mutex;
map<uint32_t, AckRecord> m_pendings; map<uint32_t, AckRecord> m_pendings;
time_t m_ackLimit;
}; };
#endif #endif