From c465a0bb0c0078c4b5bd80b49d57a5f8ecda345f Mon Sep 17 00:00:00 2001 From: Eric House Date: Thu, 24 Jan 2013 07:43:24 -0800 Subject: [PATCH] add loop to print when packets haven't been ack'd --- xwords4/relay/udpack.cpp | 109 +++++++++++++++++++++++++++++++++++++- xwords4/relay/udpack.h | 25 +++++++-- xwords4/relay/xwrelay.cpp | 43 +++++++++++---- 3 files changed, 160 insertions(+), 17 deletions(-) diff --git a/xwords4/relay/udpack.cpp b/xwords4/relay/udpack.cpp index 0b01aef5f..2c8d61169 100644 --- a/xwords4/relay/udpack.cpp +++ b/xwords4/relay/udpack.cpp @@ -18,6 +18,113 @@ */ #include "udpack.h" +#include "mlock.h" -uint32_t UDPAckTrack::s_nextID = 0; +UDPAckTrack* UDPAckTrack::s_self = NULL; + +/* static*/ uint32_t +UDPAckTrack::nextPacketID( XWRelayReg cmd ) +{ + return get()->nextPacketIDImpl( cmd ); +} + +/* static*/ void +UDPAckTrack::recordAck( uint32_t packetID ) +{ + get()->recordAckImpl( packetID ); +} + +/* static */ UDPAckTrack* +UDPAckTrack::get() +{ + if ( NULL == s_self ) { + s_self = new UDPAckTrack(); + } + return s_self; +} + +UDPAckTrack::UDPAckTrack() +{ + m_nextID = 0; + pthread_mutex_init( &m_mutex, NULL ); + + pthread_t thread; + pthread_create( &thread, NULL, thread_main, (void*)this ); + pthread_detach( thread ); +} + +uint32_t +UDPAckTrack::nextPacketIDImpl( XWRelayReg cmd ) +{ + uint32_t result; + if ( XWPDEV_ACK == cmd || XWPDEV_ALERT == cmd ) { + result = 0; + } else { + MutexLock ml( &m_mutex ); + AckRecord record; + result = ++m_nextID; + m_pendings.insert( pair(result, record) ); + } + return result; +} + +void +UDPAckTrack::recordAckImpl( uint32_t packetID ) +{ + map::iterator iter; + MutexLock ml( &m_mutex ); + iter = m_pendings.find( packetID ); + if ( m_pendings.end() == iter ) { + logf( XW_LOGERROR, "%s: packet ID %d not found", __func__, packetID ); + } else { + time_t took = time( NULL ) - iter->second.m_createTime; + if ( 5 < took ) { + logf( XW_LOGERROR, "%s: packet ID %d took %d seconds to get acked", __func__, packetID ); + } + m_pendings.erase( iter ); + } +} + +void* +UDPAckTrack::threadProc() +{ + for ( ; ; ) { + sleep( 30 ); + time_t now = time( NULL ); + vector older; + { + MutexLock ml( &m_mutex ); + map::iterator iter; + for ( iter = m_pendings.begin(); iter != m_pendings.end(); ++iter ) { + time_t took = now - iter->second.m_createTime; + if ( 60 < took ) { + older.push_back( iter->first ); + m_pendings.erase( iter ); + } + } + } + if ( 0 < older.size() ) { + string leaked; + vector::const_iterator iter = older.begin(); + for ( ; ; ) { + string_printf( leaked, "%d", *iter ); + if ( ++iter == older.end() ) { + break; + } + string_printf( leaked, ", " ); + } + logf( XW_LOGERROR, "these packets leaked: %s", leaked.c_str() ); + } else { + logf( XW_LOGINFO, "no packets leaked" ); + } + } + return NULL; +} + +/* static */ void* +UDPAckTrack::thread_main( void* arg ) +{ + UDPAckTrack* self = (UDPAckTrack*)arg; + return self->threadProc(); +} diff --git a/xwords4/relay/udpack.h b/xwords4/relay/udpack.h index b161050ea..1e0fa3a80 100644 --- a/xwords4/relay/udpack.h +++ b/xwords4/relay/udpack.h @@ -21,16 +21,31 @@ #define _UDPACK_H_ #include "xwrelay_priv.h" +#include "xwrelay.h" + +class AckRecord { + public: + AckRecord() { m_createTime = time( NULL ); } + time_t m_createTime; +}; class UDPAckTrack { public: - static uint32_t nextPacketID() { return ++s_nextID; } - static void recordAck( uint32_t packetID ) { - logf( XW_LOGINFO, "received ack for %d", packetID ); - } + static uint32_t nextPacketID( XWRelayReg cmd ); + static void recordAck( uint32_t packetID ); private: - static uint32_t s_nextID; + static UDPAckTrack* get(); + static void* thread_main( void* arg ); + UDPAckTrack(); + uint32_t nextPacketIDImpl( XWRelayReg cmd ); + void recordAckImpl( uint32_t packetID ); + void* threadProc(); + + static UDPAckTrack* s_self; + uint32_t m_nextID; + pthread_mutex_t m_mutex; + map m_pendings; }; #endif diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index e2f1e6703..a1ca3da04 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -80,6 +80,12 @@ #include "udpqueue.h" #include "udpack.h" +typedef struct _UDPHeader { + uint32_t packetID; + unsigned char proto; + XWRelayReg cmd; +} UDPHeader; + static int s_nSpawns = 0; static int g_maxsocks = -1; static int g_udpsock = -1; @@ -262,6 +268,23 @@ getNetString( const unsigned char** bufpp, const unsigned char* end, string& out return success; } +static bool +getHeader( const unsigned char** bufpp, const unsigned char* end, + UDPHeader* header ) +{ + unsigned char byt; + bool success = getNetByte( bufpp, end, &header->proto ) + && getNetLong( bufpp, end, &header->packetID ) + && getNetByte( bufpp, end, &byt ) + && XWPDEV_PROTO_VERSION == header->proto; + if ( success ) { + header->cmd = (XWRelayReg)byt; + } else { + logf( XW_LOGERROR, "%s: bad packet header", __func__ ); + } + return success; +} + static void getDevID( const unsigned char** bufpp, const unsigned char* end, unsigned short flags, DevID* devID ) @@ -356,7 +379,7 @@ static ssize_t send_via_udp( int socket, const struct sockaddr *dest_addr, XWRelayReg cmd, ... ) { - uint32_t packetNum = UDPAckTrack::nextPacketID(); + uint32_t packetNum = UDPAckTrack::nextPacketID( cmd ); struct iovec vec[10]; int iocount = 0; @@ -1262,14 +1285,10 @@ udp_thread_proc( UdpThreadClosure* utc ) const unsigned char* ptr = utc->buf(); const unsigned char* end = ptr + utc->len(); - unsigned char proto = *ptr++; - if ( XWPDEV_PROTO_VERSION != 0 ) { - logf( XW_LOGERROR, "unexpected proto %d", __func__, (int) proto ); - } else { - ptr += 4; // skip msgid - XWRelayReg msg = (XWRelayReg)*ptr++; - logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( msg ) ); - switch( msg ) { + UDPHeader header; + if ( getHeader( &ptr, end, &header ) ) { + logf( XW_LOGINFO, "%s(msg=%s)", __func__, msgToStr( header.cmd ) ); + switch( header.cmd ) { case XWPDEV_REG: { DevIDType typ = (DevIDType)*ptr++; unsigned short idLen; @@ -1347,7 +1366,7 @@ udp_thread_proc( UdpThreadClosure* utc ) break; } default: - logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, msg ); + logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, header.cmd ); } } } @@ -1484,7 +1503,9 @@ maint_str_loop( int udpsock, const char* str ) &saddr.addr, &fromlen ); logf( XW_LOGINFO, "%s(); got %d bytes", __func__, nRead); - if ( 1 <= nRead && buf[0] == XWPDEV_PROTO_VERSION ) { + UDPHeader header; + const unsigned char* ptr = buf; + if ( getHeader( &ptr, ptr + nRead, &header ) ) { send_via_udp( udpsock, &saddr.addr, XWPDEV_ALERT, outbuf, sizeof(outbuf), NULL ); } else {