diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index 9364f0e7b..8d768cc1d 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -20,10 +20,40 @@ */ #include "udpqueue.h" +#include "mlock.h" + static UdpQueue* s_instance = NULL; +void +UdpThreadClosure::logStats() +{ + time_t now = time( NULL ); + if ( 1 < now - m_created ) { + logf( XW_LOGERROR, "packet waited %d s for processing which then took %d s", + m_dequed - m_created, now - m_dequed ); + } +} + +UdpQueue::UdpQueue() +{ + pthread_mutex_init ( &m_queueMutex, NULL ); + pthread_cond_init( &m_queueCondVar, NULL ); + + pthread_t thread; + int result = pthread_create( &thread, NULL, thread_main_static, this ); + assert( result == 0 ); + result = pthread_detach( thread ); + assert( result == 0 ); +} + +UdpQueue::~UdpQueue() +{ + pthread_cond_destroy( &m_queueCondVar ); + pthread_mutex_destroy ( &m_queueMutex ); +} + UdpQueue* UdpQueue::get() { @@ -37,7 +67,45 @@ void UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len, QueueCallback cb ) { - logf( XW_LOGINFO, "%s: still running in same thread!!!", __func__ ); - UdpThreadClosure closure( saddr, buf, len ); - (*cb)( &closure ); + UdpThreadClosure* utc = new UdpThreadClosure( saddr, buf, len ); + MutexLock ml( &m_queueMutex ); + setCB( cb ); + m_queue.push_back( utc ); + pthread_cond_signal( &m_queueCondVar ); +} + +void* +UdpQueue::thread_main() +{ + for ( ; ; ) { + pthread_mutex_lock( &m_queueMutex ); + while ( m_queue.size() == 0 ) { + pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); + } + UdpThreadClosure* utc = m_queue.front(); + m_queue.pop_front(); + pthread_mutex_unlock( &m_queueMutex ); + + utc->noteDequeued(); + (*m_cb)( utc ); + utc->logStats(); + delete utc; + } + return NULL; +} + +/* static */ void* +UdpQueue::thread_main_static( void* closure ) +{ + blockSignals(); + + UdpQueue* me = (UdpQueue*)closure; + return me->thread_main(); +} + +void +UdpQueue::setCB( QueueCallback cb ) +{ + assert( cb == m_cb || !m_cb ); + m_cb = cb; } diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index 3638018c1..04f05f956 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -22,6 +22,7 @@ #define _UDPQUEUE_H_ #include +#include #include "xwrelay_priv.h" #include "addrinfo.h" @@ -35,17 +36,22 @@ public: m_buf = new unsigned char[len]; memcpy( m_buf, buf, len ); m_len = len; + m_created = time( NULL ); } ~UdpThreadClosure() { delete m_buf; } const unsigned char* buf() const { return m_buf; } int len() const { return m_len; } const AddrInfo::AddrUnion* saddr() const { return &m_saddr; } + void noteDequeued() { m_dequed = time( NULL ); } + void logStats(); private: unsigned char* m_buf; int m_len; AddrInfo::AddrUnion m_saddr; + time_t m_created; + time_t m_dequed; }; typedef void (*QueueCallback)( UdpThreadClosure* closure ); @@ -53,10 +59,21 @@ typedef void (*QueueCallback)( UdpThreadClosure* closure ); class UdpQueue { public: static UdpQueue* get(); + UdpQueue(); + ~UdpQueue(); void handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len, QueueCallback cb ); private: + static void* thread_main_static( void* closure ); + void* thread_main(); + void setCB( QueueCallback cb ); + + pthread_mutex_t m_queueMutex; + pthread_cond_t m_queueCondVar; + deque m_queue; + + QueueCallback m_cb; }; #endif