From deb0a2d265b6d7846e7d6e742f9504a93badd654 Mon Sep 17 00:00:00 2001 From: ehouse Date: Sun, 23 Oct 2005 15:49:48 +0000 Subject: [PATCH] shutdown in response to SIGINT --- xwords4/relay/crefmgr.cpp | 32 +++++++++++++++--- xwords4/relay/crefmgr.h | 2 ++ xwords4/relay/ctrl.cpp | 40 ++++++++++++++++++++-- xwords4/relay/ctrl.h | 3 +- xwords4/relay/tpool.cpp | 42 ++++++++++++++++++++--- xwords4/relay/tpool.h | 2 ++ xwords4/relay/xwrelay.cpp | 71 ++++++++++++++++++++++++++++++--------- 7 files changed, 164 insertions(+), 28 deletions(-) diff --git a/xwords4/relay/crefmgr.cpp b/xwords4/relay/crefmgr.cpp index 0afd32ef5..5ac398efa 100644 --- a/xwords4/relay/crefmgr.cpp +++ b/xwords4/relay/crefmgr.cpp @@ -40,14 +40,15 @@ class SocketStuff { pthread_mutex_t m_writeMutex; /* so only one thread writes at a time */ }; +static CRefMgr* s_instance = NULL; + /* static */ CRefMgr* CRefMgr::Get() { - static CRefMgr* instance = NULL; - if ( instance == NULL ) { - instance = new CRefMgr(); + if ( s_instance == NULL ) { + s_instance = new CRefMgr(); } - return instance; + return s_instance; } /* Get */ CRefMgr::CRefMgr() @@ -60,8 +61,31 @@ CRefMgr::CRefMgr() CRefMgr::~CRefMgr() { + assert( this == s_instance ); + + pthread_mutex_destroy( &m_guard ); + pthread_rwlock_destroy( &m_cookieMapRWLock ); + + s_instance = NULL; } +void +CRefMgr::CloseAll() +{ + MutexLock ml( &m_guard ); + + /* Get every cref instance, shut it down */ + CookieMap::iterator iter = m_cookieMap.begin(); + while ( iter != m_cookieMap.end() ) { + + CookieRef* cref = iter->second; + delete cref; + + ++iter; + } + +} /* CloseAll */ + CookieRef* CRefMgr::FindOpenGameFor( const char* cORn, int isCookie, HostID hid, int nPlayersH, int nPlayersT ) diff --git a/xwords4/relay/crefmgr.h b/xwords4/relay/crefmgr.h index d2f62e227..e6c8e1ec4 100644 --- a/xwords4/relay/crefmgr.h +++ b/xwords4/relay/crefmgr.h @@ -54,6 +54,8 @@ class CRefMgr { CRefMgr(); ~CRefMgr(); + void CloseAll(); + CookieMapIterator GetCookieIterator(); /* PENDING. These need to go through SafeCref */ diff --git a/xwords4/relay/ctrl.cpp b/xwords4/relay/ctrl.cpp index 678c5f622..dafc00959 100644 --- a/xwords4/relay/ctrl.cpp +++ b/xwords4/relay/ctrl.cpp @@ -41,6 +41,7 @@ #include "ctrl.h" #include "cref.h" #include "crefmgr.h" +#include "mlock.h" #include "xwrelay_priv.h" /* this is *only* for testing. Don't abuse!!!! */ @@ -53,6 +54,10 @@ typedef struct FuncRec { CmdPtr func; } FuncRec; +vector g_ctrlSocks; +pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER; + + static int cmd_quit( int socket, const char** args ); static int cmd_print( int socket, const char** args ); static int cmd_lock( int socket, const char** args ); @@ -386,6 +391,11 @@ ctrl_thread_main( void* arg ) int socket = (int)arg; string arg0, arg1, arg2, arg3; + { + MutexLock ml( &g_ctrlSocksMutex ); + g_ctrlSocks.push_back( socket ); + } + for ( ; ; ) { print_prompt( socket ); @@ -411,22 +421,46 @@ ctrl_thread_main( void* arg ) break; } } + close ( socket ); + + MutexLock ml( &g_ctrlSocksMutex ); + vector::iterator iter = g_ctrlSocks.begin(); + while ( iter != g_ctrlSocks.end() ) { + if ( *iter == socket ) { + g_ctrlSocks.erase(iter); + break; + } + } return NULL; } /* ctrl_thread_main */ void -run_ctrl_thread( int ctrl_listener ) +run_ctrl_thread( int ctrl_sock ) { - logf( XW_LOGINFO, "calling accept on socket %d\n", ctrl_listener ); + logf( XW_LOGINFO, "calling accept on socket %d\n", ctrl_sock ); sockaddr newaddr; socklen_t siz = sizeof(newaddr); - int newSock = accept( ctrl_listener, &newaddr, &siz ); + int newSock = accept( ctrl_sock, &newaddr, &siz ); logf( XW_LOGINFO, "got one for ctrl: %d", newSock ); pthread_t thread; int result = pthread_create( &thread, NULL, ctrl_thread_main, (void*)newSock ); + pthread_detach( thread ); + assert( result == 0 ); } + +void +stop_ctrl_threads() +{ + MutexLock ml( &g_ctrlSocksMutex ); + vector::iterator iter = g_ctrlSocks.begin(); + while ( iter != g_ctrlSocks.end() ) { + int sock = *iter++; + print_to_sock( sock, 1, "relay going down..." ); + close( sock ); + } +} diff --git a/xwords4/relay/ctrl.h b/xwords4/relay/ctrl.h index 108ec8aee..f7661c4c6 100644 --- a/xwords4/relay/ctrl.h +++ b/xwords4/relay/ctrl.h @@ -22,6 +22,7 @@ #ifndef _CTRL_H_ #define _CTRL_H_ -void run_ctrl_thread( int ctrl_listener ); +void run_ctrl_thread( int ctrl_sock ); +void stop_ctrl_threads(); #endif diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 951190547..1430296b5 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -46,6 +46,8 @@ XWThreadPool::GetTPool() } XWThreadPool::XWThreadPool() + : m_timeToDie(0) + , m_nThreads(0) { pthread_rwlock_init( &m_activeSocketsRWLock, NULL ); pthread_mutex_init ( &m_queueMutex, NULL ); @@ -58,10 +60,16 @@ XWThreadPool::XWThreadPool() } m_pipeRead = fd[0]; m_pipeWrite = fd[1]; - - m_nThreads = 0; } +XWThreadPool::~XWThreadPool() +{ + pthread_cond_destroy( &m_queueCondVar ); + + pthread_rwlock_destroy( &m_activeSocketsRWLock ); + pthread_mutex_destroy ( &m_queueMutex ); +} /* ~XWThreadPool */ + void XWThreadPool::Setup( int nThreads, packet_func pFunc ) { @@ -74,12 +82,26 @@ XWThreadPool::Setup( int nThreads, packet_func pFunc ) for ( i = 0; i < nThreads; ++i ) { int result = pthread_create( &thread, NULL, tpool_main, this ); assert( result == 0 ); + pthread_detach( thread ); } int result = pthread_create( &thread, NULL, listener_main, this ); assert( result == 0 ); } +void +XWThreadPool::Stop() +{ + m_timeToDie = 1; + + int i; + for ( i = 0; i < m_nThreads; ++i ) { + enqueue( 0 ); + } + + interrupt_poll(); +} + void XWThreadPool::AddSocket( int socket ) { @@ -180,14 +202,18 @@ XWThreadPool::tpool_main( void* closure ) void* XWThreadPool::real_tpool_main() { - logf( XW_LOGINFO, "worker thread starting" ); + logf( XW_LOGINFO, "tpool worker thread starting" ); for ( ; ; ) { pthread_mutex_lock( &m_queueMutex ); - while ( m_queue.size() == 0 ) { + while ( !m_timeToDie && m_queue.size() == 0 ) { pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); } + if ( m_timeToDie ) { + break; + } + int socket = m_queue.front(); m_queue.pop_front(); pthread_mutex_unlock( &m_queueMutex ); @@ -197,7 +223,7 @@ XWThreadPool::real_tpool_main() AddSocket( socket ); } /* else drop it: error */ } - logf( XW_LOGINFO, "worker thread exiting" ); + logf( XW_LOGINFO, "tpool worker thread exiting" ); return NULL; } @@ -247,6 +273,10 @@ XWThreadPool::real_listener() logf( XW_LOGINFO, "polling %s nmillis=%d", log, nMillis ); int nEvents = poll( fds, nSockets, nMillis ); logf( XW_LOGINFO, "back from poll: %d", nEvents ); + if ( m_timeToDie ) { + break; + } + if ( nEvents == 0 ) { tmgr->FireElapsedTimers(); } else if ( nEvents < 0 ) { @@ -295,6 +325,8 @@ XWThreadPool::real_listener() free( fds ); free( log ); } + + logf( XW_LOGINFO, "real_listener returning" ); return NULL; } /* real_listener */ diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index a6973e36b..78cf3fc00 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -41,6 +41,7 @@ class XWThreadPool { ~XWThreadPool(); void Setup( int nThreads, packet_func pFunc ); + void Stop(); /* Add to set being listened on */ void AddSocket( int socket ); @@ -76,6 +77,7 @@ class XWThreadPool { int m_pipeRead; int m_pipeWrite; + int m_timeToDie; int m_nThreads; packet_func m_pFunc; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index bddd59b24..efd4844bb 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -450,6 +450,41 @@ usage( char* arg0 ) } } +/* sockets that need to be closable from interrupt handler */ +int g_listener; +int g_control; + +void +shutdown() +{ + XWThreadPool* tPool = XWThreadPool::GetTPool(); + if ( tPool != NULL ) { + tPool->Stop(); + } + + CRefMgr* cmgr = CRefMgr::Get(); + if ( cmgr != NULL ) { + cmgr->CloseAll(); + delete cmgr; + } + + delete tPool; + + stop_ctrl_threads(); + + close( g_listener ); + close( g_control ); + + exit( 0 ); + logf( XW_LOGINFO, "exit done" ); +} + +static void +SIGINT_handler( int sig ) +{ + logf( XW_LOGERROR, "sig handler called" ); + shutdown(); +} int main( int argc, char** argv ) { @@ -517,15 +552,21 @@ int main( int argc, char** argv ) PermID::SetServerName( serverName ); - int listener = make_socket( INADDR_ANY, port ); - if ( listener == -1 ) { + g_listener = make_socket( INADDR_ANY, port ); + if ( g_listener == -1 ) { exit( 1 ); } - int control = make_socket( INADDR_LOOPBACK, ctrlport ); - if ( control == -1 ) { + g_control = make_socket( INADDR_LOOPBACK, ctrlport ); + if ( g_control == -1 ) { exit( 1 ); } + struct sigaction act; + memset( &act, 0, sizeof(act) ); + act.sa_handler = SIGINT_handler; + int err = sigaction( SIGINT, &act, NULL ); + logf( XW_LOGERROR, "sigaction=>%d", err ); + XWThreadPool* tPool = XWThreadPool::GetTPool(); tPool->Setup( nWorkerThreads, processMessage ); @@ -533,11 +574,11 @@ int main( int argc, char** argv ) fd_set rfds; for ( ; ; ) { FD_ZERO(&rfds); - FD_SET( listener, &rfds ); - FD_SET( control, &rfds ); - int highest = listener; - if ( control > listener ) { - highest = control; + FD_SET( g_listener, &rfds ); + FD_SET( g_control, &rfds ); + int highest = g_listener; + if ( g_control > g_listener ) { + highest = g_control; } ++highest; @@ -547,10 +588,10 @@ int main( int argc, char** argv ) logf( XW_LOGINFO, "errno: %d", errno ); } } else { - if ( FD_ISSET( listener, &rfds ) ) { + if ( FD_ISSET( g_listener, &rfds ) ) { struct sockaddr_in newaddr; socklen_t siz = sizeof(newaddr); - int newSock = accept( listener, (sockaddr*)&newaddr, &siz ); + int newSock = accept( g_listener, (sockaddr*)&newaddr, &siz ); logf( XW_LOGINFO, "accepting connection from %s", inet_ntoa(newaddr.sin_addr) ); @@ -558,16 +599,16 @@ int main( int argc, char** argv ) tPool->AddSocket( newSock ); --retval; } - if ( FD_ISSET( control, &rfds ) ) { - run_ctrl_thread( control ); + if ( FD_ISSET( g_control, &rfds ) ) { + run_ctrl_thread( g_control ); --retval; } assert( retval == 0 ); } } - close( listener ); - close( control ); + close( g_listener ); + close( g_control ); delete cfg;