track and log types of (un-acked) messages

This commit is contained in:
Eric House 2018-05-24 21:03:25 -07:00
parent 39a5ce92ee
commit 89f4246e83
4 changed files with 56 additions and 31 deletions

View file

@ -35,16 +35,16 @@ UDPAckTrack::nextPacketID( XWRelayReg cmd )
{ {
uint32_t result = 0; uint32_t result = 0;
if ( shouldAck( cmd ) ) { if ( shouldAck( cmd ) ) {
result = get()->nextPacketIDImpl(); result = get()->nextPacketIDImpl( cmd );
assert( PACKETID_NONE != result ); assert( PACKETID_NONE != result );
} }
return result; return result;
} }
/* static*/ void /* static*/ string
UDPAckTrack::recordAck( uint32_t packetID ) UDPAckTrack::recordAck( uint32_t packetID )
{ {
get()->recordAckImpl( packetID ); return get()->recordAckImpl( packetID );
} }
/* static */ bool /* static */ bool
@ -97,33 +97,37 @@ UDPAckTrack::ackLimit()
} }
uint32_t uint32_t
UDPAckTrack::nextPacketIDImpl() UDPAckTrack::nextPacketIDImpl( XWRelayReg cmd )
{ {
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
uint32_t result = ++m_nextID; uint32_t result = ++m_nextID;
AckRecord record; AckRecord record( cmd , result );
m_pendings.insert( pair<uint32_t,AckRecord>(result, record) ); m_pendings.insert( pair<uint32_t,AckRecord>(result, record) );
return result; return result;
} }
void string
UDPAckTrack::recordAckImpl( uint32_t packetID ) UDPAckTrack::recordAckImpl( uint32_t packetID )
{ {
string str;
map<uint32_t, AckRecord>::iterator iter; map<uint32_t, AckRecord>::iterator iter;
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
iter = m_pendings.find( packetID ); iter = m_pendings.find( packetID );
if ( m_pendings.end() == iter ) { if ( m_pendings.end() == iter ) {
logf( XW_LOGERROR, "%s: packet ID %d not found", __func__, packetID ); logf( XW_LOGERROR, "%s: packet ID %d not found", __func__, packetID );
} else { } else {
time_t took = time( NULL ) - iter->second.m_createTime; AckRecord& rec = iter->second;
str = rec.toStr();
time_t took = time( NULL ) - rec.m_createTime;
if ( 5 < took ) { if ( 5 < took ) {
logf( XW_LOGERROR, "%s: packet ID %d took %d seconds to get acked", logf( XW_LOGERROR, "%s: packet %s took %d seconds to get acked",
__func__, packetID, took ); __func__, str.c_str(), took );
} }
callProc( iter, true ); callProc( iter, true );
m_pendings.erase( iter ); m_pendings.erase( iter );
} }
return str;
} }
bool bool
@ -134,8 +138,8 @@ UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data )
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
map<uint32_t, AckRecord>::iterator iter = m_pendings.find( packetID ); map<uint32_t, AckRecord>::iterator iter = m_pendings.find( packetID );
if ( m_pendings.end() != iter ) { if ( m_pendings.end() != iter ) {
iter->second.proc = proc; iter->second.m_proc = proc;
iter->second.data = data; iter->second.m_data = data;
} }
} }
return canAdd; return canAdd;
@ -180,12 +184,12 @@ void
UDPAckTrack::callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked ) UDPAckTrack::callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked )
{ {
const AckRecord* record = &(iter->second); const AckRecord* record = &(iter->second);
OnAckProc proc = record->proc; OnAckProc proc = record->m_proc;
if ( NULL != proc ) { if ( NULL != proc ) {
uint32_t packetID = iter->first; uint32_t packetID = iter->first;
logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__, logf( XW_LOGINFO, "%s(packetID=%d, acked=%d, proc=%p)", __func__,
packetID, acked, proc ); packetID, acked, proc );
(*proc)( acked, packetID, record->data ); (*proc)( acked, packetID, record->m_data );
} }
} }
@ -195,15 +199,16 @@ UDPAckTrack::threadProc()
for ( ; ; ) { for ( ; ; ) {
time_t limit = ackLimit(); time_t limit = ackLimit();
sleep( limit / 2 ); sleep( limit / 2 );
vector<uint32_t> older; vector<string> older;
{ {
MutexLock ml( &m_mutex ); MutexLock ml( &m_mutex );
time_t now = time( NULL ); time_t now = time( NULL );
map<uint32_t, AckRecord>::iterator iter; map<uint32_t, AckRecord>::iterator iter;
for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) { for ( iter = m_pendings.begin(); m_pendings.end() != iter; ) {
time_t took = now - iter->second.m_createTime; AckRecord& rec = iter->second;
time_t took = now - rec.m_createTime;
if ( limit < took ) { if ( limit < took ) {
older.push_back( iter->first ); older.push_back( rec.toStr() );
callProc( iter, false ); callProc( iter, false );
m_pendings.erase( iter++ ); m_pendings.erase( iter++ );
} else { } else {
@ -212,14 +217,14 @@ UDPAckTrack::threadProc()
} }
} }
if ( 0 < older.size() ) { if ( 0 < older.size() ) {
StrWPF leaked; string leaked;
vector<uint32_t>::const_iterator iter = older.begin(); vector<string>::const_iterator iter = older.begin();
for ( ; ; ) { for ( ; ; ) {
leaked.catf( "%d", *iter ); leaked += iter->c_str();
if ( ++iter == older.end() ) { if ( ++iter == older.end() ) {
break; break;
} }
leaked.catf( ", " ); leaked += ", ";
} }
logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd " logf( XW_LOGERROR, "%s: these packets leaked (were not ack'd "
"within %d seconds): %s", __func__, "within %d seconds): %s", __func__,

View file

@ -1,6 +1,7 @@
/* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */ /* -*- compile-command: "make -j3"; -*- */
/* /*
* Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. * Copyright 2013 - 2018 by Eric House (xwords@eehouse.org). All rights
* reserved.
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -20,6 +21,8 @@
#ifndef _UDPACK_H_ #ifndef _UDPACK_H_
#define _UDPACK_H_ #define _UDPACK_H_
#include <stdio.h>
#include "xwrelay_priv.h" #include "xwrelay_priv.h"
#include "xwrelay.h" #include "xwrelay.h"
#include "strwpf.h" #include "strwpf.h"
@ -28,10 +31,26 @@ typedef void (*OnAckProc)( bool acked, uint32_t packetID, void* data );
class AckRecord { class AckRecord {
public: public:
AckRecord() { m_createTime = time( NULL ); proc = NULL; } AckRecord( XWRelayReg cmd, uint32_t id ) {
m_createTime = time( NULL );
m_proc = NULL;
m_cmd = cmd;
m_id = id;
}
string toStr()
{
char buf[64];
sprintf( buf, "%d/%s", m_id, msgToStr( m_cmd ) );
string str(buf);
return str;
}
time_t m_createTime; time_t m_createTime;
OnAckProc proc; OnAckProc m_proc;
void* data; XWRelayReg m_cmd;
void* m_data;
private:
uint32_t m_id;
}; };
class UDPAckTrack { class UDPAckTrack {
@ -39,7 +58,7 @@ class UDPAckTrack {
static const uint32_t PACKETID_NONE = 0; static const uint32_t PACKETID_NONE = 0;
static uint32_t nextPacketID( XWRelayReg cmd ); static uint32_t nextPacketID( XWRelayReg cmd );
static void recordAck( uint32_t packetID ); static string recordAck( uint32_t packetID );
static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data ); static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data );
static bool shouldAck( XWRelayReg cmd ); static bool shouldAck( XWRelayReg cmd );
/* called from ctrl port */ /* called from ctrl port */
@ -51,8 +70,8 @@ class UDPAckTrack {
static void* thread_main( void* arg ); static void* thread_main( void* arg );
UDPAckTrack(); UDPAckTrack();
time_t ackLimit(); time_t ackLimit();
uint32_t nextPacketIDImpl(); uint32_t nextPacketIDImpl( XWRelayReg cmd );
void recordAckImpl( uint32_t packetID ); string recordAckImpl( uint32_t packetID );
bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data );
void callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked ); void callProc( const map<uint32_t, AckRecord>::iterator iter, bool acked );
void printAcksImpl( StrWPF& out ); void printAcksImpl( StrWPF& out );

View file

@ -1702,7 +1702,7 @@ retrieveMessages( DevID& devID, const AddrInfo* addr )
} }
} }
static const char* const char*
msgToStr( XWRelayReg msg ) msgToStr( XWRelayReg msg )
{ {
const char* str; const char* str;
@ -1855,8 +1855,8 @@ handle_udp_packet( PacketThreadClosure* ptc )
case XWPDEV_ACK: { case XWPDEV_ACK: {
uint32_t packetID; uint32_t packetID;
if ( vli2un( &ptr, end, &packetID ) ) { if ( vli2un( &ptr, end, &packetID ) ) {
logf( XW_LOGINFO, "%s: got ack for packet %d", __func__, packetID ); string str = UDPAckTrack::recordAck( packetID );
UDPAckTrack::recordAck( packetID ); logf( XW_LOGINFO, "%s: got ack for packet %s", __func__, str.c_str() );
} }
break; break;
} }

View file

@ -69,6 +69,7 @@ int read_packet( int sock, uint8_t* buf, int buflen );
void onMsgAcked( bool acked, uint32_t packetID, void* data ); void onMsgAcked( bool acked, uint32_t packetID, void* data );
const char* cmdToStr( XWRELAY_Cmd cmd ); const char* cmdToStr( XWRELAY_Cmd cmd );
const char* msgToStr( XWRelayReg msg );
extern class ListenerMgr g_listeners; extern class ListenerMgr g_listeners;