changes made over the past couple of months toward tracking an

apparent thread leak.
This commit is contained in:
Eric House 2013-01-02 21:12:42 -08:00
parent d34ac1f86d
commit ad78129e74
5 changed files with 88 additions and 41 deletions

View file

@ -47,7 +47,6 @@ static void formatParams( char* paramValues[], int nParams, const char* fmt,
static int here_less_seed( const char* seeds, int perDeviceSum, static int here_less_seed( const char* seeds, int perDeviceSum,
unsigned short seed ); unsigned short seed );
static void destr_function( void* conn ); static void destr_function( void* conn );
static void string_printf( string& str, const char* fmt, ... );
/* static */ DBMgr* /* static */ DBMgr*
DBMgr::Get() DBMgr::Get()
@ -507,7 +506,7 @@ DBMgr::RecordAddress( const char* const connName, HostID hid,
" WHERE connName = '%s'"; " WHERE connName = '%s'";
string query; string query;
string_printf( query, fmt, hid, inet_ntoa(addr), connName ); string_printf( query, fmt, hid, inet_ntoa(addr), connName );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );
execSql( query ); execSql( query );
} }
@ -587,7 +586,7 @@ DBMgr::PendingMsgCount( const char* connName, int hid )
; ;
string query; string query;
string_printf( query, fmt, connName, hid ); string_printf( query, fmt, connName, hid );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() ); PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) { if ( 1 == PQntuples( result ) ) {
@ -878,29 +877,3 @@ DBMgr::getThreadConn( void )
} }
return conn; return conn;
} }
/* From stack overflow, toward a snprintf with an expanding buffer.
*/
static void
string_printf( string& str, const char* fmt, ... )
{
const int origsiz = str.size();
int newsiz = 100;
va_list ap;
for ( ; ; ) {
str.resize( origsiz + newsiz );
va_start( ap, fmt );
int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap );
va_end( ap );
if ( len > newsiz ) { // needs more space
newsiz = len + 1;
} else if ( -1 == len ) {
assert(0); // should be impossible
} else {
str.resize( origsiz + len );
break;
}
}
}

View file

