For a subset of stored messages sent, don't remove from DB until

they're acked.  (This is the goal for all messages sent via UDP, but
will be harder elsewhere in the code.)
This commit is contained in:
Eric House 2013-08-01 07:48:44 -07:00
parent 684effd170
commit 4ecb0f534d
3 changed files with 87 additions and 27 deletions

View file

@ -48,6 +48,12 @@ UDPAckTrack::recordAck( uint32_t packetID )
get()->recordAckImpl( packetID );
}
/* static */ void
UDPAckTrack::setOnAck( OnAckProc proc, uint32_t packetID, void* data )
{
get()->setOnAckImpl( proc, packetID, data );
}
/* static */ UDPAckTrack*
UDPAckTrack::get()
{
@ -91,24 +97,48 @@ UDPAckTrack::recordAckImpl( uint32_t packetID )
logf( XW_LOGERROR, "%s: packet ID %d took %d seconds to get acked",
__func__, packetID, took );
}
callProc( iter->first, true, &(iter->second) );
m_pendings.erase( iter );
}
}
void
UDPAckTrack::setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data )
{
MutexLock ml( &m_mutex );
map<uint32_t, AckRecord>::iterator iter = m_pendings.find( packetID );
if ( m_pendings.end() != iter ) {
iter->second.proc = proc;
iter->second.data = data;
}
}
void
UDPAckTrack::callProc( uint32_t packetID, bool acked, const AckRecord* record )
{
OnAckProc proc = record->proc;
logf( XW_LOGINFO, "%s: acked=%d, proc=%p", __func__, acked, proc );
if ( NULL != proc ) {
(*proc)( acked, packetID, record->data );
}
}
void*
UDPAckTrack::threadProc()
{
for ( ; ; ) {
sleep( 30 );
time_t now = time( NULL );
vector<uint32_t> older;
{
MutexLock ml( &m_mutex );
time_t now = time( NULL );
map<uint32_t, AckRecord>::iterator iter;
for ( iter = m_pendings.begin(); iter != m_pendings.end(); ++iter ) {
time_t took = now - iter->second.m_createTime;
if ( ACK_LIMIT < took ) {
older.push_back( iter->first );
callProc( iter->first, false, &(iter->second) );
m_pendings.erase( iter );
}
}

View file

@ -23,16 +23,21 @@
#include "xwrelay_priv.h"
#include "xwrelay.h"
typedef void (*OnAckProc)( bool acked, uint32_t packetID, void* data );
class AckRecord {
public:
AckRecord() { m_createTime = time( NULL ); }
AckRecord() { m_createTime = time( NULL ); proc = NULL; }
time_t m_createTime;
OnAckProc proc;
void* data;
};
class UDPAckTrack {
public:
static uint32_t nextPacketID( XWRelayReg cmd );
static void recordAck( uint32_t packetID );
static void setOnAck( OnAckProc proc, uint32_t packetID, void* data );
static bool shouldAck( XWRelayReg cmd );
private:
@ -41,6 +46,8 @@ class UDPAckTrack {
UDPAckTrack();
uint32_t nextPacketIDImpl();
void recordAckImpl( uint32_t packetID );
void setOnAckImpl( OnAckProc proc, uint32_t packetID, void* data );
void callProc( uint32_t packetID, bool acked, const AckRecord* record );
void* threadProc();
static UDPAckTrack* s_self;

View file

@ -421,7 +421,7 @@ denyConnection( const AddrInfo* addr, XWREASON err )
static ssize_t
send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
XWRelayReg cmd, va_list* app )
uint32_t* packetIDP, XWRelayReg cmd, va_list* app )
{
uint32_t packetNum = UDPAckTrack::nextPacketID( cmd );
struct iovec vec[10];
@ -429,6 +429,9 @@ send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
unsigned char header[1 + 1 + sizeof(packetNum)];
header[0] = XWPDEV_PROTO_VERSION;
if ( NULL != packetIDP ) {
*packetIDP = packetNum;
}
packetNum = htonl( packetNum );
memcpy( &header[1], &packetNum, sizeof(packetNum) );
header[5] = cmd;
@ -488,7 +491,7 @@ send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
} // send_via_udp_impl
static ssize_t
send_via_udp( const AddrInfo* addr, XWRelayReg cmd, ... )
send_via_udp( const AddrInfo* addr, uint32_t* packetIDP, XWRelayReg cmd, ... )
{
ssize_t result = 0;
if ( addr->isCurrent() ) {
@ -500,7 +503,8 @@ send_via_udp( const AddrInfo* addr, XWRelayReg cmd, ... )
va_list ap;
va_start( ap, cmd );
result = send_via_udp_impl( socket, addr->sockaddr(), cmd, &ap );
result = send_via_udp_impl( socket, addr->sockaddr(), packetIDP,
cmd, &ap );
va_end( ap );
} else {
logf( XW_LOGINFO, "%s: not sending to out-of-date packet", __func__ );
@ -510,15 +514,33 @@ send_via_udp( const AddrInfo* addr, XWRelayReg cmd, ... )
static ssize_t
send_via_udp( int socket, const struct sockaddr* dest_addr,
XWRelayReg cmd, ... )
uint32_t* packetIDP, XWRelayReg cmd, ... )
{
va_list ap;
va_start( ap, cmd );
ssize_t result = send_via_udp_impl( socket, dest_addr, cmd, &ap );
ssize_t result = send_via_udp_impl( socket, dest_addr, packetIDP,
cmd, &ap );
va_end( ap );
return result;
}
static bool
send_msg_via_udp( const AddrInfo* addr, const unsigned char* buf,
const size_t bufLen, uint32_t* packetIDP )
{
const AddrInfo::ClientToken clientToken = addr->clientToken();
assert( 0 != clientToken );
uint32_t asNetTok = htonl(clientToken);
ssize_t nSent = send_via_udp( addr, packetIDP, XWPDEV_MSG, &asNetTok,
sizeof(asNetTok), buf, bufLen, NULL );
logf( XW_LOGINFO, "%s: sent %d bytes (plus header) on UDP socket, "
"token=%x(%d)", __func__, bufLen, clientToken,
clientToken );
bool result = 0 < nSent;
logf( XW_LOGINFO, "%s()=>%d", __func__, result );
return result;
}
/* No mutex here. Caller better be ensuring no other thread can access this
* socket. */
bool
@ -549,15 +571,7 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
__func__, socket );
}
} else {
const AddrInfo::ClientToken clientToken = addr->clientToken();
assert( 0 != clientToken );
uint32_t asNetTok = htonl(clientToken);
send_via_udp( addr, XWPDEV_MSG, &asNetTok,
sizeof(asNetTok), buf, bufLen, NULL );
logf( XW_LOGINFO, "%s: sent %d bytes (plus header) on UDP socket, "
"token=%x(%d)", __func__, bufLen, clientToken,
clientToken );
ok = true;
ok = send_msg_via_udp( addr, buf, bufLen, NULL );
}
if ( !ok ) {
@ -576,7 +590,7 @@ send_havemsgs( const AddrInfo* addr )
socket = g_udpsock;
}
send_via_udp( addr, XWPDEV_HAVEMSGS, NULL );
send_via_udp( addr, NULL, XWPDEV_HAVEMSGS, NULL );
}
/* A CONNECT message from a device gives us the hostID and socket we'll
@ -1288,7 +1302,7 @@ registerDevice( const DevID* devID, const AddrInfo* addr )
}
} else {
indx += addRegID( &buf[indx], relayID );
send_via_udp( addr, XWPDEV_BADREG, buf, indx, NULL );
send_via_udp( addr, NULL, XWPDEV_BADREG, buf, indx, NULL );
relayID = DBMgr::DEVID_NONE;
}
@ -1303,7 +1317,7 @@ registerDevice( const DevID* devID, const AddrInfo* addr )
uint16_t maxInterval = UDPAger::Get()->MaxIntervalSeconds();
maxInterval = ntohs(maxInterval);
send_via_udp( addr, XWPDEV_REGRSP, buf, indx,
send_via_udp( addr, NULL, XWPDEV_REGRSP, buf, indx,
&maxInterval, sizeof(maxInterval), NULL );
// Map the address to the devid for future sending purposes.
@ -1311,6 +1325,16 @@ registerDevice( const DevID* devID, const AddrInfo* addr )
}
}
static void
onMsgAcked( bool acked, uint32_t packetID, void* data )
{
logf( XW_LOGINFO, "%s(packetID=%d)", __func__, packetID );
if ( acked ) {
int msgID = (int)data;
DBMgr::Get()->RemoveStoredMessage( msgID );
}
}
static void
retrieveMessages( DevID& devID, const AddrInfo* addr )
{
@ -1320,16 +1344,15 @@ retrieveMessages( DevID& devID, const AddrInfo* addr )
dbMgr->GetStoredMessages( devID.asRelayID(), msgs );
vector<DBMgr::MsgInfo>::const_iterator iter;
vector<int> sentIDs;
for ( iter = msgs.begin(); iter != msgs.end(); ++iter ) {
DBMgr::MsgInfo msg = *iter;
if ( ! send_with_length_unsafe( addr, (unsigned char*)msg.msg.c_str(),
msg.msg.length() ) ) {
uint32_t packetID;
if ( !send_msg_via_udp( addr, (unsigned char*)msg.msg.c_str(),
msg.msg.length(), &packetID ) ) {
break;
}
sentIDs.push_back( msg.msgID );
UDPAckTrack::setOnAck( onMsgAcked, packetID, (void*)msg.msgID );
}
dbMgr->RemoveStoredMessages( sentIDs );
}
static const char*
@ -1366,7 +1389,7 @@ ackPacketIf( const UDPHeader* header, const AddrInfo* addr )
uint32_t packetID = header->packetID;
logf( XW_LOGINFO, "%s: acking packet %d", __func__, packetID );
packetID = htonl( packetID );
send_via_udp( addr, XWPDEV_ACK,
send_via_udp( addr, NULL, XWPDEV_ACK,
&packetID, sizeof(packetID), NULL );
}
}
@ -1622,14 +1645,14 @@ maint_str_loop( int udpsock, const char* str )
UDPHeader header;
const unsigned char* ptr = buf;
if ( getHeader( &ptr, ptr + nRead, &header ) ) {
send_via_udp( udpsock, &saddr.u.addr, XWPDEV_ALERT,
send_via_udp( udpsock, &saddr.u.addr, NULL, XWPDEV_ALERT,
outbuf, sizeof(outbuf), NULL );
} else {
logf( XW_LOGERROR, "unexpected data" );
}
}
} // for
}
} // maint_str_loop
int
main( int argc, char** argv )