Skip to content

Commit

Permalink
Added MultiCoapServer, helper class to support multiple Netty channels
Browse files Browse the repository at this point in the history
  • Loading branch information
szysas committed Aug 29, 2024
1 parent 6b29a92 commit 6df061d
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 38 deletions.
1 change: 1 addition & 0 deletions coap-mbedtls/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ dependencies {
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.10.3")
testImplementation("io.netty:netty-transport-native-epoll:4.1.112.Final:linux-x86_64")
testImplementation("ch.qos.logback:logback-classic:1.3.14")
testImplementation("org.awaitility:awaitility:4.2.1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,41 @@
*/
package org.opencoap.transport.mbedtls;

import static com.mbed.coap.packet.CoapRequest.post;
import static com.mbed.coap.packet.CoapRequest.get;
import static com.mbed.coap.packet.CoapResponse.ok;
import static com.mbed.coap.packet.Opaque.of;
import static com.mbed.coap.utils.Assertions.assertEquals;
import static com.mbed.coap.utils.Networks.localhost;
import static java.lang.Thread.currentThread;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.awaitility.Awaitility.await;
import static org.opencoap.coap.netty.CoapCodec.EMPTY_RESOLVER;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import com.mbed.coap.server.RouterService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.time.Duration;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.opencoap.coap.netty.MultiCoapServer;
import org.opencoap.coap.netty.NettyCoapTransport;
import org.opencoap.ssl.PskAuth;
import org.opencoap.ssl.SslConfig;
Expand Down Expand Up @@ -71,14 +77,14 @@ public class MultithreadedMbedtlsNettyTest {
protected void initChannel(DatagramChannel ch) {
InetSocketAddress destinationAddress = localhost(serverPort);
ch.pipeline().addFirst("DTLS", new DtlsClientHandshakeChannelHandler(clientConf.newContext(destinationAddress), destinationAddress, SessionWriter.NO_OPS));
}
});
}
});

