use new methods to store undeliverable messages in a db rather than in

memory.  Confirmed that devices can hookup without ever being
connected at the same time and that the messages survive a relay
reboot.  No further testing yet.
This commit is contained in:
Andy2 2010-09-23 06:32:57 -07:00
parent 74764a493b
commit 40e1d29243
2 changed files with 15 additions and 38 deletions

View file

@ -93,7 +93,6 @@ CookieRef::ReInit( const char* cookie, const char* connName, CookieID id,
m_locking_thread = 0; m_locking_thread = 0;
m_starttime = uptime(); m_starttime = uptime();
m_gameFull = false; m_gameFull = false;
m_nHostMsgs = 0;
m_in_handleEvents = false; m_in_handleEvents = false;
m_langCode = langCode; m_langCode = langCode;
m_nPendingAcks = 0; m_nPendingAcks = 0;
@ -146,14 +145,6 @@ CookieRef::Clear(void)
m_connName = ""; m_connName = "";
m_cookieID = 0; m_cookieID = 0;
m_eventQueue.clear(); m_eventQueue.clear();
if ( 0 < m_nHostMsgs ) {
unsigned int ii;
for ( ii = 0; ii < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]);++ii){
m_hostMsgQueues[ii].clear();
}
/* m_nHostMsgs will get cleared in ReInit */
}
} /* Clear */ } /* Clear */
bool bool
@ -686,7 +677,7 @@ CookieRef::handleEvents()
case XWA_NOTE_EMPTY: case XWA_NOTE_EMPTY:
//cancelAllConnectedTimer(); //cancelAllConnectedTimer();
if ( 0 == count_msgs_stored() ) { if ( 0 == DBMgr::Get()->CountStoredMessages( ConnName() ) ) {
CRefEvent evt( XWE_NOMOREMSGS ); CRefEvent evt( XWE_NOMOREMSGS );
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} }
@ -746,41 +737,30 @@ CookieRef::store_message( HostID dest, const unsigned char* buf,
{ {
logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__, logf( XW_LOGVERBOSE0, "%s: storing msg size %d for dest %d", __func__,
len, dest ); len, dest );
assert( dest > 0 );
--dest; // 1-based DBMgr::Get()->StoreMessage( ConnName(), dest, buf, len );
MsgBuffer* entry = new MsgBuffer( buf, buf+len );
assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) );
m_hostMsgQueues[dest].push_back( entry );
++m_nHostMsgs;
} }
void void
CookieRef::send_stored_messages( HostID dest, int socket ) CookieRef::send_stored_messages( HostID dest, int socket )
{ {
assert( dest > 0 );
logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest ); logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest );
--dest; // 0 is invalid value
assert( dest < sizeof(m_hostMsgQueues)/sizeof(m_hostMsgQueues[0]) ); assert( dest > 0 && dest <= 4 );
assert( -1 != socket ); assert( -1 != socket );
MsgBufQueue& mqueue = m_hostMsgQueues[dest]; for ( ; ; ) {
while ( mqueue.size() > 0 ) { unsigned char buf[MAX_MSG_LEN];
assert( m_nHostMsgs > 0 ); size_t buflen = sizeof(buf);
// send_with_length will call _Remove if it fails to send. So int msgID;
// need to check on presence of socket each time through! No, if ( !DBMgr::Get()->GetStoredMessage( ConnName(), dest,
// the break below takes care of that. buf, &buflen, &msgID ) ) {
MsgBufQueue::iterator iter = mqueue.begin();
logf( XW_LOGVERBOSE0, "%s: sending stored msg (len=%d)",
__func__, (*iter)->size() );
if ( ! send_with_length( socket, &((**iter)[0]), (*iter)->size(),
true ) ) {
break; break;
} }
mqueue.erase( iter ); if ( ! send_with_length( socket, buf, buflen, true ) ) {
delete *iter; break;
--m_nHostMsgs; }
DBMgr::Get()->RemoveStoredMessage( msgID );
} }
} /* send_stored_messages */ } /* send_stored_messages */

View file

@ -242,7 +242,6 @@ class CookieRef {
void store_message( HostID dest, const unsigned char* buf, void store_message( HostID dest, const unsigned char* buf,
unsigned int len ); unsigned int len );
void send_stored_messages( HostID dest, int socket ); void send_stored_messages( HostID dest, int socket );
unsigned int count_msgs_stored( void ) { return m_nHostMsgs; }
void printSeeds( const char* caller ); void printSeeds( const char* caller );
@ -250,8 +249,6 @@ class CookieRef {
static void s_checkAllConnected( void* closure ); static void s_checkAllConnected( void* closure );
static void s_checkAck( void* closure ); static void s_checkAck( void* closure );
unsigned int m_nHostMsgs;
MsgBufQueue m_hostMsgQueues[4];
vector<HostRec> m_sockets; vector<HostRec> m_sockets;
bool m_gameFull; /* once we've filled up, no more *new* bool m_gameFull; /* once we've filled up, no more *new*
connections ever */ connections ever */