go with non-blocking sockets for tcp connections, adding the ability

to reassemble packets that arrive in separate recv() calls.
This commit is contained in:
Eric House 2013-06-24 07:09:57 -07:00
parent d4cf37d2ef
commit 519f90a69a
3 changed files with 135 additions and 39 deletions

View file

@ -37,9 +37,41 @@ UdpThreadClosure::logStats()
} }
} }
bool
PartialPacket::stillGood() const
{
return 0 == m_errno
|| EAGAIN == m_errno
|| EWOULDBLOCK == m_errno;
}
bool
PartialPacket::readAtMost( int len )
{
bool success = false;
uint8_t tmp[len];
ssize_t nRead = recv( m_sock, tmp, len, 0 );
if ( 0 > nRead ) { // error case
m_errno = errno;
logf( XW_LOGERROR, "%s(len=%d): recv failed: %d (%s)", __func__, len,
m_errno, strerror(m_errno) );
} else if ( 0 == nRead ) { // remote socket closed
logf( XW_LOGINFO, "%s: remote closed", __func__ );
m_errno = -1; // so stillGood will fail
} else {
m_errno = 0;
success = len == nRead;
int curSize = m_buf.size();
m_buf.resize( nRead + curSize );
memcpy( &m_buf[curSize], tmp, nRead );
}
return success;
}
UdpQueue::UdpQueue() UdpQueue::UdpQueue()
{ {
m_nextID = 0; m_nextID = 0;
pthread_mutex_init ( &m_partialsMutex, NULL );
pthread_mutex_init ( &m_queueMutex, NULL ); pthread_mutex_init ( &m_queueMutex, NULL );
pthread_cond_init( &m_queueCondVar, NULL ); pthread_cond_init( &m_queueCondVar, NULL );
@ -54,6 +86,7 @@ UdpQueue::~UdpQueue()
{ {
pthread_cond_destroy( &m_queueCondVar ); pthread_cond_destroy( &m_queueCondVar );
pthread_mutex_destroy ( &m_queueMutex ); pthread_mutex_destroy ( &m_queueMutex );
pthread_mutex_destroy ( &m_partialsMutex );
} }
UdpQueue* UdpQueue*
@ -65,52 +98,85 @@ UdpQueue::get()
return s_instance; return s_instance;
} }
// return false if socket should no longer be used
bool bool
UdpQueue::handle( const AddrInfo* addr, QueueCallback cb ) UdpQueue::handle( const AddrInfo* addr, QueueCallback cb )
{ {
bool success = false; PartialPacket* packet;
int sock = addr->socket(); int sock = addr->socket();
unsigned short msgLen;
ssize_t nRead = recv( sock, &msgLen, sizeof(msgLen), MSG_WAITALL ); // Hang onto this mutex for as long as we may be writing to the packet
if ( 0 == nRead ) { // since having it deleted while in use would be bad.
logf( XW_LOGINFO, "%s: recv(sock=%d) => 0: remote closed", __func__, sock ); MutexLock ml( &m_partialsMutex );
} else if ( nRead != sizeof(msgLen) ) {
logf( XW_LOGERROR, "%s: first recv => %d: %s", __func__, map<int, PartialPacket*>::iterator iter = m_partialPackets.find( sock );
nRead, strerror(errno) ); if ( m_partialPackets.end() == iter ) {
packet = new PartialPacket( sock );
m_partialPackets.insert( pair<int, PartialPacket*>( sock, packet ) );
} else { } else {
msgLen = ntohs( msgLen ); packet = iter->second;
if ( MAX_MSG_LEN <= msgLen ) { }
logf( XW_LOGERROR, "%s: message of len %d too large; dropping", __func__, msgLen );
} else { // First see if we've read the length bytes
unsigned char buf[msgLen]; if ( packet->readSoFar() < sizeof( packet->m_len ) ) {
nRead = recv( sock, buf, msgLen, MSG_WAITALL ); if ( packet->readAtMost( sizeof(packet->m_len) - packet->readSoFar() ) ) {
if ( nRead == msgLen ) { packet->m_len = ntohs(*(unsigned short*)packet->data());
logf( XW_LOGINFO, "%s: read %d bytes on socket %d", __func__, nRead, sock );
handle( addr, buf, msgLen, cb );
success = true;
} else {
logf( XW_LOGERROR, "%s: second recv failed: %s", __func__,
strerror(errno) );
} }
} }
if ( packet->readSoFar() >= sizeof( packet->m_len ) ) {
assert( 0 < packet->m_len );
int leftToRead =
packet->m_len - (packet->readSoFar() - sizeof(packet->m_len));
if ( packet->readAtMost( leftToRead ) ) {
handle( addr, packet->data() + sizeof(packet->m_len),
packet->m_len, cb );
packet = NULL;
newSocket_locked( sock );
} }
return success; }
return NULL == packet || packet->stillGood();
} }
void void
UdpQueue::handle( const AddrInfo* addr, unsigned char* buf, int len, UdpQueue::handle( const AddrInfo* addr, const uint8_t* buf, int len,
QueueCallback cb ) QueueCallback cb )
{ {
UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb ); UdpThreadClosure* utc = new UdpThreadClosure( addr, buf, len, cb );
MutexLock ml( &m_queueMutex ); MutexLock ml( &m_queueMutex );
int id = ++m_nextID; int id = ++m_nextID;
utc->setID( id ); utc->setID( id );
logf( XW_LOGINFO, "%s: enqueuing packet %d", __func__, id ); logf( XW_LOGINFO, "%s: enqueuing packet %d (len %d)", __func__, id, len );
m_queue.push_back( utc ); m_queue.push_back( utc );
pthread_cond_signal( &m_queueCondVar ); pthread_cond_signal( &m_queueCondVar );
} }
void
UdpQueue::newSocket_locked( int sock )
{
map<int, PartialPacket*>::iterator iter = m_partialPackets.find( sock );
if ( m_partialPackets.end() != iter ) {
delete iter->second;
m_partialPackets.erase( iter );
}
}
void
UdpQueue::newSocket( int sock )
{
MutexLock ml( &m_partialsMutex );
newSocket_locked( sock );
}
void
UdpQueue::newSocket( const AddrInfo* addr )
{
assert( addr->isTCP() );
newSocket( addr->socket() );
}
void* void*
UdpQueue::thread_main() UdpQueue::thread_main()
{ {

View file

@ -36,9 +36,9 @@ typedef void (*QueueCallback)( UdpThreadClosure* closure );
class UdpThreadClosure { class UdpThreadClosure {
public: public:
UdpThreadClosure( const AddrInfo* addr, unsigned char* buf, UdpThreadClosure( const AddrInfo* addr, const uint8_t* buf,
int len, QueueCallback cb ) int len, QueueCallback cb )
: m_buf(new unsigned char[len]) : m_buf(new uint8_t[len])
, m_len(len) , m_len(len)
, m_addr(*addr) , m_addr(*addr)
, m_cb(cb) , m_cb(cb)
@ -60,7 +60,7 @@ public:
int getID( void ) { return m_id; } int getID( void ) { return m_id; }
private: private:
unsigned char* m_buf; uint8_t* m_buf;
int m_len; int m_len;
AddrInfo m_addr; AddrInfo m_addr;
QueueCallback m_cb; QueueCallback m_cb;
@ -69,25 +69,49 @@ public:
int m_id; int m_id;
}; };
class PartialPacket {
public:
PartialPacket(int sock) {
m_sock = sock;
m_len = 0;
m_errno = 0;
}
bool stillGood() const ;
bool readAtMost( int len );
size_t readSoFar() const { return m_buf.size(); }
const uint8_t* data() const { return m_buf.data(); }
unsigned short m_len;
private:
vector<uint8_t> m_buf;
int m_sock;
int m_errno;
};
class UdpQueue { class UdpQueue {
public: public:
static UdpQueue* get(); static UdpQueue* get();
UdpQueue(); UdpQueue();
~UdpQueue(); ~UdpQueue();
bool handle( const AddrInfo* addr, QueueCallback cb ); bool handle( const AddrInfo* addr, QueueCallback cb );
void handle( const AddrInfo* addr, unsigned char* buf, int len, void handle( const AddrInfo* addr, const uint8_t* buf, int len,
QueueCallback cb ); QueueCallback cb );
void forgetSocket( const AddrInfo* addr ); void newSocket( int socket );
void newSocket( const AddrInfo* addr );
private: private:
void newSocket_locked( int sock );
static void* thread_main_static( void* closure ); static void* thread_main_static( void* closure );
void* thread_main(); void* thread_main();
pthread_mutex_t m_partialsMutex;
pthread_mutex_t m_queueMutex; pthread_mutex_t m_queueMutex;
pthread_cond_t m_queueCondVar; pthread_cond_t m_queueCondVar;
deque<UdpThreadClosure*> m_queue; deque<UdpThreadClosure*> m_queue;
// map<int, vector<UdpThreadClosure*> > m_bySocket; // map<int, vector<UdpThreadClosure*> > m_bySocket;
int m_nextID; int m_nextID;
map<int, PartialPacket*> m_partialPackets;
}; };
#endif #endif

View file

@ -453,8 +453,8 @@ send_with_length_unsafe( const AddrInfo* addr, const unsigned char* buf,
if ( addr->isCurrent() ) { if ( addr->isCurrent() ) {
int socket = addr->socket(); int socket = addr->socket();
unsigned short len = htons( bufLen ); unsigned short len = htons( bufLen );
ssize_t nSent = send( socket, &len, 2, 0 ); ssize_t nSent = send( socket, &len, sizeof(len), 0 );
if ( nSent == 2 ) { if ( nSent == sizeof(len) ) {
nSent = send( socket, buf, bufLen, 0 ); nSent = send( socket, buf, bufLen, 0 );
if ( nSent == ssize_t(bufLen) ) { if ( nSent == ssize_t(bufLen) ) {
logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket ); logf( XW_LOGINFO, "sent %d bytes on socket %d", nSent, socket );
@ -1408,7 +1408,7 @@ udp_thread_proc( UdpThreadClosure* utc )
static void static void
read_udp_packet( int udpsock ) read_udp_packet( int udpsock )
{ {
unsigned char buf[MAX_MSG_LEN]; uint8_t buf[MAX_MSG_LEN];
AddrInfo::AddrUnion saddr; AddrInfo::AddrUnion saddr;
memset( &saddr, 0, sizeof(saddr) ); memset( &saddr, 0, sizeof(saddr) );
socklen_t fromlen = sizeof(saddr.addr_in); socklen_t fromlen = sizeof(saddr.addr_in);
@ -1448,6 +1448,8 @@ string_printf( string& str, const char* fmt, ... )
} }
} }
// Going with non-blocking instead
#if 0
static void static void
set_timeouts( int sock ) set_timeouts( int sock )
{ {
@ -1472,6 +1474,7 @@ set_timeouts( int sock )
assert( 0 ); assert( 0 );
} }
} }
#endif
static void static void
enable_keepalive( int sock ) enable_keepalive( int sock )
@ -1904,8 +1907,10 @@ main( int argc, char** argv )
assert( g_maxsocks > newSock ); assert( g_maxsocks > newSock );
/* Set timeout so send and recv won't block forever */ /* Set timeout so send and recv won't block forever */
set_timeouts( newSock ); // set_timeouts( newSock );
int err = fcntl( newSock, F_SETFL, O_NONBLOCK );
assert( 0 == err );
enable_keepalive( newSock ); enable_keepalive( newSock );
logf( XW_LOGINFO, logf( XW_LOGINFO,
@ -1918,6 +1923,7 @@ main( int argc, char** argv )
perGame ? game_thread_proc perGame ? game_thread_proc
: proxy_thread_proc, : proxy_thread_proc,
&addr ); &addr );
UdpQueue::get()->newSocket( &addr );
} }
--retval; --retval;
} }