use a single timer and a queue for received data

using g_add_idle() for each piece of data received on the (background)
curl-query thread wasn't working. They were getting starved, and I think
some were considered duplicates and never scheduled. So add a single
timer proc called every 50 ms and a queue that it checks and into which
the network thread can put stuff.
This commit is contained in:
Eric House 2017-10-28 16:13:11 -07:00
parent 7d4fb1cc5d
commit 3e9381d946
2 changed files with 80 additions and 6 deletions

View file

@ -50,7 +50,7 @@ linux_debugf( const char* format, ... )
gettimeofday( &tv, &tz ); gettimeofday( &tv, &tz );
timp = localtime( &tv.tv_sec ); timp = localtime( &tv.tv_sec );
size_t len = snprintf( buf, sizeof(buf), "<%d:%lu>%.2d:%.2d:%.2d:", getpid(), size_t len = snprintf( buf, sizeof(buf), "<%d:%lx>%.2d:%.2d:%.2d:", getpid(),
pthread_self(), timp->tm_hour, timp->tm_min, timp->tm_sec ); pthread_self(), timp->tm_hour, timp->tm_min, timp->tm_sec );
XP_ASSERT( len < sizeof(buf) ); XP_ASSERT( len < sizeof(buf) );

View file

@ -40,6 +40,9 @@ typedef struct _RelayConStorage {
pthread_mutex_t relayMutex; pthread_mutex_t relayMutex;
GSList* relayTaskList; GSList* relayTaskList;
pthread_mutex_t gotDataMutex;
GSList* gotDataTaskList;
int socket; int socket;
RelayConnProcs procs; RelayConnProcs procs;
void* procsClosure; void* procsClosure;
@ -48,6 +51,7 @@ typedef struct _RelayConStorage {
XWPDevProto proto; XWPDevProto proto;
LaunchParams* params; LaunchParams* params;
XP_UCHAR host[64]; XP_UCHAR host[64];
int nextTaskID;
} RelayConStorage; } RelayConStorage;
typedef struct _MsgHeader { typedef struct _MsgHeader {
@ -63,6 +67,7 @@ static gboolean relaycon_receive( GIOChannel *source, GIOCondition condition,
static void schedule_next_check( RelayConStorage* storage ); static void schedule_next_check( RelayConStorage* storage );
static void reset_schedule_check_interval( RelayConStorage* storage ); static void reset_schedule_check_interval( RelayConStorage* storage );
static void checkForMovesOnce( RelayConStorage* storage ); static void checkForMovesOnce( RelayConStorage* storage );
static gboolean gotDataTimer(gpointer user_data);
static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ); static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len );
static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str ); static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str );
@ -91,6 +96,7 @@ typedef enum { POST, QUERY, } TaskType;
typedef struct _RelayTask { typedef struct _RelayTask {
TaskType typ; TaskType typ;
int id;
RelayConStorage* storage; RelayConStorage* storage;
WriteState ws; WriteState ws;
union { union {
@ -108,7 +114,8 @@ static RelayTask* makeRelayTask(RelayConStorage* storage, TaskType typ);
static void freeRelayTask(RelayTask* task); static void freeRelayTask(RelayTask* task);
static void handlePost( RelayTask* task ); static void handlePost( RelayTask* task );
static void handleQuery( RelayTask* task ); static void handleQuery( RelayTask* task );
static void addToGotData( RelayTask* task );
static RelayTask* getFromGotData( RelayConStorage* storage );
static size_t static size_t
write_callback(void *contents, size_t size, size_t nmemb, void* data) write_callback(void *contents, size_t size, size_t nmemb, void* data)
@ -203,12 +210,15 @@ relaycon_init( LaunchParams* params, const RelayConnProcs* procs,
if ( params->useHTTP ) { if ( params->useHTTP ) {
storage->mainThread = pthread_self(); storage->mainThread = pthread_self();
pthread_mutex_init ( &storage->relayMutex, NULL ); pthread_mutex_init( &storage->relayMutex, NULL );
pthread_cond_init( &storage->relayCondVar, NULL ); pthread_cond_init( &storage->relayCondVar, NULL );
pthread_t thread; pthread_t thread;
(void)pthread_create( &thread, NULL, relayThread, storage ); (void)pthread_create( &thread, NULL, relayThread, storage );
pthread_detach( thread ); pthread_detach( thread );
pthread_mutex_init( &storage->gotDataMutex, NULL );
g_timeout_add( 50, gotDataTimer, storage );
XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) ); XP_ASSERT( XP_STRLEN(host) < VSIZE(storage->host) );
XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 ); XP_MEMCPY( storage->host, host, XP_STRLEN(host) + 1 );
} else { } else {
@ -421,15 +431,19 @@ addTask( RelayConStorage* storage, RelayTask* task )
static RelayTask* static RelayTask*
makeRelayTask( RelayConStorage* storage, TaskType typ ) makeRelayTask( RelayConStorage* storage, TaskType typ )
{ {
XP_ASSERT( onMainThread(storage) );
RelayTask* task = (RelayTask*)g_malloc0(sizeof(*task)); RelayTask* task = (RelayTask*)g_malloc0(sizeof(*task));
task->typ = typ; task->typ = typ;
task->id = ++storage->nextTaskID;
task->storage = storage; task->storage = storage;
XP_LOGF( "%s(): made with id %d from storage %p", __func__, task->id, storage );
return task; return task;
} }
static void static void
freeRelayTask( RelayTask* task ) freeRelayTask( RelayTask* task )
{ {
XP_LOGF( "%s(): deleting id %d", __func__, task->id );
g_free( task->ws.ptr ); g_free( task->ws.ptr );
g_free( task ); g_free( task );
} }
@ -657,7 +671,7 @@ onGotPostData(gpointer user_data)
static void static void
handlePost( RelayTask* task ) handlePost( RelayTask* task )
{ {
XP_LOGF( "%s(len=%d)", __func__, task->u.post.len ); XP_LOGF( "%s(task.post.len=%d)", __func__, task->u.post.len );
XP_ASSERT( !onMainThread(task->storage) ); XP_ASSERT( !onMainThread(task->storage) );
char* data = g_base64_encode( task->u.post.msgbuf, task->u.post.len ); char* data = g_base64_encode( task->u.post.msgbuf, task->u.post.len );
struct json_object* jobj = json_object_new_object(); struct json_object* jobj = json_object_new_object();
@ -668,7 +682,7 @@ handlePost( RelayTask* task )
runWitCurl( task, "post", "params", jobj ); runWitCurl( task, "post", "params", jobj );
// Put the data on the main thread for processing // Put the data on the main thread for processing
(void)g_idle_add( onGotPostData, task ); addToGotData( task );
} /* handlePost */ } /* handlePost */
static ssize_t static ssize_t
@ -750,7 +764,7 @@ handleQuery( RelayTask* task )
runWitCurl( task, "query", "ids", jIds ); runWitCurl( task, "query", "ids", jIds );
} }
/* Put processing back on the main thread */ /* Put processing back on the main thread */
g_idle_add( onGotQueryData, task ); addToGotData( task );
} /* handleQuery */ } /* handleQuery */
static void static void
@ -774,6 +788,66 @@ checkForMoves( gpointer user_data )
return FALSE; return FALSE;
} }
static gboolean
gotDataTimer(gpointer user_data)
{
RelayConStorage* storage = (RelayConStorage*)user_data;
assert( onMainThread(storage) );
for ( ; ; ) {
RelayTask* task = getFromGotData( storage );
if ( !task ) {
break;
} else {
switch ( task->typ ) {
case POST:
onGotPostData( task );
break;
case QUERY:
onGotQueryData( task );
break;
default:
XP_ASSERT(0);
}
}
}
return TRUE;
}
static void
addToGotData( RelayTask* task )
{
RelayConStorage* storage = task->storage;
pthread_mutex_lock( &storage->gotDataMutex );
storage->gotDataTaskList = g_slist_append( storage->gotDataTaskList, task );
XP_LOGF( "%s(): added id %d; len now %d", __func__, task->id,
g_slist_length(storage->gotDataTaskList) );
pthread_mutex_unlock( &storage->gotDataMutex );
}
static RelayTask*
getFromGotData( RelayConStorage* storage )
{
RelayTask* task = NULL;
XP_ASSERT( onMainThread(storage) );
pthread_mutex_lock( &storage->gotDataMutex );
int len = g_slist_length( storage->gotDataTaskList );
// XP_LOGF( "%s(): before: len: %d", __func__, len );
if ( len > 0 ) {
GSList* head = storage->gotDataTaskList;
storage->gotDataTaskList
= g_slist_remove_link( storage->gotDataTaskList,
storage->gotDataTaskList );
task = head->data;
g_slist_free( head );
XP_LOGF( "%s(): got id %d!", __func__, task->id );
}
// XP_LOGF( "%s(): len now %d", __func__, g_slist_length(storage->gotDataTaskList) );
pthread_mutex_unlock( &storage->gotDataMutex );
return task;
}
static void static void
reset_schedule_check_interval( RelayConStorage* storage ) reset_schedule_check_interval( RelayConStorage* storage )
{ {