queue incoming packets and process them in order in a separate thread

This commit is contained in:
Eric House 2013-01-18 19:56:21 -08:00
parent 1519a00004
commit 99307e45a0
2 changed files with 88 additions and 3 deletions

View file

@ -20,10 +20,40 @@
*/ */
#include "udpqueue.h" #include "udpqueue.h"
#include "mlock.h"
static UdpQueue* s_instance = NULL; static UdpQueue* s_instance = NULL;
void
UdpThreadClosure::logStats()
{
time_t now = time( NULL );
if ( 1 < now - m_created ) {
logf( XW_LOGERROR, "packet waited %d s for processing which then took %d s",
m_dequed - m_created, now - m_dequed );
}
}
UdpQueue::UdpQueue()
{
pthread_mutex_init ( &m_queueMutex, NULL );
pthread_cond_init( &m_queueCondVar, NULL );
pthread_t thread;
int result = pthread_create( &thread, NULL, thread_main_static, this );
assert( result == 0 );
result = pthread_detach( thread );
assert( result == 0 );
}
UdpQueue::~UdpQueue()
{
pthread_cond_destroy( &m_queueCondVar );
pthread_mutex_destroy ( &m_queueMutex );
}
UdpQueue* UdpQueue*
UdpQueue::get() UdpQueue::get()
{ {
@ -37,7 +67,45 @@ void
UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len, UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len,
QueueCallback cb ) QueueCallback cb )
{ {
logf( XW_LOGINFO, "%s: still running in same thread!!!", __func__ ); UdpThreadClosure* utc = new UdpThreadClosure( saddr, buf, len );
UdpThreadClosure closure( saddr, buf, len ); MutexLock ml( &m_queueMutex );
(*cb)( &closure ); setCB( cb );
m_queue.push_back( utc );
pthread_cond_signal( &m_queueCondVar );
}
void*
UdpQueue::thread_main()
{
for ( ; ; ) {
pthread_mutex_lock( &m_queueMutex );
while ( m_queue.size() == 0 ) {
pthread_cond_wait( &m_queueCondVar, &m_queueMutex );
}
UdpThreadClosure* utc = m_queue.front();
m_queue.pop_front();
pthread_mutex_unlock( &m_queueMutex );
utc->noteDequeued();
(*m_cb)( utc );
utc->logStats();
delete utc;
}
return NULL;
}
/* static */ void*
UdpQueue::thread_main_static( void* closure )
{
blockSignals();
UdpQueue* me = (UdpQueue*)closure;
return me->thread_main();
}
void
UdpQueue::setCB( QueueCallback cb )
{
assert( cb == m_cb || !m_cb );
m_cb = cb;
} }

View file

@ -22,6 +22,7 @@
#define _UDPQUEUE_H_ #define _UDPQUEUE_H_
#include <pthread.h> #include <pthread.h>
#include <deque>
#include "xwrelay_priv.h" #include "xwrelay_priv.h"
#include "addrinfo.h" #include "addrinfo.h"
@ -35,17 +36,22 @@ public:
m_buf = new unsigned char[len]; m_buf = new unsigned char[len];
memcpy( m_buf, buf, len ); memcpy( m_buf, buf, len );
m_len = len; m_len = len;
m_created = time( NULL );
} }
~UdpThreadClosure() { delete m_buf; } ~UdpThreadClosure() { delete m_buf; }
const unsigned char* buf() const { return m_buf; } const unsigned char* buf() const { return m_buf; }
int len() const { return m_len; } int len() const { return m_len; }
const AddrInfo::AddrUnion* saddr() const { return &m_saddr; } const AddrInfo::AddrUnion* saddr() const { return &m_saddr; }
void noteDequeued() { m_dequed = time( NULL ); }
void logStats();
private: private:
unsigned char* m_buf; unsigned char* m_buf;
int m_len; int m_len;
AddrInfo::AddrUnion m_saddr; AddrInfo::AddrUnion m_saddr;
time_t m_created;
time_t m_dequed;
}; };
typedef void (*QueueCallback)( UdpThreadClosure* closure ); typedef void (*QueueCallback)( UdpThreadClosure* closure );
@ -53,10 +59,21 @@ typedef void (*QueueCallback)( UdpThreadClosure* closure );
class UdpQueue { class UdpQueue {
public: public:
static UdpQueue* get(); static UdpQueue* get();
UdpQueue();
~UdpQueue();
void handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len, void handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len,
QueueCallback cb ); QueueCallback cb );
private: private:
static void* thread_main_static( void* closure );
void* thread_main();
void setCB( QueueCallback cb );
pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue;
QueueCallback m_cb;
}; };
#endif #endif