From 4ad6696ecc43a75971fdf30181ebefc07ac2fce5 Mon Sep 17 00:00:00 2001 From: Eric House Date: Fri, 2 Aug 2013 22:33:06 -0700 Subject: [PATCH] add delete-on-ack logic to another message sending site, and fix assertion failure caused by not providing a client token on messages being sent. --- xwords4/relay/cref.cpp | 19 ++++++++++++------- xwords4/relay/cref.h | 7 ++++++- xwords4/relay/udpack.cpp | 23 ++++++++++++++--------- xwords4/relay/udpack.h | 6 ++++-- xwords4/relay/xwrelay.cpp | 27 +++++++++++++++++++-------- xwords4/relay/xwrelay_priv.h | 5 ++++- 6 files changed, 59 insertions(+), 28 deletions(-) diff --git a/xwords4/relay/cref.cpp b/xwords4/relay/cref.cpp index 68b4fae14..2d53d5b24 100644 --- a/xwords4/relay/cref.cpp +++ b/xwords4/relay/cref.cpp @@ -42,6 +42,7 @@ #include "crefmgr.h" #include "devmgr.h" #include "permid.h" +#include "udpack.h" using namespace std; @@ -813,10 +814,11 @@ CookieRef::handleEvents() bool CookieRef::send_with_length( const AddrInfo* addr, HostID dest, - const unsigned char* buf, int bufLen, bool cascade ) + const unsigned char* buf, int bufLen, bool cascade, + uint32_t* packetIDP ) { bool failed = false; - if ( send_with_length_unsafe( addr, buf, bufLen ) ) { + if ( send_with_length_unsafe( addr, buf, bufLen, packetIDP ) ) { if ( HOST_ID_NONE == dest ) { dest = HostForSocket(addr); } @@ -829,7 +831,7 @@ CookieRef::send_with_length( const AddrInfo* addr, HostID dest, failed = true; } - if ( failed && cascade ) { + if ( failed && cascade && addr->isTCP() ) { pushRemoveSocketEvent( addr ); XWThreadPool::GetTPool()->CloseSocket( addr ); } @@ -870,12 +872,15 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr ) for ( iter = msgs.begin(); addr->isCurrent() && msgs.end() != iter; ++iter ) { DBMgr::MsgInfo msg = *iter; - if ( ! send_with_length( addr, dest, - (const unsigned char*)msg.msg.c_str(), - msg.msg.length(), true ) ) { + uint32_t packetID; + if ( !send_with_length( addr, dest, + (const unsigned char*)msg.msg.c_str(), + msg.msg.length(), true, &packetID ) ) { break; } - sentIDs.push_back( msg.msgID ); + if ( !UDPAckTrack::setOnAck( onMsgAcked, packetID, (void*)msg.msgID ) ) { + sentIDs.push_back( msg.msgID ); + } } dbmgr->RemoveStoredMessages( sentIDs ); } diff --git a/xwords4/relay/cref.h b/xwords4/relay/cref.h index 773a2fdeb..169161d99 100644 --- a/xwords4/relay/cref.h +++ b/xwords4/relay/cref.h @@ -196,7 +196,12 @@ class CookieRef { }; bool send_with_length( const AddrInfo* addr, HostID hid, - const unsigned char* buf, int bufLen, bool cascade ); + const unsigned char* buf, int bufLen, bool cascade ) { + return send_with_length( addr, hid, buf, bufLen, cascade, NULL ); + } + bool send_with_length( const AddrInfo* addr, HostID hid, + const unsigned char* buf, int bufLen, bool cascade, + uint32_t* packetIDP ); void send_msg( const AddrInfo* addr, HostID id, XWRelayMsg msg, XWREASON why, bool cascade ); void pushConnectEvent( int clientVersion, DevID* devID, diff --git a/xwords4/relay/udpack.cpp b/xwords4/relay/udpack.cpp index 8dd0d6746..6100d48fd 100644 --- a/xwords4/relay/udpack.cpp +++ b/xwords4/relay/udpack.cpp @@ -38,6 +38,7 @@ UDPAckTrack::nextPacketID( XWRelayReg cmd ) uint32_t result = 0; if ( shouldAck( cmd ) ) { result = get()->nextPacketIDImpl(); + assert( PACKETID_NONE != result ); } return result; } @@ -48,10 +49,10 @@ UDPAckTrack::recordAck( uint32_t packetID ) get()->recordAckImpl( packetID ); } -/* static */ void +/* static */ bool UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data ) { - get()->setOnAckImpl( proc, packetID, data ); + return get()->setOnAckImpl( proc, packetID, data ); } /* static */ UDPAckTrack* @@ -65,7 +66,7 @@ UDPAckTrack::get() UDPAckTrack::UDPAckTrack() { - m_nextID = 0; + m_nextID = PACKETID_NONE; pthread_mutex_init( &m_mutex, NULL ); pthread_t thread; @@ -103,15 +104,19 @@ UDPAckTrack::recordAckImpl( uint32_t packetID ) } } -void +bool UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ) { - MutexLock ml( &m_mutex ); - map::iterator iter = m_pendings.find( packetID ); - if ( m_pendings.end() != iter ) { - iter->second.proc = proc; - iter->second.data = data; + bool canAdd = PACKETID_NONE != packetID; + if ( canAdd ) { + MutexLock ml( &m_mutex ); + map::iterator iter = m_pendings.find( packetID ); + if ( m_pendings.end() != iter ) { + iter->second.proc = proc; + iter->second.data = data; + } } + return canAdd; } void diff --git a/xwords4/relay/udpack.h b/xwords4/relay/udpack.h index 1a9c4de7c..50244f936 100644 --- a/xwords4/relay/udpack.h +++ b/xwords4/relay/udpack.h @@ -35,9 +35,11 @@ class AckRecord { class UDPAckTrack { public: + static const uint32_t PACKETID_NONE = 0; + static uint32_t nextPacketID( XWRelayReg cmd ); static void recordAck( uint32_t packetID ); - static void setOnAck( OnAckProc proc, uint32_t packetID, void* data ); + static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data ); static bool shouldAck( XWRelayReg cmd ); private: @@ -46,7 +48,7 @@ class UDPAckTrack { UDPAckTrack(); uint32_t nextPacketIDImpl(); void recordAckImpl( uint32_t packetID ); - void setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); + bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); void callProc( uint32_t packetID, bool acked, const AckRecord* record ); void* threadProc(); diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index 51a6bf3e1..79e9d470d 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -416,7 +416,7 @@ denyConnection( const AddrInfo* addr, XWREASON err ) buf[0] = XWRELAY_CONNECTDENIED; buf[1] = err; - send_with_length_unsafe( addr, buf, sizeof(buf) ); + send_with_length_unsafe( addr, buf, sizeof(buf), NULL ); } static ssize_t @@ -525,10 +525,10 @@ send_via_udp( int socket, const struct sockaddr* dest_addr, } static bool -send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf, - const size_t bufLen, uint32_t* packetIDP ) +send_msg_via_udp( const AddrInfo* addr, AddrInfo::ClientToken clientToken, + const unsigned char* buf, const size_t bufLen, + uint32_t* packetIDP ) { - const AddrInfo::ClientToken clientToken = addr->clientToken(); assert( 0 != clientToken ); uint32_t asNetTok = htonl(clientToken); ssize_t nSent = send_via_udp( addr, packetIDP, XWPDEV_MSG, &asNetTok, @@ -541,11 +541,19 @@ send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf, return result; } +static bool +send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf, + const size_t bufLen, uint32_t* packetIDP ) +{ + return send_msg_via_udp( addr, addr->clientToken(), buf, + bufLen, packetIDP ); +} + /* No mutex here. Caller better be ensuring no other thread can access this * socket. */ bool send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, - const size_t bufLen ) + const size_t bufLen, uint32_t* packetIDP ) { assert( !!addr ); bool ok = false; @@ -570,8 +578,11 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, logf( XW_LOGINFO, "%s: dropping packet: socket %d reused", __func__, socket ); } + if ( NULL != packetIDP ) { + *packetIDP = UDPAckTrack::PACKETID_NONE; + } } else { - ok = send_msg_via_udp( addr, buf, bufLen, NULL ); + ok = send_msg_via_udp( addr, buf, bufLen, packetIDP ); } if ( !ok ) { @@ -1325,7 +1336,7 @@ registerDevice( const DevID* devID, const AddrInfo* addr ) } } -static void +void onMsgAcked( bool acked, uint32_t packetID, void* data ) { logf( XW_LOGINFO, "%s(packetID=%d)", __func__, packetID ); @@ -1347,7 +1358,7 @@ retrieveMessages( DevID& devID, const AddrInfo* addr ) for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) { DBMgr::MsgInfo msg = *iter; uint32_t packetID; - if ( !send_msg_via_udp( addr, (unsigned char*)msg.msg.c_str(), + if ( !send_msg_via_udp( addr, msg.token, (unsigned char*)msg.msg.c_str(), msg.msg.length(), &packetID ) ) { break; } diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index ef9cfbeac..898486308 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -46,7 +46,8 @@ bool willLog( XW_LogLevel level ); void denyConnection( const AddrInfo* addr, XWREASON err ); bool send_with_length_unsafe( const AddrInfo* addr, - const unsigned char* buf, size_t bufLen ); + const unsigned char* buf, size_t bufLen, + uint32_t* packetIDP ); void send_havemsgs( const AddrInfo* addr ); time_t uptime(void); @@ -61,6 +62,8 @@ void string_printf( std::string& str, const char* fmt, ... ); int read_packet( int sock, unsigned char* buf, int buflen ); +void onMsgAcked( bool acked, uint32_t packetID, void* data ); + const char* cmdToStr( XWRELAY_Cmd cmd ); extern class ListenerMgr g_listeners;