Skip to content

Commit

Permalink
CoapServer with outboundObservations service; InboundSubscriptionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
szysas committed Jun 8, 2023
1 parent 0cba63b commit 0a31e39
Show file tree
Hide file tree
Showing 19 changed files with 399 additions and 584 deletions.
4 changes: 2 additions & 2 deletions coap-cli/src/main/java/com/mbed/coap/cli/DeviceEmulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.ObservableResourceService;
import com.mbed.coap.server.RouterService;
import com.mbed.coap.server.observe.ObservableResourceService;
import com.mbed.coap.utils.Service;
import java.net.URI;
import java.time.Instant;
Expand Down Expand Up @@ -76,7 +76,7 @@ public Integer call() throws Exception {
}

protected Service<CoapRequest, CoapResponse> createRouting() {
ObservableResourceService timeResource = new ObservableResourceService(CoapResponse.ok(Instant.now().toString()));
ObservableResourceService timeResource = new ObservableResourceService("/time", CoapResponse.ok(Instant.now().toString()), it -> emulatorServer.sendObservation(it));
scheduledExecutor.scheduleAtFixedRate(() ->
timeResource.putPayload(Opaque.of(Instant.now().toString())),
30, 30, TimeUnit.SECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class UdpTransportBenchmark {
public void setup() throws CoapException, IOException {
((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.ERROR);

coapServer = new CoapServer(server, coapIn -> server.sendPacket(coapResp), null, () -> {
coapServer = new CoapServer(server, coapIn -> server.sendPacket(coapResp), null, null, () -> {
});
coapServer.start();

Expand Down
9 changes: 8 additions & 1 deletion coap-core/src/main/java/com/mbed/coap/server/CoapServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.mbed.coap.packet.CoapPacket;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.SeparateResponse;
import com.mbed.coap.transport.CoapTransport;
import com.mbed.coap.utils.Service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand All @@ -35,13 +37,15 @@ public class CoapServer {
private final CoapTransport transport;
private final Consumer<CoapPacket> dispatcher;
private final Service<CoapRequest, CoapResponse> outboundService;
private final Service<SeparateResponse, Boolean> outboundObservation;
private final Runnable stopAll;

public CoapServer(CoapTransport transport, Consumer<CoapPacket> dispatcher, Service<CoapRequest, CoapResponse> outboundService,
Runnable stopAll) {
Service<SeparateResponse, Boolean> outboundObservation, Runnable stopAll) {
this.transport = transport;
this.dispatcher = dispatcher;
this.outboundService = outboundService;
this.outboundObservation = outboundObservation;
this.stopAll = stopAll;
}

Expand Down Expand Up @@ -123,4 +127,7 @@ public final Service<CoapRequest, CoapResponse> clientService() {
return outboundService;
}

public CompletableFuture<Boolean> sendObservation(SeparateResponse observation) {
return outboundObservation.apply(observation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public CoapServer build() {
.andThen(new CoapRequestConverter(midSupplier))
.andThen(new RescueFilter())
.andThen(new CriticalOptionVerifier())
.andThen(new ObservationSenderFilter(sendNotification))
.andThen(new BlockWiseIncomingFilter(capabilities(), maxIncomingBlockTransferSize))
.then(route);

Expand All @@ -252,7 +251,7 @@ public CoapServer build() {
piggybackedExchangeFilter::handleResponse, exchangeFilter::handleResponse
);

return new CoapServer(coapTransport, dispatcher::handle, outboundService, () -> {
return new CoapServer(coapTransport, dispatcher::handle, outboundService, sendNotification, () -> {
piggybackedExchangeFilter.stop();
duplicateDetectorCache.stop();
if (stopExecutor) {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0a31e39

Please sign in to comment.