From 1d93c6c8dcf5446593127c0e9bd006fbe5e26196 Mon Sep 17 00:00:00 2001 From: Marius Sincovici Date: Tue, 17 Dec 2024 17:03:53 +0100 Subject: [PATCH] updates Signed-off-by: Marius Sincovici --- test/qa/conftest.py | 6 +++--- test/qa/lib/network.py | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/test/qa/conftest.py b/test/qa/conftest.py index e00e1127..601cbc2e 100644 --- a/test/qa/conftest.py +++ b/test/qa/conftest.py @@ -79,14 +79,14 @@ def _check_connection_to_ip(ip_address, stop_event): print("start _check_connection_to_ip") while not stop_event.is_set(): try: - socket.create_connection((ip_address, 443), timeout=1) + network.is_internet_reachable(ip_address=ip_address, retry=1) print(f"~~~_check_connection_to_ip: IN-PING {ip_address} SUCCESS") except Exception as e: # noqa: BLE001 print(f"~~~_check_connection_to_ip: IN-PING {ip_address} FAILURE: {e}.") data = "\n".join(["_check_connection_to_ip: Default route:", str(os.popen("sudo ip route get 1.1.1.1").read()), "iptables stats",str(os.popen("sudo iptables -L -v -n").read())]) - print(data=data) + print(data) stop_event.wait(_CHECK_FREQUENCY) @@ -117,7 +117,7 @@ def _check_dns_resolution(domain, stop_event): def _capture_traffic(stop_event): print("start _capture_traffic") # use circular log files, keep only 2 latest each 10MB size - command = ["tshark", "-a", "filesize:1048576", "-b", "files:2", "-i", "any", "-w", "/opt/dist/logs/tshark_capture.pcap"] + command = ["tshark", "-a", "filesize:1048576", "-b", "files:2", "-i", "any", "-w", os.environ["WORKDIR"] + "/dist/logs/tshark_capture.pcap"] print("Starting tshark...") process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stop_event.wait() diff --git a/test/qa/lib/network.py b/test/qa/lib/network.py index 13a5cf79..ff55b09f 100644 --- a/test/qa/lib/network.py +++ b/test/qa/lib/network.py @@ -81,16 +81,17 @@ def capture_traffic(connection_settings, duration: int=3) -> str: return t_connect.packets -def _is_internet_reachable(retry=5) -> bool: +def is_internet_reachable(ip_address="1.1.1.1", port=443, retry=5) -> bool: """Returns True when remote host is reachable by its public IP.""" i = 0 while i < retry: try: - return "icmp_seq=" in sh.ping("-c", "1", "-w", "1", "1.1.1.1") - except sh.ErrorReturnCode: + sock = socket.create_connection((ip_address, port), timeout=1) + sock.close() + return True + except Exception: # noqa: BLE001 time.sleep(1) i += 1 - logging.log(capture_traffic(("", "", ""), duration=10)) return False @@ -143,13 +144,13 @@ def is_not_available(retry=5) -> bool: # If assert below fails, and you are running Kill Switch tests on your machine, inside of Docker, # set DNS in resolv.conf of your system to anything else but 127.0.0.53 - return not _is_internet_reachable(retry) + return not is_internet_reachable(retry=retry) def is_available(retry=5) -> bool: """Returns True when network access is available or throws AssertionError otherwise.""" assert _is_internet_reachable_outside_vpn(retry) - assert _is_internet_reachable(retry) + assert is_internet_reachable(retry=retry) return True