use a single thread and a protected queue

I don't want race conditions between threads talking to the server.
This commit is contained in:
Eric House 2017-10-23 21:07:05 -07:00
parent 6787fe1406
commit c3887b9c77

View file

@ -36,6 +36,9 @@ typedef struct _RelayConStorage {
pthread_t mainThread;
guint moveCheckerID;
XP_U16 nextMoveCheckSecs;
pthread_cond_t relayCondVar;
pthread_mutex_t relayMutex;
GSList* relayTaskList;
int socket;
RelayConnProcs procs;
@ -75,12 +78,36 @@ static size_t writeVLI( XP_U8* out, uint32_t nn );
static size_t un2vli( int nn, uint8_t* buf );
static bool vli2un( const uint8_t** inp, uint32_t* outp );
static void* relayThread( void* arg );
typedef struct _WriteState {
gchar* ptr;
size_t curSize;
} WriteState;
typedef enum { POST, QUERY, } TaskType;
typedef struct _RelayTask {
TaskType typ;
RelayConStorage* storage;
WriteState ws;
union {
struct {
XP_U8* msgbuf;
XP_U16 len;
} post;
struct {
GHashTable* map;
} query;
} u;
} RelayTask;
static RelayTask* makeRelayTask(RelayConStorage* storage, TaskType typ);
static void freeRelayTask(RelayTask* task);
static void handlePost( RelayTask* task );
static void handleQuery( RelayTask* task );
static size_t
write_callback(void *contents, size_t size, size_t nmemb, void* data)
{
@ -138,19 +165,26 @@ relaycon_init( LaunchParams* params, const RelayConnProcs* procs,
XP_MEMCPY( &storage->procs, procs, sizeof(storage->procs) );
storage->procsClosure = procsClosure;
storage->mainThread = pthread_self();
if ( params->useHTTP ) {
storage->mainThread = pthread_self();
pthread_mutex_init ( &storage->relayMutex, NULL );
pthread_cond_init( &storage->relayCondVar, NULL );
pthread_t thread;
(void)pthread_create( &thread, NULL, relayThread, storage );
pthread_detach( thread );
storage->socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
(*procs->socketAdded)( storage, storage->socket, relaycon_receive );
XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) );
XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 );
} else {
storage->socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
(*procs->socketAdded)( storage, storage->socket, relaycon_receive );
XP_MEMSET( &storage->saddr, 0, sizeof(storage->saddr) );
storage->saddr.sin_family = PF_INET;
storage->saddr.sin_addr.s_addr = htonl( hostNameToIP(host) );
storage->saddr.sin_port = htons(port);
XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) );
XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 );
XP_MEMSET( &storage->saddr, 0, sizeof(storage->saddr) );
storage->saddr.sin_family = PF_INET;
storage->saddr.sin_addr.s_addr = htonl( hostNameToIP(host) );
storage->saddr.sin_port = htons(port);
}
storage->params = params;
storage->proto = XWPDEV_PROTO_VERSION_1;
@ -311,6 +345,59 @@ onMainThread( RelayConStorage* storage )
return storage->mainThread = pthread_self();
}
static void*
relayThread( void* arg )
{
RelayConStorage* storage = (RelayConStorage*)arg;
for ( ; ; ) {
pthread_mutex_lock( &storage->relayMutex );
while ( !storage->relayTaskList ) {
pthread_cond_wait( &storage->relayCondVar, &storage->relayMutex );
}
RelayTask* task = storage->relayTaskList->data;
storage->relayTaskList = storage->relayTaskList->next;
pthread_mutex_unlock( &storage->relayMutex );
switch ( task->typ ) {
case POST:
handlePost( task );
break;
case QUERY:
handleQuery( task );
break;
default:
XP_ASSERT(0);
}
}
return NULL;
}
static void
addTask( RelayConStorage* storage, RelayTask* task )
{
pthread_mutex_lock( &storage->relayMutex );
storage->relayTaskList = g_slist_append( storage->relayTaskList, task );
pthread_cond_signal( &storage->relayCondVar );
pthread_mutex_unlock( &storage->relayMutex );
}
static RelayTask*
makeRelayTask( RelayConStorage* storage, TaskType typ )
{
RelayTask* task = (RelayTask*)g_malloc0(sizeof(*task));
task->typ = typ;
task->storage = storage;
return task;
}
static void
freeRelayTask( RelayTask* task )
{
g_free( task->ws.ptr );
g_free( task );
}
static void
sendAckIf( RelayConStorage* storage, const MsgHeader* header )
{
@ -331,6 +418,7 @@ process( RelayConStorage* storage, XP_U8* buf, ssize_t nRead )
MsgHeader header;
if ( readHeader( &ptr, &header ) ) {
sendAckIf( storage, &header );
switch( header.cmd ) {
case XWPDEV_REGRSP: {
uint32_t len;
@ -433,6 +521,7 @@ relaycon_receive( GIOChannel* source, GIOCondition XP_UNUSED_DBG(condition), gpo
{
XP_ASSERT( 0 != (G_IO_IN & condition) ); /* FIX ME */
RelayConStorage* storage = (RelayConStorage*)data;
XP_ASSERT( !storage->params->useHTTP );
XP_U8 buf[512];
struct sockaddr_in from;
socklen_t fromlen = sizeof(from);
@ -494,44 +583,47 @@ hostNameToIP( const XP_UCHAR* name )
typedef struct _PostArgs {
RelayConStorage* storage;
WriteState ws;
const XP_U8* msgbuf;
XP_U8* msgbuf;
XP_U16 len;
} PostArgs;
static gboolean
onGotPostData(gpointer user_data)
{
PostArgs* pa = (PostArgs*)user_data;
RelayTask* task = (RelayTask*)user_data;
/* Now pull any data from the reply */
// got "{"status": "ok", "dataLen": 14, "data": "AYQDiDAyMUEzQ0MyADw=", "err": "none"}"
json_object* reply = json_tokener_parse( pa->ws.ptr );
json_object* replyData;
if ( json_object_object_get_ex( reply, "data", &replyData ) && !!replyData ) {
int len = json_object_array_length(replyData);
for ( int ii = 0; ii < len; ++ii ) {
json_object* datum = json_object_array_get_idx( replyData, ii );
const char* str = json_object_get_string( datum );
gsize out_len;
guchar* buf = g_base64_decode( (const gchar*)str, &out_len );
process( pa->storage, buf, out_len );
g_free( buf );
if ( !!task->ws.ptr ) {
json_object* reply = json_tokener_parse( task->ws.ptr );
json_object* replyData;
if ( json_object_object_get_ex( reply, "data", &replyData ) && !!replyData ) {
int len = json_object_array_length(replyData);
for ( int ii = 0; ii < len; ++ii ) {
json_object* datum = json_object_array_get_idx( replyData, ii );
const char* str = json_object_get_string( datum );
gsize out_len;
guchar* buf = g_base64_decode( (const gchar*)str, &out_len );
process( task->storage, buf, out_len );
g_free( buf );
}
(void)json_object_put( replyData );
}
(void)json_object_put( replyData );
(void)json_object_put( reply );
}
(void)json_object_put( reply );
g_free( pa->ws.ptr );
g_free( pa );
g_free( task->u.post.msgbuf );
freeRelayTask( task );
return FALSE;
}
static void*
postThread( void* arg )
static void
handlePost( RelayTask* task )
{
PostArgs* pa = (PostArgs*)arg;
XP_ASSERT( !onMainThread(pa->storage) );
char* data = g_base64_encode( pa->msgbuf, pa->len );
XP_LOGF( "%s(len=%d)", __func__, task->u.post.len );
XP_ASSERT( !onMainThread(task->storage) );
char* data = g_base64_encode( task->u.post.msgbuf, task->u.post.len );
struct json_object* jobj = json_object_new_object();
struct json_object* jstr = json_object_new_string(data);
g_free( data );
@ -543,14 +635,14 @@ postThread( void* arg )
char url[128];
snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/post",
RELAY_API_PROTO, pa->storage->host );
RELAY_API_PROTO, task->storage->host );
curl_easy_setopt( curl, CURLOPT_URL, url );
curl_easy_setopt( curl, CURLOPT_POST, 1L );
addJsonParams( curl, "params", jobj );
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback );
curl_easy_setopt( curl, CURLOPT_WRITEDATA, &pa->ws );
curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws );
curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L );
res = curl_easy_perform(curl);
@ -563,130 +655,121 @@ postThread( void* arg )
curl_easy_cleanup(curl);
curl_global_cleanup();
XP_LOGF( "%s(): got \"%s\"", __func__, pa->ws.ptr );
XP_LOGF( "%s(): got \"%s\"", __func__, task->ws.ptr );
// Put the data on the main thread for processing
(void)g_idle_add( onGotPostData, pa );
return NULL;
} /* postThread */
(void)g_idle_add( onGotPostData, task );
} /* handlePost */
static ssize_t
post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len )
{
PostArgs* pa = (PostArgs*)g_malloc0(sizeof(*pa));
pa->storage = storage;
pa->msgbuf = msgbuf;
pa->len = len;
pthread_t thread;
(void)pthread_create( &thread, NULL, postThread, (void*)pa );
pthread_detach( thread );
XP_LOGF( "%s(len=%d)", __func__, len );
RelayTask* task = makeRelayTask( storage, POST );
task->u.post.msgbuf = g_malloc(len);
XP_MEMCPY( task->u.post.msgbuf, msgbuf, len );
task->u.post.len = len;
addTask( storage, task );
return len;
}
typedef struct _QueryArgs {
RelayConStorage* storage;
/* GSList* ids; */
WriteState ws;
GHashTable* map;
} QueryArgs;
static gboolean
onGotQueryData( gpointer user_data )
{
QueryArgs* qa = (QueryArgs*)user_data;
RelayTask* task = (RelayTask*)user_data;
RelayConStorage* storage = task->storage;
XP_Bool foundAny = false;
json_object* reply = json_tokener_parse( qa->ws.ptr );
if ( !!reply ) {
CommsAddrRec addr = {0};
addr_addType( &addr, COMMS_CONN_RELAY );
if ( !!task->ws.ptr ) {
json_object* reply = json_tokener_parse( task->ws.ptr );
if ( !!reply ) {
CommsAddrRec addr = {0};
addr_addType( &addr, COMMS_CONN_RELAY );
/* Currently there's an array of arrays for each relayID (value) */
json_object_object_foreach(reply, relayID, arrOfArrOfMoves) {
int len1 = json_object_array_length( arrOfArrOfMoves );
XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 );
if ( len1 > 0 ) {
sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( qa->map, relayID );
for ( int ii = 0; ii < len1; ++ii ) {
json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii );
int len2 = json_object_array_length( forGameArray );
for ( int jj = 0; jj < len2; ++jj ) {
json_object* oneMove = json_object_array_get_idx( forGameArray, jj );
const char* asStr = json_object_get_string( oneMove );
gsize out_len;
guchar* buf = g_base64_decode( asStr, &out_len );
(*qa->storage->procs.msgForRow)( qa->storage->procsClosure, &addr,
/* Currently there's an array of arrays for each relayID (value) */
json_object_object_foreach(reply, relayID, arrOfArrOfMoves) {
int len1 = json_object_array_length( arrOfArrOfMoves );
XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 );
if ( len1 > 0 ) {
sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( task->u.query.map, relayID );
for ( int ii = 0; ii < len1; ++ii ) {
json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii );
int len2 = json_object_array_length( forGameArray );
for ( int jj = 0; jj < len2; ++jj ) {
json_object* oneMove = json_object_array_get_idx( forGameArray, jj );
const char* asStr = json_object_get_string( oneMove );
gsize out_len;
guchar* buf = g_base64_decode( asStr, &out_len );
(*storage->procs.msgForRow)( storage->procsClosure, &addr,
rowid, buf, out_len );
g_free(buf);
foundAny = XP_TRUE;
g_free(buf);
foundAny = XP_TRUE;
}
}
}
}
json_object_put( reply );
}
json_object_put( reply );
}
if ( foundAny ) {
/* Reschedule. If we got anything this time, check again sooner! */
reset_schedule_check_interval( qa->storage );
reset_schedule_check_interval( storage );
}
schedule_next_check( qa->storage );
schedule_next_check( storage );
g_hash_table_destroy( qa->map );
g_free( qa );
g_hash_table_destroy( task->u.query.map );
freeRelayTask(task);
return FALSE;
}
static void*
queryThread( void* arg )
static void
handleQuery( RelayTask* task )
{
QueryArgs* qa = (QueryArgs*)arg;
XP_ASSERT( !onMainThread(qa->storage) );
GList* ids = g_hash_table_get_keys( qa->map );
XP_ASSERT( !onMainThread(task->storage) );
if ( g_hash_table_size( task->u.query.map ) > 0 ) {
GList* ids = g_hash_table_get_keys( task->u.query.map );
json_object* jIds = json_object_new_array();
for ( GList* iter = ids; !!iter; iter = iter->next ) {
json_object* idstr = json_object_new_string( iter->data );
json_object_array_add(jIds, idstr);
}
g_list_free( ids );
CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
XP_ASSERT(res == CURLE_OK);
CURL* curl = curl_easy_init();
char url[128];
snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/query",
RELAY_API_PROTO, task->storage->host );
curl_easy_setopt(curl, CURLOPT_URL, url );
curl_easy_setopt(curl, CURLOPT_POST, 1L);
addJsonParams( curl, "ids", jIds );
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback );
curl_easy_setopt( curl, CURLOPT_WRITEDATA, &task->ws );
curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L );
res = curl_easy_perform( curl );
/* Check for errors */
if (res != CURLE_OK) {
XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
curl_global_cleanup();
XP_LOGF( "%s(): got <<%s>>", __func__, task->ws.ptr );
json_object* jIds = json_object_new_array();
for ( GList* iter = ids; !!iter; iter = iter->next ) {
json_object* idstr = json_object_new_string( iter->data );
json_object_array_add(jIds, idstr);
}
g_list_free( ids );
CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
XP_ASSERT(res == CURLE_OK);
CURL* curl = curl_easy_init();
char url[128];
snprintf( url, sizeof(url), "%s://%s/xw4/relay.py/query",
RELAY_API_PROTO, qa->storage->host );
curl_easy_setopt(curl, CURLOPT_URL, url );
curl_easy_setopt(curl, CURLOPT_POST, 1L);
addJsonParams( curl, "ids", jIds );
curl_easy_setopt( curl, CURLOPT_WRITEFUNCTION, write_callback );
curl_easy_setopt( curl, CURLOPT_WRITEDATA, &qa->ws );
curl_easy_setopt( curl, CURLOPT_VERBOSE, 1L );
res = curl_easy_perform( curl );
XP_LOGF( "%s(): curl_easy_perform() => %d", __func__, res );
/* Check for errors */
if (res != CURLE_OK) {
XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
curl_global_cleanup();
XP_LOGF( "%s(): got <<%s>>", __func__, qa->ws.ptr );
/* Put processing back on the main thread */
g_idle_add( onGotQueryData, qa );
return NULL;
} /* queryThread */
g_idle_add( onGotQueryData, task );
} /* handleQuery */
static gboolean
checkForMoves( gpointer user_data )
@ -695,23 +778,10 @@ checkForMoves( gpointer user_data )
RelayConStorage* storage = (RelayConStorage*)user_data;
XP_ASSERT( onMainThread(storage) );
QueryArgs* qa = (QueryArgs*)g_malloc0(sizeof(*qa));
qa->storage = storage;
RelayTask* task = makeRelayTask( storage, QUERY );
sqlite3* dbp = storage->params->pDb;
// qa->map = getRowsToRelayIDsMap( dbp );
qa->map = getRelayIDsToRowsMap( dbp );
// qa->ids = g_hash_table_get_values( qa->map );
/* for ( GList* iter = values; !!iter; iter = iter->next ) { */
/* gpointer data = iter->data; */
/* XP_LOGF( "checkForMoves: got id: %s", (char*)data ); */
/* qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); */
/* } */
/* g_list_free( values ); */
pthread_t thread;
(void)pthread_create( &thread, NULL, queryThread, (void*)qa );
pthread_detach( thread );
task->u.query.map = getRelayIDsToRowsMap( dbp );
addTask( storage, task );
schedule_next_check( storage );
return FALSE;
@ -754,8 +824,8 @@ sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len )
nSent = post( storage, msgbuf, len );
} else {
nSent = sendto( storage->socket, msgbuf, len, 0, /* flags */
(struct sockaddr*)&storage->saddr,
sizeof(storage->saddr) );
(struct sockaddr*)&storage->saddr,
sizeof(storage->saddr) );
#ifdef COMMS_CHECKSUM
gchar* sum = g_compute_checksum_for_data( G_CHECKSUM_MD5, msgbuf, len );
XP_LOGF( "%s: sent %d bytes with sum %s", __func__, len, sum );