From 2a35fac1e89a5f6107e1f726e09b6e666e5fc4d5 Mon Sep 17 00:00:00 2001 From: Eric House Date: Fri, 21 Jun 2013 06:05:26 -0700 Subject: [PATCH] rather than invalidating socket in AddrRec when it's closed, add a timestamp set when it's opened. Older copies with the same socket can be tested against the cannonical copy maintained by tpool and sending avoided when the timestamp shows the endpoint has likely changed. Change tpool's list of sockets to a map for faster lookup, and get rid of similar structure in udpqueue. --- xwords4/relay/addrinfo.cpp | 24 +++++++++++++++ xwords4/relay/addrinfo.h | 15 ++++----- xwords4/relay/cref.cpp | 4 ++- xwords4/relay/tpool.cpp | 62 ++++++++++++++++++++++---------------- xwords4/relay/tpool.h | 4 ++- xwords4/relay/udpqueue.cpp | 56 ---------------------------------- xwords4/relay/udpqueue.h | 3 +- xwords4/relay/xwrelay.cpp | 62 ++++++++++++-------------------------- 8 files changed, 93 insertions(+), 137 deletions(-) diff --git a/xwords4/relay/addrinfo.cpp b/xwords4/relay/addrinfo.cpp index e61f3595d..0fbd607d2 100644 --- a/xwords4/relay/addrinfo.cpp +++ b/xwords4/relay/addrinfo.cpp @@ -21,8 +21,27 @@ #include #include +#include #include "addrinfo.h" +#include "xwrelay_priv.h" +#include "tpool.h" + +void +AddrInfo::construct( int socket, const AddrUnion* saddr, bool isTCP ) +{ + memset( this, 0, sizeof(*this) ); + + struct timespec tp; + clock_gettime( CLOCK_MONOTONIC, &tp ); + m_created = (tp.tv_sec * 1000) + (tp.tv_nsec / 1000); + logf( XW_LOGINFO, "%s: m_created for socket %d: 0x%lx", __func__, socket, m_created ); + + m_socket = socket; + m_isTCP = isTCP; + memcpy( &m_saddr, saddr, sizeof(m_saddr) ); + m_isValid = true; +} bool AddrInfo::equals( const AddrInfo& other ) const @@ -31,6 +50,11 @@ AddrInfo::equals( const AddrInfo& other ) const if ( equal ) { if ( isTCP() ) { equal = m_socket == other.m_socket; + if ( equal && created() != other.created() ) { + logf( XW_LOGINFO, "%s: rejecting on time mismatch (%lx vs %lx)", + __func__, created(), other.created() ); + equal = false; + } } else { // assert( m_socket == other.m_socket ); /* both same UDP socket */ /* what does equal mean on udp addresses? Same host, or same host AND game */ diff --git a/xwords4/relay/addrinfo.h b/xwords4/relay/addrinfo.h index 631e6f19f..7cd7db95f 100644 --- a/xwords4/relay/addrinfo.h +++ b/xwords4/relay/addrinfo.h @@ -57,23 +57,19 @@ class AddrInfo { void setIsTCP( bool val ) { m_isTCP = val; } bool isTCP() const { return m_isTCP; } /* later UDP will be here too */ int socket() const { assert(m_isValid); return m_socket; } - void invalSocket() { m_socket = -1; } ClientToken clientToken() const { assert(m_isValid); return m_clientToken; } struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; } const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; } const AddrUnion* saddr() const { assert(m_isValid); return &m_saddr; } + uint32_t created() const { return m_created; } + bool isCurrent() const { + return XWThreadPool::GetTPool()->IsCurrent( this ); + } bool equals( const AddrInfo& other ) const; private: - void construct( int socket, const AddrUnion* saddr, bool isTCP ) { - memset( this, 0, sizeof(*this) ); - - m_socket = socket; - m_isTCP = isTCP; - memcpy( &m_saddr, saddr, sizeof(m_saddr) ); - m_isValid = true; - } + void construct( int socket, const AddrUnion* saddr, bool isTCP ); // AddrInfo& operator=(const AddrInfo&); // Prevent assignment int m_socket; @@ -81,6 +77,7 @@ class AddrInfo { bool m_isValid; ClientToken m_clientToken; /* must be 32 bit */ AddrUnion m_saddr; + uint32_t m_created; /* microseconds since boot, from clock_gettime() */ }; #endif diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 470f6b3f2..510e3878f 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -859,7 +859,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest ); assert( dest > 0 && dest <= 4 ); - if ( -1 != addr->socket() ) { + if ( addr->isCurrent() ) { for ( ; ; ) { unsigned char buf[MAX_MSG_LEN]; size_t buflen = sizeof(buf); @@ -936,6 +936,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, { RWWriteLock rwl( &m_socketsRWLock ); HostRec hr( hostid, &evt->addr, nPlayersH, seed, !reconn ); + logf( XW_LOGINFO, "%s: adding socket rec with ts %lx", __func__, + evt->addr.created() ); m_sockets.push_back( hr ); } diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index cf6ddb99d..a88276f29 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -118,13 +118,14 @@ void XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from ) { { + int sock = from->socket(); RWWriteLock ml( &m_activeSocketsRWLock ); SockInfo si; si.m_type = stype; si.m_proc = proc; si.m_addr = *from; - m_activeSockets.push_back( si ); - logf( XW_LOGINFO, "%s: %d sockets active", __func__, + m_activeSockets.insert( pair( sock, si ) ); + logf( XW_LOGINFO, "%s(sock=%d): %d sockets active", __func__, sock, m_activeSockets.size() ); } interrupt_poll(); @@ -138,19 +139,11 @@ XWThreadPool::SocketFound( const AddrInfo* addr ) { RWWriteLock ml( &m_activeSocketsRWLock ); - logf( XW_LOGINFO, "%s: START: %d sockets active", __func__, - m_activeSockets.size() ); - - vector::iterator iter; - for ( iter = m_activeSockets.begin(); - iter != m_activeSockets.end(); ++iter ) { - if ( iter->m_addr.equals( *addr ) ) { - found = true; - break; - } + map::iterator iter = m_activeSockets.find( addr->socket() ); + if ( m_activeSockets.end() != iter + && iter->second.m_addr.equals( *addr ) ) { + found = true; } - logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__, - m_activeSockets.size() ); } return found; } @@ -166,14 +159,10 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr ) logf( XW_LOGINFO, "%s: START: %d sockets active", __func__, m_activeSockets.size() ); - vector::iterator iter; - for ( iter = m_activeSockets.begin(); - iter != m_activeSockets.end(); ++iter ) { - if ( iter->m_addr.equals( *addr ) ) { - m_activeSockets.erase( iter ); - found = true; - break; - } + map::iterator iter = m_activeSockets.find( addr->socket() ); + if ( m_activeSockets.end() != iter && iter->second.m_addr.equals( *addr ) ) { + m_activeSockets.erase( iter ); + found = true; } logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__, m_activeSockets.size() ); @@ -186,7 +175,6 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) { /* bool do_interrupt = false; */ assert( addr->isTCP() ); - UdpQueue::get()->forgetSocket( addr ); if ( !RemoveSocket( addr ) ) { MutexLock ml( &m_queueMutex ); deque::iterator iter = m_queue.begin(); @@ -222,6 +210,28 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why ) } } +// return true if the addr passed in has a timestamp >= what we have as the +// creation time of the now-open socket. If the socket isn't open, return false. +bool +XWThreadPool::IsCurrent( const AddrInfo* addr ) +{ + bool result = false; + bool sockFound = false; // for debugging + int sock = addr->socket(); + if ( -1 != sock ) { + RWReadLock ml( &m_activeSocketsRWLock ); + map::const_iterator iter = m_activeSockets.find( sock ); + if ( iter != m_activeSockets.end() ) { + assert( !sockFound ); + sockFound = true; + result = iter->second.m_addr.created() <= addr->created(); + logf( XW_LOGINFO, "%s(sock=%d)=>%d (%lx vs %lx)", __func__, sock, result, + iter->second.m_addr.created(), addr->created() ); + } + } + return result; +} + // bool // XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr ) // { @@ -359,11 +369,11 @@ XWThreadPool::real_listener() #endif ++curfd; - vector::iterator iter; + map::iterator iter; for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end(); ++iter ) { - fds[curfd].fd = iter->m_addr.socket(); - sinfos[curfd] = *iter; + fds[curfd].fd = iter->first; + sinfos[curfd] = iter->second; fds[curfd].events = flags; #ifdef LOG_POLL if ( logCapacity > logLen ) { diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 9ae0b1359..ae65ecb46 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -68,6 +68,8 @@ class XWThreadPool { void EnqueueKill( const AddrInfo* addr, const char* const why ); + bool IsCurrent( const AddrInfo* addr ); + private: typedef enum { Q_READ, Q_KILL } QAction; typedef struct { QAction m_act; SockInfo m_info; } QueuePr; @@ -94,7 +96,7 @@ class XWThreadPool { static void* listener_main( void* closure ); /* Sockets main thread listens on */ - vectorm_activeSockets; + mapm_activeSockets; pthread_rwlock_t m_activeSocketsRWLock; /* Sockets waiting for a thread to read 'em */ diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index ab08b97a6..c4a89eac3 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -108,55 +108,9 @@ UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id ); m_queue.push_back( utc ); - int sock = addr->socket(); - map >::iterator iter = m_bySocket.find( sock ); - if ( iter == m_bySocket.end() ) { - logf( XW_LOGINFO, "%s: creating vector for socket %d", __func__, sock ); - vector vect; - vect.push_back( utc ); - m_bySocket.insert( pair >(sock, vect) ); - } else { - iter->second.push_back( utc ); - logf( XW_LOGINFO, "%s: now have %d packets for socket %d", - __func__, iter->second.size(), sock ); - } - pthread_cond_signal( &m_queueCondVar ); } -void -UdpQueue::forgetSocket( const AddrInfo* addr ) -{ - assert( addr->isTCP() ); - int sock = addr->socket(); - MutexLock ml( &m_queueMutex ); - - map >::iterator iter = m_bySocket.find( sock ); - if ( m_bySocket.end() != iter ) { - vector& vect = iter->second; - vector::iterator iter2; - for ( iter2 = vect.begin(); vect.end() != iter2; ++ iter2 ) { - UdpThreadClosure* utc = *iter2; - assert( -1 != utc->addr()->socket() ); - utc->invalSocket(); - logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", - __func__, sock, utc->getID() ); - // vect.erase( iter2 ); - } - vect.clear(); - } - - // deque::iterator iter; - // for ( iter = m_queue.begin(); iter != m_queue.end(); ++iter ) { - // const AddrInfo* addr = (*iter)->addr(); - // if ( sock == addr->socket() ) { - // logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", - // __func__, sock, (*iter)->getID() ); - // (*iter)->invalSocket(); - // } - // } -} - void* UdpQueue::thread_main() { @@ -168,16 +122,6 @@ UdpQueue::thread_main() UdpThreadClosure* utc = m_queue.front(); m_queue.pop_front(); - int sock = utc->addr()->socket(); - if ( -1 != sock ) { - map >::iterator iter = m_bySocket.find( sock ); - assert ( iter != m_bySocket.end() ); - vector& vect = iter->second; - assert( utc == *vect.begin() ); - vect.erase( vect.begin() ); - logf( XW_LOGINFO, "%s: %d packets remaining for socket %d", - __func__, vect.size(), sock ); - } pthread_mutex_unlock( &m_queueMutex ); utc->noteDequeued(); diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index 544c47088..890d9d234 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -58,7 +58,6 @@ public: const QueueCallback cb() const { return m_cb; } void setID( int id ) { m_id = id; } int getID( void ) { return m_id; } - void invalSocket() { m_addr.invalSocket(); } private: unsigned char* m_buf; @@ -87,7 +86,7 @@ class UdpQueue { pthread_mutex_t m_queueMutex; pthread_cond_t m_queueCondVar; deque m_queue; - map > m_bySocket; + // map > m_bySocket; int m_nextID; }; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 41e4e635b..64f78a0ce 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -448,23 +448,32 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, { assert( !!addr ); bool ok = false; - int socket = addr->socket(); if ( addr->isTCP() ) { - unsigned short len = htons( bufLen ); - ssize_t nSent = send( socket, &len, 2, 0 ); - if ( nSent == 2 ) { - nSent = send( socket, buf, bufLen, 0 ); - if ( nSent == ssize_t(bufLen) ) { - logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); - ok = true; + if ( addr->isCurrent() ) { + int socket = addr->socket(); + unsigned short len = htons( bufLen ); + ssize_t nSent = send( socket, &len, 2, 0 ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == ssize_t(bufLen) ) { + logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); + ok = true; + } else { + logf( XW_LOGERROR, "%s: send failed: %s (errno=%d)", __func__, + strerror(errno), errno ); + } } + } else { + logf( XW_LOGINFO, "%s: dropping packet: socket %d reused", + __func__, socket ); } } else { AddrInfo::ClientToken clientToken = addr->clientToken(); assert( 0 != clientToken ); clientToken = htonl(clientToken); const struct sockaddr* saddr = addr->sockaddr(); + int socket = addr->socket(); assert( g_udpsock == socket || socket == -1 ); if ( -1 == socket ) { socket = g_udpsock; @@ -887,37 +896,6 @@ handlePipe( int sig ) logf( XW_LOGINFO, "%s", __func__ ); } -int -read_packet( int sock, unsigned char* buf, int buflen ) -{ - int result = -1; - ssize_t nread; - unsigned short msgLen; - nread = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); - if ( 0 == nread ) { - logf( XW_LOGINFO, "%s: recv => 0: remote closed", __func__ ); - } else if ( nread != sizeof(msgLen) ) { - logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__, - nread, strerror(errno) ); - } else { - msgLen = ntohs( msgLen ); - if ( msgLen >= buflen ) { - logf( XW_LOGERROR, "%s: buf too small; need %d but have %d", - __func__, msgLen, buflen ); - } else { - nread = recv( sock, buf, msgLen, MSG_WAITALL ); - if ( nread == msgLen ) { - result = nread; - } else { - logf( XW_LOGERROR, "%s: second recv failed: %s", __func__, - strerror(errno) ); - } - } - } - - return result; -} /* read_packet */ - static void pushShort( vector& out, unsigned short num ) { @@ -1104,7 +1082,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp, if ( getNetShort( &bufp, end, &len ) ) { if ( handlePutMessage( scr, hid, addr, len, &bufp, end ) ) { continue; - } + } } break; } @@ -1427,7 +1405,7 @@ udp_thread_proc( UdpThreadClosure* utc ) } static void -handle_udp_packet( int udpsock ) +read_udp_packet( int udpsock ) { unsigned char buf[MAX_MSG_LEN]; AddrInfo::AddrUnion saddr; @@ -1951,7 +1929,7 @@ main( int argc, char** argv ) if ( -1 != g_udpsock && FD_ISSET( g_udpsock, &rfds ) ) { // This will need to be done in a separate thread, or pushed // to the existing thread pool - handle_udp_packet( g_udpsock ); + read_udp_packet( g_udpsock ); --retval; } #ifdef DO_HTTP