diff --git a/xwords4/relay/devmgr.cpp b/xwords4/relay/devmgr.cpp new file mode 100644 index 000000000..724222f65 --- /dev/null +++ b/xwords4/relay/devmgr.cpp @@ -0,0 +1,76 @@ +/* -*- compile-command: "make -k -j3"; -*- */ + +/* + * Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include "devmgr.h" +#include "mlock.h" + +static DevMgr* s_instance = NULL; + +/* static */ DevMgr* +DevMgr::Get() +{ + if ( NULL == s_instance ) { + s_instance = new DevMgr(); + } + return s_instance; +} /* Get */ + +void +DevMgr::Remember( DevIDRelay devid, const AddrInfo::AddrUnion* saddr ) +{ + logf( XW_LOGINFO, "%s(devid=%d)", __func__, devid ); + time_t now = time( NULL ); + UDPAddrRec rec( saddr, now ); + + MutexLock ml( &m_mapLock ); + + // C++'s insert doesn't replace, but the result tells whether the key was + // already there and provides an iterator via which it can be updated + pair::iterator, bool> result = + m_devAddrMap.insert( pair( devid, rec ) ); + if ( !result.second ) { + result.first->second = rec; + } + + logf( XW_LOGINFO, "dev->addr map now contains %d entries", m_devAddrMap.size() ); +} + +void +DevMgr::Remember( DevIDRelay devid, const AddrInfo* addr ) +{ + Remember( devid, addr->saddr() ); +} + +const AddrInfo::AddrUnion* +DevMgr::get( DevIDRelay devid ) +{ + const AddrInfo::AddrUnion* result = NULL; + MutexLock ml( &m_mapLock ); + map::const_iterator iter; + iter = m_devAddrMap.find( devid ); + if ( m_devAddrMap.end() != iter ) { + result = &iter->second.m_addr; + logf( XW_LOGINFO, "%s: found addr for %.8x; is %d seconds old", __func__, + devid, time(NULL) - iter->second.m_added ); + } + logf( XW_LOGINFO, "%s(devid=%d)=>%p", __func__, devid, result ); + return result; +} + diff --git a/xwords4/relay/devmgr.h b/xwords4/relay/devmgr.h new file mode 100644 index 000000000..7f843bd8b --- /dev/null +++ b/xwords4/relay/devmgr.h @@ -0,0 +1,57 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + + +#ifndef _DEVMGR_H_ +#define _DEVMGR_H_ + +#include + +#include "xwrelay_priv.h" +#include "addrinfo.h" + +using namespace std; + +class DevMgr { + public: + static DevMgr* Get(); + void Remember( DevIDRelay devid, const AddrInfo::AddrUnion* saddr ); + void Remember( DevIDRelay devid, const AddrInfo* addr ); + const AddrInfo::AddrUnion* get( DevIDRelay devid ); + + private: + DevMgr() { pthread_mutex_init( &m_mapLock, NULL ); } + /* destructor's never called.... + ~DevMgr() { pthread_mutex_destroy( &m_mapLock ); } + */ + + class UDPAddrRec { + public: + UDPAddrRec( const AddrInfo::AddrUnion* addr, time_t tim ) { + m_addr = *addr; m_added = tim; + } + AddrInfo::AddrUnion m_addr; + time_t m_added; + }; + + map m_devAddrMap; + pthread_mutex_t m_mapLock; +}; + +#endif diff --git a/xwords4/relay/udpack.cpp b/xwords4/relay/udpack.cpp new file mode 100644 index 000000000..75762b829 --- /dev/null +++ b/xwords4/relay/udpack.cpp @@ -0,0 +1,136 @@ +/* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */ +/* + * Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include "udpack.h" +#include "mlock.h" + +UDPAckTrack* UDPAckTrack::s_self = NULL; + + +/* static*/ bool +UDPAckTrack::shouldAck( XWRelayReg cmd ) +{ + return ( XWPDEV_ACK != cmd && XWPDEV_ALERT != cmd ); +} + +/* static*/ uint32_t +UDPAckTrack::nextPacketID( XWRelayReg cmd ) +{ + uint32_t result = 0; + if ( shouldAck( cmd ) ) { + result = get()->nextPacketIDImpl(); + } + return result; +} + +/* static*/ void +UDPAckTrack::recordAck( uint32_t packetID ) +{ + get()->recordAckImpl( packetID ); +} + +/* static */ UDPAckTrack* +UDPAckTrack::get() +{ + if ( NULL == s_self ) { + s_self = new UDPAckTrack(); + } + return s_self; +} + +UDPAckTrack::UDPAckTrack() +{ + m_nextID = 0; + pthread_mutex_init( &m_mutex, NULL ); + + pthread_t thread; + pthread_create( &thread, NULL, thread_main, (void*)this ); + pthread_detach( thread ); +} + +uint32_t +UDPAckTrack::nextPacketIDImpl() +{ + MutexLock ml( &m_mutex ); + AckRecord record; + uint32_t result = ++m_nextID; + m_pendings.insert( pair(result, record) ); + return result; +} + +void +UDPAckTrack::recordAckImpl( uint32_t packetID ) +{ + map::iterator iter; + MutexLock ml( &m_mutex ); + iter = m_pendings.find( packetID ); + if ( m_pendings.end() == iter ) { + logf( XW_LOGERROR, "%s: packet ID %d not found", __func__, packetID ); + } else { + time_t took = time( NULL ) - iter->second.m_createTime; + if ( 5 < took ) { + logf( XW_LOGERROR, "%s: packet ID %d took %d seconds to get acked", __func__, packetID ); + } + m_pendings.erase( iter ); + } +} + +void* +UDPAckTrack::threadProc() +{ + for ( ; ; ) { + sleep( 30 ); + time_t now = time( NULL ); + vector older; + { + MutexLock ml( &m_mutex ); + map::iterator iter; + for ( iter = m_pendings.begin(); iter != m_pendings.end(); ++iter ) { + time_t took = now - iter->second.m_createTime; + if ( 60 < took ) { + older.push_back( iter->first ); + m_pendings.erase( iter ); + } + } + } + if ( 0 < older.size() ) { + string leaked; + vector::const_iterator iter = older.begin(); + for ( ; ; ) { + string_printf( leaked, "%d", *iter ); + if ( ++iter == older.end() ) { + break; + } + string_printf( leaked, ", " ); + } + logf( XW_LOGERROR, "%s: these packets leaked: %s", __func__, + leaked.c_str() ); + } else { + logf( XW_LOGINFO, "%s: no packets leaked", __func__ ); + } + } + return NULL; +} + +/* static */ void* +UDPAckTrack::thread_main( void* arg ) +{ + UDPAckTrack* self = (UDPAckTrack*)arg; + return self->threadProc(); +} diff --git a/xwords4/relay/udpack.h b/xwords4/relay/udpack.h new file mode 100644 index 000000000..ee94a046b --- /dev/null +++ b/xwords4/relay/udpack.h @@ -0,0 +1,52 @@ +/* -*- compile-command: "make MEMDEBUG=TRUE -j3"; -*- */ +/* + * Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _UDPACK_H_ +#define _UDPACK_H_ + +#include "xwrelay_priv.h" +#include "xwrelay.h" + +class AckRecord { + public: + AckRecord() { m_createTime = time( NULL ); } + time_t m_createTime; +}; + +class UDPAckTrack { + public: + static uint32_t nextPacketID( XWRelayReg cmd ); + static void recordAck( uint32_t packetID ); + static bool shouldAck( XWRelayReg cmd ); + + private: + static UDPAckTrack* get(); + static void* thread_main( void* arg ); + UDPAckTrack(); + uint32_t nextPacketIDImpl(); + void recordAckImpl( uint32_t packetID ); + void* threadProc(); + + static UDPAckTrack* s_self; + uint32_t m_nextID; + pthread_mutex_t m_mutex; + map m_pendings; +}; + +#endif diff --git a/xwords4/relay/udpqueue.cpp b/xwords4/relay/udpqueue.cpp new file mode 100644 index 000000000..724d73c16 --- /dev/null +++ b/xwords4/relay/udpqueue.cpp @@ -0,0 +1,104 @@ +/* -*- compile-command: "make -k -j3"; -*- */ + +/* + * Copyright 2010-2012 by Eric House (xwords@eehouse.org). All rights + * reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#include "udpqueue.h" +#include "mlock.h" + + +static UdpQueue* s_instance = NULL; + + +void +UdpThreadClosure::logStats() +{ + time_t now = time( NULL ); + if ( 1 < now - m_created ) { + logf( XW_LOGERROR, "packet waited %d s for processing which then took %d s", + m_dequed - m_created, now - m_dequed ); + } +} + +UdpQueue::UdpQueue() +{ + pthread_mutex_init ( &m_queueMutex, NULL ); + pthread_cond_init( &m_queueCondVar, NULL ); + + pthread_t thread; + int result = pthread_create( &thread, NULL, thread_main_static, this ); + assert( result == 0 ); + result = pthread_detach( thread ); + assert( result == 0 ); +} + +UdpQueue::~UdpQueue() +{ + pthread_cond_destroy( &m_queueCondVar ); + pthread_mutex_destroy ( &m_queueMutex ); +} + +UdpQueue* +UdpQueue::get() +{ + if ( s_instance == NULL ) { + s_instance = new UdpQueue(); + } + return s_instance; +} + +void +UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, + QueueCallback cb ) +{ + UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb ); + MutexLock ml( &m_queueMutex ); + m_queue.push_back( utc ); + pthread_cond_signal( &m_queueCondVar ); +} + +void* +UdpQueue::thread_main() +{ + for ( ; ; ) { + pthread_mutex_lock( &m_queueMutex ); + while ( m_queue.size() == 0 ) { + pthread_cond_wait( &m_queueCondVar, &m_queueMutex ); + } + UdpThreadClosure* utc = m_queue.front(); + m_queue.pop_front(); + pthread_mutex_unlock( &m_queueMutex ); + + utc->noteDequeued(); + (*utc->cb())( utc ); + utc->logStats(); + delete utc; + } + return NULL; +} + +/* static */ void* +UdpQueue::thread_main_static( void* closure ) +{ + blockSignals(); + + UdpQueue* me = (UdpQueue*)closure; + return me->thread_main(); +} + diff --git a/xwords4/relay/udpqueue.h b/xwords4/relay/udpqueue.h new file mode 100644 index 000000000..57cbda212 --- /dev/null +++ b/xwords4/relay/udpqueue.h @@ -0,0 +1,86 @@ +/* -*-mode: C; fill-column: 78; c-basic-offset: 4; -*- */ +/* + * Copyright 2013 by Eric House (xwords@eehouse.org). All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + + +#ifndef _UDPQUEUE_H_ +#define _UDPQUEUE_H_ + +#include +#include + +#include "xwrelay_priv.h" +#include "addrinfo.h" + +using namespace std; + +class UdpThreadClosure; + +typedef void (*QueueCallback)( UdpThreadClosure* closure ); + +class UdpThreadClosure { +public: + UdpThreadClosure( const AddrInfo* addr, unsigned char* buf, + int len, QueueCallback cb ) + : m_buf(new unsigned char[len]) + , m_len(len) + , m_addr(*addr) + , m_cb(cb) + , m_created(time( NULL )) + { + memcpy( m_buf, buf, len ); + } + + ~UdpThreadClosure() { delete m_buf; } + + const unsigned char* buf() const { return m_buf; } + int len() const { return m_len; } + const AddrInfo::AddrUnion* saddr() const { return m_addr.saddr(); } + const AddrInfo* addr() const { return &m_addr; } + void noteDequeued() { m_dequed = time( NULL ); } + void logStats(); + const QueueCallback cb() const { return m_cb; } + + private: + unsigned char* m_buf; + int m_len; + AddrInfo m_addr; + QueueCallback m_cb; + time_t m_created; + time_t m_dequed; +}; + +class UdpQueue { + public: + static UdpQueue* get(); + UdpQueue(); + ~UdpQueue(); + void handle( const AddrInfo* addr, unsigned char* buf, int len, + QueueCallback cb ); + + private: + static void* thread_main_static( void* closure ); + void* thread_main(); + + pthread_mutex_t m_queueMutex; + pthread_cond_t m_queueCondVar; + deque m_queue; + +}; + +#endif