shutdown in response to SIGINT

This commit is contained in:
ehouse 2005-10-23 15:49:48 +00:00
parent 7b04db43ae
commit bdace2be94
7 changed files with 164 additions and 28 deletions

View file

@ -40,14 +40,15 @@ class SocketStuff {
pthread_mutex_t m_writeMutex; /* so only one thread writes at a time */
};
static CRefMgr* s_instance = NULL;
/* static */ CRefMgr*
CRefMgr::Get()
{
static CRefMgr* instance = NULL;
if ( instance == NULL ) {
instance = new CRefMgr();
if ( s_instance == NULL ) {
s_instance = new CRefMgr();
}
return instance;
return s_instance;
} /* Get */
CRefMgr::CRefMgr()
@ -60,8 +61,31 @@ CRefMgr::CRefMgr()
CRefMgr::~CRefMgr()
{
assert( this == s_instance );
pthread_mutex_destroy( &m_guard );
pthread_rwlock_destroy( &m_cookieMapRWLock );
s_instance = NULL;
}
void
CRefMgr::CloseAll()
{
MutexLock ml( &m_guard );
/* Get every cref instance, shut it down */
CookieMap::iterator iter = m_cookieMap.begin();
while ( iter != m_cookieMap.end() ) {
CookieRef* cref = iter->second;
delete cref;
++iter;
}
} /* CloseAll */
CookieRef*
CRefMgr::FindOpenGameFor( const char* cORn, int isCookie,
HostID hid, int nPlayersH, int nPlayersT )

View file

@ -54,6 +54,8 @@ class CRefMgr {
CRefMgr();
~CRefMgr();
void CloseAll();
CookieMapIterator GetCookieIterator();
/* PENDING. These need to go through SafeCref */

View file

@ -41,6 +41,7 @@
#include "ctrl.h"
#include "cref.h"
#include "crefmgr.h"
#include "mlock.h"
#include "xwrelay_priv.h"
/* this is *only* for testing. Don't abuse!!!! */
@ -53,6 +54,10 @@ typedef struct FuncRec {
CmdPtr func;
} FuncRec;
vector<int> g_ctrlSocks;
pthread_mutex_t g_ctrlSocksMutex = PTHREAD_MUTEX_INITIALIZER;
static int cmd_quit( int socket, const char** args );
static int cmd_print( int socket, const char** args );
static int cmd_lock( int socket, const char** args );
@ -386,6 +391,11 @@ ctrl_thread_main( void* arg )
int socket = (int)arg;
string arg0, arg1, arg2, arg3;
{
MutexLock ml( &g_ctrlSocksMutex );
g_ctrlSocks.push_back( socket );
}
for ( ; ; ) {
print_prompt( socket );
@ -411,22 +421,46 @@ ctrl_thread_main( void* arg )
break;
}
}
close ( socket );
MutexLock ml( &g_ctrlSocksMutex );
vector<int>::iterator iter = g_ctrlSocks.begin();
while ( iter != g_ctrlSocks.end() ) {
if ( *iter == socket ) {
g_ctrlSocks.erase(iter);
break;
}
}
return NULL;
} /* ctrl_thread_main */
void
run_ctrl_thread( int ctrl_listener )
run_ctrl_thread( int ctrl_sock )
{
logf( XW_LOGINFO, "calling accept on socket %d\n", ctrl_listener );
logf( XW_LOGINFO, "calling accept on socket %d\n", ctrl_sock );
sockaddr newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( ctrl_listener, &newaddr, &siz );
int newSock = accept( ctrl_sock, &newaddr, &siz );
logf( XW_LOGINFO, "got one for ctrl: %d", newSock );
pthread_t thread;
int result = pthread_create( &thread, NULL,
ctrl_thread_main, (void*)newSock );
pthread_detach( thread );
assert( result == 0 );
}
void
stop_ctrl_threads()
{
MutexLock ml( &g_ctrlSocksMutex );
vector<int>::iterator iter = g_ctrlSocks.begin();
while ( iter != g_ctrlSocks.end() ) {
int sock = *iter++;
print_to_sock( sock, 1, "relay going down..." );
close( sock );
}
}

View file

@ -22,6 +22,7 @@
#ifndef _CTRL_H_
#define _CTRL_H_
void run_ctrl_thread( int ctrl_listener );
void run_ctrl_thread( int ctrl_sock );
void stop_ctrl_threads();
#endif

View file

