From 210dcc88c098fdebfdb67570593d8e6d25ee78aa Mon Sep 17 00:00:00 2001 From: ehouse Date: Thu, 23 Jun 2005 04:26:44 +0000 Subject: [PATCH] heartbeats: send to clients in connection response; note when heartbeat and other messages arrive; and periodically reap sockets that haven't been active in long enough. --- relay/Makefile | 4 +- relay/cref.cpp | 65 +++++++++++++++++++++++++---- relay/cref.h | 20 ++++++++- relay/tpool.cpp | 40 ++++++++++++++---- relay/xwrelay.cpp | 97 ++++++++++++++++++++++++++++++++------------ relay/xwrelay_priv.h | 2 + 6 files changed, 183 insertions(+), 45 deletions(-) diff --git a/relay/Makefile b/relay/Makefile index 2ed8d3ecf..5cdf546c9 100644 --- a/relay/Makefile +++ b/relay/Makefile @@ -19,9 +19,9 @@ CC = g++ SRC = xwrelay.cpp cref.cpp ctrl.cpp tpool.cpp OBJ = $(patsubst %.cpp,%.o,$(SRC)) LDFLAGS += -lpthread -g -CPPFLAGS += -g -Wall +CPPFLAGS += -g -Wall -DHEARTBEAT=10 -all: xwrelay +memdebug all: xwrelay xwrelay: $(OBJ) diff --git a/relay/cref.cpp b/relay/cref.cpp index f1c295d76..cc4375f28 100644 --- a/relay/cref.cpp +++ b/relay/cref.cpp @@ -33,6 +33,7 @@ using namespace std; static CookieMap gCookieMap; pthread_rwlock_t gCookieMapRWLock = PTHREAD_RWLOCK_INITIALIZER; + CookieID CookieRef::ms_nextConnectionID = 1000; /* static */ CookieRef* @@ -81,6 +82,20 @@ CookieIdForName( const char* name ) return 0; } /* CookieIdForName */ +void +CheckHeartbeats( time_t now, vector* sockets ) +{ + logf( "CheckHeartbeats" ); + RWReadLock rwl( &gCookieMapRWLock ); + CookieMap::iterator iter = gCookieMap.begin(); + while ( iter != gCookieMap.end() ) { + CookieRef* ref = iter->second; + ref->CheckHeartbeats( now, sockets ); + ++iter; + } + logf( "CheckHeartbeats done" ); +} /* CheckHeartbeats */ + CookieRef* get_make_cookieRef( const char* cookie, CookieID connID ) /* connID ignored for now */ @@ -268,13 +283,13 @@ CookieRef::~CookieRef() for ( ; ; ) { RWWriteLock rwl( &m_sockets_rwlock ); - map::iterator iter = m_hostSockets.begin(); + map::iterator 
iter = m_hostSockets.begin(); if ( iter == m_hostSockets.end() ) { break; } - int socket = iter->second; + int socket = iter->second.m_socket; tPool->CloseSocket( socket ); m_hostSockets.erase( iter ); } @@ -289,18 +304,19 @@ CookieRef::Associate( int socket, HostID srcID ) assert( srcID != HOST_ID_NONE ); logf( "remembering pair: hostid=%x, socket=%d", srcID, socket ); RWWriteLock ml( &m_sockets_rwlock ); - m_hostSockets.insert( pair(srcID,socket) ); + HostRec hr(socket); + m_hostSockets.insert( pair(srcID,hr) ); } int CookieRef::SocketForHost( HostID dest ) { int socket; - map::iterator iter = m_hostSockets.find( dest ); + map::iterator iter = m_hostSockets.find( dest ); if ( iter == m_hostSockets.end() ) { socket = -1; } else { - socket = iter->second; + socket = iter->second.m_socket; logf( "socketForHost(%x) => %d", dest, socket ); } logf( "returning socket=%d for hostid=%x", socket, dest ); @@ -316,9 +332,9 @@ CookieRef::Remove( int socket ) count = CountSockets(); assert( count > 0 ); - map::iterator iter = m_hostSockets.begin(); + map::iterator iter = m_hostSockets.begin(); while ( iter != m_hostSockets.end() ) { - if ( iter->second == socket ) { + if ( iter->second.m_socket == socket ) { m_hostSockets.erase(iter); --count; break; @@ -333,6 +349,41 @@ CookieRef::Remove( int socket ) } } +void +CookieRef::HandleHeartbeat( HostID id, int socket ) +{ + RWWriteLock rwl( &m_sockets_rwlock ); + + map::iterator iter = m_hostSockets.find(id); + assert( iter != m_hostSockets.end() ); + + /* PENDING If the message came on an unexpected socket, kill the + connection. An attack is the most likely explanation. 
*/ + assert( iter->second.m_socket == socket ); + + logf( "upping m_lastHeartbeat from %d to %d", + iter->second.m_lastHeartbeat, now() ); + iter->second.m_lastHeartbeat = now(); +} /* HandleHeartbeat */ + +void +CookieRef::CheckHeartbeats( time_t now, vector* victims ) +{ + logf( "CookieRef::CheckHeartbeats" ); + + RWWriteLock rwl( &m_sockets_rwlock ); + + map::iterator iter = m_hostSockets.begin(); + while ( iter != m_hostSockets.end() ) { + time_t last = iter->second.m_lastHeartbeat; + if ( (now - last) > HEARTBEAT * 2 ) { + victims->push_back( iter->second.m_socket ); + } + ++iter; + } + logf( "CookieRef::CheckHeartbeats done" ); +} /* CheckHeartbeats */ + void CookieRef::PrintCookieInfo( string& out ) { diff --git a/relay/cref.h b/relay/cref.h index 3f4008476..89ba81f39 100644 --- a/relay/cref.h +++ b/relay/cref.h @@ -11,10 +11,22 @@ typedef unsigned short CookieID; +#ifndef HEARTBEAT +# define HEARTBEAT 60 +#endif + using namespace std; class CookieMapIterator; /* forward */ +class HostRec { + public: + HostRec(int socket) : m_socket(socket), m_lastHeartbeat(now()) {} + ~HostRec() {} + int m_socket; + time_t m_lastHeartbeat; +}; + class CookieRef { public: @@ -24,7 +36,7 @@ class CookieRef { /* Within this cookie, remember that this hostID and socket go together. If the hostID is HOST_ID_SERVER, it's the server. 
*/ void Associate( int socket, HostID srcID ); - short GetHeartbeat() { return 60; } + short GetHeartbeat() { return HEARTBEAT; } CookieID GetCookieID() { return m_connectionID; } int SocketForHost( HostID dest ); void Remove( int socket ); @@ -36,6 +48,9 @@ class CookieRef { m_totalSent += nBytes; } + void HandleHeartbeat( HostID id, int socket ); + void CheckHeartbeats( time_t now, vector* victims ); + /* for console */ void PrintCookieInfo( string& out ); void PrintSocketInfo( string& out, int socket ); @@ -49,7 +64,7 @@ class CookieRef { private: CookieRef( string s ); - map m_hostSockets; + map m_hostSockets; pthread_rwlock_t m_sockets_rwlock; CookieID m_connectionID; string m_name; @@ -72,6 +87,7 @@ class CookieMapIterator { CookieRef* get_make_cookieRef( const char* cookie, CookieID connID ); CookieRef* get_cookieRef( unsigned short cookieID ); CookieID CookieIdForName( const char* name ); +void CheckHeartbeats( time_t now, vector* victims ); class SocketStuff; typedef map< int, SocketStuff* > SocketMap; diff --git a/relay/tpool.cpp b/relay/tpool.cpp index ca2366d6f..b05b904b6 100644 --- a/relay/tpool.cpp +++ b/relay/tpool.cpp @@ -108,8 +108,7 @@ XWThreadPool::RemoveSocket( int socket ) } } return found; -} - +} /* RemoveSocket */ void XWThreadPool::CloseSocket( int socket ) @@ -128,9 +127,13 @@ XWThreadPool::CloseSocket( int socket ) } } close( socket ); - if ( do_interrupt ) { - interrupt_poll(); - } +/* if ( do_interrupt ) { */ + /* We always need to interrupt the poll because the socket we're closing + will be in the list being listened to. 
That or we need to drop sockets + that have been removed on some other thread while the poll call's + blocking.*/ + interrupt_poll(); +/* } */ } int @@ -208,6 +211,18 @@ XWThreadPool::interrupt_poll() } } +static int +figureTimeout() +{ + return -1; +} + +static void +considerFireTimer() +{ +/* logf( "timer fired" ); */ +} + void* XWThreadPool::real_listener() { @@ -237,8 +252,10 @@ XWThreadPool::real_listener() } pthread_rwlock_unlock( &m_activeSocketsRWLock ); + int nMillis = figureTimeout(); + logf( "calling poll on %s", log ); - int nEvents = poll( fds, nSockets, -1 ); /* -1: infinite timeout */ + int nEvents = poll( fds, nSockets, nMillis ); /* -1: infinite timeout */ logf( "back from poll: %d", nEvents ); if ( nEvents < 0 ) { logf( "errno: %d", errno ); @@ -261,9 +278,14 @@ XWThreadPool::real_listener() if ( curfd->revents != 0 ) { int socket = curfd->fd; - RemoveSocket( socket ); + if ( !RemoveSocket( socket ) ) { + /* no further processing if it's been removed while + we've been sleeping in poll */ + continue; + } - if ( curfd->revents == POLLIN || curfd->revents == POLLPRI ) { + if ( curfd->revents == POLLIN + || curfd->revents == POLLPRI ) { logf( "enqueuing %d", socket ); enqueue( socket ); } else { @@ -277,6 +299,8 @@ XWThreadPool::real_listener() assert( nEvents == 0 ); } + considerFireTimer(); + free( fds ); } return NULL; diff --git a/relay/xwrelay.cpp b/relay/xwrelay.cpp index fe14976cc..620eb67a4 100644 --- a/relay/xwrelay.cpp +++ b/relay/xwrelay.cpp @@ -34,6 +34,7 @@ #include #include /* gethostbyname */ #include +#include #include #include #include @@ -55,6 +56,7 @@ #include "tpool.h" #define N_WORKER_THREADS 5 +#define MILLIS 1000000 void logf( const char* format, ... 
) @@ -96,8 +98,13 @@ putNetShort( unsigned char** bufpp, unsigned short s ) } static void -processHeartbeat( const unsigned char* buf, int bufLen ) +processHeartbeat( unsigned char* buf, int bufLen, int socket ) { + CookieID cookieID = getNetShort( &buf ); + HostID hostID = getNetShort( &buf ); + logf( "processHeartbeat: cookieID 0x%x, hostID 0x%x", cookieID, hostID ); + CookieRef* cref = get_cookieRef( cookieID ); + cref->HandleHeartbeat( hostID, socket ); } /* processHeartbeat */ /* A CONNECT message from a device gives us the hostID and socket we'll @@ -146,6 +153,13 @@ killSocket( int socket, char* why ) SocketMgr::RemoveSocketRefs( socket ); /* Might want to kill the thread it belongs to if we're not in it, e.g. when unable to write to another socket. */ + logf( "killSocket done" ); +} + +time_t +now() +{ + return (unsigned long)time(NULL); } static void @@ -186,7 +200,7 @@ sendConnResp( CookieRef* cref, int socket ) /* forward the message. Need only change the command after looking up the * socket and it's ready to go. 
*/ static int -forwardMessage( unsigned char* buf, int bufLen ) +forwardMessage( unsigned char* buf, int bufLen, int srcSocket ) { int success = 0; unsigned char* bufp = buf + 1; /* skip cmd */ @@ -194,16 +208,20 @@ forwardMessage( unsigned char* buf, int bufLen ) logf( "cookieID = %d", cookieID ); CookieRef* cref = get_cookieRef( cookieID ); if ( cref != NULL ) { + HostID src = getNetShort( &bufp ); + /* we heard from host: good as a heartbeat */ + cref->HandleHeartbeat( src, srcSocket ); + HostID dest = getNetShort( &bufp ); logf( "forwarding from %x to %x", src, dest ); - int socket = cref->SocketForHost( dest ); + int destSocket = cref->SocketForHost( dest ); - logf( "got socket %d for dest %x", socket, dest ); - if ( socket != -1 ) { + logf( "got socket %d for dest %x", destSocket, dest ); + if ( destSocket != -1 ) { *buf = XWRELAY_MSG_FROMRELAY; - send_with_length( socket, buf, bufLen ); - cref->RecordSent( bufLen, socket ); + send_with_length( destSocket, buf, bufLen ); + cref->RecordSent( bufLen, destSocket ); success = 1; } else if ( dest == HOST_ID_SERVER ) { logf( "server not connected yet; fail silently" ); @@ -237,12 +255,13 @@ processMessage( unsigned char* buf, int bufLen, int socket ) break; case XWRELAY_HEARTBEAT: logf( "processMessage got XWRELAY_HEARTBEAT" ); - processHeartbeat( buf + 1, bufLen - 1 ); + processHeartbeat( buf + 1, bufLen - 1, socket ); break; case XWRELAY_MSG_TORELAY: logf( "processMessage got XWRELAY_MSG_TORELAY" ); - if ( !forwardMessage( buf, bufLen ) ) { + if ( !forwardMessage( buf, bufLen, socket ) ) { killSocket( socket, "couldn't forward message" ); + } else { } break; } @@ -274,6 +293,22 @@ make_socket( unsigned long addr, unsigned short port ) return sock; } /* make_socket */ +static void +sighandler( int signal ) +{ + logf( "sighandler" ); + + vector victims; + CheckHeartbeats( now(), &victims ); + + unsigned int i; + for ( i = 0; i < victims.size(); ++i ) { + killSocket( victims[i], "heartbeat check failed" ); + } + + 
logf( "sighandler done" ); +} /* sighandler */ + int main( int argc, char** argv ) { int port = 10999; @@ -285,9 +320,17 @@ int main( int argc, char** argv ) which relevant stuff is passed. */ int listener = make_socket( INADDR_ANY, port ); - if ( listener == -1 ) exit( 1 ); + if ( listener == -1 ) { + exit( 1 ); + } int control = make_socket( INADDR_LOOPBACK, port + 1 ); - if ( control == -1 ) exit( 1 ); + if ( control == -1 ) { + exit( 1 ); + } + + /* generate a signal after n microseconds, then every m microseconds */ + (void)signal( SIGALRM, sighandler ); + (void)ualarm( 2 * HEARTBEAT * MILLIS, 2 * HEARTBEAT* MILLIS ); XWThreadPool* tPool = XWThreadPool::GetTPool(); tPool->Setup( N_WORKER_THREADS, processMessage ); @@ -305,24 +348,26 @@ int main( int argc, char** argv ) ++highest; int retval = select( highest, &rfds, NULL, NULL, NULL ); - assert( retval > 0 ); - - if ( FD_ISSET( listener, &rfds ) ) { - struct sockaddr_in newaddr; - socklen_t siz = sizeof(newaddr); - int newSock = accept( listener, (sockaddr*)&newaddr, &siz ); + if ( retval < 0 ) { + logf( "errno: %d", errno ); + } else { + if ( FD_ISSET( listener, &rfds ) ) { + struct sockaddr_in newaddr; + socklen_t siz = sizeof(newaddr); + int newSock = accept( listener, (sockaddr*)&newaddr, &siz ); - unsigned long remoteIP = newaddr.sin_addr.s_addr; - logf( "accepting connection from 0x%lx", ntohl( remoteIP ) ); + unsigned long remoteIP = newaddr.sin_addr.s_addr; + logf( "accepting connection from 0x%lx", ntohl( remoteIP ) ); - tPool->AddSocket( newSock ); - --retval; + tPool->AddSocket( newSock ); + --retval; + } + if ( FD_ISSET( control, &rfds ) ) { + run_ctrl_thread( control ); + --retval; + } + assert( retval == 0 ); } - if ( FD_ISSET( control, &rfds ) ) { - run_ctrl_thread( control ); - --retval; - } - assert( retval == 0 ); } close( listener ); diff --git a/relay/xwrelay_priv.h b/relay/xwrelay_priv.h index 7da4bb72b..40c18d957 100644 --- a/relay/xwrelay_priv.h +++ b/relay/xwrelay_priv.h @@ -10,4 +10,6 
@@ void logf( const char* format, ... ); void killSocket( int socket, char* why ); +time_t now(); + #endif