Misalign batch testcase and add score
* Add 4 peers instead of 2 to the batching testcase

* Misalign them by starting each peer with a delay. This
is synthetic, since in the real world peers don't usually
all start at the same time and establish direct connections, but
it's still worth having in the testcase.

* Misalign further by blocking alpha<->beta for a while
so they establish a direct connection later than
with the other peers, giving a misalignment closer to the real world.

* Add scoring to the distribution histogram. This makes it possible to
quantify how much better or worse a given solution or scenario is.
For now the score is only printed.
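
  The score (added in nat-lab/tests/utils/traffic.py below) is a simple ordered
  weighting, roughly this sketch of the new helper:

      score = sum(i * count for i, count in enumerate(histogram, start=1))

  so the same amount of traffic scores higher the further right (longer
  periods between packets) it lands in the histogram.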

Signed-off-by: Lukas Pukenis <[email protected]>
LukasPukenis committed Dec 20, 2024
1 parent 07c7840 commit 4335ef9
Showing 3 changed files with 102 additions and 74 deletions.
165 changes: 92 additions & 73 deletions nat-lab/tests/test_batching.py
@@ -2,10 +2,10 @@
import itertools
import pytest
from contextlib import AsyncExitStack
from helpers import SetupParameters, setup_environment, setup_mesh_nodes
from itertools import zip_longest
from helpers import setup_api, setup_connections, SetupParameters, setup_mesh_nodes
from scapy.layers.inet import TCP, UDP, ICMP # type: ignore
from scapy.layers.l2 import ARP # type: ignore
from telio import Client
from timeouts import TEST_BATCHING_TIMEOUT
from typing import List
from utils.asyncio_util import run_async_context
@@ -17,7 +17,6 @@
FeatureBatching,
EndpointProvider,
RelayState,
LinkState,
NodeState,
PathType,
TelioAdapterType,
@@ -29,17 +28,19 @@
render_chart,
generate_packet_distribution_histogram,
generate_packet_delay_histogram,
get_ordered_histogram_score,
)

BATCHING_MISALIGN_S = 7
BATCHING_CAPTURE_TIME = 120 # Tied to TEST_BATCHING_TIMEOUT
BATCHING_CAPTURE_TIME = 130
DOCKER_CONE_GW_2_IP = "10.0.254.2"


def _generate_setup_parameters(
conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool
) -> SetupParameters:
features = features_with_endpoint_providers([EndpointProvider.STUN])

features = features_with_endpoint_providers(
[EndpointProvider.STUN, EndpointProvider.LOCAL]
)
features.link_detection = FeatureLinkDetection(
rtt_seconds=1, no_of_pings=1, use_for_downgrade=True
)
@@ -75,38 +76,37 @@ def _generate_setup_parameters(
ConnectionTag.DOCKER_CONE_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1,
TelioAdapterType.LINUX_NATIVE_TUN,
),
(
ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2,
TelioAdapterType.LINUX_NATIVE_TUN,
),
]
# This test captures histograms of network activity to evaluate the effect of local batching in libtelio.
# Since only local batching is implemented, no client-generated traffic should occur during the test.
# External traffic (incoming data) could distort the histograms, and receive-data-triggered batching is
# not yet supported in libtelio. The test setup is simple: all clients are interconnected and remain idle
# for an extended period. This idle period allows for visual observation of the keepalive traffic.
# Local batching only has the effect of bundling multiple local keepalives into one batch; it does
# not do anything about syncing the keepalives between the peers.


@pytest.mark.asyncio
@pytest.mark.timeout(TEST_BATCHING_TIMEOUT)
@pytest.mark.parametrize(
"setup_params,misalign_sleep_s,capture_duration",
"setup_params,capture_duration",
[
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, True)
_generate_setup_parameters(conn_tag, adapter, False)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
],
),
pytest.param(
[
_generate_setup_parameters(conn_tag, adapter, False)
_generate_setup_parameters(conn_tag, adapter, True)
for conn_tag, adapter in ALL_NODES
],
BATCHING_MISALIGN_S,
BATCHING_CAPTURE_TIME,
marks=[
pytest.mark.batching,
@@ -116,53 +116,88 @@ def _generate_setup_parameters(
)
async def test_batching(
setup_params: List[SetupParameters],
misalign_sleep_s: int,
capture_duration: int,
) -> None:
"""Batch test generates environment where all peers idle after forming direct connections
packet capture is being used to observe how traffic flows and is then processed and displayed.
"""

async with AsyncExitStack() as exit_stack:
env = await exit_stack.enter_async_context(
setup_environment(exit_stack, setup_params)
api, nodes = setup_api(
[(instance.is_local, instance.ip_stack) for instance in setup_params]
)
connection_managers = await setup_connections(
exit_stack,
[
(
instance.connection_tag,
instance.connection_tracker_config,
)
for instance in setup_params
],
)

await asyncio.gather(*[
client.wait_for_state_on_any_derp([RelayState.CONNECTED])
for client, instance in zip_longest(env.clients, setup_params)
if instance.derp_servers != []
])
clients = []
for node, conn_man, params in zip(nodes, connection_managers, setup_params):
client = Client(
conn_man.connection, node, params.adapter_type_override, params.features
)
clients.append(client)

alpha_client, beta_client, *_ = clients
alpha_node, beta_node, *_ = nodes

# Start capture tasks

# We capture the traffic from all nodes and gateways.
# On the gateways we are sure the traffic has left the machine, but there is no easy way to
# inspect the packets there (they are encrypted by WireGuard). For packet inspection,
# the client-side traffic can be used instead.
gateways = [DOCKER_GW_MAP[param.connection_tag] for param in setup_params]
gateway_container_names = [container_id(conn_tag) for conn_tag in gateways]
conns = [client.get_connection() for client in env.clients]
conns = [client.get_connection() for client in clients]
node_container_names = [
conn.container_name()
for conn in conns
if isinstance(conn, DockerConnection)
]

container_names = gateway_container_names + node_container_names
container_names = sorted(
list(set(gateway_container_names + node_container_names))
)

print("Will capture batching on containers: ", container_names)
cnodes = zip(env.clients, env.nodes)
pcap_capture_tasks = []
for name in container_names:
pcap_task = asyncio.create_task(
capture_traffic(
name,
capture_duration,
)
)
pcap_capture_tasks.append(pcap_task)

# Misalign the peers by first stopping all of them and then restarting after various delays.
# This will have an effect of forcing neighboring libtelio node to add the peer to internal lists
# for keepalives at various points in time thus allowing us to observe better
# if the local batching is in action.
for client in env.clients:
await client.stop_device()
async def delayed_task(delay, node, client):
await asyncio.sleep(delay)
return await exit_stack.enter_async_context(
client.run(api.get_meshnet_config(node.id))
)

tasks = []
for i, (client, node) in enumerate(zip(clients, nodes)):
tasks.append(asyncio.create_task(delayed_task(i * 3, node, client)))

# misalign the peers by sleeping some before starting each node again
async def start_node_manually(client, node, sleep_s):
await asyncio.sleep(sleep_s)
await client.simple_start()
await client.set_meshnet_config(env.api.get_meshnet_config(node.id))
# Deliberately block the direct connection alpha <-> beta. Alpha and beta will still form direct connections with the other peers,
# but the alpha <-> beta connection itself will form only after a delay, causing a misalignment that better represents real-world keepalive flow.
async with AsyncExitStack() as exit_stack2:
await exit_stack2.enter_async_context(
alpha_client.get_router().disable_path(DOCKER_CONE_GW_2_IP),
)
await asyncio.sleep(20)

await asyncio.gather(*[
start_node_manually(client, node, i * misalign_sleep_s)
for i, (client, node) in enumerate(cnodes)
client.wait_for_state_on_any_derp([RelayState.CONNECTED])
for client in [alpha_client, beta_client]
])

await asyncio.gather(*[
@@ -173,55 +208,37 @@ async def start_node_manually(client, node, sleep_s):
)
)
)
for client, node in itertools.product(env.clients, env.nodes)
for client, node in itertools.product(clients, nodes)
if not client.is_node(node)
])

print("All peers directly interconnected")

pyro5_ports = [
int(port) for port in {client.get_proxy_port() for client in env.clients}
int(port) for port in {client.get_proxy_port() for client in clients}
]

print("Pyro ports", pyro5_ports)
# In general it's not great to filter traffic, but for testing and observing it's crucial,
# since unrelated traffic distorts the results. For example, Pyro traffic is a constant stream of
# TCP packets
allow_pcap_filters = [
(
"No Pyro5, SSDP, ARP",
"No Pyro5 and no ARP",
lambda p: (
(
(p.haslayer(UDP) or p.haslayer(TCP))
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
)
and (
not p.haslayer(ICMP)
or p.haslayer(ICMP)
and p[ICMP].type in [0, 8]
)
and (
p.haslayer(UDP)
and p[UDP].sport != 1900
and p[UDP].dport != 1900
(not p.haslayer(TCP))
or (
p.haslayer(TCP)
and p.sport not in pyro5_ports
and p.dport not in pyro5_ports
)
)
and (not p.haslayer(ARP))
),
),
]

pcap_capture_tasks = []
for name in container_names:
pcap_task = asyncio.create_task(
capture_traffic(
name,
capture_duration,
)
)
pcap_capture_tasks.append(pcap_task)
is_batching_enabled = clients[0].get_features().batching is not None

pcap_paths = await asyncio.gather(*pcap_capture_tasks)

is_batching_enabled = env.clients[0].get_features().batching is not None
for container, pcap_path in zip(container_names, pcap_paths):
distribution_hs = generate_packet_distribution_histogram(
pcap_path, capture_duration, allow_pcap_filters
Expand All @@ -243,6 +260,8 @@ async def start_node_manually(client, node, sleep_s):
print("Delay chart below")
print(delay_chart)

print("Score: ", get_ordered_histogram_score(delay_hs))


def proxying_peer_parameters(clients: List[ConnectionTag]):
def features():
2 changes: 1 addition & 1 deletion nat-lab/tests/timeouts.py
@@ -12,4 +12,4 @@
TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180
TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180
TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300
TEST_BATCHING_TIMEOUT = 1000
TEST_BATCHING_TIMEOUT = 600
9 changes: 9 additions & 0 deletions nat-lab/tests/utils/traffic.py
@@ -118,6 +118,15 @@ def generate_packet_delay_histogram(
return generate_histogram(timestamps, buckets)


def get_ordered_histogram_score(data: typing.List[int]) -> int:
# Assumes the histogram order matters: each bucket further to the right adds more to the score.
# Useful to quantify a score for things like the periods between packets
score = 0
for i, value in enumerate(data, start=1):
score += i * value
return score
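
# For illustration: the same packet count scores higher when it falls into
# later buckets, e.g. get_ordered_histogram_score([10, 0, 0]) == 10 (1 * 10),
# while get_ordered_histogram_score([0, 0, 10]) == 30 (3 * 10).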


def generate_packet_distribution_histogram(
pcap_path: str,
buckets: int,