@ -46,6 +46,8 @@ XWThreadPool::GetTPool()
}
XWThreadPool::XWThreadPool()
: m_timeToDie(0)
, m_nThreads(0)
{
pthread_rwlock_init( &m_activeSocketsRWLock, NULL );
pthread_mutex_init ( &m_queueMutex, NULL );
@ -58,10 +60,16 @@ XWThreadPool::XWThreadPool()
}
m_pipeRead = fd[0];
m_pipeWrite = fd[1];
m_nThreads = 0;
}
XWThreadPool::~XWThreadPool()
{
pthread_cond_destroy( &m_queueCondVar );
pthread_rwlock_destroy( &m_activeSocketsRWLock );
pthread_mutex_destroy ( &m_queueMutex );
} /* ~XWThreadPool */
void
XWThreadPool::Setup( int nThreads, packet_func pFunc )
{
@ -74,12 +82,26 @@ XWThreadPool::Setup( int nThreads, packet_func pFunc )
for ( i = 0; i < nThreads; ++i ) {
int result = pthread_create( &thread, NULL, tpool_main, this );
assert( result == 0 );
pthread_detach( thread );
}
int result = pthread_create( &thread, NULL, listener_main, this );
assert( result == 0 );
}
void
XWThreadPool::Stop()
{
m_timeToDie = 1;
int i;
for ( i = 0; i < m_nThreads; ++i ) {
enqueue( 0 );
}
interrupt_poll();
}
void
XWThreadPool::AddSocket( int socket )
{
@ -180,14 +202,18 @@ XWThreadPool::tpool_main( void* closure )
void*
XWThreadPool::real_tpool_main()
{
logf( XW_LOGINFO, "worker thread starting" );
logf( XW_LOGINFO, "tpool worker thread starting" );
for ( ; ; ) {
pthread_mutex_lock( &m_queueMutex );
while ( m_queue.size() == 0 ) {
while ( !m_timeToDie && m_queue.size() == 0 ) {
pthread_cond_wait( &m_queueCondVar, &m_queueMutex );
}
if ( m_timeToDie ) {
break;
}
int socket = m_queue.front();
m_queue.pop_front();
pthread_mutex_unlock( &m_queueMutex );
@ -197,7 +223,7 @@ XWThreadPool::real_tpool_main()
AddSocket( socket );
} /* else drop it: error */
}
logf( XW_LOGINFO, "worker thread exiting" );
logf( XW_LOGINFO, "tpool worker thread exiting" );
return NULL;
}
@ -247,6 +273,10 @@ XWThreadPool::real_listener()
logf( XW_LOGINFO, "polling %s nmillis=%d", log, nMillis );
int nEvents = poll( fds, nSockets, nMillis );
logf( XW_LOGINFO, "back from poll: %d", nEvents );
if ( m_timeToDie ) {
break;
}
if ( nEvents == 0 ) {
tmgr->FireElapsedTimers();
} else if ( nEvents < 0 ) {
@ -295,6 +325,8 @@ XWThreadPool::real_listener()
free( fds );
free( log );
}
logf( XW_LOGINFO, "real_listener returning" );
return NULL;
} /* real_listener */

View file

@ -41,6 +41,7 @@ class XWThreadPool {
~XWThreadPool();
void Setup( int nThreads, packet_func pFunc );
void Stop();
/* Add to set being listened on */
void AddSocket( int socket );
@ -76,6 +77,7 @@ class XWThreadPool {
int m_pipeRead;
int m_pipeWrite;
int m_timeToDie;
int m_nThreads;
packet_func m_pFunc;

View file

@ -450,6 +450,41 @@ usage( char* arg0 )
}
}
/* sockets that need to be closable from interrupt handler */
int g_listener;
int g_control;
void
shutdown()
{
XWThreadPool* tPool = XWThreadPool::GetTPool();
if ( tPool != NULL ) {
tPool->Stop();
}
CRefMgr* cmgr = CRefMgr::Get();
if ( cmgr != NULL ) {
cmgr->CloseAll();
delete cmgr;
}
delete tPool;
stop_ctrl_threads();
close( g_listener );
close( g_control );
exit( 0 );
logf( XW_LOGINFO, "exit done" );
}
static void
SIGINT_handler( int sig )
{
logf( XW_LOGERROR, "sig handler called" );
shutdown();
}
int main( int argc, char** argv )
{
@ -517,15 +552,21 @@ int main( int argc, char** argv )
PermID::SetServerName( serverName );
int listener = make_socket( INADDR_ANY, port );
if ( listener == -1 ) {
g_listener = make_socket( INADDR_ANY, port );
if ( g_listener == -1 ) {
exit( 1 );
}
int control = make_socket( INADDR_LOOPBACK, ctrlport );
if ( control == -1 ) {
g_control = make_socket( INADDR_LOOPBACK, ctrlport );
if ( g_control == -1 ) {
exit( 1 );
}
struct sigaction act;
memset( &act, 0, sizeof(act) );
act.sa_handler = SIGINT_handler;
int err = sigaction( SIGINT, &act, NULL );
logf( XW_LOGERROR, "sigaction=>%d", err );
XWThreadPool* tPool = XWThreadPool::GetTPool();
tPool->Setup( nWorkerThreads, processMessage );
@ -533,11 +574,11 @@ int main( int argc, char** argv )
fd_set rfds;
for ( ; ; ) {
FD_ZERO(&rfds);
FD_SET( listener, &rfds );
FD_SET( control, &rfds );
int highest = listener;
if ( control > listener ) {
highest = control;
FD_SET( g_listener, &rfds );
FD_SET( g_control, &rfds );
int highest = g_listener;
if ( g_control > g_listener ) {
highest = g_control;
}
++highest;
@ -547,10 +588,10 @@ int main( int argc, char** argv )
logf( XW_LOGINFO, "errno: %d", errno );
}
} else {
if ( FD_ISSET( listener, &rfds ) ) {
if ( FD_ISSET( g_listener, &rfds ) ) {
struct sockaddr_in newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( listener, (sockaddr*)&newaddr, &siz );
int newSock = accept( g_listener, (sockaddr*)&newaddr, &siz );
logf( XW_LOGINFO, "accepting connection from %s",
inet_ntoa(newaddr.sin_addr) );
@ -558,16 +599,16 @@ int main( int argc, char** argv )
tPool->AddSocket( newSock );
--retval;
}
if ( FD_ISSET( control, &rfds ) ) {
run_ctrl_thread( control );
if ( FD_ISSET( g_control, &rfds ) ) {
run_ctrl_thread( g_control );
--retval;
}
assert( retval == 0 );
}
}
close( listener );
close( control );
close( g_listener );
close( g_control );
delete cfg;