Replace killSocket(), which jumped right into crefmgr and could get
ahead of processing data that arrived on the same socket, with
EnqueueKill(), which adds to the same queue from which data is taken.
So if a device dies immediately after sending data, there won't be a
race between closing the cref (if this is the last open socket) and
handling the data.  I'm still dying with assert failures when running
100 games at once, but much less frequently.
Andy2 2010-09-18 08:47:56 -07:00
parent a4913596db
commit df1ec1628a
5 changed files with 58 additions and 41 deletions
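
The fix in miniature: because a kill becomes just another event in the FIFO
the workers drain, it can't overtake data that arrived on the same socket
earlier.  Below is a minimal, self-contained sketch of that pattern, using
std::thread and a single worker instead of the relay's pthread pool; the
QAction and QueuePr names mirror the diff, everything else (s_queue, the
worker loop, main) is illustration, not the relay's code.

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <mutex>
    #include <thread>

    enum QAction { Q_READ, Q_KILL };
    struct QueuePr { QAction m_act; int m_socket; };

    static std::deque<QueuePr> s_queue;
    static std::mutex s_mutex;
    static std::condition_variable s_cond;

    static void enqueue( int socket, QAction act = Q_READ )
    {
        std::lock_guard<std::mutex> lock( s_mutex );
        s_queue.push_back( QueuePr{ act, socket } ); /* FIFO: a kill lands behind data */
        s_cond.notify_one();
    }

    static void worker()
    {
        for ( ; ; ) {
            QueuePr pr;
            {
                std::unique_lock<std::mutex> lock( s_mutex );
                s_cond.wait( lock, []{ return !s_queue.empty(); } );
                pr = s_queue.front();
                s_queue.pop_front();
            }
            switch ( pr.m_act ) {
            case Q_READ:
                std::printf( "handling data from socket %d\n", pr.m_socket );
                break;
            case Q_KILL:
                std::printf( "closing socket %d, after its data\n", pr.m_socket );
                return;              /* enough for the sketch */
            }
        }
    }

    int main()
    {
        std::thread t( worker );
        enqueue( 7 );           /* data arrives... */
        enqueue( 7, Q_KILL );   /* ...then the device dies: kill queues behind it */
        t.join();
        return 0;
    }

With several workers, FIFO order alone wouldn't be enough; in the diff
below, grab_elem_locked() skips any socket already in m_sockets_in_use, so
at most one worker handles a given socket at a time and per-socket ordering
survives.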

View file

@@ -46,6 +46,7 @@
 #include "xwrelay_priv.h"
 #include "configs.h"
 #include "lstnrmgr.h"
+#include "tpool.h"
 
 #define MAX_ARGS 10
@@ -190,7 +191,7 @@ cmd_kill_eject( int socket, const char** args )
     if ( 0 == strcmp( args[1], "socket" ) ) {
         int victim = atoi( args[2] );
         if ( victim != 0 ) {
-            killSocket( victim, "ctrl command" );
+            XWThreadPool::GetTPool()->EnqueueKill( victim, "ctrl command" );
             found = true;
         }
     } else if ( 0 == strcmp( args[1], "cref" ) ) {

View file

@@ -78,10 +78,11 @@ XWThreadPool::~XWThreadPool()
 } /* ~XWThreadPool */
 
 void
-XWThreadPool::Setup( int nThreads, packet_func pFunc )
+XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
 {
     m_nThreads = nThreads;
     m_pFunc = pFunc;
+    m_kFunc = kFunc;
 
     pthread_t thread;
@@ -148,7 +149,7 @@ XWThreadPool::CloseSocket( int socket )
         RWWriteLock rwl( &m_activeSocketsRWLock );
         deque<QueuePr>::iterator iter = m_queue.begin();
         while ( iter != m_queue.end() ) {
-            if ( iter->second == socket ) {
+            if ( iter->m_socket == socket ) {
                 m_queue.erase( iter );
                 /* do_interrupt = true; */
                 break;
@@ -167,6 +168,12 @@ XWThreadPool::CloseSocket( int socket )
     /* } */
 }
 
+void
+XWThreadPool::EnqueueKill( int socket, const char* const why )
+{
+    enqueue( socket, Q_KILL );
+}
+
 bool
 XWThreadPool::get_process_packet( int socket )
 {
@@ -177,18 +184,18 @@ XWThreadPool::get_process_packet( int socket )
     ssize_t nRead = recv( socket, &packetSize,
                           sizeof(packetSize), MSG_WAITALL );
     if ( nRead != 2 ) {
-        killSocket( socket, "nRead != 2" );
+        EnqueueKill( socket, "nRead != 2" );
     } else {
         packetSize = ntohs( packetSize );
         if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) {
-            killSocket( socket, "packetSize wrong" );
+            EnqueueKill( socket, "packetSize wrong" );
         } else {
             unsigned char buf[MAX_MSG_LEN];
             nRead = recv( socket, buf, packetSize, MSG_WAITALL );
             if ( nRead != packetSize ) {
-                killSocket( socket, "nRead != packetSize" );
+                EnqueueKill( socket, "nRead != packetSize" );
             } else {
                 logf( XW_LOGINFO, "read %d bytes", nRead );
@@ -213,7 +220,6 @@ XWThreadPool::real_tpool_main()
     logf( XW_LOGINFO, "tpool worker thread starting" );
     int socket = -1;
-
     for ( ; ; ) {
         pthread_mutex_lock( &m_queueMutex );
         release_socket_locked( socket );
@@ -227,17 +233,26 @@ XWThreadPool::real_tpool_main()
         }
 
         QueuePr pr;
-        if ( grab_elem_locked( socket, &pr ) ) {
-            socket = pr.second;
-        } else {
-            socket = -1;
-        }
+        grab_elem_locked( socket, &pr );
         pthread_mutex_unlock( &m_queueMutex );
 
-        logf( XW_LOGINFO, "worker thread got socket %d from queue", socket );
-        if ( socket >= 0 && get_process_packet( socket ) ) {
-            AddSocket( socket );
-        } /* else drop it: error */
+        if ( pr.m_socket >= 0 ) {
+            logf( XW_LOGINFO, "worker thread got socket %d from queue",
+                  pr.m_socket );
+            switch ( pr.m_act ) {
+            case Q_READ:
+                if ( get_process_packet( pr.m_socket ) ) {
+                    AddSocket( pr.m_socket );
+                }
+                break;
+            case Q_KILL:
+                (*m_kFunc)( pr.m_socket );
+                CloseSocket( pr.m_socket );
+                break;
+            }
+        }
+        socket = pr.m_socket;
     }
     logf( XW_LOGINFO, "tpool worker thread exiting" );
     return NULL;
@@ -359,7 +374,7 @@ XWThreadPool::real_listener()
                     enqueue( socket );
                 } else {
                     logf( XW_LOGERROR, "odd revents: %x", curfd->revents );
-                    killSocket( socket, "error/hup in poll()" );
+                    EnqueueKill( socket, "error/hup in poll()" );
                 }
                 --nEvents;
             }
@@ -381,10 +396,10 @@ XWThreadPool::listener_main( void* closure )
 }
 
 void
-XWThreadPool::enqueue( int socket )
+XWThreadPool::enqueue( int socket, QAction act )
 {
     MutexLock ml( &m_queueMutex );
-    pair<QAction, int> pr(Q_READ, socket);
+    QueuePr pr = { act, socket };
     m_queue.push_back( pr );
 
     logf( XW_LOGINFO, "calling pthread_cond_signal" );
@@ -392,13 +407,14 @@ XWThreadPool::enqueue( int socket )
     /* implicit unlock */
 }
 
-bool
+void
 XWThreadPool::grab_elem_locked( int curSock, QueuePr* prp )
 {
     bool found = false;
+    prp->m_socket = -1;
     deque<QueuePr>::iterator iter;
     for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) {
-        int socket = iter->second;
+        int socket = iter->m_socket;
         if ( socket == curSock
              || m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) {
             *prp = *iter;
@@ -407,8 +423,14 @@ XWThreadPool::grab_elem_locked( int curSock, QueuePr* prp )
             found = true;
         }
     }
 
+    /* I think once an event isn't "found" here there's a chance of events
+       sitting in the queue without the threads knowing to go after them.  So
+       IFF this happens need to deal with it or at least confirm that there's
+       no chance of starvation */
+    // assert( found ); THIS IS FIRING
+
     logf( XW_LOGINFO, "%s()=>%d", __func__, found );
-    return found;
 }
 
 void
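
On the starvation worry in the comment above: if grab_elem_locked() passes
over every queued entry because each entry's socket is in use, those entries
sit in the queue and nothing obviously wakes a worker to retry once the busy
socket is released.  One conventional way to close that window is to signal
the queue condition from the release path.  A sketch only: this body and the
m_queueCondVar member name are assumptions for illustration, not taken from
the relay source.

    /* Hypothetical body for the existing release_socket_locked(): called
       with m_queueMutex held, it unmarks the socket and wakes all workers so
       entries skipped while the socket was busy get re-scanned. */
    void
    XWThreadPool::release_socket_locked( int socket )
    {
        if ( socket >= 0 ) {
            m_sockets_in_use.erase( socket );          /* socket is free again */
            pthread_cond_broadcast( &m_queueCondVar ); /* re-check skipped entries */
        }
    }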

View file

@@ -37,11 +37,12 @@ class XWThreadPool {
  public:
     static XWThreadPool* GetTPool();
     typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket );
+    typedef void (*kill_func)( int socket );
 
     XWThreadPool();
     ~XWThreadPool();
 
-    void Setup( int nThreads, packet_func pFunc );
+    void Setup( int nThreads, packet_func pFunc, kill_func kFunc );
     void Stop();
 
     /* Add to set being listened on */
@@ -49,18 +50,18 @@ class XWThreadPool {
     /* remove from tpool altogether, and close */
     void CloseSocket( int socket );
 
-    void EnqueueKill( int socket );
+    void EnqueueKill( int socket, const char* const why );
 
  private:
     typedef enum { Q_READ, Q_KILL } QAction;
-    typedef pair<QAction, int> QueuePr;
+    typedef struct { QAction m_act; int m_socket; } QueuePr;
 
     /* Remove from set being listened on */
     bool RemoveSocket( int socket );
 
-    void enqueue( int socket );
+    void enqueue( int socket, QAction act = Q_READ );
     void release_socket_locked( int socket );
-    bool grab_elem_locked( int curSock, QueuePr* qpp );
+    void grab_elem_locked( int curSock, QueuePr* qpp );
     bool get_process_packet( int socket );
 
     void interrupt_poll();
@@ -88,6 +89,7 @@ class XWThreadPool {
     bool m_timeToDie;
     int m_nThreads;
     packet_func m_pFunc;
+    kill_func m_kFunc;
 
     static XWThreadPool* g_instance;
 };

View file

@@ -218,9 +218,6 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket )
         SafeCref scr( socket );
         success = scr.HandleHeartbeat( hostID, socket );
 
-        if ( !success ) {
-            killSocket( socket, "no cref for socket" );
-        }
     }
     return success;
 } /* processHeartbeat */
@@ -424,15 +421,11 @@ processDisconnect( unsigned char* bufp, int bufLen, int socket )
     return success;
 } /* processDisconnect */
 
-void
-killSocket( int socket, const char* why )
+static void
+killSocket( int socket )
 {
-    logf( XW_LOGINFO, "killSocket(%d): %s", socket, why );
+    logf( XW_LOGINFO, "killSocket(%d)", socket );
     CRefMgr::Get()->RemoveSocketRefs( socket );
-    /* Might want to kill the thread it belongs to if we're not in it,
-       e.g. when unable to write to another socket. */
-    logf( XW_LOGINFO, "killSocket done" );
-    XWThreadPool::GetTPool()->CloseSocket( socket );
 }
 
 time_t
@@ -512,7 +505,7 @@ processMessage( unsigned char* buf, int bufLen, int socket )
     }
 
     if ( !success ) {
-        killSocket( socket, "failure" );
+        XWThreadPool::GetTPool()->EnqueueKill( socket, "failure" );
     }
 
     return success;
@@ -974,7 +967,7 @@ main( int argc, char** argv )
     (void)sigaction( SIGINT, &act, NULL );
 
     XWThreadPool* tPool = XWThreadPool::GetTPool();
-    tPool->Setup( nWorkerThreads, processMessage );
+    tPool->Setup( nWorkerThreads, processMessage, killSocket );
 
     /* set up select call */
     fd_set rfds;

View file

@@ -19,7 +19,6 @@ typedef enum {
 void logf( XW_LogLevel level, const char* format, ... );
 
 void denyConnection( int socket, XWREASON err );
-void killSocket( int socket, const char* why );
 bool send_with_length_unsafe( int socket, unsigned char* buf, int bufLen );
 
 time_t uptime(void);