heartbeats: send to clients in connection response; note when

heartbeat and other messages arrive; and periodically reap sockets
that have been inactive for too long.
This commit is contained in:
ehouse 2005-06-23 04:26:44 +00:00
parent e404d77f87
commit 210dcc88c0
6 changed files with 183 additions and 45 deletions

View file

@ -19,9 +19,9 @@ CC = g++
SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp
OBJ = $(patsubst %.cpp,%.o,$(SRC))
LDFLAGS += -lpthread -g
CPPFLAGS += -g -Wall
CPPFLAGS += -g -Wall -DHEARTBEAT=10
all: xwrelay
memdebug all: xwrelay
xwrelay: $(OBJ)

View file

@ -33,6 +33,7 @@ using namespace std;
static CookieMap gCookieMap;
pthread_rwlock_t gCookieMapRWLock = PTHREAD_RWLOCK_INITIALIZER;
CookieID CookieRef::ms_nextConnectionID = 1000;
/* static */ CookieRef*
@ -81,6 +82,20 @@ CookieIdForName( const char* name )
return 0;
} /* CookieIdForName */
void
CheckHeartbeats( time_t now, vector<int>* sockets )
{
logf( "CheckHeartbeats" );
RWReadLock rwl( &gCookieMapRWLock );
CookieMap::iterator iter = gCookieMap.begin();
while ( iter != gCookieMap.end() ) {
CookieRef* ref = iter->second;
ref->CheckHeartbeats( now, sockets );
++iter;
}
logf( "CheckHeartbeats done" );
} /* CheckHeartbeats */
CookieRef*
get_make_cookieRef( const char* cookie,
CookieID connID ) /* connID ignored for now */
@ -268,13 +283,13 @@ CookieRef::~CookieRef()
for ( ; ; ) {
RWWriteLock rwl( &m_sockets_rwlock );
map<HostID,int>::iterator iter = m_hostSockets.begin();
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
if ( iter == m_hostSockets.end() ) {
break;
}
int socket = iter->second;
int socket = iter->second.m_socket;
tPool->CloseSocket( socket );
m_hostSockets.erase( iter );
}
@ -289,18 +304,19 @@ CookieRef::Associate( int socket, HostID srcID )
assert( srcID != HOST_ID_NONE );
logf( "remembering pair: hostid=%x, socket=%d", srcID, socket );
RWWriteLock ml( &m_sockets_rwlock );
m_hostSockets.insert( pair<HostID,int>(srcID,socket) );
HostRec hr(socket);
m_hostSockets.insert( pair<HostID,HostRec>(srcID,hr) );
}
int
CookieRef::SocketForHost( HostID dest )
{
int socket;
map<HostID,int>::iterator iter = m_hostSockets.find( dest );
map<HostID,HostRec>::iterator iter = m_hostSockets.find( dest );
if ( iter == m_hostSockets.end() ) {
socket = -1;
} else {
socket = iter->second;
socket = iter->second.m_socket;
logf( "socketForHost(%x) => %d", dest, socket );
}
logf( "returning socket=%d for hostid=%x", socket, dest );
@ -316,9 +332,9 @@ CookieRef::Remove( int socket )
count = CountSockets();
assert( count > 0 );
map<HostID,int>::iterator iter = m_hostSockets.begin();
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
if ( iter->second == socket ) {
if ( iter->second.m_socket == socket ) {
m_hostSockets.erase(iter);
--count;
break;
@ -333,6 +349,41 @@ CookieRef::Remove( int socket )
}
}
/* Note that host |id| was just heard from on |socket| by refreshing the
 * m_lastHeartbeat stamp of its entry in m_hostSockets.  An RWWriteLock on
 * m_sockets_rwlock is held while the entry is looked up and modified.
 *
 * |id| and |socket| originate from messages received off the network, so
 * an unknown id or a socket mismatch is logged and ignored rather than
 * asserted on: an assert lets any peer abort the relay, and with NDEBUG
 * defined the end() iterator would be dereferenced.
 */
void
CookieRef::HandleHeartbeat( HostID id, int socket )
{
    RWWriteLock rwl( &m_sockets_rwlock );

    map<HostID,HostRec>::iterator iter = m_hostSockets.find(id);
    if ( iter == m_hostSockets.end() ) {
        logf( "HandleHeartbeat: no record for hostid=%x; ignoring", id );
        return;
    }

    /* PENDING If the message came on an unexpected socket, kill the
       connection.  An attack is the most likely explanation. */
    if ( iter->second.m_socket != socket ) {
        logf( "HandleHeartbeat: hostid=%x is on socket %d but was heard "
              "on socket %d; ignoring", id, iter->second.m_socket, socket );
        return;
    }

    /* Sample the clock once so the logged and the stored values agree;
       time_t is cast to long so it matches the %ld conversion. */
    time_t curTime = now();
    logf( "upping m_lastHeartbeat from %ld to %ld",
          (long)iter->second.m_lastHeartbeat, (long)curTime );
    iter->second.m_lastHeartbeat = curTime;
} /* HandleHeartbeat */
void
CookieRef::CheckHeartbeats( time_t now, vector<int>* victims )
{
logf( "CookieRef::CheckHeartbeats" );
RWWriteLock rwl( &m_sockets_rwlock );
map<HostID,HostRec>::iterator iter = m_hostSockets.begin();
while ( iter != m_hostSockets.end() ) {
time_t last = iter->second.m_lastHeartbeat;
if ( (now - last) > HEARTBEAT * 2 ) {
victims->push_back( iter->second.m_socket );
}
++iter;
}
logf( "CookieRef::CheckHeartbeats done" );
} /* CheckHeartbeats */
void
CookieRef::PrintCookieInfo( string& out )
{

View file

@ -11,10 +11,22 @@
typedef unsigned short CookieID;
#ifndef HEARTBEAT
# define HEARTBEAT 60
#endif
using namespace std;
class CookieMapIterator; /* forward */
/* Value stored per host in CookieRef::m_hostSockets: the socket the host
 * is connected on plus the time it was last heard from.
 */
class HostRec {
 public:
    /* A fresh record counts as "just heard from": m_lastHeartbeat starts
       at now().  The constructor is explicit so that a bare int is never
       silently converted into a HostRec. */
    explicit HostRec(int socket) : m_socket(socket), m_lastHeartbeat(now()) {}

    /* No user-declared destructor: the class owns no resources, so the
       compiler-generated copy and destroy operations are sufficient. */

    int m_socket;            /* socket this host is connected on */
    time_t m_lastHeartbeat;  /* now() at construction; refreshed by
                                CookieRef::HandleHeartbeat */
};
class CookieRef {
public:
@ -24,7 +36,7 @@ class CookieRef {
/* Within this cookie, remember that this hostID and socket go together.
If the hostID is HOST_ID_SERVER, it's the server. */
void Associate( int socket, HostID srcID );
short GetHeartbeat() { return 60; }
short GetHeartbeat() { return HEARTBEAT; }
CookieID GetCookieID() { return m_connectionID; }
int SocketForHost( HostID dest );
void Remove( int socket );
@ -36,6 +48,9 @@ class CookieRef {
m_totalSent += nBytes;
}
void HandleHeartbeat( HostID id, int socket );
void CheckHeartbeats( time_t now, vector<int>* victims );
/* for console */
void PrintCookieInfo( string& out );
void PrintSocketInfo( string& out, int socket );
@ -49,7 +64,7 @@ class CookieRef {
private:
CookieRef( string s );
map<HostID,int> m_hostSockets;
map<HostID,HostRec> m_hostSockets;
pthread_rwlock_t m_sockets_rwlock;
CookieID m_connectionID;
string m_name;
@ -72,6 +87,7 @@ class CookieMapIterator {
CookieRef* get_make_cookieRef( const char* cookie, CookieID connID );
CookieRef* get_cookieRef( unsigned short cookieID );
CookieID CookieIdForName( const char* name );
void CheckHeartbeats( time_t now, vector<int>* victims );
class SocketStuff;
typedef map< int, SocketStuff* > SocketMap;

View file

@ -108,8 +108,7 @@ XWThreadPool::RemoveSocket( int socket )
}
}
return found;
}
} /* RemoveSocket */
void
XWThreadPool::CloseSocket( int socket )
@ -128,9 +127,13 @@ XWThreadPool::CloseSocket( int socket )
}
}
close( socket );
if ( do_interrupt ) {
interrupt_poll();
}
/* if ( do_interrupt ) { */
/* We always need to interrupt the poll because the socket we're closing
will be in the list being listened to. That or we need to drop sockets
that have been removed on some other thread while the poll call's
blocking.*/
interrupt_poll();
/* } */
}
int
@ -208,6 +211,18 @@ XWThreadPool::interrupt_poll()
}
}
/* Timeout, in milliseconds, that real_listener passes to poll().  Always
 * -1 for now, i.e. poll() blocks with no timeout.
 */
