Skip to content

Commit

Permalink
Issue556 (#557)
Browse files Browse the repository at this point in the history
* Issue 556 - ping callback

* Issue 556 - latency

* Issue 556

* Issue 556

* Issue 556

* Issue 556
  • Loading branch information
shamblett authored Aug 6, 2024
1 parent 53759ce commit 959d489
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 4 deletions.
29 changes: 27 additions & 2 deletions example/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import 'package:mqtt_client/mqtt_server_client.dart';
final client = MqttServerClient('test.mosquitto.org', '');

var pongCount = 0; // Pong counter
var pingCount = 0; // Ping counter

Future<int> main() async {
/// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker
Expand All @@ -41,7 +42,7 @@ Future<int> main() async {
/// list so in most cases you can ignore this.
/// Set logging on if needed, defaults to off
client.logging(on: true);
client.logging(on: false);

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();
Expand All @@ -66,9 +67,13 @@ Future<int> main() async {
client.onSubscribed = onSubscribed;

/// Set a ping received callback if needed, called whenever a ping response(pong) is received
/// from the broker.
/// from the broker. Can be used for health monitoring.
client.pongCallback = pong;

/// Set a ping sent callback if needed, called whenever a ping request(ping) is sent
/// by the client. Can be used for latency calculations.
client.pingCallback = ping;

/// Create a connection message to use or use the default one. The default one sets the
/// client identifier, any supplied username/password and clean session,
/// an example of a specific one below.
Expand Down Expand Up @@ -160,6 +165,13 @@ Future<int> main() async {
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);

/// Print the ping/pong cycle latency data before disconnecting.
print('EXAMPLE::Keep alive latencies');
print(
'The latency of the last ping/pong cycle is ${client.lastCycleLatency} milliseconds');
print(
'The average latency of all the ping/pong cycles is ${client.averageCycleLatency} milliseconds');

/// Finally, unsubscribe and exit gracefully
print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic);
Expand Down Expand Up @@ -193,6 +205,11 @@ void onDisconnected() {
} else {
print('EXAMPLE:: Pong count is incorrect, expected 3. actual $pongCount');
}
if (pingCount == 3) {
print('EXAMPLE:: Ping count is correct');
} else {
print('EXAMPLE:: Ping count is incorrect, expected 3. actual $pingCount');
}
}

/// The successful connect callback
Expand All @@ -205,4 +222,12 @@ void onConnected() {
void pong() {
print('EXAMPLE::Ping response client callback invoked');
pongCount++;
print(
'EXAMPLE::Latency of this ping/pong cycle is ${client.lastCycleLatency} milliseconds');
}

/// Ping callback
void ping() {
print('EXAMPLE::Ping sent client callback invoked');
pingCount++;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ part of '../../mqtt_client.dart';
/// Ping response received callback
typedef PongCallback = void Function();

/// Ping request sent callback
typedef PingCallback = void Function();

/// Implements keep alive functionality on the Mqtt Connection,
/// ensuring that the connection remains active according to the
/// keep alive seconds setting.
Expand Down Expand Up @@ -64,9 +67,24 @@ class MqttConnectionKeepAlive {
/// Used to synchronise shutdown and ping operations.
bool _shutdownPadlock = false;

/// Ping response received callback
/// Ping response received callback.
PongCallback? pongCallback;

/// Ping request sent callback.
PingCallback? pingCallback;

/// Latency(time between sending a ping and receiving a pong) in ms
/// of the last ping/pong cycle. Reset on disconnect.
int lastCycleLatency = 0;

int _lastPingTime = 0;

/// Average latency(time between sending a ping and receiving a pong) in ms
/// of all the ping/pong cycles in a connection period. Reset on disconnect.
int averageCycleLatency = 0;

int _cycleCount = 0;

/// The event bus
events.EventBus? _clientEventBus;

Expand All @@ -88,6 +106,10 @@ class MqttConnectionKeepAlive {
try {
_connectionHandler.sendMessage(pingMsg);
pinged = true;
_lastPingTime = DateTime.now().millisecondsSinceEpoch;
if (pingCallback != null) {
pingCallback!();
}
} catch (e) {
MqttLogger.log(
'MqttConnectionKeepAlive::pingRequired - exception occurred');
Expand Down Expand Up @@ -151,10 +173,20 @@ class MqttConnectionKeepAlive {
/// Processed ping response messages received from a message broker.
bool pingResponseReceived(MqttMessage? pingMsg) {
MqttLogger.log('MqttConnectionKeepAlive::pingResponseReceived');

// Calculate latencies
lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime;
_cycleCount++;
// Average latency calculation is
// new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1)
averageCycleLatency +=
(lastCycleLatency - averageCycleLatency) ~/ _cycleCount;

// Call the pong callback if not null
if (pongCallback != null) {
pongCallback!();
}

// Cancel the disconnect timer if needed.
disconnectTimer?.cancel();
return true;
Expand All @@ -168,6 +200,9 @@ class MqttConnectionKeepAlive {
MqttLogger.log('MqttConnectionKeepAlive::stop - stopping keep alive');
pingTimer!.cancel();
disconnectTimer?.cancel();
lastCycleLatency = 0;
averageCycleLatency = 0;
_cycleCount = 0;
}

/// Handle the disconnect timer timeout
Expand Down
27 changes: 26 additions & 1 deletion lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class MqttClient {
subscriptionsManager?.onUnsubscribed = cb;
}

/// Ping response received callback.
/// Ping response(pong) received callback.
/// If set when a ping response is received from the broker
/// this will be called.
/// Can be used for health monitoring outside of the client itself.
Expand All @@ -264,6 +264,28 @@ class MqttClient {
keepAlive?.pongCallback = cb;
}

/// Ping request(ping) sent callback.
/// If set when a ping request is sent from the client
/// this will be called.
/// Can be used in tandem with the [pongCallback] for latency calculations.
PingCallback? _pingCallback;

/// The ping sent callback
PingCallback? get pingCallback => _pingCallback;

set pingCallback(PingCallback? cb) {
_pingCallback = cb;
keepAlive?.pingCallback = cb;
}

/// The latency of the last ping/pong cycle in milliseconds.
/// Cleared on disconnect.
int? get lastCycleLatency => keepAlive?.lastCycleLatency;

/// The average latency of all ping/pong cycles in a connection period in
/// milliseconds. Cleared on disconnect.
int? get averageCycleLatency => keepAlive?.averageCycleLatency;

/// The event bus
@protected
events.EventBus? clientEventBus;
Expand Down Expand Up @@ -320,6 +342,9 @@ class MqttClient {
if (pongCallback != null) {
keepAlive!.pongCallback = pongCallback;
}
if (pingCallback != null) {
keepAlive!.pingCallback = pingCallback;
}
} else {
MqttLogger.log('MqttClient::connect - keep alive is disabled');
}
Expand Down
93 changes: 93 additions & 0 deletions test/mqtt_client_keep_alive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,97 @@ void main() {
expect(ka.disconnectTimer, isNull);
});
});
group('Latency', () {
test('Ping callback', () async {
final clientEventBus = events.EventBus();
var disconnect = false;
void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) {
disconnect = true;
}

var pingCalled = false;
void pingCallback() {
pingCalled = true;
}

clientEventBus
.on<DisconnectOnNoPingResponse>()
.listen(disconnectOnNoPingResponse);
final ch = MockCH(
clientEventBus,
maxConnectionAttempts: 3,
);
ch.connectionStatus.state = MqttConnectionState.connected;
final ka = MqttConnectionKeepAlive(ch, clientEventBus, 2);
ka.pingCallback = pingCallback;
verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any()))
.called(1);
verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any()))
.called(1);
verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1);
expect(ka.pingTimer?.isActive, isTrue);
expect(ka.disconnectTimer, isNull);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
expect(pingCalled, isTrue);
final pingMessageRx = MqttPingResponseMessage();
ka.pingResponseReceived(pingMessageRx);
expect(disconnect, isFalse);
ka.stop();
expect(ka.pingTimer?.isActive, isFalse);
expect(ka.disconnectTimer, isNull);
});
test('Latency counts', () async {
final latencies = <int>[0, 0, 0];
final clientEventBus = events.EventBus();
var disconnect = false;
void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) {
disconnect = true;
}

clientEventBus
.on<DisconnectOnNoPingResponse>()
.listen(disconnectOnNoPingResponse);
final ch = MockCH(
clientEventBus,
maxConnectionAttempts: 3,
);
ch.connectionStatus.state = MqttConnectionState.connected;
final ka = MqttConnectionKeepAlive(ch, clientEventBus, 3);
verify(() => ch.registerForMessage(MqttMessageType.pingRequest, any()))
.called(1);
verify(() => ch.registerForMessage(MqttMessageType.pingResponse, any()))
.called(1);
verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1);
expect(ka.pingTimer?.isActive, isTrue);
expect(ka.disconnectTimer, isNull);
await MqttUtilities.asyncSleep(4);
verify(() => ch.sendMessage(any())).called(1);
final pingMessageRx = MqttPingResponseMessage();
ka.pingResponseReceived(pingMessageRx);
latencies[0] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
ka.pingResponseReceived(pingMessageRx);
latencies[1] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
await MqttUtilities.asyncSleep(3);
verify(() => ch.sendMessage(any())).called(1);
ka.pingResponseReceived(pingMessageRx);
latencies[2] = ka.lastCycleLatency;
expect(ka.lastCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency > 1000, isTrue);
expect(ka.averageCycleLatency,
(latencies[0] + latencies[1] + latencies[2]) ~/ 3);
expect(disconnect, isFalse);
ka.stop();
expect(ka.averageCycleLatency, 0);
expect(ka.lastCycleLatency, 0);
expect(ka.pingTimer?.isActive, isFalse);
expect(ka.disconnectTimer, isNull);
});
});
}

0 comments on commit 959d489

Please sign in to comment.