Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
szysas committed Jun 13, 2023
1 parent 8d000d8 commit 451fd7f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.mbed.coap.server.observe;

import static com.mbed.coap.utils.Validations.assume;
import static com.mbed.coap.utils.Validations.require;
import static java.util.Objects.requireNonNull;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
Expand Down Expand Up @@ -45,15 +45,14 @@ public class InboundSubscriptionManager implements Filter.SimpleFilter<CoapReque
private final Map<String, Map<InetSocketAddress, Opaque>> obsRelations = new ConcurrentHashMap<>();
private final AtomicInteger observeSeq = new AtomicInteger(0);

public InboundSubscriptionManager init(Service<SeparateResponse, Boolean> outboundObservation) {
public void init(Service<SeparateResponse, Boolean> outboundObservation) {
obsRelations.clear();
this.outboundObservation = requireNonNull(outboundObservation);
return this;
}

public InboundSubscriptionManager init(CoapServer server) {
assume(!server.isRunning());
obsRelations.clear();
return init(server::sendObservation);
public void init(CoapServer server) {
require(!server.isRunning(), "SubscriptionManager should be initialized with non yet running server");
init(server::sendObservation);
}

@Override
Expand Down Expand Up @@ -87,7 +86,9 @@ public void sendObservation(String uriPath, CoapResponse obs) {
obs.options().setObserve(observeSeq.incrementAndGet());

Map<InetSocketAddress, Opaque> subscriptions = obsRelations.getOrDefault(uriPath, Collections.emptyMap());
subscriptions.forEach((peerAddress, token) -> {

for (InetSocketAddress peerAddress : subscriptions.keySet()) {
Opaque token = subscriptions.get(peerAddress);
SeparateResponse separateResponse = obs.toSeparate(token, peerAddress);

outboundObservation.apply(separateResponse).whenComplete((result, exception) -> {
Expand All @@ -99,7 +100,7 @@ public void sendObservation(String uriPath, CoapResponse obs) {
LOGGER.info("[{}#{}] Removed observation relation, got reset", peerAddress, token);
}
});
});
}

if (obs.getCode() != Code.C205_CONTENT) {
obsRelations.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@
class InboundSubscriptionManagerTest {

private Service<SeparateResponse, Boolean> outboundObservation = mock(Service.class);
private InboundSubscriptionManager subMgr = new InboundSubscriptionManager().init(outboundObservation);
private InboundSubscriptionManager subMgr = new InboundSubscriptionManager();
private static final InetSocketAddress PEER_1 = IpPortAddress.local(15683).toInetSocketAddress();
private static final InetSocketAddress PEER_2 = IpPortAddress.local(25683).toInetSocketAddress();

@BeforeEach
void setUp() {
reset(outboundObservation);
given(outboundObservation.apply(any())).willReturn(completedFuture(true));

subMgr.init(outboundObservation);
}

@AfterEach
Expand All @@ -61,7 +63,7 @@ void tearDown() {

@Test
public void createObservationRelation() {
assertEquals(0, subMgr.subscribe(get(PEER_1, "/test").token(13).observe(0), ok("OK!")).options().getObserve());
assertEquals(0, subMgr.apply(get(PEER_1, "/test").token(13).observe(0), __ -> completedFuture(ok("OK!"))).join().options().getObserve());
assertEquals(0, subMgr.subscribe(get(PEER_2, "/test").token(1142).observe(0), ok("OK!")).options().getObserve());

assertEquals(2, subMgr.size());
Expand Down Expand Up @@ -129,6 +131,20 @@ public void sendCancelObservation() throws ExecutionException, InterruptedExcept
verify(outboundObservation).apply(eq(expected.toSeparate(variableUInt(13), PEER_1)));
}

@Test
public void shouldRemoveSubscription_when_gotReset() throws ExecutionException, InterruptedException {
// given
subMgr.subscribe(get(PEER_1, "/test").token(13).observe(0), ok("OK!"));
given(outboundObservation.apply(any())).willReturn(completedFuture(false));

// when
subMgr.sendObservation("/test", ok("OK"));

// then
assertEquals(0, subMgr.size());
verify(outboundObservation).apply(any());
}

@Test
public void deregisterObservationRelation() throws ExecutionException, InterruptedException {
// given
Expand Down

0 comments on commit 451fd7f

Please sign in to comment.