add loop to print when packets haven't been ack'd

This commit is contained in:
Eric House 2013-01-24 07:43:24 -08:00
parent da4d841220
commit c465a0bb0c
3 changed files with 160 additions and 17 deletions

View file

@ -18,6 +18,113 @@
*/
#include "udpack.h"
#include "mlock.h"
uint32_t UDPAckTrack::s_nextID = 0;
UDPAckTrack* UDPAckTrack::s_self = NULL;
/* static*/ uint32_t
UDPAckTrack::nextPacketID( XWRelayReg cmd )
{
return get()->nextPacketIDImpl( cmd );
}
/* static*/ void
UDPAckTrack::recordAck( uint32_t packetID )
{
get()->recordAckImpl( packetID );
}
/* static */ UDPAckTrack*
UDPAckTrack::get()
{
if ( NULL == s_self ) {
s_self = new UDPAckTrack();
}
return s_self;
}
UDPAckTrack::UDPAckTrack()
{
m_nextID = 0;
pthread_mutex_init( &m_mutex, NULL );
pthread_t thread;
pthread_create( &thread, NULL, thread_main, (void*)this );
pthread_detach( thread );
}
uint32_t
UDPAckTrack::nextPacketIDImpl( XWRelayReg cmd )
{
uint32_t result;
if ( XWPDEV_ACK == cmd || XWPDEV_ALERT == cmd ) {
result = 0;
} else {
MutexLock ml( &m_mutex );
AckRecord record;
result = ++m_nextID;
m_pendings.insert( pair<uint32_t,AckRecord>(result, record) );
}
return result;
}
void
UDPAckTrack::recordAckImpl( uint32_t packetID )
{
map<uint32_t, AckRecord>::iterator iter;
MutexLock ml( &m_mutex );
iter = m_pendings.find( packetID );
if ( m_pendings.end() == iter ) {
logf( XW_LOGERROR, "%s: packet ID %d not found", __func__, packetID );
} else {
time_t took = time( NULL ) - iter->second.m_createTime;
if ( 5 < took ) {
logf( XW_LOGERROR, "%s: packet ID %d took %d seconds to get acked", __func__, packetID );
}
m_pendings.erase( iter );
}
}
void*
UDPAckTrack::threadProc()
{
for ( ; ; ) {
sleep( 30 );
time_t now = time( NULL );
vector<uint32_t> older;
{
MutexLock ml( &m_mutex );
map<uint32_t, AckRecord>::iterator iter;
for ( iter = m_pendings.begin(); iter != m_pendings.end(); ++iter ) {
time_t took = now - iter->second.m_createTime;
if ( 60 < took ) {
older.push_back( iter->first );
m_pendings.erase( iter );
}
}
}
if ( 0 < older.size() ) {
string leaked;
vector<uint32_t>::const_iterator iter = older.begin();
for ( ; ; ) {
string_printf( leaked, "%d", *iter );
if ( ++iter == older.end() ) {
break;
}
string_printf( leaked, ", " );
}
logf( XW_LOGERROR, "these packets leaked: %s", leaked.c_str() );
} else {
logf( XW_LOGINFO, "no packets leaked" );
}
}
return NULL;
}
/* static */ void*
UDPAckTrack::thread_main( void* arg )
{
UDPAckTrack* self = (UDPAckTrack*)arg;
return self->threadProc();
}

View file

@ -21,16 +21,31 @@
#define _UDPACK_H_
#include "xwrelay_priv.h"
#include "xwrelay.h"
class AckRecord {
public:
AckRecord() { m_createTime = time( NULL ); }
time_t m_createTime;
};
class UDPAckTrack {
public:
static uint32_t nextPacketID() { return ++s_nextID; }
static void recordAck( uint32_t packetID ) {
logf( XW_LOGINFO, "received ack for %d", packetID );
}
static uint32_t nextPacketID( XWRelayReg cmd );
static void recordAck( uint32_t packetID );
private:
static uint32_t s_nextID;
static UDPAckTrack* get();
static void* thread_main( void* arg );
UDPAckTrack();
uint32_t nextPacketIDImpl( XWRelayReg cmd );
void recordAckImpl( uint32_t packetID );
void* threadProc();
static UDPAckTrack* s_self;
uint32_t m_nextID;
pthread_mutex_t m_mutex;
map<uint32_t, AckRecord> m_pendings;
};
#endif

View file

@ -80,6 +80,12 @@
#include "udpqueue.h"
#include "udpack.h"
typedef struct _UDPHeader {
uint32_t packetID;
unsigned char proto;
XWRelayReg cmd;
} UDPHeader;
static int s_nSpawns = 0;
static int g_maxsocks = -1;
static int g_udpsock = -1;
@ -262,6 +268,23 @@ getNetString( const unsigned char** bufpp, const unsigned char* end, string& out
return success;
}
static bool
getHeader( const unsigned char** bufpp, const unsigned char* end,
UDPHeader* header )
{
unsigned char byt;
bool success = getNetByte( bufpp, end, &header->proto )
&& getNetLong( bufpp, end, &header->packetID )
&& getNetByte( bufpp, end, &byt )
&& XWPDEV_PROTO_VERSION == header->proto;
if ( success ) {
header->cmd = (XWRelayReg)byt;
} else {
logf( XW_LOGERROR, "%s: bad packet header", __func__ );
}
return success;
}
static void
getDevID( const unsigned char** bufpp, const unsigned char* end,
unsigned short flags, DevID* devID )
@ -356,7 +379,7 @@ static ssize_t
send_via_udp( int socket, const struct sockaddr *dest_addr,
XWRelayReg cmd, ... )
{
uint32_t packetNum = UDPAckTrack::nextPacketID();
uint32_t packetNum = UDPAckTrack::nextPacketID( cmd );
struct iovec vec[10];
int iocount = 0;
@ -1262,14 +1285,10 @@ udp_thread_proc( UdpThreadClosure* utc )
const unsigned char* ptr = utc->buf();
const unsigned char* end = ptr + utc->len();
unsigned char proto = *ptr++;
if ( XWPDEV_PROTO_VERSION != 0 ) {
logf( XW_LOGERROR, "unexpected proto %d", __func__, (int) proto );
} else {
ptr += 4; // skip msgid
XWRelayReg msg = (XWRelayReg)*ptr++;
logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( msg ) );
switch( msg ) {
UDPHeader header;
if ( getHeader( &ptr, end, &header ) ) {
logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( header.cmd ) );
switch( header.cmd ) {
case XWPDEV_REG: {
DevIDType typ = (DevIDType)*ptr++;
unsigned short idLen;
@ -1347,7 +1366,7 @@ udp_thread_proc( UdpThreadClosure* utc )
break;
}
default:
logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, msg );
logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, header.cmd );
}
}
}
@ -1484,7 +1503,9 @@ maint_str_loop( int udpsock, const char* str )
&saddr.addr, &fromlen );
logf( XW_LOGINFO, "%s(); got %d bytes", __func__, nRead);
if ( 1 <= nRead && buf[0] == XWPDEV_PROTO_VERSION ) {
UDPHeader header;
const unsigned char* ptr = buf;
if ( getHeader( &ptr, ptr + nRead, &header ) ) {
send_via_udp( udpsock, &saddr.addr, XWPDEV_ALERT,
outbuf, sizeof(outbuf), NULL );
} else {