Start adding ability to listen on multiple sockets and to dynamically

change the set listened on.  There's still some debugging to do but
nothing that worked before is broken.  Also begin to accept unique
prefixes (e.g. g for get) for commands and attributes on the control
port.  Note that relay-related code in comms seems broken now, but is
without this checkin.
This commit is contained in:
ehouse 2007-12-01 15:00:30 +00:00
parent a560c9f8a8
commit 25c4368231
10 changed files with 459 additions and 87 deletions

View file

@ -26,6 +26,7 @@ SRC = xwrelay.cpp \
configs.cpp \
crefmgr.cpp \
permid.cpp \
lstnrmgr.cpp \
# STATIC ?= -static

View file

@ -63,6 +63,13 @@ RelayConfigs::RelayConfigs( const char* cfile )
parse( cfile );
} /* RelayConfigs::RelayConfigs */
void
RelayConfigs::GetPorts( std::vector<int>::const_iterator* iter, std::vector<int>::const_iterator* end)
{
*iter = m_ports.begin();
*end = m_ports.end();
}
void
RelayConfigs::parse( const char* fname )
{
@ -99,7 +106,7 @@ RelayConfigs::parse( const char* fname )
} else if ( 0 == strcmp( line, "CTLPORT" ) ) {
m_ctrlport = atoi( value );
} else if ( 0 == strcmp( line, "PORT" ) ) {
m_port = atoi( value );
m_ports.push_back( atoi( value ) );
} else if ( 0 == strcmp( line, "NTHREADS" ) ) {
m_nWorkerThreads = atoi( value );
} else if ( 0 == strcmp( line, "SERVERNAME" ) ) {

View file

@ -22,8 +22,11 @@
#define _CONFIGS_H_
#include <string>
#include <vector>
#include "xwrelay_priv.h"
using namespace std;
class RelayConfigs {
public:
@ -32,7 +35,7 @@ class RelayConfigs {
~RelayConfigs() {}
int GetPort() { return m_port; }
void GetPorts( vector<int>::const_iterator* iter, vector<int>::const_iterator* end);
int GetCtrlPort() { return m_ctrlport; }
int GetNWorkerThreads() { return m_nWorkerThreads; }
time_t GetAllConnectedInterval() { return m_allConnInterval; }
@ -49,11 +52,11 @@ class RelayConfigs {
time_t m_allConnInterval;
time_t m_heartbeatInterval;
int m_ctrlport;
int m_port;
vector<int> m_ports;
int m_logLevel;
int m_nWorkerThreads;
std::string m_serverName;
std::string m_idFileName;
string m_serverName;
string m_idFileName;
static RelayConfigs* instance;
};

View file

@ -127,7 +127,6 @@ class CookieRef {
} heart;
struct {
time_t now;
vector<int>* victims;
} htime;
struct {
int socket;

View file

@ -44,6 +44,7 @@
#include "mlock.h"
#include "xwrelay_priv.h"
#include "configs.h"
#include "lstnrmgr.h"
/* this is *only* for testing. Don't abuse!!!! */
extern pthread_rwlock_t gCookieMapRWLock;
@ -74,6 +75,32 @@ static bool cmd_rev( int socket, const char** args );
static bool cmd_uptime( int socket, const char** args );
static bool cmd_crash( int socket, const char** args );
static int
match( string* cmd, char * const* first, int incr, int count )
{
int cmdlen = cmd->length();
int nFound = 0;
const char* cmdFound = NULL;
int which = -1;
int i;
for ( i = 0; i < count; ++i ) {
logf( XW_LOGINFO, "comparing %s,%s", *first, cmd );
if ( 0 == strncmp( cmd->c_str(), *first, cmdlen ) ) {
++nFound;
which = i;
cmdFound = *first;
}
first = (char* const*)(((char*)first) + incr);
}
if ( nFound == 1 ) {
cmd->assign(cmdFound);
} else {
which = -1;
}
return which;
}
static void
print_to_sock( int sock, bool addCR, const char* what, ... )
{
@ -197,42 +224,88 @@ cmd_kill_eject( int socket, const char** args )
static bool
cmd_get( int socket, const char** args )
{
if ( 0 == strcmp( args[1], "help" ) ) {
print_to_sock( socket, true,
"* %s -- lists all attributes (unimplemented)\n"
"* %s loglevel",
args[0], args[0] );
} else {
const char* attr = args[1];
if ( (NULL != attr) && (0 == strcmp( attr, "loglevel" )) ) {
bool needsHelp = true;
string attr(args[1]);
char* const attrs[] = { "help", "listeners", "loglevel" };
int index = match( &attr, attrs, sizeof(attrs[0]),
sizeof(attrs)/sizeof(attrs[0]));
switch( index ) {
case 0:
break;
case 1: {
char buf[128];
int len = 0;
ListenersIter iter(&g_listeners, false);
for ( ; ; ) {
int listener = iter.next();
if ( listener == -1 ) {
break;
}
len += snprintf( &buf[len], sizeof(buf)-len, "%d,", listener );
}
print_to_sock( socket, true, "%s", buf );
needsHelp = false;
}
break;
case 2: {
RelayConfigs* rc = RelayConfigs::GetConfigs();
if ( NULL != rc ) {
print_to_sock( socket, true, "loglevel=%d\n",
rc->GetLogLevel() );
needsHelp = false;
} else {
logf( XW_LOGERROR, "RelayConfigs::GetConfigs() => NULL" );
}
}
break;
default:
print_to_sock( socket, true, "unknown or ambiguous attribute: %s", attr.c_str() );
}
if ( needsHelp ) {
/* includes help */
print_to_sock( socket, false,
"* %s -- lists all attributes (unimplemented)\n"
"* %s listener\n"
"* %s loglevel\n"
, args[0], args[0], args[0] );
}
return false;
}
} /* cmd_get */
static bool
cmd_set( int socket, const char** args )
{
if ( 0 == strcmp( args[1], "help" ) ) {
print_to_sock( socket, true, "* %s loglevel <n>", args[0] );
} else {
bool needsHelp = true;
if ( 0 == strcmp( args[1], "loglevel" ) ) {
const char* attr = args[1];
const char* val = args[2];
if ( (NULL != attr)
&& (0 == strcmp( attr, "loglevel" ))
&& (NULL != val) ) {
if ( (NULL != attr) && (NULL != val) ) {
RelayConfigs* rc = RelayConfigs::GetConfigs();
if ( rc != NULL ) {
rc->SetLogLevel( atoi(val) );
needsHelp = false;
}
}
} else if ( 0 == strcmp( args[1], "listeners" ) ) {
istringstream str( args[2] );
vector<int> sv;
while ( !str.eof() ) {
int sock;
char comma;
str >> sock >> comma;
logf( XW_LOGERROR, "%s: read %d", __func__, sock );
sv.push_back( sock );
}
g_listeners.SetAll( &sv );
}
if ( needsHelp ) {
print_to_sock( socket, true, "* %s loglevel <n>", args[0] );
}
return false;
}
@ -428,72 +501,53 @@ print_prompt( int socket )
print_to_sock( socket, false, "=> " );
}
static bool
dispatch_command( int sock, const char** args )
{
bool result = false;
const char* cmd = args[0];
const FuncRec* fp = gFuncs;
const FuncRec* last = fp + (sizeof(gFuncs) / sizeof(gFuncs[0]));
while ( fp < last ) {
if ( 0 == strcmp( cmd, fp->name ) ) {
result = (*fp->func)( sock, args );
break;
}
++fp;
}
if ( fp == last ) {
print_to_sock( sock, 1, "unknown command: \"%s\"", cmd );
result = cmd_help( sock, args );
}
return result;
}
static void*
ctrl_thread_main( void* arg )
{
int socket = (int)arg;
int sock = (int)arg;
{
MutexLock ml( &g_ctrlSocksMutex );
g_ctrlSocks.push_back( socket );
g_ctrlSocks.push_back( sock );
}
for ( ; ; ) {
string arg0, arg1, arg2, arg3;
print_prompt( socket );
string cmd, arg1, arg2, arg3;
print_prompt( sock );
char buf[512];
ssize_t nGot = recv( socket, buf, sizeof(buf)-1, 0 );
ssize_t nGot = recv( sock, buf, sizeof(buf)-1, 0 );
if ( nGot <= 1 ) { /* break when just \n comes in */
break;
} else if ( nGot > 2 ) {
/* if nGot is 2, reuse prev string */
buf[nGot] = '\0';
istringstream cmd( buf );
cmd >> arg0 >> arg1 >> arg2 >> arg3;
istringstream s( buf );
s >> cmd >> arg1 >> arg2 >> arg3;
}
int index = match( &cmd, (char*const*)&gFuncs[0].name, sizeof(gFuncs[0]),
sizeof(gFuncs)/sizeof(gFuncs[0]) );
const char* args[] = {
arg0.c_str(),
cmd.c_str(),
arg1.c_str(),
arg2.c_str(),
arg3.c_str()
};
if ( dispatch_command( socket, args ) ) {
if ( index == -1 ) {
print_to_sock( sock, 1, "unknown or ambiguous command: \"%s\"", cmd.c_str() );
(void)cmd_help( sock, args );
} else if ( (*gFuncs[index].func)( sock, args ) ) {
break;
}
}
close ( socket );
close ( sock );
MutexLock ml( &g_ctrlSocksMutex );
vector<int>::iterator iter = g_ctrlSocks.begin();
while ( iter != g_ctrlSocks.end() ) {
if ( *iter == socket ) {
if ( *iter == sock ) {
g_ctrlSocks.erase(iter);
break;
}

203
xwords4/relay/lstnrmgr.cpp Normal file
View file

@ -0,0 +1,203 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2007 by Eric House (xwords@eehouse.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <assert.h>
#include "lstnrmgr.h"
#include "mlock.h"
bool
ListenerMgr::AddListener( int port )
{
logf( XW_LOGINFO, "%s(%d)", __func__, port );
MutexLock ml( &m_mutex );
return addOne( port );
}
void
ListenerMgr::SetAll( const vector<int>* ivp )
{
logf( XW_LOGINFO, "%s", __func__ );
MutexLock ml( &m_mutex );
vector<int> have;
map<int,int>::iterator iter2 = m_socks_to_ports.begin();
while ( iter2 != m_socks_to_ports.end() ) {
have.push_back(iter2->second);
++iter2;
}
std::sort(have.begin(), have.end());
vector<int> want = *ivp;
std::sort(want.begin(), want.end());
/* Now go through both lists in order, removing and adding as
appropriate. */
size_t iWant = 0;
size_t iHave = 0;
while ( (iHave < have.size()) || (iWant < want.size()) ) {
assert( iHave <= have.size() && iWant <= want.size() );
while( (iWant < want.size())
&& ((iHave == have.size() || want[iWant] < have[iHave])) ) {
addOne( want[iWant] );
++iWant;
}
while ( (iHave < have.size())
&& (iWant == want.size() || (have[iHave] < want[iWant])) ) {
removePort( have[iHave] );
++iHave;
}
while ( (iHave < have.size()) && (iWant < want.size())
&& (have[iHave] == want[iWant]) ) {
/* keep both */
++iWant; ++iHave;
}
}
} /* SetAll */
/* void */
/* ListenerMgr::RemoveListener( int listener ) */
/* { */
/* MutexLock ml( &m_mutex ); */
/* removeFD( listener ); */
/* } */
void
ListenerMgr::RemoveAll()
{
MutexLock ml( &m_mutex );
for ( ; ; ) {
map<int,int>::const_iterator iter = m_socks_to_ports.begin();
if ( iter == m_socks_to_ports.end() ) {
break;
}
removeSocket( iter->first );
}
}
void
ListenerMgr::AddToFDSet( fd_set* rfds )
{
MutexLock ml( &m_mutex );
map<int,int>::const_iterator iter = m_socks_to_ports.begin();
while ( iter != m_socks_to_ports.end() ) {
FD_SET( iter->first, rfds );
++iter;
}
}
int
ListenerMgr::GetHighest()
{
int highest = 0;
MutexLock ml( &m_mutex );
map<int,int>::const_iterator iter = m_socks_to_ports.begin();
while ( iter != m_socks_to_ports.end() ) {
if ( iter->first > highest ) {
highest = iter->first;
}
++iter;
}
return highest;
}
bool
ListenerMgr::PortInUse( int port )
{
MutexLock ml( &m_mutex );
return portInUse( port );
}
void
ListenerMgr::removeSocket( int sock )
{
/* Assumption: we have the mutex! */
logf( XW_LOGINFO, "%s(%d)", __func__, sock );
map<int,int>::iterator iter = m_socks_to_ports.find( sock );
assert( iter != m_socks_to_ports.end() );
m_socks_to_ports.erase(iter);
close(sock);
}
void
ListenerMgr::removePort( int port )
{
/* Assumption: we have the mutex! */
logf( XW_LOGINFO, "%s(%d)", __func__, port );
map<int,int>::iterator iter = m_socks_to_ports.begin();
while ( iter != m_socks_to_ports.end() ) {
if ( iter->second == port ) {
int sock = iter->first;
close(sock);
m_socks_to_ports.erase(iter);
break;
}
++iter;
}
assert( iter != m_socks_to_ports.end() ); /* we must have found it! */
}
bool
ListenerMgr::addOne( int port )
{
logf( XW_LOGINFO, "%s(%d)", __func__, port );
/* Assumption: we have the mutex! */
assert( !portInUse(port) );
bool success = false;
int sock = make_socket( INADDR_ANY, port );
success = sock != -1;
if ( success ) {
m_socks_to_ports.insert( pair<int,int>(sock, port) );
}
return success;
}
bool
ListenerMgr::portInUse( int port )
{
/* Assumption: we have the mutex! */
bool found = false;
map<int,int>::const_iterator iter = m_socks_to_ports.begin();
while ( iter != m_socks_to_ports.end() ) {
if ( iter->second == port ) {
found = true;
break;
}
++iter;
}
return found;
}
int
ListenersIter::next()
{
int result = -1;
if ( m_iter != m_lm->m_socks_to_ports.end() ) {
result = m_fds? m_iter->first : m_iter->second;
++m_iter;
}
/* logf( XW_LOGINFO, "%s=>%d", __func__, result ); */
return result;
}

73
xwords4/relay/lstnrmgr.h Normal file
View file

@ -0,0 +1,73 @@
/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */
/*
* Copyright 2007 by Eric House (xwords@eehouse.org). All rights reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#ifndef _LSTNRMGR_H_
#define _LSTNRMGR_H_
#include <string>
#include <vector>
#include <map>
#include "xwrelay_priv.h"
using namespace std;
class ListenerMgr {
public:
void RemoveAll();
/* void RemoveListener( int listener ); */
bool AddListener( int port );
void SetAll( const vector<int>* iv ); /* replace current set with this new one */
void AddToFDSet( fd_set* rfds );
int GetHighest();
bool PortInUse( int port );
private:
void removeSocket( int sock );
void removePort( int port );
bool addOne( int listener );
bool portInUse( int port );
map<int,int> m_socks_to_ports;
pthread_mutex_t m_mutex;
friend class ListenersIter;
};
class ListenersIter {
public:
ListenersIter(ListenerMgr* lm, bool fds) {
m_fds = fds;
m_lm = lm;
pthread_mutex_lock( &m_lm->m_mutex );
m_iter = lm->m_socks_to_ports.begin();
}
~ListenersIter() {
pthread_mutex_unlock( &m_lm->m_mutex );
}
int next();
private:
bool m_fds;
map<int,int>::const_iterator m_iter;
ListenerMgr* m_lm;
};
#endif

View file

@ -15,6 +15,10 @@ ALLCONN=300
NTHREADS=5
# What port do we listen on for incomming connections?
PORT=10997
PORT=10998
PORT=10999
# intentional duplicate for testing
PORT=10999
# And the control port is?

View file

@ -71,6 +71,7 @@
#include "configs.h"
#include "timermgr.h"
#include "permid.h"
#include "lstnrmgr.h"
#define LOG_FILE_PATH "./xwrelay.log"
@ -395,7 +396,7 @@ processMessage( unsigned char* buf, int bufLen, int socket )
return success; /* caller defines non-0 as failure */
} /* processMessage */
static int
int
make_socket( unsigned long addr, unsigned short port )
{
int sock = socket( AF_INET, SOCK_STREAM, 0 );
@ -440,7 +441,8 @@ usage( char* arg0 )
fprintf( stderr,
"\t-? (print this help)\\\n"
"\t-c <cport> (localhost port for control console)\\\n"
"\t-d (don't become daemon)\\\n"
"\t-D (don't become daemon)\\\n"
"\t-F (don't fork and wait to respawn child)\\\n"
"\t-f <conffile> (config file)\\\n"
"\t-h (print this help)\\\n"
"\t-i <idfile> (file where next global id stored)\\\n"
@ -452,7 +454,7 @@ usage( char* arg0 )
}
/* sockets that need to be closable from interrupt handler */
int g_listener;
ListenerMgr g_listeners;
int g_control;
void
@ -473,7 +475,7 @@ shutdown()
stop_ctrl_threads();
close( g_listener );
g_listeners.RemoveAll();
close( g_control );
exit( 0 );
@ -519,6 +521,7 @@ int main( int argc, char** argv )
const char* serverName = NULL;
const char* idFileName = NULL;
bool doDaemon = true;
bool doFork = true;
/* Verify sizes here... */
assert( sizeof(CookieID) == 2 );
@ -529,7 +532,7 @@ int main( int argc, char** argv )
first. */
for ( ; ; ) {
int opt = getopt(argc, argv, "h?c:p:n:i:f:t:d" );
int opt = getopt(argc, argv, "h?c:p:n:i:f:t:DF" );
if ( opt == -1 ) {
break;
@ -542,9 +545,12 @@ int main( int argc, char** argv )
case 'c':
ctrlport = atoi( optarg );
break;
case 'd':
case 'D':
doDaemon = false;
break;
case 'F':
doFork = false;
break;
case 'f':
conffile = optarg;
break;
@ -575,9 +581,6 @@ int main( int argc, char** argv )
RelayConfigs::InitConfigs( conffile );
RelayConfigs* cfg = RelayConfigs::GetConfigs();
if ( port == 0 ) {
port = cfg->GetPort();
}
if ( ctrlport == 0 ) {
ctrlport = cfg->GetCtrlPort();
}
@ -621,7 +624,7 @@ int main( int argc, char** argv )
#ifdef SPAWN_SELF
/* loop forever, relaunching children as they die. */
for ( ; ; ) {
while ( doFork ) {
pid_t pid = fork();
if ( pid == 0 ) { /* child */
break;
@ -639,10 +642,21 @@ int main( int argc, char** argv )
prctl( PR_SET_PDEATHSIG, SIGUSR1 );
(void)signal( SIGUSR1, parentDied );
g_listener = make_socket( INADDR_ANY, port );
if ( g_listener == -1 ) {
exit( 1 );
if ( port != 0 ) {
g_listeners.AddListener( port );
}
vector<int>::const_iterator iter, end;
cfg->GetPorts( &iter, &end );
while ( iter != end ) {
int port = *iter;
if ( !g_listeners.PortInUse( port ) ) {
g_listeners.AddListener( port );
} else {
logf( XW_LOGERROR, "port %d was in use", port );
}
++iter;
}
g_control = make_socket( INADDR_LOOPBACK, ctrlport );
if ( g_control == -1 ) {
exit( 1 );
@ -661,10 +675,10 @@ int main( int argc, char** argv )
fd_set rfds;
for ( ; ; ) {
FD_ZERO(&rfds);
FD_SET( g_listener, &rfds );
g_listeners.AddToFDSet( &rfds );
FD_SET( g_control, &rfds );
int highest = g_listener;
if ( g_control > g_listener ) {
int highest = g_listeners.GetHighest();
if ( g_control > highest ) {
highest = g_control;
}
++highest;
@ -675,10 +689,18 @@ int main( int argc, char** argv )
logf( XW_LOGINFO, "errno: %s (%d)", strerror(errno), errno );
}
} else {
if ( FD_ISSET( g_listener, &rfds ) ) {
logf( XW_LOGINFO, "creating ListenersIter" );
ListenersIter iter(&g_listeners, true);
while ( retval > 0 ) {
int listener = iter.next();
if ( listener < 0 ) {
break;
}
if ( FD_ISSET( listener, &rfds ) ) {
struct sockaddr_in newaddr;
socklen_t siz = sizeof(newaddr);
int newSock = accept( g_listener, (sockaddr*)&newaddr, &siz );
int newSock = accept( listener, (sockaddr*)&newaddr, &siz );
logf( XW_LOGINFO, "accepting connection from %s",
inet_ntoa(newaddr.sin_addr) );
@ -686,6 +708,7 @@ int main( int argc, char** argv )
tPool->AddSocket( newSock );
--retval;
}
}
if ( FD_ISSET( g_control, &rfds ) ) {
run_ctrl_thread( g_control );
--retval;
@ -694,7 +717,7 @@ int main( int argc, char** argv )
}
}
close( g_listener );
g_listeners.RemoveAll();
close( g_control );
delete cfg;

View file

@ -4,6 +4,7 @@
#define _XWRELAY_PRIV_H_
#include <time.h>
#include "lstnrmgr.h"
typedef unsigned char HostID;
@ -22,4 +23,8 @@ int send_with_length_unsafe( int socket, unsigned char* buf, int bufLen );
time_t now();
int make_socket( unsigned long addr, unsigned short port );
extern class ListenerMgr g_listeners;
#endif