Skip to content

Commit 8987b39

Browse files
committed
tests/multi_net: Add test for DTLS server and client.
This adds a multi-test for DTLS server and client behaviour. It works on all ports that enable this feature (eg unix, esp32, rp2, stm32), but bare-metal ports that use lwIP are not reliable as the DTLS server because the lwIP bindings only support queuing one UDP packet at a time (that needs to be fixed). Also, to properly implement a DTLS server sockets need to support `socket.recvfrom(n, MSG_PEEK)`. That can be implemented in the future. Signed-off-by: Damien George <[email protected]>
1 parent 321b30c commit 8987b39

File tree

2 files changed

+103
-0
lines changed

2 files changed

+103
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Test DTLS server and client, sending a small amount of data between them.
2+
3+
try:
4+
import socket
5+
import tls
6+
except ImportError:
7+
print("SKIP")
8+
raise SystemExit
9+
10+
PORT = 8000
11+
12+
# These are test certificates. See tests/README.md for details.
13+
certfile = "ec_cert.der"
14+
keyfile = "ec_key.der"
15+
16+
try:
17+
with open(certfile, "rb") as cf:
18+
cert = cadata = cf.read()
19+
with open(keyfile, "rb") as kf:
20+
key = kf.read()
21+
except OSError:
22+
print("SKIP")
23+
raise SystemExit
24+
25+
26+
# DTLS server.
27+
def instance0():
28+
multitest.globals(IP=multitest.get_network_ip())
29+
30+
# Create a UDP socket and bind it to accept incoming connections.
31+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
32+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
33+
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
34+
35+
multitest.next()
36+
37+
# Wait for the client to connect.
38+
data, client_addr = s.recvfrom(1)
39+
print("incoming connection", data)
40+
41+
# Connect back to the client, so the UDP socket can be used like a stream.
42+
s.connect(client_addr)
43+
44+
# Create the DTLS context and load the certificate.
45+
ctx = tls.SSLContext(tls.PROTOCOL_DTLS_SERVER)
46+
ctx.load_cert_chain(cert, key)
47+
48+
# Wrap the UDP socket in server mode.
49+
print("wrap socket")
50+
s = ctx.wrap_socket(s, server_side=1)
51+
52+
# Transfer some data.
53+
for _ in range(4):
54+
print(s.recv(16))
55+
s.send(b"server to client")
56+
57+
# Close the DTLS and UDP connection.
58+
s.close()
59+
60+
61+
# DTLS client.
62+
def instance1():
63+
multitest.next()
64+
65+
# Create a UDP socket and connect to the server.
66+
addr = socket.getaddrinfo(IP, PORT)[0][-1]
67+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
68+
print("connect")
69+
s.connect(addr)
70+
71+
# Send one byte to indicate a connection, and so the server can obtain our address.
72+
s.write("X")
73+
74+
# Create a DTLS context and load the certificate.
75+
ctx = tls.SSLContext(tls.PROTOCOL_DTLS_CLIENT)
76+
ctx.verify_mode = tls.CERT_REQUIRED
77+
ctx.load_verify_locations(cadata)
78+
79+
# Wrap the UDP socket.
80+
print("wrap socket")
81+
s = ctx.wrap_socket(s, server_hostname="micropython.local")
82+
83+
# Transfer some data.
84+
for _ in range(4):
85+
s.send(b"client to server")
86+
print(s.recv(16))
87+
88+
# Close the DTLS and UDP connection.
89+
s.close()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
--- instance0 ---
2+
incoming connection b'X'
3+
wrap socket
4+
b'client to server'
5+
b'client to server'
6+
b'client to server'
7+
b'client to server'
8+
--- instance1 ---
9+
connect
10+
wrap socket
11+
b'server to client'
12+
b'server to client'
13+
b'server to client'
14+
b'server to client'

0 commit comments

Comments
 (0)