Merge remote-tracking branch 'origin/android_branch' into android_branch

This commit is contained in:
Eric House 2013-01-19 15:54:40 -08:00
commit 54ad381749
20 changed files with 700 additions and 500 deletions

View file

@ -1380,16 +1380,17 @@ gtkDrawCtxtMake( GtkWidget* drawing_area, GtkAppGlobals* globals )
allocAndSet( map, &dctx->white, 0xFFFF, 0xFFFF, 0xFFFF ); allocAndSet( map, &dctx->white, 0xFFFF, 0xFFFF, 0xFFFF );
{ {
GdkWindow *window = NULL; // GdkWindow *window = NULL;
if ( GTK_WIDGET_FLAGS(GTK_WIDGET(drawing_area)) & GTK_NO_WINDOW ) { /* if ( GTK_WIDGET_FLAGS(GTK_WIDGET(drawing_area)) & GTK_NO_WINDOW ) { */
/* XXX I'm not sure about this function because I never used it. /* /\* XXX I'm not sure about this function because I never used it. */
* (the name seems to indicate what you want though). /* * (the name seems to indicate what you want though). */
*/ /* *\/ */
window = gtk_widget_get_parent_window( GTK_WIDGET(drawing_area) ); /* window = gtk_widget_get_parent_window( GTK_WIDGET(drawing_area) ); */
} else { /* } else { */
window = GTK_WIDGET(drawing_area)->window; /* window = GTK_WIDGET(drawing_area)->window; */
} /* } */
window = GTK_WIDGET(drawing_area)->window; GdkWindow* window = GTK_WIDGET(drawing_area)->window;
XP_ASSERT( !!window );
#ifdef USE_CAIRO #ifdef USE_CAIRO
dctx->cr = gdk_cairo_create( window ); dctx->cr = gdk_cairo_create( window );
XP_LOGF( "dctx->cr=%p", dctx->cr ); XP_LOGF( "dctx->cr=%p", dctx->cr );

View file

@ -30,6 +30,7 @@ SRC = \
timermgr.cpp \ timermgr.cpp \
tpool.cpp \ tpool.cpp \
cidlock.cpp \ cidlock.cpp \
addrinfo.cpp \
xwrelay.cpp \ xwrelay.cpp \
# STATIC ?= -static # STATIC ?= -static

View file

@ -0,0 +1,40 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2005-2011 by Eric House (xwords@eehouse.org). All rights
* reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <assert.h>
#include <string.h>
#include "addrinfo.h"
bool
AddrInfo::equals( const AddrInfo& other ) const
{
bool equal = other.isTCP() == isTCP();
if ( equal ) {
if ( isTCP() ) {
equal = m_socket == other.m_socket;
} else {
assert(0); /* later.... */
}
}
return equal;
}

63
xwords4/relay/addrinfo.h Normal file
View file

@ -0,0 +1,63 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2005 - 2013 by Eric House (xwords@eehouse.org). All rights
* reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _ADDRINFO_H_
#define _ADDRINFO_H_
#include <netinet/in.h>
#include <string.h>
class AddrInfo {
public:
typedef union _AddrUnion {
struct sockaddr addr;
struct sockaddr_in addr_in;
} AddrUnion;
AddrInfo() {
memset( this, 0, sizeof(*this) );
m_socket = -1;
m_isValid = false;
}
AddrInfo( bool isTCP, int socket, const AddrUnion* saddr ) {
m_isValid = true;
m_isTCP = isTCP;
m_socket = socket;
memcpy( &m_saddr, saddr, sizeof(m_saddr) );
}
void setIsTCP( bool val ) { m_isTCP = val; }
bool isTCP() const { return m_isTCP; } /* later UDP will be here too */
int socket() const { assert(m_isValid); return m_socket; }
struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; }
bool equals( const AddrInfo& other ) const;
private:
// AddrInfo& operator=(const AddrInfo&); // Prevent assignment
int m_socket;
bool m_isTCP;
bool m_isValid;
AddrUnion m_saddr;
};
#endif

View file

@ -27,11 +27,11 @@
// #define CIDLOCK_DEBUG // #define CIDLOCK_DEBUG
const set<int> const vector<AddrInfo>
CidInfo::GetSockets( void ) CidInfo::GetAddrs( void )
{ {
return 0 == m_owner || NULL == m_cref ? return 0 == m_owner || NULL == m_cref ?
m_sockets : m_cref->GetSockets(); m_addrs : m_cref->GetAddrs();
} }
CidLock* CidLock::s_instance = NULL; CidLock* CidLock::s_instance = NULL;
@ -117,7 +117,7 @@ CidLock::Claim( CookieID cid )
} /* CidLock::Claim */ } /* CidLock::Claim */
CidInfo* CidInfo*
CidLock::ClaimSocket( int sock ) CidLock::ClaimSocket( const AddrInfo* addr )
{ {
CidInfo* info = NULL; CidInfo* info = NULL;
#ifdef CIDLOCK_DEBUG #ifdef CIDLOCK_DEBUG
@ -126,16 +126,19 @@ CidLock::ClaimSocket( int sock )
for ( ; ; ) { for ( ; ; ) {
MutexLock ml( &m_infos_mutex ); MutexLock ml( &m_infos_mutex );
map< CookieID, CidInfo*>::iterator iter; map<CookieID, CidInfo*>::iterator iter;
for ( iter = m_infos.begin(); iter != m_infos.end(); ++iter ) { for ( iter = m_infos.begin(); NULL == info && iter != m_infos.end(); ++iter ) {
const set<int>& sockets = iter->second->GetSockets(); const vector<AddrInfo>& addrs = iter->second->GetAddrs();
if ( sockets.end() != sockets.find( sock ) ) { vector<AddrInfo>::const_iterator iter2;
if ( 0 == iter->second->GetOwner() ) { for ( iter2 = addrs.begin(); iter2 != addrs.end(); ++iter2 ) {
info = iter->second; if ( iter2->equals(*addr) ) {
info->SetOwner( pthread_self() ); if ( 0 == iter->second->GetOwner() ) {
PRINT_CLAIMED(); info = iter->second;
info->SetOwner( pthread_self() );
PRINT_CLAIMED();
}
break;
} }
break;
} }
} }
@ -177,7 +180,7 @@ CidLock::Relinquish( CidInfo* claim, bool drop )
} else { } else {
CookieRef* ref = claim->GetRef(); CookieRef* ref = claim->GetRef();
if ( NULL != ref ) { if ( NULL != ref ) {
claim->SetSockets( ref->GetSockets() ); /* cache these */ claim->SetAddrs( ref->GetAddrs() ); /* cache these */
} }
claim->SetOwner( 0 ); claim->SetOwner( 0 );
} }

View file

@ -39,8 +39,8 @@ class CidInfo {
CookieID GetCid( void ) { return m_cid; } CookieID GetCid( void ) { return m_cid; }
CookieRef* GetRef( void ) { return m_cref; } CookieRef* GetRef( void ) { return m_cref; }
pthread_t GetOwner( void ) { return m_owner; } pthread_t GetOwner( void ) { return m_owner; }
const set<int> GetSockets( void ); const vector<AddrInfo> GetAddrs( void );
void SetSockets( set<int> sockets ) { m_sockets = sockets; }; void SetAddrs( vector<AddrInfo> addrs ) { m_addrs = addrs; };
void SetRef( CookieRef* cref ) { m_cref = cref; } void SetRef( CookieRef* cref ) { m_cref = cref; }
void SetOwner( pthread_t owner ) { m_owner = owner; } void SetOwner( pthread_t owner ) { m_owner = owner; }
@ -49,7 +49,7 @@ class CidInfo {
CookieID m_cid; CookieID m_cid;
CookieRef* m_cref; CookieRef* m_cref;
pthread_t m_owner; pthread_t m_owner;
set<int> m_sockets; vector<AddrInfo> m_addrs;
}; };
class CidLock { class CidLock {
@ -65,7 +65,7 @@ class CidLock {
CidInfo* Claim( void ) { return Claim(0); } CidInfo* Claim( void ) { return Claim(0); }
CidInfo* Claim( CookieID cid ); CidInfo* Claim( CookieID cid );
CidInfo* ClaimSocket( int sock ); CidInfo* ClaimSocket( const AddrInfo* addr );
void Relinquish( CidInfo* claim, bool drop ); void Relinquish( CidInfo* claim, bool drop );
private: private:

View file

@ -128,12 +128,14 @@ CookieRef::~CookieRef()
XWThreadPool* tPool = XWThreadPool::GetTPool(); XWThreadPool* tPool = XWThreadPool::GetTPool();
ASSERT_LOCKED(); ASSERT_LOCKED();
map<int,HostRec>::iterator iter; vector<HostRec>::iterator iter;
{ {
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int socket = iter->first; AddrInfo addr = iter->m_addr;
tPool->CloseSocket( socket ); if ( addr.isTCP() ) {
tPool->CloseSocket( &addr );
}
m_sockets.erase( iter ); m_sockets.erase( iter );
} }
} }
@ -180,13 +182,13 @@ CookieRef::Unlock() {
} }
bool bool
CookieRef::_Connect( int socket, int clientVersion, DevID* devID, CookieRef::_Connect( int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS, int seed, int nPlayersH, int nPlayersS, int seed,
bool seenSeed, in_addr& addr ) bool seenSeed, const AddrInfo* addr )
{ {
bool connected = false; bool connected = false;
HostID prevHostID = HOST_ID_NONE; HostID prevHostID = HOST_ID_NONE;
bool alreadyHere = AlreadyHere( seed, socket, &prevHostID ); bool alreadyHere = AlreadyHere( seed, addr, &prevHostID );
if ( alreadyHere ) { if ( alreadyHere ) {
if ( seenSeed ) { /* we need to get rid of the current entry, then if ( seenSeed ) { /* we need to get rid of the current entry, then
@ -199,12 +201,23 @@ CookieRef::_Connect( int socket, int clientVersion, DevID* devID,
} }
if ( !connected ) { if ( !connected ) {
set<int> sockets = GetSockets(); bool socketOK = !addr->isTCP();
if ( sockets.end() == sockets.find( socket ) ) { if ( !socketOK ) {
pushConnectEvent( socket, clientVersion, devID, nPlayersH, socketOK = true;
nPlayersS, seed, addr ); vector<AddrInfo> addrs = GetAddrs();
vector<AddrInfo>::const_iterator iter;
for ( iter = addrs.begin(); iter != addrs.end(); ++iter ) {
if ( iter->equals( *addr ) ) {
socketOK = false;
break;
}
}
}
if ( socketOK ) {
pushConnectEvent( clientVersion, devID, nPlayersH, nPlayersS,
seed, addr );
handleEvents(); handleEvents();
connected = HasSocket_locked( socket ); connected = HasSocket_locked( addr );
} else { } else {
logf( XW_LOGINFO, "dropping connect event; already connected" ); logf( XW_LOGINFO, "dropping connect event; already connected" );
} }
@ -213,12 +226,12 @@ CookieRef::_Connect( int socket, int clientVersion, DevID* devID,
} }
bool bool
CookieRef::_Reconnect( int socket, int clientVersion, DevID* devID, CookieRef::_Reconnect( int clientVersion, DevID* devID, HostID hid,
HostID hid, int nPlayersH, int nPlayersS, int nPlayersH, int nPlayersS, int seed,
int seed, in_addr& addr, bool gameDead ) const AddrInfo* addr, bool gameDead )
{ {
bool spotTaken = false; bool spotTaken = false;
bool alreadyHere = AlreadyHere( hid, seed, socket, &spotTaken ); bool alreadyHere = AlreadyHere( hid, seed, addr, &spotTaken );
if ( spotTaken ) { if ( spotTaken ) {
logf( XW_LOGINFO, "%s: failing because spot taken", __func__ ); logf( XW_LOGINFO, "%s: failing because spot taken", __func__ );
} else { } else {
@ -226,11 +239,11 @@ CookieRef::_Reconnect( int socket, int clientVersion, DevID* devID,
logf( XW_LOGINFO, "%s: dropping because already here", logf( XW_LOGINFO, "%s: dropping because already here",
__func__ ); __func__ );
} else { } else {
pushReconnectEvent( socket, clientVersion, devID, hid, nPlayersH, pushReconnectEvent( clientVersion, devID, hid, nPlayersH,
nPlayersS, seed, addr ); nPlayersS, seed, addr );
} }
if ( gameDead ) { if ( gameDead ) {
pushGameDead( socket ); pushGameDead( addr );
} }
handleEvents(); handleEvents();
} }
@ -247,27 +260,25 @@ CookieRef::_HandleAck( HostID hostID )
} }
void void
CookieRef::_PutMsg( HostID srcID, in_addr& addr, HostID destID, CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen ) unsigned char* buf, int buflen )
{ {
CRefEvent evt( XWE_PROXYMSG ); CRefEvent evt( XWE_PROXYMSG, addr );
evt.u.fwd.src = srcID; evt.u.fwd.src = srcID;
evt.u.fwd.dest = destID; evt.u.fwd.dest = destID;
evt.u.fwd.buf = buf; evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen; evt.u.fwd.buflen = buflen;
evt.u.fwd.addr = addr;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
handleEvents(); handleEvents();
} }
void void
CookieRef::_Disconnect( int socket, HostID hostID ) CookieRef::_Disconnect( const AddrInfo* addr, HostID hostID )
{ {
logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID ); logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID );
CRefEvent evt( XWE_DISCONN ); CRefEvent evt( XWE_DISCONN, addr );
evt.u.discon.socket = socket;
evt.u.discon.srcID = hostID; evt.u.discon.srcID = hostID;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
@ -295,54 +306,58 @@ CookieRef::_Shutdown()
} /* _Shutdown */ } /* _Shutdown */
HostID HostID
CookieRef::HostForSocket( int sock ) CookieRef::HostForSocket( const AddrInfo* addr )
{ {
HostID hid = -1; HostID hid = -1;
ASSERT_LOCKED(); ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int, HostRec>::const_iterator iter = m_sockets.find( sock ); vector<HostRec>::const_iterator iter;
if ( iter != m_sockets.end() ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
hid = iter->second.m_hostID; if ( iter->m_addr.equals( *addr ) ) {
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid ); hid = iter->m_hostID;
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid );
break;
}
} }
return hid; return hid;
} }
int const AddrInfo*
CookieRef::SocketForHost( HostID dest ) CookieRef::SocketForHost( HostID dest )
{ {
int socket = -1; const AddrInfo* result = NULL;
ASSERT_LOCKED(); ASSERT_LOCKED();
assert( dest != 0 ); /* don't use as lookup before assigned */ assert( dest != 0 ); /* don't use as lookup before assigned */
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_hostID == dest ) { if ( iter->m_hostID == dest ) {
socket = iter->first; result = &iter->m_addr;
break; break;
} }
} }
logf( XW_LOGVERBOSE0, "returning socket=%d for hostid=%x", socket, dest ); // logf( XW_LOGVERBOSE0, "returning socket=%d for hostid=%x", socket, dest );
return socket; return result;
} }
bool bool
CookieRef::AlreadyHere( unsigned short seed, int socket, HostID* prevHostID ) CookieRef::AlreadyHere( unsigned short seed, const AddrInfo* addr,
HostID* prevHostID )
{ {
logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket ); logf( XW_LOGINFO, "%s(seed=%x(%d),socket=%d)", __func__, seed, seed, socket );
bool here = false; bool here = false;
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
here = iter->second.m_seed == seed; /* client already registered */ here = iter->m_seed == seed; /* client already registered */
if ( here ) { if ( here ) {
if ( iter->first != socket ) { /* not just a dupe packet */ if ( !addr->equals(iter->m_addr) ) { /* not just a dupe packet */
logf( XW_LOGINFO, "%s: seeds match; socket %d assumed closed", logf( XW_LOGINFO, "%s: seeds match; socket assumed closed",
__func__, iter->first ); __func__ );
*prevHostID = iter->second.m_hostID; *prevHostID = iter->m_hostID;
} }
break; break;
} }
@ -353,7 +368,7 @@ CookieRef::AlreadyHere( unsigned short seed, int socket, HostID* prevHostID )
} }
bool bool
CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket, CookieRef::AlreadyHere( HostID hid, unsigned short seed, const AddrInfo* addr,
bool* spotTaken ) bool* spotTaken )
{ {
logf( XW_LOGINFO, "%s(hid=%d,seed=%x(%d),socket=%d)", __func__, logf( XW_LOGINFO, "%s(hid=%d,seed=%x(%d),socket=%d)", __func__,
@ -361,17 +376,16 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket,
bool here = false; bool here = false;
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter; vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_hostID == hid ) { if ( iter->m_hostID == hid ) {
if ( seed != iter->second.m_seed ) { if ( seed != iter->m_seed ) {
*spotTaken = true; *spotTaken = true;
} else if ( socket == iter->first ) { } else if ( addr->equals( iter->m_addr ) ) {
here = true; /* dup packet */ here = true; /* dup packet */
} else { } else {
logf( XW_LOGINFO, "%s: hids match; nuking existing record " logf( XW_LOGINFO, "%s: hids match; nuking existing record "
"for socket %d b/c assumed closed", __func__, "for socket b/c assumed closed", __func__ );
iter->first );
m_sockets.erase( iter ); m_sockets.erase( iter );
} }
break; break;
@ -385,41 +399,42 @@ CookieRef::AlreadyHere( HostID hid, unsigned short seed, int socket,
void void
CookieRef::notifyDisconn( const CRefEvent* evt ) CookieRef::notifyDisconn( const CRefEvent* evt )
{ {
int socket = evt->u.disnote.socket;
unsigned char buf[] = { unsigned char buf[] = {
XWRELAY_DISCONNECT_YOU, XWRELAY_DISCONNECT_YOU,
evt->u.disnote.why evt->u.disnote.why
}; };
send_with_length( socket, buf, sizeof(buf), true ); send_with_length( &evt->addr, buf, sizeof(buf), true );
} /* notifyDisconn */ } /* notifyDisconn */
void void
CookieRef::removeSocket( int socket ) CookieRef::removeSocket( const AddrInfo* addr )
{ {
logf( XW_LOGINFO, "%s(socket=%d)", __func__, socket ); logf( XW_LOGINFO, "%s(socket=%d)", __func__, addr->socket() );
bool found = false; bool found = false;
ASSERT_LOCKED(); ASSERT_LOCKED();
{ {
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter = m_sockets.find(socket); vector<HostRec>::iterator iter;
if ( iter != m_sockets.end() ) { for ( iter = m_sockets.begin(); !found && iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_ackPending ) { if ( iter->m_addr.equals( *addr ) ) {
logf( XW_LOGINFO, if ( iter->m_ackPending ) {
"Never got ack; removing hid %d from DB", logf( XW_LOGINFO,
iter->second.m_hostID ); "Never got ack; removing hid %d from DB",
DBMgr::Get()->RmDeviceByHid( ConnName(), iter->m_hostID );
iter->second.m_hostID ); DBMgr::Get()->RmDeviceByHid( ConnName(),
m_nPlayersHere -= iter->second.m_nPlayersH; iter->m_hostID );
cancelAckTimer( iter->second.m_hostID ); m_nPlayersHere -= iter->m_nPlayersH;
cancelAckTimer( iter->m_hostID );
}
m_sockets.erase(iter);
found = true;
} }
m_sockets.erase(iter);
found = true;
} }
} }
if ( !found ) { if ( !found ) {
logf( XW_LOGINFO, "%s: socket %d not found", __func__, socket ); logf( XW_LOGINFO, "%s: socket not found", __func__ );
} }
printSeeds(__func__); printSeeds(__func__);
@ -441,35 +456,38 @@ CookieRef::HaveRoom( int nPlayers )
} }
bool bool
CookieRef::HasSocket( int socket ) CookieRef::HasSocket( const AddrInfo* addr )
{ {
bool result = Lock(); bool result = Lock();
if ( result ) { if ( result ) {
result = HasSocket_locked( socket ); result = HasSocket_locked( addr );
Unlock(); Unlock();
} }
return result; return result;
} }
set<int> vector<AddrInfo>
CookieRef::GetSockets() CookieRef::GetAddrs()
{ {
set<int> result; vector<AddrInfo> result;
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
result.insert( iter->first ); result.push_back( iter->m_addr );
} }
return result; return result;
} }
bool bool
CookieRef::HasSocket_locked( int socket ) CookieRef::HasSocket_locked( const AddrInfo* addr )
{ {
ASSERT_LOCKED(); ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter = m_sockets.find( socket ); vector<HostRec>::const_iterator iter;
bool found = iter != m_sockets.end(); bool found = false;
for ( iter = m_sockets.begin(); !found && iter != m_sockets.end(); ++iter ) {
found = iter->m_addr.equals( *addr );
}
logf( XW_LOGINFO, "%s=>%d", __func__, found ); logf( XW_LOGINFO, "%s=>%d", __func__, found );
return found; return found;
@ -503,52 +521,47 @@ CookieRef::_CheckHeartbeats( time_t now )
#endif #endif
void void
CookieRef::_Forward( HostID src, in_addr& addr, HostID dest, unsigned char* buf, CookieRef::_Forward( HostID src, const AddrInfo* addr,
int buflen ) HostID dest, unsigned char* buf, int buflen )
{ {
pushForwardEvent( src, addr, dest, buf, buflen ); pushForwardEvent( src, addr, dest, buf, buflen );
handleEvents(); handleEvents();
} /* Forward */ } /* Forward */
void void
CookieRef::_Remove( int socket ) CookieRef::_Remove( const AddrInfo* addr )
{ {
pushRemoveSocketEvent( socket ); pushRemoveSocketEvent( addr );
handleEvents(); handleEvents();
} /* Forward */ } /* Forward */
void void
CookieRef::pushConnectEvent( int socket, int clientVersion, DevID* devID, CookieRef::pushConnectEvent( int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS, int nPlayersH, int nPlayersS,
int seed, in_addr& addr ) int seed, const AddrInfo* addr )
{ {
CRefEvent evt; CRefEvent evt( XWE_DEVCONNECT, addr );
evt.type = XWE_DEVCONNECT;
evt.u.con.socket = socket;
evt.u.con.clientVersion = clientVersion; evt.u.con.clientVersion = clientVersion;
evt.u.con.devID = devID; evt.u.con.devID = devID;
evt.u.con.srcID = HOST_ID_NONE; evt.u.con.srcID = HOST_ID_NONE;
evt.u.con.nPlayersH = nPlayersH; evt.u.con.nPlayersH = nPlayersH;
evt.u.con.nPlayersS = nPlayersS; evt.u.con.nPlayersS = nPlayersS;
evt.u.con.seed = seed; evt.u.con.seed = seed;
evt.u.con.addr = addr;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} /* pushConnectEvent */ } /* pushConnectEvent */
void void
CookieRef::pushReconnectEvent( int socket, int clientVersion, DevID* devID, CookieRef::pushReconnectEvent( int clientVersion, DevID* devID,
HostID srcID, int nPlayersH, int nPlayersS, HostID srcID, int nPlayersH, int nPlayersS,
int seed, in_addr& addr ) int seed, const AddrInfo* addr )
{ {
CRefEvent evt( XWE_RECONNECT ); CRefEvent evt( XWE_RECONNECT, addr );
evt.u.con.socket = socket;
evt.u.con.clientVersion = clientVersion; evt.u.con.clientVersion = clientVersion;
evt.u.con.devID = devID; evt.u.con.devID = devID;
evt.u.con.srcID = srcID; evt.u.con.srcID = srcID;
evt.u.con.nPlayersH = nPlayersH; evt.u.con.nPlayersH = nPlayersH;
evt.u.con.nPlayersS = nPlayersS; evt.u.con.nPlayersS = nPlayersS;
evt.u.con.seed = seed; evt.u.con.seed = seed;
evt.u.con.addr = addr;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} /* pushReconnectEvent */ } /* pushReconnectEvent */
@ -573,32 +586,29 @@ CookieRef::pushHeartFailedEvent( int socket )
#endif #endif
void void
CookieRef::pushForwardEvent( HostID src, in_addr& addr, HostID dest, CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen ) unsigned char* buf, int buflen )
{ {
logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest ); logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest );
CRefEvent evt( XWE_FORWARDMSG ); CRefEvent evt( XWE_FORWARDMSG, addr );
evt.u.fwd.src = src; evt.u.fwd.src = src;
evt.u.fwd.dest = dest; evt.u.fwd.dest = dest;
evt.u.fwd.buf = buf; evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen; evt.u.fwd.buflen = buflen;
evt.u.fwd.addr = addr;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} }
void void
CookieRef::pushRemoveSocketEvent( int socket ) CookieRef::pushRemoveSocketEvent( const AddrInfo* addr )
{ {
CRefEvent evt( XWE_REMOVESOCKET ); CRefEvent evt( XWE_REMOVESOCKET, addr );
evt.u.rmsock.socket = socket;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} }
void void
CookieRef::pushNotifyDisconEvent( int socket, XWREASON why ) CookieRef::pushNotifyDisconEvent( const AddrInfo* addr, XWREASON why )
{ {
CRefEvent evt( XWE_NOTIFYDISCON ); CRefEvent evt( XWE_NOTIFYDISCON, addr );
evt.u.disnote.socket = socket;
evt.u.disnote.why = why; evt.u.disnote.why = why;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} }
@ -611,10 +621,9 @@ CookieRef::pushLastSocketGoneEvent()
} }
void void
CookieRef::pushGameDead( int socket ) CookieRef::pushGameDead( const AddrInfo* addr )
{ {
CRefEvent evt( XWE_GAMEDEAD ); CRefEvent evt( XWE_GAMEDEAD, addr );
evt.u.discon.socket = socket;
m_eventQueue.push_back( evt ); m_eventQueue.push_back( evt );
} }
@ -679,11 +688,11 @@ CookieRef::handleEvents()
break; break;
case XWA_SEND_DUP_ROOM: case XWA_SEND_DUP_ROOM:
send_denied( &evt, XWRELAY_ERROR_DUP_ROOM ); send_denied( &evt, XWRELAY_ERROR_DUP_ROOM );
removeSocket( evt.u.rmsock.socket ); removeSocket( &evt.addr );
break; break;
case XWA_SEND_TOO_MANY: case XWA_SEND_TOO_MANY:
send_denied( &evt, XWRELAY_ERROR_TOO_MANY ); send_denied( &evt, XWRELAY_ERROR_TOO_MANY );
removeSocket( evt.u.rmsock.socket ); removeSocket( &evt.addr );
break; break;
case XWA_FWD: case XWA_FWD:
@ -703,20 +712,19 @@ CookieRef::handleEvents()
break; break;
case XWA_HEARTDISCONN: case XWA_HEARTDISCONN:
notifyOthers( evt.u.heart.socket, XWRELAY_DISCONNECT_OTHER, notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_HEART_OTHER ); XWRELAY_ERROR_HEART_OTHER );
setAllConnectedTimer(); setAllConnectedTimer();
// reducePlayerCounts( evt.u.discon.socket ); // reducePlayerCounts( evt.u.discon.socket );
disconnectSocket( evt.u.heart.socket, disconnectSocket( &evt.addr, XWRELAY_ERROR_HEART_YOU );
XWRELAY_ERROR_HEART_YOU );
break; break;
case XWA_DISCONNECT: case XWA_DISCONNECT:
setAllConnectedTimer(); setAllConnectedTimer();
// reducePlayerCounts( evt.u.discon.socket ); // reducePlayerCounts( evt.u.discon.socket );
notifyOthers( evt.u.discon.socket, XWRELAY_DISCONNECT_OTHER, notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_OTHER_DISCON ); XWRELAY_ERROR_OTHER_DISCON );
removeSocket( evt.u.discon.socket ); removeSocket( &evt.addr );
/* Don't notify. This is a normal part of a game ending. */ /* Don't notify. This is a normal part of a game ending. */
break; break;
@ -725,7 +733,7 @@ CookieRef::handleEvents()
break; break;
case XWA_TELLGAMEDEAD: case XWA_TELLGAMEDEAD:
notifyGameDead( evt.u.discon.socket ); notifyGameDead( &evt.addr );
break; break;
case XWA_NOTEHEART: case XWA_NOTEHEART:
@ -742,10 +750,10 @@ CookieRef::handleEvents()
case XWA_REMOVESOCK_1: case XWA_REMOVESOCK_1:
// reducePlayerCounts( evt.u.rmsock.socket ); // reducePlayerCounts( evt.u.rmsock.socket );
if ( XWA_REMOVESOCK_2 == takeAction ) { if ( XWA_REMOVESOCK_2 == takeAction ) {
notifyOthers( evt.u.rmsock.socket, XWRELAY_DISCONNECT_OTHER, notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_LOST_OTHER ); XWRELAY_ERROR_LOST_OTHER );
} }
removeSocket( evt.u.rmsock.socket ); removeSocket( &evt.addr );
break; break;
case XWA_SENDALLHERE: case XWA_SENDALLHERE:
@ -799,19 +807,19 @@ CookieRef::handleEvents()
} /* handleEvents */ } /* handleEvents */
bool bool
CookieRef::send_with_length( int socket, unsigned char* buf, int bufLen, CookieRef::send_with_length( const AddrInfo* addr,
bool cascade ) unsigned char* buf, int bufLen, bool cascade )
{ {
bool failed = false; bool failed = false;
if ( send_with_length_unsafe( socket, buf, bufLen ) ) { if ( send_with_length_unsafe( addr, buf, bufLen ) ) {
DBMgr::Get()->RecordSent( ConnName(), HostForSocket(socket), bufLen ); DBMgr::Get()->RecordSent( ConnName(), HostForSocket(addr), bufLen );
} else { } else {
failed = true; failed = true;
} }
if ( failed && cascade ) { if ( failed && cascade ) {
pushRemoveSocketEvent( socket ); pushRemoveSocketEvent( addr );
XWThreadPool::GetTPool()->CloseSocket( socket ); XWThreadPool::GetTPool()->CloseSocket( addr );
} }
return !failed; return !failed;
} /* send_with_length */ } /* send_with_length */
@ -834,12 +842,13 @@ CookieRef::store_message( HostID dest, const unsigned char* buf,
} }
void void
CookieRef::send_stored_messages( HostID dest, int socket ) CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
{ {
logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest ); logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest );
assert( dest > 0 && dest <= 4 ); assert( dest > 0 && dest <= 4 );
assert( -1 != socket ); assert( -1 != addr->socket() );
assert( addr->isTCP() );
for ( ; ; ) { for ( ; ; ) {
unsigned char buf[MAX_MSG_LEN]; unsigned char buf[MAX_MSG_LEN];
@ -849,7 +858,7 @@ CookieRef::send_stored_messages( HostID dest, int socket )
buf, &buflen, &msgID ) ) { buf, &buflen, &msgID ) ) {
break; break;
} }
if ( ! send_with_length( socket, buf, buflen, true ) ) { if ( ! send_with_length( addr, buf, buflen, true ) ) {
break; break;
} }
DBMgr::Get()->RemoveStoredMessages( &msgID, 1 ); DBMgr::Get()->RemoveStoredMessages( &msgID, 1 );
@ -862,7 +871,6 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
{ {
DBMgr::DevIDRelay devID = DBMgr::DEVID_NONE; DBMgr::DevIDRelay devID = DBMgr::DEVID_NONE;
int nPlayersH = evt->u.con.nPlayersH; int nPlayersH = evt->u.con.nPlayersH;
int socket = evt->u.con.socket;
int seed = evt->u.con.seed; int seed = evt->u.con.seed;
assert( m_nPlayersSought > 0 ); assert( m_nPlayersSought > 0 );
@ -899,7 +907,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
evt->u.con.srcID = evt->u.con.srcID =
DBMgr::Get()->AddDevice( ConnName(), evt->u.con.srcID, DBMgr::Get()->AddDevice( ConnName(), evt->u.con.srcID,
evt->u.con.clientVersion, nPlayersH, seed, evt->u.con.clientVersion, nPlayersH, seed,
evt->u.con.addr, devID, reconn ); &evt->addr, devID, reconn );
HostID hostid = evt->u.con.srcID; HostID hostid = evt->u.con.srcID;
if ( NULL != hidp ) { if ( NULL != hidp ) {
@ -915,8 +923,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
{ {
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
HostRec hr( hostid, nPlayersH, seed, !reconn ); HostRec hr( hostid, &evt->addr, nPlayersH, seed, !reconn );
m_sockets.insert( pair<int,HostRec>(socket, hr) ); m_sockets.push_back( hr );
} }
printSeeds(__func__); printSeeds(__func__);
@ -932,28 +940,28 @@ CookieRef::updateAck( HostID hostID, bool keep )
{ {
assert( hostID >= HOST_ID_SERVER ); assert( hostID >= HOST_ID_SERVER );
assert( hostID <= 4 ); assert( hostID <= 4 );
int socket = 0; const AddrInfo* nonKeeper = NULL;
cancelAckTimer( hostID ); cancelAckTimer( hostID );
{ {
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
map<int, HostRec>::iterator iter; vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_ackPending && iter->second.m_hostID == hostID ) { if ( iter->m_ackPending && iter->m_hostID == hostID ) {
if ( keep ) { if ( keep ) {
iter->second.m_ackPending = false; iter->m_ackPending = false;
DBMgr::Get()->NoteAckd( ConnName(), hostID ); DBMgr::Get()->NoteAckd( ConnName(), hostID );
} else { } else {
socket = iter->first; nonKeeper = &iter->m_addr;
} }
break; break;
} }
} }
} }
if ( 0 != socket ) { if ( NULL != nonKeeper ) {
removeSocket( socket ); removeSocket( nonKeeper );
} }
printSeeds(__func__); printSeeds(__func__);
@ -1032,8 +1040,6 @@ void
CookieRef::sendResponse( const CRefEvent* evt, bool initial, CookieRef::sendResponse( const CRefEvent* evt, bool initial,
const DBMgr::DevIDRelay* devID ) const DBMgr::DevIDRelay* devID )
{ {
int socket = evt->u.con.socket;
/* Now send the response */ /* Now send the response */
unsigned char buf[1 /* cmd */ unsigned char buf[1 /* cmd */
+ sizeof(unsigned char) /* hostID */ + sizeof(unsigned char) /* hostID */
@ -1099,7 +1105,7 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial,
} }
} }
send_with_length( socket, buf, bufp - buf, true ); send_with_length( &evt->addr, buf, bufp - buf, true );
logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) ); logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) );
} /* sendResponse */ } /* sendResponse */
@ -1108,7 +1114,7 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
{ {
HostID dest = evt->u.con.srcID; HostID dest = evt->u.con.srcID;
if ( HOST_ID_NONE != dest ) { if ( HOST_ID_NONE != dest ) {
send_stored_messages( dest, evt->u.con.socket ); send_stored_messages( dest, &evt->addr );
} }
} }
@ -1130,20 +1136,20 @@ CookieRef::forward_or_store( const CRefEvent* evt )
int buflen = evt->u.fwd.buflen; int buflen = evt->u.fwd.buflen;
HostID dest = evt->u.fwd.dest; HostID dest = evt->u.fwd.dest;
int destSocket = SocketForHost( dest ); const AddrInfo* destAddr = SocketForHost( dest );
if ( 0 < m_delayMicros && destSocket != -1 ) { if ( 0 < m_delayMicros && NULL != destAddr ) {
usleep( m_delayMicros ); usleep( m_delayMicros );
} }
if ( (destSocket == -1) if ( (NULL == destAddr)
|| !send_with_length( destSocket, buf, buflen, true ) ) { || !send_with_length( destAddr, buf, buflen, true ) ) {
store_message( dest, buf, buflen ); store_message( dest, buf, buflen );
} }
/* also note that we've heard from src recently */ /* also note that we've heard from src recently */
HostID src = evt->u.fwd.src; HostID src = evt->u.fwd.src;
DBMgr::Get()->RecordAddress( ConnName(), src, evt->u.fwd.addr ); DBMgr::Get()->RecordAddress( ConnName(), src, &evt->addr );
#ifdef RELAY_HEARTBEAT #ifdef RELAY_HEARTBEAT
pushHeartbeatEvent( src, SocketForHost(src) ); pushHeartbeatEvent( src, SocketForHost(src) );
#endif #endif
@ -1153,12 +1159,12 @@ CookieRef::forward_or_store( const CRefEvent* evt )
void void
CookieRef::send_denied( const CRefEvent* evt, XWREASON why ) CookieRef::send_denied( const CRefEvent* evt, XWREASON why )
{ {
denyConnection( evt->u.con.socket, why ); denyConnection( &evt->addr, why );
} }
void void
CookieRef::send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why, CookieRef::send_msg( const AddrInfo* addr, HostID id,
bool cascade ) XWRelayMsg msg, XWREASON why, bool cascade )
{ {
unsigned char buf[10]; unsigned char buf[10];
short tmp; short tmp;
@ -1178,34 +1184,35 @@ CookieRef::send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why,
} }
assert( len <= sizeof(buf) ); assert( len <= sizeof(buf) );
send_with_length( socket, buf, len, cascade ); send_with_length( addr, buf, len, cascade );
} /* send_msg */ } /* send_msg */
void void
CookieRef::notifyOthers( int socket, XWRelayMsg msg, XWREASON why ) CookieRef::notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why )
{ {
assert( socket != 0 ); assert( addr->socket() != 0 );
assert( addr->isTCP() );
ASSERT_LOCKED(); ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
int other = iter->first; const AddrInfo* other = &iter->m_addr;
if ( other != socket ) { if ( !other->equals( *addr ) ) {
send_msg( other, iter->second.m_hostID, msg, why, false ); send_msg( other, iter->m_hostID, msg, why, false );
} }
} }
} /* notifyOthers */ } /* notifyOthers */
void void
CookieRef::notifyGameDead( int socket ) CookieRef::notifyGameDead( const AddrInfo* addr )
{ {
unsigned char buf[] = { unsigned char buf[] = {
XWRELAY_MSG_STATUS XWRELAY_MSG_STATUS
,XWRELAY_ERROR_DELETED ,XWRELAY_ERROR_DELETED
}; };
send_with_length( socket, buf, sizeof(buf), true ); send_with_length( addr, buf, sizeof(buf), true );
} }
/* void */ /* void */
@ -1255,11 +1262,10 @@ CookieRef::sendAllHere( bool initial )
{ {
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_hostID == dest ) { if ( iter->m_hostID == dest ) {
sent = send_with_length( iter->first, buf, bufp-buf, sent = send_with_length( &iter->m_addr, buf, bufp-buf, true );
true );
break; break;
} }
} }
@ -1289,21 +1295,23 @@ CookieRef::disconnectSockets( XWREASON why )
{ {
ASSERT_LOCKED(); ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
assert( iter->first != 0 ); const AddrInfo* addr = &iter->m_addr;
if ( iter->first != 0 ) { if ( addr->socket() != 0 ) {
disconnectSocket( iter->first, why ); disconnectSocket( addr, why );
} else {
assert( 0 );
} }
} }
} }
void void
CookieRef::disconnectSocket( int socket, XWREASON why ) CookieRef::disconnectSocket( const AddrInfo* addr, XWREASON why )
{ {
ASSERT_LOCKED(); ASSERT_LOCKED();
pushNotifyDisconEvent( socket, why ); pushNotifyDisconEvent( addr, why );
pushRemoveSocketEvent( socket ); pushRemoveSocketEvent( addr );
} /* disconnectSocket */ } /* disconnectSocket */
void void
@ -1315,9 +1323,9 @@ CookieRef::removeDevice( const CRefEvent* const evt )
dbmgr->KillGame( ConnName(), evt->u.devgone.hid ); dbmgr->KillGame( ConnName(), evt->u.devgone.hid );
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
notifyGameDead( iter->first ); notifyGameDead( &iter->m_addr );
} }
} }
} }
@ -1325,25 +1333,24 @@ CookieRef::removeDevice( const CRefEvent* const evt )
void void
CookieRef::noteHeartbeat( const CRefEvent* evt ) CookieRef::noteHeartbeat( const CRefEvent* evt )
{ {
int socket = evt->u.heart.socket; const AddrInfo& addr = evt->addr;
HostID id = evt->u.heart.id; HostID id = evt->u.heart.id;
ASSERT_LOCKED(); ASSERT_LOCKED();
RWWriteLock rwl( &m_socketsRWLock ); RWWriteLock rwl( &m_socketsRWLock );
map<int,HostRec>::iterator iter; vector<HostRec>::iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->second.m_hostID == id ) { if ( iter->m_hostID == id ) {
int second_socket = iter->first; if ( iter->m_addr.equals(addr) ) {
if ( second_socket == socket ) {
logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d", logf( XW_LOGVERBOSE1, "upping m_lastHeartbeat from %d to %d",
iter->second.m_lastHeartbeat, uptime() ); iter->m_lastHeartbeat, uptime() );
iter->second.m_lastHeartbeat = uptime(); iter->m_lastHeartbeat = uptime();
} else { } else {
/* PENDING If the message came on an unexpected socket, kill the /* PENDING If the message came on an unexpected socket, kill the
connection. An attack is the most likely explanation. But: connection. An attack is the most likely explanation. But:
now it's happening after a crash and clients reconnect. */ now it's happening after a crash and clients reconnect. */
logf( XW_LOGERROR, "wrong socket record for HostID %x; " logf( XW_LOGERROR, "wrong socket record for HostID %x; "
"wanted %d, found %d", id, socket, second_socket ); "wanted %d, found %d", id, addr.socket(), iter->m_addr.socket() );
} }
break; break;
} }
@ -1403,12 +1410,12 @@ CookieRef::printSeeds( const char* caller )
char buf[64] = {0}; char buf[64] = {0};
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
len += snprintf( &buf[len], sizeof(buf)-len, "[%d]%.4x(%d)/%d/%c ", len += snprintf( &buf[len], sizeof(buf)-len, "[%d]%.4x(%d)/%d/%c ",
iter->second.m_hostID, iter->second.m_seed, iter->m_hostID, iter->m_seed,
iter->second.m_seed, iter->first, iter->m_seed, iter->m_addr.socket(),
iter->second.m_ackPending?'a':'A' ); iter->m_ackPending?'a':'A' );
} }
logf( XW_LOGINFO, "seeds/sockets/ack'd after %s(): %s", caller, buf ); logf( XW_LOGINFO, "seeds/sockets/ack'd after %s(): %s", caller, buf );
} }
@ -1464,11 +1471,11 @@ CookieRef::_PrintCookieInfo( string& out )
out += buf; out += buf;
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n", snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n",
iter->second.m_hostID, iter->first, iter->m_hostID, iter->m_addr.socket(),
iter->second.m_lastHeartbeat ); iter->m_lastHeartbeat );
out += buf; out += buf;
} }
@ -1479,26 +1486,26 @@ CookieRef::_FormatHostInfo( string* hostIds, string* seeds, string* addrs )
{ {
ASSERT_LOCKED(); ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock ); RWReadLock rrl( &m_socketsRWLock );
map<int,HostRec>::const_iterator iter; vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) { for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( !!hostIds ) { if ( !!hostIds ) {
char buf[8]; char buf[8];
snprintf( buf, sizeof(buf), "%d ", iter->second.m_hostID ); snprintf( buf, sizeof(buf), "%d ", iter->m_hostID );
*hostIds += buf; *hostIds += buf;
} }
if ( !!seeds ) { if ( !!seeds ) {
char buf[6]; char buf[6];
snprintf( buf, sizeof(buf), "%.4X ", iter->second.m_seed ); snprintf( buf, sizeof(buf), "%.4X ", iter->m_seed );
*seeds += buf; *seeds += buf;
} }
if ( !!addrs ) { if ( !!addrs ) {
int s = iter->first; int socket = iter->m_addr.socket();
struct sockaddr_in name; sockaddr_in name;
socklen_t siz = sizeof(name); socklen_t siz = sizeof(name);
if ( 0 == getpeername( s, (struct sockaddr*)&name, &siz) ) { if ( 0 == getpeername( socket, (struct sockaddr*)&name, &siz) ) {
char buf[32] = {0}; char buf[32] = {0};
snprintf( buf, sizeof(buf), "%s ", inet_ntoa(name.sin_addr) ); snprintf( buf, sizeof(buf), "%s ", inet_ntoa(name.sin_addr) );
*addrs += buf; *addrs += buf;

View file

@ -43,8 +43,9 @@ class CookieMapIterator; /* forward */
struct HostRec { struct HostRec {
public: public:
HostRec(HostID hostID, int nPlayersH, int seed, bool ackPending ) HostRec(HostID hostID, const AddrInfo* addr, int nPlayersH, int seed, bool ackPending )
: m_hostID(hostID) : m_hostID(hostID)
, m_addr(*addr)
, m_nPlayersH(nPlayersH) , m_nPlayersH(nPlayersH)
, m_seed(seed) , m_seed(seed)
, m_lastHeartbeat(uptime()) , m_lastHeartbeat(uptime())
@ -53,6 +54,7 @@ HostRec(HostID hostID, int nPlayersH, int seed, bool ackPending )
::logf( XW_LOGINFO, "created HostRec with id %d", m_hostID); ::logf( XW_LOGINFO, "created HostRec with id %d", m_hostID);
} }
HostID m_hostID; HostID m_hostID;
AddrInfo m_addr;
int m_nPlayersH; int m_nPlayersH;
int m_seed; int m_seed;
time_t m_lastHeartbeat; time_t m_lastHeartbeat;
@ -67,7 +69,7 @@ public:
class CookieRef { class CookieRef {
public: public:
set<int> GetSockets(); vector<AddrInfo> GetAddrs( void );
private: private:
/* These classes have access to CookieRef. All others should go through /* These classes have access to CookieRef. All others should go through
@ -96,19 +98,19 @@ class CookieRef {
bool HaveRoom( int nPlayers ); bool HaveRoom( int nPlayers );
int CountSockets() { return m_sockets.size(); } int CountSockets() { return m_sockets.size(); }
bool HasSocket( int socket ); bool HasSocket( const AddrInfo* addr );
bool HasSocket_locked( int socket ); bool HasSocket_locked( const AddrInfo* addr );
const char* Cookie() const { return m_cookie.c_str(); } const char* Cookie() const { return m_cookie.c_str(); }
const char* ConnName() { return m_connName.c_str(); } const char* ConnName() { return m_connName.c_str(); }
int GetHeartbeat() { return m_heatbeat; } int GetHeartbeat() { return m_heatbeat; }
int SocketForHost( HostID dest ); const AddrInfo* SocketForHost( HostID dest );
HostID HostForSocket( int sock ); HostID HostForSocket( const AddrInfo* addr );
/* connect case */ /* connect case */
bool AlreadyHere( unsigned short seed, int socket, HostID* prevHostID ); bool AlreadyHere( unsigned short seed, const AddrInfo* addr, HostID* prevHostID );
/* reconnect case */ /* reconnect case */
bool AlreadyHere( HostID hid, unsigned short seed, int socket, bool* spotTaken ); bool AlreadyHere( HostID hid, unsigned short seed, const AddrInfo* addr, bool* spotTaken );
/* for console */ /* for console */
void _PrintCookieInfo( string& out ); void _PrintCookieInfo( string& out );
@ -121,21 +123,23 @@ class CookieRef {
static void Delete( CookieID cid ); static void Delete( CookieID cid );
static void Delete( const char* name ); static void Delete( const char* name );
bool _Connect( int socket, int clientVersion, DevID* devID, bool _Connect( int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS, int seed, bool seenSeed, int nPlayersH, int nPlayersS, int seed, bool seenSeed,
in_addr& addr ); const AddrInfo* addr );
bool _Reconnect( int socket, int clientVersion, DevID* devID, bool _Reconnect( int clientVersion, DevID* devID,
HostID srcID, int nPlayersH, int nPlayersS, HostID srcID, int nPlayersH, int nPlayersS,
int seed, in_addr& addr, bool gameDead ); int seed, const AddrInfo* addr, bool gameDead );
void _HandleAck( HostID hostID ); void _HandleAck( HostID hostID );
void _PutMsg( HostID srcID, in_addr& addr, HostID destID, unsigned char* buf, int buflen ); void _PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
void _Disconnect(int socket, HostID hostID ); unsigned char* buf, int buflen );
void _Disconnect( const AddrInfo* addr, HostID hostID );
void _DeviceGone( HostID hostID, int seed ); void _DeviceGone( HostID hostID, int seed );
void _Shutdown(); void _Shutdown();
void _HandleHeartbeat( HostID id, int socket ); void _HandleHeartbeat( HostID id, const AddrInfo* addr );
void _CheckHeartbeats( time_t now ); void _CheckHeartbeats( time_t now );
void _Forward( HostID src, in_addr& addr, HostID dest, unsigned char* buf, int buflen ); void _Forward( HostID src, const AddrInfo* addr, HostID dest,
void _Remove( int socket ); unsigned char* buf, int buflen );
void _Remove( const AddrInfo* addr );
void _CheckAllConnected(); void _CheckAllConnected();
void _CheckNotAcked( HostID hid ); void _CheckNotAcked( HostID hid );
@ -148,30 +152,28 @@ class CookieRef {
public: public:
CRefEvent() { type = XWE_NONE; } CRefEvent() { type = XWE_NONE; }
CRefEvent( XW_RELAY_EVENT typ ) { type = typ; } CRefEvent( XW_RELAY_EVENT typ ) { type = typ; }
CRefEvent( XW_RELAY_EVENT typ, const AddrInfo* addrp ) { type = typ; addr = *addrp; }
XW_RELAY_EVENT type; XW_RELAY_EVENT type;
AddrInfo addr; /* sender's address */
union { union {
struct { struct {
HostID src; HostID src;
HostID dest; HostID dest;
unsigned char* buf; unsigned char* buf;
int buflen; int buflen;
in_addr addr;
} fwd; } fwd;
struct { struct {
int socket;
int clientVersion; int clientVersion;
DevID* devID; DevID* devID;
int nPlayersH; int nPlayersH;
int nPlayersS; int nPlayersS;
int seed; int seed;
HostID srcID; HostID srcID;
in_addr addr;
} con; } con;
struct { struct {
HostID srcID; HostID srcID;
} ack; } ack;
struct { struct {
int socket;
HostID srcID; HostID srcID;
} discon; } discon;
struct { struct {
@ -180,42 +182,39 @@ class CookieRef {
} devgone; } devgone;
struct { struct {
HostID id; HostID id;
int socket;
} heart; } heart;
struct { struct {
time_t now; time_t now;
} htime; } htime;
struct { struct {
int socket;
} rmsock; } rmsock;
struct { struct {
int socket;
XWREASON why; XWREASON why;
} disnote; } disnote;
} u; } u;
}; };
bool send_with_length( int socket, unsigned char* buf, int bufLen, bool send_with_length( const AddrInfo* addr,
bool cascade ); unsigned char* buf, int bufLen, bool cascade );
void send_msg( int socket, HostID id, XWRelayMsg msg, XWREASON why, void send_msg( const AddrInfo* addr, HostID id,
bool cascade ); XWRelayMsg msg, XWREASON why, bool cascade );
void pushConnectEvent( int socket, int clientVersion, DevID* devID, void pushConnectEvent( int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS, int nPlayersH, int nPlayersS,
int seed, in_addr& addr ); int seed, const AddrInfo* addr );
void pushReconnectEvent( int socket, int clientVersion, DevID* devID, void pushReconnectEvent( int clientVersion, DevID* devID,
HostID srcID, int nPlayersH, int nPlayersS, HostID srcID, int nPlayersH, int nPlayersS,
int seed, in_addr& addr ); int seed, const AddrInfo* addr );
void pushHeartbeatEvent( HostID id, int socket ); void pushHeartbeatEvent( HostID id, const AddrInfo* addr );
void pushHeartFailedEvent( int socket ); void pushHeartFailedEvent( const AddrInfo* addr );
void pushForwardEvent( HostID src, in_addr& addr, HostID dest, unsigned char* buf, void pushForwardEvent( HostID src, const AddrInfo* addr,
int buflen ); HostID dest, unsigned char* buf, int buflen );
void pushDestBadEvent(); void pushDestBadEvent();
void pushLastSocketGoneEvent(); void pushLastSocketGoneEvent();
void pushGameDead( int socket ); void pushGameDead( const AddrInfo* addr );
void checkHaveRoom( const CRefEvent* evt ); void checkHaveRoom( const CRefEvent* evt );
void pushRemoveSocketEvent( int socket ); void pushRemoveSocketEvent( const AddrInfo* addr );
void pushNotifyDisconEvent( int socket, XWREASON why ); void pushNotifyDisconEvent( const AddrInfo* addr, XWREASON why );
void handleEvents(); void handleEvents();
@ -231,8 +230,6 @@ class CookieRef {
void postCheckAllHere(); void postCheckAllHere();
void postDropDevice( HostID hostID ); void postDropDevice( HostID hostID );
void reducePlayerCounts( int socket );
void setAllConnectedTimer(); void setAllConnectedTimer();
void cancelAllConnectedTimer(); void cancelAllConnectedTimer();
void setAckTimer( HostID hid ); void setAckTimer( HostID hid );
@ -242,15 +239,15 @@ class CookieRef {
void send_denied( const CRefEvent* evt, XWREASON why ); void send_denied( const CRefEvent* evt, XWREASON why );
void checkFromServer( const CRefEvent* evt ); void checkFromServer( const CRefEvent* evt );
void notifyOthers( int socket, XWRelayMsg msg, XWREASON why ); void notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why );
void notifyGameDead( int socket ); void notifyGameDead( const AddrInfo* addr );
void disconnectSockets( XWREASON why ); void disconnectSockets( XWREASON why );
void disconnectSocket( int socket, XWREASON why ); void disconnectSocket( const AddrInfo* addr, XWREASON why );
void removeDevice( const CRefEvent* const evt ); void removeDevice( const CRefEvent* const evt );
void noteHeartbeat(const CRefEvent* evt); void noteHeartbeat(const CRefEvent* evt);
void notifyDisconn(const CRefEvent* evt); void notifyDisconn(const CRefEvent* evt);
void removeSocket( int socket ); void removeSocket( const AddrInfo* addr );
void sendAllHere( bool initial ); void sendAllHere( bool initial );
void assignConnName( void ); void assignConnName( void );
@ -262,19 +259,18 @@ class CookieRef {
void store_message( HostID dest, const unsigned char* buf, void store_message( HostID dest, const unsigned char* buf,
unsigned int len ); unsigned int len );
void send_stored_messages( HostID dest, int socket ); void send_stored_messages( HostID dest, const AddrInfo* addr );
void printSeeds( const char* caller ); void printSeeds( const char* caller );
void AddSocket( int socket );
void RmSocket( int socket );
/* timer callback */ /* timer callback */
static void s_checkAllConnected( void* closure ); static void s_checkAllConnected( void* closure );
static void s_checkAck( void* closure ); static void s_checkAck( void* closure );
/* Track sockets (= current connections with games on devices) in this
game. There will never be more than four of these */
pthread_rwlock_t m_socketsRWLock; pthread_rwlock_t m_socketsRWLock;
map<int, HostRec> m_sockets; vector<HostRec> m_sockets;
int m_heatbeat; /* might change per carrier or something. */ int m_heatbeat; /* might change per carrier or something. */
string m_cookie; /* cookie used for initial connections */ string m_cookie; /* cookie used for initial connections */

View file

@ -24,6 +24,7 @@
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h> #include <pthread.h>
#include <unistd.h>
#include "crefmgr.h" #include "crefmgr.h"
#include "cref.h" #include "cref.h"
@ -215,7 +216,7 @@ CRefMgr::getFromFreeList( void )
/* connect case */ /* connect case */
CidInfo* CidInfo*
CRefMgr::getMakeCookieRef( const char* cookie, HostID hid, int socket, CRefMgr::getMakeCookieRef( const char* cookie, HostID hid,
int nPlayersH, int nPlayersT, int langCode, int nPlayersH, int nPlayersT, int langCode,
int seed, bool wantsPublic, int seed, bool wantsPublic,
bool makePublic, bool* seenSeed ) bool makePublic, bool* seenSeed )
@ -286,9 +287,8 @@ CRefMgr::getMakeCookieRef( const char* cookie, HostID hid, int socket,
/* reconnect case */ /* reconnect case */
CidInfo* CidInfo*
CRefMgr::getMakeCookieRef( const char* connName, const char* cookie, CRefMgr::getMakeCookieRef( const char* connName, const char* cookie,
HostID hid, int socket, int nPlayersH, HostID hid, int nPlayersH, int nPlayersS, int seed,
int nPlayersS, int seed, int langCode, int langCode, bool isPublic, bool* isDead )
bool isPublic, bool* isDead )
{ {
CookieRef* cref = NULL; CookieRef* cref = NULL;
CidInfo* cinfo; CidInfo* cinfo;
@ -377,11 +377,11 @@ CRefMgr::getMakeCookieRef( const char* const connName, bool* isDead )
} }
void void
CRefMgr::RemoveSocketRefs( int socket ) CRefMgr::RemoveSocketRefs( const AddrInfo* addr )
{ {
{ {
SafeCref scr( socket ); SafeCref scr( addr );
scr.Remove( socket ); scr.Remove( addr );
} }
} }
@ -413,14 +413,16 @@ CRefMgr::getCookieRef( CookieID cid, bool failOk )
break; break;
} }
m_cidlock->Relinquish( cinfo, true ); m_cidlock->Relinquish( cinfo, true );
logf( XW_LOGINFO, "%s: sleeping after failing to get cinfo", __func__ );
usleep(200000); /* 2/10 second */
} }
return cinfo; return cinfo;
} /* getCookieRef */ } /* getCookieRef */
CidInfo* CidInfo*
CRefMgr::getCookieRef( int socket ) CRefMgr::getCookieRef( const AddrInfo* addr )
{ {
CidInfo* cinfo = m_cidlock->ClaimSocket( socket ); CidInfo* cinfo = m_cidlock->ClaimSocket( addr );
assert( NULL == cinfo || NULL != cinfo->GetRef() ); assert( NULL == cinfo || NULL != cinfo->GetRef() );
return cinfo; return cinfo;
@ -589,21 +591,21 @@ CookieMapIterator::Next()
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/* connect case */ /* connect case */
SafeCref::SafeCref( const char* cookie, int socket, int clientVersion, SafeCref::SafeCref( const char* cookie, const AddrInfo* addr, int clientVers,
DevID* devID, int nPlayersH, int nPlayersS, DevID* devID, int nPlayersH, int nPlayersS,
unsigned short gameSeed, int langCode, bool wantsPublic, unsigned short gameSeed, int langCode, bool wantsPublic,
bool makePublic ) bool makePublic )
: m_cinfo( NULL ) : m_cinfo( NULL )
, m_mgr( CRefMgr::Get() ) , m_mgr( CRefMgr::Get() )
, m_clientVersion( clientVersion ) , m_addr( *addr )
, m_clientVersion( clientVers )
, m_devID( devID ) , m_devID( devID )
, m_isValid( false ) , m_isValid( false )
, m_seenSeed( false ) , m_seenSeed( false )
{ {
CidInfo* cinfo; CidInfo* cinfo;
cinfo = m_mgr->getMakeCookieRef( cookie, 0, socket, cinfo = m_mgr->getMakeCookieRef( cookie, 0, nPlayersH, nPlayersS, langCode,
nPlayersH, nPlayersS, langCode,
gameSeed, wantsPublic, makePublic, gameSeed, wantsPublic, makePublic,
&m_seenSeed ); &m_seenSeed );
if ( cinfo != NULL ) { if ( cinfo != NULL ) {
@ -616,12 +618,13 @@ SafeCref::SafeCref( const char* cookie, int socket, int clientVersion,
/* REconnect case */ /* REconnect case */
SafeCref::SafeCref( const char* connName, const char* cookie, HostID hid, SafeCref::SafeCref( const char* connName, const char* cookie, HostID hid,
int socket, int clientVersion, DevID* devID, int nPlayersH, const AddrInfo* addr, int clientVers, DevID* devID, int nPlayersH,
int nPlayersS, unsigned short gameSeed, int langCode, int nPlayersS, unsigned short gameSeed, int langCode,
bool wantsPublic, bool makePublic ) bool wantsPublic, bool makePublic )
: m_cinfo( NULL ) : m_cinfo( NULL )
, m_mgr( CRefMgr::Get() ) , m_mgr( CRefMgr::Get() )
, m_clientVersion( clientVersion ) , m_addr( *addr )
, m_clientVersion( clientVers )
, m_devID( devID ) , m_devID( devID )
, m_isValid( false ) , m_isValid( false )
{ {
@ -629,7 +632,7 @@ SafeCref::SafeCref( const char* connName, const char* cookie, HostID hid,
assert( hid <= 4 ); /* no more than 4 hosts */ assert( hid <= 4 ); /* no more than 4 hosts */
bool isDead = false; bool isDead = false;
cinfo = m_mgr->getMakeCookieRef( connName, cookie, hid, socket, nPlayersH, cinfo = m_mgr->getMakeCookieRef( connName, cookie, hid, nPlayersH,
nPlayersS, gameSeed, langCode, nPlayersS, gameSeed, langCode,
wantsPublic || makePublic, &isDead ); wantsPublic || makePublic, &isDead );
if ( cinfo != NULL ) { if ( cinfo != NULL ) {
@ -676,17 +679,18 @@ SafeCref::SafeCref( CookieID cid, bool failOk )
} }
} }
SafeCref::SafeCref( int socket ) SafeCref::SafeCref( const AddrInfo* addr )
: m_cinfo( NULL ) : m_cinfo( NULL )
, m_mgr( CRefMgr::Get() ) , m_mgr( CRefMgr::Get() )
, m_addr( *addr )
, m_isValid( false ) , m_isValid( false )
{ {
CidInfo* cinfo = m_mgr->getCookieRef( socket ); CidInfo* cinfo = m_mgr->getCookieRef( addr );
if ( cinfo != NULL ) { /* known socket? */ if ( cinfo != NULL ) { /* known socket? */
CookieRef* cref = cinfo->GetRef(); CookieRef* cref = cinfo->GetRef();
assert( cinfo->GetCid() == cref->GetCid() ); assert( cinfo->GetCid() == cref->GetCid() );
m_locked = cref->Lock(); m_locked = cref->Lock();
m_isValid = m_locked && cref->HasSocket_locked( socket ); m_isValid = m_locked && cref->HasSocket_locked( addr );
m_cinfo = cinfo; m_cinfo = cinfo;
} }
} }

View file

@ -89,7 +89,7 @@ class CRefMgr {
void MoveSockets( vector<int> sockets, CookieRef* cref ); void MoveSockets( vector<int> sockets, CookieRef* cref );
pthread_mutex_t* GetWriteMutexForSocket( int socket ); pthread_mutex_t* GetWriteMutexForSocket( int socket );
void RemoveSocketRefs( int socket ); void RemoveSocketRefs( const AddrInfo* addr );
void PrintSocketInfo( int socket, string& out ); void PrintSocketInfo( int socket, string& out );
void IncrementFullCount( void ); void IncrementFullCount( void );
@ -117,22 +117,21 @@ class CRefMgr {
CookieRef* getFromFreeList( void ); CookieRef* getFromFreeList( void );
/* connect case */ /* connect case */
CidInfo* getMakeCookieRef( const char* cookie, CidInfo* getMakeCookieRef( const char* cookie, HostID hid, int nPlayersH,
HostID hid, int socket, int nPlayersH,
int nPlayersS, int langCode, int seed, int nPlayersS, int langCode, int seed,
bool wantsPublic, bool makePublic, bool wantsPublic, bool makePublic,
bool* seenSeed ); bool* seenSeed );
/* reconnect case; just the stuff we don't have in db */ /* reconnect case; just the stuff we don't have in db */
CidInfo* getMakeCookieRef( const char* connName, const char* cookie, CidInfo* getMakeCookieRef( const char* connName, const char* cookie,
HostID hid, int socket, int nPlayersH, HostID hid, int nPlayersH,
int nPlayersS, int seed, int langCode, int nPlayersS, int seed, int langCode,
bool isPublic, bool* isDead ); bool isPublic, bool* isDead );
CidInfo* getMakeCookieRef( const char* const connName, bool* isDead ); CidInfo* getMakeCookieRef( const char* const connName, bool* isDead );
CidInfo* getCookieRef( CookieID cid, bool failOk = false ); CidInfo* getCookieRef( CookieID cid, bool failOk = false );
CidInfo* getCookieRef( int socket ); CidInfo* getCookieRef( const AddrInfo* addr );
bool checkCookieRef_locked( CookieRef* cref ); bool checkCookieRef_locked( CookieRef* cref );
CidInfo* getCookieRef_impl( CookieID cid ); CidInfo* getCookieRef_impl( CookieID cid );
CookieRef* AddNew( const char* cookie, const char* connName, CookieID cid, CookieRef* AddNew( const char* cookie, const char* connName, CookieID cid,
@ -171,23 +170,23 @@ class SafeCref {
public: public:
/* for connect */ /* for connect */
SafeCref( const char* cookie, int socket, int clientVersion, SafeCref( const char* cookie, const AddrInfo* addr, int clientVers,
DevID* devID, int nPlayersH, int nPlayersS, DevID* devID, int nPlayersH, int nPlayersS,
unsigned short gameSeed, int langCode, bool wantsPublic, unsigned short gameSeed, int langCode, bool wantsPublic,
bool makePublic ); bool makePublic );
/* for reconnect */ /* for reconnect */
SafeCref( const char* connName, const char* cookie, HostID hid, SafeCref( const char* connName, const char* cookie, HostID hid,
int socket, int clientVersion, DevID* devID, int nPlayersH, const AddrInfo* addr, int clientVersion, DevID* devID,
int nPlayersS, unsigned short gameSeed, int langCode, int nPlayersH, int nPlayersS, unsigned short gameSeed,
bool wantsPublic, bool makePublic ); int langCode, bool wantsPublic, bool makePublic );
SafeCref( const char* const connName ); SafeCref( const char* const connName );
SafeCref( CookieID cid, bool failOk = false ); SafeCref( CookieID cid, bool failOk = false );
SafeCref( int socket ); SafeCref( const AddrInfo* addr );
/* SafeCref( CookieRef* cref ); */ /* SafeCref( CookieRef* cref ); */
~SafeCref(); ~SafeCref();
bool Forward( HostID src, in_addr& addr, HostID dest, unsigned char* buf, bool Forward( HostID src, const AddrInfo* addr, HostID dest,
int buflen ) { unsigned char* buf, int buflen ) {
if ( IsValid() ) { if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef(); CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() ); assert( 0 != cref->GetCid() );
@ -198,8 +197,8 @@ class SafeCref {
} }
} }
void PutMsg( HostID srcID, in_addr& addr, HostID destID, unsigned char* buf, void PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
int buflen ) { unsigned char* buf, int buflen ) {
if ( IsValid() ) { if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef(); CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() ); assert( 0 != cref->GetCid() );
@ -207,20 +206,19 @@ class SafeCref {
} }
} }
bool Connect( int socket, int nPlayersH, int nPlayersS, int seed, bool Connect( int nPlayersH, int nPlayersS, int seed ) {
in_addr& addr ) {
if ( IsValid() ) { if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef(); CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() ); assert( 0 != cref->GetCid() );
return cref->_Connect( socket, m_clientVersion, m_devID, return cref->_Connect( m_clientVersion, m_devID,
nPlayersH, nPlayersS, seed, nPlayersH, nPlayersS, seed,
m_seenSeed, addr ); m_seenSeed, &m_addr );
} else { } else {
return false; return false;
} }
} }
bool Reconnect( int socket, HostID srcID, int nPlayersH, int nPlayersS, bool Reconnect( HostID srcID, int nPlayersH, int nPlayersS,
int seed, in_addr& addr, XWREASON* errp ) { int seed, XWREASON* errp ) {
bool success = false; bool success = false;
*errp = XWRELAY_ERROR_NONE; *errp = XWRELAY_ERROR_NONE;
if ( IsValid() ) { if ( IsValid() ) {
@ -229,18 +227,18 @@ class SafeCref {
if ( m_dead ) { if ( m_dead ) {
*errp = XWRELAY_ERROR_DEADGAME; *errp = XWRELAY_ERROR_DEADGAME;
} else { } else {
success = cref->_Reconnect( socket, m_clientVersion, m_devID, success = cref->_Reconnect( m_clientVersion, m_devID,
srcID, nPlayersH, nPlayersS, seed, srcID, nPlayersH, nPlayersS, seed,
addr, m_dead ); &m_addr, m_dead );
} }
} }
return success; return success;
} }
void Disconnect(int socket, HostID hostID ) { void Disconnect( const AddrInfo* addr, HostID hostID ) {
if ( IsValid() ) { if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef(); CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() ); assert( 0 != cref->GetCid() );
cref->_Disconnect( socket, hostID ); cref->_Disconnect( addr, hostID );
} }
} }
@ -270,11 +268,11 @@ class SafeCref {
cref->_Shutdown(); cref->_Shutdown();
} }
} }
void Remove( int socket ) { void Remove( const AddrInfo* addr ) {
if ( IsValid() ) { if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef(); CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() ); assert( 0 != cref->GetCid() );
cref->_Remove( socket ); cref->_Remove( addr );
} }
} }
@ -390,6 +388,7 @@ class SafeCref {
private: private:
CidInfo* m_cinfo; CidInfo* m_cinfo;
CRefMgr* m_mgr; CRefMgr* m_mgr;
AddrInfo m_addr;
int m_clientVersion; int m_clientVersion;
DevID* m_devID; DevID* m_devID;
bool m_isValid; bool m_isValid;

View file

@ -47,7 +47,6 @@ static void formatParams( char* paramValues[], int nParams, const char* fmt,
static int here_less_seed( const char* seeds, int perDeviceSum, static int here_less_seed( const char* seeds, int perDeviceSum,
unsigned short seed ); unsigned short seed );
static void destr_function( void* conn ); static void destr_function( void* conn );
static void string_printf( string& str, const char* fmt, ... );
/* static */ DBMgr* /* static */ DBMgr*
DBMgr::Get() DBMgr::Get()
@ -302,7 +301,7 @@ DBMgr::RegisterDevice( const DevID* host )
HostID HostID
DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion, DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion,
int nToAdd, unsigned short seed, const in_addr& addr, int nToAdd, unsigned short seed, const AddrInfo* addr,
DevIDRelay devID, bool ackd ) DevIDRelay devID, bool ackd )
{ {
HostID newID = curID; HostID newID = curID;
@ -331,9 +330,10 @@ DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion,
" mtimes[%d]='now', ack[%d]=\'%c\'" " mtimes[%d]='now', ack[%d]=\'%c\'"
" WHERE connName = '%s'"; " WHERE connName = '%s'";
string query; string query;
char* ntoa = inet_ntoa( addr->sin_addr() );
string_printf( query, fmt, newID, nToAdd, newID, clientVersion, string_printf( query, fmt, newID, nToAdd, newID, clientVersion,
newID, seed, newID, inet_ntoa(addr), devIDBuf.c_str(), newID, seed, newID, ntoa, devIDBuf.c_str(), newID,
newID, newID, ackd?'A':'a', connName ); newID, ackd?'A':'a', connName );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() );
execSql( query ); execSql( query );
@ -500,14 +500,15 @@ DBMgr::RecordSent( const int* msgIDs, int nMsgIDs )
void void
DBMgr::RecordAddress( const char* const connName, HostID hid, DBMgr::RecordAddress( const char* const connName, HostID hid,
const in_addr& addr ) const AddrInfo* addr )
{ {
assert( hid >= 0 && hid <= 4 ); assert( hid >= 0 && hid <= 4 );
const char* fmt = "UPDATE " GAMES_TABLE " SET addrs[%d] = \'%s\'" const char* fmt = "UPDATE " GAMES_TABLE " SET addrs[%d] = \'%s\'"
" WHERE connName = '%s'"; " WHERE connName = '%s'";
string query; string query;
string_printf( query, fmt, hid, inet_ntoa(addr), connName ); char* ntoa = inet_ntoa( addr->sin_addr() );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); string_printf( query, fmt, hid, ntoa, connName );
logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );
execSql( query ); execSql( query );
} }
@ -587,7 +588,7 @@ DBMgr::PendingMsgCount( const char* connName, int hid )
; ;
string query; string query;
string_printf( query, fmt, connName, hid ); string_printf( query, fmt, connName, hid );
logf( XW_LOGINFO, "%s: query: %s", __func__, query.c_str() ); logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );
PGresult* result = PQexec( getThreadConn(), query.c_str() ); PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) { if ( 1 == PQntuples( result ) ) {
@ -742,9 +743,8 @@ DBMgr::StoreMessage( const char* const connName, int hid,
} }
bool bool
DBMgr::GetNthStoredMessage( const char* const connName, int hid, DBMgr::GetNthStoredMessage( const char* const connName, int hid, int nn,
int nn, unsigned char* buf, size_t* buflen, unsigned char* buf, size_t* buflen, int* msgID )
int* msgID )
{ {
const char* fmt = "SELECT id, msg, msglen FROM " MSGS_TABLE const char* fmt = "SELECT id, msg, msglen FROM " MSGS_TABLE
" WHERE connName = '%s' AND hid = %d " " WHERE connName = '%s' AND hid = %d "
@ -878,29 +878,3 @@ DBMgr::getThreadConn( void )
} }
return conn; return conn;
} }
/* From stack overflow, toward a snprintf with an expanding buffer.
*/
static void
string_printf( string& str, const char* fmt, ... )
{
const int origsiz = str.size();
int newsiz = 100;
va_list ap;
for ( ; ; ) {
str.resize( origsiz + newsiz );
va_start( ap, fmt );
int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap );
va_end( ap );
if ( len > newsiz ) { // needs more space
newsiz = len + 1;
} else if ( -1 == len ) {
assert(0); // should be impossible
} else {
str.resize( origsiz + len );
break;
}
}
}

View file

@ -65,7 +65,7 @@ class DBMgr {
DevIDRelay RegisterDevice( const DevID* hosts ); DevIDRelay RegisterDevice( const DevID* hosts );
HostID AddDevice( const char* const connName, HostID curID, int clientVersion, HostID AddDevice( const char* const connName, HostID curID, int clientVersion,
int nToAdd, unsigned short seed, const in_addr& addr, int nToAdd, unsigned short seed, const AddrInfo* addr,
DevIDRelay devID, bool unAckd ); DevIDRelay devID, bool unAckd );
void NoteAckd( const char* const connName, HostID id ); void NoteAckd( const char* const connName, HostID id );
HostID HIDForSeed( const char* const connName, unsigned short seed ); HostID HIDForSeed( const char* const connName, unsigned short seed );
@ -77,7 +77,7 @@ class DBMgr {
void RecordSent( const char* const connName, HostID hid, int nBytes ); void RecordSent( const char* const connName, HostID hid, int nBytes );
void RecordSent( const int* msgID, int nMsgIDs ); void RecordSent( const int* msgID, int nMsgIDs );
void RecordAddress( const char* const connName, HostID hid, void RecordAddress( const char* const connName, HostID hid,
const in_addr& addr ); const AddrInfo* addr );
void GetPlayerCounts( const char* const connName, int* nTotal, void GetPlayerCounts( const char* const connName, int* nTotal,
int* nHere ); int* nHere );

View file

@ -238,7 +238,7 @@ http_thread_main( void* arg )
struct sockaddr_in name; struct sockaddr_in name;
socklen_t namelen = sizeof(name); socklen_t namelen = sizeof(name);
bool isLocal = 0 == getpeername( sock, (struct sockaddr*)&name, bool isLocal = 0 == getpeername( sock, (AddrInfo*)&name,
&namelen ); &namelen );
if ( isLocal ) { if ( isLocal ) {
in_addr_t s_addr = name.sin_addr.s_addr; in_addr_t s_addr = name.sin_addr.s_addr;

View file

@ -1,3 +1,4 @@
gcm.py gcm.py
gcm.pyc gcm.pyc
mykey.py* mykey.py*
gcm_loop.pyc

View file

@ -1,30 +1,32 @@
#!/usr/bin/python #!/usr/bin/python
import sys, gcm, psycopg2, json import sys, psycopg2, json, urllib, urllib2
# I'm not checking my key in... # I'm not checking my key in...
import mykey import mykey
GCM_URL = 'https://android.googleapis.com/gcm/send'
def usage(): def usage():
print 'usage:', sys.argv[0], '[--to <name>] msg' print 'usage:', sys.argv[0], '[--to <name>] msg'
sys.exit() sys.exit()
def msgViaGCM( devid, msg ): def sendMsg( devid, msg ):
instance = gcm.GCM( mykey.myKey ) values = {
data = { 'title' : 'Msg from Darth', 'registration_ids': [ devid ],
'msg' : msg, 'data' : { 'title' : 'Msg from Darth2',
} 'msg' : msg,
}
}
params = json.dumps( values )
req = urllib2.Request("https://android.googleapis.com/gcm/send", params )
req.add_header( 'Content-Type' , 'application/x-www-form-urlencoded;charset=UTF-8' )
req.add_header( 'Authorization' , 'key=' + mykey.myKey )
req.add_header('Content-Type', 'application/json' )
response = urllib2.urlopen( req )
response = instance.json_request( registration_ids = [devid], response = response.read()
data = data ) print response
if 'errors' in response:
response = response['errors']
if 'NotRegistered' in response:
ids = response['NotRegistered']
for id in ids:
print 'need to remove "', id, '" from db'
else:
print 'no errors'
def main(): def main():
to = None to = None
@ -40,7 +42,7 @@ def main():
if not to: usage() if not to: usage()
devid = mykey.devids[to] devid = mykey.devids[to]
print 'sending: "%s" to' % msg, to print 'sending: "%s" to' % msg, to
msgViaGCM( devid, msg ) sendMsg( devid, msg )
############################################################################## ##############################################################################
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -81,18 +81,19 @@ void
XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc ) XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
{ {
m_nThreads = nThreads; m_nThreads = nThreads;
m_threadInfos = (ThreadInfo*)malloc( nThreads * sizeof(*m_threadInfos) );
m_pFunc = pFunc; m_pFunc = pFunc;
m_kFunc = kFunc; m_kFunc = kFunc;
pthread_t thread; for ( int ii = 0; ii < nThreads; ++ii ) {
ThreadInfo* tip = &m_threadInfos[ii];
int ii; tip->me = this;
for ( ii = 0; ii < nThreads; ++ii ) { int result = pthread_create( &tip->thread, NULL, tpool_main, tip );
int result = pthread_create( &thread, NULL, tpool_main, this );
assert( result == 0 ); assert( result == 0 );
pthread_detach( thread ); pthread_detach( tip->thread );
} }
pthread_t thread;
int result = pthread_create( &thread, NULL, listener_main, this ); int result = pthread_create( &thread, NULL, listener_main, this );
assert( result == 0 ); assert( result == 0 );
result = pthread_detach( thread ); result = pthread_detach( thread );
@ -106,55 +107,65 @@ XWThreadPool::Stop()
int ii; int ii;
for ( ii = 0; ii < m_nThreads; ++ii ) { for ( ii = 0; ii < m_nThreads; ++ii ) {
SockInfo si = { STYPE_UNKNOWN, {0} }; SockInfo si;
enqueue( 0, si ); si.m_type = STYPE_UNKNOWN;
enqueue( si );
} }
interrupt_poll(); interrupt_poll();
} }
void void
XWThreadPool::AddSocket( int socket, SockType stype, in_addr& from ) XWThreadPool::AddSocket( SockType stype, const AddrInfo* from )
{ {
{ {
RWWriteLock ml( &m_activeSocketsRWLock ); RWWriteLock ml( &m_activeSocketsRWLock );
SockInfo si; SockInfo si;
si.m_type = stype; si.m_type = stype;
si.m_addr = from; si.m_addr = *from;
m_activeSockets.push_back( pair<int,SockInfo>(socket, si) ); m_activeSockets.push_back( si );
logf( XW_LOGINFO, "%s: %d sockets active", __func__,
m_activeSockets.size() );
} }
interrupt_poll(); interrupt_poll();
} }
bool bool
XWThreadPool::RemoveSocket( int socket ) XWThreadPool::RemoveSocket( const AddrInfo* addr )
{ {
assert( addr->isTCP() );
bool found = false; bool found = false;
{ {
RWWriteLock ml( &m_activeSocketsRWLock ); RWWriteLock ml( &m_activeSocketsRWLock );
vector< pair<int,SockInfo> >::iterator iter = m_activeSockets.begin(); logf( XW_LOGINFO, "%s: START: %d sockets active", __func__,
while ( iter != m_activeSockets.end() ) { m_activeSockets.size() );
if ( iter->first == socket ) {
vector<SockInfo>::iterator iter;
for ( iter = m_activeSockets.begin();
iter != m_activeSockets.end(); ++iter ) {
if ( iter->m_addr.equals( *addr ) ) {
m_activeSockets.erase( iter ); m_activeSockets.erase( iter );
found = true; found = true;
break; break;
} }
++iter;
} }
logf( XW_LOGINFO, "%s: AFTER: %d sockets active", __func__,
m_activeSockets.size() );
} }
return found; return found;
} /* RemoveSocket */ } /* RemoveSocket */
void void
XWThreadPool::CloseSocket( int socket ) XWThreadPool::CloseSocket( const AddrInfo* addr )
{ {
/* bool do_interrupt = false; */ /* bool do_interrupt = false; */
if ( !RemoveSocket( socket ) ) { assert( addr->isTCP() );
if ( !RemoveSocket( addr ) ) {
MutexLock ml( &m_queueMutex ); MutexLock ml( &m_queueMutex );
deque<QueuePr>::iterator iter = m_queue.begin(); deque<QueuePr>::iterator iter = m_queue.begin();
while ( iter != m_queue.end() ) { while ( iter != m_queue.end() ) {
if ( iter->m_socket == socket ) { if ( iter->m_info.m_addr.equals( *addr ) ) {
m_queue.erase( iter ); m_queue.erase( iter );
/* do_interrupt = true; */ /* do_interrupt = true; */
break; break;
@ -163,7 +174,7 @@ XWThreadPool::CloseSocket( int socket )
} }
} }
logf( XW_LOGINFO, "CLOSING socket %d", socket ); logf( XW_LOGINFO, "CLOSING socket %d", socket );
close( socket ); close( addr->socket() );
/* if ( do_interrupt ) { */ /* if ( do_interrupt ) { */
/* We always need to interrupt the poll because the socket we're closing /* We always need to interrupt the poll because the socket we're closing
will be in the list being listened to. That or we need to drop sockets will be in the list being listened to. That or we need to drop sockets
@ -174,51 +185,57 @@ XWThreadPool::CloseSocket( int socket )
} }
void void
XWThreadPool::EnqueueKill( int socket, const char* const why ) XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
{ {
logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, socket, why ); logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, socket, why );
SockInfo si = { STYPE_UNKNOWN, {0} }; if ( addr->isTCP() ) {
enqueue( socket, si, Q_KILL ); SockInfo si;
si.m_type = STYPE_UNKNOWN;
si.m_addr = *addr;
enqueue( si, Q_KILL );
}
} }
bool bool
XWThreadPool::get_process_packet( int socket, SockType stype, in_addr& addr ) XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
{ {
bool success = false; bool success = false;
short packetSize; short packetSize;
assert( sizeof(packetSize) == 2 ); assert( sizeof(packetSize) == 2 );
unsigned char buf[MAX_MSG_LEN+1]; unsigned char buf[MAX_MSG_LEN+1];
int nRead = read_packet( socket, buf, sizeof(buf) ); int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
if ( nRead < 0 ) { if ( nRead < 0 ) {
EnqueueKill( socket, "bad packet" ); EnqueueKill( addr, "bad packet" );
} else if ( STYPE_GAME == stype ) { } else if ( STYPE_GAME == stype ) {
logf( XW_LOGINFO, "calling m_pFunc" ); logf( XW_LOGINFO, "calling m_pFunc" );
success = (*m_pFunc)( buf, nRead, socket, addr ); success = (*m_pFunc)( buf, nRead, addr );
} else { } else {
buf[nRead] = '\0'; buf[nRead] = '\0';
handle_proxy_packet( buf, nRead, socket, addr ); handle_proxy_packet( buf, nRead, addr );
CloseSocket( socket ); CloseSocket( addr );
} }
return success; return success;
} /* get_process_packet */ } /* get_process_packet */
/* static */ void* void*
XWThreadPool::tpool_main( void* closure ) XWThreadPool::tpool_main( void* closure )
{ {
blockSignals(); blockSignals();
XWThreadPool* me = (XWThreadPool*)closure; ThreadInfo* tip = (ThreadInfo*)closure;
return me->real_tpool_main(); return tip->me->real_tpool_main( tip );
} }
void* void*
XWThreadPool::real_tpool_main() XWThreadPool::real_tpool_main( ThreadInfo* tip )
{ {
logf( XW_LOGINFO, "tpool worker thread starting" ); logf( XW_LOGINFO, "tpool worker thread starting" );
int socket = -1; int socket = -1;
for ( ; ; ) { for ( ; ; ) {
pthread_mutex_lock( &m_queueMutex ); pthread_mutex_lock( &m_queueMutex );
tip->recentTime = 0;
release_socket_locked( socket ); release_socket_locked( socket );
while ( !m_timeToDie && m_queue.size() == 0 ) { while ( !m_timeToDie && m_queue.size() == 0 ) {
@ -232,26 +249,29 @@ XWThreadPool::real_tpool_main()
} }
QueuePr pr; QueuePr pr;
grab_elem_locked( &pr ); bool gotOne = grab_elem_locked( &pr );
tip->recentTime = time( NULL );
pthread_mutex_unlock( &m_queueMutex ); pthread_mutex_unlock( &m_queueMutex );
if ( pr.m_socket >= 0 ) { if ( gotOne ) {
logf( XW_LOGINFO, "worker thread got socket %d from queue", socket = pr.m_info.m_addr.socket();
pr.m_socket ); logf( XW_LOGINFO, "worker thread got socket %d from queue", socket );
switch ( pr.m_act ) { switch ( pr.m_act ) {
case Q_READ: case Q_READ:
if ( get_process_packet( pr.m_socket, pr.m_info.m_type, pr.m_info.m_addr ) ) { assert( socket >= 0 );
AddSocket( pr.m_socket, pr.m_info.m_type, pr.m_info.m_addr ); if ( get_process_packet( pr.m_info.m_type, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, &pr.m_info.m_addr );
} }
break; break;
case Q_KILL: case Q_KILL:
(*m_kFunc)( pr.m_socket ); (*m_kFunc)( &pr.m_info.m_addr );
CloseSocket( pr.m_socket ); CloseSocket( &pr.m_info.m_addr );
break; break;
} }
} else {
socket = -1;
} }
socket = pr.m_socket;
} }
logf( XW_LOGINFO, "tpool worker thread exiting" ); logf( XW_LOGINFO, "tpool worker thread exiting" );
return NULL; return NULL;
@ -310,11 +330,11 @@ XWThreadPool::real_listener()
#endif #endif
++curfd; ++curfd;
vector< pair<int,SockInfo> >::iterator iter; vector<SockInfo>::iterator iter;
for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end(); for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end();
++iter ) { ++iter ) {
fds[curfd].fd = iter->first; fds[curfd].fd = iter->m_addr.socket();
sinfos[curfd] = iter->second; sinfos[curfd] = *iter;
fds[curfd].events = flags; fds[curfd].events = flags;
#ifdef LOG_POLL #ifdef LOG_POLL
if ( logCapacity > logLen ) { if ( logCapacity > logLen ) {
@ -366,7 +386,9 @@ XWThreadPool::real_listener()
if ( fds[curfd].revents != 0 ) { if ( fds[curfd].revents != 0 ) {
int socket = fds[curfd].fd; int socket = fds[curfd].fd;
if ( !RemoveSocket( socket ) ) { const AddrInfo* addr = &sinfos[curfd].m_addr;
assert( socket == addr->socket() );
if ( !RemoveSocket( addr ) ) {
/* no further processing if it's been removed while /* no further processing if it's been removed while
we've been sleeping in poll */ we've been sleeping in poll */
--nEvents; --nEvents;
@ -374,11 +396,11 @@ XWThreadPool::real_listener()
} }
if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) { if ( 0 != (fds[curfd].revents & (POLLIN | POLLPRI)) ) {
enqueue( socket, sinfos[curfd] ); enqueue( sinfos[curfd] );
} else { } else {
logf( XW_LOGERROR, "odd revents: %x", logf( XW_LOGERROR, "odd revents: %x",
fds[curfd].revents ); fds[curfd].revents );
EnqueueKill( socket, "error/hup in poll()" ); EnqueueKill( addr, "error/hup in poll()" );
} }
--nEvents; --nEvents;
} }
@ -408,23 +430,23 @@ XWThreadPool::listener_main( void* closure )
} }
void void
XWThreadPool::enqueue( int socket, SockInfo si, QAction act ) XWThreadPool::enqueue( SockInfo si, QAction act )
{ {
QueuePr pr = { act, socket, si }; QueuePr pr = { act, si };
MutexLock ml( &m_queueMutex ); MutexLock ml( &m_queueMutex );
m_queue.push_back( pr ); m_queue.push_back( pr );
pthread_cond_signal( &m_queueCondVar ); pthread_cond_signal( &m_queueCondVar );
log_hung_threads();
} }
void bool
XWThreadPool::grab_elem_locked( QueuePr* prp ) XWThreadPool::grab_elem_locked( QueuePr* prp )
{ {
bool found = false; bool found = false;
prp->m_socket = -1;
deque<QueuePr>::iterator iter; deque<QueuePr>::iterator iter;
for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) { for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) {
int socket = iter->m_socket; int socket = iter->m_info.m_addr.socket();
/* If NOT found */ /* If NOT found */
if ( m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) { if ( m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) {
*prp = *iter; *prp = *iter;
@ -435,6 +457,7 @@ XWThreadPool::grab_elem_locked( QueuePr* prp )
} }
print_in_use(); print_in_use();
return found;
} /* grab_elem_locked */ } /* grab_elem_locked */
void void
@ -451,12 +474,36 @@ XWThreadPool::release_socket_locked( int socket )
void void
XWThreadPool::print_in_use( void ) XWThreadPool::print_in_use( void )
{ {
char buf[32] = {0}; string str;
int len = 0;
set<int>::iterator iter; set<int>::iterator iter;
for ( iter = m_sockets_in_use.begin(); for ( iter = m_sockets_in_use.begin();
iter != m_sockets_in_use.end(); ++iter ) { iter != m_sockets_in_use.end(); ++iter ) {
len += snprintf( &buf[len], sizeof(buf)-len, "%d ", *iter ); string_printf( str, "%d ", *iter );
}
if ( 0 < str.size() ) {
logf( XW_LOGINFO, "Sockets in use: %s", str.c_str() );
}
}
// We have the mutex when this is called
void
XWThreadPool::log_hung_threads( void )
{
const time_t HUNG_THREASHHOLD = 300; // seconds
int ii;
time_t now = time( NULL );
for ( ii = 0; ii < m_nThreads; ++ii ) {
ThreadInfo* tip = &m_threadInfos[ii];
time_t recentTime = tip->recentTime;
if ( 0 != recentTime ) {
time_t howLong = now - recentTime;
if ( HUNG_THREASHHOLD < howLong ) {
logf( XW_LOGERROR, "thread %d (%p) stopped for %d seconds!",
ii, tip->thread, howLong );
tip->recentTime = 0; // only log once
assert(0);
}
}
} }
} }

View file

@ -31,6 +31,8 @@
#include <deque> #include <deque>
#include <set> #include <set>
#include "addrinfo.h"
using namespace std; using namespace std;
class XWThreadPool { class XWThreadPool {
@ -39,13 +41,19 @@ class XWThreadPool {
typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType; typedef enum { STYPE_UNKNOWN, STYPE_GAME, STYPE_PROXY } SockType;
typedef struct _SockInfo { typedef struct _SockInfo {
SockType m_type; SockType m_type;
in_addr m_addr; AddrInfo m_addr;
} SockInfo; } SockInfo;
typedef struct _ThreadInfo {
XWThreadPool* me;
pthread_t thread;
time_t recentTime;
} ThreadInfo;
static XWThreadPool* GetTPool(); static XWThreadPool* GetTPool();
typedef bool (*packet_func)( unsigned char* buf, int bufLen, int socket, typedef bool (*packet_func)( unsigned char* buf, int bufLen,
in_addr& addr ); const AddrInfo* from );
typedef void (*kill_func)( int socket ); typedef void (*kill_func)( const AddrInfo* addr );
XWThreadPool(); XWThreadPool();
~XWThreadPool(); ~XWThreadPool();
@ -54,36 +62,37 @@ class XWThreadPool {
void Stop(); void Stop();
/* Add to set being listened on */ /* Add to set being listened on */
void AddSocket( int socket, SockType stype, in_addr& fromAddr ); void AddSocket( SockType stype, const AddrInfo* from );
/* remove from tpool altogether, and close */ /* remove from tpool altogether, and close */
void CloseSocket( int socket ); void CloseSocket( const AddrInfo* addr );
void EnqueueKill( int socket, const char* const why ); void EnqueueKill( const AddrInfo* addr, const char* const why );
private: private:
typedef enum { Q_READ, Q_KILL } QAction; typedef enum { Q_READ, Q_KILL } QAction;
typedef struct { QAction m_act; int m_socket; SockInfo m_info; } QueuePr; typedef struct { QAction m_act; SockInfo m_info; } QueuePr;
/* Remove from set being listened on */ /* Remove from set being listened on */
bool RemoveSocket( int socket ); bool RemoveSocket( const AddrInfo* addr );
void enqueue( int socket, QAction act = Q_READ ); void enqueue( QAction act = Q_READ );
void enqueue( int socket, SockInfo si, QAction act = Q_READ ); void enqueue( SockInfo si, QAction act = Q_READ );
void release_socket_locked( int socket ); void release_socket_locked( int socket );
void grab_elem_locked( QueuePr* qpp ); bool grab_elem_locked( QueuePr* qpp );
void print_in_use( void ); void print_in_use( void );
void log_hung_threads( void );
bool get_process_packet( int socket, SockType stype, in_addr& addr ); bool get_process_packet( SockType stype, const AddrInfo* from );
void interrupt_poll(); void interrupt_poll();
void* real_tpool_main(); void* real_tpool_main( ThreadInfo* tsp );
static void* tpool_main( void* closure ); static void* tpool_main( void* closure );
void* real_listener(); void* real_listener();
static void* listener_main( void* closure ); static void* listener_main( void* closure );
/* Sockets main thread listens on */ /* Sockets main thread listens on */
vector< pair<int,SockInfo> >m_activeSockets; vector<SockInfo>m_activeSockets;
pthread_rwlock_t m_activeSocketsRWLock; pthread_rwlock_t m_activeSocketsRWLock;
/* Sockets waiting for a thread to read 'em */ /* Sockets waiting for a thread to read 'em */
@ -100,6 +109,7 @@ class XWThreadPool {
int m_nThreads; int m_nThreads;
packet_func m_pFunc; packet_func m_pFunc;
kill_func m_kFunc; kill_func m_kFunc;
ThreadInfo* m_threadInfos;
static XWThreadPool* g_instance; static XWThreadPool* g_instance;
}; };

View file

@ -75,8 +75,10 @@
#include "permid.h" #include "permid.h"
#include "lstnrmgr.h" #include "lstnrmgr.h"
#include "dbmgr.h" #include "dbmgr.h"
#include "addrinfo.h"
static int s_nSpawns = 0; static int s_nSpawns = 0;
static int g_maxsocks = -1;
void void
logf( XW_LogLevel level, const char* format, ... ) logf( XW_LogLevel level, const char* format, ... )
@ -322,27 +324,31 @@ flagsOK( unsigned char** bufp, unsigned char const* end,
} /* flagsOK */ } /* flagsOK */
void void
denyConnection( int socket, XWREASON err ) denyConnection( const AddrInfo* addr, XWREASON err )
{ {
unsigned char buf[2]; unsigned char buf[2];
buf[0] = XWRELAY_CONNECTDENIED; buf[0] = XWRELAY_CONNECTDENIED;
buf[1] = err; buf[1] = err;
send_with_length_unsafe( socket, buf, sizeof(buf) ); send_with_length_unsafe( addr, buf, sizeof(buf) );
} }
/* No mutex here. Caller better be ensuring no other thread can access this /* No mutex here. Caller better be ensuring no other thread can access this
* socket. */ * socket. */
bool bool
send_with_length_unsafe( int socket, unsigned char* buf, int bufLen ) send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
size_t bufLen )
{ {
assert( !!addr );
bool ok = false; bool ok = false;
int socket = addr->socket();
assert ( addr->isTCP() );
unsigned short len = htons( bufLen ); unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 ); ssize_t nSent = send( socket, &len, 2, 0 );
if ( nSent == 2 ) { if ( nSent == 2 ) {
nSent = send( socket, buf, bufLen, 0 ); nSent = send( socket, buf, bufLen, 0 );
if ( nSent == bufLen ) { if ( nSent == ssize_t(bufLen) ) {
logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket );
ok = true; ok = true;
} }
@ -368,7 +374,7 @@ send_with_length_unsafe( int socket, unsigned char* buf, int bufLen )
* game? * game?
*/ */
static bool static bool
processConnect( unsigned char* bufp, int bufLen, int socket, in_addr& addr ) processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
{ {
char cookie[MAX_INVITE_LEN+1]; char cookie[MAX_INVITE_LEN+1];
unsigned char* end = bufp + bufLen; unsigned char* end = bufp + bufLen;
@ -407,25 +413,25 @@ processConnect( unsigned char* bufp, int bufLen, int socket, in_addr& addr )
static pthread_mutex_t s_newCookieLock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t s_newCookieLock = PTHREAD_MUTEX_INITIALIZER;
MutexLock ml( &s_newCookieLock ); MutexLock ml( &s_newCookieLock );
SafeCref scr( cookie, socket, clientVersion, &devID, SafeCref scr( cookie, addr, clientVersion, &devID,
nPlayersH, nPlayersT, seed, langCode, nPlayersH, nPlayersT, seed, langCode,
wantsPublic, makePublic ); wantsPublic, makePublic );
/* nPlayersT etc could be slots in SafeCref to avoid passing /* nPlayersT etc could be slots in SafeCref to avoid passing
here */ here */
success = scr.Connect( socket, nPlayersH, nPlayersT, seed, addr ); success = scr.Connect( nPlayersH, nPlayersT, seed );
} else { } else {
err = XWRELAY_ERROR_BADPROTO; err = XWRELAY_ERROR_BADPROTO;
} }
} }
if ( err != XWRELAY_ERROR_NONE ) { if ( err != XWRELAY_ERROR_NONE ) {
denyConnection( socket, err ); denyConnection( addr, err );
} }
return success; return success;
} /* processConnect */ } /* processConnect */
static bool static bool
processReconnect( unsigned char* bufp, int bufLen, int socket, in_addr& addr ) processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
{ {
unsigned char* end = bufp + bufLen; unsigned char* end = bufp + bufLen;
bool success = false; bool success = false;
@ -459,11 +465,11 @@ processReconnect( unsigned char* bufp, int bufLen, int socket, in_addr& addr )
getDevID( &bufp, end, flags, &devID ); getDevID( &bufp, end, flags, &devID );
SafeCref scr( connName[0]? connName : NULL, SafeCref scr( connName[0]? connName : NULL,
cookie, srcID, socket, clientVersion, &devID, cookie, srcID, addr, clientVersion, &devID,
nPlayersH, nPlayersT, gameSeed, langCode, nPlayersH, nPlayersT, gameSeed, langCode,
wantsPublic, makePublic ); wantsPublic, makePublic );
success = scr.Reconnect( socket, srcID, nPlayersH, nPlayersT, success = scr.Reconnect( srcID, nPlayersH, nPlayersT, gameSeed,
gameSeed, addr, &err ); &err );
if ( !success ) { if ( !success ) {
assert( err != XWRELAY_ERROR_NONE ); assert( err != XWRELAY_ERROR_NONE );
} }
@ -473,27 +479,27 @@ processReconnect( unsigned char* bufp, int bufLen, int socket, in_addr& addr )
} }
if ( err != XWRELAY_ERROR_NONE ) { if ( err != XWRELAY_ERROR_NONE ) {
denyConnection( socket, err ); denyConnection( addr, err );
} }
return success; return success;
} /* processReconnect */ } /* processReconnect */
static bool static bool
processAck( unsigned char* bufp, int bufLen, int socket ) processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr )
{ {
bool success = false; bool success = false;
unsigned char* end = bufp + bufLen; unsigned char* end = bufp + bufLen;
HostID srcID; HostID srcID;
if ( getNetByte( &bufp, end, &srcID ) ) { if ( getNetByte( &bufp, end, &srcID ) ) {
SafeCref scr( socket ); SafeCref scr( addr );
success = scr.HandleAck( srcID ); success = scr.HandleAck( srcID );
} }
return success; return success;
} }
static bool static bool
processDisconnect( unsigned char* bufp, int bufLen, int socket ) processDisconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
{ {
unsigned char* end = bufp + bufLen; unsigned char* end = bufp + bufLen;
CookieID cookieID; CookieID cookieID;
@ -503,8 +509,8 @@ processDisconnect( unsigned char* bufp, int bufLen, int socket )
if ( getNetShort( &bufp, end, &cookieID ) if ( getNetShort( &bufp, end, &cookieID )
&& getNetByte( &bufp, end, &hostID ) ) { && getNetByte( &bufp, end, &hostID ) ) {
SafeCref scr( socket ); SafeCref scr( addr );
scr.Disconnect( socket, hostID ); scr.Disconnect( addr, hostID );
success = true; success = true;
} else { } else {
logf( XW_LOGERROR, "dropping XWRELAY_GAME_DISCONNECT; wrong length" ); logf( XW_LOGERROR, "dropping XWRELAY_GAME_DISCONNECT; wrong length" );
@ -513,10 +519,10 @@ processDisconnect( unsigned char* bufp, int bufLen, int socket )
} /* processDisconnect */ } /* processDisconnect */
static void static void
killSocket( int socket ) killSocket( const AddrInfo* addr )
{ {
logf( XW_LOGINFO, "%s(%d)", __func__, socket ); logf( XW_LOGINFO, "%s(addr.socket=%d)", __func__, addr->socket() );
CRefMgr::Get()->RemoveSocketRefs( socket ); CRefMgr::Get()->RemoveSocketRefs( addr );
} }
time_t time_t
@ -546,7 +552,7 @@ GetNSpawns(void)
/* forward the message. Need only change the command after looking up the /* forward the message. Need only change the command after looking up the
* socket and it's ready to go. */ * socket and it's ready to go. */
static bool static bool
forwardMessage( unsigned char* buf, int buflen, int srcSocket, in_addr& addr ) forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr )
{ {
bool success = false; bool success = false;
unsigned char* bufp = buf + 1; /* skip cmd */ unsigned char* bufp = buf + 1; /* skip cmd */
@ -561,7 +567,7 @@ forwardMessage( unsigned char* buf, int buflen, int srcSocket, in_addr& addr )
logf( XW_LOGINFO, "cookieID = %d", cookieID ); logf( XW_LOGINFO, "cookieID = %d", cookieID );
if ( COOKIE_ID_NONE == cookieID ) { if ( COOKIE_ID_NONE == cookieID ) {
SafeCref scr( srcSocket ); SafeCref scr( addr );
success = scr.Forward( src, addr, dest, buf, buflen ); success = scr.Forward( src, addr, dest, buf, buflen );
} else { } else {
SafeCref scr( cookieID ); /* won't work if not allcon; will be 0 */ SafeCref scr( cookieID ); /* won't work if not allcon; will be 0 */
@ -572,7 +578,7 @@ forwardMessage( unsigned char* buf, int buflen, int srcSocket, in_addr& addr )
} /* forwardMessage */ } /* forwardMessage */
static bool static bool
processMessage( unsigned char* buf, int bufLen, int socket, in_addr& addr ) processMessage( unsigned char* buf, int bufLen, const AddrInfo* addr )
{ {
bool success = false; /* default is failure */ bool success = false; /* default is failure */
XWRELAY_Cmd cmd = *buf; XWRELAY_Cmd cmd = *buf;
@ -581,16 +587,16 @@ processMessage( unsigned char* buf, int bufLen, int socket, in_addr& addr )
switch( cmd ) { switch( cmd ) {
case XWRELAY_GAME_CONNECT: case XWRELAY_GAME_CONNECT:
success = processConnect( buf+1, bufLen-1, socket, addr ); success = processConnect( buf+1, bufLen-1, addr );
break; break;
case XWRELAY_GAME_RECONNECT: case XWRELAY_GAME_RECONNECT:
success = processReconnect( buf+1, bufLen-1, socket, addr ); success = processReconnect( buf+1, bufLen-1, addr );
break; break;
case XWRELAY_ACK: case XWRELAY_ACK:
success = processAck( buf+1, bufLen-1, socket ); success = processAck( buf+1, bufLen-1, addr );
break; break;
case XWRELAY_GAME_DISCONNECT: case XWRELAY_GAME_DISCONNECT:
success = processDisconnect( buf+1, bufLen-1, socket ); success = processDisconnect( buf+1, bufLen-1, addr );
break; break;
#ifdef RELAY_HEARTBEAT #ifdef RELAY_HEARTBEAT
case XWRELAY_HEARTBEAT: case XWRELAY_HEARTBEAT:
@ -598,7 +604,7 @@ processMessage( unsigned char* buf, int bufLen, int socket, in_addr& addr )
break; break;
#endif #endif
case XWRELAY_MSG_TORELAY: case XWRELAY_MSG_TORELAY:
success = forwardMessage( buf, bufLen, socket, addr ); success = forwardMessage( buf, bufLen, addr );
break; break;
default: default:
logf( XW_LOGERROR, "%s bad: %d", __func__, cmd ); logf( XW_LOGERROR, "%s bad: %d", __func__, cmd );
@ -607,7 +613,7 @@ processMessage( unsigned char* buf, int bufLen, int socket, in_addr& addr )
} }
if ( !success ) { if ( !success ) {
XWThreadPool::GetTPool()->EnqueueKill( socket, "failure" ); XWThreadPool::GetTPool()->EnqueueKill( addr, "failure" );
} }
return success; return success;
@ -628,7 +634,7 @@ make_socket( unsigned long addr, unsigned short port )
return -1; return -1;
} }
struct sockaddr_in sockAddr; sockaddr_in sockAddr;
sockAddr.sin_family = AF_INET; sockAddr.sin_family = AF_INET;
sockAddr.sin_addr.s_addr = htonl(addr); sockAddr.sin_addr.s_addr = htonl(addr);
sockAddr.sin_port = htons(port); sockAddr.sin_port = htons(port);
@ -667,6 +673,7 @@ usage( char* arg0 )
"\t-h (print this help)\\\n" "\t-h (print this help)\\\n"
"\t-i <idfile> (file where next global id stored)\\\n" "\t-i <idfile> (file where next global id stored)\\\n"
"\t-l <logfile> (write logs here, not stderr)\\\n" "\t-l <logfile> (write logs here, not stderr)\\\n"
"\t-m <num_sockets> (max number of simultaneous sockets to have open)\\\n"
"\t-n <serverName> (used in permID generation)\\\n" "\t-n <serverName> (used in permID generation)\\\n"
"\t-p <port> (port to listen on)\\\n" "\t-p <port> (port to listen on)\\\n"
#ifdef DO_HTTP #ifdef DO_HTTP
@ -806,7 +813,7 @@ pushMsgs( vector<unsigned char>& out, DBMgr* dbmgr, const char* connName,
} }
static void static void
handleMsgsMsg( int sock, in_addr& addr, bool sendFull, handleMsgsMsg( const AddrInfo* addr, bool sendFull,
unsigned char* bufp, const unsigned char* end ) unsigned char* bufp, const unsigned char* end )
{ {
unsigned short nameCount; unsigned short nameCount;
@ -850,8 +857,8 @@ handleMsgsMsg( int sock, in_addr& addr, bool sendFull,
memcpy( &out[0], &tmp, sizeof(tmp) ); memcpy( &out[0], &tmp, sizeof(tmp) );
tmp = htons( nameCount ); tmp = htons( nameCount );
memcpy( &out[2], &tmp, sizeof(tmp) ); memcpy( &out[2], &tmp, sizeof(tmp) );
ssize_t nwritten = write( sock, &out[0], out.size() ); ssize_t nwritten = write( addr->socket(), &out[0], out.size() );
logf( XW_LOGINFO, "%s: wrote %d bytes", __func__, nwritten ); logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten );
if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) { if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) {
dbmgr->RecordSent( &msgIDs[0], msgIDs.size() ); dbmgr->RecordSent( &msgIDs[0], msgIDs.size() );
dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() ); dbmgr->RemoveStoredMessages( &msgIDs[0], msgIDs.size() );
@ -907,7 +914,8 @@ log_hex( const unsigned char* memp, int len, const char* tag )
} // log_hex } // log_hex
static void static void
handleProxyMsgs( int sock, in_addr& addr, unsigned char* bufp, unsigned char* end ) handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
unsigned char* end )
{ {
// log_hex( bufp, end-bufp, __func__ ); // log_hex( bufp, end-bufp, __func__ );
unsigned short nameCount; unsigned short nameCount;
@ -962,10 +970,12 @@ handleProxyMsgs( int sock, in_addr& addr, unsigned char* bufp, unsigned char* en
} // handleProxyMsgs } // handleProxyMsgs
void void
handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr ) handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
{ {
logf( XW_LOGINFO, "%s()", __func__ ); logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( len > 0 ) { if ( len > 0 ) {
assert( addr->isTCP() );
int socket = addr->socket();
unsigned char* bufp = buf; unsigned char* bufp = buf;
unsigned char* end = bufp + len; unsigned char* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */ if ( (0 == *bufp++) ) { /* protocol */
@ -985,21 +995,21 @@ handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr )
DBMgr::Get()->PublicRooms( lang, nPlayers, &nNames, names ); DBMgr::Get()->PublicRooms( lang, nPlayers, &nNames, names );
unsigned short netshort = htons( names.size() unsigned short netshort = htons( names.size()
+ sizeof(unsigned short) ); + sizeof(unsigned short) );
write( sock, &netshort, sizeof(netshort) ); write( socket, &netshort, sizeof(netshort) );
netshort = htons( (unsigned short)nNames ); netshort = htons( (unsigned short)nNames );
write( sock, &netshort, sizeof(netshort) ); write( socket, &netshort, sizeof(netshort) );
write( sock, names.c_str(), names.size() ); write( socket, names.c_str(), names.size() );
} }
break; break;
case PRX_HAS_MSGS: case PRX_HAS_MSGS:
case PRX_GET_MSGS: case PRX_GET_MSGS:
if ( len >= 2 ) { if ( len >= 2 ) {
handleMsgsMsg( sock, addr, PRX_GET_MSGS == cmd, bufp, end ); handleMsgsMsg( addr, PRX_GET_MSGS == cmd, bufp, end );
} }
break; /* PRX_HAS_MSGS */ break; /* PRX_HAS_MSGS */
case PRX_PUT_MSGS: case PRX_PUT_MSGS:
handleProxyMsgs( sock, addr, bufp, end ); handleProxyMsgs( socket, addr, bufp, end );
break; break;
case PRX_DEVICE_GONE: case PRX_DEVICE_GONE:
@ -1026,7 +1036,7 @@ handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr )
} }
} }
len = 0; /* return a 0-length message */ len = 0; /* return a 0-length message */
write( sock, &len, sizeof(len) ); write( socket, &len, sizeof(len) );
break; /* PRX_DEVICE_GONE */ break; /* PRX_DEVICE_GONE */
default: default:
logf( XW_LOGERROR, "unexpected command %d", __func__, cmd ); logf( XW_LOGERROR, "unexpected command %d", __func__, cmd );
@ -1036,6 +1046,32 @@ handle_proxy_packet( unsigned char* buf, int len, int sock, in_addr& addr )
} }
} /* handle_proxy_packet */ } /* handle_proxy_packet */
/* From stack overflow, toward a snprintf with an expanding buffer.
*/
void
string_printf( string& str, const char* fmt, ... )
{
const int origsiz = str.size();
int newsiz = 100;
va_list ap;
for ( ; ; ) {
str.resize( origsiz + newsiz );
va_start( ap, fmt );
int len = vsnprintf( (char *)str.c_str() + origsiz, newsiz, fmt, ap );
va_end( ap );
if ( len > newsiz ) { // needs more space
newsiz = len + 1;
} else if ( -1 == len ) {
assert(0); // should be impossible
} else {
str.resize( origsiz + len );
break;
}
}
}
static void static void
set_timeouts( int sock ) set_timeouts( int sock )
{ {
@ -1120,7 +1156,7 @@ main( int argc, char** argv )
first. */ first. */
for ( ; ; ) { for ( ; ; ) {
int opt = getopt(argc, argv, "h?c:p:n:f:l:t:s:w:" int opt = getopt(argc, argv, "h?c:p:m:n:f:l:t:s:w:"
"DF" ); "DF" );
if ( opt == -1 ) { if ( opt == -1 ) {
@ -1162,6 +1198,9 @@ main( int argc, char** argv )
case 'l': case 'l':
logFile = optarg; logFile = optarg;
break; break;
case 'm':
g_maxsocks = atoi( optarg );
break;
case 'n': case 'n':
serverName = optarg; serverName = optarg;
break; break;
@ -1201,6 +1240,11 @@ main( int argc, char** argv )
if ( nWorkerThreads == 0 ) { if ( nWorkerThreads == 0 ) {
(void)cfg->GetValueFor( "NTHREADS", &nWorkerThreads ); (void)cfg->GetValueFor( "NTHREADS", &nWorkerThreads );
} }
if ( g_maxsocks == -1 ) {
(void)cfg->GetValueFor( "MAXSOCKS", &g_maxsocks );
} else {
g_maxsocks = 100;
}
char serverNameBuf[128]; char serverNameBuf[128];
if ( serverName == NULL ) { if ( serverName == NULL ) {
if ( cfg->GetValueFor( "SERVERNAME", serverNameBuf, if ( cfg->GetValueFor( "SERVERNAME", serverNameBuf,
@ -1391,15 +1435,19 @@ main( int argc, char** argv )
} }
if ( FD_ISSET( listener, &rfds ) ) { if ( FD_ISSET( listener, &rfds ) ) {
struct sockaddr_in newaddr; AddrInfo::AddrUnion saddr;
socklen_t siz = sizeof(newaddr); socklen_t siz = sizeof(saddr.addr_in);
int newSock = accept( listener, (sockaddr*)&newaddr, int newSock = accept( listener, &saddr.addr, &siz );
&siz );
if ( newSock < 0 ) { if ( newSock < 0 ) {
logf( XW_LOGERROR, "accept failed: errno(%d)=%s", logf( XW_LOGERROR, "accept failed: errno(%d)=%s",
errno, strerror(errno) ); errno, strerror(errno) );
assert( 0 ); // we're leaking files or load has grown assert( 0 ); // we're leaking files or load has grown
} else { } else {
// I've seen a bug where we accept but never service
// connections. Sockets are not closed, and so the
// number goes up. Probably need a watchdog instead,
// but this will work around it.
assert( g_maxsocks > newSock );
/* Set timeout so send and recv won't block forever */ /* Set timeout so send and recv won't block forever */
set_timeouts( newSock ); set_timeouts( newSock );
@ -1408,12 +1456,12 @@ main( int argc, char** argv )
logf( XW_LOGINFO, logf( XW_LOGINFO,
"%s: accepting connection from %s on socket %d", "%s: accepting connection from %s on socket %d",
__func__, inet_ntoa(newaddr.sin_addr), newSock ); __func__, inet_ntoa(saddr.addr_in.sin_addr), newSock );
tPool->AddSocket( newSock, AddrInfo addr( true, newSock, &saddr );
perGame ? XWThreadPool::STYPE_GAME tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
: XWThreadPool::STYPE_PROXY, : XWThreadPool::STYPE_PROXY,
newaddr.sin_addr ); &addr );
} }
--retval; --retval;
} }

View file

@ -7,7 +7,6 @@ XWRELAY=${DIR}/xwrelay
PIDFILE=${DIR}/xwrelay.pid PIDFILE=${DIR}/xwrelay.pid
CONFFILE=${DIR}/xwrelay.conf CONFFILE=${DIR}/xwrelay.conf
IDFILE=${DIR}/nextid.txt IDFILE=${DIR}/nextid.txt
CSSFILE=${DIR}/xwrelay.css
LOGFILE=/tmp/xwrelay_log_$$.txt LOGFILE=/tmp/xwrelay_log_$$.txt
#LOGFILE=/dev/null #LOGFILE=/dev/null
@ -98,8 +97,8 @@ do_start() {
exit 1 exit 1
fi fi
echo "starting..." | tee -a $LOGFILE echo "starting..." | tee -a $LOGFILE
echo "running $XWRELAY $@ -f $CONFFILE -s $CSSFILE" | tee -a $LOGFILE echo "running $XWRELAY $@ -f $CONFFILE" | tee -a $LOGFILE
$XWRELAY $@ -f $CONFFILE -s $CSSFILE & $XWRELAY $@ -f $CONFFILE &
NEWPID=$! NEWPID=$!
echo -n $NEWPID > $PIDFILE echo -n $NEWPID > $PIDFILE
sleep 1 sleep 1

View file

@ -22,10 +22,12 @@
#ifndef _XWRELAY_PRIV_H_ #ifndef _XWRELAY_PRIV_H_
#define _XWRELAY_PRIV_H_ #define _XWRELAY_PRIV_H_
#include <string>
#include <time.h> #include <time.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "lstnrmgr.h" #include "lstnrmgr.h"
#include "xwrelay.h" #include "xwrelay.h"
#include "addrinfo.h"
typedef unsigned char HostID; /* see HOST_ID_SERVER */ typedef unsigned char HostID; /* see HOST_ID_SERVER */
@ -38,8 +40,9 @@ typedef enum {
void logf( XW_LogLevel level, const char* format, ... ); void logf( XW_LogLevel level, const char* format, ... );
void denyConnection( int socket, XWREASON err ); void denyConnection( const AddrInfo* addr, XWREASON err );
bool send_with_length_unsafe( int socket, unsigned char* buf, int bufLen ); bool send_with_length_unsafe( const AddrInfo* addr,
unsigned char* buf, size_t bufLen );
time_t uptime(void); time_t uptime(void);
@ -49,9 +52,11 @@ int GetNSpawns(void);
int make_socket( unsigned long addr, unsigned short port ); int make_socket( unsigned long addr, unsigned short port );
void string_printf( std::string& str, const char* fmt, ... );
int read_packet( int sock, unsigned char* buf, int buflen ); int read_packet( int sock, unsigned char* buf, int buflen );
void handle_proxy_packet( unsigned char* buf, int bufLen, int socket, void handle_proxy_packet( unsigned char* buf, int bufLen,
in_addr& addr ); const AddrInfo* addr );
const char* cmdToStr( XWRELAY_Cmd cmd ); const char* cmdToStr( XWRELAY_Cmd cmd );