static int
figureTimeout()
{
    const int noTimeout = -1;
    return noTimeout;
}
/* Hook that real_listener calls once per pass of its loop, after poll()
 * results have been handled.  Currently a no-op: the only statement it
 * ever held is the commented-out log call below.
 */
static void
considerFireTimer()
{
/* logf( "timer fired" ); */
}
void*
XWThreadPool::real_listener()
{
@ -237,8 +252,10 @@ XWThreadPool::real_listener()
}
pthread_rwlock_unlock( &m_activeSocketsRWLock );
int nMillis = figureTimeout();
logf( "calling poll on %s", log );
int nEvents = poll( fds, nSockets, -1 ); /* -1: infinite timeout */
int nEvents = poll( fds, nSockets, nMillis ); /* -1: infinite timeout */
logf( "back from poll: %d", nEvents );
if ( nEvents < 0 ) {
logf( "errno: %d", errno );
@ -261,9 +278,14 @@ XWThreadPool::real_listener()
if ( curfd->revents != 0 ) {
int socket = curfd->fd;
RemoveSocket( socket );
if ( !RemoveSocket( socket ) ) {
/* no further processing if it's been removed while
we've been sleeping in poll */
continue;
}
if ( curfd->revents == POLLIN || curfd->revents == POLLPRI ) {
if ( curfd->revents == POLLIN
|| curfd->revents == POLLPRI ) {
logf( "enqueuing %d", socket );
enqueue( socket );
} else {
@ -277,6 +299,8 @@ XWThreadPool::real_listener()
assert( nEvents == 0 );
}
considerFireTimer();
free( fds );
}
return NULL;

View file

@ -34,6 +34,7 @@
#include <unistd.h>
#include <netdb.h> /* gethostbyname */
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
@ -55,6 +56,7 @@
#include "tpool.h"
#define N_WORKER_THREADS 5
#define MILLIS 1000000
void
logf( const char* format, ... )
@ -96,8 +98,13 @@ putNetShort( unsigned char** bufpp, unsigned short s )
}
static void
processHeartbeat( const unsigned char* buf, int bufLen )
processHeartbeat( unsigned char* buf, int bufLen, int socket )
{
CookieID cookieID = getNetShort( &buf );
HostID hostID = getNetShort( &buf );
logf( "processHeartbeat: cookieID 0x%x, hostID 0x%x", cookieID, hostID );
CookieRef* cref = get_cookieRef( cookieID );
cref->HandleHeartbeat( hostID, socket );
} /* processHeartbeat */
/* A CONNECT message from a device gives us the hostID and socket we'll
@ -146,6 +153,13 @@ killSocket( int socket, char* why )
SocketMgr::RemoveSocketRefs( socket );
/* Might want to kill the thread it belongs to if we're not in it,
e.g. when unable to write to another socket. */
logf( "killSocket done" );
}
/* Current calendar time in seconds, as reported by time(NULL).  The value
 * passes through unsigned long before being returned as a time_t.
 */
