refactor packet assembly/sending so posting message can post all,

including header.
This commit is contained in:
Eric House 2013-08-22 08:15:38 -07:00
parent 16aafccec6
commit 1af12f1de2

View file

@ -425,35 +425,29 @@ denyConnection( const AddrInfo* addr, XWREASON err )
send_with_length_unsafe( addr, buf, sizeof(buf), NULL );
}
static ssize_t
send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
uint32_t* packetIDP, XWRelayReg cmd, va_list* app )
static void
assemble_packet( vector<uint8_t>& packet, uint32_t* packetIDP, XWRelayReg cmd,
va_list* app )
{
uint32_t packetNum = UDPAckTrack::nextPacketID( cmd );
struct iovec vec[10];
unsigned int iocount = 0;
unsigned char header[1 + sizeof(packetNum) + 1];
header[0] = XWPDEV_PROTO_VERSION;
if ( NULL != packetIDP ) {
*packetIDP = packetNum;
}
uint8_t header[1 + sizeof(packetNum) + 1];
header[0] = XWPDEV_PROTO_VERSION;
packetNum = htonl( packetNum );
memcpy( &header[1], &packetNum, sizeof(packetNum) );
header[5] = cmd;
vec[iocount].iov_base = header;
vec[iocount].iov_len = sizeof(header);
++iocount;
packet.insert( packet.end(), header, header + sizeof(header) );
for ( ; ; ) {
unsigned char* ptr = va_arg(*app, unsigned char*);
uint8_t* ptr = va_arg(*app, uint8_t*);
if ( !ptr ) {
break;
}
assert( iocount < VSIZE(vec) );
vec[iocount].iov_base = ptr;
vec[iocount].iov_len = va_arg(*app, int);
++iocount;
size_t len = va_arg(*app, int);
packet.insert( packet.end(), ptr, ptr + len );
}
#ifdef LOG_UDP_PACKETS
@ -470,19 +464,56 @@ send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
assert( size < sizeof(out) );
out[size] = '\0';
#endif
}
struct msghdr mhdr = {0};
mhdr.msg_iov = vec;
mhdr.msg_iovlen = iocount;
mhdr.msg_name = (void*)dest_addr;
mhdr.msg_namelen = sizeof(*dest_addr);
static void
assemble_packet( vector<uint8_t>& packet, uint32_t* packetIDP, XWRelayReg cmd,
... )
{
va_list ap;
va_start( ap, cmd );
assemble_packet( packet, packetIDP, cmd, ap );
va_end( ap );
}
ssize_t nSent = sendmsg( socket, &mhdr, 0 /* flags */);
static bool
get_addr_info_if( const AddrInfo* addr, int* sockp,
const struct sockaddr** dest_addr )
{
bool current = addr->isCurrent();
if ( current ) {
int socket = addr->socket();
assert( g_udpsock == socket || socket == -1 );
if ( -1 == socket ) {
socket = g_udpsock;
}
*sockp = socket;
*dest_addr = addr->sockaddr();
}
return current;
}
static ssize_t
send_packet_via_udp_impl( vector<uint8_t>& packet,
int socket, const struct sockaddr* dest_addr )
{
ssize_t nSent = sendto( socket, packet.data(), packet.size(), 0 /*flags*/,
dest_addr, sizeof(*dest_addr) );
if ( 0 > nSent ) {
logf( XW_LOGERROR, "%s: sendmsg->errno %d (%s)", __func__, errno,
strerror(errno) );
}
return nSent;
}
static ssize_t
send_via_udp_impl( int socket, const struct sockaddr* dest_addr,
uint32_t* packetIDP, XWRelayReg cmd, va_list* app )
{
vector<uint8_t> packet;
assemble_packet( packet, packetIDP, cmd, app );
ssize_t nSent = send_packet_via_udp_impl( packet, socket, dest_addr );
#ifdef LOG_UDP_PACKETS
gchar* b64 = g_base64_encode( (unsigned char*)dest_addr,
sizeof(*dest_addr) );
@ -500,17 +531,12 @@ static ssize_t
send_via_udp( const AddrInfo* addr, uint32_t* packetIDP, XWRelayReg cmd, ... )
{
ssize_t result = 0;
if ( addr->isCurrent() ) {
int socket = addr->socket();
assert( g_udpsock == socket || socket == -1 );
if ( -1 == socket ) {
socket = g_udpsock;
}
int socket;
const struct sockaddr* dest_addr;
if ( get_addr_info_if( addr, &socket, &dest_addr ) ) {
va_list ap;
va_start( ap, cmd );
result = send_via_udp_impl( socket, addr->sockaddr(), packetIDP,
cmd, &ap );
result = send_via_udp_impl( socket, dest_addr, packetIDP, cmd, &ap );
va_end( ap );
} else {
logf( XW_LOGINFO, "%s: not sending to out-of-date packet", __func__ );
@ -609,22 +635,16 @@ send_havemsgs( const AddrInfo* addr )
class MsgClosure {
public:
MsgClosure( DevIDRelay devid, gpointer msg, uint16_t len,
MsgClosure( DevIDRelay devid, const vector<uint8_t>* packet,
OnMsgAckProc proc, void* procClosure )
{
m_devid = devid;
m_len = len;
m_msg = g_malloc(len);
memcpy( m_msg, msg, len );
m_packet = *packet;
m_proc = proc;
m_procClosure = procClosure;
}
~MsgClosure() {
g_free( m_msg );
}
DevIDRelay m_devid;
uint16_t m_len;
gpointer m_msg;
vector<uint8_t> m_packet;
OnMsgAckProc m_proc;
void* m_procClosure;
};
@ -634,9 +654,8 @@ onPostedMsgAcked( bool acked, uint32_t packetID, void* data )
{
MsgClosure* mc = (MsgClosure*)data;
if ( !acked ) {
DBMgr::Get()->StoreMessage( mc->m_devid,
(const unsigned char*)mc->m_msg,
mc->m_len );
DBMgr::Get()->StoreMessage( mc->m_devid, mc->m_packet.data(),
mc->m_packet.size() );
}
if ( NULL != mc->m_proc ) {
(*mc->m_proc)( acked, mc->m_devid, packetID, mc->m_procClosure );
@ -648,32 +667,35 @@ bool
post_message( DevIDRelay devid, const char* message, OnMsgAckProc proc,
void* procClosure )
{
const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( devid );
bool canSendNow = !!addru;
vector<uint8_t> packet;
uint32_t packetID;
XWRelayReg cmd = XWPDEV_ALERT;
short len = strlen( message );
short netLen = htons( len );
uint8_t buf[1 + sizeof(netLen) + len];
XWRelayReg cmd = XWPDEV_ALERT;
buf[0] = cmd;
memcpy( &buf[1], &netLen, sizeof(netLen) );
memcpy( &buf[1 + sizeof(netLen)], message, len );
assemble_packet( packet, &packetID, cmd, &netLen, sizeof(netLen),
message, strlen(message),
NULL );
const AddrInfo::AddrUnion* addru = DevMgr::Get()->get( devid );
bool canSendNow = !!addru;
bool sent = false;
if ( canSendNow ) {
AddrInfo addr( addru );
uint32_t packetID;
int socket;
const struct sockaddr* dest_addr;
if ( get_addr_info_if( &addr, &socket, &dest_addr ) ) {
sent = 0 < send_packet_via_udp_impl( packet, socket, dest_addr );
sent = send_via_udp( &addr, &packetID, cmd, &buf[1],
VSIZE(buf)-1, NULL );
if ( sent ) {
MsgClosure* mc = new MsgClosure( devid, buf, VSIZE(buf),
proc, procClosure );
UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc );
if ( sent ) {
MsgClosure* mc = new MsgClosure( devid, &packet,
proc, procClosure );
UDPAckTrack::setOnAck( onPostedMsgAcked, packetID, (void*)mc );
}
}
}
if ( !sent ) {
DBMgr::Get()->StoreMessage( devid, (const unsigned char*)buf,
VSIZE(buf) );
DBMgr::Get()->StoreMessage( devid, packet.data(), packet.size() );
}
return sent;
}