inval tcp sockets in return addresses of packets waiting processing

when they're closed to prevent attempting to write replies to the
wrong device should the socket be reopened.
This commit is contained in:
Eric House 2013-06-20 07:07:56 -07:00
parent 5bec10048b
commit 46bd4d0047
4 changed files with 68 additions and 3 deletions

View file

@ -57,6 +57,7 @@ class AddrInfo {
void setIsTCP( bool val ) { m_isTCP = val; }
bool isTCP() const { return m_isTCP; } /* later UDP will be here too */
int socket() const { assert(m_isValid); return m_socket; }
void invalSocket() { m_socket = -1; }
ClientToken clientToken() const { assert(m_isValid); return m_clientToken; }
struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; }
const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; }

View file

@ -186,6 +186,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
{
/* bool do_interrupt = false; */
assert( addr->isTCP() );
UdpQueue::get()->forgetSocket( addr );
if ( !RemoveSocket( addr ) ) {
MutexLock ml( &m_queueMutex );
deque<QueuePr>::iterator iter = m_queue.begin();
@ -482,7 +483,8 @@ XWThreadPool::grab_elem_locked( QueuePr* prp )
for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) {
int socket = iter->m_info.m_addr.socket();
/* If NOT found */
if ( m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) {
if ( -1 != socket
&& m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) {
*prp = *iter;
m_queue.erase( iter ); /* this was a double-free once! */
m_sockets_in_use.insert( socket );
@ -513,10 +515,10 @@ XWThreadPool::print_in_use( void )
for ( iter = m_sockets_in_use.begin();
iter != m_sockets_in_use.end(); ++iter ) {
string_printf( str, "%d ", *iter );
string_printf( str, "%d ", *iter );
}
if ( 0 < str.size() ) {
logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() );
logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() );
}
}

View file

@ -107,9 +107,56 @@ UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len,
utc->setID( id );
logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id );
m_queue.push_back( utc );
int sock = addr->socket();
map<int, vector<UdpThreadClosure*> >::iterator iter = m_bySocket.find( sock );
if ( iter == m_bySocket.end() ) {
logf( XW_LOGINFO, "%s: creating vector for socket %d", __func__, sock );
vector<UdpThreadClosure*> vect;
vect.push_back( utc );
m_bySocket.insert( pair<int, vector<UdpThreadClosure*> >(sock, vect) );
} else {
iter->second.push_back( utc );
logf( XW_LOGINFO, "%s: now have %d packets for socket %d",
__func__, iter->second.size(), sock );
}
pthread_cond_signal( &m_queueCondVar );
}
void
UdpQueue::forgetSocket( const AddrInfo* addr )
{
assert( addr->isTCP() );
int sock = addr->socket();
MutexLock ml( &m_queueMutex );
map<int, vector<UdpThreadClosure*> >::iterator iter = m_bySocket.find( sock );
if ( m_bySocket.end() != iter ) {
vector<UdpThreadClosure*>& vect = iter->second;
vector<UdpThreadClosure*>::iterator iter2;
for ( iter2 = vect.begin(); vect.end() != iter2; ++ iter2 ) {
UdpThreadClosure* utc = *iter2;
assert( -1 != utc->addr()->socket() );
utc->invalSocket();
logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d",
__func__, sock, utc->getID() );
// vect.erase( iter2 );
}
vect.clear();
}
// deque<UdpThreadClosure*>::iterator iter;
// for ( iter = m_queue.begin(); iter != m_queue.end(); ++iter ) {
// const AddrInfo* addr = (*iter)->addr();
// if ( sock == addr->socket() ) {
// logf( XW_LOGINFO, "%s: invalidating socket %d in packet %d",
// __func__, sock, (*iter)->getID() );
// (*iter)->invalSocket();
// }
// }
}
void*
UdpQueue::thread_main()
{
@ -120,6 +167,17 @@ UdpQueue::thread_main()
}
UdpThreadClosure* utc = m_queue.front();
m_queue.pop_front();
int sock = utc->addr()->socket();
if ( -1 != sock ) {
map<int, vector<UdpThreadClosure*> >::iterator iter = m_bySocket.find( sock );
assert ( iter != m_bySocket.end() );
vector<UdpThreadClosure*>& vect = iter->second;
assert( utc == *vect.begin() );
vect.erase( vect.begin() );
logf( XW_LOGINFO, "%s: %d packets remaining for socket %d",
__func__, vect.size(), sock );
}
pthread_mutex_unlock( &m_queueMutex );
utc->noteDequeued();

View file

@ -23,6 +23,7 @@
#include <pthread.h>
#include <deque>
#include <map>
#include "xwrelay_priv.h"
#include "addrinfo.h"
@ -57,6 +58,7 @@ public:
const QueueCallback cb() const { return m_cb; }
void setID( int id ) { m_id = id; }
int getID( void ) { return m_id; }
void invalSocket() { m_addr.invalSocket(); }
private:
unsigned char* m_buf;
@ -76,6 +78,7 @@ class UdpQueue {
bool handle( const AddrInfo* addr, QueueCallback cb );
void handle( const AddrInfo* addr, unsigned char* buf, int len,
QueueCallback cb );
void forgetSocket( const AddrInfo* addr );
private:
static void* thread_main_static( void* closure );
@ -84,6 +87,7 @@ class UdpQueue {
pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue;
map<int, vector<UdpThreadClosure*> > m_bySocket;
int m_nextID;
};