From ad78129e74f515acb22322041e2e6edcb560c2e4 Mon Sep 17 00:00:00 2001 From: Eric House Date: Wed, 2 Jan 2013 21:12:42 -0800 Subject: [PATCH] changes made over the past couple of months toward tracking an apparent thread leak. --- xwords4/relay/dbmgr.cpp | 31 ++-------------------- xwords4/relay/tpool.cpp | 51 +++++++++++++++++++++++++++++------- xwords4/relay/tpool.h | 9 ++++++- xwords4/relay/xwrelay.cpp | 35 +++++++++++++++++++++++-- xwords4/relay/xwrelay_priv.h | 3 +++ 5 files changed, 88 insertions(+), 41 deletions(-) diff --git a/xwords4/relay/dbmgr.cpp b/xwords4/relay/dbmgr.cpp index 099403e8d..1fef3f9f6 100644 --- a/xwords4/relay/dbmgr.cpp +++ b/xwords4/relay/dbmgr.cpp @@ -47,7 +47,6 @@ static void formatParams( char* paramValues[], int nParams, const char* fmt, static int here_less_seed( const char* seeds, int perDeviceSum, unsigned short seed ); static void destr_function( void* conn ); -static void string_printf( string& str, const char* fmt, ... ); /* static */ DBMgr* DBMgr::Get() @@ -507,7 +506,7 @@ DBMgr::RecordAddress( const char* const connName, HostID hid, " WHERE connName = '%s'"; string query; string_printf( query, fmt, hid, inet_ntoa(addr), connName ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() ); execSql( query ); } @@ -587,7 +586,7 @@ DBMgr::PendingMsgCount( const char* connName, int hid ) ; string query; string_printf( query, fmt, connName, hid ); - logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); + logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() ); PGresult* result = PQexec( getThreadConn(), query.c_str() ); if ( 1 == PQntuples( result ) ) { @@ -878,29 +877,3 @@ DBMgr::getThreadConn( void ) } return conn; } - -/* From stack overflow, toward a snprintf with an expanding buffer. - */ -static void -string_printf( string& str, const char* fmt, ... 
) -{ - const int origsiz = str.size(); - int newsiz = 100; - va_list ap; - for ( ; ; ) { - str.resize( origsiz + newsiz ); - - va_start( ap, fmt ); - int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap ); - va_end( ap ); - - if ( len > newsiz ) { // needs more space - newsiz = len + 1; - } else if ( -1 == len ) { - assert(0); // should be impossible - } else { - str.resize( origsiz + len ); - break; - } - } -} diff --git a/xwords4/relay/tpool.cpp b/xwords4/relay/tpool.cpp index 282fb8593..b06e8aa3a 100644 --- a/xwords4/relay/tpool.cpp +++ b/xwords4/relay/tpool.cpp @@ -81,6 +81,7 @@ void XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) { m_nThreads = nThreads; + m_threadInfos = (ThreadInfo*)calloc( nThreads, sizeof(*m_threadInfos) ); /* zeroed: recentTime must read 0 until a worker sets it */ m_pFunc = pFunc; m_kFunc = kFunc; @@ -88,7 +89,9 @@ XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) int ii; for ( ii = 0; ii < nThreads; ++ii ) { - int result = pthread_create( &thread, NULL, tpool_main, this ); + ThreadInfo* tip = &m_threadInfos[ii]; + tip->me = this; + int result = pthread_create( &thread, NULL, tpool_main, tip ); assert( result == 0 ); pthread_detach( thread ); } @@ -122,6 +125,8 @@ XWThreadPool::AddSocket( int socket, SockType stype, in_addr& from ) si.m_type = stype; si.m_addr = from; m_activeSockets.push_back( pair(socket, si) ); + logf( XW_LOGINFO, "%s: %d sockets active", __func__, + (int)m_activeSockets.size() ); /* size_t must not be passed for %d */ } interrupt_poll(); } @@ -142,6 +147,8 @@ XWThreadPool::RemoveSocket( int socket ) } ++iter; } + logf( XW_LOGINFO, "%s: %d sockets active", __func__, + (int)m_activeSockets.size() ); /* size_t must not be passed for %d */ } return found; } /* RemoveSocket */ @@ -203,23 +210,25 @@ XWThreadPool::get_process_packet( int socket, SockType stype, in_addr& addr ) return success; } /* get_process_packet */ -/* static */ void* +void* XWThreadPool::tpool_main( void* closure ) { blockSignals(); - XWThreadPool* me = (XWThreadPool*)closure; - return me->real_tpool_main(); + ThreadInfo* tip = 
(ThreadInfo*)closure; + return tip->me->real_tpool_main( tip ); } void* -XWThreadPool::real_tpool_main() +XWThreadPool::real_tpool_main( ThreadInfo* tip ) { logf( XW_LOGINFO, "tpool worker thread starting" ); int socket = -1; for ( ; ; ) { pthread_mutex_lock( &m_queueMutex ); - release_socket_locked( socket ); + tip->recentTime = 0; + + release_socket_locked( socket ); while ( !m_timeToDie && m_queue.size() == 0 ) { pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); @@ -234,6 +243,7 @@ XWThreadPool::real_tpool_main() QueuePr pr; grab_elem_locked( &pr ); + tip->recentTime = time( NULL ); pthread_mutex_unlock( &m_queueMutex ); if ( pr.m_socket >= 0 ) { @@ -415,6 +425,7 @@ XWThreadPool::enqueue( int socket, SockInfo si, QAction act ) m_queue.push_back( pr ); pthread_cond_signal( &m_queueCondVar ); + log_hung_threads(); } void @@ -451,12 +462,34 @@ XWThreadPool::release_socket_locked( int socket ) void XWThreadPool::print_in_use( void ) { - char buf[32] = {0}; - int len = 0; + string str; set::iterator iter; for ( iter = m_sockets_in_use.begin(); iter != m_sockets_in_use.end(); ++iter ) { - len += snprintf( &buf[len], sizeof(buf)-len, "%d ", *iter ); + string_printf( str, "%d ", *iter ); + } + if ( 0 < str.size() ) { + logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() ); + } +} + +// We have the mutex when this is called +void +XWThreadPool::log_hung_threads( void ) +{ + const time_t HUNG_THRESHOLD = 5; // seconds + int ii; + time_t now = time( NULL ); + for ( ii = 0; ii < m_nThreads; ++ii ) { + ThreadInfo* tip = &m_threadInfos[ii]; + time_t recentTime = tip->recentTime; + if ( 0 != recentTime ) { + time_t howLong = now - recentTime; + if ( HUNG_THRESHOLD < howLong ) { + logf( XW_LOGERROR, "thread %d stopped for %d seconds!", ii, (int)howLong ); /* time_t must not be passed for %d */ + tip->recentTime = 0; // only log once + } + } } } diff --git a/xwords4/relay/tpool.h b/xwords4/relay/tpool.h index 483900800..0c3e1e404 100644 --- a/xwords4/relay/tpool.h +++ b/xwords4/relay/tpool.h @@ -42,6 +42,11 @@ 
class XWThreadPool { in_addr m_addr; } SockInfo; + typedef struct _ThreadInfo { + XWThreadPool* me; + time_t recentTime; + } ThreadInfo; + static XWThreadPool* GetTPool(); typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket, in_addr& addr ); @@ -72,11 +77,12 @@ class XWThreadPool { void release_socket_locked( int socket ); void grab_elem_locked( QueuePr* qpp ); void print_in_use( void ); + void log_hung_threads( void ); bool get_process_packet( int socket, SockType stype, in_addr& addr ); void interrupt_poll(); - void* real_tpool_main(); + void* real_tpool_main( ThreadInfo* tip ); static void* tpool_main( void* closure ); void* real_listener(); @@ -100,6 +106,7 @@ class XWThreadPool { int m_nThreads; packet_func m_pFunc; kill_func m_kFunc; + ThreadInfo* m_threadInfos; static XWThreadPool* g_instance; }; diff --git a/xwords4/relay/xwrelay.cpp b/xwords4/relay/xwrelay.cpp index b01bd0c5c..ba2c17c27 100644 --- a/xwords4/relay/xwrelay.cpp +++ b/xwords4/relay/xwrelay.cpp @@ -851,7 +851,7 @@ handleMsgsMsg( int sock, in_addr& addr, bool sendFull, tmp = htons( nameCount ); memcpy( &out[2], &tmp, sizeof(tmp) ); ssize_t nwritten = write( sock, &out[0], out.size() ); - logf( XW_LOGINFO, "%s: wrote %d bytes", __func__, nwritten ); + logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, (int)nwritten ); /* ssize_t must not be passed for %d */ if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() ); @@ -964,7 +964,7 @@ handleProxyMsgs( int sock, in_addr& addr, unsigned char* bufp, unsigned char* en void handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr ) { - logf( XW_LOGINFO, "%s()", __func__ ); + logf( XW_LOGVERBOSE0, "%s()", __func__ ); if ( len > 0 ) { unsigned char* bufp = buf; unsigned char* end = bufp + len; @@ -1036,6 +1036,32 @@ handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr ) } } /* handle_proxy_packet */ +/* From stack overflow, 
toward a snprintf with an expanding buffer. + */ +void +string_printf( string& str, const char* fmt, ... ) +{ + const int origsiz = str.size(); + int newsiz = 100; + va_list ap; + for ( ; ; ) { + str.resize( origsiz + newsiz ); + + va_start( ap, fmt ); + int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap ); + va_end( ap ); + + if ( len >= newsiz ) { // truncated (len excludes the NUL): needs more space + newsiz = len + 1; + } else if ( -1 == len ) { + assert(0); // should be impossible + } else { + str.resize( origsiz + len ); + break; + } + } +} + static void set_timeouts( int sock ) { @@ -1400,6 +1426,11 @@ main( int argc, char** argv ) errno, strerror(errno) ); assert( 0 ); // we're leaking files or load has grown } else { + // I've seen a bug where we accept but never service + // connections. Sockets are not closed, and so the + // number goes up. Probably need a watchdog instead, + // but this will work around it. + assert( 100 > newSock ); /* Set timeout so send and recv won't block forever */ set_timeouts( newSock ); diff --git a/xwords4/relay/xwrelay_priv.h b/xwords4/relay/xwrelay_priv.h index 27c5a088a..16d09237f 100644 --- a/xwords4/relay/xwrelay_priv.h +++ b/xwords4/relay/xwrelay_priv.h @@ -22,6 +22,7 @@ #ifndef _XWRELAY_PRIV_H_ #define _XWRELAY_PRIV_H_ +#include #include #include #include "lstnrmgr.h" @@ -49,6 +50,8 @@ int GetNSpawns(void); int make_socket( unsigned long addr, unsigned short port ); +void string_printf( std::string& str, const char* fmt, ... ); + int read_packet( int sock, unsigned char* buf, int buflen ); void handle_proxy_packet( unsigned char* buf, int bufLen, int socket, in_addr& addr );