diff --git a/xwords4/relay/addrinfo.cpp b/xwords4/relay/addrinfo.cpp index e61f3595d..0fbd607d2 100644 --- a/xwords4/relay/addrinfo.cpp +++ b/xwords4/relay/addrinfo.cpp @@ -21,8 +21,27 @@ #include #include +#include #include "addrinfo.h" +#include "xwrelay_priv.h" +#include "tpool.h" + +void +AddrInfo::construct( int socket, const AddrUnion* saddr, bool isTCP ) +{ + memset( this, 0, sizeof(*this) ); + + struct timespec tp; + clock_gettime( CLOCK_MONOTONIC, &tp ); + m_created = (tp.tv_sec * 1000) + (tp.tv_nsec / 1000); + logf( XW_LOGINFO, "%s: m_created for socket %d: 0x%lx", __func__, socket, m_created ); + + m_socket = socket; + m_isTCP = isTCP; + memcpy( &m_saddr, saddr, sizeof(m_saddr) ); + m_isValid = true; +} bool AddrInfo::equals( const AddrInfo& other ) const @@ -31,6 +50,11 @@ AddrInfo::equals( const AddrInfo& other ) const if ( equal ) { if ( isTCP() ) { equal = m_socket == other.m_socket; + if ( equal && created() != other.created() ) { + logf( XW_LOGINFO, "%s: rejecting on time mismatch (%lx vs %lx)", + __func__, created(), other.created() ); + equal = false; + } } else { // assert( m_socket == other.m_socket ); /* both same UDP socket */ /* what does equal mean on udp addresses? Same host, or same host AND game */ diff --git a/xwords4/relay/addrinfo.h b/xwords4/relay/addrinfo.h index 631e6f19f..7cd7db95f 100644 --- a/xwords4/relay/addrinfo.h +++ b/xwords4/relay/addrinfo.h @@ -57,23 +57,19 @@ class AddrInfo { void setIsTCP( bool val ) { m_isTCP = val; } bool isTCP() const { return m_isTCP; } /* later UDP will be here too */ int socket() const { assert(m_isValid); return m_socket; } - void invalSocket() { m_socket = -1; } ClientToken clientToken() const { assert(m_isValid); return m_clientToken; } struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; } const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; } const AddrUnion* saddr() const { assert(m_isValid); return &m_saddr; } + uint32_t created() const { return m_created; } + bool isCurrent() const { + return XWThreadPool::GetTPool()->IsCurrent( this ); + } bool equals( const AddrInfo& other ) const; private: - void construct( int socket, const AddrUnion* saddr, bool isTCP ) { - memset( this, 0, sizeof(*this) ); - - m_socket = socket; - m_isTCP = isTCP; - memcpy( &m_saddr, saddr, sizeof(m_saddr) ); - m_isValid = true; - } + void construct( int socket, const AddrUnion* saddr, bool isTCP ); // AddrInfo& operator=(const AddrInfo&); // Prevent assignment int m_socket; @@ -81,6 +77,7 @@ class AddrInfo { bool m_isValid; ClientToken m_clientToken; /* must be 32 bit */ AddrUnion m_saddr; + uint32_t m_created; /* microseconds since boot, from clock_gettime() */ }; #endif diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 470f6b3f2..510e3878f 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -859,7 +859,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest ); assert( dest > 0 && dest <= 4 ); - if ( -1 != addr->socket() ) { + if ( addr->isCurrent() ) { for ( ; ; ) { unsigned char buf[MAX_MSG_LEN]; size_t buflen = sizeof(buf); @@ -936,6 +936,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp, { RWWriteLock rwl( &m_socketsRWLock ); HostRec hr( hostid, &evt->addr, nPlayersH, seed, !reconn ); + logf( XW_LOGINFO, "%s: adding socket rec with ts %lx", __func__, + evt->addr.created() ); m_sockets.push_back( hr ); } diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index cf6ddb99d..a88276f29 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -118,13 +118,14 @@ void XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from ) { { + int sock = from->socket(); RWWriteLock ml( &m_activeSocketsRWLock ); SockInfo si; si.m_type = stype; si.m_proc = proc; si.m_addr = *from; - m_activeSockets.push_back( si ); - logf( XW_LOGINFO, "%s: %d sockets active", __func__, + m_activeSockets.insert( pair( sock, si ) ); + logf( XW_LOGINFO, "%s(sock=%d): %d sockets active", __func__, sock, m_activeSockets.size() ); } interrupt_poll(); @@ -138,19 +139,11 @@ XWThreadPool::SocketFound( const AddrInfo* addr ) { RWWriteLock ml( &m_activeSocketsRWLock ); - logf( XW_LOGINFO, "%s: START: %d sockets active", __func__, - m_activeSockets.size() ); - - vector::iterator iter; - for ( iter = m_activeSockets.begin(); - iter != m_activeSockets.end(); ++iter ) { - if ( iter->m_addr.equals( *addr ) ) { - found = true; - break; - } + map::iterator iter = m_activeSockets.find( addr->socket() ); + if ( m_activeSockets.end() != iter + && iter->second.m_addr.equals( *addr ) ) { + found = true; } - logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__, - m_activeSockets.size() ); } return found; } @@ -166,14 +159,10 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr ) logf( XW_LOGINFO, "%s: START: %d sockets active", __func__, m_activeSockets.size() ); - vector::iterator iter; - for ( iter = m_activeSockets.begin(); - iter != m_activeSockets.end(); ++iter ) { - if ( iter->m_addr.equals( *addr ) ) { - m_activeSockets.erase( iter ); - found = true; - break; - } + map::iterator iter = m_activeSockets.find( addr->socket() ); + if ( m_activeSockets.end() != iter && iter->second.m_addr.equals( *addr ) ) { + m_activeSockets.erase( iter ); + found = true; } logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__, m_activeSockets.size() ); @@ -186,7 +175,6 @@ XWThreadPool::CloseSocket( const AddrInfo* addr ) { /* bool do_interrupt = false; */ assert( addr->isTCP() ); - UdpQueue::get()->forgetSocket( addr ); if ( !RemoveSocket( addr ) ) { MutexLock ml( &m_queueMutex ); deque::iterator iter = m_queue.begin(); @@ -222,6 +210,28 @@ XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why ) } } +// return true if the addr passed in has a timestamp >= what we have as the +// creation time of the now-open socket. If the socket isn't open, return false. +bool +XWThreadPool::IsCurrent( const AddrInfo* addr ) +{ + bool result = false; + bool sockFound = false; // for debugging + int sock = addr->socket(); + if ( -1 != sock ) { + RWReadLock ml( &m_activeSocketsRWLock ); + map::const_iterator iter = m_activeSockets.find( sock ); + if ( iter != m_activeSockets.end() ) { + assert( !sockFound ); + sockFound = true; + result = iter->second.m_addr.created() <= addr->created(); + logf( XW_LOGINFO, "%s(sock=%d)=>%d (%lx vs %lx)", __func__, sock, result, + iter->second.m_addr.created(), addr->created() ); + } + } + return result; +} + // bool // XWThreadPool::get_process_packet( SockType stype, QueueCallback proc, const AddrInfo* addr ) // { @@ -359,11 +369,11 @@ XWThreadPool::real_listener() #endif ++curfd; - vector::iterator iter; + map::iterator iter; for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end(); ++iter ) { - fds[curfd].fd = iter->m_addr.socket(); - sinfos[curfd] = *iter; + fds[curfd].fd = iter->first; + sinfos[curfd] = iter->second; fds[curfd].events = flags; #ifdef LOG_POLL if ( logCapacity > logLen ) { diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 9ae0b1359..ae65ecb46 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -68,6 +68,8 @@ class XWThreadPool { void EnqueueKill( const AddrInfo* addr, const char* const why ); + bool IsCurrent( const AddrInfo* addr ); + private: typedef enum { Q_READ, Q_KILL } QAction; typedef struct { QAction m_act; SockInfo m_info; } QueuePr; @@ -94,7 +96,7 @@ class XWThreadPool { static void* listener_main( void* closure ); /* Sockets main thread listens on */ - vectorm_activeSockets; + mapm_activeSockets; pthread_rwlock_t m_activeSocketsRWLock; /* Sockets waiting for a thread to read 'em */ diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp index ab08b97a6..c4a89eac3 100644 --- a/xwords4/relay/udpqueue.cpp +++ b/xwords4/relay/udpqueue.cpp @@ -108,55 +108,9 @@ UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id ); m_queue.push_back( utc ); - int sock = addr->socket(); - map >::iterator iter = m_bySocket.find( sock ); - if ( iter == m_bySocket.end() ) { - logf( XW_LOGINFO, "%s: creating vector for socket %d", __func__, sock ); - vector vect; - vect.push_back( utc ); - m_bySocket.insert( pair >(sock, vect) ); - } else { - iter->second.push_back( utc ); - logf( XW_LOGINFO, "%s: now have %d packets for socket %d", - __func__, iter->second.size(), sock ); - } - pthread_cond_signal( &m_queueCondVar ); } -void -UdpQueue::forgetSocket( const AddrInfo* addr ) -{ - assert( addr->isTCP() ); - int sock = addr->socket(); - MutexLock ml( &m_queueMutex ); - - map >::iterator iter = m_bySocket.find( sock ); - if ( m_bySocket.end() != iter ) { - vector& vect = iter->second; - vector::iterator iter2; - for ( iter2 = vect.begin(); vect.end() != iter2; ++ iter2 ) { - UdpThreadClosure* utc = *iter2; - assert( -1 != utc->addr()->socket() ); - utc->invalSocket(); - logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", - __func__, sock, utc->getID() ); - // vect.erase( iter2 ); - } - vect.clear(); - } - - // deque::iterator iter; - // for ( iter = m_queue.begin(); iter != m_queue.end(); ++iter ) { - // const AddrInfo* addr = (*iter)->addr(); - // if ( sock == addr->socket() ) { - // logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d", - // __func__, sock, (*iter)->getID() ); - // (*iter)->invalSocket(); - // } - // } -} - void* UdpQueue::thread_main() { @@ -168,16 +122,6 @@ UdpQueue::thread_main() UdpThreadClosure* utc = m_queue.front(); m_queue.pop_front(); - int sock = utc->addr()->socket(); - if ( -1 != sock ) { - map >::iterator iter = m_bySocket.find( sock ); - assert ( iter != m_bySocket.end() ); - vector& vect = iter->second; - assert( utc == *vect.begin() ); - vect.erase( vect.begin() ); - logf( XW_LOGINFO, "%s: %d packets remaining for socket %d", - __func__, vect.size(), sock ); - } pthread_mutex_unlock( &m_queueMutex ); utc->noteDequeued(); diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h index 544c47088..890d9d234 100644 --- a/xwords4/relay/udpqueue.h +++ b/xwords4/relay/udpqueue.h @@ -58,7 +58,6 @@ public: const QueueCallback cb() const { return m_cb; } void setID( int id ) { m_id = id; } int getID( void ) { return m_id; } - void invalSocket() { m_addr.invalSocket(); } private: unsigned char* m_buf; @@ -87,7 +86,7 @@ class UdpQueue { pthread_mutex_t m_queueMutex; pthread_cond_t m_queueCondVar; deque m_queue; - map > m_bySocket; + // map > m_bySocket; int m_nextID; }; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 41e4e635b..64f78a0ce 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -448,23 +448,32 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, { assert( !!addr ); bool ok = false; - int socket = addr->socket(); if ( addr->isTCP() ) { - unsigned short len = htons( bufLen ); - ssize_t nSent = send( socket, &len, 2, 0 ); - if ( nSent == 2 ) { - nSent = send( socket, buf, bufLen, 0 ); - if ( nSent == ssize_t(bufLen) ) { - logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); - ok = true; + if ( addr->isCurrent() ) { + int socket = addr->socket(); + unsigned short len = htons( bufLen ); + ssize_t nSent = send( socket, &len, 2, 0 ); + if ( nSent == 2 ) { + nSent = send( socket, buf, bufLen, 0 ); + if ( nSent == ssize_t(bufLen) ) { + logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); + ok = true; + } else { + logf( XW_LOGERROR, "%s: send failed: %s (errno=%d)", __func__, + strerror(errno), errno ); + } } + } else { + logf( XW_LOGINFO, "%s: dropping packet: socket %d reused", + __func__, socket ); } } else { AddrInfo::ClientToken clientToken = addr->clientToken(); assert( 0 != clientToken ); clientToken = htonl(clientToken); const struct sockaddr* saddr = addr->sockaddr(); + int socket = addr->socket(); assert( g_udpsock == socket || socket == -1 ); if ( -1 == socket ) { socket = g_udpsock; @@ -887,37 +896,6 @@ handlePipe( int sig ) logf( XW_LOGINFO, "%s", __func__ ); } -int -read_packet( int sock, unsigned char* buf, int buflen ) -{ - int result = -1; - ssize_t nread; - unsigned short msgLen; - nread = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); - if ( 0 == nread ) { - logf( XW_LOGINFO, "%s: recv => 0: remote closed", __func__ ); - } else if ( nread != sizeof(msgLen) ) { - logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__, - nread, strerror(errno) ); - } else { - msgLen = ntohs( msgLen ); - if ( msgLen >= buflen ) { - logf( XW_LOGERROR, "%s: buf too small; need %d but have %d", - __func__, msgLen, buflen ); - } else { - nread = recv( sock, buf, msgLen, MSG_WAITALL ); - if ( nread == msgLen ) { - result = nread; - } else { - logf( XW_LOGERROR, "%s: second recv failed: %s", __func__, - strerror(errno) ); - } - } - } - - return result; -} /* read_packet */ - static void pushShort( vector& out, unsigned short num ) { @@ -1104,7 +1082,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp, if ( getNetShort( &bufp, end, &len ) ) { if ( handlePutMessage( scr, hid, addr, len, &bufp, end ) ) { continue; - } + } } break; } @@ -1427,7 +1405,7 @@ udp_thread_proc( UdpThreadClosure* utc ) } static void -handle_udp_packet( int udpsock ) +read_udp_packet( int udpsock ) { unsigned char buf[MAX_MSG_LEN]; AddrInfo::AddrUnion saddr; @@ -1951,7 +1929,7 @@ main( int argc, char** argv ) if ( -1 != g_udpsock && FD_ISSET( g_udpsock, &rfds ) ) { // This will need to be done in a separate thread, or pushed // to the existing thread pool - handle_udp_packet( g_udpsock ); + read_udp_packet( g_udpsock ); --retval; } #ifdef DO_HTTP