@ -81,6 +81,7 @@ void
XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
{ {
m_nThreads = nThreads; m_nThreads = nThreads;
m_threadInfos = (ThreadInfo*)malloc( nThreads * sizeof(*m_threadInfos) );
m_pFunc = pFunc; m_pFunc = pFunc;
m_kFunc = kFunc; m_kFunc = kFunc;
@ -88,7 +89,9 @@ XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
int ii; int ii;
for ( ii = 0; ii < nThreads; ++ii ) { for ( ii = 0; ii < nThreads; ++ii ) {
int result = pthread_create( &thread, NULL, tpool_main, this ); ThreadInfo* tip = &m_threadInfos[ii];
tip->me = this;
int result = pthread_create( &thread, NULL, tpool_main, tip );
assert( result == 0 ); assert( result == 0 );
pthread_detach( thread ); pthread_detach( thread );
} }
@ -122,6 +125,8 @@ XWThreadPool::AddSocket( int socket, SockType stype, in_addr& from )
si.m_type = stype; si.m_type = stype;
si.m_addr = from; si.m_addr = from;
m_activeSockets.push_back( pair<int,SockInfo>(socket, si) ); m_activeSockets.push_back( pair<int,SockInfo>(socket, si) );
logf( XW_LOGINFO, "%s: %d sockets active", __func__,
m_activeSockets.size() );
} }
interrupt_poll(); interrupt_poll();
} }
@ -142,6 +147,8 @@ XWThreadPool::RemoveSocket( int socket )
} }
++iter; ++iter;
} }
logf( XW_LOGINFO, "%s: %d sockets active", __func__,
m_activeSockets.size() );
} }
return found; return found;
} /* RemoveSocket */ } /* RemoveSocket */
@ -203,23 +210,25 @@ XWThreadPool::get_process_packet( int socket, SockType stype, in_addr& addr )
return success; return success;
} /* get_process_packet */ } /* get_process_packet */
/* static */ void* void*
XWThreadPool::tpool_main( void* closure ) XWThreadPool::tpool_main( void* closure )
{ {
blockSignals(); blockSignals();
XWThreadPool* me = (XWThreadPool*)closure; ThreadInfo* tip = (ThreadInfo*)closure;
return me->real_tpool_main(); return tip->me->real_tpool_main( tip );
} }
void* void*
XWThreadPool::real_tpool_main() XWThreadPool::real_tpool_main( ThreadInfo* tip )
{ {
logf( XW_LOGINFO, "tpool worker thread starting" ); logf( XW_LOGINFO, "tpool worker thread starting" );
int socket = -1; int socket = -1;
for ( ; ; ) { for ( ; ; ) {
pthread_mutex_lock( &m_queueMutex ); pthread_mutex_lock( &m_queueMutex );
release_socket_locked( socket ); tip->recentTime = 0;
release_socket_locked( socket );
while ( !m_timeToDie && m_queue.size() == 0 ) { while ( !m_timeToDie && m_queue.size() == 0 ) {
pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); pthread_cond_wait( &m_queueCondVar, &m_queueMutex );
@ -234,6 +243,7 @@ XWThreadPool::real_tpool_main()
QueuePr pr; QueuePr pr;
grab_elem_locked( &pr ); grab_elem_locked( &pr );
tip->recentTime = time( NULL );
pthread_mutex_unlock( &m_queueMutex ); pthread_mutex_unlock( &m_queueMutex );
if ( pr.m_socket >= 0 ) { if ( pr.m_socket >= 0 ) {
@ -415,6 +425,7 @@ XWThreadPool::enqueue( int socket, SockInfo si, QAction act )
m_queue.push_back( pr ); m_queue.push_back( pr );
pthread_cond_signal( &m_queueCondVar ); pthread_cond_signal( &m_queueCondVar );
log_hung_threads();
} }
void void
@ -451,12 +462,34 @@ XWThreadPool::release_socket_locked( int socket )
void void
XWThreadPool::print_in_use( void ) XWThreadPool::print_in_use( void )
{ {
char buf[32] = {0}; string str;
int len = 0;
set<int>::iterator iter; set<int>::iterator iter;
for ( iter = m_sockets_in_use.begin(); for ( iter = m_sockets_in_use.begin();
iter != m_sockets_in_use.end(); ++iter ) { iter != m_sockets_in_use.end(); ++iter ) {
len += snprintf( &buf[len], sizeof(buf)-len, "%d ", *iter ); string_printf( str, "%d ", *iter );
}
if ( 0 < str.size() ) {
logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() );
}
}
// We have the mutex when this is called
void
XWThreadPool::log_hung_threads( void )
{
const time_t HUNG_THREASHHOLD = 5; // seconds
int ii;
time_t now = time( NULL );
for ( ii = 0; ii < m_nThreads; ++ii ) {
ThreadInfo* tip = &m_threadInfos[ii];
time_t recentTime = tip->recentTime;
if ( 0 != recentTime ) {
time_t howLong = now - recentTime;
if ( HUNG_THREASHHOLD < howLong ) {
logf( XW_LOGERROR, "thread %d stopped for %d seconds!", ii, howLong );
tip->recentTime = 0; // only log once
}
}
} }
} }

View file

@ -42,6 +42,11 @@ class XWThreadPool {
in_addr m_addr; in_addr m_addr;
} SockInfo; } SockInfo;
typedef struct _ThreadInfo {
XWThreadPool* me;
time_t recentTime;
} ThreadInfo;
static XWThreadPool* GetTPool(); static XWThreadPool* GetTPool();
typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket, typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket,
in_addr& addr ); in_addr& addr );
@ -72,11 +77,12 @@ class XWThreadPool {
void release_socket_locked( int socket ); void release_socket_locked( int socket );
void grab_elem_locked( QueuePr* qpp ); void grab_elem_locked( QueuePr* qpp );
void print_in_use( void ); void print_in_use( void );
void log_hung_threads( void );
bool get_process_packet( int socket, SockType stype, in_addr& addr ); bool get_process_packet( int socket, SockType stype, in_addr& addr );
void interrupt_poll(); void interrupt_poll();
void* real_tpool_main(); void* real_tpool_main( ThreadInfo* tsp );
static void* tpool_main( void* closure ); static void* tpool_main( void* closure );
void* real_listener(); void* real_listener();
@ -100,6 +106,7 @@ class XWThreadPool {
int m_nThreads; int m_nThreads;
packet_func m_pFunc; packet_func m_pFunc;
kill_func m_kFunc; kill_func m_kFunc;
ThreadInfo* m_threadInfos;
static XWThreadPool* g_instance; static XWThreadPool* g_instance;
}; };

View file

@ -851,7 +851,7 @@ handleMsgsMsg( int sock, in_addr& addr, bool sendFull,
tmp = htons( nameCount ); tmp = htons( nameCount );
memcpy( &out[2], &tmp, sizeof(tmp) ); memcpy( &out[2], &tmp, sizeof(tmp) );
ssize_t nwritten = write( sock, &out[0], out.size() ); ssize_t nwritten = write( sock, &out[0], out.size() );
logf( XW_LOGINFO, "%s: wrote %d bytes", __func__, nwritten ); logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten );
if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) {
dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); dbmgr->RecordSent( &msgIDs[0], msgIDs.size() );
dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() ); dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() );
@ -964,7 +964,7 @@ handleProxyMsgs( int sock, in_addr& addr, unsigned char* bufp, unsigned char* en
void void
handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr ) handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr )
{ {
logf( XW_LOGINFO, "%s()", __func__ ); logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( len > 0 ) { if ( len > 0 ) {
unsigned char* bufp = buf; unsigned char* bufp = buf;
unsigned char* end = bufp + len; unsigned char* end = bufp + len;
@ -1036,6 +1036,32 @@ handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr )
} }
} /* handle_proxy_packet */ } /* handle_proxy_packet */
/* From stack overflow, toward a snprintf with an expanding buffer.
*/
void
string_printf( string& str, const char* fmt, ... )
{
const int origsiz = str.size();
int newsiz = 100;
va_list ap;
for ( ; ; ) {
str.resize( origsiz + newsiz );
va_start( ap, fmt );
int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap );
va_end( ap );
if ( len > newsiz ) { // needs more space
newsiz = len + 1;
} else if ( -1 == len ) {
assert(0); // should be impossible
} else {
str.resize( origsiz + len );
break;
}
}
}
static void static void
set_timeouts( int sock ) set_timeouts( int sock )
{ {
@ -1400,6 +1426,11 @@ main( int argc, char** argv )
errno, strerror(errno) ); errno, strerror(errno) );
assert( 0 ); // we're leaking files or load has grown assert( 0 ); // we're leaking files or load has grown
} else { } else {
// I've seen a bug where we accept but never service
// connections. Sockets are not closed, and so the
// number goes up. Probably need a watchdog instead,
// but this will work around it.
assert( 100 > newSock );
/* Set timeout so send and recv won't block forever */ /* Set timeout so send and recv won't block forever */
set_timeouts( newSock ); set_timeouts( newSock );

View file

@ -22,6 +22,7 @@
#ifndef _XWRELAY_PRIV_H_ #ifndef _XWRELAY_PRIV_H_
#define _XWRELAY_PRIV_H_ #define _XWRELAY_PRIV_H_
#include <string>
#include <time.h> #include <time.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "lstnrmgr.h" #include "lstnrmgr.h"
@ -49,6 +50,8 @@ int GetNSpawns(void);
int make_socket( unsigned long addr, unsigned short port ); int make_socket( unsigned long addr, unsigned short port );
void string_printf( std::string& str, const char* fmt, ... );
int read_packet( int sock, unsigned char* buf, int buflen ); int read_packet( int sock, unsigned char* buf, int buflen );
void handle_proxy_packet( unsigned char* buf, int bufLen, int socket, void handle_proxy_packet( unsigned char* buf, int bufLen, int socket,
in_addr& addr ); in_addr& addr );