wip: received messages dispatched to games

This commit is contained in:
Eric House 2017-10-22 09:29:15 -07:00
parent 43ffb156fc
commit f49c81462c
7 changed files with 114 additions and 146 deletions

View file

@ -1773,28 +1773,6 @@ cursesErrorMsgRcvd( void* closure, const XP_UCHAR* msg )
} }
} }
static gboolean
queryTimerFired( gpointer data )
{
LOG_FUNC();
CursesAppGlobals* globals = (CursesAppGlobals*)data;
XWGame* game = &globals->cGlobals.game;
if ( !!game->comms ) {
XP_ASSERT( globals->nextQueryTimeSecs > 0 );
if ( checkForMsgs( globals->cGlobals.params, game ) ) {
globals->nextQueryTimeSecs = 1;
} else {
globals->nextQueryTimeSecs *= 2;
}
(void)g_timeout_add_seconds( globals->nextQueryTimeSecs,
queryTimerFired, &g_globals );
}
return FALSE;
}
static gboolean static gboolean
chatsTimerFired( gpointer data ) chatsTimerFired( gpointer data )
{ {
@ -1973,12 +1951,6 @@ cursesmain( XP_Bool isServer, LaunchParams* params )
&g_globals ); &g_globals );
} }
if ( params->useHTTP ) {
g_globals.nextQueryTimeSecs = 1;
(void)g_timeout_add_seconds( g_globals.nextQueryTimeSecs,
queryTimerFired, &g_globals );
}
XP_Bool opened = XP_FALSE; XP_Bool opened = XP_FALSE;
initCurses( &g_globals, &width, &height ); initCurses( &g_globals, &width, &height );

View file

