relay improvements for UDP connection: record addresses, prepare to

run in separate thread, etc.
This commit is contained in:
Eric House 2013-01-18 07:10:47 -08:00
parent 932778f700
commit 1c5fef6a38
13 changed files with 205 additions and 120 deletions

View file

@ -32,7 +32,7 @@ AddrInfo::equals( const AddrInfo& other ) const
if ( isTCP() ) {
equal = m_socket == other.m_socket;
} else {
assert( m_socket == other.m_socket ); /* both same UDP socket */
// assert( m_socket == other.m_socket ); /* both same UDP socket */
/* what does equal mean on udp addresses? Same host, or same host AND game */
equal = m_clientToken == other.m_clientToken
&& 0 == memcmp( &m_saddr, &other.m_saddr, sizeof(m_saddr) );

View file

@ -28,6 +28,8 @@
class AddrInfo {
public:
typedef uint32_t ClientToken;
typedef union _AddrUnion {
struct sockaddr addr;
struct sockaddr_in addr_in;
@ -43,7 +45,11 @@ class AddrInfo {
construct( socket, saddr, true );
}
AddrInfo( int socket, uint32_t clientToken, const AddrUnion* saddr ) {
AddrInfo( int socket, ClientToken clientToken, const AddrUnion* saddr ) {
init( socket, clientToken, saddr );
}
void init( int socket, ClientToken clientToken, const AddrUnion* saddr ) {
construct( socket, saddr, false );
m_clientToken = clientToken;
}
@ -51,9 +57,10 @@ class AddrInfo {
void setIsTCP( bool val ) { m_isTCP = val; }
bool isTCP() const { return m_isTCP; } /* later UDP will be here too */
int socket() const { assert(m_isValid); return m_socket; }
uint32_t clientToken() const { assert(m_isValid); return m_clientToken; }
ClientToken clientToken() const { assert(m_isValid); return m_clientToken; }
struct in_addr sin_addr() const { return m_saddr.addr_in.sin_addr; }
const struct sockaddr* sockaddr() const { assert(m_isValid); return &m_saddr.addr; }
const AddrUnion* saddr() const { assert(m_isValid); return &m_saddr; }
bool equals( const AddrInfo& other ) const;
@ -71,7 +78,7 @@ class AddrInfo {
int m_socket;
bool m_isTCP;
bool m_isValid;
uint32_t m_clientToken; /* must be 32 bit */
ClientToken m_clientToken; /* must be 32 bit */
AddrUnion m_saddr;
};

View file

@ -40,6 +40,7 @@
#include "timermgr.h"
#include "configs.h"
#include "crefmgr.h"
#include "devmgr.h"
#include "permid.h"
using namespace std;
@ -261,7 +262,7 @@ CookieRef::_HandleAck( HostID hostID )
void
CookieRef::_PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen )
const unsigned char* buf, int buflen )
{
CRefEvent evt( XWE_PROXYMSG, addr );
evt.u.fwd.src = srcID;
@ -308,14 +309,14 @@ CookieRef::_Shutdown()
HostID
CookieRef::HostForSocket( const AddrInfo* addr )
{
HostID hid = -1;
HostID hid = HOST_ID_NONE;
ASSERT_LOCKED();
RWReadLock rrl( &m_socketsRWLock );
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_addr.equals( *addr ) ) {
hid = iter->m_hostID;
logf( XW_LOGINFO, "%s: assigning hid of %d", __func__, hid );
assert( HOST_ID_NONE != hid );
break;
}
}
@ -404,7 +405,7 @@ CookieRef::notifyDisconn( const CRefEvent* evt )
evt->u.disnote.why
};
send_with_length( &evt->addr, buf, sizeof(buf), true );
send_with_length( &evt->addr, HOST_ID_NONE, buf, sizeof(buf), true );
} /* notifyDisconn */
void
@ -522,7 +523,7 @@ CookieRef::_CheckHeartbeats( time_t now )
void
CookieRef::_Forward( HostID src, const AddrInfo* addr,
HostID dest, unsigned char* buf, int buflen )
HostID dest, const unsigned char* buf, int buflen )
{
pushForwardEvent( src, addr, dest, buf, buflen );
handleEvents();
@ -587,7 +588,7 @@ CookieRef::pushHeartFailedEvent( int socket )
void
CookieRef::pushForwardEvent( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen )
const unsigned char* buf, int buflen )
{
logf( XW_LOGVERBOSE1, "pushForwardEvent: %d -> %d", src, dest );
CRefEvent evt( XWE_FORWARDMSG, addr );
@ -807,12 +808,19 @@ CookieRef::handleEvents()
} /* handleEvents */
bool
CookieRef::send_with_length( const AddrInfo* addr,
unsigned char* buf, int bufLen, bool cascade )
CookieRef::send_with_length( const AddrInfo* addr, HostID dest,
const unsigned char* buf, int bufLen, bool cascade )
{
bool failed = false;
if ( send_with_length_unsafe( addr, buf, bufLen ) ) {
DBMgr::Get()->RecordSent( ConnName(), HostForSocket(addr), bufLen );
if ( HOST_ID_NONE == dest ) {
dest = HostForSocket(addr);
}
if ( HOST_ID_NONE != dest ) {
DBMgr::Get()->RecordSent( ConnName(), dest, bufLen );
} else {
logf( XW_LOGERROR, "%s: no hid for addr", __func__ );
}
} else {
failed = true;
}
@ -857,7 +865,7 @@ CookieRef::send_stored_messages( HostID dest, const AddrInfo* addr )
buf, &buflen, &msgID ) ) {
break;
}
if ( ! send_with_length( addr, buf, buflen, true ) ) {
if ( ! send_with_length( addr, dest, buf, buflen, true ) ) {
break;
}
DBMgr::Get()->RemoveStoredMessages( &msgID, 1 );
@ -908,6 +916,8 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
evt->u.con.clientVersion, nPlayersH, seed,
&evt->addr, devID, reconn );
DevMgr::Get()->Remember( devID, &evt->addr );
HostID hostid = evt->u.con.srcID;
if ( NULL != hidp ) {
*hidp = hostid;
@ -915,8 +925,7 @@ CookieRef::increasePlayerCounts( CRefEvent* evt, bool reconn, HostID* hidp,
/* first add the rec here, whether it'll get ack'd or not */
logf( XW_LOGINFO, "%s: remembering pair: hostid=%x, "
"socket=%d (size=%d)",
__func__, hostid, socket, m_sockets.size());
"(size=%d)", __func__, hostid, m_sockets.size());
assert( m_sockets.size() < 4 );
@ -1104,7 +1113,7 @@ CookieRef::sendResponse( const CRefEvent* evt, bool initial,
}
}
send_with_length( &evt->addr, buf, bufp - buf, true );
send_with_length( &evt->addr, evt->u.con.srcID, buf, bufp - buf, true );
logf( XW_LOGVERBOSE0, "sent %s", cmdToStr( XWRELAY_Cmd(buf[0]) ) );
} /* sendResponse */
@ -1120,11 +1129,13 @@ CookieRef::sendAnyStored( const CRefEvent* evt )
void
CookieRef::forward_or_store( const CRefEvent* evt )
{
unsigned char* buf = evt->u.fwd.buf;
AddrInfo addr; // invalid unless assigned to
const unsigned char* cbuf = evt->u.fwd.buf;
do {
/* This is an ugly hack!!!! */
int buflen = evt->u.fwd.buflen;
unsigned char buf[buflen];
if ( *buf == XWRELAY_MSG_TORELAY ) {
*buf = XWRELAY_MSG_FROMRELAY;
buf[0] = XWRELAY_MSG_FROMRELAY;
} else if ( *buf == XWRELAY_MSG_TORELAY_NOCONN ) {
*buf = XWRELAY_MSG_FROMRELAY_NOCONN;
} else {
@ -1133,7 +1144,8 @@ CookieRef::forward_or_store( const CRefEvent* evt )
break;
}
int buflen = evt->u.fwd.buflen;
memcpy( &buf[1], &cbuf[1], buflen-1 );
HostID dest = evt->u.fwd.dest;
const AddrInfo* destAddr = SocketForHost( dest );
@ -1141,8 +1153,22 @@ CookieRef::forward_or_store( const CRefEvent* evt )
usleep( m_delayMicros );
}
// If recipient GAME isn't connected, see if owner device is and can
// receive
if ( NULL == destAddr) {
DevIDRelay devid;
AddrInfo::ClientToken token;
if ( DBMgr::Get()->TokenFor( ConnName(), dest, &devid, &token ) ) {
const AddrInfo::AddrUnion* saddr = DevMgr::Get()->get( devid );
if ( !!saddr ) {
addr.init( -1, token, saddr );
destAddr = &addr;
}
}
}
if ( (NULL == destAddr)
|| !send_with_length( destAddr, buf, buflen, true ) ) {
|| !send_with_length( destAddr, dest, buf, buflen, true ) ) {
store_message( dest, buf, buflen );
}
@ -1162,7 +1188,7 @@ CookieRef::send_denied( const CRefEvent* evt, XWREASON why )
}
void
CookieRef::send_msg( const AddrInfo* addr, HostID id,
CookieRef::send_msg( const AddrInfo* addr, HostID hid,
XWRelayMsg msg, XWREASON why, bool cascade )
{
unsigned char buf[10];
@ -1173,7 +1199,7 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id,
switch ( msg ) {
case XWRELAY_DISCONNECT_OTHER:
buf[len++] = why;
tmp = htons( id );
tmp = htons( hid );
memcpy( &buf[len], &tmp, 2 );
len += 2;
break;
@ -1183,7 +1209,7 @@ CookieRef::send_msg( const AddrInfo* addr, HostID id,
}
assert( len <= sizeof(buf) );
send_with_length( addr, buf, len, cascade );
send_with_length( addr, HOST_ID_NONE, buf, len, cascade );
} /* send_msg */
void
@ -1210,7 +1236,7 @@ CookieRef::notifyGameDead( const AddrInfo* addr )
,XWRELAY_ERROR_DELETED
};
send_with_length( addr, buf, sizeof(buf), true );
send_with_length( addr, HOST_ID_NONE, buf, sizeof(buf), true );
}
/* void */
@ -1263,7 +1289,7 @@ CookieRef::sendAllHere( bool initial )
vector<HostRec>::const_iterator iter;
for ( iter = m_sockets.begin(); iter != m_sockets.end(); ++iter ) {
if ( iter->m_hostID == dest ) {
sent = send_with_length( &iter->m_addr, buf, bufp-buf, true );
sent = send_with_length( &iter->m_addr, dest, buf, bufp-buf, true );
break;
}
}

