Rather than queuing sockets needing reading, read them immediately and

queue the packets for processing.  Add ids so they can be tracked in
the logs.  In addition to making tcp and udp packet processing more
similar this fixes the case where a read is delayed until after the
client has closed the connection (and so returns an error.)
This commit is contained in:
Eric House 2013-06-17 07:25:25 -07:00
parent 1928554444
commit a546c025d5
4 changed files with 108 additions and 32 deletions

View file

@ -130,6 +130,31 @@ XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* fro
interrupt_poll();
}
bool
XWThreadPool::SocketFound( const AddrInfo* addr )
{
assert( addr->isTCP() );
bool found = false;
{
RWWriteLock ml( &m_activeSocketsRWLock );
logf( XW_LOGINFO, "%s: START: %d sockets active", __func__,
m_activeSockets.size() );
vector<SockInfo>::iterator iter;
for ( iter = m_activeSockets.begin();
iter != m_activeSockets.end(); ++iter ) {
if ( iter->m_addr.equals( *addr ) ) {
found = true;
break;
}
}
logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__,
m_activeSockets.size() );
}
return found;
}
bool
XWThreadPool::RemoveSocket( const AddrInfo* addr )
{
@ -196,29 +221,29 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
}
}
bool
XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
{
bool success = false;
short packetSize;
assert( sizeof(packetSize) == 2 );
// bool
// XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
// {
// bool success = false;
// short packetSize;
// assert( sizeof(packetSize) == 2 );
// Fix this to return an allocated buffer
unsigned char buf[MAX_MSG_LEN+1];
int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
if ( nRead < 0 ) {
EnqueueKill( addr, "bad packet" );
} else if ( STYPE_PROXY == stype && NULL != proc ) {
buf[nRead] = '\0';
UdpQueue::get()->handle( addr, buf, nRead+1, proc );
} else if ( STYPE_GAME == stype && NULL != proc ) {
UdpQueue::get()->handle( addr, buf, nRead, proc );
success = true;
} else {
assert(0);
}
return success;
} /* get_process_packet */
// // Fix this to return an allocated buffer
// unsigned char buf[MAX_MSG_LEN+1];
// int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
// if ( nRead < 0 ) {
// EnqueueKill( addr, "bad packet" );
// } else if ( STYPE_PROXY == stype && NULL != proc ) {
// buf[nRead] = '\0';
// UdpQueue::get()->handle( addr, buf, nRead+1, proc );
// } else if ( STYPE_GAME == stype && NULL != proc ) {
// UdpQueue::get()->handle( addr, buf, nRead, proc );
// success = true;
// } else {
// assert(0);
// }
// return success;
// } /* get_process_packet */
void*
XWThreadPool::tpool_main( void* closure )
@ -261,10 +286,11 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
logf( XW_LOGINFO, "worker thread got socket %d from queue", socket );
switch ( pr.m_act ) {
case Q_READ:
assert( socket >= 0 );
if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
}
assert( 0 );
// assert( socket >= 0 );
// if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
// AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
// }
break;
case Q_KILL:
(*m_kFunc)( &pr.m_info.m_addr );
@ -387,10 +413,12 @@ XWThreadPool::real_listener()
for ( ii = 0; ii < nSockets && nEvents > 0; ++ii ) {
if ( fds[curfd].revents != 0 ) {
int socket = fds[curfd].fd;
const AddrInfo* addr = &sinfos[curfd].m_addr;
assert( socket == addr->socket() );
if ( !RemoveSocket( addr ) ) {
// int socket = fds[curfd].fd;
SockInfo* sinfo = &sinfos[curfd];
const AddrInfo* addr = &sinfo->m_addr;
assert( fds[curfd].fd == addr->socket() );
if ( !SocketFound( addr ) ) {
/* no further processing if it's been removed while
we've been sleeping in poll */
--nEvents;
@ -398,10 +426,14 @@ XWThreadPool::real_listener()
}
if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) {
enqueue( sinfos[curfd] );
if ( !UdpQueue::get()->handle( addr, sinfo->m_proc ) ) {
RemoveSocket( addr );
EnqueueKill( addr, "bad packet" );
}
} else {
logf( XW_LOGERROR, "odd revents: %x",
fds[curfd].revents );
RemoveSocket( addr );
EnqueueKill( addr, "error/hup in poll()" );
}
--nEvents;

View file

@ -74,6 +74,8 @@ class XWThreadPool {
/* Remove from set being listened on */
bool RemoveSocket( const AddrInfo* addr );
/* test if is in set being listened on */
bool SocketFound( const AddrInfo* addr );
void enqueue( QAction act = Q_READ );
void enqueue( SockInfo si, QAction act = Q_READ );

View file

@ -19,6 +19,7 @@
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <errno.h>
#include "udpqueue.h"
#include "mlock.h"
@ -38,6 +39,7 @@ UdpThreadClosure::logStats()
UdpQueue::UdpQueue()
{
m_nextID = 0;
pthread_mutex_init ( &m_queueMutex, NULL );
pthread_cond_init( &m_queueCondVar, NULL );
@ -63,12 +65,47 @@ UdpQueue::get()
return s_instance;
}
bool
UdpQueue::handle( const AddrInfo* addr, QueueCallback cb )
{
bool success = false;
int sock = addr->socket();
unsigned short msgLen;
ssize_t nRead = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL );
if ( 0 == nRead ) {
logf( XW_LOGINFO, "%s: recv(sock=%d) => 0: remote closed", __func__, sock );
} else if ( nRead != sizeof(msgLen) ) {
logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__,
nRead, strerror(errno) );
} else {
msgLen = ntohs( msgLen );
if ( MAX_MSG_LEN <= msgLen ) {
logf( XW_LOGERROR, "%s: message of len %d too large; dropping", __func__, msgLen );
} else {
unsigned char buf[msgLen];
nRead = recv( sock, buf, msgLen, MSG_WAITALL );
if ( nRead == msgLen ) {
logf( XW_LOGINFO, "%s: read %d bytes on socket %d", __func__, nRead, sock );
handle( addr, buf, msgLen, cb );
success = true;
} else {
logf( XW_LOGERROR, "%s: second recv failed: %s", __func__,
strerror(errno) );
}
}
}
return success;
}
void
UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len,
QueueCallback cb )
{
UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb );
MutexLock ml( &m_queueMutex );
int id = ++m_nextID;
utc->setID( id );
logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id );
m_queue.push_back( utc );
pthread_cond_signal( &m_queueCondVar );
}
@ -86,6 +123,7 @@ UdpQueue::thread_main()
pthread_mutex_unlock( &m_queueMutex );
utc->noteDequeued();
logf( XW_LOGINFO, "%s: dispatching packet %d", __func__, utc->getID() );
(*utc->cb())( utc );
utc->logStats();
delete utc;

View file

@ -55,6 +55,8 @@ public:
void noteDequeued() { m_dequed = time( NULL ); }
void logStats();
const QueueCallback cb() const { return m_cb; }
void setID( int id ) { m_id = id; }
int getID( void ) { return m_id; }
private:
unsigned char* m_buf;
@ -63,6 +65,7 @@ public:
QueueCallback m_cb;
time_t m_created;
time_t m_dequed;
int m_id;
};
class UdpQueue {
@ -70,6 +73,7 @@ class UdpQueue {
static UdpQueue* get();
UdpQueue();
~UdpQueue();
bool handle( const AddrInfo* addr, QueueCallback cb );
void handle( const AddrInfo* addr, unsigned char* buf, int len,
QueueCallback cb );
@ -80,7 +84,7 @@ class UdpQueue {
pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue;
int m_nextID;
};
#endif