pass timeoutSeconds

ACK doesn't need to wait 2 seconds for a reply, and when it does so the
next send waits too. Eventually we'll want to combine messages already
in the queue into a single send. For now, this makes things better.
This commit is contained in:
Eric House 2017-10-28 20:12:05 -07:00
parent 7c22d1fdf8
commit 7b50c90aac
2 changed files with 52 additions and 35 deletions

View file

@ -11,14 +11,14 @@ try:
except ImportError: except ImportError:
apacheAvailable = False apacheAvailable = False
def post(req, params, timeoutSecs = 1): def post(req, params, timeoutSecs = 1.0):
err = 'none' err = 'none'
dataLen = 0 dataLen = 0
jobj = json.loads(params) jobj = json.loads(params)
data = base64.b64decode(jobj['data']) data = base64.b64decode(jobj['data'])
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeoutSecs) # seconds sock.settimeout(float(timeoutSecs)) # seconds
addr = ("127.0.0.1", 10997) addr = ("127.0.0.1", 10997)
sock.sendto(data, addr) sock.sendto(data, addr)
@ -35,7 +35,7 @@ def post(req, params, timeoutSecs = 1):
jobj = {'err' : err, 'data' : responses} jobj = {'err' : err, 'data' : responses}
return json.dumps(jobj) return json.dumps(jobj)
def query(req, ids): def query(req, ids, timeoutSecs = 5.0):
print('ids', ids) print('ids', ids)
ids = json.loads(ids) ids = json.loads(ids)
@ -43,7 +43,7 @@ def query(req, ids):
for id in ids: idsLen += len(id) for id in ids: idsLen += len(id)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5) # seconds sock.settimeout(float(timeoutSecs))
sock.connect(('127.0.0.1', 10998)) sock.connect(('127.0.0.1', 10998))
lenShort = 2 + idsLen + len(ids) + 2 lenShort = 2 + idsLen + len(ids) + 2

View file

