Skip to content

Commit

Permalink
Observation handling refactoring:
Browse files Browse the repository at this point in the history
- added ObservationListener
- added ObservationRelationsStore
- CoapServer with outboundObservations service;
- InboundSubscriptionManager
- moved ObservableResourceService to test scope
  • Loading branch information
szysas committed Jun 13, 2023
1 parent c16839e commit 758daee
Show file tree
Hide file tree
Showing 42 changed files with 947 additions and 1,424 deletions.
36 changes: 19 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,18 @@ client.close();

```java
// build CoapClient that connects to coap server which is running on port 5683
CoapClient client = CoapServer.builder()
client = CoapServer.builder()
// define transport, plain text UDP listening on random port
.transport(udp())
// (optional) register observation listener to handle incoming observations
.observationListener((resourceUriPath, observation) -> {
LOGGER.info("Observation: {}", observation);
// in case of block transfer, call to retrieve rest of payload
CompletableFuture<Opaque> payload = retrieveRemainingBlocks(resourceUriPath, observation, req -> client.send(req));
return true; // return false to terminate observation
})
// (optional) set custom observation relation store, for example one that will use external storage
.observationRelationsStore(new HashMapObservationRelations())
// (optional) define maximum block size
.blockSize(BlockSize.S_1024)
// (optional) set maximum response timeout, default for every request
Expand Down Expand Up @@ -137,11 +146,8 @@ futureResponse2.thenAccept(resp ->
LOGGER.info(resp.toString())
);

// observe resource (subscribe), observations will be handled to provided callback
CompletableFuture<CoapResponse> resp3 = client.observe("/sensors/temperature", coapResponse -> {
LOGGER.info(coapResponse.toString());
return true; // return false to terminate observation
});
// observe (subscribe) to a resource, observations will be handled by observation listener
CompletableFuture<CoapResponse> resp3 = client.observe("/sensors/temperature");
LOGGER.info(resp3.join().toString());

client.close();
Expand All @@ -150,6 +156,9 @@ client.close();
### Server usage

```java
// define subscription manager for observable resources
InboundSubscriptionManager subscriptionManager = new InboundSubscriptionManager();

server = CoapServer.builder()
// configure with plain text UDP transport, listening on port 5683
.transport(new DatagramSocketTransport(5683))
Expand All @@ -164,20 +173,13 @@ server = CoapServer.builder()
return completedFuture(CoapResponse.of(Code.C204_CHANGED));
})
// observable resource
.get("/sensors/temperature", req -> {
CoapResponse resp = CoapResponse.ok("21C").nextSupplier(() -> {
// we need to define a promise for next value
CompletableFuture<CoapResponse> promise = new CompletableFuture();
// ... complete once resource value changes
return promise;
}
);

return completedFuture(resp);
})
.get("/sensors/temperature", subscriptionManager.then(req ->
completedFuture(CoapResponse.ok("21C"))
))
)
.build();

