From f49c81462c12afe10030736900dc087f42dcc332 Mon Sep 17 00:00:00 2001 From: Eric House Date: Sun, 22 Oct 2017 09:29:15 -0700 Subject: [PATCH] wip: received messages dispatched to games --- xwords4/linux/cursesmain.c | 28 ------ xwords4/linux/gamesdb.c | 15 +-- xwords4/linux/gamesdb.h | 3 +- xwords4/linux/gtkboard.c | 7 +- xwords4/linux/gtkmain.c | 12 +++ xwords4/linux/relaycon.c | 191 +++++++++++++++++-------------------- xwords4/linux/relaycon.h | 4 +- 7 files changed, 114 insertions(+), 146 deletions(-) diff --git a/xwords4/linux/cursesmain.c b/xwords4/linux/cursesmain.c index a308e7847..8c7922a99 100644 --- a/xwords4/linux/cursesmain.c +++ b/xwords4/linux/cursesmain.c @@ -1773,28 +1773,6 @@ cursesErrorMsgRcvd( void* closure, const XP_UCHAR* msg ) } } -static gboolean -queryTimerFired( gpointer data ) -{ - LOG_FUNC(); - CursesAppGlobals* globals = (CursesAppGlobals*)data; - - XWGame* game = &globals->cGlobals.game; - if ( !!game->comms ) { - XP_ASSERT( globals->nextQueryTimeSecs > 0 ); - if ( checkForMsgs( globals->cGlobals.params, game ) ) { - globals->nextQueryTimeSecs = 1; - } else { - globals->nextQueryTimeSecs *= 2; - } - - (void)g_timeout_add_seconds( globals->nextQueryTimeSecs, - queryTimerFired, &g_globals ); - } - - return FALSE; -} - static gboolean chatsTimerFired( gpointer data ) { @@ -1973,12 +1951,6 @@ cursesmain( XP_Bool isServer, LaunchParams* params ) &g_globals ); } - if ( params->useHTTP ) { - g_globals.nextQueryTimeSecs = 1; - (void)g_timeout_add_seconds( g_globals.nextQueryTimeSecs, - queryTimerFired, &g_globals ); - } - XP_Bool opened = XP_FALSE; initCurses( &g_globals, &width, &height ); diff --git a/xwords4/linux/gamesdb.c b/xwords4/linux/gamesdb.c index 33f27c2db..7d8f1e6d9 100644 --- a/xwords4/linux/gamesdb.c +++ b/xwords4/linux/gamesdb.c @@ -312,11 +312,11 @@ listGames( sqlite3* pDb ) } GHashTable* -getRowsToRelayIDsMap( sqlite3* pDb ) +getRelayIDsToRowsMap( sqlite3* pDb ) { - GHashTable* table = g_hash_table_new( g_int64_hash, g_int64_equal ); + GHashTable* table = g_hash_table_new( g_str_hash, g_str_equal ); sqlite3_stmt *ppStmt; - int result = sqlite3_prepare_v2( pDb, "SELECT rowid, relayid FROM games", + int result = sqlite3_prepare_v2( pDb, "SELECT relayid, rowid FROM games", -1, &ppStmt, NULL ); assertPrintResult( pDb, result, SQLITE_OK ); XP_USE( result ); @@ -324,12 +324,13 @@ getRowsToRelayIDsMap( sqlite3* pDb ) switch( sqlite3_step( ppStmt ) ) { case SQLITE_ROW: /* have data */ { - sqlite3_int64* key = g_malloc( sizeof( *key ) ); - *key = sqlite3_column_int64( ppStmt, 0 ); XP_UCHAR relayID[32]; - getColumnText( ppStmt, 1, relayID, VSIZE(relayID) ); - gpointer value = g_strdup( relayID ); + getColumnText( ppStmt, 0, relayID, VSIZE(relayID) ); + gpointer key = g_strdup( relayID ); + sqlite3_int64* value = g_malloc( sizeof( *key ) ); + *value = sqlite3_column_int64( ppStmt, 1 ); g_hash_table_insert( table, key, value ); + /* XP_LOGF( "%s(): added map %s => %lld", __func__, (char*)key, *value ); */ } break; case SQLITE_DONE: diff --git a/xwords4/linux/gamesdb.h b/xwords4/linux/gamesdb.h index 214f324f9..086301570 100644 --- a/xwords4/linux/gamesdb.h +++ b/xwords4/linux/gamesdb.h @@ -56,7 +56,8 @@ void summarize( CommonGlobals* cGlobals ); /* Return GSList whose data is (ptrs to) rowids */ GSList* listGames( sqlite3* dbp ); -GHashTable *getRowsToRelayIDsMap(sqlite3* dbp); +/* Mapping of relayID -> rowid */ +GHashTable* getRelayIDsToRowsMap( sqlite3* pDb ); XP_Bool getGameInfo( sqlite3* dbp, sqlite3_int64 rowid, GameInfo* gib ); void getRowsForGameID( sqlite3* dbp, XP_U32 gameID, sqlite3_int64* rowids, diff --git a/xwords4/linux/gtkboard.c b/xwords4/linux/gtkboard.c index 505724679..eb6c9b573 100644 --- a/xwords4/linux/gtkboard.c +++ b/xwords4/linux/gtkboard.c @@ -341,6 +341,8 @@ relay_connd_gtk( void* closure, XP_UCHAR* const room, char buf[256]; if ( allHere ) { + /* disable for now. Seeing this too often */ + skip = XP_TRUE; snprintf( buf, sizeof(buf), "All expected players have joined in %s. Play!", room ); } else { @@ -1179,10 +1181,7 @@ handle_memstats( GtkWidget* XP_UNUSED(widget), GtkGameGlobals* globals ) static void handle_movescheck( GtkWidget* XP_UNUSED(widget), GtkGameGlobals* globals ) { - LaunchParams* params = globals->cGlobals.params; - if ( checkForMsgs( params, &globals->cGlobals.game ) ) { - board_draw( globals->cGlobals.game.board ); - } + checkForMsgsNow( globals->cGlobals.params ); } #endif diff --git a/xwords4/linux/gtkmain.c b/xwords4/linux/gtkmain.c index 913a26007..ad3119513 100644 --- a/xwords4/linux/gtkmain.c +++ b/xwords4/linux/gtkmain.c @@ -696,6 +696,17 @@ gtkGotBuf( void* closure, const CommsAddrRec* from, XP_USE( seed ); } +static void +gtkGotMsgForRow( void* closure, const CommsAddrRec* from, + sqlite3_int64 rowid, const XP_U8* buf, XP_U16 len ) +{ + XP_LOGF( "%s(): got msg of len %d for row %lld", __func__, len, rowid ); + GtkAppGlobals* apg = (GtkAppGlobals*)closure; + // LaunchParams* params = apg->params; + (void)feedBufferGTK( apg, rowid, buf, len, from ); + LOG_RETURN_VOID(); +} + static gint requestMsgs( gpointer data ) { @@ -850,6 +861,7 @@ gtkmain( LaunchParams* params ) if ( params->useUdp ) { RelayConnProcs procs = { .msgReceived = gtkGotBuf, + .msgForRow = gtkGotMsgForRow, .msgNoticeReceived = gtkNoticeRcvd, .devIDReceived = gtkDevIDReceived, .msgErrorMsg = gtkErrorMsgRcvd, diff --git a/xwords4/linux/relaycon.c b/xwords4/linux/relaycon.c index 3c514cd59..caf1777e8 100644 --- a/xwords4/linux/relaycon.c +++ b/xwords4/linux/relaycon.c @@ -51,10 +51,12 @@ typedef struct _MsgHeader { } MsgHeader; static RelayConStorage* getStorage( LaunchParams* params ); +static XP_Bool onMainThread( RelayConStorage* storage ); static XP_U32 hostNameToIP( const XP_UCHAR* name ); static gboolean relaycon_receive( GIOChannel *source, GIOCondition condition, gpointer data ); -static void scheule_next_check( RelayConStorage* storage ); +static void schedule_next_check( RelayConStorage* storage ); +static void reset_schedule_check_interval( RelayConStorage* storage ); static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ); static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str ); static void getNetString( const XP_U8** ptr, XP_U16 len, XP_UCHAR* buf ); @@ -96,7 +98,8 @@ static void addJsonParams( CURL* curl, const char* name, json_object* param ) { const char* asStr = json_object_to_json_string( param ); - + XP_LOGF( "%s: adding param: %s", __func__, asStr ); + char* curl_params = curl_easy_escape( curl, asStr, strlen(asStr) ); gchar* buf = g_strdup_printf( "%s=%s", name, curl_params ); XP_LOGF( "%s: added param: %s", __func__, buf ); @@ -110,93 +113,12 @@ addJsonParams( CURL* curl, const char* name, json_object* param ) json_object_put( param ); } -XP_Bool -checkForMsgs( LaunchParams* params, XWGame* game ) +void +checkForMsgsNow( LaunchParams* params ) { - XP_Bool foundAny = false; - XP_UCHAR idBuf[64]; - if ( !!game->comms ) { - XP_U16 len = VSIZE(idBuf); - if ( comms_getRelayID( game->comms, idBuf, &len ) ) { - XP_LOGF( "%s: got %s", __func__, idBuf ); - } else { - idBuf[0] = '\0'; - } - } - - if ( !!idBuf[0] ) { - ReadState rs = { - .ptr = g_malloc0(1), - .curSize = 1L - }; - - /* build a json array of relayIDs, then stringify it */ - json_object* ids = json_object_new_array(); - json_object* idstr = json_object_new_string(idBuf); - json_object_array_add(ids, idstr); - - CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); - XP_ASSERT(res == CURLE_OK); - CURL* curl = curl_easy_init(); - - curl_easy_setopt(curl, CURLOPT_URL, - "http://localhost/xw4/relay.py/query"); - curl_easy_setopt(curl, CURLOPT_POST, 1L); - - addJsonParams( curl, "ids", ids ); - - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback ); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rs ); - curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); - - res = curl_easy_perform(curl); - - XP_LOGF( "%s(): curl_easy_perform() => %d", __func__, res ); - /* Check for errors */ - if (res != CURLE_OK) { - XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res)); - } - /* always cleanup */ - curl_easy_cleanup(curl); - curl_global_cleanup(); - - XP_LOGF( "%s(): got <<%s>>", __func__, rs.ptr ); - - if (res == CURLE_OK) { - json_object* reply = json_tokener_parse( rs.ptr ); - if ( !!reply ) { - json_object_object_foreach(reply, key, val) { - int len = json_object_array_length(val); - XP_LOGF( "%s: got key: %s of len %d", __func__, key, len ); - for ( int ii = 0; ii < len; ++ii ) { - json_object* forGame = json_object_array_get_idx(val, ii); - int len2 = json_object_array_length(forGame); - foundAny = foundAny || len2 > 0; - for ( int jj = 0; jj < len2; ++jj ) { - json_object* oneMove = json_object_array_get_idx(forGame, jj); - const char* asStr = json_object_get_string(oneMove); - gsize out_len; - guchar* buf = g_base64_decode( asStr, &out_len ); - XWStreamCtxt* stream = mem_stream_make( MPPARM(params->mpool) - params->vtMgr, params, - CHANNEL_NONE, NULL ); - stream_putBytes( stream, buf, out_len ); - g_free(buf); - - CommsAddrRec addr = {0}; - addr_addType( &addr, COMMS_CONN_RELAY ); - XP_Bool handled = game_receiveMessage( game, stream, &addr ); - XP_LOGF( "%s(): game_receiveMessage() => %d", __func__, handled ); - stream_destroy( stream ); - - foundAny = XP_TRUE; - } - } - } - } - } - } - return foundAny; + RelayConStorage* storage = getStorage( params ); + XP_ASSERT( onMainThread(storage) ); + XP_ASSERT(0); /* FIX ME */ } void @@ -223,7 +145,7 @@ relaycon_init( LaunchParams* params, const RelayConnProcs* procs, storage->proto = XWPDEV_PROTO_VERSION_1; if ( params->useHTTP ) { - scheule_next_check( storage ); + schedule_next_check( storage ); } } @@ -566,7 +488,7 @@ typedef struct _PostArgs { } PostArgs; static gboolean -onGotData(gpointer user_data) +onGotPostData(gpointer user_data) { PostArgs* pa = (PostArgs*)user_data; /* Now pull any data from the reply */ @@ -633,7 +555,7 @@ postThread( void* arg ) XP_LOGF( "%s(): got \"%s\"", __func__, pa->rs.ptr ); // Put the data on the main thread for processing - (void)g_idle_add( onGotData, pa ); + (void)g_idle_add( onGotPostData, pa ); return NULL; } @@ -654,23 +576,72 @@ post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ) typedef struct _QueryArgs { RelayConStorage* storage; - GSList* ids; + /* GSList* ids; */ ReadState rs; + GHashTable* map; } QueryArgs; +static gboolean +onGotQueryData( gpointer user_data ) +{ + QueryArgs* qa = (QueryArgs*)user_data; + XP_Bool foundAny = false; + json_object* reply = json_tokener_parse( qa->rs.ptr ); + if ( !!reply ) { + CommsAddrRec addr = {0}; + addr_addType( &addr, COMMS_CONN_RELAY ); + + /* Currently there's an array of arrays for each relayID (value) */ + json_object_object_foreach(reply, relayID, arrOfArrOfMoves) { + int len1 = json_object_array_length( arrOfArrOfMoves ); + XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 ); + if ( len1 > 0 ) { + sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( qa->map, relayID ); + for ( int ii = 0; ii < len1; ++ii ) { + json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii ); + int len2 = json_object_array_length( forGameArray ); + for ( int jj = 0; jj < len2; ++jj ) { + json_object* oneMove = json_object_array_get_idx( forGameArray, jj ); + const char* asStr = json_object_get_string( oneMove ); + gsize out_len; + guchar* buf = g_base64_decode( asStr, &out_len ); + (*qa->storage->procs.msgForRow)( qa->storage->procsClosure, &addr, + rowid, buf, out_len ); + g_free(buf); + foundAny = XP_TRUE; + } + } + } + } + json_object_put( reply ); + } + + if ( foundAny ) { + /* Reschedule. If we got anything this time, check again sooner! */ + reset_schedule_check_interval( qa->storage ); + } + schedule_next_check( qa->storage ); + + g_hash_table_destroy( qa->map ); + g_free( qa ); + + return FALSE; +} + static void* queryThread( void* arg ) { QueryArgs* qa = (QueryArgs*)arg; - GSList* ids = qa->ids; + GList* ids = g_hash_table_get_keys( qa->map ); qa->rs.ptr = g_malloc0(1); qa->rs.curSize = 1L; json_object* jIds = json_object_new_array(); - for ( GSList* iter = ids; !!iter; iter = iter->next ) { + for ( GList* iter = ids; !!iter; iter = iter->next ) { json_object* idstr = json_object_new_string( iter->data ); json_object_array_add(jIds, idstr); } + g_list_free( ids ); CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); XP_ASSERT(res == CURLE_OK); @@ -699,6 +670,9 @@ queryThread( void* arg ) XP_LOGF( "%s(): got <<%s>>", __func__, qa->rs.ptr ); + /* Put processing back on the main thread */ + g_idle_add( onGotQueryData, qa ); + return NULL; } @@ -713,26 +687,33 @@ checkForMoves( gpointer user_data ) qa->storage = storage; sqlite3* dbp = storage->params->pDb; - GHashTable* map = getRowsToRelayIDsMap( dbp ); - GList* values = g_hash_table_get_values( map ); - for ( GList* iter = values; !!iter; iter = iter->next ) { - gpointer data = iter->data; - XP_LOGF( "checkForMoves: got id: %s", (char*)data ); - qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); - } - g_list_free( values ); - g_hash_table_destroy( map ); + // qa->map = getRowsToRelayIDsMap( dbp ); + qa->map = getRelayIDsToRowsMap( dbp ); + // qa->ids = g_hash_table_get_values( qa->map ); + /* for ( GList* iter = values; !!iter; iter = iter->next ) { */ + /* gpointer data = iter->data; */ + /* XP_LOGF( "checkForMoves: got id: %s", (char*)data ); */ + /* qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); */ + /* } */ + /* g_list_free( values ); */ pthread_t thread; (void)pthread_create( &thread, NULL, queryThread, (void*)qa ); pthread_detach( thread ); - scheule_next_check( storage ); + schedule_next_check( storage ); return FALSE; } static void -scheule_next_check( RelayConStorage* storage ) +reset_schedule_check_interval( RelayConStorage* storage ) +{ + XP_ASSERT( onMainThread(storage) ); + storage->nextMoveCheckSecs = 0; +} + +static void +schedule_next_check( RelayConStorage* storage ) { XP_ASSERT( onMainThread(storage) ); diff --git a/xwords4/linux/relaycon.h b/xwords4/linux/relaycon.h index c418b6a5a..fc57e1a79 100644 --- a/xwords4/linux/relaycon.h +++ b/xwords4/linux/relaycon.h @@ -27,6 +27,8 @@ typedef struct _Procs { void (*msgReceived)( void* closure, const CommsAddrRec* from, const XP_U8* buf, XP_U16 len ); + void (*msgForRow)( void* closure, const CommsAddrRec* from, + sqlite3_int64 rowid, const XP_U8* buf, XP_U16 len ); void (*msgNoticeReceived)( void* closure ); void (*devIDReceived)( void* closure, const XP_UCHAR* devID, XP_U16 maxInterval ); @@ -57,5 +59,5 @@ void relaycon_cleanup( LaunchParams* params ); XP_U32 makeClientToken( sqlite3_int64 rowid, XP_U16 seed ); void rowidFromToken( XP_U32 clientToken, sqlite3_int64* rowid, XP_U16* seed ); -XP_Bool checkForMsgs(LaunchParams* params, XWGame* game); +void checkForMsgsNow( LaunchParams* params ); #endif