@ -69,7 +69,7 @@ static void reset_schedule_check_interval( RelayConStorage* storage );
static void checkForMovesOnce( RelayConStorage* storage ); static void checkForMovesOnce( RelayConStorage* storage );
static gboolean gotDataTimer(gpointer user_data); static gboolean gotDataTimer(gpointer user_data);
static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ); static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len, XP_U16 timeout );
static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str ); static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str );
static void getNetString( const XP_U8** ptr, XP_U16 len, XP_UCHAR* buf ); static void getNetString( const XP_U8** ptr, XP_U16 len, XP_UCHAR* buf );
static XP_U16 getNetShort( const XP_U8** ptr ); static XP_U16 getNetShort( const XP_U8** ptr );
@ -103,6 +103,7 @@ typedef struct _RelayTask {
struct { struct {
XP_U8* msgbuf; XP_U8* msgbuf;
XP_U16 len; XP_U16 len;
float timeoutSecs;
} post; } post;
struct { struct {
GHashTable* map; GHashTable* map;
@ -110,7 +111,7 @@ typedef struct _RelayTask {
} u; } u;
} RelayTask; } RelayTask;
static RelayTask* makeRelayTask(RelayConStorage* storage, TaskType typ); static RelayTask* makeRelayTask( RelayConStorage* storage, TaskType typ );
static void freeRelayTask(RelayTask* task); static void freeRelayTask(RelayTask* task);
static void handlePost( RelayTask* task ); static void handlePost( RelayTask* task );
static void handleQuery( RelayTask* task ); static void handleQuery( RelayTask* task );
@ -139,26 +140,44 @@ write_callback(void *contents, size_t size, size_t nmemb, void* data)
} }
static void static void
addJsonParams( CURL* curl, const char* name, json_object* param ) addJsonParams( CURL* curl, va_list ap )
{ {
const char* asStr = json_object_to_json_string( param ); gchar* buf = NULL;
XP_LOGF( "%s: adding param (with name %s): %s", __func__, name, asStr ); for ( ; ; ) {
const char* name = va_arg(ap, const char*);
if ( !name ) {
break;
}
json_object* param = va_arg(ap, json_object*);
XP_ASSERT( !!param );
char* curl_params = curl_easy_escape( curl, asStr, strlen(asStr) ); const char* asStr = json_object_to_json_string( param );
gchar* buf = g_strdup_printf( "%s=%s", name, curl_params ); XP_LOGF( "%s: adding param (with name %s): %s", __func__, name, asStr );
curl_free( curl_params );
char* curl_params = curl_easy_escape( curl, asStr, strlen(asStr) );
gchar* tmp = g_strdup_printf( "%s=%s", name, curl_params );
curl_free( curl_params );
if ( !buf ) {
buf = tmp;
} else {
gchar* cur = buf;
buf = g_strdup_printf( "%s&%s", cur, tmp );
g_free( tmp );
g_free( cur );
}
json_object_put( param );
}
XP_LOGF( "%s(): setting params: %s", __func__, buf );
curl_easy_setopt( curl, CURLOPT_POSTFIELDS, buf ); curl_easy_setopt( curl, CURLOPT_POSTFIELDS, buf );
curl_easy_setopt( curl, CURLOPT_POSTFIELDSIZE, (long)strlen(buf) ); curl_easy_setopt( curl, CURLOPT_POSTFIELDSIZE, (long)strlen(buf) );
// Can't free the buf!! Well, maybe after the send... // Can't free the buf!! Well, maybe after the send...
// g_free( buf ); // g_free( buf );
json_object_put( param );
} }
static XP_Bool static XP_Bool
runWitCurl( RelayTask* task, const gchar* proc, const gchar* key, runWitCurl( RelayTask* task, const gchar* proc, ...)
json_object* jVal )
{ {
CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
XP_ASSERT(res == CURLE_OK); XP_ASSERT(res == CURLE_OK);
@ -170,7 +189,10 @@ runWitCurl( RelayTask* task, const gchar* proc, const gchar* key,
curl_easy_setopt( curl, CURLOPT_URL, url ); curl_easy_setopt( curl, CURLOPT_URL, url );
curl_easy_setopt( curl, CURLOPT_POST, 1L ); curl_easy_setopt( curl, CURLOPT_POST, 1L );
addJsonParams( curl, key, jVal ); va_list ap;
va_start( ap, proc );
addJsonParams( curl, ap );
va_end( ap );
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback ); curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback );
curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws ); curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws );
@ -266,7 +288,7 @@ relaycon_reg( LaunchParams* params, const XP_UCHAR* rDevID,
indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, "linux box" ); indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, "linux box" );
indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, "linux version" ); indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, "linux version" );
sendIt( storage, tmpbuf, indx ); sendIt( storage, tmpbuf, indx, 2.0 );
} }
void void
@ -303,7 +325,7 @@ relaycon_invite( LaunchParams* params, XP_U32 destDevID,
indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, ptr, len ); indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, ptr, len );
stream_destroy( stream ); stream_destroy( stream );
sendIt( storage, tmpbuf, indx ); sendIt( storage, tmpbuf, indx, 2.0 );
LOG_RETURN_VOID(); LOG_RETURN_VOID();
} }
@ -320,7 +342,7 @@ relaycon_send( LaunchParams* params, const XP_U8* buf, XP_U16 buflen,
indx += writeHeader( storage, tmpbuf, XWPDEV_MSG ); indx += writeHeader( storage, tmpbuf, XWPDEV_MSG );
indx += writeLong( &tmpbuf[indx], sizeof(tmpbuf) - indx, gameToken ); indx += writeLong( &tmpbuf[indx], sizeof(tmpbuf) - indx, gameToken );
indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, buf, buflen ); indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, buf, buflen );
nSent = sendIt( storage, tmpbuf, indx ); nSent = sendIt( storage, tmpbuf, indx, 2.0 );
if ( nSent > buflen ) { if ( nSent > buflen ) {
nSent = buflen; nSent = buflen;
} }
@ -348,7 +370,7 @@ relaycon_sendnoconn( LaunchParams* params, const XP_U8* buf, XP_U16 buflen,
(const XP_U8*)relayID, idLen ); (const XP_U8*)relayID, idLen );
tmpbuf[indx++] = '\n'; tmpbuf[indx++] = '\n';
indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, buf, buflen ); indx += writeBytes( &tmpbuf[indx], sizeof(tmpbuf) - indx, buf, buflen );
nSent = sendIt( storage, tmpbuf, indx ); nSent = sendIt( storage, tmpbuf, indx, 2.0 );
if ( nSent > buflen ) { if ( nSent > buflen ) {
nSent = buflen; nSent = buflen;
} }
@ -367,7 +389,7 @@ relaycon_requestMsgs( LaunchParams* params, const XP_UCHAR* devID )
indx += writeHeader( storage, tmpbuf, XWPDEV_RQSTMSGS ); indx += writeHeader( storage, tmpbuf, XWPDEV_RQSTMSGS );
indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, devID ); indx += addVLIStr( &tmpbuf[indx], sizeof(tmpbuf) - indx, devID );
sendIt( storage, tmpbuf, indx ); sendIt( storage, tmpbuf, indx, 2.0 );
} }
void void
@ -382,7 +404,7 @@ relaycon_deleted( LaunchParams* params, const XP_UCHAR* devID,
indx += writeDevID( &tmpbuf[indx], sizeof(tmpbuf) - indx, devID ); indx += writeDevID( &tmpbuf[indx], sizeof(tmpbuf) - indx, devID );
indx += writeLong( &tmpbuf[indx], sizeof(tmpbuf) - indx, gameToken ); indx += writeLong( &tmpbuf[indx], sizeof(tmpbuf) - indx, gameToken );
sendIt( storage, tmpbuf, indx ); sendIt( storage, tmpbuf, indx, 0.0 );
} }
static XP_Bool static XP_Bool
@ -455,7 +477,7 @@ sendAckIf( RelayConStorage* storage, const MsgHeader* header )
XP_U8 tmpbuf[16]; XP_U8 tmpbuf[16];
int indx = writeHeader( storage, tmpbuf, XWPDEV_ACK ); int indx = writeHeader( storage, tmpbuf, XWPDEV_ACK );
indx += writeVLI( &tmpbuf[indx], header->packetID ); indx += writeVLI( &tmpbuf[indx], header->packetID );
sendIt( storage, tmpbuf, indx ); sendIt( storage, tmpbuf, indx, 0.0 );
} }
} }
@ -630,13 +652,6 @@ hostNameToIP( const XP_UCHAR* name )
return ip; return ip;
} }
typedef struct _PostArgs {
RelayConStorage* storage;
WriteState ws;
XP_U8* msgbuf;
XP_U16 len;
} PostArgs;
static gboolean static gboolean
onGotPostData(gpointer user_data) onGotPostData(gpointer user_data)
{ {
@ -679,18 +694,20 @@ handlePost( RelayTask* task )
g_free( data ); g_free( data );
json_object_object_add( jobj, "data", jstr ); json_object_object_add( jobj, "data", jstr );
runWitCurl( task, "post", "params", jobj ); json_object* jTimeout = json_object_new_double( task->u.post.timeoutSecs );
runWitCurl( task, "post", "params", jobj, "timeoutSecs", jTimeout, NULL );
// Put the data on the main thread for processing // Put the data on the main thread for processing
addToGotData( task ); addToGotData( task );
} /* handlePost */ } /* handlePost */
static ssize_t static ssize_t
post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ) post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len, float timeout )
{ {
XP_LOGF( "%s(len=%d)", __func__, len ); XP_LOGF( "%s(len=%d)", __func__, len );
RelayTask* task = makeRelayTask( storage, POST ); RelayTask* task = makeRelayTask( storage, POST );
task->u.post.msgbuf = g_malloc(len); task->u.post.msgbuf = g_malloc(len);
task->u.post.timeoutSecs = timeout;
XP_MEMCPY( task->u.post.msgbuf, msgbuf, len ); XP_MEMCPY( task->u.post.msgbuf, msgbuf, len );
task->u.post.len = len; task->u.post.len = len;
addTask( storage, task ); addTask( storage, task );
@ -761,7 +778,7 @@ handleQuery( RelayTask* task )
} }
g_list_free( ids ); g_list_free( ids );
runWitCurl( task, "query", "ids", jIds ); runWitCurl( task, "query", "ids", jIds, NULL );
} }
/* Put processing back on the main thread */ /* Put processing back on the main thread */
addToGotData( task ); addToGotData( task );
@ -879,11 +896,11 @@ schedule_next_check( RelayConStorage* storage )
} }
static ssize_t static ssize_t
sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ) sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len, XP_U16 timeout )
{ {
ssize_t nSent; ssize_t nSent;
if ( storage->params->useHTTP ) { if ( storage->params->useHTTP ) {
nSent = post( storage, msgbuf, len ); nSent = post( storage, msgbuf, len, timeout );
} else { } else {
nSent = sendto( storage->socket, msgbuf, len, 0, /* flags */ nSent = sendto( storage->socket, msgbuf, len, 0, /* flags */
(struct sockaddr*)&storage->saddr, (struct sockaddr*)&storage->saddr,