subscriptionManager.init(server);
server.start();
```

Expand Down
10 changes: 5 additions & 5 deletions coap-cli/src/main/java/com/mbed/coap/cli/DeviceEmulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
import com.mbed.coap.client.RegistrationManager;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.ObservableResourceService;
import com.mbed.coap.server.RouterService;
import com.mbed.coap.server.observe.InboundSubscriptionManager;
import com.mbed.coap.utils.Service;
import java.net.URI;
import java.time.Instant;
Expand All @@ -50,6 +49,7 @@ public class DeviceEmulator implements Callable<Integer> {
private TransportOptions transportOptions;

protected CoapServer emulatorServer;
private final InboundSubscriptionManager subscriptionManager = new InboundSubscriptionManager();
protected final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
RegistrationManager registrationManager;

Expand All @@ -59,6 +59,7 @@ public Integer call() throws Exception {
udpBuilder -> udpBuilder.route(createRouting()).build(),
tcpBuilder -> tcpBuilder.route(createRouting()).build()
);
subscriptionManager.init(emulatorServer);
emulatorServer.start();

//registration
Expand All @@ -76,9 +77,8 @@ public Integer call() throws Exception {
}

protected Service<CoapRequest, CoapResponse> createRouting() {
ObservableResourceService timeResource = new ObservableResourceService(CoapResponse.ok(Instant.now().toString()));
scheduledExecutor.scheduleAtFixedRate(() ->
timeResource.putPayload(Opaque.of(Instant.now().toString())),
subscriptionManager.sendObservation("/time", CoapResponse.ok(Instant.now().toString())),
30, 30, TimeUnit.SECONDS
);

Expand All @@ -91,7 +91,7 @@ protected Service<CoapRequest, CoapResponse> createRouting() {
scheduledExecutor.schedule(() -> promise.complete(CoapResponse.ok("OK")), 10, TimeUnit.SECONDS);
return promise;
})
.get("/time", timeResource)
.get("/time", subscriptionManager.then(__ -> completedFuture(CoapResponse.ok(Instant.now().toString()))))
.build();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class UdpTransportBenchmark {
public void setup() throws CoapException, IOException {
((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.ERROR);

coapServer = new CoapServer(server, coapIn -> server.sendPacket(coapResp), null, () -> {
coapServer = new CoapServer(server, coapIn -> server.sendPacket(coapResp), null, null, () -> {
});
coapServer.start();

Expand Down
15 changes: 5 additions & 10 deletions coap-core/src/main/java/com/mbed/coap/client/CoapClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package com.mbed.coap.client;

import static com.mbed.coap.client.ObservationConsumer.consumeFrom;
import static com.mbed.coap.utils.Validations.require;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.packet.CoapRequest;
Expand Down Expand Up @@ -77,18 +76,14 @@ private static CoapResponse await(CompletableFuture<CoapResponse> future) throws
}
}

public CompletableFuture<CoapResponse> observe(String uriPath, Function<CoapResponse, Boolean> consumer) {
return observe(uriPath, Opaque.variableUInt(uriPath.hashCode()), consumer);
}

public CompletableFuture<CoapResponse> observe(String uriPath, Opaque token, Function<CoapResponse, Boolean> consumer) {
CompletableFuture<CoapResponse> resp = clientService.apply(
public CompletableFuture<CoapResponse> observe(String uriPath, Opaque token) {
return clientService.apply(
CoapRequest.observe(destination, uriPath).token(token)
);
}

resp.thenAccept(r -> consumeFrom(r.next, consumer));

return resp;
public CompletableFuture<CoapResponse> observe(String uriPath) {
return observe(uriPath, Opaque.EMPTY);
}


Expand Down

This file was deleted.

14 changes: 1 addition & 13 deletions coap-core/src/main/java/com/mbed/coap/packet/CoapResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@
import com.mbed.coap.transport.TransportContext;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class CoapResponse {
private final Code code;
private final HeaderOptions options;
private final Opaque payload;
public transient final Supplier<CompletableFuture<CoapResponse>> next;

private CoapResponse(Code code, Opaque payload, HeaderOptions options, Supplier<CompletableFuture<CoapResponse>> next) {
public CoapResponse(Code code, Opaque payload, HeaderOptions options) {
this.code = code;
this.payload = Objects.requireNonNull(payload);
this.options = Objects.requireNonNull(options);
this.next = next;
}

public CoapResponse(Code code, Opaque payload, HeaderOptions options) {
this(code, payload, options, null);
}

public CoapResponse(Code code, Opaque payload) {
Expand Down Expand Up @@ -155,10 +147,6 @@ public String toString() {

// --- MODIFIERS ---

public CoapResponse nextSupplier(Supplier<CompletableFuture<CoapResponse>> next) {
return new CoapResponse(code, payload, options, next);
}

public CoapResponse payload(Opaque newPayload) {
return new CoapResponse(code, newPayload, options);
}
Expand Down
9 changes: 8 additions & 1 deletion coap-core/src/main/java/com/mbed/coap/server/CoapServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.mbed.coap.packet.CoapPacket;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.SeparateResponse;
import com.mbed.coap.transport.CoapTransport;
import com.mbed.coap.utils.Service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand All @@ -35,13 +37,15 @@ public class CoapServer {
private final CoapTransport transport;
private final Consumer<CoapPacket> dispatcher;
private final Service<CoapRequest, CoapResponse> outboundService;
private final Service<SeparateResponse, Boolean> outboundObservation;
private final Runnable stopAll;

public CoapServer(CoapTransport transport, Consumer<CoapPacket> dispatcher, Service<CoapRequest, CoapResponse> outboundService,
Runnable stopAll) {
Service<SeparateResponse, Boolean> outboundObservation, Runnable stopAll) {
this.transport = transport;
this.dispatcher = dispatcher;
this.outboundService = outboundService;
this.outboundObservation = outboundObservation;
this.stopAll = stopAll;
}

Expand Down Expand Up @@ -123,4 +127,7 @@ public final Service<CoapRequest, CoapResponse> clientService() {
return outboundService;
}

public CompletableFuture<Boolean> sendObservation(SeparateResponse observation) {
return outboundObservation.apply(observation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.mbed.coap.utils.Timer.toTimer;
import static com.mbed.coap.utils.Validations.require;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.packet.BlockSize;
import com.mbed.coap.packet.CoapPacket;
Expand All @@ -44,6 +43,8 @@
import com.mbed.coap.server.messaging.ObservationMapper;
import com.mbed.coap.server.messaging.PiggybackedExchangeFilter;
import com.mbed.coap.server.messaging.RetransmissionFilter;
import com.mbed.coap.server.observe.ObservationListener;
import com.mbed.coap.server.observe.ObservationRelationsStore;
import com.mbed.coap.transmission.RetransmissionBackOff;
import com.mbed.coap.transport.CoapTransport;
import com.mbed.coap.utils.Filter;
Expand Down Expand Up @@ -73,6 +74,8 @@ public final class CoapServerBuilder {
private int maxQueueSize = 100;
private Filter.SimpleFilter<CoapRequest, CoapResponse> outboundFilter = Filter.identity();
private Filter.SimpleFilter<CoapRequest, CoapResponse> routeFilter = Filter.identity();
private ObservationListener observationListener = ObservationListener.REJECT_ALL;
private ObservationRelationsStore obsRelations = ObservationRelationsStore.ALWAYS_EMPTY;

CoapServerBuilder() {
}
Expand Down Expand Up @@ -108,6 +111,19 @@ public CoapServerBuilder outboundFilter(Filter.SimpleFilter<CoapRequest, CoapRes
return this;
}

public CoapServerBuilder observationListener(ObservationListener observationListener) {
this.observationListener = requireNonNull(observationListener);
if (obsRelations.equals(ObservationRelationsStore.ALWAYS_EMPTY)) {
return observationRelationsStore(ObservationRelationsStore.inMemory());
}
return this;
}

public CoapServerBuilder observationRelationsStore(ObservationRelationsStore obsRelations) {
this.obsRelations = requireNonNull(obsRelations);
return this;
}

public CoapServerBuilder maxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
return this;
Expand Down Expand Up @@ -194,15 +210,13 @@ public CoapServer build() {
Service<CoapPacket, Boolean> sender = packet -> coapTransport.sendPacket(packet)
.whenComplete((__, throwable) -> logSent(packet, throwable));

ObservationHandler observationHandler = new ObservationHandler();

// OUTBOUND
ExchangeFilter exchangeFilter = new ExchangeFilter();
RetransmissionFilter<CoapPacket, CoapPacket> retransmissionFilter = new RetransmissionFilter<>(timer, retransmissionBackOff, CoapPacket::isConfirmable);
PiggybackedExchangeFilter piggybackedExchangeFilter = new PiggybackedExchangeFilter();

Service<CoapRequest, CoapResponse> outboundService = outboundFilter
.andThen(new ObserveRequestFilter(observationHandler))
.andThen(new ObserveRequestFilter(obsRelations::add))
.andThen(new CongestionControlFilter<>(maxQueueSize, CoapRequest::getPeerAddress))
.andThen(new BlockWiseOutgoingFilter(capabilities(), maxIncomingBlockTransferSize))
.andThen(new ResponseTimeoutFilter<>(timer, req -> req.getTransContext(RESPONSE_TIMEOUT, responseTimeout)))
Expand Down Expand Up @@ -231,21 +245,20 @@ public CoapServer build() {
.andThen(new CoapRequestConverter(midSupplier))
.andThen(new RescueFilter())
.andThen(new CriticalOptionVerifier())
.andThen(new ObservationSenderFilter(sendNotification))
.andThen(new BlockWiseIncomingFilter(capabilities(), maxIncomingBlockTransferSize))
.andThen(routeFilter)
.then(route);


Service<CoapPacket, CoapPacket> inboundObservation = duplicateDetector
.andThen(new ObservationMapper())
.then(obs -> completedFuture(observationHandler.notify(obs, outboundService)));
.then(new ObservationHandler(observationListener, obsRelations));

CoapDispatcher dispatcher = new CoapDispatcher(sender, inboundObservation, inboundService,
piggybackedExchangeFilter::handleResponse, exchangeFilter::handleResponse
);

return new CoapServer(coapTransport, dispatcher::handle, outboundService, () -> {
return new CoapServer(coapTransport, dispatcher::handle, outboundService, sendNotification, () -> {
piggybackedExchangeFilter.stop();
duplicateDetectorCache.stop();
if (stopExecutor) {
Expand Down
Loading

0 comments on commit 758daee

Please sign in to comment.