Skip to content

Commit

Permalink
Create CoapServerGroup from builder
Browse files Browse the repository at this point in the history
  • Loading branch information
szysas committed Aug 30, 2024
1 parent fa9781d commit 2c4b32e
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.mbed.coap.utils.Timer.toTimer;
import static com.mbed.coap.utils.Validations.require;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.packet.BlockSize;
import com.mbed.coap.packet.CoapPacket;
Expand Down Expand Up @@ -55,13 +56,16 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Stream;

public final class CoapServerBuilder {
private static final long DELAYED_TRANSACTION_TIMEOUT_MS = 120000; //2 minutes

private CoapTransport coapTransport;
private Supplier<CoapTransport> coapTransport;
private int duplicationMaxSize = 10000;
private PutOnlyMap<CoapRequestId, CoapPacket> duplicateDetectionCache;
private ScheduledExecutorService scheduledExecutorService;
Expand Down Expand Up @@ -91,6 +95,11 @@ public CoapServerBuilder blockSize(BlockSize blockSize) {
}

public CoapServerBuilder transport(CoapTransport coapTransport) {
requireNonNull(coapTransport);
return transport(() -> coapTransport);
}

public CoapServerBuilder transport(Supplier<CoapTransport> coapTransport) {
this.coapTransport = requireNonNull(coapTransport);
return this;
}
Expand Down Expand Up @@ -208,7 +217,7 @@ public CoapServerBuilder requestTagSupplier(RequestTagSupplier requestTagSupplie
}

public CoapServer build() {
CoapTransport coapTransport = requireNonNull(this.coapTransport, "Missing transport");
CoapTransport coapTransport = requireNonNull(this.coapTransport.get(), "Missing transport");
final boolean stopExecutor = scheduledExecutorService == null;
final ScheduledExecutorService effectiveExecutorService = scheduledExecutorService != null ? scheduledExecutorService : Executors.newSingleThreadScheduledExecutor();
Timer timer = toTimer(effectiveExecutorService);
Expand Down Expand Up @@ -279,4 +288,12 @@ public CoapClient buildClient(InetSocketAddress target) throws IOException {
return CoapClient.create(target, build().start());
}

public CoapServerGroup buildGroup(int size) {
List<CoapServer> servers = Stream.generate(this::build)
.limit(size)
.collect(toList());

return new CoapServerGroup(servers);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencoap.coap.netty;
package com.mbed.coap.server;

import static com.mbed.coap.utils.Validations.require;
import static java.util.stream.Collectors.toList;
import static org.opencoap.coap.netty.CoapCodec.EMPTY_RESOLVER;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.List;
import java.util.stream.StreamSupport;

public class MultiCoapServer {

public class CoapServerGroup {
private final List<CoapServer> servers;

private MultiCoapServer(List<CoapServer> servers) {
CoapServerGroup(List<CoapServer> servers) {
require(!servers.isEmpty(), "At least one server required");
this.servers = servers;
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap) {
return create(builder, bootstrap, Integer.MAX_VALUE);
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap, int limitInstances) {
EventLoopGroup eventLoopGroup = bootstrap.config().group();

// create as many servers as there are executors in the event loop group
List<CoapServer> servers = StreamSupport.stream(eventLoopGroup.spliterator(), false)
.limit(limitInstances)
.map(executor -> builder
.transport(new NettyCoapTransport(bootstrap, EMPTY_RESOLVER))
.executor(executor)
.build()
)
.collect(toList());

return new MultiCoapServer(servers);
}

public MultiCoapServer start() throws IOException {
public CoapServerGroup start() throws IOException {
for (CoapServer server : servers) {
server.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2022-2024 java-coap contributors (https://github.com/open-coap/java-coap)
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mbed.coap.server;

import static com.mbed.coap.packet.CoapRequest.get;
import static com.mbed.coap.utils.Networks.localhost;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.transport.InMemoryCoapTransport;
import org.junit.jupiter.api.Test;

class CoapServerGroupTest {
private int port = 10_000;
private final CoapServerBuilder builder = CoapServer.builder()
.transport(() -> InMemoryCoapTransport.create(port++))
.route(RouterService.builder()
.get("/test", (req) -> CoapResponse.ok("test").toFuture())
);


@Test
void test() throws Exception {
// given
CoapServerGroup servers = builder.buildGroup(3).start();
CoapClient client = builder.buildClient(localhost(10_001));
CoapClient client2 = builder.buildClient(localhost(10_003));

// when
assertEquals("test", client.sendSync(get("/test")).getPayloadString());
assertEquals("test", client2.sendSync(get("/test")).getPayloadString());

// then
client.close();
client2.close();
servers.stop();
assertFalse(servers.isRunning());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import com.mbed.coap.server.CoapServerGroup;
import com.mbed.coap.server.RouterService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
Expand All @@ -41,15 +41,13 @@
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.opencoap.coap.netty.MultiCoapServer;
import org.opencoap.coap.netty.NettyCoapTransport;
import org.opencoap.ssl.PskAuth;
import org.opencoap.ssl.SslConfig;
Expand All @@ -62,27 +60,14 @@
@EnabledOnOs(OS.LINUX)
public class MultithreadedMbedtlsNettyTest {
private final int threads = 4;
private final int serverPort = new Random().nextInt(32000) + 32000;
private final EventLoopGroup eventLoopGroup = new EpollEventLoopGroup(threads, new DefaultThreadFactory("pool", true));

private final SslConfig clientConf = SslConfig.client(new PskAuth("test", of("secret").getBytes()));
private final SslConfig serverConf = SslConfig.server(new PskAuth("test", of("secret").getBytes()));

private final Bootstrap clientBootstrap = new Bootstrap()
.group(eventLoopGroup)
.localAddress(0)
.channel(EpollDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
InetSocketAddress destinationAddress = localhost(serverPort);
ch.pipeline().addFirst("DTLS", new DtlsClientHandshakeChannelHandler(clientConf.newContext(destinationAddress), destinationAddress, SessionWriter.NO_OPS));
}
});

private final Bootstrap serverBootstrap = new Bootstrap()
.group(eventLoopGroup)
.localAddress(serverPort)
.localAddress(0)
.channel(EpollDatagramChannel.class)
.option(UnixChannelOption.SO_REUSEPORT, true)
.handler(new ChannelInitializer<DatagramChannel>() {
Expand All @@ -100,11 +85,13 @@ void afterAll() {
@Test
void multi_thread_server() throws Exception {
Set<String> usedThreads = new HashSet<>();
CoapServerBuilder serverBuilder = CoapServer.builder()
CoapServerGroup server = CoapServer.builder()
.transport(() -> new NettyCoapTransport(serverBootstrap, EMPTY_RESOLVER))
.route(RouterService.builder()
.get("/currentThread", req -> supplyAsync(() -> ok(currentThread().getName()).build(), eventLoopGroup))
);
MultiCoapServer server = MultiCoapServer.create(serverBuilder, serverBootstrap).start();
)
.buildGroup(threads)
.start();


await().pollInterval(Duration.ZERO).atMost(ofSeconds(30)).until(() -> {
Expand All @@ -119,10 +106,22 @@ void multi_thread_server() throws Exception {
}

private String connectAndReadCurrentThreadName() throws IOException, CoapException {
InetSocketAddress serverAddress = localhost(((InetSocketAddress) serverBootstrap.config().localAddress()).getPort());
Bootstrap clientBootstrap = new Bootstrap()
.group(eventLoopGroup)
.localAddress(0)
.channel(EpollDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
ch.pipeline().addFirst("DTLS", new DtlsClientHandshakeChannelHandler(clientConf.newContext(serverAddress), serverAddress, SessionWriter.NO_OPS));
}
});

CoapClient client = CoapServer.builder()
.executor(eventLoopGroup)
.transport(new NettyCoapTransport(clientBootstrap, EMPTY_RESOLVER))
.buildClient(localhost(serverPort));
.buildClient(serverAddress);

String threadName = client.sendSync(get("/currentThread")).getPayloadString();
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.UnixChannelOption;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
Expand All @@ -52,6 +53,19 @@ public NettyCoapTransport(Bootstrap bootstrap, Function<DatagramPacket, Transpor
@Override
public void start() {
init(bootstrap.bind().syncUninterruptibly().channel());

// for random port update bootstrap with actual port but only if SO_REUSEPORT is enabled
if (bindToRandomPort() && reusePortIsEnabled()) {
bootstrap.localAddress(channel.localAddress());
}
}

private Boolean reusePortIsEnabled() {
return (Boolean) bootstrap.config().options().getOrDefault(UnixChannelOption.SO_REUSEPORT, Boolean.FALSE);
}

private boolean bindToRandomPort() {
return ((InetSocketAddress) bootstrap.config().localAddress()).getPort() == 0;
}

void init(Channel channel) {
Expand Down
Loading

0 comments on commit 2c4b32e

Please sign in to comment.