Skip to content

Commit

Permalink
Create CoapServerGroup from CoapBuilder; use Transport supplied in bu…
Browse files Browse the repository at this point in the history
…ilder; move MultiCoapServer -> CoapServerGroup
  • Loading branch information
szysas committed Aug 30, 2024
1 parent fa9781d commit 7102a2e
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.mbed.coap.utils.Timer.toTimer;
import static com.mbed.coap.utils.Validations.require;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.packet.BlockSize;
import com.mbed.coap.packet.CoapPacket;
Expand Down Expand Up @@ -55,13 +56,16 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Stream;

public final class CoapServerBuilder {
private static final long DELAYED_TRANSACTION_TIMEOUT_MS = 120000; //2 minutes

private CoapTransport coapTransport;
private Supplier<CoapTransport> coapTransport;
private int duplicationMaxSize = 10000;
private PutOnlyMap<CoapRequestId, CoapPacket> duplicateDetectionCache;
private ScheduledExecutorService scheduledExecutorService;
Expand Down Expand Up @@ -91,6 +95,11 @@ public CoapServerBuilder blockSize(BlockSize blockSize) {
}

public CoapServerBuilder transport(CoapTransport coapTransport) {
requireNonNull(coapTransport);
return transport(() -> coapTransport);
}

public CoapServerBuilder transport(Supplier<CoapTransport> coapTransport) {
this.coapTransport = requireNonNull(coapTransport);
return this;
}
Expand Down Expand Up @@ -208,7 +217,7 @@ public CoapServerBuilder requestTagSupplier(RequestTagSupplier requestTagSupplie
}

public CoapServer build() {
CoapTransport coapTransport = requireNonNull(this.coapTransport, "Missing transport");
CoapTransport coapTransport = requireNonNull(this.coapTransport.get(), "Missing transport");
final boolean stopExecutor = scheduledExecutorService == null;
final ScheduledExecutorService effectiveExecutorService = scheduledExecutorService != null ? scheduledExecutorService : Executors.newSingleThreadScheduledExecutor();
Timer timer = toTimer(effectiveExecutorService);
Expand Down Expand Up @@ -279,4 +288,12 @@ public CoapClient buildClient(InetSocketAddress target) throws IOException {
return CoapClient.create(target, build().start());
}

public CoapServerGroup buildGroup(int size) {
List<CoapServer> servers = Stream.generate(this::build)
.limit(size)
.collect(toList());

return new CoapServerGroup(servers);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencoap.coap.netty;
package com.mbed.coap.server;

import static com.mbed.coap.utils.Validations.require;
import static java.util.stream.Collectors.toList;
import static org.opencoap.coap.netty.CoapCodec.EMPTY_RESOLVER;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.List;
import java.util.stream.StreamSupport;

public class MultiCoapServer {

public class CoapServerGroup {
private final List<CoapServer> servers;

private MultiCoapServer(List<CoapServer> servers) {
CoapServerGroup(List<CoapServer> servers) {
require(!servers.isEmpty(), "At least one server required");
this.servers = servers;
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap) {
return create(builder, bootstrap, Integer.MAX_VALUE);
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap, int limitInstances) {
EventLoopGroup eventLoopGroup = bootstrap.config().group();

// create as many servers as there are executors in the event loop group
List<CoapServer> servers = StreamSupport.stream(eventLoopGroup.spliterator(), false)
.limit(limitInstances)
.map(executor -> builder
.transport(new NettyCoapTransport(bootstrap, EMPTY_RESOLVER))
.executor(executor)
.build()
)
.collect(toList());

return new MultiCoapServer(servers);
}

public MultiCoapServer start() throws IOException {
public CoapServerGroup start() throws IOException {
for (CoapServer server : servers) {
server.start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2022-2024 java-coap contributors (https://github.com/open-coap/java-coap)
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mbed.coap.server;

import static com.mbed.coap.packet.CoapRequest.get;
import static com.mbed.coap.utils.Networks.localhost;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.transport.InMemoryCoapTransport;
import org.junit.jupiter.api.Test;

class CoapServerGroupTest {
private int port = 10_000;
private final CoapServerBuilder builder = CoapServer.builder()
.transport(() -> InMemoryCoapTransport.create(port++))
.route(RouterService.builder()
.get("/test", (req) -> CoapResponse.ok("test").toFuture())
);


@Test
void test() throws Exception {
// given
CoapServerGroup servers = builder.buildGroup(3).start();
CoapClient client = builder.buildClient(localhost(10_001));
CoapClient client2 = builder.buildClient(localhost(10_003));

// when
assertEquals("test", client.sendSync(get("/test")).getPayloadString());
assertEquals("test", client2.sendSync(get("/test")).getPayloadString());

// then
client.close();
client2.close();
servers.stop();
assertFalse(servers.isRunning());
}

@Test
void failWhenBuildingGroupWithNoServers() {
assertThrows(IllegalArgumentException.class, () -> builder.buildGroup(0));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import com.mbed.coap.server.CoapServerGroup;
import com.mbed.coap.server.RouterService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
Expand All @@ -49,7 +49,6 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.opencoap.coap.netty.MultiCoapServer;
import org.opencoap.coap.netty.NettyCoapTransport;
import org.opencoap.ssl.PskAuth;
import org.opencoap.ssl.SslConfig;
Expand Down Expand Up @@ -100,11 +99,13 @@ void afterAll() {
@Test
void multi_thread_server() throws Exception {
Set<String> usedThreads = new HashSet<>();
CoapServerBuilder serverBuilder = CoapServer.builder()
CoapServerGroup server = CoapServer.builder()
.transport(() -> new NettyCoapTransport(serverBootstrap, EMPTY_RESOLVER))
.route(RouterService.builder()
.get("/currentThread", req -> supplyAsync(() -> ok(currentThread().getName()).build(), eventLoopGroup))
);
MultiCoapServer server = MultiCoapServer.create(serverBuilder, serverBootstrap).start();
)
.buildGroup(threads)
.start();


await().pollInterval(Duration.ZERO).atMost(ofSeconds(30)).until(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,79 @@
import static com.mbed.coap.utils.Networks.localhost;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.opencoap.coap.netty.CoapCodec.EMPTY_RESOLVER;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import com.mbed.coap.server.CoapServerGroup;
import com.mbed.coap.server.RouterService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class MultiChannelNettyTest {
private static final int THREADS = 3;
private final Bootstrap bootstrap = createBootstrap();
private Bootstrap bootstrap;

@Test
void test() throws Exception {
CoapServerBuilder builder = CoapServer.builder()
.route(RouterService.builder()
.get("/test", __ -> ok("OK").toFuture())
);
MultiCoapServer coapServer = MultiCoapServer.create(builder, bootstrap).start();
assertTrue(coapServer.isRunning());
void testNio() throws Exception {
bootstrap = createNioBootstrap();
CoapServerGroup coapServer = createServer();

verifyClients(coapServer.getLocalPorts());

coapServer.stop();
assertFalse(coapServer.isRunning());
}

@Test
@EnabledOnOs(OS.LINUX)
void testEpoll() throws Exception {
bootstrap = createEpollBootstrap();
CoapServerGroup coapServer = createServer();
// make sure that all channels are bound to the same port
assertTrue(coapServer.getLocalPorts().stream().distinct().count() == 1);

verifyClients(coapServer.getLocalPorts());

coapServer.stop();
assertFalse(coapServer.isRunning());
}

private void verifyClients(List<Integer> serverPorts) throws Exception {
// verify that all channels are working
for (int i = 0; i < THREADS; i++) {
CoapClient client = CoapServer.builder()
.transport(udp())
.buildClient(localhost(coapServer.getLocalPorts().get(i)));
.buildClient(localhost(serverPorts.get(i)));

assertTrue(client.ping().get());
assertEquals(ok("OK"), client.sendSync(get("/test")));
client.close();
}

coapServer.stop();
assertFalse(coapServer.isRunning());
}

private Bootstrap createBootstrap() {
if (Epoll.isAvailable()) {
return createEpollBootstrap();
}

return createNioBootstrap();
private CoapServerGroup createServer() throws IOException {
return CoapServer.builder()
.route(RouterService.builder()
.get("/test", __ -> ok("OK").toFuture())
)
.transport(() -> new NettyCoapTransport(bootstrap, EMPTY_RESOLVER))
.buildGroup(THREADS)
.start();
}

private Bootstrap createNioBootstrap() {
Expand Down

0 comments on commit 7102a2e

Please sign in to comment.