View file

@ -33,6 +33,7 @@
#include "devid.h"
#include "dbmgr.h"
#include "states.h"
#include "addrinfo.h"
typedef vector<unsigned char> MsgBuffer;
typedef deque<MsgBuffer*> MsgBufQueue;
@ -131,14 +132,14 @@ class CookieRef {
int seed, const AddrInfo* addr, bool gameDead );
void _HandleAck( HostID hostID );
void _PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen );
const unsigned char* buf, int buflen );
void _Disconnect( const AddrInfo* addr, HostID hostID );
void _DeviceGone( HostID hostID, int seed );
void _Shutdown();
void _HandleHeartbeat( HostID id, const AddrInfo* addr );
void _CheckHeartbeats( time_t now );
void _Forward( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen );
const unsigned char* buf, int buflen );
void _Remove( const AddrInfo* addr );
void _CheckAllConnected();
void _CheckNotAcked( HostID hid );
@ -159,7 +160,7 @@ class CookieRef {
struct {
HostID src;
HostID dest;
unsigned char* buf;
const unsigned char* buf;
int buflen;
} fwd;
struct {
@ -194,8 +195,8 @@ class CookieRef {
} u;
};
bool send_with_length( const AddrInfo* addr,
unsigned char* buf, int bufLen, bool cascade );
bool send_with_length( const AddrInfo* addr, HostID hid,
const unsigned char* buf, int bufLen, bool cascade );
void send_msg( const AddrInfo* addr, HostID id,
XWRelayMsg msg, XWREASON why, bool cascade );
void pushConnectEvent( int clientVersion, DevID* devID,
@ -208,7 +209,7 @@ class CookieRef {
void pushHeartFailedEvent( const AddrInfo* addr );
void pushForwardEvent( HostID src, const AddrInfo* addr,
HostID dest, unsigned char* buf, int buflen );
HostID dest, const unsigned char* buf, int buflen );
void pushDestBadEvent();
void pushLastSocketGoneEvent();
void pushGameDead( const AddrInfo* addr );

View file

@ -405,11 +405,11 @@ CidInfo*
CRefMgr::getCookieRef( CookieID cid, bool failOk )
{
CidInfo* cinfo = NULL;
for ( ; ; ) {
for ( int count = 0; ; ++count ) {
cinfo = m_cidlock->Claim( cid );
if ( NULL != cinfo->GetRef() ) {
break;
} else if ( failOk ) {
} else if ( failOk || count > 20 ) {
break;
}
m_cidlock->Relinquish( cinfo, true );

View file

@ -186,7 +186,7 @@ class SafeCref {
~SafeCref();
bool Forward( HostID src, const AddrInfo* addr, HostID dest,
unsigned char* buf, int buflen ) {
const unsigned char* buf, int buflen ) {
if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() );
@ -198,7 +198,7 @@ class SafeCref {
}
void PutMsg( HostID srcID, const AddrInfo* addr, HostID destID,
unsigned char* buf, int buflen ) {
const unsigned char* buf, int buflen ) {
if ( IsValid() ) {
CookieRef* cref = m_cinfo->GetRef();
assert( 0 != cref->GetCid() );

View file

@ -577,6 +577,32 @@ DBMgr::PublicRooms( int lang, int nPlayers, int* nNames, string& names )
*nNames = nTuples;
}
bool
DBMgr::TokenFor( const char* const connName, int hid, DevIDRelay* devid,
AddrInfo::ClientToken* token )
{
bool found = false;
const char* fmt = "SELECT tokens[%d], devids[%d] FROM " GAMES_TABLE
" WHERE connName='%s'";
string query;
string_printf( query, fmt, hid, hid, connName );
PGresult* result = PQexec( getThreadConn(), query.c_str() );
if ( 1 == PQntuples( result ) ) {
AddrInfo::ClientToken token_tmp = atoi( PQgetvalue( result, 0, 0 ) );
DevIDRelay devid_tmp = atoi( PQgetvalue( result, 0, 1 ) );
if ( 0 != token_tmp // 0 is illegal (legacy/unset) value
&& 0 != devid_tmp ) {
*token = token_tmp;
*devid = devid_tmp;
found = true;
}
}
PQclear( result );
logf( XW_LOGINFO, "%s(%s,%d)=>%s (%d, %d)", __func__, connName, hid,
(found?"true":"false"), *devid, *token );
return found;
}
int
DBMgr::PendingMsgCount( const char* connName, int hid )
{

View file

@ -88,6 +88,10 @@ class DBMgr {
queries.*/
void PublicRooms( int lang, int nPlayers, int* nNames, string& names );
/* Get stored address info, if available and valid */
bool TokenFor( const char* const connName, int hid, DevIDRelay* devid,
AddrInfo::ClientToken* token );
/* Return number of messages pending for connName:hostid pair passed in */
int PendingMsgCount( const char* const connName, int hid );

View file

@ -26,7 +26,7 @@ static DevMgr* s_instance = NULL;
/* static */ DevMgr*
DevMgr::Get()
{
if ( s_instance == NULL ) {
if ( NULL == s_instance ) {
s_instance = new DevMgr();
}
return s_instance;
@ -35,27 +35,33 @@ DevMgr::Get()
void
DevMgr::Remember( DevIDRelay devid, const AddrInfo::AddrUnion* saddr )
{
logf( XW_LOGINFO, "%s(devid=%d)", __func__, devid );
time_t now = time( NULL );
MutexLock ml( &m_mapLock );
UDPAddrRec rec( saddr, now );
MutexLock ml( &m_mapLock );
m_devAddrMap.insert( pair<DevIDRelay,UDPAddrRec>( devid, rec ) );
logf( XW_LOGINFO, "dev->addr map now contains %d entries", m_devAddrMap.size() );
}
#if 0 // not used yet
void
DevMgr::Remember( DevIDRelay devid, const AddrInfo* addr )
{
Remember( devid, addr->saddr() );
}
const AddrInfo::AddrUnion*
DevMgr::get( DevIDRelay devid )
{
const AddrInfo::AddrUnion* result = NULL;
MutexLock ml( &m_mapLock );
map<DevIDRelay,UDPAddrRec>::iterator iter;
map<DevIDRelay,UDPAddrRec>::const_iterator iter;
iter = m_devAddrMap.find( devid );
if ( m_devAddrMap.end() != iter ) {
result = &iter->second.m_addr;
logf( XW_LOGINFO, "%s: found addr for %.8x; is %d seconds old", __func__,
devid, time(NULL) - iter->second.m_added );
}
logf( XW_LOGINFO, "%s(devid=%d)=>%p", __func__, devid, result );
return result;
}
#endif

View file

@ -32,11 +32,14 @@ class DevMgr {
public:
static DevMgr* Get();
void Remember( DevIDRelay devid, const AddrInfo::AddrUnion* saddr );
void Remember( DevIDRelay devid, const AddrInfo* addr );
const AddrInfo::AddrUnion* get( DevIDRelay devid );
private:
DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); }
/* destructor's never called....
~DevMgr() { pthread_mutex_destroy( &m_mapLock ); }
*/
class UDPAddrRec {
public:

View file

@ -51,7 +51,7 @@ class XWThreadPool {
} ThreadInfo;
static XWThreadPool* GetTPool();
typedef bool (*packet_func)( unsigned char* buf, int bufLen,
typedef bool (*packet_func)( const unsigned char* buf, int bufLen,
const AddrInfo* from );
typedef void (*kill_func)( const AddrInfo* addr );

View file

@ -77,6 +77,7 @@
#include "dbmgr.h"
#include "addrinfo.h"
#include "devmgr.h"
#include "udpqueue.h"
static int s_nSpawns = 0;
static int g_maxsocks = -1;
@ -180,7 +181,7 @@ cmdToStr( XWRELAY_Cmd cmd )
}
static bool
parseRelayID( unsigned char** const inp, const unsigned char* const end,
parseRelayID( const unsigned char** const inp, const unsigned char* const end,
char* buf, int buflen, HostID* hid )
{
const char* hidp = strchr( (char*)*inp, '/' );
@ -207,7 +208,7 @@ parseRelayID( unsigned char** const inp, const unsigned char* const end,
}
static bool
getNetShort( unsigned char** bufpp, const unsigned char* end,
getNetShort( const unsigned char** bufpp, const unsigned char* end,
unsigned short* out )
{
unsigned short tmp;
@ -221,7 +222,7 @@ getNetShort( unsigned char** bufpp, const unsigned char* end,
} /* getNetShort */
static bool
getNetByte( unsigned char** bufpp, const unsigned char* end,
getNetByte( const unsigned char** bufpp, const unsigned char* end,
unsigned char* out )
{
bool ok = *bufpp < end;
@ -233,7 +234,7 @@ getNetByte( unsigned char** bufpp, const unsigned char* end,
} /* getNetByte */
static bool
getNetString( unsigned char** bufpp, const unsigned char* end, string& out )
getNetString( const unsigned char** bufpp, const unsigned char* end, string& out )
{
char* str = (char*)*bufpp;
size_t len = 1 + strlen( str );
@ -247,7 +248,7 @@ getNetString( unsigned char** bufpp, const unsigned char* end, string& out )
}
static void
getDevID( unsigned char** bufpp, const unsigned char* end,
getDevID( const unsigned char** bufpp, const unsigned char* end,
unsigned short flags, DevID* devID )
{
if ( XWRELAY_PROTO_VERSION_CLIENTID <= flags ) {
@ -285,7 +286,7 @@ processHeartbeat( unsigned char* buf, int bufLen, int socket )
#endif
static bool
readStr( unsigned char** bufp, const unsigned char* end,
readStr( const unsigned char** bufp, const unsigned char* end,
char* outBuf, int bufLen )
{
unsigned char clen = **bufp;
@ -300,7 +301,7 @@ readStr( unsigned char** bufp, const unsigned char* end,
} /* readStr */
static XWREASON
flagsOK( unsigned char** bufp, unsigned char const* end,
flagsOK( const unsigned char** bufp, unsigned char const* end,
unsigned short* clientVersion, unsigned short* flagsp )
{
XWREASON err = XWRELAY_ERROR_OLDFLAGS;
@ -348,12 +349,13 @@ send_via_udp( int socket, const struct sockaddr *dest_addr,
/* No mutex here. Caller better be ensuring no other thread can access this
* socket. */
bool
send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
size_t bufLen )
{
assert( !!addr );
bool ok = false;
int socket = addr->socket();
if ( addr->isTCP() ) {
unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 );
@ -365,7 +367,7 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
}
}
} else {
uint32_t clientToken = addr->clientToken();
AddrInfo::ClientToken clientToken = addr->clientToken();
assert( 0 != clientToken );
unsigned char tmpbuf[1 + 1 + sizeof(clientToken) + bufLen];
tmpbuf[0] = XWREG_PROTO_VERSION;
@ -374,7 +376,11 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
memcpy( &tmpbuf[2], &clientToken, sizeof(clientToken) );
memcpy( &tmpbuf[2 + sizeof(clientToken)], buf, bufLen );
const struct sockaddr* saddr = addr->sockaddr();
send_via_udp( g_udpsock, saddr, tmpbuf, sizeof(tmpbuf) );
assert( g_udpsock == socket || socket == -1 );
if ( -1 == socket ) {
socket = g_udpsock;
}
send_via_udp( socket, saddr, tmpbuf, sizeof(tmpbuf) );
logf( XW_LOGINFO, "sent %d bytes on UDP socket %d", bufLen, socket );
ok = true;
}
@ -399,10 +405,10 @@ send_with_length_unsafe( const AddrInfo* addr, unsigned char* buf,
* game?
*/
static bool
processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processConnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
char cookie[MAX_INVITE_LEN+1];
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
bool success = false;
cookie[0] = '\0';
@ -456,9 +462,9 @@ processConnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
} /* processConnect */
static bool
processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processReconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
bool success = false;
logf( XW_LOGINFO, "%s()", __func__ );
@ -511,10 +517,10 @@ processReconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
} /* processReconnect */
static bool
processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processAck( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
bool success = false;
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
HostID srcID;
if ( getNetByte( &bufp, end, &srcID ) ) {
SafeCref scr( addr );
@ -524,9 +530,9 @@ processAck( unsigned char* bufp, int bufLen, const AddrInfo* addr )
}
static bool
processDisconnect( unsigned char* bufp, int bufLen, const AddrInfo* addr )
processDisconnect( const unsigned char* bufp, int bufLen, const AddrInfo* addr )
{
unsigned char* end = bufp + bufLen;
const unsigned char* end = bufp + bufLen;
CookieID cookieID;
HostID hostID;
bool success = false;
@ -577,11 +583,11 @@ GetNSpawns(void)
/* forward the message. Need only change the command after looking up the
* socket and it's ready to go. */
static bool
forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr )
forwardMessage( const unsigned char* buf, int buflen, const AddrInfo* addr )
{
bool success = false;
unsigned char* bufp = buf + 1; /* skip cmd */
unsigned char* end = buf + buflen;
const unsigned char* bufp = buf + 1; /* skip cmd */
const unsigned char* end = buf + buflen;
CookieID cookieID;
HostID src;
HostID dest;
@ -603,7 +609,7 @@ forwardMessage( unsigned char* buf, int buflen, const AddrInfo* addr )
} /* forwardMessage */
static bool
processMessage( unsigned char* buf, int bufLen, const AddrInfo* addr )
processMessage( const unsigned char* buf, int bufLen, const AddrInfo* addr )
{
bool success = false; /* default is failure */
XWRELAY_Cmd cmd = *buf;
@ -839,7 +845,7 @@ pushMsgs( vector<unsigned char>& out, DBMgr* dbmgr, const char* connName,
static void
handleMsgsMsg( const AddrInfo* addr, bool sendFull,
unsigned char* bufp, const unsigned char* end )
const unsigned char* bufp, const unsigned char* end )
{
unsigned short nameCount;
int ii;
@ -939,8 +945,8 @@ log_hex( const unsigned char* memp, int len, const char* tag )
} // log_hex
static void
handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
unsigned char* end )
handleProxyMsgs( int sock, const AddrInfo* addr, const unsigned char* bufp,
const unsigned char* end )
{
// log_hex( bufp, end-bufp, __func__ );
unsigned short nameCount;
@ -975,7 +981,7 @@ handleProxyMsgs( int sock, const AddrInfo* addr, unsigned char* bufp,
HostID dest;
XWRELAY_Cmd cmd;
if ( getNetShort( &bufp, end, &len ) ) {
unsigned char* start = bufp;
const unsigned char* start = bufp;
if ( getNetByte( &bufp, end, &cmd )
&& getNetByte( &bufp, end, &src )
&& getNetByte( &bufp, end, &dest ) ) {
@ -1001,8 +1007,8 @@ handle_proxy_packet( unsigned char* buf, int len, const AddrInfo* addr )
if ( len > 0 ) {
assert( addr->isTCP() );
int socket = addr->socket();
unsigned char* bufp = buf;
unsigned char* end = bufp + len;
const unsigned char* bufp = buf;
const unsigned char* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */
XWPRXYCMD cmd = (XWPRXYCMD)*bufp++;
switch( cmd ) {
@ -1097,12 +1103,58 @@ registerDevice( const DevID* devID, const AddrInfo::AddrUnion* saddr )
DevMgr::Get()->Remember( relayID, saddr );
}
static void
udp_thread_proc( UdpThreadClosure* utc )
{
const unsigned char* ptr = utc->buf();
const unsigned char* end = ptr + utc->len();
unsigned char proto = *ptr++;
if ( XWREG_PROTO_VERSION != 0 ) {
logf( XW_LOGERROR, "unexpected proto %d", __func__, (int) proto );
} else {
int msg = *ptr++;
switch( msg ) {
case XWRREG_REG: {
DevIDType typ = (DevIDType)*ptr++;
unsigned short idLen;
if ( !getNetShort( &ptr, end, &idLen ) ) {
break;
}
if ( end - ptr > idLen ) {
logf( XW_LOGERROR, "full devID not received" );
break;
}
DevID devID( typ );
devID.m_devIDString.append( (const char*)ptr, idLen );
ptr += idLen;
registerDevice( &devID, utc->saddr() );
}
break;
case XWRREG_MSG: {
AddrInfo::ClientToken clientToken;
memcpy( &clientToken, ptr, sizeof(clientToken) );
ptr += sizeof(clientToken);
clientToken = ntohl( clientToken );
if ( 0 != clientToken ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
(void)processMessage( ptr, end - ptr, &addr );
} else {
logf( XW_LOGERROR, "%s: dropping packet with token of 0" );
}
}
break;
default:
logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, msg );
}
}
}
// This will need to be done in a thread before there can be simulaneous
// connections.
static void
handle_udp_packet( int udpsock )
{
bool success = false;
logf( XW_LOGINFO, "%s()", __func__ );
unsigned char buf[512];
AddrInfo::AddrUnion saddr;
@ -1111,52 +1163,10 @@ handle_udp_packet( int udpsock )
ssize_t nRead = recvfrom( udpsock, buf, sizeof(buf), 0 /*flags*/,
&saddr.addr, &fromlen );
if ( 2 <= nRead ) {
unsigned char* ptr = buf;
unsigned char* end = buf + nRead;
logf( XW_LOGINFO, "%s: recvfrom=>%d", __func__, nRead );
unsigned char proto = *ptr++;
if ( XWREG_PROTO_VERSION != 0 ) {
logf( XW_LOGERROR, "unexpected proto %d", __func__, (int) proto );
} else {
int msg = *ptr++;
switch( msg ) {
case XWRREG_REG: {
DevIDType typ = (DevIDType)*ptr++;
unsigned short idLen;
if ( !getNetShort( &ptr, end, &idLen ) ) {
break;
}
if ( end - ptr > idLen ) {
logf( XW_LOGERROR, "full devID not received" );
break;
}
DevID devID( typ );
devID.m_devIDString.append( (const char*)ptr, idLen );
ptr += idLen;
registerDevice( &devID, &saddr );
}
break;
case XWRREG_MSG: {
uint32_t clientToken;
memcpy( &clientToken, ptr, sizeof(clientToken) );
ptr += sizeof(clientToken);
clientToken = ntohl( clientToken );
if ( 0 != clientToken ) {
AddrInfo addr( udpsock, clientToken, &saddr );
success = processMessage( ptr, end - ptr, &addr );
} else {
logf( XW_LOGERROR, "%s: dropping packet with token of 0" );
}
}
break;
default:
logf( XW_LOGERROR, "%s: unexpected msg %d", __func__, msg );
}
}
logf( XW_LOGINFO, "%s: recvfrom=>%d", __func__, utc->len() );
if ( 0 < nRead ) {
UdpQueue::get()->handle( &saddr, buf, nRead, udp_thread_proc );
}
logf( XW_LOGINFO, "%s()=>%d", __func__, success );
}
/* From stack overflow, toward a snprintf with an expanding buffer.
@ -1498,7 +1508,9 @@ main( int argc, char** argv )
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(udpport);
int err = bind( g_udpsock, (struct sockaddr*)&saddr, sizeof(saddr) );
if ( 0 != err ) {
if ( 0 == err ) {
err = fcntl( g_udpsock, F_SETFL, O_NONBLOCK );
} else {
logf( XW_LOGERROR, "bind()=>%s", strerror(errno) );
g_udpsock = -1;
}

View file

@ -45,7 +45,7 @@ void logf( XW_LogLevel level, const char* format, ... );
void denyConnection( const AddrInfo* addr, XWREASON err );
bool send_with_length_unsafe( const AddrInfo* addr,
unsigned char* buf, size_t bufLen );
const unsigned char* buf, size_t bufLen );
time_t uptime(void);