From a546c025d5a4a9fff5b684f92ac3cf455aeaf5ca Mon Sep 17 00:00:00 2001
From: Eric House
Date: Mon, 17 Jun 2013 07:25:25 -0700
Subject: [PATCH] Rather than queuing sockets needing reading, read them
 immediately and queue the packets for processing. Add IDs to packets so they
 can be tracked in the logs. In addition to making TCP and UDP packet
 processing more similar, this fixes the case where a read is delayed until
 after the client has closed the connection (and so returns an error).

---
 xwords4/relay/tpool.cpp    | 94 +++++++++++++++++++++++++-------------
 xwords4/relay/tpool.h      |  2 +
 xwords4/relay/udpqueue.cpp | 38 +++++++++++++++
 xwords4/relay/udpqueue.h   |  6 ++-
 4 files changed, 108 insertions(+), 32 deletions(-)

diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp
index ffb850281..396c5a4b1 100644
--- a/xwords4/relay/tpool.cpp
+++ b/xwords4/relay/tpool.cpp
@@ -130,6 +130,31 @@ XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* fro
     interrupt_poll();
 }
 
+bool
+XWThreadPool::SocketFound( const AddrInfo* addr )
+{
+    assert( addr->isTCP() );
+    bool found = false;
+    {
+        RWWriteLock ml( &m_activeSocketsRWLock );
+
+        logf( XW_LOGINFO, "%s: START: %d sockets active", __func__,
+              m_activeSockets.size() );
+
+        vector<SockInfo>::iterator iter;
+        for ( iter = m_activeSockets.begin();
+              iter != m_activeSockets.end(); ++iter ) {
+            if ( iter->m_addr.equals( *addr ) ) {
+                found = true;
+                break;
+            }
+        }
+        logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__,
+              m_activeSockets.size() );
+    }
+    return found;
+}
+
 bool
 XWThreadPool::RemoveSocket( const AddrInfo* addr )
 {
@@ -196,29 +221,29 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
     }
 }
 
-bool
-XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
-{
-    bool success = false;
-    short packetSize;
-    assert( sizeof(packetSize) == 2 );
+// bool
+// XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr )
+// {
+//     bool success = false;
+//     short packetSize;
+//     assert( sizeof(packetSize) == 2 );
 
-    // Fix this to return an allocated buffer
-    unsigned char buf[MAX_MSG_LEN+1];
-    int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
-    if ( nRead < 0 ) {
-        EnqueueKill( addr, "bad packet" );
-    } else if ( STYPE_PROXY == stype && NULL != proc ) {
-        buf[nRead] = '\0';
-        UdpQueue::get()->handle( addr, buf, nRead+1, proc );
-    } else if ( STYPE_GAME == stype && NULL != proc ) {
-        UdpQueue::get()->handle( addr, buf, nRead, proc );
-        success = true;
-    } else {
-        assert(0);
-    }
-    return success;
-} /* get_process_packet */
+//     // Fix this to return an allocated buffer
+//     unsigned char buf[MAX_MSG_LEN+1];
+//     int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
+//     if ( nRead < 0 ) {
+//         EnqueueKill( addr, "bad packet" );
+//     } else if ( STYPE_PROXY == stype && NULL != proc ) {
+//         buf[nRead] = '\0';
+//         UdpQueue::get()->handle( addr, buf, nRead+1, proc );
+//     } else if ( STYPE_GAME == stype && NULL != proc ) {
+//         UdpQueue::get()->handle( addr, buf, nRead, proc );
+//         success = true;
+//     } else {
+//         assert(0);
+//     }
+//     return success;
+// } /* get_process_packet */
 
 void*
 XWThreadPool::tpool_main( void* closure )
@@ -261,10 +286,11 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
         logf( XW_LOGINFO, "worker thread got socket %d from queue", socket );
         switch ( pr.m_act ) {
         case Q_READ:
-            assert( socket >= 0 );
-            if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
-                AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
-            }
+            assert( 0 );
+            // assert( socket >= 0 );
+            // if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) {
+            //     AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr );
+            // }
             break;
         case Q_KILL:
             (*m_kFunc)( &pr.m_info.m_addr );
@@ -387,10 +413,12 @@ XWThreadPool::real_listener()
 
         for ( ii = 0; ii < nSockets && nEvents > 0; ++ii ) {
             if ( fds[curfd].revents != 0 ) {
-                int socket = fds[curfd].fd;
-                const AddrInfo* addr = &sinfos[curfd].m_addr;
-                assert( socket == addr->socket() );
-                if ( !RemoveSocket( addr ) ) {
+                // int socket = fds[curfd].fd;
+                SockInfo* sinfo = &sinfos[curfd];
+                const AddrInfo* addr = &sinfo->m_addr;
+
+                assert( fds[curfd].fd == addr->socket() );
+                if ( !SocketFound( addr ) ) {
                     /* no further processing if it's been removed while
                        we've been sleeping in poll */
                     --nEvents;
@@ -398,10 +426,14 @@ XWThreadPool::real_listener()
                 }
 
                 if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) {
-                    enqueue( sinfos[curfd] );
+                    if ( !UdpQueue::get()->handle( addr, sinfo->m_proc ) ) {
+                        RemoveSocket( addr );
+                        EnqueueKill( addr, "bad packet" );
+                    }
                 } else {
                     logf( XW_LOGERROR, "odd revents: %x", fds[curfd].revents );
+                    RemoveSocket( addr );
                     EnqueueKill( addr, "error/hup in poll()" );
                 }
                 --nEvents;
             }
diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h
index f8fa997ed..9ae0b1359 100644
--- a/xwords4/relay/tpool.h
+++ b/xwords4/relay/tpool.h
@@ -74,6 +74,8 @@ class XWThreadPool {
 
     /* Remove from set being listened on */
     bool RemoveSocket( const AddrInfo* addr );
+    /* Test whether addr is in the set being listened on */
+    bool SocketFound( const AddrInfo* addr );
 
     void enqueue( QAction act = Q_READ );
     void enqueue( SockInfo si, QAction act = Q_READ );
 
diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp
index 724d73c16..17eae630c 100644
--- a/xwords4/relay/udpqueue.cpp
+++ b/xwords4/relay/udpqueue.cpp
@@ -19,6 +19,7 @@
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
 
+#include <netinet/in.h>
 #include "udpqueue.h"
 #include "mlock.h"
 
@@ -38,6 +39,7 @@ UdpThreadClosure::logStats()
 
 UdpQueue::UdpQueue()
 {
+    m_nextID = 0;
     pthread_mutex_init ( &m_queueMutex, NULL );
     pthread_cond_init( &m_queueCondVar, NULL );
 
@@ -63,12 +65,47 @@ UdpQueue::get()
     return s_instance;
 }
 
+bool
+UdpQueue::handle( const AddrInfo* addr, QueueCallback cb )
+{
+    bool success = false;
+    int sock = addr->socket();
+    unsigned short msgLen;
+    ssize_t nRead = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL );
+    if ( 0 == nRead ) {
+        logf( XW_LOGINFO, "%s: recv(sock=%d) => 0: remote closed", __func__, sock );
+    } else if ( nRead != sizeof(msgLen) ) {
+        logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__,
+              nRead, strerror(errno) );
+    } else {
+        msgLen = ntohs( msgLen );
+        if ( MAX_MSG_LEN <= msgLen ) {
+            logf( XW_LOGERROR, "%s: message of len %d too large; dropping", __func__, msgLen );
+        } else {
+            unsigned char buf[msgLen];
+            nRead = recv( sock, buf, msgLen, MSG_WAITALL );
+            if ( nRead == msgLen ) {
+                logf( XW_LOGINFO, "%s: read %d bytes on socket %d", __func__, nRead, sock );
+                handle( addr, buf, msgLen, cb );
+                success = true;
+            } else {
+                logf( XW_LOGERROR, "%s: second recv failed: %s", __func__,
+                      strerror(errno) );
+            }
+        }
+    }
+    return success;
+}
+
 void
 UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len,
                   QueueCallback cb )
 {
     UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb );
     MutexLock ml( &m_queueMutex );
+    int id = ++m_nextID;
+    utc->setID( id );
+    logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id );
     m_queue.push_back( utc );
     pthread_cond_signal( &m_queueCondVar );
 }
@@ -86,6 +123,7 @@ UdpQueue::thread_main()
             pthread_mutex_unlock( &m_queueMutex );
 
             utc->noteDequeued();
+            logf( XW_LOGINFO, "%s: dispatching packet %d", __func__, utc->getID() );
             (*utc->cb())( utc );
             utc->logStats();
             delete utc;
diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h
index 57cbda212..a2ee730a8 100644
--- a/xwords4/relay/udpqueue.h
+++ b/xwords4/relay/udpqueue.h
@@ -55,6 +55,8 @@ public:
     void noteDequeued() { m_dequed = time( NULL ); }
     void logStats();
     const QueueCallback cb() const { return m_cb; }
+    void setID( int id ) { m_id = id; }
+    int getID( void ) { return m_id; }
 
  private:
     unsigned char* m_buf;
@@ -63,6 +65,7 @@ public:
     QueueCallback m_cb;
     time_t m_created;
     time_t m_dequed;
+    int m_id;
 };
 
 class UdpQueue {
@@ -70,6 +73,7 @@ class UdpQueue {
     static UdpQueue* get();
     UdpQueue();
     ~UdpQueue();
+    bool handle( const AddrInfo* addr, QueueCallback cb );
    void handle( const AddrInfo* addr, unsigned char* buf, int len,
                 QueueCallback cb );
 
@@ -80,7 +84,7 @@ class UdpQueue {
     pthread_mutex_t m_queueMutex;
     pthread_cond_t m_queueCondVar;
     deque<UdpThreadClosure*> m_queue;
-
+    int m_nextID;
 };
 
 #endif
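
Reviewer note (not part of the patch): the new UdpQueue::handle(addr, cb) overload
relies on the relay's TCP framing, a two-byte big-endian length prefix followed by
that many payload bytes, and on MSG_WAITALL to keep recv() blocked until a full
frame (or EOF) arrives, so worker threads only ever see complete packets. The
stand-alone sketch below isolates just that read pattern so it can be compiled and
tried outside the relay; read_framed_packet() and the 1024-byte cap are
illustrative stand-ins for this example, not relay identifiers.

// Stand-alone sketch of the length-prefixed read used by UdpQueue::handle().
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <vector>
#include <netinet/in.h>   // ntohs
#include <sys/socket.h>   // recv, MSG_WAITALL

bool
read_framed_packet( int sock, std::vector<unsigned char>& out )
{
    const unsigned short kMaxLen = 1024;   // illustrative cap, like MAX_MSG_LEN

    // First recv: the two-byte length prefix. MSG_WAITALL keeps recv()
    // blocked until both bytes arrive or the peer closes the socket.
    unsigned short msgLen;
    ssize_t nRead = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL );
    if ( 0 == nRead ) {
        fprintf( stderr, "remote closed\n" );              // clean EOF
        return false;
    } else if ( nRead != (ssize_t)sizeof(msgLen) ) {
        fprintf( stderr, "prefix read failed: %s\n", strerror(errno) );
        return false;
    }

    msgLen = ntohs( msgLen );                              // prefix is big-endian
    if ( kMaxLen <= msgLen ) {
        fprintf( stderr, "len %u too large; dropping\n", msgLen );
        return false;
    }

    // Second recv: the payload, again waiting for the full count so the
    // caller only ever sees complete packets.
    out.resize( msgLen );
    nRead = recv( sock, out.data(), msgLen, MSG_WAITALL );
    if ( nRead != (ssize_t)msgLen ) {
        fprintf( stderr, "body read failed: %s\n", strerror(errno) );
        return false;
    }
    return true;
}

A caller that owns a connected socket would loop along the lines of
std::vector<unsigned char> buf; while ( read_framed_packet( sock, buf ) ) { /* queue a copy of buf */ },
which mirrors the patch's poll-then-read loop with the queueing left out.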