store message first, remove on ack

This commit is contained in:
Eric House 2017-10-25 05:51:45 -07:00
parent c68a067009
commit cc54621e45
5 changed files with 55 additions and 44 deletions

View file

@ -875,13 +875,13 @@ putNetShort( uint8_t** bufpp, unsigned short s )
*bufpp += sizeof(s); *bufpp += sizeof(s);
} }
void int
CookieRef::store_message( HostID dest, const uint8_t* buf, CookieRef::store_message( HostID dest, const uint8_t* buf,
unsigned int len ) unsigned int len )
{ {
logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__, logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__,
len, dest ); len, dest );
DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len ); return DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len );
} }
void void
@ -1044,6 +1044,7 @@ CookieRef::postCheckAllHere()
void void
CookieRef::postDropDevice( HostID hostID ) CookieRef::postDropDevice( HostID hostID )
{ {
logf( XW_LOGINFO, "%s(hostID=%d)", __func__, hostID );
CRefEvent evt( XWE_ACKTIMEOUT ); CRefEvent evt( XWE_ACKTIMEOUT );
evt.u.ack.srcID = hostID; evt.u.ack.srcID = hostID;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
@ -1192,21 +1193,16 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
} }
typedef struct _StoreData { typedef struct _StoreData {
string connName; int msgID;
HostID dest;
uint8_t* buf;
int buflen;
} StoreData; } StoreData;
void void
CookieRef::storeNoAck( bool acked, uint32_t packetID, void* data ) CookieRef::storeNoAck( bool acked, uint32_t packetID, void* data )
{ {
StoreData* sdata = (StoreData*)data; StoreData* sdata = (StoreData*)data;
if ( !acked ) { if ( acked ) {
DBMgr::Get()->StoreMessage( sdata->connName.c_str(), sdata->dest, DBMgr::Get()->RemoveStoredMessages( &sdata->msgID, 1 );
sdata->buf, sdata->buflen );
} }
free( sdata->buf );
delete sdata; delete sdata;
} }
@ -1237,17 +1233,13 @@ CookieRef::forward_or_store( const CRefEvent* evt )
} }
uint32_t packetID = 0; uint32_t packetID = 0;
int msgID = store_message( dest, buf, buflen );
if ( (NULL == destAddr) if ( (NULL == destAddr)
|| !send_with_length( destAddr, dest, buf, buflen, true, || !send_with_length( destAddr, dest, buf, buflen, true,
&packetID ) ) { &packetID ) ) {
store_message( dest, buf, buflen ); } else if ( 0 != msgID && 0 != packetID ) { // sent via UDP
} else if ( 0 != packetID ) { // sent via UDP
StoreData* data = new StoreData; StoreData* data = new StoreData;
data->connName = m_connName; data->msgID = msgID;
data->dest = dest;
data->buf = (uint8_t*)malloc( buflen );
memcpy( data->buf, buf, buflen );
data->buflen = buflen;
UDPAckTrack::setOnAck( storeNoAck, packetID, data ); UDPAckTrack::setOnAck( storeNoAck, packetID, data );
} }
@ -1376,20 +1368,16 @@ CookieRef::sendAllHere( bool initial )
through the vector each time. */ through the vector each time. */
HostID dest; HostID dest;
for ( dest = 1; dest <= m_nPlayersSought; ++dest ) { for ( dest = 1; dest <= m_nPlayersSought; ++dest ) {
bool sent = false;
*idLoc = dest; /* write in this target's hostId */ *idLoc = dest; /* write in this target's hostId */
{ {
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
HostRec* hr = m_sockets[dest-1]; HostRec* hr = m_sockets[dest-1];
if ( !!hr ) { if ( !!hr ) {
sent = send_with_length( &hr->m_addr, dest, buf, (void)send_with_length( &hr->m_addr, dest, buf, bufp-buf, true );
bufp-buf, true );
} }
} }
if ( !sent ) { (void)store_message( dest, buf, bufp-buf );
store_message( dest, buf, bufp-buf );
}
} }
} /* sendAllHere */ } /* sendAllHere */

View file

@ -275,8 +275,7 @@ class CookieRef {
bool notInUse(void) { return m_cid == 0; } bool notInUse(void) { return m_cid == 0; }
void store_message( HostID dest, const uint8_t* buf, int store_message( HostID dest, const uint8_t* buf, unsigned int len );
unsigned int len );
void send_stored_messages( HostID dest, const AddrInfo* addr ); void send_stored_messages( HostID dest, const AddrInfo* addr );
void printSeeds( const char* caller ); void printSeeds( const char* caller );

View file

@ -1014,15 +1014,16 @@ DBMgr::CountStoredMessages( DevIDRelay relayID )
return getCountWhere( MSGS_TABLE, test ); return getCountWhere( MSGS_TABLE, test );
} }
void int
DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf, DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf,
int len ) int len )
{ {
int msgID = 0;
clearHasNoMessages( destDevID ); clearHasNoMessages( destDevID );
size_t newLen; size_t newLen;
const char* fmt = "INSERT INTO " MSGS_TABLE " " const char* fmt = "INSERT INTO " MSGS_TABLE " "
"(devid, %s, msglen) VALUES(%d, %s'%s', %d)"; "(devid, %s, msglen) VALUES(%d, %s'%s', %d) RETURNING id";
StrWPF query; StrWPF query;
if ( m_useB64 ) { if ( m_useB64 ) {
@ -1038,13 +1039,20 @@ DBMgr::StoreMessage( DevIDRelay destDevID, const uint8_t* const buf,
} }
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
msgID = atoi( PQgetvalue( result, 0, 0 ) );
}
PQclear( result );
return msgID;
} }
void int
DBMgr::StoreMessage( const char* const connName, int destHid, DBMgr::StoreMessage( const char* const connName, int destHid,
const uint8_t* buf, int len ) const uint8_t* buf, int len )
{ {
int msgID = 0;
clearHasNoMessages( connName, destHid ); clearHasNoMessages( connName, destHid );
DevIDRelay devID = getDevID( connName, destHid ); DevIDRelay devID = getDevID( connName, destHid );
@ -1074,7 +1082,7 @@ DBMgr::StoreMessage( const char* const connName, int destHid,
#ifdef HAVE_STIME #ifdef HAVE_STIME
" AND stime='epoch'" " AND stime='epoch'"
#endif #endif
" );", connName, destHid, b64 ); " )", connName, destHid, b64 );
g_free( b64 ); g_free( b64 );
} else { } else {
uint8_t* bytes = PQescapeByteaConn( getThreadConn(), buf, uint8_t* bytes = PQescapeByteaConn( getThreadConn(), buf,
@ -1085,9 +1093,17 @@ DBMgr::StoreMessage( const char* const connName, int destHid,
"E", bytes, len ); "E", bytes, len );
PQfreemem( bytes ); PQfreemem( bytes );
} }
query.catf(" RETURNING id;");
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query ); PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
msgID = atoi( PQgetvalue( result, 0, 0 ) );
} else {
logf( XW_LOGINFO, "Not stored; duplicate?" );
}
PQclear( result );
return msgID;
} }
void void

