fix mqtt problems

There were ways of leaving multiple instances running, I think fighting
for resources and keeping each other from connecting. Now I think there
can only be one. And lots of logging will help find future problems.
This commit is contained in:
Eric House 2020-06-04 11:36:04 -07:00
parent 5f31fedf70
commit 72e813038f
2 changed files with 166 additions and 100 deletions

View file

@ -48,18 +48,20 @@ import org.eehouse.android.xw4.loc.LocUtils;
public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallbackExtended { public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallbackExtended {
private static final String TAG = MQTTUtils.class.getSimpleName(); private static final String TAG = MQTTUtils.class.getSimpleName();
private static final String KEY_NEXT_REG = TAG + "/next_reg"; private static final String KEY_NEXT_REG = TAG + "/next_reg";
private static enum State { NONE, CONNECTING, CONNECTED, SUBSCRIBING, SUBSCRIBED,
CLOSING };
private static AtomicReference<MQTTUtils> sInstance = new AtomicReference<>(); private static MQTTUtils[] sInstance = {null};
private static long sNextReg = 0; private static long sNextReg = 0;
private MqttAsyncClient mClient; private MqttAsyncClient mClient;
private long mPauseTime = 0L;
private String mDevID; private String mDevID;
private String mTopic; private String mTopic;
private Context mContext; private Context mContext;
private MsgThread mMsgThread; private MsgThread mMsgThread;
private LinkedBlockingQueue<MessagePair> mOutboundQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<MessagePair> mOutboundQueue = new LinkedBlockingQueue<>();
private boolean mShouldExit = false; private boolean mShouldExit = false;
private State mState = State.NONE;
public static void init( Context context ) public static void init( Context context )
{ {
@ -76,19 +78,16 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
public static void onFCMReceived( Context context ) public static void onFCMReceived( Context context )
{ {
Log.d( TAG, "onFCMReceived()" ); Log.d( TAG, "onFCMReceived()" );
// If we have an instance now, it's not working, so kill it. And start onConfigChanged( context );
// another
MQTTUtils instance = sInstance.get();
if ( null != instance ) {
clearInstance( instance );
}
getOrStart( context ); getOrStart( context );
Log.d( TAG, "onFCMReceived() DONE" );
} }
static void onConfigChanged( Context context ) static void onConfigChanged( Context context )
{ {
MQTTUtils instance = sInstance.get(); MQTTUtils instance;
synchronized ( sInstance ) {
instance = sInstance[0];
}
if ( null != instance ) { if ( null != instance ) {
clearInstance( instance ); clearInstance( instance );
} }
@ -98,7 +97,9 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
{ {
MQTTUtils result = null; MQTTUtils result = null;
if ( BuildConfig.OFFER_MQTT ) { if ( BuildConfig.OFFER_MQTT ) {
result = sInstance.get(); synchronized( sInstance ) {
result = sInstance[0];
}
if ( null == result ) { if ( null == result ) {
try { try {
result = new MQTTUtils(context); result = new MQTTUtils(context);
@ -124,17 +125,23 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
@Override @Override
public void run() public void run()
{ {
long startTime = Utils.getCurSeconds();
Log.d( TAG, "%H.run() starting", this );
setup(); setup();
while ( !mShouldExit ) { for ( long totalSlept = 0; !mShouldExit && totalSlept < 10000; ) {
try { try {
// this thread can be fed before the connection is // this thread can be fed before the connection is
// established. Wait for that before removing packets from the // established. Wait for that before removing packets from the
// queue. // queue.
if ( !mClient.isConnected() ) { if ( !mClient.isConnected() ) {
Log.d( TAG, "not connected; sleeping..." ); Log.d( TAG, "%H.run(): not connected; sleeping...", MQTTUtils.this );
Thread.sleep(500); final long thisSleep = 1000;
Thread.sleep(thisSleep);
totalSlept += thisSleep;
continue; continue;
} }
totalSlept = 0;
MessagePair pair = mOutboundQueue.take(); MessagePair pair = mOutboundQueue.take();
MqttMessage message = new MqttMessage( pair.mPacket ); MqttMessage message = new MqttMessage( pair.mPacket );
mClient.publish( pair.mTopic, message ); mClient.publish( pair.mTopic, message );
@ -147,6 +154,10 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
} }
} }
clearInstance(); clearInstance();
long now = Utils.getCurSeconds();
Log.d( TAG, "%H.run() exiting after %d seconds", this,
now - startTime );
} }
private void enqueue( String topic, byte[] packet ) private void enqueue( String topic, byte[] packet )
@ -154,9 +165,14 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
mOutboundQueue.add( new MessagePair( topic, packet ) ); mOutboundQueue.add( new MessagePair( topic, packet ) );
} }
private static void setInstance( MQTTUtils instance ) private static void setInstance( MQTTUtils newInstance )
{ {
MQTTUtils oldInstance = sInstance.getAndSet(instance); MQTTUtils oldInstance;
synchronized ( sInstance ) {
oldInstance = sInstance[0];
Log.d( TAG, "setInstance(): changing sInstance[0] from %H to %H", oldInstance, newInstance );
sInstance[0] = newInstance;
}
if ( null != oldInstance ) { if ( null != oldInstance ) {
oldInstance.disconnect(); oldInstance.disconnect();
} }
@ -164,26 +180,16 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
private static void clearInstance( MQTTUtils curInstance ) private static void clearInstance( MQTTUtils curInstance )
{ {
MQTTUtils oldInstance = sInstance.getAndSet(null); synchronized ( sInstance ) {
if ( curInstance == oldInstance ) { if ( sInstance[0] == curInstance ) {
oldInstance.disconnect(); sInstance[0] = null;
} else { } else {
Log.e( TAG, "unreachable instance still running???" ); curInstance = null; // protect from disconnect() call
} }
// if ( sResumed ) {
// Log.d( TAG, "clearInstance(); looks like I could start another!!" );
// }
} }
if ( null != curInstance ) {
public static void onPause() curInstance.disconnect();
{ }
// Log.d( TAG, "onPause()" );
// MQTTUtils instance = sInstance.get();
// if ( null != instance ) {
// instance.setPaused(true);
// }
// DbgUtils.assertOnUIThread();
// // sResumed = false;
} }
private MQTTUtils( Context context ) throws MqttException private MQTTUtils( Context context ) throws MqttException
@ -205,39 +211,47 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
mClient.setCallback( this ); mClient.setCallback( this );
} }
private void setState( State newState )
{
Log.d( TAG, "%H.setState(): was %s, now %s", this, mState, newState );
boolean stateOk;
switch ( newState ) {
case CONNECTED:
stateOk = mState == State.CONNECTING;
if ( stateOk ) {
mState = newState;
subscribe();
}
break;
case SUBSCRIBED:
stateOk = mState == State.SUBSCRIBING;
if ( stateOk ) {
mState = newState;
mMsgThread.start();
}
break;
default:
stateOk = true;
mState = newState;
Log.d( TAG, "doing nothing on %s", mState );
break;
}
if ( !stateOk ) {
Log.e( TAG, "%H.setState(): bad state for %s: %s", this, newState, mState );
}
}
private void setup() private void setup()
{ {
Log.d( TAG, "setup()" ); Log.d( TAG, "setup()" );
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setCleanSession(false);
final int qos = XWPrefs.getPrefsInt( mContext, R.string.key_mqtt_qos, 2 );
try { try {
mClient.connect( mqttConnectOptions, null, new IMqttActionListener() { setState( State.CONNECTING );
@Override mClient.connect( mqttConnectOptions, null, this );
public void onSuccess( IMqttToken asyncActionToken ) {
Log.d( TAG, "onSuccess()" );
try {
mClient.subscribe( mTopic, qos, null, MQTTUtils.this );
Log.d( TAG, "subscribed to %s", mTopic );
mMsgThread.start();
} catch ( MqttException ex ) {
ex.printStackTrace();
} catch ( Exception ex ) {
ex.printStackTrace();
clearInstance();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.d( TAG, "onFailure(%s, %s)", asyncActionToken, exception );
ConnStatusHandler.updateStatus( mContext, null,
CommsConnType.COMMS_CONN_MQTT,
false );
clearInstance();
}
} );
} catch ( MqttException ex ) { } catch ( MqttException ex ) {
ex.printStackTrace(); ex.printStackTrace();
} catch ( java.lang.IllegalStateException ise ) { } catch ( java.lang.IllegalStateException ise ) {
@ -297,43 +311,70 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
} }
} }
// private void setPaused( boolean paused )
// {
// if ( paused ) {
// if ( 0 == mPauseTime ) {
// mPauseTime = System.currentTimeMillis();
// Log.d( TAG, "setPaused() called for first time" );
// }
// } else {
// long diff = System.currentTimeMillis() - mPauseTime;
// Log.d( TAG, "unpausing after %d seconds", diff/1000);
// mPauseTime = 0;
// }
// }
private void disconnect() private void disconnect()
{ {
if ( 0 == mPauseTime ) { Log.d( TAG, "%H.disconnect()", this );
Log.d( TAG, "disconnect()" );
} else { interrupt();
long diff = System.currentTimeMillis() - mPauseTime;
Log.d( TAG, "disconnect() called %d seconds after app paused", diff/1000 );
}
try {
mShouldExit = true;
mClient.unsubscribe( mDevID );
mClient.disconnect();
Log.d( TAG, "disconnect() succeeded" );
mMsgThread.interrupt(); mMsgThread.interrupt();
} catch (MqttException ex){ try {
ex.printStackTrace(); mMsgThread.join();
} catch (Exception ex){ Log.d( TAG, "%H.disconnect(); JOINED thread", this );
ex.printStackTrace(); } catch ( InterruptedException ie ) {
clearInstance(); Log.e( TAG, "%H.disconnect(); got ie from join: %s", this, ie );
}
} }
private void clearInstance() { clearInstance( this ); } mShouldExit = true;
setState( State.CLOSING );
// Hack. Problem is that e.g. unsubscribe will throw an exception if
// you're not subscribed. That can't prevent us from continuing to
// disconnect() and close. Rather than wrap each in its own try/catch,
// run 'em in a loop in a single try/catch.
outer:
for ( int ii = 0; ; ++ii ) {
String action = null;
try {
switch ( ii ) {
case 0:
action = "unsubscribe";
mClient.unsubscribe( mDevID );
break; // not continue, which skips the Log() below
case 1:
action = "disconnect";
mClient.disconnect();
break;
case 2:
action = "close";
mClient.close();
break;
default:
break outer;
}
Log.d( TAG, "%H.disconnect(): %s() succeeded", this, action );
} catch ( MqttException mex ) {
Log.e( TAG, "%H.disconnect(): %s(): got mex %s",
this, action, mex );
} catch ( Exception ex ) {
Log.e( TAG, "%H.disconnect(): %s(): got ex %s",
this, action, ex );
}
}
mClient = null;
// Make sure we don't need to call clearInstance(this)
synchronized ( sInstance ) {
Assert.assertTrueNR( sInstance[0] != this );
}
Log.d( TAG, "%H.disconnect() DONE", this );
}
private void clearInstance()
{
Log.d( TAG, "%H.clearInstance()", this );
clearInstance( this );
}
public static void inviteRemote( Context context, String invitee, NetLaunchInfo nli ) public static void inviteRemote( Context context, String invitee, NetLaunchInfo nli )
{ {
@ -389,7 +430,7 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
@Override @Override
public void messageArrived( String topic, MqttMessage message) throws Exception public void messageArrived( String topic, MqttMessage message) throws Exception
{ {
Log.d( TAG, "messageArrived(topic=%s, message=%s)", topic, message ); Log.d( TAG, "%H.messageArrived(topic=%s)", this, topic );
Assert.assertTrueNR( topic.equals(mTopic) ); Assert.assertTrueNR( topic.equals(mTopic) );
mMsgThread.add( message.getPayload() ); mMsgThread.add( message.getPayload() );
ConnStatusHandler ConnStatusHandler
@ -399,26 +440,51 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) public void deliveryComplete(IMqttDeliveryToken token)
{ {
Log.d( TAG, "deliveryComplete(token=%s)", token ); Log.d( TAG, "%H.deliveryComplete(token=%s)", this, token );
ConnStatusHandler ConnStatusHandler
.updateStatusOut( mContext, CommsConnType.COMMS_CONN_MQTT, true ); .updateStatusOut( mContext, CommsConnType.COMMS_CONN_MQTT, true );
} }
private void subscribe()
{
final int qos = XWPrefs.getPrefsInt( mContext, R.string.key_mqtt_qos, 2 );
setState( State.SUBSCRIBING );
try {
mClient.subscribe( mTopic, qos, null, this );
// Log.d( TAG, "subscribed to %s", mTopic );
} catch ( MqttException ex ) {
ex.printStackTrace();
} catch ( Exception ex ) {
ex.printStackTrace();
clearInstance();
}
}
@Override @Override
public void onSuccess( IMqttToken asyncActionToken ) public void onSuccess( IMqttToken asyncActionToken )
{ {
Log.d( TAG, "onSuccess(%s)", asyncActionToken ); Log.d( TAG, "%H.onSuccess(%s); cur state: %s", asyncActionToken, this, mState );
switch ( mState ) {
case CONNECTING:
setState( State.CONNECTED );
break;
case SUBSCRIBING:
setState( State.SUBSCRIBED );
break;
default:
Log.e( TAG, "%H.onSuccess(): unexpected state %s", mState );
}
} }
@Override @Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) public void onFailure(IMqttToken asyncActionToken, Throwable exception)
{ {
Log.d( TAG, "onFailure(%s, %s)", asyncActionToken, exception ); Log.d( TAG, "%H.onFailure(%s, %s); cur state: %s", this, asyncActionToken,
exception, mState );
} }
private class MsgThread extends Thread { private class MsgThread extends Thread {
private LinkedBlockingQueue<byte[]> mQueue = new LinkedBlockingQueue<>(); private LinkedBlockingQueue<byte[]> mQueue = new LinkedBlockingQueue<>();
private long mStartTime = Utils.getCurSeconds();
void add( byte[] msg ) { void add( byte[] msg ) {
mQueue.add( msg ); mQueue.add( msg );
@ -427,6 +493,8 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
@Override @Override
public void run() public void run()
{ {
long startTime = Utils.getCurSeconds();
Log.d( TAG, "%H.MsgThread.run() starting", MQTTUtils.this );
for ( ; ; ) { for ( ; ; ) {
try { try {
byte[] packet = mQueue.take(); byte[] packet = mQueue.take();
@ -437,7 +505,8 @@ public class MQTTUtils extends Thread implements IMqttActionListener, MqttCallba
} }
} }
long now = Utils.getCurSeconds(); long now = Utils.getCurSeconds();
Log.d( TAG, "%H.run() exiting after %d seconds", this, now - mStartTime ); Log.d( TAG, "%H.MsgThread.run() exiting after %d seconds", MQTTUtils.this,
now - startTime );
} }
} }

View file

@ -118,9 +118,6 @@ public class XWApp extends Application
} }
GameUtils.resendAllIf( this, null ); GameUtils.resendAllIf( this, null );
break; break;
case ON_PAUSE:
MQTTUtils.onPause();
break;
} }
} }