diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index ded788084a32b..ab2d583ad0cb1 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -12,6 +12,7 @@ */ package org.openhab.binding.mercedesme.internal.handler; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -57,8 +58,15 @@ import org.slf4j.LoggerFactory; import com.daimler.mbcarkit.proto.Client.ClientMessage; +import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles; +import com.daimler.mbcarkit.proto.VehicleEvents; +import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN; +import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage; import com.daimler.mbcarkit.proto.VehicleEvents.VEPUpdate; +import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN; import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByPID; +import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN; +import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinPendingCommandsRequest; /** * The {@link AccountHandler} acts as Bridge between MercedesMe Account and the associated vehicles @@ -82,7 +90,9 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private Optional server = Optional.empty(); private Optional authService = Optional.empty(); - private Optional> scheduledFuture = Optional.empty(); + private Optional> refreshScheduler = Optional.empty(); + private List eventQueue = new ArrayList<>(); + private boolean updateRunning = false; private String capabilitiesEndpoint = "/v1/vehicle/%s/capabilities"; private String commandCapabilitiesEndpoint = "/v1/vehicle/%s/capabilities/commands"; @@ -128,13 +138,13 @@ public void initialize() { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, textKey + " [\"" + thing.getProperties().get("callbackUrl") + "\"]"); } else { - scheduledFuture = Optional.of(scheduler.scheduleWithFixedDelay(this::update, 0, + refreshScheduler = Optional.of(scheduler.scheduleWithFixedDelay(this::refresh, 0, config.get().refreshInterval, TimeUnit.MINUTES)); } } } - public void update() { + public void refresh() { if (server.isPresent()) { if (!Constants.NOT_SET.equals(authService.get().getToken())) { ws.run(); @@ -203,12 +213,13 @@ public void dispose() { server = Optional.empty(); Utils.removePort(config.get().callbackPort); } - ws.interrupt(); - scheduledFuture.ifPresent(schedule -> { + refreshScheduler.ifPresent(schedule -> { if (!schedule.isCancelled()) { schedule.cancel(true); } }); + ws.interrupt(); + eventQueue.clear(); } /** @@ -217,7 +228,7 @@ public void dispose() { @Override public void onAccessTokenResponse(AccessTokenResponse tokenResponse) { if (!Constants.NOT_SET.equals(tokenResponse.getAccessToken())) { - scheduler.schedule(this::update, 2, TimeUnit.SECONDS); + scheduler.schedule(this::refresh, 2, TimeUnit.SECONDS); } else if (server.isEmpty()) { // server not running - fix first String textKey = Constants.STATUS_TEXT_PREFIX + thing.getThingTypeUID().getId() @@ -262,7 +273,7 @@ public void registerVin(String vin, VehicleHandler handler) { activeVehicleHandlerMap.put(vin, handler); VEPUpdate updateForVin = vepUpdateMap.get(vin); if (updateForVin != null) { - handler.distributeContent(updateForVin); + handler.enqueueUpdate(updateForVin); } } @@ -284,12 +295,97 @@ public void getVehicleCapabilities(String vin) { } } + /** + * functions for websocket handling + */ + + public void enqueueMessage(byte[] data) { + synchronized (eventQueue) { + eventQueue.add(data); + scheduler.execute(this::scheduleMessage); + } + } + + private void scheduleMessage() { + byte[] data; + synchronized (eventQueue) { + while (updateRunning) { + try { + eventQueue.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + eventQueue.clear(); + return; + } + } + if (!eventQueue.isEmpty()) { + data = eventQueue.remove(0); + } else { + return; + } + updateRunning = true; + } + try { + handleMessage(data); + } finally { + synchronized (eventQueue) { + updateRunning = false; + eventQueue.notifyAll(); + } + } + } + + private void handleMessage(byte[] array) { + try { + PushMessage pm = VehicleEvents.PushMessage.parseFrom(array); + if (pm.hasVepUpdates()) { + boolean distributed = distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); + logger.trace("Distributed VEPUpdate {}", distributed); + if (distributed) { + AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder() + .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build(); + ws.sendAcknowledgeMessage(cm); + } + } else if (pm.hasAssignedVehicles()) { + for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) { + String vin = pm.getAssignedVehicles().getVins(0); + discovery(vin); + } + AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build(); + ws.sendAcknowledgeMessage(cm); + } else if (pm.hasApptwinCommandStatusUpdatesByVin()) { + AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin(); + commandStatusUpdate(csubv.getUpdatesByVinMap()); + AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN + .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack) + .build(); + ws.sendAcknowledgeMessage(cm); + } else if (pm.hasApptwinPendingCommandRequest()) { + AppTwinPendingCommandsRequest pending = pm.getApptwinPendingCommandRequest(); + if (!pending.getAllFields().isEmpty()) { + logger.trace("Pending Command {}", pending.getAllFields()); + } + } else if (pm.hasDebugMessage()) { + logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage()); + } else { + logger.trace("MB Message: {} not handled", pm.getAllFields()); + } + } catch (IOException e) { + logger.trace("IOException decoding message {}", e.getMessage()); + } catch (Error err) { + logger.debug("Error caught {}", err.getMessage()); + } + } + public boolean distributeVepUpdates(Map map) { List notFoundList = new ArrayList<>(); map.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); if (h != null) { - h.distributeContent(value); + h.enqueueUpdate(value); } else { if (value.getFullUpdate()) { vepUpdateMap.put(key, value); @@ -430,7 +526,7 @@ public void sendCommand(@Nullable ClientMessage cm) { if (cm != null) { ws.setCommand(cm); } - scheduler.schedule(this::update, 2, TimeUnit.SECONDS); + scheduler.schedule(this::refresh, 2, TimeUnit.SECONDS); } public void keepAlive(boolean b) { diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java index 40afa829c54e6..5087958788051 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java @@ -134,6 +134,8 @@ public class VehicleHandler extends BaseThingHandler { private JSONObject chargeGroupValueStorage = new JSONObject(); private Map hvacGroupValueStorage = new HashMap<>(); private String vehicleType = NOT_SET; + private List eventQueue = new ArrayList<>(); + private boolean updateRunning = false; Map eventStorage = new HashMap<>(); Optional accountHandler = Optional.empty(); @@ -182,6 +184,7 @@ public void dispose() { accountHandler.ifPresent(ah -> { ah.unregisterVin(config.get().vin); }); + eventQueue.clear(); super.dispose(); } @@ -587,13 +590,49 @@ public void distributeCommandStatus(AppTwinCommandStatusUpdatesByPID cmdUpdates) }); } - public void distributeContent(VEPUpdate data) { + public void enqueueUpdate(VEPUpdate update) { + synchronized (eventQueue) { + eventQueue.add(update); + scheduler.execute(this::scheduleUpdate); + } + } + + private void scheduleUpdate() { + VEPUpdate data; + synchronized (eventQueue) { + while (updateRunning) { + try { + eventQueue.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + eventQueue.clear(); + return; + } + } + if (!eventQueue.isEmpty()) { + data = eventQueue.remove(0); + } else { + return; + } + updateRunning = true; + } + try { + handleUpdate(data); + } finally { + synchronized (eventQueue) { + updateRunning = false; + eventQueue.notifyAll(); + } + } + } + + public void handleUpdate(VEPUpdate update) { updateStatus(ThingStatus.ONLINE); - boolean fullUpdate = data.getFullUpdate(); + boolean fullUpdate = update.getFullUpdate(); /** * Deliver proto update */ - String newProto = Utils.proto2Json(data, thing.getThingTypeUID()); + String newProto = Utils.proto2Json(update, thing.getThingTypeUID()); String combinedProto = newProto; ChannelUID protoUpdateChannelUID = new ChannelUID(thing.getUID(), GROUP_VEHICLE, OH_CHANNEL_PROTO_UPDATE); ChannelStateMap oldProtoMap = eventStorage.get(protoUpdateChannelUID.getId()); @@ -609,7 +648,7 @@ public void distributeContent(VEPUpdate data) { StringType.valueOf(combinedProto)); updateChannel(dataUpdateMap); - Map atts = data.getAttributesMap(); + Map atts = update.getAttributesMap(); /** * handle "simple" values */ diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index 206b4f9dee74e..1e2bc1cedd64f 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -39,13 +39,6 @@ import org.slf4j.LoggerFactory; import com.daimler.mbcarkit.proto.Client.ClientMessage; -import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles; -import com.daimler.mbcarkit.proto.VehicleEvents; -import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN; -import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage; -import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN; -import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN; -import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinPendingCommandsRequest; /** * {@link MBWebsocket} as socket endpoint to communicate with Mercedes @@ -128,9 +121,10 @@ public void run() { accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "@text/mercedesme.account.status.websocket-failure"); logger.warn("Websocket handling exception: {}", t.getMessage()); - } - synchronized (this) { - running = false; + } finally { + synchronized (this) { + running = false; + } } } @@ -157,7 +151,7 @@ private boolean sendMessage() { return false; } - private void sendAcknowledgeMessage(ClientMessage message) { + public void sendAcknowledgeMessage(ClientMessage message) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); message.writeTo(baos); @@ -169,10 +163,6 @@ private void sendAcknowledgeMessage(ClientMessage message) { } } - public boolean isRunning() { - return running; - } - public void interrupt() { synchronized (this) { runTill = Instant.MIN; @@ -202,47 +192,20 @@ public void keepAlive(boolean b) { @OnWebSocketMessage public void onBytes(InputStream is) { try { - PushMessage pm = VehicleEvents.PushMessage.parseFrom(is); - if (pm.hasVepUpdates()) { - boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); - if (distributed) { - AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder() - .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build(); - sendAcknowledgeMessage(cm); - } - } else if (pm.hasAssignedVehicles()) { - for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) { - String vin = pm.getAssignedVehicles().getVins(0); - accountHandler.discovery(vin); - } - AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build(); - sendAcknowledgeMessage(cm); - } else if (pm.hasApptwinCommandStatusUpdatesByVin()) { - AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin(); - accountHandler.commandStatusUpdate(csubv.getUpdatesByVinMap()); - AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN - .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack) - .build(); - sendAcknowledgeMessage(cm); - } else if (pm.hasApptwinPendingCommandRequest()) { - AppTwinPendingCommandsRequest pending = pm.getApptwinPendingCommandRequest(); - if (!pending.getAllFields().isEmpty()) { - logger.trace("Pending Command {}", pending.getAllFields()); - } - } else if (pm.hasDebugMessage()) { - logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage()); - } else { - logger.trace("MB Message: {} not handled", pm.getAllFields()); - } + byte[] array = is.readAllBytes(); + is.close(); + accountHandler.enqueueMessage(array); + /** + * https://community.openhab.org/t/mercedes-me/136866/12 + * Release Websocket thread as early as possible to avoid execeptions + * + * 1. Websocket thread responsible for reading stream in bytes and enqueue for AccountHandler. + * 2. AccountHamdler thread responsible for encoding proto message. In case of update enqueue proto message + * at VehicleHandöer + * 3. VehicleHandler responsible to update channels + */ } catch (IOException e) { - // don't report thing status errors here. - // Sometimes messages cannot be decoded which doesn't effect the overall functionality - logger.trace("IOException {}", e.getMessage()); - } catch (Error err) { - logger.trace("Error caught {}", err.getMessage()); + logger.debug("IOException reading input stream {}", e.getMessage()); } } diff --git a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java index d785afedb1c47..f041deb805b39 100644 --- a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java +++ b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java @@ -12,8 +12,11 @@ */ package org.openhab.binding.mercedesme.internal.handler; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,6 +56,7 @@ public class ThingCallbackListener implements ThingHandlerCallback { public Map> updatesPerGroupMap = new HashMap<>(); public boolean linked = false; public Optional status = Optional.empty(); + private Instant waitTime = Instant.MAX; public ThingStatusInfo getThingStatus() { return status.get(); @@ -78,6 +82,23 @@ public void stateUpdated(ChannelUID channelUID, State state) { } } groupMap.put(channelUID.toString(), state); + synchronized (updatesReceived) { + waitTime = Instant.now().plus(500, ChronoUnit.MILLIS); + } + } + + public void waitForUpdates() { + Instant maxWaitTime = Instant.now().plus(5000, ChronoUnit.MILLIS); + synchronized (updatesReceived) { + while (Instant.now().isBefore(maxWaitTime) && waitTime.isAfter(Instant.now())) { + try { + updatesReceived.wait(50); + } catch (InterruptedException e) { + fail(); + } + } + } + waitTime = Instant.MAX; } @Override diff --git a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java index 8675b4d0ac43a..65cf867783e67 100644 --- a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java +++ b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java @@ -115,7 +115,8 @@ public void testBEVFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -151,7 +152,8 @@ public void testBEVImperialUnits() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-ImperialUnits.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -187,7 +189,9 @@ public void testBEVImperialUnits() { // overwrite with EU Units json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); + assertEquals("%.1f °C", patternMock.patternMap.get("test::bev:hvac#temperature"), "Temperature Pattern"); commandOptionMock.getCommandList("test::bev:hvac#temperature").forEach(cmd -> { assertTrue(cmd.getCommand().endsWith(" °C"), "Command Option Celsius Unit"); @@ -209,7 +213,8 @@ public void testBEVCharging() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -246,13 +251,17 @@ public void testBEVChargeEndtime() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); + assertEquals("2023-09-09 13:54", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday-Underrun.json"); update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); + assertEquals("2023-09-11 13:55", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); } @@ -272,7 +281,9 @@ public void testBEVPartialChargingUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); + assertEquals(2, updateListener.updatesReceived.size(), "Update Count"); assertEquals("2023-09-19 20:45", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); @@ -294,7 +305,8 @@ public void testBEVPartialGPSUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-GPS.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); assertEquals("1.23,4.56", updateListener.getResponse("test::bev:position#gps").toFullString(), "GPS update"); assertEquals("41.9 °", updateListener.getResponse("test::bev:position#heading").toFullString(), @@ -316,7 +328,9 @@ public void testBEVPartialRangeUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Range.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); + assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); assertEquals("15017 km", updateListener.getResponse("test::bev:range#mileage").toFullString(), "Mileage Update"); @@ -341,7 +355,8 @@ public void testHybridFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -374,7 +389,8 @@ public void testHybridFullUpadteWithCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); // Test charged / uncharged battery and filled / unfilled tank volume assertEquals("5.800000190734863 kWh", updateListener.getResponse("test::hybrid:range#charged").toFullString(), @@ -403,7 +419,8 @@ public void testEventStorage() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -450,12 +467,14 @@ public void testProtoChannelLinked() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertFalse(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); updateListener.linked = true; - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertTrue(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); } @@ -477,7 +496,8 @@ public void testTemperaturePoints() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals("22 °C", updateListener.getResponse("test::bev:hvac#temperature").toFullString(), "Temperature Point One Updated"); @@ -508,7 +528,8 @@ public void testTemperaturePointSelection() { vh.setCallback(updateListener); String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_HVAC, "temperature"); updateListener = new ThingCallbackListener(); @@ -538,7 +559,8 @@ public void testChargeProgramSelection() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); + updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_CHARGE, "max-soc"); vh.handleCommand(cuid, QuantityType.valueOf("90 %")); @@ -586,7 +608,8 @@ public void testPositioning() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(POSITIONING_UPDATE_COUNT, updateListener.getUpdatesForGroup("position"), "Position Update Count"); assertEquals("1.23,4.56", updateListener.getResponse("test::bev:position#gps").toFullString(), @@ -608,7 +631,8 @@ public void testHVAC() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals(HVAC_UPDATE_COUNT, updateListener.getUpdatesForGroup("hvac"), "HVAC Update Count"); assertEquals(0, ((DecimalType) updateListener.getResponse("test::bev:hvac#ac-status")).intValue(), @@ -627,7 +651,8 @@ public void testEcoScore() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals("72 %", updateListener.getResponse("test::bev:eco#accel").toFullString(), "Eco Acceleration"); assertEquals("81 %", updateListener.getResponse("test::bev:eco#coasting").toFullString(), "Eco Coasting"); @@ -647,7 +672,8 @@ public void testAdBlue() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Combustion.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); + updateListener.waitForUpdates(); assertEquals("29 %", updateListener.getResponse("test::combustion:range#adblue-level").toFullString(), "AdBlue Tank Level");