cleanup: better encapsulation for AddrInfo

This commit is contained in:
Eric House 2013-01-13 10:14:06 -08:00
parent c567647a8d
commit 98679c8363
7 changed files with 94 additions and 93 deletions

View file

@ -23,23 +23,41 @@
#define _ADDRINFO_H_
#include <netinet/in.h>
#include <string.h>
class AddrInfo {
public:
/* AddrInfo() : m_clientToken(0), m_socket(-1) { } */
union {
typedef union _AddrUnion {
struct sockaddr addr;
struct sockaddr_in addr_in;
} u;
int m_socket;
} AddrUnion;
AddrInfo() {
memset( this, 0, sizeof(*this) );
m_socket = -1;
m_isValid = false;
}
AddrInfo( bool isTCP, int socket, const AddrUnion* saddr ) {
m_isValid = true;
m_isTCP = isTCP;
m_socket = socket;
memcpy( &m_saddr, saddr, sizeof(m_saddr) );
}
void setIsTCP( bool val ) { m_isTCP = val; }
bool isTCP() const { return m_isTCP; } /* later UDP will be here too */
int socket() const { assert(m_isValid); return m_socket; }
struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; }
bool equals( const AddrInfo& other ) const;
private:
// AddrInfo& operator=(const AddrInfo&); // Prevent assignment
int m_socket;
bool m_isTCP;
bool m_isValid;
AddrUnion m_saddr;
};
#endif

View file

