move callback into queueelem so same queue can service udp and tcp (eventually)

This commit is contained in:
Eric House 2013-01-25 07:19:44 -08:00
parent 4417591b06
commit 7ee402d2ae
2 changed files with 19 additions and 21 deletions

View file

@ -37,7 +37,6 @@ UdpThreadClosure::logStats()
} }
UdpQueue::UdpQueue() UdpQueue::UdpQueue()
:m_cb(NULL)
{ {
pthread_mutex_init ( &m_queueMutex, NULL ); pthread_mutex_init ( &m_queueMutex, NULL );
pthread_cond_init( &m_queueCondVar, NULL ); pthread_cond_init( &m_queueCondVar, NULL );
@ -68,9 +67,8 @@ void
UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len, UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len,
QueueCallback cb ) QueueCallback cb )
{ {
UdpThreadClosure* utc = new UdpThreadClosure( saddr, buf, len ); UdpThreadClosure* utc = new UdpThreadClosure( saddr, buf, len, cb );
MutexLock ml( &m_queueMutex ); MutexLock ml( &m_queueMutex );
setCB( cb );
m_queue.push_back( utc ); m_queue.push_back( utc );
pthread_cond_signal( &m_queueCondVar ); pthread_cond_signal( &m_queueCondVar );
} }
@ -88,7 +86,7 @@ UdpQueue::thread_main()
pthread_mutex_unlock( &m_queueMutex ); pthread_mutex_unlock( &m_queueMutex );
utc->noteDequeued(); utc->noteDequeued();
(*m_cb)( utc ); (*utc->cb())( utc );
utc->logStats(); utc->logStats();
delete utc; delete utc;
} }
@ -104,9 +102,3 @@ UdpQueue::thread_main_static( void* closure )
return me->thread_main(); return me->thread_main();
} }
void
UdpQueue::setCB( QueueCallback cb )
{
assert( cb == m_cb || !m_cb );
m_cb = cb;
}

View file

@ -29,15 +29,23 @@
using namespace std; using namespace std;
class UdpThreadClosure;
typedef void (*QueueCallback)( UdpThreadClosure* closure );
class UdpThreadClosure { class UdpThreadClosure {
public: public:
UdpThreadClosure( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len ) { UdpThreadClosure( const AddrInfo::AddrUnion* saddr, unsigned char* buf,
m_saddr = *saddr; int len, QueueCallback cb )
m_buf = new unsigned char[len]; : m_buf(new unsigned char[len])
memcpy( m_buf, buf, len ); , m_len(len)
m_len = len; , m_saddr(*saddr)
m_created = time( NULL ); , m_cb(cb)
} , m_created(time( NULL ))
{
memcpy( m_buf, buf, len );
}
~UdpThreadClosure() { delete m_buf; } ~UdpThreadClosure() { delete m_buf; }
const unsigned char* buf() const { return m_buf; } const unsigned char* buf() const { return m_buf; }
@ -45,17 +53,17 @@ public:
const AddrInfo::AddrUnion* saddr() const { return &m_saddr; } const AddrInfo::AddrUnion* saddr() const { return &m_saddr; }
void noteDequeued() { m_dequed = time( NULL ); } void noteDequeued() { m_dequed = time( NULL ); }
void logStats(); void logStats();
const QueueCallback cb() const { return m_cb; }
private: private:
unsigned char* m_buf; unsigned char* m_buf;
int m_len; int m_len;
AddrInfo::AddrUnion m_saddr; AddrInfo::AddrUnion m_saddr;
QueueCallback m_cb;
time_t m_created; time_t m_created;
time_t m_dequed; time_t m_dequed;
}; };
typedef void (*QueueCallback)( UdpThreadClosure* closure );
class UdpQueue { class UdpQueue {
public: public:
static UdpQueue* get(); static UdpQueue* get();
@ -67,13 +75,11 @@ class UdpQueue {
private: private:
static void* thread_main_static( void* closure ); static void* thread_main_static( void* closure );
void* thread_main(); void* thread_main();
void setCB( QueueCallback cb );
pthread_mutex_t m_queueMutex; pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar; pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue; deque<UdpThreadClosure*> m_queue;
QueueCallback m_cb;
}; };
#endif #endif