handle all three types of message in a single thread, punting all

synchronization issues to a future point where the load demands it.
This commit is contained in:
Eric House 2013-01-26 11:54:48 -08:00
parent 9e1d09648d
commit 336e3cd289
7 changed files with 53 additions and 37 deletions

View file

@ -41,8 +41,8 @@ class AddrInfo {
m_isValid = false;
}
AddrInfo( int socket, const AddrUnion* saddr ) {
construct( socket, saddr, true );
AddrInfo( int socket, const AddrUnion* saddr, bool isTCP ) {
construct( socket, saddr, isTCP );
}
AddrInfo( int socket, ClientToken clientToken, const AddrUnion* saddr ) {

View file

@ -78,11 +78,10 @@ XWThreadPool::~XWThreadPool()
} /* ~XWThreadPool */
void
XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
XWThreadPool::Setup( int nThreads, kill_func kFunc )
{
m_nThreads = nThreads;
m_threadInfos = (ThreadInfo*)malloc( nThreads * sizeof(*m_threadInfos) );
m_pFunc = pFunc;
m_kFunc = kFunc;
for ( int ii = 0; ii < nThreads; ++ii ) {
@ -116,12 +115,13 @@ XWThreadPool::Stop()
}
void
XWThreadPool::AddSocket( SockType stype, const AddrInfo* from )
XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from )
{
{
RWWriteLock ml( &m_activeSocketsRWLock );
SockInfo si;
si.m_type = stype;
si.m_proc = proc;
si.m_addr = *from;
m_activeSockets.push_back( si );
logf( XW_LOGINFO, "%s: %d sockets active", __func__,
@ -197,7 +197,7 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
}
bool
XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
{
bool success = false;
short packetSize;
@ -207,13 +207,14 @@ XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
if ( nRead < 0 ) {
EnqueueKill( addr, "bad packet" );
} else if ( STYPE_GAME == stype ) {
logf( XW_LOGINFO, "calling m_pFunc" );
success = (*m_pFunc)( buf, nRead, addr );
} else {
} else if ( STYPE_PROXY == stype && NULL != proc ) {
buf[nRead] = '\0';
handle_proxy_packet( buf, nRead, addr );
CloseSocket( addr );
UdpQueue::get()->handle( addr, buf, nRead+1, proc );
} else if ( STYPE_GAME == stype && NULL != proc ) {
UdpQueue::get()->handle( addr, buf, nRead, proc );
success = true;
} else {
assert(0);
}
return success;
} /* get_process_packet */
@ -260,8 +261,8 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
switch ( pr.m_act ) {
case Q_READ:
assert( socket >= 0 );
if ( get_process_packet( pr.m_info.m_type, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, &pr.m_info.m_addr );
if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
}
break;
case Q_KILL:

View file

@ -32,6 +32,7 @@
#include <set>
#include "addrinfo.h"
#include "udpqueue.h"
using namespace std;
@ -41,6 +42,7 @@ class XWThreadPool {
typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType;
typedef struct _SockInfo {
SockType m_type;
QueueCallback m_proc;
AddrInfo m_addr;
} SockInfo;
@ -51,18 +53,16 @@ class XWThreadPool {
} ThreadInfo;
static XWThreadPool* GetTPool();
typedef bool (*packet_func)( const unsigned char* buf, int bufLen,
const AddrInfo* from );
typedef void (*kill_func)( const AddrInfo* addr );
XWThreadPool();
~XWThreadPool();
void Setup( int nThreads, packet_func pFunc, kill_func kFunc );
void Setup( int nThreads, kill_func kFunc );
void Stop();
/* Add to set being listened on */
void AddSocket( SockType stype, const AddrInfo* from );
void AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from );
/* remove from tpool altogether, and close */
void CloseSocket( const AddrInfo* addr );
@ -82,7 +82,7 @@ class XWThreadPool {
void print_in_use( void );
void log_hung_threads( void );
bool get_process_packet( SockType stype, const AddrInfo* from );
bool get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* from );
void interrupt_poll();
void* real_tpool_main( ThreadInfo* tsp );
@ -107,7 +107,6 @@ class XWThreadPool {
bool m_timeToDie;
int m_nThreads;
packet_func m_pFunc;
kill_func m_kFunc;
ThreadInfo* m_threadInfos;

View file

@ -64,10 +64,10 @@ UdpQueue::get()
}
void
UdpQueue::handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len,
UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len,
QueueCallback cb )
{
UdpThreadClosure* utc = new UdpThreadClosure( saddr, buf, len, cb );
UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb );
MutexLock ml( &m_queueMutex );
m_queue.push_back( utc );
pthread_cond_signal( &m_queueCondVar );

View file

@ -35,11 +35,11 @@ typedef void (*QueueCallback)( UdpThreadClosure* closure );
class UdpThreadClosure {
public:
UdpThreadClosure( const AddrInfo::AddrUnion* saddr, unsigned char* buf,
UdpThreadClosure( const AddrInfo* addr, unsigned char* buf,
int len, QueueCallback cb )
: m_buf(new unsigned char[len])
, m_len(len)
, m_saddr(*saddr)
, m_addr(*addr)
, m_cb(cb)
, m_created(time( NULL ))
{
@ -50,7 +50,8 @@ public:
const unsigned char* buf() const { return m_buf; }
int len() const { return m_len; }
const AddrInfo::AddrUnion* saddr() const { return &m_saddr; }
const AddrInfo::AddrUnion* saddr() const { return m_addr.saddr(); }
const AddrInfo* addr() const { return &m_addr; }
void noteDequeued() { m_dequed = time( NULL ); }
void logStats();
const QueueCallback cb() const { return m_cb; }
@ -58,7 +59,7 @@ public:
private:
unsigned char* m_buf;
int m_len;
AddrInfo::AddrUnion m_saddr;
AddrInfo m_addr;
QueueCallback m_cb;
time_t m_created;
time_t m_dequed;
@ -69,7 +70,7 @@ class UdpQueue {
static UdpQueue* get();
UdpQueue();
~UdpQueue();
void handle( const AddrInfo::AddrUnion* saddr, unsigned char* buf, int len,
void handle( const AddrInfo* addr, unsigned char* buf, int len,
QueueCallback cb );
private:

View file

@ -1096,9 +1096,22 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp,
}
} // handleProxyMsgs
void
handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
static void
game_thread_proc( UdpThreadClosure* utc )
{
if ( !processMessage( utc->buf(), utc->len(), utc->addr() ) ) {
XWThreadPool::GetTPool()->CloseSocket( utc->addr() );
}
}
static void
proxy_thread_proc( UdpThreadClosure* utc )
{
int len = utc->len();
const AddrInfo* addr = utc->addr();
const unsigned char* buf = utc->buf();
logf( XW_LOGINFO, "%s called", __func__ );
logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( len > 0 ) {
assert( addr->isTCP() );
@ -1171,7 +1184,8 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
}
}
}
} /* handle_proxy_packet */
XWThreadPool::GetTPool()->CloseSocket( addr );
}
static short
addRegID( unsigned char* ptr, DevIDRelay relayID )
@ -1373,7 +1387,7 @@ udp_thread_proc( UdpThreadClosure* utc )
static void
handle_udp_packet( int udpsock )
{
unsigned char buf[512];
unsigned char buf[MAX_MSG_LEN];
AddrInfo::AddrUnion saddr;
memset( &saddr, 0, sizeof(saddr) );
socklen_t fromlen = sizeof(saddr.addr_in);
@ -1382,7 +1396,8 @@ handle_udp_packet( int udpsock )
&saddr.addr, &fromlen );
logf( XW_LOGINFO, "%s: recvfrom=>%d", __func__, nRead );
if ( 0 < nRead ) {
UdpQueue::get()->handle( &saddr, buf, nRead, udp_thread_proc );
AddrInfo addr( udpsock, &saddr, false );
UdpQueue::get()->handle( &addr, buf, nRead, udp_thread_proc );
}
}
@ -1808,7 +1823,7 @@ main( int argc, char** argv )
(void)sigaction( SIGINT, &act, NULL );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( nWorkerThreads, processMessage, killSocket );
tPool->Setup( nWorkerThreads, killSocket );
/* set up select call */
fd_set rfds;
@ -1876,9 +1891,11 @@ main( int argc, char** argv )
"%s: accepting connection from %s on socket %d",
__func__, inet_ntoa(saddr.addr_in.sin_addr), newSock );
AddrInfo addr( newSock, &saddr );
tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
AddrInfo addr( newSock, &saddr, true );
tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
: XWThreadPool::STYPE_PROXY,
perGame ? game_thread_proc
: proxy_thread_proc,
&addr );
}
--retval;

View file

@ -59,8 +59,6 @@ int make_socket( unsigned long addr, unsigned short port );
void string_printf( std::string& str, const char* fmt, ... );
int read_packet( int sock, unsigned char* buf, int buflen );
void handle_proxy_packet( unsigned char* buf, int bufLen,
const AddrInfo* addr );
const char* cmdToStr( XWRELAY_Cmd cmd );