post new msg message from relay2

This is to have a utility back, but mostly to start playing with being
able to send keepalives to a device that have nothing to do with moves.
This commit is contained in:
Eric House 2022-01-06 17:26:39 -08:00
parent dcd3a4cc8c
commit 2e5f6128f2
7 changed files with 59 additions and 21 deletions

View file

@ -21,6 +21,7 @@
package org.eehouse.android.xw4; package org.eehouse.android.xw4;
import android.content.Context; import android.content.Context;
import android.content.Intent;
import android.os.Build; import android.os.Build;
import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttActionListener;
@ -60,7 +61,7 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
private MqttAsyncClient mClient; private MqttAsyncClient mClient;
private String mDevID; private String mDevID;
private String mTopic; private String[] mTopics = { null, null };
private Context mContext; private Context mContext;
private MsgThread mMsgThread; private MsgThread mMsgThread;
private LinkedBlockingQueue<MessagePair> mOutboundQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<MessagePair> mOutboundQueue = new LinkedBlockingQueue<>();
@ -166,7 +167,7 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
me.printStackTrace(); me.printStackTrace();
break; break;
} catch ( InterruptedException ie ) { } catch ( InterruptedException ie ) {
ie.printStackTrace(); // ie.printStackTrace();
break; break;
} }
} }
@ -225,10 +226,8 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
{ {
Log.d( TAG, "%H.<init>()", this ); Log.d( TAG, "%H.<init>()", this );
mContext = context; mContext = context;
String[] topic = {null}; mDevID = XwJNI.dvc_getMQTTDevID( mTopics );
mDevID = XwJNI.dvc_getMQTTDevID( topic );
Assert.assertTrueNR( 16 == mDevID.length() ); Assert.assertTrueNR( 16 == mDevID.length() );
mTopic = topic[0];
mMsgThread = new MsgThread(); mMsgThread = new MsgThread();
String host = XWPrefs.getPrefsString( context, R.string.key_mqtt_host ); String host = XWPrefs.getPrefsString( context, R.string.key_mqtt_host );
@ -522,8 +521,7 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
public void messageArrived( String topic, MqttMessage message ) throws Exception public void messageArrived( String topic, MqttMessage message ) throws Exception
{ {
Log.d( TAG, "%H.messageArrived(topic=%s)", this, topic ); Log.d( TAG, "%H.messageArrived(topic=%s)", this, topic );
Assert.assertTrueNR( topic.equals(mTopic) ); mMsgThread.add( topic, message.getPayload() );
mMsgThread.add( message.getPayload() );
ConnStatusHandler ConnStatusHandler
.updateStatusIn( mContext, CommsConnType.COMMS_CONN_MQTT, true ); .updateStatusIn( mContext, CommsConnType.COMMS_CONN_MQTT, true );
@ -541,10 +539,13 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
private void subscribe() private void subscribe()
{ {
Assert.assertTrueNR( null != mTopics && 2 == mTopics.length );
final int qos = XWPrefs.getPrefsInt( mContext, R.string.key_mqtt_qos, 2 ); final int qos = XWPrefs.getPrefsInt( mContext, R.string.key_mqtt_qos, 2 );
int qoss[] = { qos, qos };
setState( State.SUBSCRIBING ); setState( State.SUBSCRIBING );
try { try {
mClient.subscribe( mTopic, qos, null, this ); mClient.subscribe( mTopics, qoss, null, this );
// Log.d( TAG, "subscribed to %s", mTopic ); // Log.d( TAG, "subscribed to %s", mTopic );
} catch ( MqttException ex ) { } catch ( MqttException ex ) {
ex.printStackTrace(); ex.printStackTrace();
@ -581,10 +582,10 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
} }
private class MsgThread extends Thread { private class MsgThread extends Thread {
private LinkedBlockingQueue<byte[]> mQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<MessagePair> mQueue = new LinkedBlockingQueue<>();
void add( byte[] msg ) { void add( String topic, byte[] msg ) {
mQueue.add( msg ); mQueue.add( new MessagePair( topic, msg ) );
} }
@Override @Override
@ -594,17 +595,39 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
Log.d( TAG, "%H.MsgThread.run() starting", MQTTUtils.this ); Log.d( TAG, "%H.MsgThread.run() starting", MQTTUtils.this );
for ( ; ; ) { for ( ; ; ) {
try { try {
byte[] packet = mQueue.take(); MessagePair pair = mQueue.take();
XwJNI.dvc_parseMQTTPacket( packet ); String topic = pair.mTopic;
if ( topic.equals( mTopics[0] ) ) {
XwJNI.dvc_parseMQTTPacket( pair.mPacket );
} else if ( topic.equals( mTopics[1] ) ) {
postNotification( pair );
}
} catch ( InterruptedException ie ) { } catch ( InterruptedException ie ) {
// Assert.failDbg(); // Assert.failDbg();
break; break;
} catch ( JSONException je ) {
Log.e( TAG, "run() ex: %s", je );
} }
} }
long now = Utils.getCurSeconds(); long now = Utils.getCurSeconds();
Log.d( TAG, "%H.MsgThread.run() exiting after %d seconds", MQTTUtils.this, Log.d( TAG, "%H.MsgThread.run() exiting after %d seconds", MQTTUtils.this,
now - startTime ); now - startTime );
} }
private void postNotification( MessagePair pair ) throws JSONException
{
JSONObject obj = new JSONObject( new String(pair.mPacket) );
String msg = obj.optString( "msg" );
if ( null != msg ) {
String title = obj.optString( "title" );
if ( null == title ) {
title = LocUtils.getString( mContext, R.string.remote_msg_title );
}
Intent alertIntent = GamesListDelegate.makeAlertIntent( mContext, msg );
int code = msg.hashCode() ^ title.hashCode();
Utils.postNotification( mContext, alertIntent, title, msg, code );
}
}
} }
public static void handleMessage( Context context, CommsAddrRec from, public static void handleMessage( Context context, CommsAddrRec from,

View file

@ -150,8 +150,7 @@ public class UpdateCheckReceiver extends BroadcastReceiver {
params.put( k_APP, appParams ); params.put( k_APP, appParams );
params.put( k_DEVID, XWPrefs.getDevID( context ) ); params.put( k_DEVID, XWPrefs.getDevID( context ) );
String[] topic = {null}; String devID = XwJNI.dvc_getMQTTDevID( null );
String devID = XwJNI.dvc_getMQTTDevID( topic );
params.put( k_MQTTDEVID, devID ); params.put( k_MQTTDEVID, devID );
} catch ( org.json.JSONException jse ) { } catch ( org.json.JSONException jse ) {
Log.ex( TAG, jse ); Log.ex( TAG, jse );

View file

@ -136,9 +136,10 @@ public class XwJNI {
cleanGlobals(); cleanGlobals();
} }
public static String dvc_getMQTTDevID( String[] topic ) public static String dvc_getMQTTDevID( String[] topics )
{ {
return dvc_getMQTTDevID( getJNI().m_ptrGlobals, topic ); Assert.assertTrueNR( null == topics || 2 == topics.length );
return dvc_getMQTTDevID( getJNI().m_ptrGlobals, topics );
} }
public static void dvc_resetMQTTDevID() public static void dvc_resetMQTTDevID()

View file

@ -629,7 +629,7 @@ streamFromJStream( MPFORMAL JNIEnv* env, VTableMgr* vtMgr, jbyteArray jstream )
JNIEXPORT jstring JNICALL JNIEXPORT jstring JNICALL
Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTDevID Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTDevID
( JNIEnv* env, jclass C, jlong jniGlobalPtr, jobjectArray jTopicOut ) ( JNIEnv* env, jclass C, jlong jniGlobalPtr, jobjectArray jTopicsOut )
{ {
jstring result; jstring result;
DVC_HEADER(jniGlobalPtr); DVC_HEADER(jniGlobalPtr);
@ -638,12 +638,18 @@ Java_org_eehouse_android_xw4_jni_XwJNI_dvc_1getMQTTDevID
XP_UCHAR buf[64]; XP_UCHAR buf[64];
if ( !!jTopicOut ) { if ( !!jTopicsOut ) {
formatMQTTTopic( &devID, buf, VSIZE(buf) ); formatMQTTTopic( &devID, buf, VSIZE(buf) );
jstring jtopic = (*env)->NewStringUTF( env, buf ); jstring jtopic = (*env)->NewStringUTF( env, buf );
XP_ASSERT( 1 == (*env)->GetArrayLength( env, jTopicOut ) ); /* fired */ (*env)->SetObjectArrayElement( env, jTopicsOut, 0, jtopic );
(*env)->SetObjectArrayElement( env, jTopicOut, 0, jtopic );
deleteLocalRef( env, jtopic ); deleteLocalRef( env, jtopic );
if ( 1 < (*env)->GetArrayLength( env, jTopicsOut ) ) {
formatMQTTCtrlTopic( &devID, buf, VSIZE(buf) );
jstring jtopic = (*env)->NewStringUTF( env, buf );
(*env)->SetObjectArrayElement( env, jTopicsOut, 1, jtopic );
deleteLocalRef( env, jtopic );
}
} }
formatMQTTDevID( &devID, buf, VSIZE(buf) ); formatMQTTDevID( &devID, buf, VSIZE(buf) );

View file

@ -285,6 +285,7 @@ typedef uint64_t MQTTDevID;
# define MQTTDevID_FMT "%016llX" # define MQTTDevID_FMT "%016llX"
#endif #endif
# define MQTTTopic_FMT "xw4/device/" MQTTDevID_FMT # define MQTTTopic_FMT "xw4/device/" MQTTDevID_FMT
# define MQTTCtrlTopic_FMT "xw4/msg/" MQTTDevID_FMT
/* Used by scoring code and engine as fast representation of moves. */ /* Used by scoring code and engine as fast representation of moves. */
typedef struct _MoveInfoTile { typedef struct _MoveInfoTile {

View file

@ -621,6 +621,13 @@ formatMQTTTopic( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen )
return buf; return buf;
} }
const XP_UCHAR*
formatMQTTCtrlTopic( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen )
{
XP_SNPRINTF( buf, bufLen, MQTTCtrlTopic_FMT, *devid );
return buf;
}
XP_Bool XP_Bool
strToMQTTCDevID( const XP_UCHAR* str, MQTTDevID* result ) strToMQTTCDevID( const XP_UCHAR* str, MQTTDevID* result )
{ {

View file

@ -115,6 +115,7 @@ XP_Bool smsToBin( XP_U8* out, XP_U16* outlen, const XP_UCHAR* in, XP_U16 inlen )
#endif #endif
const XP_UCHAR* formatMQTTTopic( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen ); const XP_UCHAR* formatMQTTTopic( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen );
const XP_UCHAR* formatMQTTCtrlTopic( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen );
const XP_UCHAR* formatMQTTDevID( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen ); const XP_UCHAR* formatMQTTDevID( const MQTTDevID* devid, XP_UCHAR* buf, XP_U16 bufLen );
XP_Bool strToMQTTCDevID( const XP_UCHAR* str, MQTTDevID* result ); XP_Bool strToMQTTCDevID( const XP_UCHAR* str, MQTTDevID* result );