diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index ffb850281..396c5a4b1 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -130,6 +130,31 @@ XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* fro interrupt_poll(); } +bool +XWThreadPool::SocketFound( const AddrInfo* addr ) +{ + assert( addr->isTCP() ); + bool found = false; + { + RWWriteLock ml( &m_activeSocketsRWLock ); + + logf( XW_LOGINFO, "%s: START: %d sockets active", __func__, + m_activeSockets.size() ); + + vector::iterator iter; + for ( iter = m_activeSockets.begin(); + iter != m_activeSockets.end(); ++iter ) { + if ( iter->m_addr.equals( *addr ) ) { + found = true; + break; + } + } + logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__, + m_activeSockets.size() ); + } + return found; +} + bool XWThreadPool::RemoveSocket( const AddrInfo* addr ) { @@ -196,29 +221,29 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why ) } } -bool -XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr ) -{ - bool success = false; - short packetSize; - assert( sizeof(packetSize) == 2 ); +// bool +// XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr ) +// { +// bool success = false; +// short packetSize; +// assert( sizeof(packetSize) == 2 ); - // Fix this to return an allocated buffer - unsigned char buf[MAX_MSG_LEN+1]; - int nRead = read_packet( addr->socket(), buf, sizeof(buf) ); - if ( nRead < 0 ) { - EnqueueKill( addr, "bad packet" ); - } else if ( STYPE_PROXY == stype && NULL != proc ) { - buf[nRead] = '\0'; - UdpQueue::get()->handle( addr, buf, nRead+1, proc ); - } else if ( STYPE_GAME == stype && NULL != proc ) { - UdpQueue::get()->handle( addr, buf, nRead, proc ); - success = true; - } else { - assert(0); - } - return success; -} /* get_process_packet */ +// // Fix this to return an allocated buffer +// unsigned char buf[MAX_MSG_LEN+1]; +// int nRead = read_packet( addr->socket(), buf, sizeof(buf) ); +// if ( nRead < 0 ) { +// EnqueueKill( addr, "bad packet" ); +// } else if ( STYPE_PROXY == stype && NULL != proc ) { +// buf[nRead] = '\0'; +// UdpQueue::get()->handle( addr, buf, nRead+1, proc ); +// } else if ( STYPE_GAME == stype && NULL != proc ) { +// UdpQueue::get()->handle( addr, buf, nRead, proc ); +// success = true; +// } else { +// assert(0); +// } +// return success; +// } /* get_process_packet */ void* XWThreadPool::tpool_main( void* closure ) @@ -261,10 +286,11 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip ) logf( XW_LOGINFO, "worker thread got socket %d from queue", socket ); switch ( pr.m_act ) { case Q_READ: - assert( socket >= 0 ); - if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) { - AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ); - } + assert( 0 ); + // assert( socket >= 0 ); + // if ( get_process_packet( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ) ) { + // AddSocket( pr.m_info.m_type, pr.m_info.m_proc, &pr.m_info.m_addr ); + // } break; case Q_KILL: (*m_kFunc)( &pr.m_info.m_addr ); @@ -387,10 +413,12 @@ XWThreadPool::real_listener() for ( ii = 0; ii < nSockets && nEvents > 0; ++ii ) { if ( fds[curfd].revents != 0 ) { - int socket = fds[curfd].fd; - const AddrInfo* addr = &sinfos[curfd].m_addr; - assert( socket == addr->socket() ); - if ( !RemoveSocket( addr ) ) { + // int socket = fds[curfd].fd; + SockInfo* sinfo = &sinfos[curfd]; + const AddrInfo* addr = &sinfo->m_addr; + + assert( fds[curfd].fd == addr->socket() ); + if ( !SocketFound( addr ) ) { /* no further processing if it's been removed while we've been sleeping in poll */ --nEvents; @@ -398,10 +426,14 @@ XWThreadPool::real_listener() } if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) { - enqueue( sinfos[curfd] ); + if ( !UdpQueue::get()->handle( addr, sinfo->m_proc ) ) { + RemoveSocket( addr ); + EnqueueKill( addr, "bad packet" ); + } } else { logf( XW_LOGERROR, "odd revents: %x", fds[curfd].revents ); + RemoveSocket( addr ); EnqueueKill( addr, "error/hup in poll()" ); } --nEvents; diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index f8fa997ed..9ae0b1359 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -74,6 +74,8 @@ class XWThreadPool { /* Remove from set being listened on */ bool RemoveSocket( const AddrInfo* addr ); + /* test if is in set being listened on */ + bool SocketFound( const AddrInfo* addr ); void enqueue( QAction act = Q_READ ); void enqueue( SockInfo si, QAction act = Q_READ ); diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index 724d73c16..17eae630c 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -19,6 +19,7 @@ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ +#include #include "udpqueue.h" #include "mlock.h" @@ -38,6 +39,7 @@ UdpThreadClosure::logStats() UdpQueue::UdpQueue() { + m_nextID = 0; pthread_mutex_init ( &m_queueMutex, NULL ); pthread_cond_init( &m_queueCondVar, NULL ); @@ -63,12 +65,47 @@ UdpQueue::get() return s_instance; } +bool +UdpQueue::handle( const AddrInfo* addr, QueueCallback cb ) +{ + bool success = false; + int sock = addr->socket(); + unsigned short msgLen; + ssize_t nRead = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); + if ( 0 == nRead ) { + logf( XW_LOGINFO, "%s: recv(sock=%d) => 0: remote closed", __func__, sock ); + } else if ( nRead != sizeof(msgLen) ) { + logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__, + nRead, strerror(errno) ); + } else { + msgLen = ntohs( msgLen ); + if ( MAX_MSG_LEN <= msgLen ) { + logf( XW_LOGERROR, "%s: message of len %d too large; dropping", __func__, msgLen ); + } else { + unsigned char buf[msgLen]; + nRead = recv( sock, buf, msgLen, MSG_WAITALL ); + if ( nRead == msgLen ) { + logf( XW_LOGINFO, "%s: read %d bytes on socket %d", __func__, nRead, sock ); + handle( addr, buf, msgLen, cb ); + success = true; + } else { + logf( XW_LOGERROR, "%s: second recv failed: %s", __func__, + strerror(errno) ); + } + } + } + return success; +} + void UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, QueueCallback cb ) { UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb ); MutexLock ml( &m_queueMutex ); + int id = ++m_nextID; + utc->setID( id ); + logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id ); m_queue.push_back( utc ); pthread_cond_signal( &m_queueCondVar ); } @@ -86,6 +123,7 @@ UdpQueue::thread_main() pthread_mutex_unlock( &m_queueMutex ); utc->noteDequeued(); + logf( XW_LOGINFO, "%s: dispatching packet %d", __func__, utc->getID() ); (*utc->cb())( utc ); utc->logStats(); delete utc; diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index 57cbda212..a2ee730a8 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -55,6 +55,8 @@ public: void noteDequeued() { m_dequed = time( NULL ); } void logStats(); const QueueCallback cb() const { return m_cb; } + void setID( int id ) { m_id = id; } + int getID( void ) { return m_id; } private: unsigned char* m_buf; @@ -63,6 +65,7 @@ public: QueueCallback m_cb; time_t m_created; time_t m_dequed; + int m_id; }; class UdpQueue { @@ -70,6 +73,7 @@ class UdpQueue { static UdpQueue* get(); UdpQueue(); ~UdpQueue(); + bool handle( const AddrInfo* addr, QueueCallback cb ); void handle( const AddrInfo* addr, unsigned char* buf, int len, QueueCallback cb ); @@ -80,7 +84,7 @@ class UdpQueue { pthread_mutex_t m_queueMutex; pthread_cond_t m_queueCondVar; deque m_queue; - + int m_nextID; }; #endif