Attempt to detect connections that are duplicates by checking if that

socket is already in a connection.  This doesn't seem to always work,
and I may be leaking cref instances when it does, so needs debugging
still.
This commit is contained in:
ehouse 2009-02-01 16:00:20 +00:00
parent 3c16f9781d
commit 7a24d40d1f
5 changed files with 53 additions and 37 deletions

View file

@ -34,7 +34,7 @@ SVNINFO = svnversion.txt
OBJ = $(patsubst %.cpp,%.o,$(SRC))
LDFLAGS += -pthread -g -lmcheck $(STATIC)
CPPFLAGS += -DSPAWN_SELF -g -Wall \
CPPFLAGS += -DSPAWN_SELF -DRELAY_HEARTBEAT -g -Wall \
-DSVN_REV=\"$(shell cat $(SVNINFO) 2>/dev/null || svnversion -n ..)\"
# turn on semaphore debugging

View file

@ -114,19 +114,24 @@ CookieRef::~CookieRef()
void
CookieRef::_Connect( int socket, HostID hid, int nPlayersH, int nPlayersT )
{
CRefMgr::Get()->Associate( socket, this );
if ( hid == HOST_ID_NONE ) {
hid = nextHostID();
logf( XW_LOGINFO, "assigned host id: %x", hid );
if ( CRefMgr::Get()->Associate( socket, this ) ) {
if ( hid == HOST_ID_NONE ) {
hid = nextHostID();
logf( XW_LOGINFO, "assigned host id: %x", hid );
} else {
logf( XW_LOGINFO, "NOT assigned host id; why?" );
}
pushConnectEvent( socket, hid, nPlayersH, nPlayersT );
handleEvents();
} else {
logf( XW_LOGINFO, "dropping connect event; already connected" );
}
pushConnectEvent( socket, hid, nPlayersH, nPlayersT );
handleEvents();
}
void
CookieRef::_Reconnect( int socket, HostID hid, int nPlayersH, int nPlayersT )
{
CRefMgr::Get()->Associate( socket, this );
(void)CRefMgr::Get()->Associate( socket, this );
/* MutexLock ml( &m_EventsMutex ); */
pushReconnectEvent( socket, hid, nPlayersH, nPlayersT );
handleEvents();
@ -135,6 +140,7 @@ CookieRef::_Reconnect( int socket, HostID hid, int nPlayersH, int nPlayersT )
void
CookieRef::_Disconnect( int socket, HostID hostID )
{
logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID );
CRefMgr::Get()->Disassociate( socket, this );
CRefEvent evt;
@ -225,6 +231,7 @@ CookieRef::notifyDisconn( const CRefEvent* evt )
void
CookieRef::removeSocket( int socket )
{
logf( XW_LOGINFO, "%s(%d)", __func__, socket );
int count;
{
/* RWWriteLock rwl( &m_sockets_rwlock ); */

View file

@ -95,7 +95,7 @@ CookieRef*
CRefMgr::FindOpenGameFor( const char* cORn, bool isCookie,
HostID hid, int nPlayersH, int nPlayersT )
{
logf( XW_LOGINFO, "FindOpenGameFor with %s", cORn );
logf( XW_LOGINFO, "%s(%s)", __func__, cORn );
CookieRef* cref = NULL;
RWReadLock rwl( &m_cookieMapRWLock );
@ -202,17 +202,26 @@ CRefMgr::getMakeCookieRef_locked( const char* cORn, bool isCookie, HostID hid,
return cref;
} /* getMakeCookieRef_locked */
void
bool
CRefMgr::Associate( int socket, CookieRef* cref )
{
bool isNew = false;
MutexLock ml( &m_SocketStuffMutex );
SocketMap::iterator iter = m_SocketStuff.find( socket );
if ( iter != m_SocketStuff.end() ) {
logf( XW_LOGINFO, "replacing existing cref/threadID pair for socket %d", socket );
/* This isn't enough. Must provide a way to reuse sockets should a
genuinely different connection appear. Now maybe we already remove
this reference when a socket is closed. Test this! Or assert
something here. Bottom line: need to swallow repeated/duplicate
connect messages from same host. */
if ( iter == m_SocketStuff.end() ) {
SocketStuff* stuff = new SocketStuff( cref );
m_SocketStuff.insert( pair< int, SocketStuff* >( socket, stuff ) );
isNew = true;
} else {
logf( XW_LOGERROR, "Already have cref/threadID pair for socket %d; "
"error???", socket );
}
SocketStuff* stuff = new SocketStuff( cref );
m_SocketStuff.insert( pair< int, SocketStuff* >( socket, stuff ) );
return isNew;
}
void

View file

@ -69,7 +69,7 @@ class CRefMgr {
void UnlockAll() { pthread_rwlock_unlock( &m_cookieMapRWLock ); }
/* Track sockets independent of cookie refs */
void Associate( int socket, CookieRef* cref );
bool Associate( int socket, CookieRef* cref );
void Disassociate( int socket, CookieRef* cref );
pthread_mutex_t* GetWriteMutexForSocket( int socket );
void RemoveSocketRefs( int socket );
@ -170,7 +170,7 @@ class SafeCref {
bool HandleHeartbeat( HostID id, int socket ) {
if ( IsValid() ) {
m_cref->_HandleHeartbeat( id, socket );
return true
return true;
} else {
return false;
}

View file

@ -109,7 +109,7 @@ XWThreadPool::Stop()
void
XWThreadPool::AddSocket( int socket )
{
logf( XW_LOGINFO, "AddSocket(%d)", socket );
logf( XW_LOGINFO, "%s(%d)", __func__, socket );
{
RWWriteLock ml( &m_activeSocketsRWLock );
m_activeSockets.push_back( socket );
@ -166,6 +166,7 @@ XWThreadPool::CloseSocket( int socket )
bool
XWThreadPool::get_process_packet( int socket )
{
bool success = false;
short packetSize;
assert( sizeof(packetSize) == 2 );
@ -173,26 +174,25 @@ XWThreadPool::get_process_packet( int socket )
sizeof(packetSize), MSG_WAITALL );
if ( nRead != 2 ) {
killSocket( socket, "nRead != 2" );
return false;
} else {
packetSize = ntohs( packetSize );
if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) {
killSocket( socket, "packetSize wrong" );
} else {
unsigned char buf[MAX_MSG_LEN];
nRead = recv( socket, buf, packetSize, MSG_WAITALL );
if ( nRead != packetSize ) {
killSocket( socket, "nRead != packetSize" );
} else {
logf( XW_LOGINFO, "read %d bytes", nRead );
logf( XW_LOGINFO, "calling m_pFunc" );
success = (*m_pFunc)( buf, packetSize, socket );
}
}
}
packetSize = ntohs( packetSize );
if ( packetSize < 0 || packetSize > MAX_MSG_LEN ) {
killSocket( socket, "packetSize wrong" );
return false;
}
unsigned char buf[MAX_MSG_LEN];
nRead = recv( socket, buf, packetSize, MSG_WAITALL );
if ( nRead != packetSize ) {
killSocket( socket, "nRead != packetSize" );
return false;
}
logf( XW_LOGINFO, "read %d bytes\n", nRead );
logf( XW_LOGINFO, "calling m_pFunc" );
bool success = (*m_pFunc)( buf, packetSize, socket );
return success;
} /* get_process_packet */