add delete-on-ack logic to another message sending site, and fix

assertion failure caused by not providing a client token on messages
being sent.
This commit is contained in:
Eric House 2013-08-02 22:33:06 -07:00
parent ed0c0fe3a5
commit 4ad6696ecc
6 changed files with 59 additions and 28 deletions

View file

@ -42,6 +42,7 @@
#include "crefmgr.h" #include "crefmgr.h"
#include "devmgr.h" #include "devmgr.h"
#include "permid.h" #include "permid.h"
#include "udpack.h"
using namespace std; using namespace std;
@ -813,10 +814,11 @@ CookieRef::handleEvents()
bool bool
CookieRef::send_with_length( const AddrInfo* addr, HostID dest, CookieRef::send_with_length( const AddrInfo* addr, HostID dest,
const unsigned char* buf, int bufLen, bool cascade ) const unsigned char* buf, int bufLen, bool cascade,
uint32_t* packetIDP )
{ {
bool failed = false; bool failed = false;
if ( send_with_length_unsafe( addr, buf, bufLen ) ) { if ( send_with_length_unsafe( addr, buf, bufLen, packetIDP ) ) {
if ( HOST_ID_NONE == dest ) { if ( HOST_ID_NONE == dest ) {
dest = HostForSocket(addr); dest = HostForSocket(addr);
} }
@ -829,7 +831,7 @@ CookieRef::send_with_length( const AddrInfo* addr, HostID dest,
failed = true; failed = true;
} }
if ( failed && cascade ) { if ( failed && cascade && addr->isTCP() ) {
pushRemoveSocketEvent( addr ); pushRemoveSocketEvent( addr );
XWThreadPool::GetTPool()->CloseSocket( addr ); XWThreadPool::GetTPool()->CloseSocket( addr );
} }
@ -870,12 +872,15 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
for ( iter = msgs.begin(); addr->isCurrent() && msgs.end() != iter; for ( iter = msgs.begin(); addr->isCurrent() && msgs.end() != iter;
++iter ) { ++iter ) {
DBMgr::MsgInfo msg = *iter; DBMgr::MsgInfo msg = *iter;
if ( ! send_with_length( addr, dest, uint32_t packetID;
(const unsigned char*)msg.msg.c_str(), if ( !send_with_length( addr, dest,
msg.msg.length(), true ) ) { (const unsigned char*)msg.msg.c_str(),
msg.msg.length(), true, &packetID ) ) {
break; break;
} }
sentIDs.push_back( msg.msgID ); if ( !UDPAckTrack::setOnAck( onMsgAcked, packetID, (void*)msg.msgID ) ) {
sentIDs.push_back( msg.msgID );
}
} }
dbmgr->RemoveStoredMessages( sentIDs ); dbmgr->RemoveStoredMessages( sentIDs );
} }

View file

@ -196,7 +196,12 @@ class CookieRef {
}; };
bool send_with_length( const AddrInfo* addr, HostID hid, bool send_with_length( const AddrInfo* addr, HostID hid,
const unsigned char* buf, int bufLen, bool cascade ); const unsigned char* buf, int bufLen, bool cascade ) {
return send_with_length( addr, hid, buf, bufLen, cascade, NULL );
}
bool send_with_length( const AddrInfo* addr, HostID hid,
const unsigned char* buf, int bufLen, bool cascade,
uint32_t* packetIDP );
void send_msg( const AddrInfo* addr, HostID id, void send_msg( const AddrInfo* addr, HostID id,
XWRelayMsg msg, XWREASON why, bool cascade ); XWRelayMsg msg, XWREASON why, bool cascade );
void pushConnectEvent( int clientVersion, DevID* devID, void pushConnectEvent( int clientVersion, DevID* devID,

View file

@ -38,6 +38,7 @@ UDPAckTrack::nextPacketID( XWRelayReg cmd )
uint32_t result = 0; uint32_t result = 0;
if ( shouldAck( cmd ) ) { if ( shouldAck( cmd ) ) {
result = get()->nextPacketIDImpl(); result = get()->nextPacketIDImpl();
assert( PACKETID_NONE != result );
} }
return result; return result;
} }
@ -48,10 +49,10 @@ UDPAckTrack::recordAck( uint32_t packetID )
get()->recordAckImpl( packetID ); get()->recordAckImpl( packetID );
} }
/* static */ void /* static */ bool
UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data ) UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data )
{ {
get()->setOnAckImpl( proc, packetID, data ); return get()->setOnAckImpl( proc, packetID, data );
} }
/* static */ UDPAckTrack* /* static */ UDPAckTrack*
@ -65,7 +66,7 @@ UDPAckTrack::get()
UDPAckTrack::UDPAckTrack() UDPAckTrack::UDPAckTrack()
{ {
m_nextID = 0; m_nextID = PACKETID_NONE;
pthread_mutex_init( &m_mutex, NULL ); pthread_mutex_init( &m_mutex, NULL );
pthread_t thread; pthread_t thread;
@ -103,15 +104,19 @@ UDPAckTrack::recordAckImpl( uint32_t packetID )
} }
} }
void bool
UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ) UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data )
{ {
MutexLock ml( &m_mutex ); bool canAdd = PACKETID_NONE != packetID;
map<uint32_t, AckRecord>::iterator iter = m_pendings.find( packetID ); if ( canAdd ) {
if ( m_pendings.end() != iter ) { MutexLock ml( &m_mutex );
iter->second.proc = proc; map<uint32_t, AckRecord>::iterator iter = m_pendings.find( packetID );
iter->second.data = data; if ( m_pendings.end() != iter ) {
iter->second.proc = proc;
iter->second.data = data;
}
} }
return canAdd;
} }
void void

View file