private final Bootstrap serverBootstrap = new Bootstrap()
.group(eventLoopGroup)
.localAddress(serverPort)
.channel(EpollDatagramChannel.class)
.option(EpollChannelOption.SO_REUSEPORT, true)
.option(UnixChannelOption.SO_REUSEPORT, true)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
Expand All @@ -93,39 +99,33 @@ void afterAll() {

@Test
void multi_thread_server() throws Exception {
CoapServerBuilder serverBuilder = CoapServer.builder()
.executor(eventLoopGroup)
Set<String> usedThreads = new HashSet<>();
CoapServerBuilder serverBuilder = CoapServer.builder()
.route(RouterService.builder()
.post("/echo", req -> CompletableFuture.supplyAsync(() -> ok(req.getPayload()).build()))
.get("/currentThread", req -> supplyAsync(() -> ok(currentThread().getName()).build(), eventLoopGroup))
);
List<CoapServer> servers = new ArrayList<>();
for (int i = 0; i < threads; i++) {
servers.add(
serverBuilder
.transport(new NettyCoapTransport(serverBootstrap, EMPTY_RESOLVER))
.build()
.start()
);
}
MultiCoapServer server = MultiCoapServer.create(serverBuilder, serverBootstrap).start();


List<CoapClient> clients = new ArrayList<>();
InetSocketAddress srvAddr = localhost(serverPort);
CoapServerBuilder clientBuilder = CoapServer.builder().executor(eventLoopGroup);
for (int i = 0; i < 100; i++) {
clients.add(
clientBuilder
.transport(new NettyCoapTransport(clientBootstrap, EMPTY_RESOLVER))
.buildClient(srvAddr)
);
}
await().pollInterval(Duration.ZERO).atMost(ofSeconds(30)).until(() -> {
String threadName = connectAndReadCurrentThreadName();
usedThreads.add(threadName);

for (CoapClient client : clients) {
assertEquals(ok("paska"), client.sendSync(post("/echo").payload("paska")));
client.close();
}
// wait until all threads from eventLoopGroup are used
return usedThreads.size() == threads;
});

server.stop();
}

private String connectAndReadCurrentThreadName() throws IOException, CoapException {
CoapClient client = CoapServer.builder()
.executor(eventLoopGroup)
.transport(new NettyCoapTransport(clientBootstrap, EMPTY_RESOLVER))
.buildClient(localhost(serverPort));

for (CoapServer srv : servers) {
srv.stop();
}
String threadName = client.sendSync(get("/currentThread")).getPayloadString();
client.close();
return threadName;
}
}
1 change: 1 addition & 0 deletions coap-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {

testImplementation(testFixtures(project(":coap-core")))
testImplementation("ch.qos.logback:logback-classic:1.3.14")
testImplementation("io.netty:netty-transport-native-epoll:4.1.112.Final:linux-x86_64")

jmhImplementation("io.netty:netty-all:4.1.112.Final")
jmh("org.openjdk.jmh:jmh-core:1.37")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2022-2024 java-coap contributors (https://github.com/open-coap/java-coap)
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencoap.coap.netty;

import static com.mbed.coap.utils.Validations.require;
import static java.util.stream.Collectors.toList;
import static org.opencoap.coap.netty.CoapCodec.EMPTY_RESOLVER;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.List;
import java.util.stream.StreamSupport;

public class MultiCoapServer {

private final List<CoapServer> servers;

private MultiCoapServer(List<CoapServer> servers) {
require(!servers.isEmpty(), "At least one server required");
this.servers = servers;
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap) {
return create(builder, bootstrap, Integer.MAX_VALUE);
}

public static MultiCoapServer create(CoapServerBuilder builder, Bootstrap bootstrap, int limitInstances) {
EventLoopGroup eventLoopGroup = bootstrap.config().group();

// create as many servers as there are executors in the event loop group
List<CoapServer> servers = StreamSupport.stream(eventLoopGroup.spliterator(), false)
.limit(limitInstances)
.map(executor -> builder
.transport(new NettyCoapTransport(bootstrap, EMPTY_RESOLVER))
.executor(executor)
.build()
)
.collect(toList());

return new MultiCoapServer(servers);
}

public MultiCoapServer start() throws IOException {
for (CoapServer server : servers) {
server.start();
}
return this;
}

public void stop() {
for (CoapServer server : servers) {
server.stop();
}
}

public List<Integer> getLocalPorts() {
return servers.stream().map(server -> server.getLocalSocketAddress().getPort()).collect(toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (C) 2022-2024 java-coap contributors (https://github.com/open-coap/java-coap)
* SPDX-License-Identifier: Apache-2.0
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencoap.coap.netty;

import static com.mbed.coap.packet.CoapRequest.get;
import static com.mbed.coap.packet.CoapResponse.ok;
import static com.mbed.coap.transport.udp.DatagramSocketTransport.udp;
import static com.mbed.coap.utils.Assertions.assertEquals;
import static com.mbed.coap.utils.Networks.localhost;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.mbed.coap.client.CoapClient;
import com.mbed.coap.server.CoapServer;
import com.mbed.coap.server.CoapServerBuilder;
import com.mbed.coap.server.RouterService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class MultiChannelNettyTest {
private static final int THREADS = 3;
private final Bootstrap bootstrap = createBootstrap();

@Test
void test() throws Exception {
CoapServerBuilder builder = CoapServer.builder()
.route(RouterService.builder()
.get("/test", __ -> ok("OK").toFuture())
);
MultiCoapServer coapServer = MultiCoapServer.create(builder, bootstrap).start();

// verify that all channels are working
for (int i = 0; i < THREADS; i++) {
CoapClient client = CoapServer.builder()
.transport(udp())
.buildClient(localhost(coapServer.getLocalPorts().get(i)));

assertTrue(client.ping().get());
assertEquals(ok("OK"), client.sendSync(get("/test")));
}

coapServer.stop();
}

private Bootstrap createBootstrap() {
if (Epoll.isAvailable()) {
return createEpollBootstrap();
}

return createNioBootstrap();
}

private Bootstrap createNioBootstrap() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(THREADS, new DefaultThreadFactory("udp", true));
return new Bootstrap()
.group(eventLoopGroup)
.localAddress(0) // this will cause server binding to multiple ports
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
// do nothing
}
});
}

private Bootstrap createEpollBootstrap() {
EventLoopGroup eventLoopGroup = new EpollEventLoopGroup(3, new DefaultThreadFactory("udp", true));

return new Bootstrap()
.group(eventLoopGroup)
.option(UnixChannelOption.SO_REUSEPORT, true)
.localAddress(65001) // bind multiple times on single port
.channel(EpollDatagramChannel.class)
.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) {
// do nothing
}
});
}
}

0 comments on commit 6df061d

Please sign in to comment.