From c3887b9c77a8168ea0c4720aa3cf909842259f02 Mon Sep 17 00:00:00 2001 From: Eric House Date: Mon, 23 Oct 2017 21:07:05 -0700 Subject: [PATCH] use a single thread and a protected queue I don't want race conditions between threads talking to the server. --- xwords4/linux/relaycon.c | 344 +++++++++++++++++++++++---------------- 1 file changed, 207 insertions(+), 137 deletions(-) diff --git a/xwords4/linux/relaycon.c b/xwords4/linux/relaycon.c index cad18fe06..bfb28af9c 100644 --- a/xwords4/linux/relaycon.c +++ b/xwords4/linux/relaycon.c @@ -36,6 +36,9 @@ typedef struct _RelayConStorage { pthread_t mainThread; guint moveCheckerID; XP_U16 nextMoveCheckSecs; + pthread_cond_t relayCondVar; + pthread_mutex_t relayMutex; + GSList* relayTaskList; int socket; RelayConnProcs procs; @@ -75,12 +78,36 @@ static size_t writeVLI( XP_U8* out, uint32_t nn ); static size_t un2vli( int nn, uint8_t* buf ); static bool vli2un( const uint8_t** inp, uint32_t* outp ); +static void* relayThread( void* arg ); typedef struct _WriteState { gchar* ptr; size_t curSize; } WriteState; +typedef enum { POST, QUERY, } TaskType; + +typedef struct _RelayTask { + TaskType typ; + RelayConStorage* storage; + WriteState ws; + union { + struct { + XP_U8* msgbuf; + XP_U16 len; + } post; + struct { + GHashTable* map; + } query; + } u; +} RelayTask; + +static RelayTask* makeRelayTask(RelayConStorage* storage, TaskType typ); +static void freeRelayTask(RelayTask* task); +static void handlePost( RelayTask* task ); +static void handleQuery( RelayTask* task ); + + static size_t write_callback(void *contents, size_t size, size_t nmemb, void* data) { @@ -138,19 +165,26 @@ relaycon_init( LaunchParams* params, const RelayConnProcs* procs, XP_MEMCPY( &storage->procs, procs, sizeof(storage->procs) ); storage->procsClosure = procsClosure; - storage->mainThread = pthread_self(); + if ( params->useHTTP ) { + storage->mainThread = pthread_self(); + pthread_mutex_init ( &storage->relayMutex, NULL ); + pthread_cond_init( &storage->relayCondVar, NULL ); + pthread_t thread; + (void)pthread_create( &thread, NULL, relayThread, storage ); + pthread_detach( thread ); - storage->socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); - (*procs->socketAdded)( storage, storage->socket, relaycon_receive ); + XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) ); + XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 ); + } else { + storage->socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); + (*procs->socketAdded)( storage, storage->socket, relaycon_receive ); - XP_MEMSET( &storage->saddr, 0, sizeof(storage->saddr) ); - storage->saddr.sin_family = PF_INET; - storage->saddr.sin_addr.s_addr = htonl( hostNameToIP(host) ); - storage->saddr.sin_port = htons(port); - - XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) ); - XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 ); + XP_MEMSET( &storage->saddr, 0, sizeof(storage->saddr) ); + storage->saddr.sin_family = PF_INET; + storage->saddr.sin_addr.s_addr = htonl( hostNameToIP(host) ); + storage->saddr.sin_port = htons(port); + } storage->params = params; storage->proto = XWPDEV_PROTO_VERSION_1; @@ -311,6 +345,59 @@ onMainThread( RelayConStorage* storage ) return storage->mainThread = pthread_self(); } +static void* +relayThread( void* arg ) +{ + RelayConStorage* storage = (RelayConStorage*)arg; + for ( ; ; ) { + pthread_mutex_lock( &storage->relayMutex ); + while ( !storage->relayTaskList ) { + pthread_cond_wait( &storage->relayCondVar, &storage->relayMutex ); + } + + RelayTask* task = storage->relayTaskList->data; + storage->relayTaskList = storage->relayTaskList->next; + pthread_mutex_unlock( &storage->relayMutex ); + + switch ( task->typ ) { + case POST: + handlePost( task ); + break; + case QUERY: + handleQuery( task ); + break; + default: + XP_ASSERT(0); + } + } + return NULL; +} + +static void +addTask( RelayConStorage* storage, RelayTask* task ) +{ + pthread_mutex_lock( &storage->relayMutex ); + storage->relayTaskList = g_slist_append( storage->relayTaskList, task ); + pthread_cond_signal( &storage->relayCondVar ); + pthread_mutex_unlock( &storage->relayMutex ); +} + +static RelayTask* +makeRelayTask( RelayConStorage* storage, TaskType typ ) +{ + RelayTask* task = (RelayTask*)g_malloc0(sizeof(*task)); + task->typ = typ; + task->storage = storage; + return task; +} + +static void +freeRelayTask( RelayTask* task ) +{ + g_free( task->ws.ptr ); + g_free( task ); +} + static void sendAckIf( RelayConStorage* storage, const MsgHeader* header ) { @@ -331,6 +418,7 @@ process( RelayConStorage* storage, XP_U8* buf, ssize_t nRead ) MsgHeader header; if ( readHeader( &ptr, &header ) ) { sendAckIf( storage, &header ); + switch( header.cmd ) { case XWPDEV_REGRSP: { uint32_t len; @@ -433,6 +521,7 @@ relaycon_receive( GIOChannel* source, GIOCondition XP_UNUSED_DBG(condition), gpo { XP_ASSERT( 0 != (G_IO_IN & condition) ); /* FIX ME */ RelayConStorage* storage = (RelayConStorage*)data; + XP_ASSERT( !storage->params->useHTTP ); XP_U8 buf[512]; struct sockaddr_in from; socklen_t fromlen = sizeof(from); @@ -494,44 +583,47 @@ hostNameToIP( const XP_UCHAR* name ) typedef struct _PostArgs { RelayConStorage* storage; WriteState ws; - const XP_U8* msgbuf; + XP_U8* msgbuf; XP_U16 len; } PostArgs; static gboolean onGotPostData(gpointer user_data) { - PostArgs* pa = (PostArgs*)user_data; + RelayTask* task = (RelayTask*)user_data; /* Now pull any data from the reply */ // got "{"status": "ok", "dataLen": 14, "data": "AYQDiDAyMUEzQ0MyADw=", "err": "none"}" - json_object* reply = json_tokener_parse( pa->ws.ptr ); - json_object* replyData; - if ( json_object_object_get_ex( reply, "data", &replyData ) && !!replyData ) { - int len = json_object_array_length(replyData); - for ( int ii = 0; ii < len; ++ii ) { - json_object* datum = json_object_array_get_idx( replyData, ii ); - const char* str = json_object_get_string( datum ); - gsize out_len; - guchar* buf = g_base64_decode( (const gchar*)str, &out_len ); - process( pa->storage, buf, out_len ); - g_free( buf ); + if ( !!task->ws.ptr ) { + json_object* reply = json_tokener_parse( task->ws.ptr ); + json_object* replyData; + if ( json_object_object_get_ex( reply, "data", &replyData ) && !!replyData ) { + int len = json_object_array_length(replyData); + for ( int ii = 0; ii < len; ++ii ) { + json_object* datum = json_object_array_get_idx( replyData, ii ); + const char* str = json_object_get_string( datum ); + gsize out_len; + guchar* buf = g_base64_decode( (const gchar*)str, &out_len ); + process( task->storage, buf, out_len ); + g_free( buf ); + } + (void)json_object_put( replyData ); } - (void)json_object_put( replyData ); + (void)json_object_put( reply ); } - (void)json_object_put( reply ); - g_free( pa->ws.ptr ); - g_free( pa ); + g_free( task->u.post.msgbuf ); + + freeRelayTask( task ); return FALSE; } -static void* -postThread( void* arg ) +static void +handlePost( RelayTask* task ) { - PostArgs* pa = (PostArgs*)arg; - XP_ASSERT( !onMainThread(pa->storage) ); - char* data = g_base64_encode( pa->msgbuf, pa->len ); + XP_LOGF( "%s(len=%d)", __func__, task->u.post.len ); + XP_ASSERT( !onMainThread(task->storage) ); + char* data = g_base64_encode( task->u.post.msgbuf, task->u.post.len ); struct json_object* jobj = json_object_new_object(); struct json_object* jstr = json_object_new_string(data); g_free( data ); @@ -543,14 +635,14 @@ postThread( void* arg ) char url[128]; snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/post", - RELAY_API_PROTO, pa->storage->host ); + RELAY_API_PROTO, task->storage->host ); curl_easy_setopt( curl, CURLOPT_URL, url ); curl_easy_setopt( curl, CURLOPT_POST, 1L ); addJsonParams( curl, "params", jobj ); curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback ); - curl_easy_setopt( curl, CURLOPT_WRITEDATA, &pa->ws ); + curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws ); curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L ); res = curl_easy_perform(curl); @@ -563,130 +655,121 @@ postThread( void* arg ) curl_easy_cleanup(curl); curl_global_cleanup(); - XP_LOGF( "%s(): got \"%s\"", __func__, pa->ws.ptr ); + XP_LOGF( "%s(): got \"%s\"", __func__, task->ws.ptr ); // Put the data on the main thread for processing - (void)g_idle_add( onGotPostData, pa ); - - return NULL; -} /* postThread */ + (void)g_idle_add( onGotPostData, task ); +} /* handlePost */ static ssize_t post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ) { - PostArgs* pa = (PostArgs*)g_malloc0(sizeof(*pa)); - pa->storage = storage; - pa->msgbuf = msgbuf; - pa->len = len; - - pthread_t thread; - (void)pthread_create( &thread, NULL, postThread, (void*)pa ); - pthread_detach( thread ); + XP_LOGF( "%s(len=%d)", __func__, len ); + RelayTask* task = makeRelayTask( storage, POST ); + task->u.post.msgbuf = g_malloc(len); + XP_MEMCPY( task->u.post.msgbuf, msgbuf, len ); + task->u.post.len = len; + addTask( storage, task ); return len; } -typedef struct _QueryArgs { - RelayConStorage* storage; - /* GSList* ids; */ - WriteState ws; - GHashTable* map; -} QueryArgs; - static gboolean onGotQueryData( gpointer user_data ) { - QueryArgs* qa = (QueryArgs*)user_data; + RelayTask* task = (RelayTask*)user_data; + RelayConStorage* storage = task->storage; XP_Bool foundAny = false; - json_object* reply = json_tokener_parse( qa->ws.ptr ); - if ( !!reply ) { - CommsAddrRec addr = {0}; - addr_addType( &addr, COMMS_CONN_RELAY ); + if ( !!task->ws.ptr ) { + json_object* reply = json_tokener_parse( task->ws.ptr ); + if ( !!reply ) { + CommsAddrRec addr = {0}; + addr_addType( &addr, COMMS_CONN_RELAY ); - /* Currently there's an array of arrays for each relayID (value) */ - json_object_object_foreach(reply, relayID, arrOfArrOfMoves) { - int len1 = json_object_array_length( arrOfArrOfMoves ); - XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 ); - if ( len1 > 0 ) { - sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( qa->map, relayID ); - for ( int ii = 0; ii < len1; ++ii ) { - json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii ); - int len2 = json_object_array_length( forGameArray ); - for ( int jj = 0; jj < len2; ++jj ) { - json_object* oneMove = json_object_array_get_idx( forGameArray, jj ); - const char* asStr = json_object_get_string( oneMove ); - gsize out_len; - guchar* buf = g_base64_decode( asStr, &out_len ); - (*qa->storage->procs.msgForRow)( qa->storage->procsClosure, &addr, + /* Currently there's an array of arrays for each relayID (value) */ + json_object_object_foreach(reply, relayID, arrOfArrOfMoves) { + int len1 = json_object_array_length( arrOfArrOfMoves ); + XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 ); + if ( len1 > 0 ) { + sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( task->u.query.map, relayID ); + for ( int ii = 0; ii < len1; ++ii ) { + json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii ); + int len2 = json_object_array_length( forGameArray ); + for ( int jj = 0; jj < len2; ++jj ) { + json_object* oneMove = json_object_array_get_idx( forGameArray, jj ); + const char* asStr = json_object_get_string( oneMove ); + gsize out_len; + guchar* buf = g_base64_decode( asStr, &out_len ); + (*storage->procs.msgForRow)( storage->procsClosure, &addr, rowid, buf, out_len ); - g_free(buf); - foundAny = XP_TRUE; + g_free(buf); + foundAny = XP_TRUE; + } } } } + json_object_put( reply ); } - json_object_put( reply ); } if ( foundAny ) { /* Reschedule. If we got anything this time, check again sooner! */ - reset_schedule_check_interval( qa->storage ); + reset_schedule_check_interval( storage ); } - schedule_next_check( qa->storage ); + schedule_next_check( storage ); - g_hash_table_destroy( qa->map ); - g_free( qa ); + g_hash_table_destroy( task->u.query.map ); + freeRelayTask(task); return FALSE; } -static void* -queryThread( void* arg ) +static void +handleQuery( RelayTask* task ) { - QueryArgs* qa = (QueryArgs*)arg; - XP_ASSERT( !onMainThread(qa->storage) ); - GList* ids = g_hash_table_get_keys( qa->map ); + XP_ASSERT( !onMainThread(task->storage) ); - json_object* jIds = json_object_new_array(); - for ( GList* iter = ids; !!iter; iter = iter->next ) { - json_object* idstr = json_object_new_string( iter->data ); - json_object_array_add(jIds, idstr); - } - g_list_free( ids ); + if ( g_hash_table_size( task->u.query.map ) > 0 ) { + GList* ids = g_hash_table_get_keys( task->u.query.map ); - CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); - XP_ASSERT(res == CURLE_OK); - CURL* curl = curl_easy_init(); + json_object* jIds = json_object_new_array(); + for ( GList* iter = ids; !!iter; iter = iter->next ) { + json_object* idstr = json_object_new_string( iter->data ); + json_object_array_add(jIds, idstr); + } + g_list_free( ids ); - char url[128]; - snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/query", - RELAY_API_PROTO, qa->storage->host ); - curl_easy_setopt(curl, CURLOPT_URL, url ); - curl_easy_setopt(curl, CURLOPT_POST, 1L); + CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); + XP_ASSERT(res == CURLE_OK); + CURL* curl = curl_easy_init(); - addJsonParams( curl, "ids", jIds ); + char url[128]; + snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/query", + RELAY_API_PROTO, task->storage->host ); + curl_easy_setopt(curl, CURLOPT_URL, url ); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + + addJsonParams( curl, "ids", jIds ); - curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback ); - curl_easy_setopt( curl, CURLOPT_WRITEDATA, &qa->ws ); - curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L ); + curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback ); + curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws ); + curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L ); - res = curl_easy_perform( curl ); + res = curl_easy_perform( curl ); + + /* Check for errors */ + if (res != CURLE_OK) { + XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res)); + } + /* always cleanup */ + curl_easy_cleanup(curl); + curl_global_cleanup(); + + XP_LOGF( "%s(): got <<%s>>", __func__, task->ws.ptr ); - XP_LOGF( "%s(): curl_easy_perform() => %d", __func__, res ); - /* Check for errors */ - if (res != CURLE_OK) { - XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res)); } - /* always cleanup */ - curl_easy_cleanup(curl); - curl_global_cleanup(); - - XP_LOGF( "%s(): got <<%s>>", __func__, qa->ws.ptr ); - /* Put processing back on the main thread */ - g_idle_add( onGotQueryData, qa ); - - return NULL; -} /* queryThread */ + g_idle_add( onGotQueryData, task ); +} /* handleQuery */ static gboolean checkForMoves( gpointer user_data ) @@ -695,23 +778,10 @@ checkForMoves( gpointer user_data ) RelayConStorage* storage = (RelayConStorage*)user_data; XP_ASSERT( onMainThread(storage) ); - QueryArgs* qa = (QueryArgs*)g_malloc0(sizeof(*qa)); - qa->storage = storage; - + RelayTask* task = makeRelayTask( storage, QUERY ); sqlite3* dbp = storage->params->pDb; - // qa->map = getRowsToRelayIDsMap( dbp ); - qa->map = getRelayIDsToRowsMap( dbp ); - // qa->ids = g_hash_table_get_values( qa->map ); - /* for ( GList* iter = values; !!iter; iter = iter->next ) { */ - /* gpointer data = iter->data; */ - /* XP_LOGF( "checkForMoves: got id: %s", (char*)data ); */ - /* qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); */ - /* } */ - /* g_list_free( values ); */ - - pthread_t thread; - (void)pthread_create( &thread, NULL, queryThread, (void*)qa ); - pthread_detach( thread ); + task->u.query.map = getRelayIDsToRowsMap( dbp ); + addTask( storage, task ); schedule_next_check( storage ); return FALSE; @@ -754,8 +824,8 @@ sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ) nSent = post( storage, msgbuf, len ); } else { nSent = sendto( storage->socket, msgbuf, len, 0, /* flags */ - (struct sockaddr*)&storage->saddr, - sizeof(storage->saddr) ); + (struct sockaddr*)&storage->saddr, + sizeof(storage->saddr) ); #ifdef COMMS_CHECKSUM gchar* sum = g_compute_checksum_for_data( G_CHECKSUM_MD5, msgbuf, len ); XP_LOGF( "%s: sent %d bytes with sum %s", __func__, len, sum );