@ -312,11 +312,11 @@ listGames( sqlite3* pDb )
} }
GHashTable* GHashTable*
getRowsToRelayIDsMap( sqlite3* pDb ) getRelayIDsToRowsMap( sqlite3* pDb )
{ {
GHashTable* table = g_hash_table_new( g_int64_hash, g_int64_equal ); GHashTable* table = g_hash_table_new( g_str_hash, g_str_equal );
sqlite3_stmt *ppStmt; sqlite3_stmt *ppStmt;
int result = sqlite3_prepare_v2( pDb, "SELECT rowid, relayid FROM games", int result = sqlite3_prepare_v2( pDb, "SELECT relayid, rowid FROM games",
-1, &ppStmt, NULL ); -1, &ppStmt, NULL );
assertPrintResult( pDb, result, SQLITE_OK ); assertPrintResult( pDb, result, SQLITE_OK );
XP_USE( result ); XP_USE( result );
@ -324,12 +324,13 @@ getRowsToRelayIDsMap( sqlite3* pDb )
switch( sqlite3_step( ppStmt ) ) { switch( sqlite3_step( ppStmt ) ) {
case SQLITE_ROW: /* have data */ case SQLITE_ROW: /* have data */
{ {
sqlite3_int64* key = g_malloc( sizeof( *key ) );
*key = sqlite3_column_int64( ppStmt, 0 );
XP_UCHAR relayID[32]; XP_UCHAR relayID[32];
getColumnText( ppStmt, 1, relayID, VSIZE(relayID) ); getColumnText( ppStmt, 0, relayID, VSIZE(relayID) );
gpointer value = g_strdup( relayID ); gpointer key = g_strdup( relayID );
sqlite3_int64* value = g_malloc( sizeof( *key ) );
*value = sqlite3_column_int64( ppStmt, 1 );
g_hash_table_insert( table, key, value ); g_hash_table_insert( table, key, value );
/* XP_LOGF( "%s(): added map %s => %lld", __func__, (char*)key, *value ); */
} }
break; break;
case SQLITE_DONE: case SQLITE_DONE:

View file

@ -56,7 +56,8 @@ void summarize( CommonGlobals* cGlobals );
/* Return GSList whose data is (ptrs to) rowids */ /* Return GSList whose data is (ptrs to) rowids */
GSList* listGames( sqlite3* dbp ); GSList* listGames( sqlite3* dbp );
GHashTable *getRowsToRelayIDsMap(sqlite3* dbp); /* Mapping of relayID -> rowid */
GHashTable* getRelayIDsToRowsMap( sqlite3* pDb );
XP_Bool getGameInfo( sqlite3* dbp, sqlite3_int64 rowid, GameInfo* gib ); XP_Bool getGameInfo( sqlite3* dbp, sqlite3_int64 rowid, GameInfo* gib );
void getRowsForGameID( sqlite3* dbp, XP_U32 gameID, sqlite3_int64* rowids, void getRowsForGameID( sqlite3* dbp, XP_U32 gameID, sqlite3_int64* rowids,

View file

@ -341,6 +341,8 @@ relay_connd_gtk( void* closure, XP_UCHAR* const room,
char buf[256]; char buf[256];
if ( allHere ) { if ( allHere ) {
/* disable for now. Seeing this too often */
skip = XP_TRUE;
snprintf( buf, sizeof(buf), snprintf( buf, sizeof(buf),
"All expected players have joined in %s. Play!", room ); "All expected players have joined in %s. Play!", room );
} else { } else {
@ -1179,10 +1181,7 @@ handle_memstats( GtkWidget* XP_UNUSED(widget), GtkGameGlobals* globals )
static void static void
handle_movescheck( GtkWidget* XP_UNUSED(widget), GtkGameGlobals* globals ) handle_movescheck( GtkWidget* XP_UNUSED(widget), GtkGameGlobals* globals )
{ {
LaunchParams* params = globals->cGlobals.params; checkForMsgsNow( globals->cGlobals.params );
if ( checkForMsgs( params, &globals->cGlobals.game ) ) {
board_draw( globals->cGlobals.game.board );
}
} }
#endif #endif

View file

@ -696,6 +696,17 @@ gtkGotBuf( void* closure, const CommsAddrRec* from,
XP_USE( seed ); XP_USE( seed );
} }
static void
gtkGotMsgForRow( void* closure, const CommsAddrRec* from,
sqlite3_int64 rowid, const XP_U8* buf, XP_U16 len )
{
XP_LOGF( "%s(): got msg of len %d for row %lld", __func__, len, rowid );
GtkAppGlobals* apg = (GtkAppGlobals*)closure;
// LaunchParams* params = apg->params;
(void)feedBufferGTK( apg, rowid, buf, len, from );
LOG_RETURN_VOID();
}
static gint static gint
requestMsgs( gpointer data ) requestMsgs( gpointer data )
{ {
@ -850,6 +861,7 @@ gtkmain( LaunchParams* params )
if ( params->useUdp ) { if ( params->useUdp ) {
RelayConnProcs procs = { RelayConnProcs procs = {
.msgReceived = gtkGotBuf, .msgReceived = gtkGotBuf,
.msgForRow = gtkGotMsgForRow,
.msgNoticeReceived = gtkNoticeRcvd, .msgNoticeReceived = gtkNoticeRcvd,
.devIDReceived = gtkDevIDReceived, .devIDReceived = gtkDevIDReceived,
.msgErrorMsg = gtkErrorMsgRcvd, .msgErrorMsg = gtkErrorMsgRcvd,

View file

@ -51,10 +51,12 @@ typedef struct _MsgHeader {
} MsgHeader; } MsgHeader;
static RelayConStorage* getStorage( LaunchParams* params ); static RelayConStorage* getStorage( LaunchParams* params );
static XP_Bool onMainThread( RelayConStorage* storage );
static XP_U32 hostNameToIP( const XP_UCHAR* name ); static XP_U32 hostNameToIP( const XP_UCHAR* name );
static gboolean relaycon_receive( GIOChannel *source, GIOCondition condition, static gboolean relaycon_receive( GIOChannel *source, GIOCondition condition,
gpointer data ); gpointer data );
static void scheule_next_check( RelayConStorage* storage ); static void schedule_next_check( RelayConStorage* storage );
static void reset_schedule_check_interval( RelayConStorage* storage );
static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len ); static ssize_t sendIt( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len );
static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str ); static size_t addVLIStr( XP_U8* buf, size_t len, const XP_UCHAR* str );
static void getNetString( const XP_U8** ptr, XP_U16 len, XP_UCHAR* buf ); static void getNetString( const XP_U8** ptr, XP_U16 len, XP_UCHAR* buf );
@ -96,6 +98,7 @@ static void
addJsonParams( CURL* curl, const char* name, json_object* param ) addJsonParams( CURL* curl, const char* name, json_object* param )
{ {
const char* asStr = json_object_to_json_string( param ); const char* asStr = json_object_to_json_string( param );
XP_LOGF( "%s: adding param: %s", __func__, asStr );
char* curl_params = curl_easy_escape( curl, asStr, strlen(asStr) ); char* curl_params = curl_easy_escape( curl, asStr, strlen(asStr) );
gchar* buf = g_strdup_printf( "%s=%s", name, curl_params ); gchar* buf = g_strdup_printf( "%s=%s", name, curl_params );
@ -110,93 +113,12 @@ addJsonParams( CURL* curl, const char* name, json_object* param )
json_object_put( param ); json_object_put( param );
} }
XP_Bool void
checkForMsgs( LaunchParams* params, XWGame* game ) checkForMsgsNow( LaunchParams* params )
{ {
XP_Bool foundAny = false; RelayConStorage* storage = getStorage( params );
XP_UCHAR idBuf[64]; XP_ASSERT( onMainThread(storage) );
if ( !!game->comms ) { XP_ASSERT(0); /* FIX ME */
XP_U16 len = VSIZE(idBuf);
if ( comms_getRelayID( game->comms, idBuf, &len ) ) {
XP_LOGF( "%s: got %s", __func__, idBuf );
} else {
idBuf[0] = '\0';
}
}
if ( !!idBuf[0] ) {
ReadState rs = {
.ptr = g_malloc0(1),
.curSize = 1L
};
/* build a json array of relayIDs, then stringify it */
json_object* ids = json_object_new_array();
json_object* idstr = json_object_new_string(idBuf);
json_object_array_add(ids, idstr);
CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
XP_ASSERT(res == CURLE_OK);
CURL* curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL,
"http://localhost/xw4/relay.py/query");
curl_easy_setopt(curl, CURLOPT_POST, 1L);
addJsonParams( curl, "ids", ids );
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback );
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &rs );
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
res = curl_easy_perform(curl);
XP_LOGF( "%s(): curl_easy_perform() => %d", __func__, res );
/* Check for errors */
if (res != CURLE_OK) {
XP_LOGF( "curl_easy_perform() failed: %s", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
curl_global_cleanup();
XP_LOGF( "%s(): got <<%s>>", __func__, rs.ptr );
if (res == CURLE_OK) {
json_object* reply = json_tokener_parse( rs.ptr );
if ( !!reply ) {
json_object_object_foreach(reply, key, val) {
int len = json_object_array_length(val);
XP_LOGF( "%s: got key: %s of len %d", __func__, key, len );
for ( int ii = 0; ii < len; ++ii ) {
json_object* forGame = json_object_array_get_idx(val, ii);
int len2 = json_object_array_length(forGame);
foundAny = foundAny || len2 > 0;
for ( int jj = 0; jj < len2; ++jj ) {
json_object* oneMove = json_object_array_get_idx(forGame, jj);
const char* asStr = json_object_get_string(oneMove);
gsize out_len;
guchar* buf = g_base64_decode( asStr, &out_len );
XWStreamCtxt* stream = mem_stream_make( MPPARM(params->mpool)
params->vtMgr, params,
CHANNEL_NONE, NULL );
stream_putBytes( stream, buf, out_len );
g_free(buf);
CommsAddrRec addr = {0};
addr_addType( &addr, COMMS_CONN_RELAY );
XP_Bool handled = game_receiveMessage( game, stream, &addr );
XP_LOGF( "%s(): game_receiveMessage() => %d", __func__, handled );
stream_destroy( stream );
foundAny = XP_TRUE;
}
}
}
}
}
}
return foundAny;
} }
void void
@ -223,7 +145,7 @@ relaycon_init( LaunchParams* params, const RelayConnProcs* procs,
storage->proto = XWPDEV_PROTO_VERSION_1; storage->proto = XWPDEV_PROTO_VERSION_1;
if ( params->useHTTP ) { if ( params->useHTTP ) {
scheule_next_check( storage ); schedule_next_check( storage );
} }
} }
@ -566,7 +488,7 @@ typedef struct _PostArgs {
} PostArgs; } PostArgs;
static gboolean static gboolean
onGotData(gpointer user_data) onGotPostData(gpointer user_data)
{ {
PostArgs* pa = (PostArgs*)user_data; PostArgs* pa = (PostArgs*)user_data;
/* Now pull any data from the reply */ /* Now pull any data from the reply */
@ -633,7 +555,7 @@ postThread( void* arg )
XP_LOGF( "%s(): got \"%s\"", __func__, pa->rs.ptr ); XP_LOGF( "%s(): got \"%s\"", __func__, pa->rs.ptr );
// Put the data on the main thread for processing // Put the data on the main thread for processing
(void)g_idle_add( onGotData, pa ); (void)g_idle_add( onGotPostData, pa );
return NULL; return NULL;
} }
@ -654,23 +576,72 @@ post( RelayConStorage* storage, const XP_U8* msgbuf, XP_U16 len )
typedef struct _QueryArgs { typedef struct _QueryArgs {
RelayConStorage* storage; RelayConStorage* storage;
GSList* ids; /* GSList* ids; */
ReadState rs; ReadState rs;
GHashTable* map;
} QueryArgs; } QueryArgs;
static gboolean
onGotQueryData( gpointer user_data )
{
QueryArgs* qa = (QueryArgs*)user_data;
XP_Bool foundAny = false;
json_object* reply = json_tokener_parse( qa->rs.ptr );
if ( !!reply ) {
CommsAddrRec addr = {0};
addr_addType( &addr, COMMS_CONN_RELAY );
/* Currently there's an array of arrays for each relayID (value) */
json_object_object_foreach(reply, relayID, arrOfArrOfMoves) {
int len1 = json_object_array_length( arrOfArrOfMoves );
XP_LOGF( "%s: got key: %s of len %d", __func__, relayID, len1 );
if ( len1 > 0 ) {
sqlite3_int64 rowid = *(sqlite3_int64*)g_hash_table_lookup( qa->map, relayID );
for ( int ii = 0; ii < len1; ++ii ) {
json_object* forGameArray = json_object_array_get_idx( arrOfArrOfMoves, ii );
int len2 = json_object_array_length( forGameArray );
for ( int jj = 0; jj < len2; ++jj ) {
json_object* oneMove = json_object_array_get_idx( forGameArray, jj );
const char* asStr = json_object_get_string( oneMove );
gsize out_len;
guchar* buf = g_base64_decode( asStr, &out_len );
(*qa->storage->procs.msgForRow)( qa->storage->procsClosure, &addr,
rowid, buf, out_len );
g_free(buf);
foundAny = XP_TRUE;
}
}
}
}
json_object_put( reply );
}
if ( foundAny ) {
/* Reschedule. If we got anything this time, check again sooner! */
reset_schedule_check_interval( qa->storage );
}
schedule_next_check( qa->storage );
g_hash_table_destroy( qa->map );
g_free( qa );
return FALSE;
}
static void* static void*
queryThread( void* arg ) queryThread( void* arg )
{ {
QueryArgs* qa = (QueryArgs*)arg; QueryArgs* qa = (QueryArgs*)arg;
GSList* ids = qa->ids; GList* ids = g_hash_table_get_keys( qa->map );
qa->rs.ptr = g_malloc0(1); qa->rs.ptr = g_malloc0(1);
qa->rs.curSize = 1L; qa->rs.curSize = 1L;
json_object* jIds = json_object_new_array(); json_object* jIds = json_object_new_array();
for ( GSList* iter = ids; !!iter; iter = iter->next ) { for ( GList* iter = ids; !!iter; iter = iter->next ) {
json_object* idstr = json_object_new_string( iter->data ); json_object* idstr = json_object_new_string( iter->data );
json_object_array_add(jIds, idstr); json_object_array_add(jIds, idstr);
} }
g_list_free( ids );
CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT); CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
XP_ASSERT(res == CURLE_OK); XP_ASSERT(res == CURLE_OK);
@ -699,6 +670,9 @@ queryThread( void* arg )
XP_LOGF( "%s(): got <<%s>>", __func__, qa->rs.ptr ); XP_LOGF( "%s(): got <<%s>>", __func__, qa->rs.ptr );
/* Put processing back on the main thread */
g_idle_add( onGotQueryData, qa );
return NULL; return NULL;
} }
@ -713,26 +687,33 @@ checkForMoves( gpointer user_data )
qa->storage = storage; qa->storage = storage;
sqlite3* dbp = storage->params->pDb; sqlite3* dbp = storage->params->pDb;
GHashTable* map = getRowsToRelayIDsMap( dbp ); // qa->map = getRowsToRelayIDsMap( dbp );
GList* values = g_hash_table_get_values( map ); qa->map = getRelayIDsToRowsMap( dbp );
for ( GList* iter = values; !!iter; iter = iter->next ) { // qa->ids = g_hash_table_get_values( qa->map );
gpointer data = iter->data; /* for ( GList* iter = values; !!iter; iter = iter->next ) { */
XP_LOGF( "checkForMoves: got id: %s", (char*)data ); /* gpointer data = iter->data; */
qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); /* XP_LOGF( "checkForMoves: got id: %s", (char*)data ); */
} /* qa->ids = g_slist_prepend( qa->ids, g_strdup(data) ); */
g_list_free( values ); /* } */
g_hash_table_destroy( map ); /* g_list_free( values ); */
pthread_t thread; pthread_t thread;
(void)pthread_create( &thread, NULL, queryThread, (void*)qa ); (void)pthread_create( &thread, NULL, queryThread, (void*)qa );
pthread_detach( thread ); pthread_detach( thread );
scheule_next_check( storage ); schedule_next_check( storage );
return FALSE; return FALSE;
} }
static void static void
scheule_next_check( RelayConStorage* storage ) reset_schedule_check_interval( RelayConStorage* storage )
{
XP_ASSERT( onMainThread(storage) );
storage->nextMoveCheckSecs = 0;
}
static void
schedule_next_check( RelayConStorage* storage )
{ {
XP_ASSERT( onMainThread(storage) ); XP_ASSERT( onMainThread(storage) );

View file

@ -27,6 +27,8 @@
typedef struct _Procs { typedef struct _Procs {
void (*msgReceived)( void* closure, const CommsAddrRec* from, void (*msgReceived)( void* closure, const CommsAddrRec* from,
const XP_U8* buf, XP_U16 len ); const XP_U8* buf, XP_U16 len );
void (*msgForRow)( void* closure, const CommsAddrRec* from,
sqlite3_int64 rowid, const XP_U8* buf, XP_U16 len );
void (*msgNoticeReceived)( void* closure ); void (*msgNoticeReceived)( void* closure );
void (*devIDReceived)( void* closure, const XP_UCHAR* devID, void (*devIDReceived)( void* closure, const XP_UCHAR* devID,
XP_U16 maxInterval ); XP_U16 maxInterval );
@ -57,5 +59,5 @@ void relaycon_cleanup( LaunchParams* params );
XP_U32 makeClientToken( sqlite3_int64 rowid, XP_U16 seed ); XP_U32 makeClientToken( sqlite3_int64 rowid, XP_U16 seed );
void rowidFromToken( XP_U32 clientToken, sqlite3_int64* rowid, XP_U16* seed ); void rowidFromToken( XP_U32 clientToken, sqlite3_int64* rowid, XP_U16* seed );
XP_Bool checkForMsgs(LaunchParams* params, XWGame* game); void checkForMsgsNow( LaunchParams* params );
#endif #endif