Skip to content

Commit

Permalink
add tcpdump wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
gytsto committed Dec 3, 2024
1 parent 96eb1bb commit 7eca67a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tcli.log
/nat-lab/3rd-party/netfilter-full-cone-nat
/nat-lab/tests/uniffi/telio_bindings.py
/nat-lab/tests/uniffi/libtelio.so
/nat-lab/tests/uniffi/telio.dll
/nat-lab/logs
/nat-lab/coredumps
/src/telio.py
Expand Down
2 changes: 2 additions & 0 deletions nat-lab/tests/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def get_root_path(path: str) -> str:
IPERF_BINARY_MAC = "/var/root/iperf3/iperf3"
IPERF_BINARY_WINDOWS = "C:/workspace/iperf3/iperf3.exe".replace("/", "\\")

WINDUMP_BINARY_WINDOWS = "C:/workspace/WinDump.exe".replace("/", "\\")

# JIRA issue: LLT-1664
# The directories between host and Docker container are shared via
# Docker volumes. Mounting `libtelio/dist` is a nogo, since when host
Expand Down
78 changes: 78 additions & 0 deletions nat-lab/tests/utils/tcpdump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from config import WINDUMP_BINARY_WINDOWS
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional, List
from utils.connection import TargetOS, Connection
from utils.process import Process

PCAP_FILE_PATH = "/dump.pcap"


class TcpDump:
interface: str
connection: Connection
process: Process
verbose: bool
stdout: str
stderr: str
output_file: str

def __init__(
self,
connection: Connection,
interface: Optional[str],
output_file: Optional[str],
filters: List[str],
verbose: bool = False,
) -> None:
self.connection = connection
self.interface = interface or "any"
self.output_file = output_file or PCAP_FILE_PATH
self.process = self.connection.create_process(
[
self.get_tcpdump_binary(connection.target_os),
f"-i {self.interface}",
f"-w {self.output_file}",
]
+ filters
)
self.verbose = verbose
self.stdout = ""
self.stderr = ""

@staticmethod
def get_tcpdump_binary(target_os: TargetOS) -> str:
if target_os in [TargetOS.Linux, TargetOS.Mac]:
return "tcpdump"

if target_os == TargetOS.Windows:
return WINDUMP_BINARY_WINDOWS

raise ValueError(f"target_os not supported {target_os}")

def get_stdout(self) -> str:
return self.stdout

def get_stderr(self) -> str:
return self.stderr

async def on_stdout(self, output: str) -> None:
self.stdout += output
if self.verbose:
print(f"TCPDUMP: {output}")

async def on_stderr(self, output: str) -> None:
self.stderr += output
if self.verbose:
print(f"TCPDUMP ERROR: {output}")

async def execute(self) -> None:
try:
await self.process.execute(self.on_stdout, self.on_stderr)
except Exception as e:
print(f"Error executing tcpdump: {e}")
raise

@asynccontextmanager
async def run(self) -> AsyncIterator["TcpDump"]:
async with self.process.run(self.on_stdout, self.on_stderr):
yield self

0 comments on commit 7eca67a

Please sign in to comment.