@ -35,9 +35,11 @@ class AckRecord {
class UDPAckTrack { class UDPAckTrack {
public: public:
static const uint32_t PACKETID_NONE = 0;
static uint32_t nextPacketID( XWRelayReg cmd ); static uint32_t nextPacketID( XWRelayReg cmd );
static void recordAck( uint32_t packetID ); static void recordAck( uint32_t packetID );
static void setOnAck( OnAckProc proc, uint32_t packetID, void* data ); static bool setOnAck( OnAckProc proc, uint32_t packetID, void* data );
static bool shouldAck( XWRelayReg cmd ); static bool shouldAck( XWRelayReg cmd );
private: private:
@ -46,7 +48,7 @@ class UDPAckTrack {
UDPAckTrack(); UDPAckTrack();
uint32_t nextPacketIDImpl(); uint32_t nextPacketIDImpl();
void recordAckImpl( uint32_t packetID ); void recordAckImpl( uint32_t packetID );
void setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data ); bool setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data );
void callProc( uint32_t packetID, bool acked, const AckRecord* record ); void callProc( uint32_t packetID, bool acked, const AckRecord* record );
void* threadProc(); void* threadProc();

View file

@ -416,7 +416,7 @@ denyConnection( const AddrInfo* addr, XWREASON err )
buf[0] = XWRELAY_CONNECTDENIED; buf[0] = XWRELAY_CONNECTDENIED;
buf[1] = err; buf[1] = err;
send_with_length_unsafe( addr, buf, sizeof(buf) ); send_with_length_unsafe( addr, buf, sizeof(buf), NULL );
} }
static ssize_t static ssize_t
@ -525,10 +525,10 @@ send_via_udp( int socket, const struct sockaddr* dest_addr,
} }
static bool static bool
send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf, send_msg_via_udp( const AddrInfo* addr, AddrInfo::ClientToken clientToken,
const size_t bufLen, uint32_t* packetIDP ) const unsigned char* buf, const size_t bufLen,
uint32_t* packetIDP )
{ {
const AddrInfo::ClientToken clientToken = addr->clientToken();
assert( 0 != clientToken ); assert( 0 != clientToken );
uint32_t asNetTok = htonl(clientToken); uint32_t asNetTok = htonl(clientToken);
ssize_t nSent = send_via_udp( addr, packetIDP, XWPDEV_MSG, &asNetTok, ssize_t nSent = send_via_udp( addr, packetIDP, XWPDEV_MSG, &asNetTok,
@ -541,11 +541,19 @@ send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf,
return result; return result;
} }
static bool
send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf,
const size_t bufLen, uint32_t* packetIDP )
{
return send_msg_via_udp( addr, addr->clientToken(), buf,
bufLen, packetIDP );
}
/* No mutex here. Caller better be ensuring no other thread can access this /* No mutex here. Caller better be ensuring no other thread can access this
* socket. */ * socket. */
bool bool
send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf, send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
const size_t bufLen ) const size_t bufLen, uint32_t* packetIDP )
{ {
assert( !!addr ); assert( !!addr );
bool ok = false; bool ok = false;
@ -570,8 +578,11 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
logf( XW_LOGINFO, "%s: dropping packet: socket %d reused", logf( XW_LOGINFO, "%s: dropping packet: socket %d reused",
__func__, socket ); __func__, socket );
} }
if ( NULL != packetIDP ) {
*packetIDP = UDPAckTrack::PACKETID_NONE;
}
} else { } else {
ok = send_msg_via_udp( addr, buf, bufLen, NULL ); ok = send_msg_via_udp( addr, buf, bufLen, packetIDP );
} }
if ( !ok ) { if ( !ok ) {
@ -1325,7 +1336,7 @@ registerDevice( const DevID* devID, const AddrInfo* addr )
} }
} }
static void void
onMsgAcked( bool acked, uint32_t packetID, void* data ) onMsgAcked( bool acked, uint32_t packetID, void* data )
{ {
logf( XW_LOGINFO, "%s(packetID=%d)", __func__, packetID ); logf( XW_LOGINFO, "%s(packetID=%d)", __func__, packetID );
@ -1347,7 +1358,7 @@ retrieveMessages( DevID& devID, const AddrInfo* addr )
for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) { for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) {
DBMgr::MsgInfo msg = *iter; DBMgr::MsgInfo msg = *iter;
uint32_t packetID; uint32_t packetID;
if ( !send_msg_via_udp( addr, (unsigned char*)msg.msg.c_str(), if ( !send_msg_via_udp( addr, msg.token, (unsigned char*)msg.msg.c_str(),
msg.msg.length(), &packetID ) ) { msg.msg.length(), &packetID ) ) {
break; break;
} }

View file

@ -46,7 +46,8 @@ bool willLog( XW_LogLevel level );
void denyConnection( const AddrInfo* addr, XWREASON err ); void denyConnection( const AddrInfo* addr, XWREASON err );
bool send_with_length_unsafe( const AddrInfo* addr, bool send_with_length_unsafe( const AddrInfo* addr,
const unsigned char* buf, size_t bufLen ); const unsigned char* buf, size_t bufLen,
uint32_t* packetIDP );
void send_havemsgs( const AddrInfo* addr ); void send_havemsgs( const AddrInfo* addr );
time_t uptime(void); time_t uptime(void);
@ -61,6 +62,8 @@ void string_printf( std::string& str, const char* fmt, ... );
int read_packet( int sock, unsigned char* buf, int buflen ); int read_packet( int sock, unsigned char* buf, int buflen );
void onMsgAcked( bool acked, uint32_t packetID, void* data );
const char* cmdToStr( XWRELAY_Cmd cmd ); const char* cmdToStr( XWRELAY_Cmd cmd );
extern class ListenerMgr g_listeners; extern class ListenerMgr g_listeners;