View file

@ -137,9 +137,9 @@ class DBMgr {
/* message storage -- different DB */ /* message storage -- different DB */
int CountStoredMessages( const char* const connName ); int CountStoredMessages( const char* const connName );
int CountStoredMessages( DevIDRelay relayID ); int CountStoredMessages( DevIDRelay relayID );
void StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf, int StoreMessage( DevIDRelay destRelayID, const uint8_t* const buf,
int len ); int len );
void StoreMessage( const char* const connName, int destHid, int StoreMessage( const char* const connName, int destHid,
const uint8_t* const buf, int len ); const uint8_t* const buf, int len );
void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs ); void GetStoredMessages( DevIDRelay relayID, vector<MsgInfo>& msgs );
void GetStoredMessages( const char* const connName, HostID hid, void GetStoredMessages( const char* const connName, HostID hid,

View file

@ -761,13 +761,17 @@ send_havemsgs( const AddrInfo* addr )
class MsgClosure { class MsgClosure {
public: public:
MsgClosure( DevIDRelay dest, const vector<uint8_t>* packet, MsgClosure( DevIDRelay dest, const vector<uint8_t>* packet,
OnMsgAckProc proc, void* procClosure ) int msgID, OnMsgAckProc proc, void* procClosure )
{ {
assert(m_msgID != 0);
m_destDevID = dest; m_destDevID = dest;
m_packet = *packet; m_packet = *packet;
m_proc = proc; m_proc = proc;
m_procClosure = procClosure; m_procClosure = procClosure;
m_msgID = msgID;
} }
int getMsgID() { return m_msgID; }
int m_msgID;
DevIDRelay m_destDevID; DevIDRelay m_destDevID;
vector<uint8_t> m_packet; vector<uint8_t> m_packet;
OnMsgAckProc m_proc; OnMsgAckProc m_proc;
@ -778,9 +782,14 @@ static void
onPostedMsgAcked( bool acked, uint32_t packetID, void* data ) onPostedMsgAcked( bool acked, uint32_t packetID, void* data )
{ {
MsgClosure* mc = (MsgClosure*)data; MsgClosure* mc = (MsgClosure*)data;
if ( !acked ) { int msgID = mc->getMsgID();
DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(), if ( acked ) {
mc->m_packet.size() ); DBMgr::Get()->RemoveStoredMessages( &msgID, 1 );
} else {
assert( msgID != 0 );
// So we only store after ack fails? Change that!!!
// DBMgr::Get()->StoreMessage( mc->m_destDevID, mc->m_packet.data(),
// mc->m_packet.size() );
} }
if ( NULL != mc->m_proc ) { if ( NULL != mc->m_proc ) {
(*mc->m_proc)( acked, mc->m_destDevID, packetID, mc->m_procClosure ); (*mc->m_proc)( acked, mc->m_destDevID, packetID, mc->m_procClosure );
@ -793,6 +802,8 @@ static bool
post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID, post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID,
OnMsgAckProc proc, void* procClosure ) OnMsgAckProc proc, void* procClosure )
{ {
int msgID = DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() );
const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( destDevID ); const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( destDevID );
bool canSendNow = !!addru; bool canSendNow = !!addru;
@ -804,16 +815,13 @@ post_or_store( DevIDRelay destDevID, vector<uint8_t>& packet, uint32_t packetID,
if ( get_addr_info_if( &addr, &sock, &dest_addr ) ) { if ( get_addr_info_if( &addr, &sock, &dest_addr ) ) {
sent = 0 < send_packet_via_udp_impl( packet, sock, dest_addr ); sent = 0 < send_packet_via_udp_impl( packet, sock, dest_addr );
if ( sent ) { if ( sent && msgID != 0 ) {
MsgClosure* mc = new MsgClosure( destDevID, &packet, MsgClosure* mc = new MsgClosure( destDevID, &packet, msgID,
proc, procClosure ); proc, procClosure );
UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc ); UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc );
} }
} }
} }
if ( !sent ) {
DBMgr::Get()->StoreMessage( destDevID, packet.data(), packet.size() );
}
return sent; return sent;
} }