From cb236c47695760939941ff63cb5905ecac900d14 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Tue, 12 Nov 2024 18:33:41 +0100 Subject: [PATCH 1/9] decouple websocket thread from handler update Signed-off-by: Bernd Weymann --- .../internal/handler/AccountHandler.java | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index ded788084a32b..d725a67478373 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -79,6 +79,7 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private final Map activeVehicleHandlerMap = new HashMap<>(); private final Map vepUpdateMap = new HashMap<>(); private final Map> capabilitiesMap = new HashMap<>(); + private final List> eventQueue = new ArrayList<>(); private Optional server = Optional.empty(); private Optional authService = Optional.empty(); @@ -87,6 +88,7 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private String capabilitiesEndpoint = "/v1/vehicle/%s/capabilities"; private String commandCapabilitiesEndpoint = "/v1/vehicle/%s/capabilities/commands"; private String poiEndpoint = "/v1/vehicle/%s/route"; + private boolean updateRunning = false; final MBWebsocket ws; Optional config = Optional.empty(); @@ -288,9 +290,7 @@ public boolean distributeVepUpdates(Map map) { List notFoundList = new ArrayList<>(); map.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); - if (h != null) { - h.distributeContent(value); - } else { + if (h == null) { if (value.getFullUpdate()) { vepUpdateMap.put(key, value); } @@ -298,11 +298,52 @@ public boolean distributeVepUpdates(Map map) { } }); notFoundList.forEach(vin -> { + map.remove(vin); logger.trace("No VehicleHandler available for VIN {}", vin); }); + if (!map.isEmpty()) { + synchronized (eventQueue) { + eventQueue.add(map); + scheduler.execute(this::doDdistributeVepUpdates); + } + } return notFoundList.isEmpty(); } + public void doDdistributeVepUpdates() { + Map map; + synchronized (eventQueue) { + while (updateRunning) { + try { + eventQueue.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!eventQueue.isEmpty()) { + map = eventQueue.remove(0); + } else { + return; + } + updateRunning = true; + } + try { + map.forEach((key, value) -> { + VehicleHandler h = activeVehicleHandlerMap.get(key); + if (h != null) { + h.distributeContent(value); + } + }); + } catch (Throwable t) { + logger.info("Exception during update {}", t.getMessage()); + // ensure every possible throwable is catched to ensure running queue + } + synchronized (eventQueue) { + updateRunning = false; + eventQueue.notifyAll(); + } + } + public void commandStatusUpdate(Map updatesByVinMap) { updatesByVinMap.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); From f5d4f9da3e5737b2e3926a5bb6ec8fd25154edd1 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Wed, 13 Nov 2024 12:49:34 +0100 Subject: [PATCH 2/9] decouple websocket thread Signed-off-by: Bernd Weymann --- .../internal/handler/AccountHandler.java | 47 ++----------------- .../internal/handler/VehicleHandler.java | 36 ++++++++++++++ 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index d725a67478373..ded788084a32b 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -79,7 +79,6 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private final Map activeVehicleHandlerMap = new HashMap<>(); private final Map vepUpdateMap = new HashMap<>(); private final Map> capabilitiesMap = new HashMap<>(); - private final List> eventQueue = new ArrayList<>(); private Optional server = Optional.empty(); private Optional authService = Optional.empty(); @@ -88,7 +87,6 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private String capabilitiesEndpoint = "/v1/vehicle/%s/capabilities"; private String commandCapabilitiesEndpoint = "/v1/vehicle/%s/capabilities/commands"; private String poiEndpoint = "/v1/vehicle/%s/route"; - private boolean updateRunning = false; final MBWebsocket ws; Optional config = Optional.empty(); @@ -290,7 +288,9 @@ public boolean distributeVepUpdates(Map map) { List notFoundList = new ArrayList<>(); map.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); - if (h == null) { + if (h != null) { + h.distributeContent(value); + } else { if (value.getFullUpdate()) { vepUpdateMap.put(key, value); } @@ -298,52 +298,11 @@ public boolean distributeVepUpdates(Map map) { } }); notFoundList.forEach(vin -> { - map.remove(vin); logger.trace("No VehicleHandler available for VIN {}", vin); }); - if (!map.isEmpty()) { - synchronized (eventQueue) { - eventQueue.add(map); - scheduler.execute(this::doDdistributeVepUpdates); - } - } return notFoundList.isEmpty(); } - public void doDdistributeVepUpdates() { - Map map; - synchronized (eventQueue) { - while (updateRunning) { - try { - eventQueue.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - if (!eventQueue.isEmpty()) { - map = eventQueue.remove(0); - } else { - return; - } - updateRunning = true; - } - try { - map.forEach((key, value) -> { - VehicleHandler h = activeVehicleHandlerMap.get(key); - if (h != null) { - h.distributeContent(value); - } - }); - } catch (Throwable t) { - logger.info("Exception during update {}", t.getMessage()); - // ensure every possible throwable is catched to ensure running queue - } - synchronized (eventQueue) { - updateRunning = false; - eventQueue.notifyAll(); - } - } - public void commandStatusUpdate(Map updatesByVinMap) { updatesByVinMap.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java index 40afa829c54e6..21b37aa3220af 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java @@ -134,6 +134,8 @@ public class VehicleHandler extends BaseThingHandler { private JSONObject chargeGroupValueStorage = new JSONObject(); private Map hvacGroupValueStorage = new HashMap<>(); private String vehicleType = NOT_SET; + private List eventQueue = new ArrayList<>(); + private boolean updateRunning = false; Map eventStorage = new HashMap<>(); Optional accountHandler = Optional.empty(); @@ -588,6 +590,40 @@ public void distributeCommandStatus(AppTwinCommandStatusUpdatesByPID cmdUpdates) } public void distributeContent(VEPUpdate data) { + synchronized (eventQueue) { + eventQueue.add(data); + scheduler.execute(this::doUpdate); + } + } + + public void doUpdate() { + VEPUpdate data; + synchronized (eventQueue) { + while (updateRunning) { + try { + eventQueue.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!eventQueue.isEmpty()) { + data = eventQueue.remove(0); + } else { + return; + } + updateRunning = true; + } + try { + update(data); + } finally { + synchronized (eventQueue) { + updateRunning = false; + eventQueue.notifyAll(); + } + } + } + + public void update(VEPUpdate data) { updateStatus(ThingStatus.ONLINE); boolean fullUpdate = data.getFullUpdate(); /** From a11db884bf7bc789194f4cccd0618817c44908a8 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Fri, 15 Nov 2024 21:18:41 +0100 Subject: [PATCH 3/9] socket thread decoupling from updating things Signed-off-by: Bernd Weymann --- .../internal/handler/AccountHandler.java | 2 +- .../internal/server/MBWebsocket.java | 5 +--- .../handler/ThingCallbackListener.java | 21 +++++++++++++++ .../internal/handler/VehicleHandlerTest.java | 26 +++++++++++++++++++ 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index ded788084a32b..0708a8f352d57 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -203,12 +203,12 @@ public void dispose() { server = Optional.empty(); Utils.removePort(config.get().callbackPort); } - ws.interrupt(); scheduledFuture.ifPresent(schedule -> { if (!schedule.isCancelled()) { schedule.cancel(true); } }); + ws.interrupt(); } /** diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index 206b4f9dee74e..600c075b17922 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -169,10 +169,6 @@ private void sendAcknowledgeMessage(ClientMessage message) { } } - public boolean isRunning() { - return running; - } - public void interrupt() { synchronized (this) { runTill = Instant.MIN; @@ -202,6 +198,7 @@ public void keepAlive(boolean b) { @OnWebSocketMessage public void onBytes(InputStream is) { try { + byte[] array = is.readAllBytes(); PushMessage pm = VehicleEvents.PushMessage.parseFrom(is); if (pm.hasVepUpdates()) { boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); diff --git a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java index d785afedb1c47..f041deb805b39 100644 --- a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java +++ b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/ThingCallbackListener.java @@ -12,8 +12,11 @@ */ package org.openhab.binding.mercedesme.internal.handler; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,6 +56,7 @@ public class ThingCallbackListener implements ThingHandlerCallback { public Map> updatesPerGroupMap = new HashMap<>(); public boolean linked = false; public Optional status = Optional.empty(); + private Instant waitTime = Instant.MAX; public ThingStatusInfo getThingStatus() { return status.get(); @@ -78,6 +82,23 @@ public void stateUpdated(ChannelUID channelUID, State state) { } } groupMap.put(channelUID.toString(), state); + synchronized (updatesReceived) { + waitTime = Instant.now().plus(500, ChronoUnit.MILLIS); + } + } + + public void waitForUpdates() { + Instant maxWaitTime = Instant.now().plus(5000, ChronoUnit.MILLIS); + synchronized (updatesReceived) { + while (Instant.now().isBefore(maxWaitTime) && waitTime.isAfter(Instant.now())) { + try { + updatesReceived.wait(50); + } catch (InterruptedException e) { + fail(); + } + } + } + waitTime = Instant.MAX; } @Override diff --git a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java index 8675b4d0ac43a..893351d217534 100644 --- a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java +++ b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java @@ -116,6 +116,7 @@ public void testBEVFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -152,6 +153,7 @@ public void testBEVImperialUnits() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-ImperialUnits.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -188,6 +190,8 @@ public void testBEVImperialUnits() { json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); + assertEquals("%.1f °C", patternMock.patternMap.get("test::bev:hvac#temperature"), "Temperature Pattern"); commandOptionMock.getCommandList("test::bev:hvac#temperature").forEach(cmd -> { assertTrue(cmd.getCommand().endsWith(" °C"), "Command Option Celsius Unit"); @@ -210,6 +214,7 @@ public void testBEVCharging() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -247,12 +252,16 @@ public void testBEVChargeEndtime() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); + assertEquals("2023-09-09 13:54", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday-Underrun.json"); update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); + assertEquals("2023-09-11 13:55", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); } @@ -273,6 +282,8 @@ public void testBEVPartialChargingUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); vh.distributeContent(update); + updateListener.waitForUpdates(); + assertEquals(2, updateListener.updatesReceived.size(), "Update Count"); assertEquals("2023-09-19 20:45", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) .format("%1$tY-%1$tm-%1$td %1$tH:%1$tM"), "End of Charge Time"); @@ -295,6 +306,7 @@ public void testBEVPartialGPSUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-GPS.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); assertEquals("1.23,4.56", updateListener.getResponse("test::bev:position#gps").toFullString(), "GPS update"); assertEquals("41.9 °", updateListener.getResponse("test::bev:position#heading").toFullString(), @@ -317,6 +329,8 @@ public void testBEVPartialRangeUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Range.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); vh.distributeContent(update); + updateListener.waitForUpdates(); + assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); assertEquals("15017 km", updateListener.getResponse("test::bev:range#mileage").toFullString(), "Mileage Update"); @@ -342,6 +356,7 @@ public void testHybridFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -375,6 +390,7 @@ public void testHybridFullUpadteWithCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); // Test charged / uncharged battery and filled / unfilled tank volume assertEquals("5.800000190734863 kWh", updateListener.getResponse("test::hybrid:range#charged").toFullString(), @@ -404,6 +420,7 @@ public void testEventStorage() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); assertEquals(10, updateListener.getUpdatesForGroup("doors"), "Doors Update Count"); @@ -451,11 +468,13 @@ public void testProtoChannelLinked() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertFalse(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); updateListener.linked = true; vh.distributeContent(update); + updateListener.waitForUpdates(); assertTrue(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); } @@ -478,6 +497,7 @@ public void testTemperaturePoints() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); assertEquals("22 °C", updateListener.getResponse("test::bev:hvac#temperature").toFullString(), "Temperature Point One Updated"); @@ -509,6 +529,7 @@ public void testTemperaturePointSelection() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_HVAC, "temperature"); updateListener = new ThingCallbackListener(); @@ -539,6 +560,7 @@ public void testChargeProgramSelection() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vh.distributeContent(update); + updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_CHARGE, "max-soc"); vh.handleCommand(cuid, QuantityType.valueOf("90 %")); @@ -587,6 +609,7 @@ public void testPositioning() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vHandler.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(POSITIONING_UPDATE_COUNT, updateListener.getUpdatesForGroup("position"), "Position Update Count"); assertEquals("1.23,4.56", updateListener.getResponse("test::bev:position#gps").toFullString(), @@ -609,6 +632,7 @@ public void testHVAC() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vHandler.distributeContent(update); + updateListener.waitForUpdates(); assertEquals(HVAC_UPDATE_COUNT, updateListener.getUpdatesForGroup("hvac"), "HVAC Update Count"); assertEquals(0, ((DecimalType) updateListener.getResponse("test::bev:hvac#ac-status")).intValue(), @@ -628,6 +652,7 @@ public void testEcoScore() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vHandler.distributeContent(update); + updateListener.waitForUpdates(); assertEquals("72 %", updateListener.getResponse("test::bev:eco#accel").toFullString(), "Eco Acceleration"); assertEquals("81 %", updateListener.getResponse("test::bev:eco#coasting").toFullString(), "Eco Coasting"); @@ -648,6 +673,7 @@ public void testAdBlue() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Combustion.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); vHandler.distributeContent(update); + updateListener.waitForUpdates(); assertEquals("29 %", updateListener.getResponse("test::combustion:range#adblue-level").toFullString(), "AdBlue Tank Level"); From 30e0379b45f5fa3016c0cbd2f27fcf544d09669b Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sat, 16 Nov 2024 00:17:54 +0100 Subject: [PATCH 4/9] close stream after reading Signed-off-by: Bernd Weymann --- .../mercedesme/internal/server/MBWebsocket.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index 600c075b17922..aa353f8898f84 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -128,9 +128,10 @@ public void run() { accountHandler.updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "@text/mercedesme.account.status.websocket-failure"); logger.warn("Websocket handling exception: {}", t.getMessage()); - } - synchronized (this) { - running = false; + } finally { + synchronized (this) { + running = false; + } } } @@ -199,7 +200,8 @@ public void keepAlive(boolean b) { public void onBytes(InputStream is) { try { byte[] array = is.readAllBytes(); - PushMessage pm = VehicleEvents.PushMessage.parseFrom(is); + is.close(); + PushMessage pm = VehicleEvents.PushMessage.parseFrom(array); if (pm.hasVepUpdates()) { boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); if (distributed) { @@ -238,6 +240,7 @@ public void onBytes(InputStream is) { // don't report thing status errors here. // Sometimes messages cannot be decoded which doesn't effect the overall functionality logger.trace("IOException {}", e.getMessage()); + e.printStackTrace(); } catch (Error err) { logger.trace("Error caught {}", err.getMessage()); } From f26c510b13cba369f5a0bb95d1eee220969210b4 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sat, 16 Nov 2024 17:26:15 +0100 Subject: [PATCH 5/9] free websocket as early as possible Signed-off-by: Bernd Weymann --- .../internal/handler/AccountHandler.java | 95 +++++++++++++++++++ .../internal/server/MBWebsocket.java | 52 +--------- 2 files changed, 98 insertions(+), 49 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index 0708a8f352d57..4b67348593cc6 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -12,6 +12,7 @@ */ package org.openhab.binding.mercedesme.internal.handler; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -57,8 +58,15 @@ import org.slf4j.LoggerFactory; import com.daimler.mbcarkit.proto.Client.ClientMessage; +import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles; +import com.daimler.mbcarkit.proto.VehicleEvents; +import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN; +import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage; import com.daimler.mbcarkit.proto.VehicleEvents.VEPUpdate; +import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN; import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByPID; +import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN; +import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinPendingCommandsRequest; /** * The {@link AccountHandler} acts as Bridge between MercedesMe Account and the associated vehicles @@ -83,6 +91,8 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private Optional server = Optional.empty(); private Optional authService = Optional.empty(); private Optional> scheduledFuture = Optional.empty(); + private List eventQueue = new ArrayList<>(); + private boolean updateRunning = false; private String capabilitiesEndpoint = "/v1/vehicle/%s/capabilities"; private String commandCapabilitiesEndpoint = "/v1/vehicle/%s/capabilities/commands"; @@ -284,6 +294,91 @@ public void getVehicleCapabilities(String vin) { } } + /** + * functions for websocket handling + * + * @param data + */ + + public void onMessage(byte[] data) { + synchronized (eventQueue) { + eventQueue.add(data); + scheduler.execute(this::doUpdate); + } + } + + public void doUpdate() { + byte[] data; + synchronized (eventQueue) { + while (updateRunning) { + try { + eventQueue.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!eventQueue.isEmpty()) { + data = eventQueue.remove(0); + } else { + return; + } + updateRunning = true; + } + try { + handleMessage(data); + } finally { + synchronized (eventQueue) { + updateRunning = false; + eventQueue.notifyAll(); + } + } + } + + private void handleMessage(byte[] array) { + try { + PushMessage pm = VehicleEvents.PushMessage.parseFrom(array); + if (pm.hasVepUpdates()) { + boolean distributed = distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); + logger.trace("Distributed VEPUpdate {}", distributed); + if (distributed) { + AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder() + .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build(); + ws.sendAcknowledgeMessage(cm); + } + } else if (pm.hasAssignedVehicles()) { + for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) { + String vin = pm.getAssignedVehicles().getVins(0); + discovery(vin); + } + AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build(); + ws.sendAcknowledgeMessage(cm); + } else if (pm.hasApptwinCommandStatusUpdatesByVin()) { + AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin(); + commandStatusUpdate(csubv.getUpdatesByVinMap()); + AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN + .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build(); + ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack) + .build(); + ws.sendAcknowledgeMessage(cm); + } else if (pm.hasApptwinPendingCommandRequest()) { + AppTwinPendingCommandsRequest pending = pm.getApptwinPendingCommandRequest(); + if (!pending.getAllFields().isEmpty()) { + logger.trace("Pending Command {}", pending.getAllFields()); + } + } else if (pm.hasDebugMessage()) { + logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage()); + } else { + logger.trace("MB Message: {} not handled", pm.getAllFields()); + } + } catch (IOException e) { + logger.trace("IOException decoding message {}", e.getMessage()); + } catch (Error err) { + logger.trace("Error caught {}", err.getMessage()); + } + } + public boolean distributeVepUpdates(Map map) { List notFoundList = new ArrayList<>(); map.forEach((key, value) -> { diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index aa353f8898f84..8acbf5d20950e 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -39,13 +39,6 @@ import org.slf4j.LoggerFactory; import com.daimler.mbcarkit.proto.Client.ClientMessage; -import com.daimler.mbcarkit.proto.Protos.AcknowledgeAssignedVehicles; -import com.daimler.mbcarkit.proto.VehicleEvents; -import com.daimler.mbcarkit.proto.VehicleEvents.AcknowledgeVEPUpdatesByVIN; -import com.daimler.mbcarkit.proto.VehicleEvents.PushMessage; -import com.daimler.mbcarkit.proto.Vehicleapi.AcknowledgeAppTwinCommandStatusUpdatesByVIN; -import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinCommandStatusUpdatesByVIN; -import com.daimler.mbcarkit.proto.Vehicleapi.AppTwinPendingCommandsRequest; /** * {@link MBWebsocket} as socket endpoint to communicate with Mercedes @@ -158,7 +151,7 @@ private boolean sendMessage() { return false; } - private void sendAcknowledgeMessage(ClientMessage message) { + public void sendAcknowledgeMessage(ClientMessage message) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); message.writeTo(baos); @@ -201,48 +194,9 @@ public void onBytes(InputStream is) { try { byte[] array = is.readAllBytes(); is.close(); - PushMessage pm = VehicleEvents.PushMessage.parseFrom(array); - if (pm.hasVepUpdates()) { - boolean distributed = accountHandler.distributeVepUpdates(pm.getVepUpdates().getUpdatesMap()); - if (distributed) { - AcknowledgeVEPUpdatesByVIN ack = AcknowledgeVEPUpdatesByVIN.newBuilder() - .setSequenceNumber(pm.getVepUpdates().getSequenceNumber()).build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeVepUpdatesByVin(ack).build(); - sendAcknowledgeMessage(cm); - } - } else if (pm.hasAssignedVehicles()) { - for (int i = 0; i < pm.getAssignedVehicles().getVinsCount(); i++) { - String vin = pm.getAssignedVehicles().getVins(0); - accountHandler.discovery(vin); - } - AcknowledgeAssignedVehicles ack = AcknowledgeAssignedVehicles.newBuilder().build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeAssignedVehicles(ack).build(); - sendAcknowledgeMessage(cm); - } else if (pm.hasApptwinCommandStatusUpdatesByVin()) { - AppTwinCommandStatusUpdatesByVIN csubv = pm.getApptwinCommandStatusUpdatesByVin(); - accountHandler.commandStatusUpdate(csubv.getUpdatesByVinMap()); - AcknowledgeAppTwinCommandStatusUpdatesByVIN ack = AcknowledgeAppTwinCommandStatusUpdatesByVIN - .newBuilder().setSequenceNumber(csubv.getSequenceNumber()).build(); - ClientMessage cm = ClientMessage.newBuilder().setAcknowledgeApptwinCommandStatusUpdateByVin(ack) - .build(); - sendAcknowledgeMessage(cm); - } else if (pm.hasApptwinPendingCommandRequest()) { - AppTwinPendingCommandsRequest pending = pm.getApptwinPendingCommandRequest(); - if (!pending.getAllFields().isEmpty()) { - logger.trace("Pending Command {}", pending.getAllFields()); - } - } else if (pm.hasDebugMessage()) { - logger.trace("MB Debug Message: {}", pm.getDebugMessage().getMessage()); - } else { - logger.trace("MB Message: {} not handled", pm.getAllFields()); - } + accountHandler.onMessage(array); } catch (IOException e) { - // don't report thing status errors here. - // Sometimes messages cannot be decoded which doesn't effect the overall functionality - logger.trace("IOException {}", e.getMessage()); - e.printStackTrace(); - } catch (Error err) { - logger.trace("Error caught {}", err.getMessage()); + logger.trace("IOException reading input stream {}", e.getMessage()); } } From dd13cd70d3e9f576a68b32344d41a0b48e3d9688 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sat, 16 Nov 2024 19:55:41 +0100 Subject: [PATCH 6/9] harmonize function names Signed-off-by: Bernd Weymann --- .../internal/handler/AccountHandler.java | 24 +++++------ .../internal/handler/VehicleHandler.java | 18 ++++---- .../internal/server/MBWebsocket.java | 2 +- .../internal/handler/VehicleHandlerTest.java | 42 +++++++++---------- 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index 4b67348593cc6..4e702af0a01c3 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -90,7 +90,7 @@ public class AccountHandler extends BaseBridgeHandler implements AccessTokenRefr private Optional server = Optional.empty(); private Optional authService = Optional.empty(); - private Optional> scheduledFuture = Optional.empty(); + private Optional> refreshScheduler = Optional.empty(); private List eventQueue = new ArrayList<>(); private boolean updateRunning = false; @@ -138,13 +138,13 @@ public void initialize() { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, textKey + " [\"" + thing.getProperties().get("callbackUrl") + "\"]"); } else { - scheduledFuture = Optional.of(scheduler.scheduleWithFixedDelay(this::update, 0, + refreshScheduler = Optional.of(scheduler.scheduleWithFixedDelay(this::refresh, 0, config.get().refreshInterval, TimeUnit.MINUTES)); } } } - public void update() { + public void refresh() { if (server.isPresent()) { if (!Constants.NOT_SET.equals(authService.get().getToken())) { ws.run(); @@ -213,7 +213,7 @@ public void dispose() { server = Optional.empty(); Utils.removePort(config.get().callbackPort); } - scheduledFuture.ifPresent(schedule -> { + refreshScheduler.ifPresent(schedule -> { if (!schedule.isCancelled()) { schedule.cancel(true); } @@ -227,7 +227,7 @@ public void dispose() { @Override public void onAccessTokenResponse(AccessTokenResponse tokenResponse) { if (!Constants.NOT_SET.equals(tokenResponse.getAccessToken())) { - scheduler.schedule(this::update, 2, TimeUnit.SECONDS); + scheduler.schedule(this::refresh, 2, TimeUnit.SECONDS); } else if (server.isEmpty()) { // server not running - fix first String textKey = Constants.STATUS_TEXT_PREFIX + thing.getThingTypeUID().getId() @@ -272,7 +272,7 @@ public void registerVin(String vin, VehicleHandler handler) { activeVehicleHandlerMap.put(vin, handler); VEPUpdate updateForVin = vepUpdateMap.get(vin); if (updateForVin != null) { - handler.distributeContent(updateForVin); + handler.enqueueUpdate(updateForVin); } } @@ -296,18 +296,16 @@ public void getVehicleCapabilities(String vin) { /** * functions for websocket handling - * - * @param data */ - public void onMessage(byte[] data) { + public void enqueueMessage(byte[] data) { synchronized (eventQueue) { eventQueue.add(data); - scheduler.execute(this::doUpdate); + scheduler.execute(this::scheduleMessage); } } - public void doUpdate() { + private void scheduleMessage() { byte[] data; synchronized (eventQueue) { while (updateRunning) { @@ -384,7 +382,7 @@ public boolean distributeVepUpdates(Map map) { map.forEach((key, value) -> { VehicleHandler h = activeVehicleHandlerMap.get(key); if (h != null) { - h.distributeContent(value); + h.enqueueUpdate(value); } else { if (value.getFullUpdate()) { vepUpdateMap.put(key, value); @@ -525,7 +523,7 @@ public void sendCommand(@Nullable ClientMessage cm) { if (cm != null) { ws.setCommand(cm); } - scheduler.schedule(this::update, 2, TimeUnit.SECONDS); + scheduler.schedule(this::refresh, 2, TimeUnit.SECONDS); } public void keepAlive(boolean b) { diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java index 21b37aa3220af..57168d7af6753 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java @@ -589,14 +589,14 @@ public void distributeCommandStatus(AppTwinCommandStatusUpdatesByPID cmdUpdates) }); } - public void distributeContent(VEPUpdate data) { + public void enqueueUpdate(VEPUpdate update) { synchronized (eventQueue) { - eventQueue.add(data); - scheduler.execute(this::doUpdate); + eventQueue.add(update); + scheduler.execute(this::scheduleUpdate); } } - public void doUpdate() { + private void scheduleUpdate() { VEPUpdate data; synchronized (eventQueue) { while (updateRunning) { @@ -614,7 +614,7 @@ public void doUpdate() { updateRunning = true; } try { - update(data); + handleUpdate(data); } finally { synchronized (eventQueue) { updateRunning = false; @@ -623,13 +623,13 @@ public void doUpdate() { } } - public void update(VEPUpdate data) { + public void handleUpdate(VEPUpdate update) { updateStatus(ThingStatus.ONLINE); - boolean fullUpdate = data.getFullUpdate(); + boolean fullUpdate = update.getFullUpdate(); /** * Deliver proto update */ - String newProto = Utils.proto2Json(data, thing.getThingTypeUID()); + String newProto = Utils.proto2Json(update, thing.getThingTypeUID()); String combinedProto = newProto; ChannelUID protoUpdateChannelUID = new ChannelUID(thing.getUID(), GROUP_VEHICLE, OH_CHANNEL_PROTO_UPDATE); ChannelStateMap oldProtoMap = eventStorage.get(protoUpdateChannelUID.getId()); @@ -645,7 +645,7 @@ public void update(VEPUpdate data) { StringType.valueOf(combinedProto)); updateChannel(dataUpdateMap); - Map atts = data.getAttributesMap(); + Map atts = update.getAttributesMap(); /** * handle "simple" values */ diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index 8acbf5d20950e..c3b078c0fe9ba 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -194,7 +194,7 @@ public void onBytes(InputStream is) { try { byte[] array = is.readAllBytes(); is.close(); - accountHandler.onMessage(array); + accountHandler.enqueueMessage(array); } catch (IOException e) { logger.trace("IOException reading input stream {}", e.getMessage()); } diff --git a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java index 893351d217534..65cf867783e67 100644 --- a/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java +++ b/bundles/org.openhab.binding.mercedesme/src/test/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandlerTest.java @@ -115,7 +115,7 @@ public void testBEVFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); @@ -152,7 +152,7 @@ public void testBEVImperialUnits() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-ImperialUnits.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); @@ -189,7 +189,7 @@ public void testBEVImperialUnits() { // overwrite with EU Units json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("%.1f °C", patternMock.patternMap.get("test::bev:hvac#temperature"), "Temperature Pattern"); @@ -213,7 +213,7 @@ public void testBEVCharging() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); @@ -251,7 +251,7 @@ public void testBEVChargeEndtime() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("2023-09-09 13:54", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) @@ -259,7 +259,7 @@ public void testBEVChargeEndtime() { json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA-Charging-Weekday-Underrun.json"); update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("2023-09-11 13:55", ((DateTimeType) updateListener.getResponse("test::bev:charge#end-time")) @@ -281,7 +281,7 @@ public void testBEVPartialChargingUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(2, updateListener.updatesReceived.size(), "Update Count"); @@ -305,7 +305,7 @@ public void testBEVPartialGPSUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-GPS.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); assertEquals("1.23,4.56", updateListener.getResponse("test::bev:position#gps").toFullString(), "GPS update"); @@ -328,7 +328,7 @@ public void testBEVPartialRangeUpdate() { String json = FileReader.readFileInString("src/test/resources/proto-json/PartialUpdate-Range.json"); VEPUpdate update = ProtoConverter.json2Proto(json, false); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(3, updateListener.updatesReceived.size(), "Update Count"); @@ -355,7 +355,7 @@ public void testHybridFullUpdateNoCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); @@ -389,7 +389,7 @@ public void testHybridFullUpadteWithCapacities() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Hybrid-Charging.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); // Test charged / uncharged battery and filled / unfilled tank volume @@ -419,7 +419,7 @@ public void testEventStorage() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(GROUP_COUNT, updateListener.updatesPerGroupMap.size(), "Group Update Count"); @@ -467,13 +467,13 @@ public void testProtoChannelLinked() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertFalse(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); updateListener.linked = true; - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertTrue(updateListener.updatesReceived.containsKey("test::bev:vehicle#proto-update"), "Proto Channel not updated"); @@ -496,7 +496,7 @@ public void testTemperaturePoints() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("22 °C", updateListener.getResponse("test::bev:hvac#temperature").toFullString(), "Temperature Point One Updated"); @@ -528,7 +528,7 @@ public void testTemperaturePointSelection() { vh.setCallback(updateListener); String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Unknown.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_HVAC, "temperature"); @@ -559,7 +559,7 @@ public void testChargeProgramSelection() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vh.distributeContent(update); + vh.enqueueUpdate(update); updateListener.waitForUpdates(); ChannelUID cuid = new ChannelUID(thingMock.getUID(), Constants.GROUP_CHARGE, "max-soc"); @@ -608,7 +608,7 @@ public void testPositioning() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(POSITIONING_UPDATE_COUNT, updateListener.getUpdatesForGroup("position"), "Position Update Count"); @@ -631,7 +631,7 @@ public void testHVAC() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals(HVAC_UPDATE_COUNT, updateListener.getUpdatesForGroup("hvac"), "HVAC Update Count"); @@ -651,7 +651,7 @@ public void testEcoScore() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-BEV-EQA.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("72 %", updateListener.getResponse("test::bev:eco#accel").toFullString(), "Eco Acceleration"); @@ -672,7 +672,7 @@ public void testAdBlue() { String json = FileReader.readFileInString("src/test/resources/proto-json/MB-Combustion.json"); VEPUpdate update = ProtoConverter.json2Proto(json, true); - vHandler.distributeContent(update); + vHandler.enqueueUpdate(update); updateListener.waitForUpdates(); assertEquals("29 %", updateListener.getResponse("test::combustion:range#adblue-level").toFullString(), From 5b8997e484cfe25a287a80a8eed481b317527b8f Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sun, 17 Nov 2024 14:09:12 +0100 Subject: [PATCH 7/9] clear eventQueue in case of dispose and thread interrupt Signed-off-by: Bernd Weymann --- .../binding/mercedesme/internal/handler/AccountHandler.java | 3 +++ .../binding/mercedesme/internal/handler/VehicleHandler.java | 4 ++++ .../binding/mercedesme/internal/server/MBWebsocket.java | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index 4e702af0a01c3..7e2524c818a0a 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -219,6 +219,7 @@ public void dispose() { } }); ws.interrupt(); + eventQueue.clear(); } /** @@ -313,6 +314,8 @@ private void scheduleMessage() { eventQueue.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + eventQueue.clear(); + return; } } if (!eventQueue.isEmpty()) { diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java index 57168d7af6753..bc37335def5b6 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java @@ -181,9 +181,11 @@ private ClientMessage createCM(CommandRequest cr) { @Override public void dispose() { + // accountHandler.ifPresent(ah -> { ah.unregisterVin(config.get().vin); }); + eventQueue.clear(); super.dispose(); } @@ -604,6 +606,8 @@ private void scheduleUpdate() { eventQueue.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + eventQueue.clear(); + return; } } if (!eventQueue.isEmpty()) { diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index c3b078c0fe9ba..e958f0777b93e 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -195,6 +195,12 @@ public void onBytes(InputStream is) { byte[] array = is.readAllBytes(); is.close(); accountHandler.enqueueMessage(array); + /** + * 1. Websocket thread responsible for reading stream in bytes and enqueue for AccountHandler. + * 2. AccountHamdler thread responsible for encoding proto message. In case of update enqueue proto message + * at VehicleHandöer + * 3. VehicleHandler responsible to update channels + */ } catch (IOException e) { logger.trace("IOException reading input stream {}", e.getMessage()); } From 540925bfa2ac8e0634503ee20535e7f495cd4366 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sun, 17 Nov 2024 14:42:19 +0100 Subject: [PATCH 8/9] just a comment Signed-off-by: Bernd Weymann --- .../binding/mercedesme/internal/server/MBWebsocket.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index e958f0777b93e..dab17cff15aae 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -196,6 +196,9 @@ public void onBytes(InputStream is) { is.close(); accountHandler.enqueueMessage(array); /** + * https://community.openhab.org/t/mercedes-me/136866/12 + * Release Websocket thread as early as possible to avoid execeptions + * * 1. Websocket thread responsible for reading stream in bytes and enqueue for AccountHandler. * 2. AccountHamdler thread responsible for encoding proto message. In case of update enqueue proto message * at VehicleHandöer From 8dca91497ca6104def7629a628c5855b7bebf786 Mon Sep 17 00:00:00 2001 From: Bernd Weymann Date: Sun, 24 Nov 2024 00:02:38 +0100 Subject: [PATCH 9/9] correct review changes Signed-off-by: Bernd Weymann --- .../binding/mercedesme/internal/handler/AccountHandler.java | 2 +- .../binding/mercedesme/internal/handler/VehicleHandler.java | 1 - .../openhab/binding/mercedesme/internal/server/MBWebsocket.java | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java index 7e2524c818a0a..ab2d583ad0cb1 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/AccountHandler.java @@ -376,7 +376,7 @@ private void handleMessage(byte[] array) { } catch (IOException e) { logger.trace("IOException decoding message {}", e.getMessage()); } catch (Error err) { - logger.trace("Error caught {}", err.getMessage()); + logger.debug("Error caught {}", err.getMessage()); } } diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java index bc37335def5b6..5087958788051 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/handler/VehicleHandler.java @@ -181,7 +181,6 @@ private ClientMessage createCM(CommandRequest cr) { @Override public void dispose() { - // accountHandler.ifPresent(ah -> { ah.unregisterVin(config.get().vin); }); diff --git a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java index dab17cff15aae..1e2bc1cedd64f 100644 --- a/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java +++ b/bundles/org.openhab.binding.mercedesme/src/main/java/org/openhab/binding/mercedesme/internal/server/MBWebsocket.java @@ -205,7 +205,7 @@ public void onBytes(InputStream is) { * 3. VehicleHandler responsible to update channels */ } catch (IOException e) { - logger.trace("IOException reading input stream {}", e.getMessage()); + logger.debug("IOException reading input stream {}", e.getMessage()); } }