@ -263,12 +263,11 @@ void
CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen )
{
CRefEvent evt( XWE_PROXYMSG );
CRefEvent evt( XWE_PROXYMSG, addr );
evt.u.fwd.src = srcID;
evt.u.fwd.dest = destID;
evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen;
evt.u.fwd.addr = *addr;
m_eventQueue.push_back( evt );
handleEvents();
@ -279,8 +278,7 @@ CookieRef::_Disconnect( const AddrInfo* addr, HostID hostID )
{
logf( XW_LOGINFO, "%s(socket=%d, hostID=%d)", __func__, socket, hostID );
CRefEvent evt( XWE_DISCONN );
evt.u.discon.addr = *addr;
CRefEvent evt( XWE_DISCONN, addr );
evt.u.discon.srcID = hostID;
m_eventQueue.push_back( evt );
@ -406,13 +404,13 @@ CookieRef::notifyDisconn( const CRefEvent* evt )
evt->u.disnote.why
};
send_with_length( &evt->u.disnote.addr, buf, sizeof(buf), true );
send_with_length( &evt->addr, buf, sizeof(buf), true );
} /* notifyDisconn */
void
CookieRef::removeSocket( const AddrInfo* addr )
{
logf( XW_LOGINFO, "%s(socket=%d)", __func__, addr->m_socket );
logf( XW_LOGINFO, "%s(socket=%d)", __func__, addr->socket() );
bool found = false;
ASSERT_LOCKED();
@ -542,9 +540,7 @@ CookieRef::pushConnectEvent( int clientVersion, DevID* devID,
int nPlayersH, int nPlayersS,
int seed, const AddrInfo* addr )
{
CRefEvent evt;
evt.type = XWE_DEVCONNECT;
evt.u.con.addr = *addr;
CRefEvent evt( XWE_DEVCONNECT, addr );
evt.u.con.clientVersion = clientVersion;
evt.u.con.devID = devID;
evt.u.con.srcID = HOST_ID_NONE;
@ -559,8 +555,7 @@ CookieRef::pushReconnectEvent( int clientVersion, DevID* devID,
HostID srcID, int nPlayersH, int nPlayersS,
int seed, const AddrInfo* addr )
{
CRefEvent evt( XWE_RECONNECT );
evt.u.con.addr = *addr;
CRefEvent evt( XWE_RECONNECT, addr );
evt.u.con.clientVersion = clientVersion;
evt.u.con.devID = devID;
evt.u.con.srcID = srcID;
@ -595,28 +590,25 @@ CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen )
{
logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest );
CRefEvent evt( XWE_FORWARDMSG );
CRefEvent evt( XWE_FORWARDMSG, addr );
evt.u.fwd.src = src;
evt.u.fwd.dest = dest;
evt.u.fwd.buf = buf;
evt.u.fwd.buflen = buflen;
evt.u.fwd.addr = *addr;
m_eventQueue.push_back( evt );
}
void
CookieRef::pushRemoveSocketEvent( const AddrInfo* addr )
{
CRefEvent evt( XWE_REMOVESOCKET );
evt.u.rmsock.addr = *addr;
CRefEvent evt( XWE_REMOVESOCKET, addr );
m_eventQueue.push_back( evt );
}
void
CookieRef::pushNotifyDisconEvent( const AddrInfo* addr, XWREASON why )
{
CRefEvent evt( XWE_NOTIFYDISCON );
evt.u.disnote.addr = *addr;
CRefEvent evt( XWE_NOTIFYDISCON, addr );
evt.u.disnote.why = why;
m_eventQueue.push_back( evt );
}
@ -631,8 +623,7 @@ CookieRef::pushLastSocketGoneEvent()
void
CookieRef::pushGameDead( const AddrInfo* addr )
{
CRefEvent evt( XWE_GAMEDEAD );
evt.u.discon.addr = *addr;
CRefEvent evt( XWE_GAMEDEAD, addr );
m_eventQueue.push_back( evt );
}
@ -697,11 +688,11 @@ CookieRef::handleEvents()
break;
case XWA_SEND_DUP_ROOM:
send_denied( &evt, XWRELAY_ERROR_DUP_ROOM );
removeSocket( &evt.u.rmsock.addr );
removeSocket( &evt.addr );
break;
case XWA_SEND_TOO_MANY:
send_denied( &evt, XWRELAY_ERROR_TOO_MANY );
removeSocket( &evt.u.rmsock.addr );
removeSocket( &evt.addr );
break;
case XWA_FWD:
@ -721,20 +712,19 @@ CookieRef::handleEvents()
break;
case XWA_HEARTDISCONN:
notifyOthers( &evt.u.heart.addr, XWRELAY_DISCONNECT_OTHER,
notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_HEART_OTHER );
setAllConnectedTimer();
// reducePlayerCounts( evt.u.discon.socket );
disconnectSocket( &evt.u.heart.addr,
XWRELAY_ERROR_HEART_YOU );
disconnectSocket( &evt.addr, XWRELAY_ERROR_HEART_YOU );
break;
case XWA_DISCONNECT:
setAllConnectedTimer();
// reducePlayerCounts( evt.u.discon.socket );
notifyOthers( &evt.u.discon.addr, XWRELAY_DISCONNECT_OTHER,
notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_OTHER_DISCON );
removeSocket( &evt.u.discon.addr );
removeSocket( &evt.addr );
/* Don't notify. This is a normal part of a game ending. */
break;
@ -743,7 +733,7 @@ CookieRef::handleEvents()
break;
case XWA_TELLGAMEDEAD:
notifyGameDead( &evt.u.discon.addr );
notifyGameDead( &evt.addr );
break;
case XWA_NOTEHEART:
@ -760,10 +750,10 @@ CookieRef::handleEvents()
case XWA_REMOVESOCK_1:
// reducePlayerCounts( evt.u.rmsock.socket );
if ( XWA_REMOVESOCK_2 == takeAction ) {
notifyOthers( &evt.u.rmsock.addr, XWRELAY_DISCONNECT_OTHER,
notifyOthers( &evt.addr, XWRELAY_DISCONNECT_OTHER,
XWRELAY_ERROR_LOST_OTHER );
}
removeSocket( &evt.u.rmsock.addr );
removeSocket( &evt.addr );
break;
case XWA_SENDALLHERE:
@ -857,7 +847,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
logf( XW_LOGVERBOSE0, "%s(dest=%d)", __func__, dest );
assert( dest > 0 && dest <= 4 );
assert( -1 != addr->m_socket );
assert( -1 != addr->socket() );
assert( addr->isTCP() );
for ( ; ; ) {
@ -917,7 +907,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
evt->u.con.srcID =
DBMgr::Get()->AddDevice( ConnName(), evt->u.con.srcID,
evt->u.con.clientVersion, nPlayersH, seed,
&evt->u.con.addr, devID, reconn );
&evt->addr, devID, reconn );
HostID hostid = evt->u.con.srcID;
if ( NULL != hidp ) {
@ -933,7 +923,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
{
RWWriteLock rwl( &m_socketsRWLock );
HostRec hr( hostid, &evt->u.con.addr, nPlayersH, seed, !reconn );
HostRec hr( hostid, &evt->addr, nPlayersH, seed, !reconn );
m_sockets.push_back( hr );
}
@ -1115,7 +1105,7 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial,
}
}
send_with_length( &evt->u.con.addr, buf, bufp - buf, true );
send_with_length( &evt->addr, buf, bufp - buf, true );
logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) );
} /* sendResponse */
@ -1124,7 +1114,7 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
{
HostID dest = evt->u.con.srcID;
if ( HOST_ID_NONE != dest ) {
send_stored_messages( dest, &evt->u.con.addr );
send_stored_messages( dest, &evt->addr );
}
}
@ -1159,7 +1149,7 @@ CookieRef::forward_or_store( const CRefEvent* evt )
/* also note that we've heard from src recently */
HostID src = evt->u.fwd.src;
DBMgr::Get()->RecordAddress( ConnName(), src, &evt->u.fwd.addr );
DBMgr::Get()->RecordAddress( ConnName(), src, &evt->addr );
#ifdef RELAY_HEARTBEAT
pushHeartbeatEvent( src, SocketForHost(src) );
#endif
@ -1169,7 +1159,7 @@ CookieRef::forward_or_store( const CRefEvent* evt )
void
CookieRef::send_denied( const CRefEvent* evt, XWREASON why )
{
denyConnection( &evt->u.con.addr, why );
denyConnection( &evt->addr, why );
}
void
@ -1200,7 +1190,7 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id,
void
CookieRef::notifyOthers( const AddrInfo* addr, XWRelayMsg msg, XWREASON why )
{
assert( addr->m_socket != 0 );
assert( addr->socket() != 0 );
assert( addr->isTCP() );
ASSERT_LOCKED();
@ -1308,7 +1298,7 @@ CookieRef::disconnectSockets( XWREASON why )
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
const AddrInfo* addr = &iter->m_addr;
if ( addr->m_socket != 0 ) {
if ( addr->socket() != 0 ) {
disconnectSocket( addr, why );
} else {
assert( 0 );
@ -1343,7 +1333,7 @@ CookieRef::removeDevice( const CRefEvent* const evt )
void
CookieRef::noteHeartbeat( const CRefEvent* evt )
{
const AddrInfo& addr = evt->u.heart.addr;
const AddrInfo& addr = evt->addr;
HostID id = evt->u.heart.id;
ASSERT_LOCKED();
@ -1360,7 +1350,7 @@ CookieRef::noteHeartbeat( const CRefEvent* evt )
connection. An attack is the most likely explanation. But:
now it's happening after a crash and clients reconnect. */
logf( XW_LOGERROR, "wrong socket record for HostID %x; "
"wanted %d, found %d", id, addr.m_socket, iter->m_addr.m_socket );
"wanted %d, found %d", id, addr.socket(), iter->m_addr.socket() );
}
break;
}
@ -1424,7 +1414,7 @@ CookieRef::printSeeds( const char* caller )
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
len += snprintf( &buf[len], sizeof(buf)-len, "[%d]%.4x(%d)/%d/%c ",
iter->m_hostID, iter->m_seed,
iter->m_seed, iter->m_addr.m_socket,
iter->m_seed, iter->m_addr.socket(),
iter->m_ackPending?'a':'A' );
}
logf( XW_LOGINFO, "seeds/sockets/ack'd after %s(): %s", caller, buf );
@ -1484,7 +1474,7 @@ CookieRef::_PrintCookieInfo( string& out )
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
snprintf( buf, sizeof(buf), " HostID=%d; socket=%d;last hbeat=%ld\n",
iter->m_hostID, iter->m_addr.m_socket,
iter->m_hostID, iter->m_addr.socket(),
iter->m_lastHeartbeat );
out += buf;
}
@ -1512,7 +1502,7 @@ CookieRef::_FormatHostInfo( string* hostIds, string* seeds, string* addrs )
}
if ( !!addrs ) {
int socket = iter->m_addr.m_socket;
int socket = iter->m_addr.socket();
sockaddr_in name;
socklen_t siz = sizeof(name);
if ( 0 == getpeername( socket, (struct sockaddr*)&name, &siz) ) {

View file

@ -152,14 +152,15 @@ class CookieRef {
public:
CRefEvent() { type = XWE_NONE; }
CRefEvent( XW_RELAY_EVENT typ ) { type = typ; }
CRefEvent( XW_RELAY_EVENT typ, const AddrInfo* addrp ) { type = typ; addr = *addrp; }
XW_RELAY_EVENT type;
AddrInfo addr; /* sender's address */
union {
struct {
HostID src;
HostID dest;
unsigned char* buf;
int buflen;
AddrInfo addr; /* sender's address */
} fwd;
struct {
int clientVersion;
@ -168,13 +169,11 @@ class CookieRef {
int nPlayersS;
int seed;
HostID srcID;
AddrInfo addr;
} con;
struct {
HostID srcID;
} ack;
struct {
AddrInfo addr;
HostID srcID;
} discon;
struct {
@ -183,16 +182,13 @@ class CookieRef {
} devgone;
struct {
HostID id;
AddrInfo addr;
} heart;
struct {
time_t now;
} htime;
struct {
AddrInfo addr;
} rmsock;
struct {
AddrInfo addr;
XWREASON why;
} disnote;
} u;

View file

@ -330,7 +330,7 @@ DBMgr::AddDevice( const char* connName, HostID curID, int clientVersion,
" mtimes[%d]='now', ack[%d]=\'%c\'"
" WHERE connName = '%s'";
string query;
char* ntoa = inet_ntoa( ((sockaddr_in*)addr)->sin_addr );
char* ntoa = inet_ntoa( addr->sin_addr() );
string_printf( query, fmt, newID, nToAdd, newID, clientVersion,
newID, seed, newID, ntoa, devIDBuf.c_str(), newID,
newID, ackd?'A':'a', connName );
@ -506,7 +506,7 @@ DBMgr::RecordAddress( const char* const connName, HostID hid,
const char* fmt = "UPDATE " GAMES_TABLE " SET addrs[%d] = \'%s\'"
" WHERE connName = '%s'";
string query;
char* ntoa = inet_ntoa( ((sockaddr_in*)addr)->sin_addr );
char* ntoa = inet_ntoa( addr->sin_addr() );
string_printf( query, fmt, hid, ntoa, connName );
logf( XW_LOGVERBOSE0, "%s: query: %s", __func__, query.c_str() );

View file

@ -86,8 +86,8 @@ XWThreadPool::Setup( int nThreads, packet_func pFunc, kill_func kFunc )
m_kFunc = kFunc;
for ( int ii = 0; ii < nThreads; ++ii ) {
ThreadInfo* tip = &m_threadInfos[ii];
tip->me = this;
ThreadInfo* tip = &m_threadInfos[ii];
tip->me = this;
int result = pthread_create( &tip->thread, NULL, tpool_main, tip );
assert( result == 0 );
pthread_detach( tip->thread );
@ -109,7 +109,6 @@ XWThreadPool::Stop()
for ( ii = 0; ii < m_nThreads; ++ii ) {
SockInfo si;
si.m_type = STYPE_UNKNOWN;
si.m_addr.m_socket = 0;
enqueue( si );
}
@ -175,7 +174,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
}
}
logf( XW_LOGINFO, "CLOSING socket %d", socket );
close( addr->m_socket );
close( addr->socket() );
/* if ( do_interrupt ) { */
/* We always need to interrupt the poll because the socket we're closing
will be in the list being listened to. That or we need to drop sockets
@ -205,7 +204,7 @@ XWThreadPool::get_process_packet( SockType stype, const AddrInfo* addr )
assert( sizeof(packetSize) == 2 );
unsigned char buf[MAX_MSG_LEN+1];
int nRead = read_packet( addr->m_socket, buf, sizeof(buf) );
int nRead = read_packet( addr->socket(), buf, sizeof(buf) );
if ( nRead < 0 ) {
EnqueueKill( addr, "bad packet" );
} else if ( STYPE_GAME == stype ) {
@ -250,17 +249,17 @@ XWThreadPool::real_tpool_main( ThreadInfo* tip )
}
QueuePr pr;
grab_elem_locked( &pr );
bool gotOne = grab_elem_locked( &pr );
tip->recentTime = time( NULL );
pthread_mutex_unlock( &m_queueMutex );
socket = pr.m_info.m_addr.m_socket;
if ( socket >= 0 ) {
logf( XW_LOGINFO, "worker thread got socket %d from queue",
socket );
if ( gotOne ) {
socket = pr.m_info.m_addr.socket();
logf( XW_LOGINFO, "worker thread got socket %d from queue", socket );
switch ( pr.m_act ) {
case Q_READ:
assert( socket >= 0 );
if ( get_process_packet( pr.m_info.m_type, &pr.m_info.m_addr ) ) {
AddSocket( pr.m_info.m_type, &pr.m_info.m_addr );
}
@ -332,7 +331,7 @@ XWThreadPool::real_listener()
vector<SockInfo>::iterator iter;
for ( iter = m_activeSockets.begin(); iter != m_activeSockets.end();
++iter ) {
fds[curfd].fd = iter->m_addr.m_socket;
fds[curfd].fd = iter->m_addr.socket();
sinfos[curfd] = *iter;
fds[curfd].events = flags;
#ifdef LOG_POLL
@ -386,7 +385,7 @@ XWThreadPool::real_listener()
if ( fds[curfd].revents != 0 ) {
int socket = fds[curfd].fd;
const AddrInfo* addr = &sinfos[curfd].m_addr;
assert( socket == addr->m_socket );
assert( socket == addr->socket() );
if ( !RemoveSocket( addr ) ) {
/* no further processing if it's been removed while
we've been sleeping in poll */
@ -439,14 +438,13 @@ XWThreadPool::enqueue( SockInfo si, QAction act )
log_hung_threads();
}
void
bool
XWThreadPool::grab_elem_locked( QueuePr* prp )
{
bool found = false;
prp->m_info.m_addr.m_socket = -1;
deque<QueuePr>::iterator iter;
for ( iter = m_queue.begin(); !found && iter != m_queue.end(); ++iter ) {
int socket = iter->m_info.m_addr.m_socket;
int socket = iter->m_info.m_addr.socket();
/* If NOT found */
if ( m_sockets_in_use.end() == m_sockets_in_use.find( socket ) ) {
*prp = *iter;
@ -457,6 +455,7 @@ XWThreadPool::grab_elem_locked( QueuePr* prp )
}
print_in_use();
return found;
} /* grab_elem_locked */
void
@ -493,16 +492,16 @@ XWThreadPool::log_hung_threads( void )
int ii;
time_t now = time( NULL );
for ( ii = 0; ii < m_nThreads; ++ii ) {
ThreadInfo* tip = &m_threadInfos[ii];
time_t recentTime = tip->recentTime;
if ( 0 != recentTime ) {
time_t howLong = now - recentTime;
if ( HUNG_THREASHHOLD < howLong ) {
logf( XW_LOGERROR, "thread %d (%p) stopped for %d seconds!",
ii, tip->thread, howLong );
tip->recentTime = 0; // only log once
assert(0);
}
}
ThreadInfo* tip = &m_threadInfos[ii];
time_t recentTime = tip->recentTime;
if ( 0 != recentTime ) {
time_t howLong = now - recentTime;
if ( HUNG_THREASHHOLD < howLong ) {
logf( XW_LOGERROR, "thread %d (%p) stopped for %d seconds!",
ii, tip->thread, howLong );
tip->recentTime = 0; // only log once
assert(0);
}
}
}
}

View file

@ -78,7 +78,7 @@ class XWThreadPool {
void enqueue( QAction act = Q_READ );
void enqueue( SockInfo si, QAction act = Q_READ );
void release_socket_locked( int socket );
void grab_elem_locked( QueuePr* qpp );
bool grab_elem_locked( QueuePr* qpp );
void print_in_use( void );
void log_hung_threads( void );

View file

@ -342,7 +342,7 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
{
assert( !!addr );
bool ok = false;
int socket = addr->m_socket;
int socket = addr->socket();
assert ( addr->isTCP() );
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
@ -521,7 +521,7 @@ processDisconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
static void
killSocket( const AddrInfo* addr )
{
logf( XW_LOGINFO, "%s(addr.socket=%d)", __func__, addr->m_socket );
logf( XW_LOGINFO, "%s(addr.socket=%d)", __func__, addr->socket() );
CRefMgr::Get()->RemoveSocketRefs( addr );
}
@ -857,7 +857,7 @@ handleMsgsMsg( const AddrInfo* addr, bool sendFull,
memcpy( &out[0], &tmp, sizeof(tmp) );
tmp = htons( nameCount );
memcpy( &out[2], &tmp, sizeof(tmp) );
ssize_t nwritten = write( addr->m_socket, &out[0], out.size() );
ssize_t nwritten = write( addr->socket(), &out[0], out.size() );
logf( XW_LOGVERBOSE0, "%s: wrote %d bytes", __func__, nwritten );
if ( sendFull && nwritten >= 0 && (size_t)nwritten == out.size() ) {
dbmgr->RecordSent( &msgIDs[0], msgIDs.size() );
@ -975,7 +975,7 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( len > 0 ) {
assert( addr->isTCP() );
int socket = addr->m_socket;
int socket = addr->socket();
unsigned char* bufp = buf;
unsigned char* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */
@ -1435,10 +1435,9 @@ main( int argc, char** argv )
}
if ( FD_ISSET( listener, &rfds ) ) {
AddrInfo addr;
memset( &addr, 0, sizeof(addr) );
socklen_t siz = sizeof(addr.u.addr_in);
int newSock = accept( listener, &addr.u.addr, &siz );
AddrInfo::AddrUnion saddr;
socklen_t siz = sizeof(saddr.addr_in);
int newSock = accept( listener, &saddr.addr, &siz );
if ( newSock < 0 ) {
logf( XW_LOGERROR, "accept failed: errno(%d)=%s",
errno, strerror(errno) );
@ -1457,10 +1456,9 @@ main( int argc, char** argv )
logf( XW_LOGINFO,
"%s: accepting connection from %s on socket %d",
__func__, inet_ntoa(addr.u.addr_in.sin_addr), newSock );
__func__, inet_ntoa(saddr.addr_in.sin_addr), newSock );
addr.m_socket = newSock;
addr.setIsTCP( true );
AddrInfo addr( true, newSock, &saddr );
tPool->AddSocket( perGame ? XWThreadPool::STYPE_GAME
: XWThreadPool::STYPE_PROXY,
&addr );