rename class; add some logging

Rename class that's no longer just about UDP as it began. Add a bit of
logging. This commit *should* not change behavior at all.
This commit is contained in:
Eric House 2017-12-03 19:18:36 -08:00
parent fd5549014a
commit d5c4ecabce
5 changed files with 62 additions and 47 deletions

View file

@ -68,7 +68,7 @@ AddrInfo::equals( const AddrInfo& other ) const
if ( isTCP() ) {
equal = m_socket == other.m_socket;
if ( equal && created() != other.created() ) {
logf( XW_LOGINFO, "%s: rejecting on time mismatch (%lx vs %lx)",
logf( XW_LOGINFO, "%s(): rejecting on time mismatch (%lx vs %lx)",
__func__, created(), other.created() );
equal = false;
}

View file

@ -119,13 +119,14 @@ XWThreadPool::Stop()
void
XWThreadPool::AddSocket( SockType stype, QueueCallback proc, const AddrInfo* from )
{
int sock = from->getSocket();
logf( XW_LOGVERBOSE0, "%s(sock=%d, isTCP=%d)", __func__, sock, from->isTCP() );
SockInfo si = { .m_type = stype,
.m_proc = proc,
.m_addr = *from
};
{
int sock = from->getSocket();
RWWriteLock ml( &m_activeSocketsRWLock );
SockInfo si;
si.m_type = stype;
si.m_proc = proc;
si.m_addr = *from;
m_activeSockets.insert( pair<int, SockInfo>( sock, si ) );
}
interrupt_poll();
@ -158,13 +159,14 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr )
size_t prevSize = m_activeSockets.size();
map<int, SockInfo>::iterator iter = m_activeSockets.find( addr->getSocket() );
int sock = addr->getSocket();
map<int, SockInfo>::iterator iter = m_activeSockets.find( sock );
if ( m_activeSockets.end() != iter && iter->second.m_addr.equals( *addr ) ) {
m_activeSockets.erase( iter );
found = true;
}
logf( XW_LOGINFO, "%s: AFTER: %d sockets active (was %d)", __func__,
m_activeSockets.size(), prevSize );
logf( XW_LOGINFO, "%s(): AFTER closing %d: %d sockets active (was %d)", __func__,
sock, m_activeSockets.size(), prevSize );
}
return found;
} /* RemoveSocket */
@ -172,6 +174,7 @@ XWThreadPool::RemoveSocket( const AddrInfo* addr )
void
XWThreadPool::CloseSocket( const AddrInfo* addr )
{
int sock = addr->getSocket();
if ( addr->isTCP() ) {
if ( !RemoveSocket( addr ) ) {
MutexLock ml( &m_queueMutex );
@ -184,8 +187,13 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
++iter;
}
}
logf( XW_LOGINFO, "CLOSING socket %d", addr->getSocket() );
close( addr->getSocket() );
int err = close( sock );
if ( 0 != err ) {
logf( XW_LOGERROR, "%s(): close(socket=%d) => %d/%s", __func__,
sock, errno, strerror(errno) );
} else {
logf( XW_LOGINFO, "%s(): close(socket=%d) succeeded", __func__, sock );
}
/* We always need to interrupt the poll because the socket we're closing
will be in the list being listened to. That or we need to drop sockets
@ -198,7 +206,7 @@ XWThreadPool::CloseSocket( const AddrInfo* addr )
void
XWThreadPool::EnqueueKill( const AddrInfo* addr, const char* const why )
{
logf( XW_LOGINFO, "%s(%d) reason: %s", __func__, addr->getSocket(), why );
logf( XW_LOGINFO, "%s(socket = %d) reason: %s", __func__, addr->getSocket(), why );
if ( addr->isTCP() ) {
SockInfo si;
si.m_type = STYPE_UNKNOWN;

View file

@ -28,7 +28,7 @@ static UdpQueue* s_instance = NULL;
void
UdpThreadClosure::logStats()
PacketThreadClosure::logStats()
{
time_t now = time( NULL );
if ( 1 < now - m_created ) {
@ -48,6 +48,7 @@ PartialPacket::stillGood() const
bool
PartialPacket::readAtMost( int len )
{
assert( len > 0 );
bool success = false;
uint8_t tmp[len];
ssize_t nRead = recv( m_sock, tmp, len, 0 );
@ -58,9 +59,11 @@ PartialPacket::readAtMost( int len )
len, m_sock, m_errno, strerror(m_errno) );
}
} else if ( 0 == nRead ) { // remote socket closed
logf( XW_LOGINFO, "%s: remote closed (socket=%d)", __func__, m_sock );
logf( XW_LOGINFO, "%s(): remote closed (socket=%d)", __func__, m_sock );
m_errno = -1; // so stillGood will fail
} else {
// logf( XW_LOGVERBOSE0, "%s(): read %d bytes on socket %d", __func__,
// nRead, m_sock );
m_errno = 0;
success = len == nRead;
int curSize = m_buf.size();
@ -152,17 +155,20 @@ void
UdpQueue::handle( const AddrInfo* addr, const uint8_t* buf, int len,
QueueCallback cb )
{
UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb );
PacketThreadClosure* ptc = new PacketThreadClosure( addr, buf, len, cb );
MutexLock ml( &m_queueMutex );
int id = ++m_nextID;
utc->setID( id );
ptc->setID( id );
logf( XW_LOGINFO, "%s: enqueuing packet %d (socket %d, len %d)",
__func__, id, addr->getSocket(), len );
m_queue.push_back( utc );
m_queue.push_back( ptc );
pthread_cond_signal( &m_queueCondVar );
}
// Remove any PartialPacket record with the same socket/fd. This makes sense
// when the socket's being reused or when we have just dealt with a single
// packet and might be getting more.
void
UdpQueue::newSocket_locked( int sock )
{
@ -194,25 +200,25 @@ UdpQueue::thread_main()
while ( m_queue.size() == 0 ) {
pthread_cond_wait( &m_queueCondVar, &m_queueMutex );
}
UdpThreadClosure* utc = m_queue.front();
PacketThreadClosure* ptc = m_queue.front();
m_queue.pop_front();
pthread_mutex_unlock( &m_queueMutex );
utc->noteDequeued();
ptc->noteDequeued();
time_t age = utc->ageInSeconds();
time_t age = ptc->ageInSeconds();
if ( 30 > age ) {
logf( XW_LOGINFO, "%s: dispatching packet %d (socket %d); "
"%d seconds old", __func__, utc->getID(),
utc->addr()->getSocket(), age );
(*utc->cb())( utc );
utc->logStats();
"%d seconds old", __func__, ptc->getID(),
ptc->addr()->getSocket(), age );
(*ptc->cb())( ptc );
ptc->logStats();
} else {
logf( XW_LOGINFO, "%s: dropping packet %d; it's %d seconds old!",
__func__, age );
}
delete utc;
delete ptc;
}
return NULL;
}

View file

@ -30,13 +30,13 @@
using namespace std;
class UdpThreadClosure;
class PacketThreadClosure;
typedef void (*QueueCallback)( UdpThreadClosure* closure );
typedef void (*QueueCallback)( PacketThreadClosure* closure );
class UdpThreadClosure {
class PacketThreadClosure {
public:
UdpThreadClosure( const AddrInfo* addr, const uint8_t* buf,
PacketThreadClosure( const AddrInfo* addr, const uint8_t* buf,
int len, QueueCallback cb )
: m_buf(new uint8_t[len])
, m_len(len)
@ -47,7 +47,7 @@ public:
memcpy( m_buf, buf, len );
}
~UdpThreadClosure() { delete[] m_buf; }
~PacketThreadClosure() { delete[] m_buf; }
const uint8_t* buf() const { return m_buf; }
int len() const { return m_len; }
@ -109,8 +109,8 @@ class UdpQueue {
pthread_mutex_t m_partialsMutex;
pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue;
// map<int, vector<UdpThreadClosure*> > m_bySocket;
deque<PacketThreadClosure*> m_queue;
// map<int, vector<PacketThreadClosure*> > m_bySocket;
int m_nextID;
map<int, PartialPacket*> m_partialPackets;
};

View file

@ -1475,23 +1475,24 @@ handleProxyMsgs( int sock, const AddrInfo* addr, const uint8_t* bufp,
} // handleProxyMsgs
static void
game_thread_proc( UdpThreadClosure* utc )
game_thread_proc( PacketThreadClosure* ptc )
{
if ( !processMessage( utc->buf(), utc->len(), utc->addr(), 0 ) ) {
XWThreadPool::GetTPool()->CloseSocket( utc->addr() );
logf( XW_LOGVERBOSE0, "%s()", __func__ );
if ( !processMessage( ptc->buf(), ptc->len(), ptc->addr(), 0 ) ) {
XWThreadPool::GetTPool()->CloseSocket( ptc->addr() );
}
}
static void
proxy_thread_proc( UdpThreadClosure* utc )
proxy_thread_proc( PacketThreadClosure* ptc )
{
const int len = utc->len();
const AddrInfo* addr = utc->addr();
const int len = ptc->len();
const AddrInfo* addr = ptc->addr();
if ( len > 0 ) {
assert( addr->isTCP() );
int sock = addr->getSocket();
const uint8_t* bufp = utc->buf();
const uint8_t* bufp = ptc->buf();
const uint8_t* end = bufp + len;
if ( (0 == *bufp++) ) { /* protocol */
XWPRXYCMD cmd = (XWPRXYCMD)*bufp++;
@ -1725,10 +1726,10 @@ ackPacketIf( const UDPHeader* header, const AddrInfo* addr )
}
static void
handle_udp_packet( UdpThreadClosure* utc )
handle_udp_packet( PacketThreadClosure* ptc )
{
const uint8_t* ptr = utc->buf();
const uint8_t* end = ptr + utc->len();
const uint8_t* ptr = ptc->buf();
const uint8_t* end = ptr + ptc->len();
UDPHeader header;
if ( getHeader( &ptr, end, &header ) ) {
@ -1751,7 +1752,7 @@ handle_udp_packet( UdpThreadClosure* utc )
if ( 3 >= clientVers ) {
checkAllAscii( model, "bad model" );
}
registerDevice( relayID, &devID, utc->addr(),
registerDevice( relayID, &devID, ptc->addr(),
clientVers, devDesc, model, osVers );
}
}
@ -1764,7 +1765,7 @@ handle_udp_packet( UdpThreadClosure* utc )
ptr += sizeof(clientToken);
clientToken = ntohl( clientToken );
if ( AddrInfo::NULL_TOKEN != clientToken ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
AddrInfo addr( g_udpsock, clientToken, ptc->saddr() );
(void)processMessage( ptr, end - ptr, &addr, clientToken );
} else {
logf( XW_LOGERROR, "%s: dropping packet with token of 0",
@ -1785,7 +1786,7 @@ handle_udp_packet( UdpThreadClosure* utc )
}
SafeCref scr( connName, hid );
if ( scr.IsValid() ) {
AddrInfo addr( g_udpsock, clientToken, utc->saddr() );
AddrInfo addr( g_udpsock, clientToken, ptc->saddr() );
handlePutMessage( scr, hid, &addr, end - ptr, &ptr, end );
assert( ptr == end ); // DON'T CHECK THIS IN!!!
} else {
@ -1820,7 +1821,7 @@ handle_udp_packet( UdpThreadClosure* utc )
case XWPDEV_RQSTMSGS: {
DevID devID( ID_TYPE_RELAY );
if ( getVLIString( &ptr, end, devID.m_devIDString ) ) {
const AddrInfo* addr = utc->addr();
const AddrInfo* addr = ptc->addr();
DevMgr::Get()->rememberDevice( devID.asRelayID(), addr );
if ( XWPDEV_RQSTMSGS == header.cmd ) {
@ -1861,7 +1862,7 @@ handle_udp_packet( UdpThreadClosure* utc )
}
// Do this after the device and address are registered
ackPacketIf( &header, utc->addr() );
ackPacketIf( &header, ptc->addr() );
}
}