Skip to content

Commit

Permalink
Fixes subscription to _deviceConnector.deviceConnectionStateUpdateStr…
Browse files Browse the repository at this point in the history
…eam leaking (#876)

- Previous code would create a broadcastStream from a normal stream (created by Repeater).
When this is done, the source stream subscription needs to be explicitly closed.
This is usually done by implementing the onCancel callback from the asBroadcastStream method.
This was not happening, so every call to connectedDeviceStream was creating a new subscription that was never closed.
- To fix the problem, this commit uses the Repeater.broadcast constructor which already returns a broadcast stream
and also closes the source stream subscription when needed
  • Loading branch information
gabrielgarciagava committed Jun 24, 2024
1 parent 96e083a commit a6dd926
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions packages/flutter_reactive_ble/lib/src/reactive_ble.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,10 @@ class FlutterReactiveBle {
BleStatus get status => _status;

/// A stream providing connection updates for all the connected BLE devices.
Stream<ConnectionStateUpdate> get connectedDeviceStream => Repeater(onListenEmitFrom: () async* {
Stream<ConnectionStateUpdate> get connectedDeviceStream => Repeater.broadcast(onListenEmitFrom: () async* {
await initialize();
yield* _deviceConnector.deviceConnectionStateUpdateStream;
}).stream.asBroadcastStream()
..listen((_) {});
}).stream;

/// A stream providing value updates for all the connected BLE devices.
///
Expand Down Expand Up @@ -105,8 +104,7 @@ class FlutterReactiveBle {
);

if (Platform.isAndroid || Platform.isIOS) {
ReactiveBlePlatform.instance =
const ReactiveBleMobilePlatformFactory().create(
ReactiveBlePlatform.instance = const ReactiveBleMobilePlatformFactory().create(
logger: _debugLogger,
);
}
Expand Down Expand Up @@ -398,11 +396,10 @@ class FlutterReactiveBle {
Future<void> clearGattCache(String deviceId) =>
_blePlatform.clearGattCache(deviceId).then((info) => info.dematerialize());

/// Reads the RSSI of the of the peripheral with the given device ID.
/// Reads the RSSI of the of the peripheral with the given device ID.
/// The peripheral must be connected, otherwise a [PlatformException] will be
/// thrown
Future<int> readRssi(String deviceId) async =>
_blePlatform.readRssi(deviceId);
Future<int> readRssi(String deviceId) async => _blePlatform.readRssi(deviceId);

/// Subscribes to updates from the characteristic specified.
///
Expand Down

0 comments on commit a6dd926

Please sign in to comment.