time_t
now()
{
    const unsigned long seconds = (unsigned long)time( NULL );
    return seconds;
}
static void
@ -186,7 +200,7 @@ sendConnResp( CookieRef* cref, int socket )
/* forward the message. Need only change the command after looking up the
* socket and it's ready to go. */
static int
forwardMessage( unsigned char* buf, int bufLen )
forwardMessage( unsigned char* buf, int bufLen, int srcSocket )
{
int success = 0;
unsigned char* bufp = buf + 1; /* skip cmd */
@ -194,16 +208,20 @@ forwardMessage( unsigned char* buf, int bufLen )
logf( "cookieID = %d", cookieID );
CookieRef* cref = get_cookieRef( cookieID );
if ( cref != NULL ) {
HostID src = getNetShort( &bufp );
/* we heard from host: good as a heartbeat */
cref->HandleHeartbeat( src, srcSocket );
HostID dest = getNetShort( &bufp );
logf( "forwarding from %x to %x", src, dest );
int socket = cref->SocketForHost( dest );
int destSocket = cref->SocketForHost( dest );
logf( "got socket %d for dest %x", socket, dest );
if ( socket != -1 ) {
logf( "got socket %d for dest %x", destSocket, dest );
if ( destSocket != -1 ) {
*buf = XWRELAY_MSG_FROMRELAY;
send_with_length( socket, buf, bufLen );
cref->RecordSent( bufLen, socket );
send_with_length( destSocket, buf, bufLen );
cref->RecordSent( bufLen, destSocket );
success = 1;
} else if ( dest == HOST_ID_SERVER ) {
logf( "server not connected yet; fail silently" );
@ -237,12 +255,13 @@ processMessage( unsigned char* buf, int bufLen, int socket )
break;
case XWRELAY_HEARTBEAT:
logf( "processMessage got XWRELAY_HEARTBEAT" );
processHeartbeat( buf + 1, bufLen - 1 );
processHeartbeat( buf + 1, bufLen - 1, socket );
break;
case XWRELAY_MSG_TORELAY:
logf( "processMessage got XWRELAY_MSG_TORELAY" );
if ( !forwardMessage( buf, bufLen ) ) {
if ( !forwardMessage( buf, bufLen, socket ) ) {
killSocket( socket, "couldn't forward message" );
} else {
}
break;
}
@ -274,6 +293,22 @@ make_socket( unsigned long addr, unsigned short port )
return sock;
} /* make_socket */
static void
sighandler( int signal )
{
logf( "sighandler" );
vector<int> victims;
CheckHeartbeats( now(), &victims );
unsigned int i;
for ( i = 0; i < victims.size(); ++i ) {
killSocket( victims[i], "heartbeat check failed" );
}
logf( "sighandler done" );
} /* sighandler */
int main( int argc, char** argv )
{
int port = 10999;
@ -285,9 +320,17 @@ int main( int argc, char** argv )
which relevant stuff is passed. */
int listener = make_socket( INADDR_ANY, port );
if ( listener == -1 ) exit( 1 );
if ( listener == -1 ) {
exit( 1 );
}
int control = make_socket( INADDR_LOOPBACK, port + 1 );
if ( control == -1 ) exit( 1 );
if ( control == -1 ) {
exit( 1 );
}
/* generate a signal after n milliseconds, then every m milliseconds */
(void)signal( SIGALRM, sighandler );
(void)ualarm( 2 * HEARTBEAT * MILLIS, 2 * HEARTBEAT* MILLIS );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( N_WORKER_THREADS, processMessage );
@ -305,24 +348,26 @@ int main( int argc, char** argv )
++highest;
int retval = select( highest, &rfds, NULL, NULL, NULL );
assert( retval > 0 );
if ( FD_ISSET( listener, &rfds ) ) {
struct sockaddr_in newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( listener, (sockaddr*)&newaddr, &siz );
if ( retval < 0 ) {
logf( "errno: %d", errno );
} else {
if ( FD_ISSET( listener, &rfds ) ) {
struct sockaddr_in newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( listener, (sockaddr*)&newaddr, &siz );
unsigned long remoteIP = newaddr.sin_addr.s_addr;
logf( "accepting connection from 0x%lx", ntohl( remoteIP ) );
unsigned long remoteIP = newaddr.sin_addr.s_addr;
logf( "accepting connection from 0x%lx", ntohl( remoteIP ) );
tPool->AddSocket( newSock );
--retval;
tPool->AddSocket( newSock );
--retval;
}
if ( FD_ISSET( control, &rfds ) ) {
run_ctrl_thread( control );
--retval;
}
assert( retval == 0 );
}
if ( FD_ISSET( control, &rfds ) ) {
run_ctrl_thread( control );
--retval;
}
assert( retval == 0 );
}
close( listener );

View file

@ -10,4 +10,6 @@ void logf( const char* format, ... );
void killSocket( int socket, char* why );
time_t now();
#endif