From 3e9386ad93f0bda85981831fba5404fb32b2c324 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 04:10:44 +0800 Subject: [PATCH 1/6] feat(listener): support unix socket listen --- docs/en/reference/configuration.md | 11 +- docs/reference/configuration.md | 11 +- e2e/helpers/__init__.py | 6 +- e2e/helpers/http.py | 85 ++++++-- e2e/helpers/ports.py | 14 ++ e2e/helpers/r2h_process.py | 33 ++- e2e/test_unix_socket.py | 192 ++++++++++++++++++ .../luci-static/resources/view/rtp2httpd.js | 25 ++- .../po/templates/rtp2httpd.pot | 7 +- .../po/zh_Hans/rtp2httpd.po | 13 +- .../rtp2httpd/files/rtp2httpd.conf | 2 + rtp2httpd.conf | 3 +- src/configuration.c | 147 ++++++++++++-- src/configuration.h | 13 ++ src/connection.c | 20 +- src/m3u.c | 18 +- src/supervisor.c | 164 ++++++++++++++- src/worker.c | 4 +- 18 files changed, 695 insertions(+), 73 deletions(-) create mode 100644 e2e/test_unix_socket.py diff --git a/docs/en/reference/configuration.md b/docs/en/reference/configuration.md index 4eb47d7f..632faaa7 100644 --- a/docs/en/reference/configuration.md +++ b/docs/en/reference/configuration.md @@ -16,16 +16,18 @@ rtp2httpd [options] ### Network Configuration -- `-l, --listen [address:]port` - Bind address and port (default: \*:5140) +- `-l, --listen [address:]port|/path/to/rtp2httpd.sock` - Bind a TCP listen address/port, or listen on a Unix domain socket (default: \*:5140) - `-m, --maxclients ` - Maximum concurrent clients (default: 5) - `-w, --workers ` - Number of worker processes (default: 1) -`--listen` can be specified multiple times to listen on multiple addresses or ports: +`--listen` can be specified multiple times to listen on multiple TCP addresses/ports or Unix sockets: ```bash -rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' +rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock ``` +Unix socket listen paths must be absolute and must not contain whitespace. At startup, if the same path already contains a socket file, rtp2httpd removes it automatically. If the path is a regular file, directory, or symbolic link, startup is rejected to avoid deleting user data. When any Unix socket listener is enabled, `zerocopy-on-send` is disabled globally. + #### Upstream Network Interface Configuration - `-i, --upstream-interface ` - Default upstream interface (applies to all traffic types, lowest priority) @@ -249,6 +251,9 @@ ffmpeg-args = -hwaccel none # Listen on an IPv6 address (brackets optional) 2001:db8::1 5140 +# Listen on a Unix domain socket (path must be absolute) +/var/run/rtp2httpd.sock + # Multiple listen addresses are supported # The [services] section can contain M3U playlists starting with #EXTM3U diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 41ade37c..1d5ef798 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -16,16 +16,18 @@ rtp2httpd [选项] ### 网络配置 -- `-l, --listen [地址:]端口` - 绑定监听地址和端口 (默认: \*:5140) +- `-l, --listen [地址:]端口|/path/to/rtp2httpd.sock` - 绑定 TCP 监听地址/端口,或监听 Unix domain socket (默认: \*:5140) - `-m, --maxclients <数量>` - 最大并发客户端数 (默认: 5) - `-w, --workers <数量>` - 工作进程数 (默认: 1) -`--listen` 可以重复指定,用于同时监听多个地址或端口: +`--listen` 可以重复指定,用于同时监听多个 TCP 地址/端口或 Unix socket: ```bash -rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' +rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock ``` +Unix socket 监听路径必须是绝对路径,且路径中不能包含空白字符。启动时如果同路径已存在 socket 文件,rtp2httpd 会自动清理;如果同路径是普通文件、目录或符号链接,则会拒绝启动以避免误删数据。启用任意 Unix socket 监听时,`zerocopy-on-send` 会被全局关闭。 + #### 上游网络接口配置 - `-i, --upstream-interface <接口>` - 默认上游接口(作用于所有流量类型,优先级最低) @@ -249,6 +251,9 @@ ffmpeg-args = -hwaccel none # 监听 IPv6 地址(可省略方括号) 2001:db8::1 5140 +# 监听 Unix domain socket(路径必须是绝对路径) +/var/run/rtp2httpd.sock + # 支持多个监听地址 # [services] 内可以直接编写以 #EXTM3U 开头的 m3u 节目清单 diff --git a/e2e/helpers/__init__.py b/e2e/helpers/__init__.py index 35927cad..3b2dc857 100644 --- a/e2e/helpers/__init__.py +++ b/e2e/helpers/__init__.py @@ -23,7 +23,7 @@ MCAST_ADDR, PROJECT_ROOT, ) -from .http import extract_catchup_source, http_get, http_request, stream_get +from .http import extract_catchup_source, http_get, http_request, stream_get, unix_http_get, unix_http_request from .mock_fcc import MockFCCServer from .mock_http import MockHTTPUpstream, MockHTTPUpstreamSilent from .mock_rtsp import ( @@ -40,6 +40,7 @@ find_free_udp_port_pair, ipv6_loopback_available, wait_for_port, + wait_for_unix_socket, ) from .r2h_process import R2HProcess, make_m3u_rtsp_config from .rtp import MulticastSender, make_rtp_packet @@ -71,5 +72,8 @@ "make_m3u_rtsp_config", "make_rtp_packet", "stream_get", + "unix_http_get", + "unix_http_request", "wait_for_port", + "wait_for_unix_socket", ] diff --git a/e2e/helpers/http.py b/e2e/helpers/http.py index 57c7d56a..b9a69d19 100644 --- a/e2e/helpers/http.py +++ b/e2e/helpers/http.py @@ -46,6 +46,73 @@ def http_request( conn.close() +def _parse_raw_http_response(data: bytes, lower_header_names: bool = False) -> tuple[int, dict, bytes]: + header_end = data.find(b"\r\n\r\n") + if header_end < 0: + return 0, {}, b"" + + header_text = data[:header_end].decode(errors="replace") + body = data[header_end + 4 :] + parts = header_text.split("\r\n") + status_code = int(parts[0].split()[1]) + + hdrs: dict[str, str] = {} + for line in parts[1:]: + if ":" in line: + k, v = line.split(":", 1) + key = k.strip().lower() if lower_header_names else k.strip() + hdrs[key] = v.strip() + + return status_code, hdrs, body + + +def unix_http_request( + socket_path: str, + method: str, + path: str, + timeout: float = 5.0, + headers: dict | None = None, + body: bytes | None = None, +) -> tuple[int, dict, bytes]: + """HTTP request over a Unix domain socket. Returns (status, headers, body).""" + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(timeout) + try: + sock.connect(socket_path) + req_lines = ["%s %s HTTP/1.0" % (method, path), "Host: localhost"] + payload = body or b"" + for k, v in (headers or {}).items(): + req_lines.append("%s: %s" % (k, v)) + if payload: + req_lines.append("Content-Length: %d" % len(payload)) + req_lines.append("") + req_lines.append("") + sock.sendall("\r\n".join(req_lines).encode() + payload) + + data = b"" + while True: + try: + chunk = sock.recv(4096) + except socket.timeout: + break + if not chunk: + break + data += chunk + return _parse_raw_http_response(data) + finally: + sock.close() + + +def unix_http_get( + socket_path: str, + path: str, + timeout: float = 5.0, + headers: dict | None = None, +) -> tuple[int, dict, bytes]: + """HTTP GET over a Unix domain socket. Returns (status, headers, body).""" + return unix_http_request(socket_path, "GET", path, timeout=timeout, headers=headers) + + def extract_catchup_source(playlist_text, channel_name): """Extract catchup-source URL from the EXTINF line for a channel. @@ -108,23 +175,7 @@ def stream_get( break data += chunk - header_end = data.find(b"\r\n\r\n") - if header_end < 0: - return 0, {}, b"" - - header_text = data[:header_end].decode(errors="replace") - body = data[header_end + 4 :] - - parts = header_text.split("\r\n") - status_code = int(parts[0].split()[1]) - - hdrs: dict[str, str] = {} - for line in parts[1:]: - if ":" in line: - k, v = line.split(":", 1) - hdrs[k.strip().lower()] = v.strip() - - return status_code, hdrs, body + return _parse_raw_http_response(data, lower_header_names=True) except socket.timeout, OSError: return 0, {}, b"" finally: diff --git a/e2e/helpers/ports.py b/e2e/helpers/ports.py index 820716b8..53432a6f 100644 --- a/e2e/helpers/ports.py +++ b/e2e/helpers/ports.py @@ -59,3 +59,17 @@ def wait_for_port(port: int, host: str = "127.0.0.1", timeout: float = 5.0) -> b except ConnectionRefusedError, OSError, socket.timeout: time.sleep(0.05) return False + + +def wait_for_unix_socket(path: str, timeout: float = 5.0) -> bool: + """Block until *path* is accepting Unix stream socket connections.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.settimeout(0.5) + sock.connect(path) + return True + except OSError: + time.sleep(0.05) + return False diff --git a/e2e/helpers/r2h_process.py b/e2e/helpers/r2h_process.py index 78cf9216..0d15aa14 100644 --- a/e2e/helpers/r2h_process.py +++ b/e2e/helpers/r2h_process.py @@ -7,7 +7,7 @@ import tempfile from pathlib import Path -from .ports import wait_for_port +from .ports import wait_for_port, wait_for_unix_socket def make_m3u_rtsp_config(r2h_port: int, rtsp_port: int, channel_name: str, configured_url_query: str = "") -> str: @@ -26,16 +26,20 @@ class R2HProcess: def __init__( self, binary: str | Path, - port: int, + port: int | None, extra_args: list[str] | None = None, config_content: str | None = None, capture_log: bool = False, + listen: str | None = None, + wait_socket_path: str | None = None, ): self.binary = str(binary) self.port = port self.extra_args = list(extra_args or []) self.config_content = config_content self.capture_log = capture_log + self.listen = listen + self.wait_socket_path = wait_socket_path self.process: subprocess.Popen | None = None self._config_path: str | None = None self._log_path: str | None = None @@ -51,9 +55,23 @@ def start(self, wait: bool = True) -> None: self.process = subprocess.Popen(args, stdout=self._log_handle, stderr=self._log_handle) else: self.process = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - if wait and not wait_for_port(self.port, timeout=6.0): - self.stop() - raise RuntimeError("rtp2httpd did not start on port %d.\nCommand: %s" % (self.port, " ".join(args))) + if wait: + if self.wait_socket_path: + if not wait_for_unix_socket(self.wait_socket_path, timeout=6.0): + self.stop() + raise RuntimeError( + "rtp2httpd did not start on Unix socket %s.\nCommand: %s" + % (self.wait_socket_path, " ".join(args)) + ) + elif self.listen and self.listen.startswith("/"): + if not wait_for_unix_socket(self.listen, timeout=6.0): + self.stop() + raise RuntimeError( + "rtp2httpd did not start on Unix socket %s.\nCommand: %s" % (self.listen, " ".join(args)) + ) + elif self.port is not None and not wait_for_port(self.port, timeout=6.0): + self.stop() + raise RuntimeError("rtp2httpd did not start on port %d.\nCommand: %s" % (self.port, " ".join(args))) def stop(self) -> None: if self.process and self.process.poll() is None: @@ -97,6 +115,9 @@ def _build_args(self) -> list[str]: else: args = [self.binary, "-C"] - args.extend(["-l", str(self.port)]) + if self.listen is not None: + args.extend(["-l", self.listen]) + elif self.port is not None: + args.extend(["-l", str(self.port)]) args.extend(self.extra_args) return args diff --git a/e2e/test_unix_socket.py b/e2e/test_unix_socket.py new file mode 100644 index 00000000..7e68d3bd --- /dev/null +++ b/e2e/test_unix_socket.py @@ -0,0 +1,192 @@ +""" +E2E tests for Unix domain socket listeners. + +Tests cover CLI and config-file Unix socket binds, mixed TCP/Unix listeners, +multi-worker accept behavior, stale socket cleanup, regular-file rejection, +zero-copy disablement, and file responses over Unix sockets. +""" + +from __future__ import annotations + +import os +import socket +import tempfile +import time + +from helpers import ( + R2HProcess, + find_free_port, + http_get, + unix_http_get, + wait_for_unix_socket, +) + +SAMPLE_EPG_XML = """\ + + + + Channel 1 + + + Unix Socket Programme + + +""" + + +def _socket_path(tmpdir: str) -> str: + return os.path.join(tmpdir, "rtp2httpd.sock") + + +def _write_tmp(data: bytes, suffix: str = ".xml") -> str: + fd, path = tempfile.mkstemp(suffix=suffix, prefix="r2h_unix_epg_") + with os.fdopen(fd, "wb") as f: + f.write(data) + return path + + +class TestUnixSocketListen: + def test_cli_unix_socket_serves_status(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + r2h = R2HProcess(r2h_binary, None, extra_args=["-v", "4"], capture_log=True, listen=sock_path) + try: + r2h.start() + status, hdrs, body = unix_http_get(sock_path, "/status") + content_type = hdrs.get("Content-Type", "").lower() + assert status == 200 + assert "text/html" in content_type + assert len(body) > 0 + assert "New client unix requested URL: /status" in r2h.read_log() + finally: + r2h.stop() + + def test_config_unix_socket_serves_status(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + config = f"""\ +[global] +verbosity = 4 + +[bind] +{sock_path} +""" + r2h = R2HProcess(r2h_binary, None, config_content=config, wait_socket_path=sock_path) + try: + r2h.start() + status, _, _ = unix_http_get(sock_path, "/status") + assert status == 200 + finally: + r2h.stop() + + def test_mixed_tcp_and_unix_listeners(self, r2h_binary): + port = find_free_port() + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + r2h = R2HProcess(r2h_binary, port, extra_args=["-v", "4", "-l", sock_path]) + try: + r2h.start() + assert wait_for_unix_socket(sock_path) + + status, _, _ = http_get("127.0.0.1", port, "/status") + assert status == 200 + + status, _, _ = unix_http_get(sock_path, "/status") + assert status == 200 + finally: + r2h.stop() + + def test_unix_socket_multiple_workers(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + r2h = R2HProcess(r2h_binary, None, extra_args=["-v", "4", "-w", "2"], listen=sock_path) + try: + r2h.start() + for _ in range(8): + status, _, _ = unix_http_get(sock_path, "/status") + assert status == 200 + finally: + r2h.stop() + + def test_stale_socket_path_is_cleaned(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + stale = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + stale.bind(sock_path) + finally: + stale.close() + + r2h = R2HProcess(r2h_binary, None, extra_args=["-v", "4"], listen=sock_path) + try: + r2h.start() + status, _, _ = unix_http_get(sock_path, "/status") + assert status == 200 + finally: + r2h.stop() + + def test_regular_file_socket_path_fails_startup(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + with open(sock_path, "wb") as f: + f.write(b"not a socket") + + r2h = R2HProcess(r2h_binary, None, extra_args=["-v", "4"], capture_log=True, listen=sock_path) + try: + r2h.start(wait=False) + assert r2h.process is not None + r2h.process.wait(timeout=5) + assert r2h.process.returncode != 0 + log = r2h.read_log() + assert "not a socket" in log + finally: + r2h.stop() + + def test_unix_socket_disables_zerocopy(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + r2h = R2HProcess( + r2h_binary, + None, + extra_args=["-v", "4", "--zerocopy-on-send"], + capture_log=True, + listen=sock_path, + ) + try: + r2h.start() + status, _, _ = unix_http_get(sock_path, "/status") + assert status == 200 + log = r2h.read_log() + assert "Zero-copy send disabled because Unix socket listener is configured" in log + finally: + r2h.stop() + + def test_epg_file_response_over_unix_socket(self, r2h_binary): + epg_path = _write_tmp(SAMPLE_EPG_XML.encode()) + try: + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + config = f"""\ +[global] +verbosity = 4 + +[bind] +{sock_path} + +[services] +#EXTM3U x-tvg-url="file://{epg_path}" +#EXTINF:-1,Channel +rtp://239.0.0.1:1234 +""" + r2h = R2HProcess(r2h_binary, None, config_content=config, wait_socket_path=sock_path) + try: + r2h.start() + time.sleep(0.5) + status, hdrs, body = unix_http_get(sock_path, "/epg.xml") + assert status == 200 + assert "xml" in hdrs.get("Content-Type", "").lower() + assert b"Unix Socket Programme" in body + finally: + r2h.stop() + finally: + os.unlink(epg_path) diff --git a/openwrt-support/luci-app-rtp2httpd/htdocs/luci-static/resources/view/rtp2httpd.js b/openwrt-support/luci-app-rtp2httpd/htdocs/luci-static/resources/view/rtp2httpd.js index 55ed2cc6..61e2a0f7 100644 --- a/openwrt-support/luci-app-rtp2httpd/htdocs/luci-static/resources/view/rtp2httpd.js +++ b/openwrt-support/luci-app-rtp2httpd/htdocs/luci-static/resources/view/rtp2httpd.js @@ -37,6 +37,17 @@ return view.extend({ return null; } + if (listen.charAt(0) === "/") { + if (/\s/.test(listen)) { + return null; + } + return { + host: null, + port: null, + socketPath: listen, + }; + } + if (/^\d+$/.test(listen)) { port = listen; } else if (listen.charAt(0) === "[") { @@ -77,6 +88,7 @@ return view.extend({ return { host: host, port: port, + socketPath: null, }; }, @@ -95,7 +107,7 @@ return view.extend({ if (!this.parseListenValue(listen)) { return _( - "Use port, address:port, hostname:port, or [IPv6]:port, for example 5140 or 192.168.1.1:8081." + "Use port, address:port, hostname:port, [IPv6]:port, or an absolute Unix socket path, for example 5140, 192.168.1.1:8081, or /var/run/rtp2httpd.sock." ); } @@ -104,11 +116,14 @@ return view.extend({ getPrimaryListenTarget: function (section_id) { var values = this.getListenValues(section_id); - var target = values.length > 0 ? this.parseListenValue(values[0]) : null; + var target = null; var port; - if (target) { - return target; + for (var i = 0; i < values.length; i++) { + target = this.parseListenValue(values[i]); + if (target && target.port) { + return target; + } } port = uci.get("rtp2httpd", section_id, "port") || "5140"; @@ -373,7 +388,7 @@ return view.extend({ "listen", _("Listen Addresses"), _( - "HTTP listen addresses. Use a bare port for all addresses (e.g., 5140), address:port for IPv4/hostnames, or [IPv6]:port." + "HTTP listen addresses. Use a bare port for all addresses (e.g., 5140), address:port for IPv4/hostnames, [IPv6]:port, or an absolute Unix socket path." ) ); o.placeholder = "5140"; diff --git a/openwrt-support/luci-app-rtp2httpd/po/templates/rtp2httpd.pot b/openwrt-support/luci-app-rtp2httpd/po/templates/rtp2httpd.pot index e84f736d..60d88bb2 100644 --- a/openwrt-support/luci-app-rtp2httpd/po/templates/rtp2httpd.pot +++ b/openwrt-support/luci-app-rtp2httpd/po/templates/rtp2httpd.pot @@ -106,7 +106,8 @@ msgstr "" #: htdocs/luci-static/resources/view/rtp2httpd.js:361 msgid "" "HTTP listen addresses. Use a bare port for all addresses (e.g., 5140), " -"address:port for IPv4/hostnames, or [IPv6]:port." +"address:port for IPv4/hostnames, [IPv6]:port, or an absolute Unix socket " +"path." msgstr "" #: htdocs/luci-static/resources/view/rtp2httpd.js:695 @@ -321,8 +322,8 @@ msgstr "" #: htdocs/luci-static/resources/view/rtp2httpd.js:98 msgid "" -"Use port, address:port, hostname:port, or [IPv6]:port, for example 5140 or " -"192.168.1.1:8081." +"Use port, address:port, hostname:port, [IPv6]:port, or an absolute Unix " +"socket path, for example 5140, 192.168.1.1:8081, or /var/run/rtp2httpd.sock." msgstr "" #: htdocs/luci-static/resources/view/rtp2httpd.js:753 diff --git a/openwrt-support/luci-app-rtp2httpd/po/zh_Hans/rtp2httpd.po b/openwrt-support/luci-app-rtp2httpd/po/zh_Hans/rtp2httpd.po index 7847fdab..b8e5cf59 100644 --- a/openwrt-support/luci-app-rtp2httpd/po/zh_Hans/rtp2httpd.po +++ b/openwrt-support/luci-app-rtp2httpd/po/zh_Hans/rtp2httpd.po @@ -122,10 +122,11 @@ msgstr "HTTP 代理 User-Agent" #: htdocs/luci-static/resources/view/rtp2httpd.js:361 msgid "" "HTTP listen addresses. Use a bare port for all addresses (e.g., 5140), " -"address:port for IPv4/hostnames, or [IPv6]:port." +"address:port for IPv4/hostnames, [IPv6]:port, or an absolute Unix socket " +"path." msgstr "" "HTTP 监听地址。使用裸端口表示监听所有地址(例如:5140),IPv4/主机名使用 " -"address:port,IPv6 使用 [IPv6]:port。" +"address:port,IPv6 使用 [IPv6]:port,也可以使用 Unix socket 绝对路径。" #: htdocs/luci-static/resources/view/rtp2httpd.js:695 msgid "Hostname" @@ -356,11 +357,11 @@ msgstr "使用配置文件而不是单独的选项" #: htdocs/luci-static/resources/view/rtp2httpd.js:98 msgid "" -"Use port, address:port, hostname:port, or [IPv6]:port, for example 5140 or " -"192.168.1.1:8081." +"Use port, address:port, hostname:port, [IPv6]:port, or an absolute Unix " +"socket path, for example 5140, 192.168.1.1:8081, or /var/run/rtp2httpd.sock." msgstr "" -"请使用端口、address:port、hostname:port 或 [IPv6]:port 格式,例如 5140 或 " -"192.168.1.1:8081。" +"请使用端口、address:port、hostname:port、[IPv6]:port 或 Unix socket 绝对路径," +"例如 5140、192.168.1.1:8081 或 /var/run/rtp2httpd.sock。" #: htdocs/luci-static/resources/view/rtp2httpd.js:753 msgid "" diff --git a/openwrt-support/rtp2httpd/files/rtp2httpd.conf b/openwrt-support/rtp2httpd/files/rtp2httpd.conf index c3b85373..c6783628 100644 --- a/openwrt-support/rtp2httpd/files/rtp2httpd.conf +++ b/openwrt-support/rtp2httpd/files/rtp2httpd.conf @@ -5,10 +5,12 @@ config rtp2httpd # option verbose '1' # HTTP listen addresses. Use a bare port to listen on all addresses. + # Use an absolute path to listen on a Unix domain socket. # Multiple listen addresses are supported. # list listen '5140' # list listen '192.168.1.1:8081' # list listen '[::1]:5140' + # list listen '/var/run/rtp2httpd.sock' # Legacy single-port option, kept for backward compatibility: # option port '5140' diff --git a/rtp2httpd.conf b/rtp2httpd.conf index be19e4eb..0b6cd8ff 100644 --- a/rtp2httpd.conf +++ b/rtp2httpd.conf @@ -126,10 +126,11 @@ verbosity = 1 ;ffmpeg-args = -hwaccel none [bind] -#List of address and ports to bind to, eg. +#List of TCP address/ports or Unix socket paths to bind to, eg. ;mybox.example.net 5140 ;mybox2.example.net 8000 ;2001::1 http-alt +;/var/run/rtp2httpd.sock #Note that binding to port number < 1024 will #require root privelegies and therefore is diff --git a/src/configuration.c b/src/configuration.c index 311c8188..2ea4977b 100644 --- a/src/configuration.c +++ b/src/configuration.c @@ -100,6 +100,53 @@ static void safe_free_string(char **str) { } } +static int bind_path_is_valid(const char *path) { + if (!path || path[0] != '/') + return 0; + return strpbrk(path, " \t\r\n") == NULL; +} + +static void add_bindaddr_tcp(char *node, char *service) { + bindaddr_t *ba = malloc(sizeof(bindaddr_t)); + if (!ba) { + logger(LOG_ERROR, "Failed to allocate bind address"); + free(node); + free(service); + return; + } + memset(ba, 0, sizeof(*ba)); + ba->type = BIND_ADDR_TCP; + ba->node = node; + ba->service = service; + ba->next = bind_addresses; + bind_addresses = ba; +} + +static void add_bindaddr_unix(char *path) { + bindaddr_t *ba = malloc(sizeof(bindaddr_t)); + if (!ba) { + logger(LOG_ERROR, "Failed to allocate Unix socket bind address"); + free(path); + return; + } + memset(ba, 0, sizeof(*ba)); + ba->type = BIND_ADDR_UNIX; + ba->path = path; + ba->next = bind_addresses; + bind_addresses = ba; +} + +static void apply_bind_side_effects(void) { + int has_unix = bind_addresses_has_unix(); + if (has_unix) { + if (config.zerocopy_on_send) + config.zerocopy_on_send = 0; + logger(LOG_WARN, "Zero-copy send disabled because Unix socket listener is configured"); + } else if (!has_unix && cmd_zerocopy_on_send_set) { + config.zerocopy_on_send = 1; + } +} + static int parse_port_range_value(const char *value, int *min_port, int *max_port) { char *endptr = NULL; long start = 0; @@ -206,11 +253,43 @@ static void set_player_page_path_value(const char *value) { void parse_bind_sec(char *line) { int pos = 0; char *node, *service; - bindaddr_t *ba; node = extract_token(line, &pos); service = extract_token(line, &pos); + if (!node || node[0] == '\0') { + logger(LOG_ERROR, "Invalid empty bind address line"); + free(node); + free(service); + return; + } + + if (node[0] == '/') { + if (service && service[0] != '\0') { + logger(LOG_ERROR, "Invalid Unix socket bind path (whitespace is not supported): %s", node); + free(node); + free(service); + return; + } + if (!bind_path_is_valid(node)) { + logger(LOG_ERROR, "Invalid Unix socket bind path: %s", node); + free(node); + free(service); + return; + } + logger(LOG_DEBUG, "unix socket path: %s", node); + add_bindaddr_unix(node); + free(service); + return; + } + + if (!service || service[0] == '\0') { + logger(LOG_ERROR, "Invalid TCP bind address line: missing port for %s", node); + free(node); + free(service); + return; + } + if (strcmp("*", node) == 0) { free(node); node = NULL; @@ -224,11 +303,7 @@ void parse_bind_sec(char *line) { } logger(LOG_DEBUG, "node: %s, port: %s", node, service); - ba = malloc(sizeof(bindaddr_t)); - ba->node = node; - ba->service = service; - ba->next = bind_addresses; - bind_addresses = ba; + add_bindaddr_tcp(node, service); } void parse_services_sec(char *line) { @@ -700,6 +775,7 @@ bindaddr_t *new_empty_bindaddr(void) { bindaddr_t *ba; ba = malloc(sizeof(bindaddr_t)); memset(ba, 0, sizeof(*ba)); + ba->type = BIND_ADDR_TCP; ba->service = strdup("5140"); return ba; } @@ -713,6 +789,8 @@ void free_bindaddr(bindaddr_t *ba) { free(bat->node); if (bat->service) free(bat->service); + if (bat->path) + free(bat->path); free(bat); } } @@ -730,8 +808,11 @@ static bindaddr_t *copy_bindaddr(bindaddr_t *src) { free_bindaddr(head); return NULL; } + memset(copy, 0, sizeof(*copy)); + copy->type = src->type; copy->node = src->node ? strdup(src->node) : NULL; copy->service = src->service ? strdup(src->service) : NULL; + copy->path = src->path ? strdup(src->path) : NULL; copy->next = NULL; *tail = copy; tail = ©->next; @@ -747,6 +828,9 @@ static bindaddr_t *copy_bindaddr(bindaddr_t *src) { */ int bind_addresses_equal(bindaddr_t *a, bindaddr_t *b) { while (a && b) { + if (a->type != b->type) + return 0; + /* Compare node (both NULL or both equal) */ if (a->node == NULL && b->node != NULL) return 0; @@ -763,6 +847,14 @@ int bind_addresses_equal(bindaddr_t *a, bindaddr_t *b) { if (a->service && b->service && strcmp(a->service, b->service) != 0) return 0; + /* Compare path (both NULL or both equal) */ + if (a->path == NULL && b->path != NULL) + return 0; + if (a->path != NULL && b->path == NULL) + return 0; + if (a->path && b->path && strcmp(a->path, b->path) != 0) + return 0; + a = a->next; b = b->next; } @@ -771,6 +863,15 @@ int bind_addresses_equal(bindaddr_t *a, bindaddr_t *b) { return (a == NULL && b == NULL); } +int bind_addresses_has_unix(void) { + bindaddr_t *ba; + for (ba = bind_addresses; ba; ba = ba->next) { + if (ba->type == BIND_ADDR_UNIX) + return 1; + } + return 0; +} + /** * Get the config file path */ @@ -940,6 +1041,8 @@ int config_reload(int *out_bind_changed) { return -1; } + apply_bind_side_effects(); + /* Check if bind addresses changed */ if (out_bind_changed) { *out_bind_changed = !bind_addresses_equal(bind_addresses, old_bind_addresses); @@ -980,7 +1083,8 @@ void usage(FILE *f, char *progname) { "pool (default 16384)\n" "\t-B --udp-rcvbuf-size UDP socket receive buffer size for " "multicast/FCC/RTSP (default 524288 = 512KB)\n" - "\t-l --listen [addr:]port Address/port to bind (default ANY:5140)\n" + "\t-l --listen [addr:]port|/path.sock TCP address/port or Unix " + "socket path to bind (default ANY:5140)\n" "\t-c --config Read this file for configuration, instead of the " "default one\n" "\t-C --noconfig Do not read the default config\n" @@ -1032,7 +1136,22 @@ void usage(FILE *f, char *progname) { void parse_bind_cmd(char *arg) { char *p, *node, *service; - bindaddr_t *ba; + + if (arg && arg[0] == '/') { + char *path; + if (!bind_path_is_valid(arg)) { + logger(LOG_ERROR, "Invalid Unix socket bind path: %s", arg); + return; + } + path = strdup(arg); + if (!path) { + logger(LOG_ERROR, "Failed to allocate Unix socket bind path"); + return; + } + logger(LOG_DEBUG, "unix socket path: %s", path); + add_bindaddr_unix(path); + return; + } if (arg[0] == '[') { p = index(arg++, ']'); @@ -1053,11 +1172,7 @@ void parse_bind_cmd(char *arg) { } logger(LOG_DEBUG, "node: %s, port: %s", node, service); - ba = malloc(sizeof(bindaddr_t)); - ba->node = node; - ba->service = service; - ba->next = bind_addresses; - bind_addresses = ba; + add_bindaddr_tcp(node, service); } void parse_cmd_line(int argc, char *argv[]) { @@ -1167,6 +1282,10 @@ void parse_cmd_line(int argc, char *argv[]) { set_config_file_path(NULL); /* No config file */ break; case 'l': + if (!cmd_bind_set && bind_addresses) { + free_bindaddr(bind_addresses); + bind_addresses = NULL; + } parse_bind_cmd(optarg); cmd_bind_set = 1; break; @@ -1314,6 +1433,8 @@ void parse_cmd_line(int argc, char *argv[]) { set_config_file_path(NULL); } + apply_bind_side_effects(); + /* External M3U will be loaded asynchronously by workers after startup * This avoids blocking the startup process waiting for network resources */ if (config.external_m3u_url) { diff --git a/src/configuration.h b/src/configuration.h index a701bbcf..64c14aab 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -20,12 +20,19 @@ typedef enum loglevel { LOG_DEBUG /* Detailed diagnostic information */ } loglevel_t; +typedef enum { + BIND_ADDR_TCP = 0, + BIND_ADDR_UNIX +} bindaddr_type_t; + /* * Linked list of addresses to bind */ typedef struct bindaddr_s { + bindaddr_type_t type; char *node; char *service; + char *path; struct bindaddr_s *next; } bindaddr_t; @@ -186,4 +193,10 @@ void set_config_file_path(const char *path); */ int bind_addresses_equal(bindaddr_t *a, bindaddr_t *b); +/** + * Check whether any configured bind address is a Unix domain socket path. + * @return 1 if at least one Unix socket listener is configured, 0 otherwise + */ +int bind_addresses_has_unix(void); + #endif /* __CONFIGURATION_H__ */ diff --git a/src/connection.c b/src/connection.c index 90ac0374..d1b1a577 100644 --- a/src/connection.c +++ b/src/connection.c @@ -48,6 +48,12 @@ typedef enum { TOKEN_SOURCE_UA /* From User-Agent R2HTOKEN/xxx */ } token_source_t; +static int connection_client_is_tcp(const connection_t *c) { + if (!c || c->client_addr_len == 0) + return 0; + return c->client_addr.ss_family == AF_INET || c->client_addr.ss_family == AF_INET6; +} + /** * Parse cookie value from Cookie header string * Format: "name1=value1; name2=value2" @@ -472,14 +478,16 @@ connection_t *connection_create(int fd, int epfd, struct sockaddr_storage *clien /* Enforce TCP user timeout so unacknowledged data fails quickly */ #ifdef TCP_USER_TIMEOUT - int tcp_user_timeout = CONNECTION_TCP_USER_TIMEOUT_MS; - if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, sizeof(tcp_user_timeout)) < 0) { - logger(LOG_DEBUG, "connection_create: Failed to set TCP_USER_TIMEOUT: %s", strerror(errno)); + if (connection_client_is_tcp(c)) { + int tcp_user_timeout = CONNECTION_TCP_USER_TIMEOUT_MS; + if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, sizeof(tcp_user_timeout)) < 0) { + logger(LOG_DEBUG, "connection_create: Failed to set TCP_USER_TIMEOUT: %s", strerror(errno)); + } } #endif /* Enable SO_ZEROCOPY on socket if supported */ - if (config.zerocopy_on_send) { + if (config.zerocopy_on_send && connection_client_is_tcp(c)) { int one = 1; if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) == 0) { c->zerocopy_enabled = 1; @@ -729,7 +737,9 @@ int connection_route_and_start(connection_t *c) { /* Format client address string (will be overridden by X-Forwarded-For if * present later) */ char client_addr_str[NI_MAXHOST + NI_MAXSERV + 4] = "unknown"; - if (c->client_addr_len > 0) { + if (c->client_addr_len > 0 && c->client_addr.ss_family == AF_UNIX) { + snprintf(client_addr_str, sizeof(client_addr_str), "unix"); + } else if (c->client_addr_len > 0) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; int r = getnameinfo((struct sockaddr *)&c->client_addr, c->client_addr_len, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV); diff --git a/src/m3u.c b/src/m3u.c index ff4044a3..0f258b3c 100644 --- a/src/m3u.c +++ b/src/m3u.c @@ -171,12 +171,16 @@ char *get_server_address(void) { char server_port[16]; char full_url[2048]; - /* Get first listening port from bind_addresses */ - if (bind_addresses && bind_addresses->service) { - strncpy(server_port, bind_addresses->service, sizeof(server_port) - 1); - server_port[sizeof(server_port) - 1] = '\0'; - } else { - strcpy(server_port, "5140"); + snprintf(server_port, sizeof(server_port), "5140"); + + /* Get first TCP listening port from bind_addresses. Unix socket listeners + * do not provide a usable HTTP authority for generated playlist URLs. */ + for (bindaddr_t *ba = bind_addresses; ba; ba = ba->next) { + if (ba->type == BIND_ADDR_TCP && ba->service) { + strncpy(server_port, ba->service, sizeof(server_port) - 1); + server_port[sizeof(server_port) - 1] = '\0'; + break; + } } /* Priority 1: Use configured hostname */ @@ -200,7 +204,7 @@ char *get_server_address(void) { /* Build full URL */ /* Default protocol to http if not specified */ if (protocol[0] == '\0') { - strcpy(protocol, "http"); + snprintf(protocol, sizeof(protocol), "http"); /* Use port from config if specified, otherwise use server_port */ if (port[0] == '\0') { diff --git a/src/supervisor.c b/src/supervisor.c index b32757b4..38cc15d8 100644 --- a/src/supervisor.c +++ b/src/supervisor.c @@ -13,11 +13,13 @@ #include #include #include +#include #include +#include #include #include -#define MAX_S 10 +#define MAX_S 32 /* Restart rate limiting constants */ #define RESTART_WINDOW_SEC 5 /* Time window for restart counting */ @@ -38,6 +40,9 @@ static int num_workers = 0; static volatile sig_atomic_t supervisor_stop_flag = 0; static volatile sig_atomic_t supervisor_reload_flag = 0; static volatile sig_atomic_t supervisor_restart_workers_flag = 0; +static int unix_listen_sockets[MAX_S]; +static char *unix_listen_paths[MAX_S]; +static int unix_listen_count = 0; /* Forward declarations */ static void supervisor_signal_handler(int signum); @@ -45,6 +50,8 @@ static void supervisor_sighup_handler(int signum); static void supervisor_sigusr1_handler(int signum); static int spawn_worker(int worker_idx); static void cleanup_workers(void); +static int setup_unix_listeners(void); +static void cleanup_unix_listeners(void); /** * Signal handler for supervisor process (SIGTERM/SIGINT) @@ -190,6 +197,119 @@ static void cleanup_workers(void) { num_workers = 0; } +static void cleanup_unix_listeners(void) { + for (int i = 0; i < unix_listen_count; i++) { + if (unix_listen_sockets[i] >= 0) { + close(unix_listen_sockets[i]); + unix_listen_sockets[i] = -1; + } + if (unix_listen_paths[i]) { + struct stat st; + if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { + if (unlink(unix_listen_paths[i]) < 0) { + logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); + } + } + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + } + unix_listen_count = 0; +} + +static int setup_unix_listeners(void) { + bindaddr_t *bind_addr; + + for (bind_addr = bind_addresses; bind_addr; bind_addr = bind_addr->next) { + struct sockaddr_un addr; + struct stat st; + int sock = -1; + char *path_copy = NULL; + + if (bind_addr->type != BIND_ADDR_UNIX) + continue; + + if (!bind_addr->path || bind_addr->path[0] != '/') { + logger(LOG_FATAL, "Invalid Unix socket path"); + cleanup_unix_listeners(); + return -1; + } + + if (strlen(bind_addr->path) >= sizeof(addr.sun_path)) { + logger(LOG_FATAL, "Unix socket path is too long: %s", bind_addr->path); + cleanup_unix_listeners(); + return -1; + } + + if (unix_listen_count >= MAX_S) { + logger(LOG_FATAL, "Too many listening sockets (max %d)", MAX_S); + cleanup_unix_listeners(); + return -1; + } + + if (lstat(bind_addr->path, &st) == 0) { + if (!S_ISSOCK(st.st_mode)) { + logger(LOG_FATAL, "Unix socket path exists and is not a socket: %s", bind_addr->path); + cleanup_unix_listeners(); + return -1; + } + if (unlink(bind_addr->path) < 0) { + logger(LOG_FATAL, "Failed to remove stale Unix socket %s: %s", bind_addr->path, strerror(errno)); + cleanup_unix_listeners(); + return -1; + } + } else if (errno != ENOENT) { + logger(LOG_FATAL, "Failed to inspect Unix socket path %s: %s", bind_addr->path, strerror(errno)); + cleanup_unix_listeners(); + return -1; + } + + path_copy = strdup(bind_addr->path); + if (!path_copy) { + logger(LOG_FATAL, "Failed to allocate Unix socket path"); + cleanup_unix_listeners(); + return -1; + } + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + logger(LOG_FATAL, "Cannot create Unix socket %s: %s", bind_addr->path, strerror(errno)); + free(path_copy); + cleanup_unix_listeners(); + return -1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, bind_addr->path, sizeof(addr.sun_path) - 1); + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + logger(LOG_FATAL, "Cannot bind Unix socket %s: %s", bind_addr->path, strerror(errno)); + close(sock); + free(path_copy); + cleanup_unix_listeners(); + return -1; + } + + if (listen(sock, 128) < 0) { + logger(LOG_FATAL, "Cannot listen on Unix socket %s: %s", bind_addr->path, strerror(errno)); + close(sock); + unlink(bind_addr->path); + free(path_copy); + cleanup_unix_listeners(); + return -1; + } + + unix_listen_sockets[unix_listen_count] = sock; + unix_listen_paths[unix_listen_count] = path_copy; + unix_listen_count++; + + logger(LOG_INFO, "Listening on Unix socket %s", bind_addr->path); + } + + return 0; +} + int supervisor_run(void) { int i; @@ -211,6 +331,16 @@ int supervisor_run(void) { } } + if (bind_addresses == NULL) { + bind_addresses = new_empty_bindaddr(); + } + + if (setup_unix_listeners() < 0) { + cleanup_workers(); + status_cleanup(); + return -1; + } + /* Spawn all workers */ for (i = 0; i < num_workers; i++) { if (spawn_worker(i) < 0) { @@ -309,6 +439,13 @@ int supervisor_run(void) { int bind_changed = 0; if (config_reload(&bind_changed) == 0) { + if (bind_changed) { + cleanup_unix_listeners(); + if (setup_unix_listeners() < 0) { + logger(LOG_ERROR, "Failed to set up Unix socket listeners after configuration reload"); + } + } + /* Handle worker count changes */ if (config.workers > num_workers) { if (bind_changed) { @@ -427,6 +564,9 @@ int supervisor_run(void) { logger(LOG_INFO, "All workers stopped, cleaning up"); + /* Close supervisor-owned Unix listeners and remove socket paths */ + cleanup_unix_listeners(); + /* Clean up worker array */ cleanup_workers(); @@ -440,9 +580,11 @@ int supervisor_run(void) { int run_worker(void) { struct addrinfo hints, *res, *ai; bindaddr_t *bind_addr; + int i; int r; int s[MAX_S]; int maxs, nfds; + int max_tcp_sockets; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; const int on = 1; int notif_fd = -1; @@ -466,19 +608,25 @@ int run_worker(void) { hints.ai_flags = AI_PASSIVE; maxs = 0; nfds = -1; + max_tcp_sockets = MAX_S - unix_listen_count; + if (max_tcp_sockets < 0) + max_tcp_sockets = 0; if (bind_addresses == NULL) { bind_addresses = new_empty_bindaddr(); } for (bind_addr = bind_addresses; bind_addr; bind_addr = bind_addr->next) { + if (bind_addr->type == BIND_ADDR_UNIX) + continue; + r = getaddrinfo(bind_addr->node, bind_addr->service, &hints, &res); if (r) { logger(LOG_FATAL, "GAI: %s", gai_strerror(r)); return EXIT_FAILURE; } - for (ai = res; ai && maxs < MAX_S; ai = ai->ai_next) { + for (ai = res; ai && maxs < max_tcp_sockets; ai = ai->ai_next) { s[maxs] = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); if (s[maxs] < 0) continue; @@ -532,6 +680,18 @@ int run_worker(void) { freeaddrinfo(res); } + for (i = 0; i < unix_listen_count && maxs < MAX_S; i++) { + s[maxs] = unix_listen_sockets[i]; + if (s[maxs] > nfds) + nfds = s[maxs]; + maxs++; + } + + if (i < unix_listen_count) { + logger(LOG_FATAL, "Too many listening sockets (max %d)", MAX_S); + return EXIT_FAILURE; + } + if (maxs == 0) { logger(LOG_FATAL, "No socket to listen!"); return EXIT_FAILURE; diff --git a/src/worker.c b/src/worker.c index 628ab94e..748eb2ee 100644 --- a/src/worker.c +++ b/src/worker.c @@ -350,7 +350,9 @@ int worker_run_event_loop(int *listen_sockets, int num_sockets, int notif_fd) { break; } connection_set_nonblocking(cfd); - connection_set_tcp_nodelay(cfd); + if (client.ss_family == AF_INET || client.ss_family == AF_INET6) { + connection_set_tcp_nodelay(cfd); + } /* Create connection * status_index will be assigned later by status_register_client() if From 143aa62a740195f1e00cf0015ea7ae5cbe1a8eed Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 04:13:44 +0800 Subject: [PATCH 2/6] chore(lint): fix listener checks --- e2e/test_fcc.py | 1 + src/configuration.h | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/e2e/test_fcc.py b/e2e/test_fcc.py index 2079486b..dfd65668 100644 --- a/e2e/test_fcc.py +++ b/e2e/test_fcc.py @@ -285,6 +285,7 @@ def test_huawei_fcc_socket_pair_stays_inside_listen_range(self, r2h_binary): r2h.start() fcc.start() try: + assert r2h.port is not None url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei" status, _, body = stream_get( "127.0.0.1", diff --git a/src/configuration.h b/src/configuration.h index 64c14aab..064452c6 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -20,10 +20,7 @@ typedef enum loglevel { LOG_DEBUG /* Detailed diagnostic information */ } loglevel_t; -typedef enum { - BIND_ADDR_TCP = 0, - BIND_ADDR_UNIX -} bindaddr_type_t; +typedef enum { BIND_ADDR_TCP = 0, BIND_ADDR_UNIX } bindaddr_type_t; /* * Linked list of addresses to bind From e993a807e6f3e459c997be9e055237766f0ba965 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 04:28:45 +0800 Subject: [PATCH 3/6] fix(listener): keep unix sockets on reload failure --- e2e/helpers/http.py | 8 +- e2e/test_unix_socket.py | 45 +++++++++++ src/configuration.c | 4 +- src/configuration.h | 1 + src/supervisor.c | 162 +++++++++++++++++++++++++++++++++++----- 5 files changed, 199 insertions(+), 21 deletions(-) diff --git a/e2e/helpers/http.py b/e2e/helpers/http.py index b9a69d19..88864bfa 100644 --- a/e2e/helpers/http.py +++ b/e2e/helpers/http.py @@ -54,7 +54,13 @@ def _parse_raw_http_response(data: bytes, lower_header_names: bool = False) -> t header_text = data[:header_end].decode(errors="replace") body = data[header_end + 4 :] parts = header_text.split("\r\n") - status_code = int(parts[0].split()[1]) + status_line = parts[0].split() + if len(status_line) < 2: + return 0, {}, b"" + try: + status_code = int(status_line[1]) + except ValueError: + return 0, {}, b"" hdrs: dict[str, str] = {} for line in parts[1:]: diff --git a/e2e/test_unix_socket.py b/e2e/test_unix_socket.py index 7e68d3bd..d7a4cc1d 100644 --- a/e2e/test_unix_socket.py +++ b/e2e/test_unix_socket.py @@ -9,6 +9,7 @@ from __future__ import annotations import os +import signal import socket import tempfile import time @@ -142,6 +143,50 @@ def test_regular_file_socket_path_fails_startup(self, r2h_binary): finally: r2h.stop() + def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + old_sock_path = os.path.join(tmpdir, "old.sock") + bad_sock_path = os.path.join(tmpdir, "bad.sock") + with open(bad_sock_path, "wb") as f: + f.write(b"not a socket") + + old_config = f"""\ +[global] +verbosity = 4 + +[bind] +{old_sock_path} +""" + bad_config = f"""\ +[global] +verbosity = 4 + +[bind] +{bad_sock_path} +""" + r2h = R2HProcess( + r2h_binary, None, config_content=old_config, capture_log=True, wait_socket_path=old_sock_path + ) + try: + r2h.start() + status, _, _ = unix_http_get(old_sock_path, "/status") + assert status == 200 + + assert r2h._config_path is not None + with open(r2h._config_path, "w") as f: + f.write(bad_config) + assert r2h.process is not None + os.kill(r2h.process.pid, signal.SIGHUP) + time.sleep(0.5) + + status, _, _ = unix_http_get(old_sock_path, "/status") + assert status == 200 + log = r2h.read_log() + assert "Unix socket path exists and is not a socket" in log + assert "keeping existing workers and listeners" in log + finally: + r2h.stop() + def test_unix_socket_disables_zerocopy(self, r2h_binary): with tempfile.TemporaryDirectory() as tmpdir: sock_path = _socket_path(tmpdir) diff --git a/src/configuration.c b/src/configuration.c index 2ea4977b..38695a4a 100644 --- a/src/configuration.c +++ b/src/configuration.c @@ -798,7 +798,7 @@ void free_bindaddr(bindaddr_t *ba) { /** * Deep copy a bind address list */ -static bindaddr_t *copy_bindaddr(bindaddr_t *src) { +bindaddr_t *bindaddr_copy(bindaddr_t *src) { bindaddr_t *head = NULL; bindaddr_t **tail = &head; @@ -1020,7 +1020,7 @@ int config_reload(int *out_bind_changed) { } /* Save current bind addresses for comparison and potential rollback */ - old_bind_addresses = copy_bindaddr(bind_addresses); + old_bind_addresses = bindaddr_copy(bind_addresses); /* Step 1: Cleanup all configuration resources */ config_cleanup(false); diff --git a/src/configuration.h b/src/configuration.h index 064452c6..28d45d01 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -144,6 +144,7 @@ void parse_cmd_line(int argc, char *argv[]); /* Memory management */ bindaddr_t *new_empty_bindaddr(void); void free_bindaddr(bindaddr_t *); +bindaddr_t *bindaddr_copy(bindaddr_t *); /* Configuration lifecycle */ diff --git a/src/supervisor.c b/src/supervisor.c index 38cc15d8..32d4e133 100644 --- a/src/supervisor.c +++ b/src/supervisor.c @@ -52,6 +52,7 @@ static int spawn_worker(int worker_idx); static void cleanup_workers(void); static int setup_unix_listeners(void); static void cleanup_unix_listeners(void); +static int replace_unix_listeners_atomically(void); /** * Signal handler for supervisor process (SIGTERM/SIGINT) @@ -217,57 +218,120 @@ static void cleanup_unix_listeners(void) { unix_listen_count = 0; } -static int setup_unix_listeners(void) { +static int find_current_unix_listener(const char *path) { + for (int i = 0; i < unix_listen_count; i++) { + if (unix_listen_paths[i] && strcmp(unix_listen_paths[i], path) == 0) + return i; + } + return -1; +} + +static int temp_unix_listener_path_exists(char **paths, int count, const char *path) { + for (int i = 0; i < count; i++) { + if (paths[i] && strcmp(paths[i], path) == 0) + return 1; + } + return 0; +} + +static void cleanup_temp_unix_listeners(int *sockets, char **paths, int *owned, int count) { + for (int i = 0; i < count; i++) { + if (owned[i] && sockets[i] >= 0) { + close(sockets[i]); + sockets[i] = -1; + if (paths[i]) + unlink(paths[i]); + } + if (paths[i]) { + free(paths[i]); + paths[i] = NULL; + } + } +} + +static int build_unix_listener_set(int *sockets, char **paths, int *owned, int *reused, int *count) { bindaddr_t *bind_addr; + *count = 0; + for (int i = 0; i < MAX_S; i++) { + sockets[i] = -1; + paths[i] = NULL; + owned[i] = 0; + reused[i] = 0; + } + for (bind_addr = bind_addresses; bind_addr; bind_addr = bind_addr->next) { struct sockaddr_un addr; struct stat st; int sock = -1; char *path_copy = NULL; + int current_idx; if (bind_addr->type != BIND_ADDR_UNIX) continue; if (!bind_addr->path || bind_addr->path[0] != '/') { logger(LOG_FATAL, "Invalid Unix socket path"); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } if (strlen(bind_addr->path) >= sizeof(addr.sun_path)) { logger(LOG_FATAL, "Unix socket path is too long: %s", bind_addr->path); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } - if (unix_listen_count >= MAX_S) { + if (*count >= MAX_S) { logger(LOG_FATAL, "Too many listening sockets (max %d)", MAX_S); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + if (temp_unix_listener_path_exists(paths, *count, bind_addr->path)) { + logger(LOG_FATAL, "Duplicate Unix socket listener path: %s", bind_addr->path); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } + current_idx = find_current_unix_listener(bind_addr->path); + if (current_idx >= 0) { + path_copy = strdup(bind_addr->path); + if (!path_copy) { + logger(LOG_FATAL, "Failed to allocate Unix socket path"); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + sockets[*count] = unix_listen_sockets[current_idx]; + paths[*count] = path_copy; + owned[*count] = 0; + reused[current_idx] = 1; + (*count)++; + logger(LOG_INFO, "Keeping Unix socket listener %s", bind_addr->path); + continue; + } + if (lstat(bind_addr->path, &st) == 0) { if (!S_ISSOCK(st.st_mode)) { logger(LOG_FATAL, "Unix socket path exists and is not a socket: %s", bind_addr->path); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } if (unlink(bind_addr->path) < 0) { logger(LOG_FATAL, "Failed to remove stale Unix socket %s: %s", bind_addr->path, strerror(errno)); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } } else if (errno != ENOENT) { logger(LOG_FATAL, "Failed to inspect Unix socket path %s: %s", bind_addr->path, strerror(errno)); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } path_copy = strdup(bind_addr->path); if (!path_copy) { logger(LOG_FATAL, "Failed to allocate Unix socket path"); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } @@ -275,7 +339,7 @@ static int setup_unix_listeners(void) { if (sock < 0) { logger(LOG_FATAL, "Cannot create Unix socket %s: %s", bind_addr->path, strerror(errno)); free(path_copy); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } @@ -287,7 +351,7 @@ static int setup_unix_listeners(void) { logger(LOG_FATAL, "Cannot bind Unix socket %s: %s", bind_addr->path, strerror(errno)); close(sock); free(path_copy); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } @@ -296,20 +360,65 @@ static int setup_unix_listeners(void) { close(sock); unlink(bind_addr->path); free(path_copy); - cleanup_unix_listeners(); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } - unix_listen_sockets[unix_listen_count] = sock; - unix_listen_paths[unix_listen_count] = path_copy; - unix_listen_count++; - + sockets[*count] = sock; + paths[*count] = path_copy; + owned[*count] = 1; + (*count)++; logger(LOG_INFO, "Listening on Unix socket %s", bind_addr->path); } return 0; } +static int replace_unix_listeners_atomically(void) { + int sockets[MAX_S]; + char *paths[MAX_S]; + int owned[MAX_S]; + int reused[MAX_S]; + int count = 0; + + if (build_unix_listener_set(sockets, paths, owned, reused, &count) < 0) + return -1; + + for (int i = 0; i < unix_listen_count; i++) { + if (reused[i]) { + if (unix_listen_paths[i]) { + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + continue; + } + if (unix_listen_sockets[i] >= 0) { + close(unix_listen_sockets[i]); + unix_listen_sockets[i] = -1; + } + if (unix_listen_paths[i]) { + struct stat st; + if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { + if (unlink(unix_listen_paths[i]) < 0) { + logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); + } + } + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + } + + for (int i = 0; i < count; i++) { + unix_listen_sockets[i] = sockets[i]; + unix_listen_paths[i] = paths[i]; + } + unix_listen_count = count; + + return 0; +} + +static int setup_unix_listeners(void) { return replace_unix_listeners_atomically(); } + int supervisor_run(void) { int i; @@ -438,14 +547,29 @@ int supervisor_run(void) { logger(LOG_INFO, "Received SIGHUP, reloading configuration"); int bind_changed = 0; + bindaddr_t *old_bind_addresses = bindaddr_copy(bind_addresses); if (config_reload(&bind_changed) == 0) { + int reload_failed = 0; if (bind_changed) { - cleanup_unix_listeners(); - if (setup_unix_listeners() < 0) { + if (replace_unix_listeners_atomically() < 0) { logger(LOG_ERROR, "Failed to set up Unix socket listeners after configuration reload"); + if (old_bind_addresses) { + free_bindaddr(bind_addresses); + bind_addresses = old_bind_addresses; + old_bind_addresses = NULL; + } + reload_failed = 1; } } + if (old_bind_addresses) + free_bindaddr(old_bind_addresses); + + if (reload_failed) { + logger(LOG_ERROR, "Configuration reload failed, keeping existing workers and listeners"); + continue; + } + /* Handle worker count changes */ if (config.workers > num_workers) { if (bind_changed) { @@ -508,6 +632,8 @@ int supervisor_run(void) { } } } else { + if (old_bind_addresses) + free_bindaddr(old_bind_addresses); logger(LOG_ERROR, "Configuration reload failed, not forwarding SIGHUP to workers"); } } From f2a29494c827853b53ec1968b6fa938e38264dc6 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 04:35:46 +0800 Subject: [PATCH 4/6] refactor(listener): split unix socket helpers --- CMakeLists.txt | 1 + src/supervisor.c | 249 ++------------------------------------------- src/unix_socket.c | 252 ++++++++++++++++++++++++++++++++++++++++++++++ src/unix_socket.h | 11 ++ 4 files changed, 270 insertions(+), 243 deletions(-) create mode 100644 src/unix_socket.c create mode 100644 src/unix_socket.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f5384f85..232bf1fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,7 @@ set(COMMON_SOURCES src/status.c src/connection.c src/worker.c + src/unix_socket.c src/buffer_pool.c src/zerocopy.c src/m3u.c diff --git a/src/supervisor.c b/src/supervisor.c index 32d4e133..7eff4a34 100644 --- a/src/supervisor.c +++ b/src/supervisor.c @@ -3,19 +3,17 @@ #include "platform_compat.h" #include "rtp2httpd.h" #include "status.h" +#include "unix_socket.h" #include "utils.h" #include "worker.h" #include "zerocopy.h" #include #include #include -#include #include #include #include -#include #include -#include #include #include @@ -40,9 +38,6 @@ static int num_workers = 0; static volatile sig_atomic_t supervisor_stop_flag = 0; static volatile sig_atomic_t supervisor_reload_flag = 0; static volatile sig_atomic_t supervisor_restart_workers_flag = 0; -static int unix_listen_sockets[MAX_S]; -static char *unix_listen_paths[MAX_S]; -static int unix_listen_count = 0; /* Forward declarations */ static void supervisor_signal_handler(int signum); @@ -50,9 +45,6 @@ static void supervisor_sighup_handler(int signum); static void supervisor_sigusr1_handler(int signum); static int spawn_worker(int worker_idx); static void cleanup_workers(void); -static int setup_unix_listeners(void); -static void cleanup_unix_listeners(void); -static int replace_unix_listeners_atomically(void); /** * Signal handler for supervisor process (SIGTERM/SIGINT) @@ -198,227 +190,6 @@ static void cleanup_workers(void) { num_workers = 0; } -static void cleanup_unix_listeners(void) { - for (int i = 0; i < unix_listen_count; i++) { - if (unix_listen_sockets[i] >= 0) { - close(unix_listen_sockets[i]); - unix_listen_sockets[i] = -1; - } - if (unix_listen_paths[i]) { - struct stat st; - if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { - if (unlink(unix_listen_paths[i]) < 0) { - logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); - } - } - free(unix_listen_paths[i]); - unix_listen_paths[i] = NULL; - } - } - unix_listen_count = 0; -} - -static int find_current_unix_listener(const char *path) { - for (int i = 0; i < unix_listen_count; i++) { - if (unix_listen_paths[i] && strcmp(unix_listen_paths[i], path) == 0) - return i; - } - return -1; -} - -static int temp_unix_listener_path_exists(char **paths, int count, const char *path) { - for (int i = 0; i < count; i++) { - if (paths[i] && strcmp(paths[i], path) == 0) - return 1; - } - return 0; -} - -static void cleanup_temp_unix_listeners(int *sockets, char **paths, int *owned, int count) { - for (int i = 0; i < count; i++) { - if (owned[i] && sockets[i] >= 0) { - close(sockets[i]); - sockets[i] = -1; - if (paths[i]) - unlink(paths[i]); - } - if (paths[i]) { - free(paths[i]); - paths[i] = NULL; - } - } -} - -static int build_unix_listener_set(int *sockets, char **paths, int *owned, int *reused, int *count) { - bindaddr_t *bind_addr; - - *count = 0; - for (int i = 0; i < MAX_S; i++) { - sockets[i] = -1; - paths[i] = NULL; - owned[i] = 0; - reused[i] = 0; - } - - for (bind_addr = bind_addresses; bind_addr; bind_addr = bind_addr->next) { - struct sockaddr_un addr; - struct stat st; - int sock = -1; - char *path_copy = NULL; - int current_idx; - - if (bind_addr->type != BIND_ADDR_UNIX) - continue; - - if (!bind_addr->path || bind_addr->path[0] != '/') { - logger(LOG_FATAL, "Invalid Unix socket path"); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - if (strlen(bind_addr->path) >= sizeof(addr.sun_path)) { - logger(LOG_FATAL, "Unix socket path is too long: %s", bind_addr->path); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - if (*count >= MAX_S) { - logger(LOG_FATAL, "Too many listening sockets (max %d)", MAX_S); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - if (temp_unix_listener_path_exists(paths, *count, bind_addr->path)) { - logger(LOG_FATAL, "Duplicate Unix socket listener path: %s", bind_addr->path); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - current_idx = find_current_unix_listener(bind_addr->path); - if (current_idx >= 0) { - path_copy = strdup(bind_addr->path); - if (!path_copy) { - logger(LOG_FATAL, "Failed to allocate Unix socket path"); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - sockets[*count] = unix_listen_sockets[current_idx]; - paths[*count] = path_copy; - owned[*count] = 0; - reused[current_idx] = 1; - (*count)++; - logger(LOG_INFO, "Keeping Unix socket listener %s", bind_addr->path); - continue; - } - - if (lstat(bind_addr->path, &st) == 0) { - if (!S_ISSOCK(st.st_mode)) { - logger(LOG_FATAL, "Unix socket path exists and is not a socket: %s", bind_addr->path); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - if (unlink(bind_addr->path) < 0) { - logger(LOG_FATAL, "Failed to remove stale Unix socket %s: %s", bind_addr->path, strerror(errno)); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - } else if (errno != ENOENT) { - logger(LOG_FATAL, "Failed to inspect Unix socket path %s: %s", bind_addr->path, strerror(errno)); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - path_copy = strdup(bind_addr->path); - if (!path_copy) { - logger(LOG_FATAL, "Failed to allocate Unix socket path"); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - sock = socket(AF_UNIX, SOCK_STREAM, 0); - if (sock < 0) { - logger(LOG_FATAL, "Cannot create Unix socket %s: %s", bind_addr->path, strerror(errno)); - free(path_copy); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, bind_addr->path, sizeof(addr.sun_path) - 1); - - if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - logger(LOG_FATAL, "Cannot bind Unix socket %s: %s", bind_addr->path, strerror(errno)); - close(sock); - free(path_copy); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - if (listen(sock, 128) < 0) { - logger(LOG_FATAL, "Cannot listen on Unix socket %s: %s", bind_addr->path, strerror(errno)); - close(sock); - unlink(bind_addr->path); - free(path_copy); - cleanup_temp_unix_listeners(sockets, paths, owned, *count); - return -1; - } - - sockets[*count] = sock; - paths[*count] = path_copy; - owned[*count] = 1; - (*count)++; - logger(LOG_INFO, "Listening on Unix socket %s", bind_addr->path); - } - - return 0; -} - -static int replace_unix_listeners_atomically(void) { - int sockets[MAX_S]; - char *paths[MAX_S]; - int owned[MAX_S]; - int reused[MAX_S]; - int count = 0; - - if (build_unix_listener_set(sockets, paths, owned, reused, &count) < 0) - return -1; - - for (int i = 0; i < unix_listen_count; i++) { - if (reused[i]) { - if (unix_listen_paths[i]) { - free(unix_listen_paths[i]); - unix_listen_paths[i] = NULL; - } - continue; - } - if (unix_listen_sockets[i] >= 0) { - close(unix_listen_sockets[i]); - unix_listen_sockets[i] = -1; - } - if (unix_listen_paths[i]) { - struct stat st; - if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { - if (unlink(unix_listen_paths[i]) < 0) { - logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); - } - } - free(unix_listen_paths[i]); - unix_listen_paths[i] = NULL; - } - } - - for (int i = 0; i < count; i++) { - unix_listen_sockets[i] = sockets[i]; - unix_listen_paths[i] = paths[i]; - } - unix_listen_count = count; - - return 0; -} - -static int setup_unix_listeners(void) { return replace_unix_listeners_atomically(); } - int supervisor_run(void) { int i; @@ -444,7 +215,7 @@ int supervisor_run(void) { bind_addresses = new_empty_bindaddr(); } - if (setup_unix_listeners() < 0) { + if (unix_socket_listeners_replace(bind_addresses) < 0) { cleanup_workers(); status_cleanup(); return -1; @@ -551,7 +322,7 @@ int supervisor_run(void) { if (config_reload(&bind_changed) == 0) { int reload_failed = 0; if (bind_changed) { - if (replace_unix_listeners_atomically() < 0) { + if (unix_socket_listeners_replace(bind_addresses) < 0) { logger(LOG_ERROR, "Failed to set up Unix socket listeners after configuration reload"); if (old_bind_addresses) { free_bindaddr(bind_addresses); @@ -691,7 +462,7 @@ int supervisor_run(void) { logger(LOG_INFO, "All workers stopped, cleaning up"); /* Close supervisor-owned Unix listeners and remove socket paths */ - cleanup_unix_listeners(); + unix_socket_listeners_cleanup(); /* Clean up worker array */ cleanup_workers(); @@ -706,7 +477,6 @@ int supervisor_run(void) { int run_worker(void) { struct addrinfo hints, *res, *ai; bindaddr_t *bind_addr; - int i; int r; int s[MAX_S]; int maxs, nfds; @@ -734,7 +504,7 @@ int run_worker(void) { hints.ai_flags = AI_PASSIVE; maxs = 0; nfds = -1; - max_tcp_sockets = MAX_S - unix_listen_count; + max_tcp_sockets = MAX_S - unix_socket_listeners_count(); if (max_tcp_sockets < 0) max_tcp_sockets = 0; @@ -806,14 +576,7 @@ int run_worker(void) { freeaddrinfo(res); } - for (i = 0; i < unix_listen_count && maxs < MAX_S; i++) { - s[maxs] = unix_listen_sockets[i]; - if (s[maxs] > nfds) - nfds = s[maxs]; - maxs++; - } - - if (i < unix_listen_count) { + if (unix_socket_listeners_append(s, MAX_S, &maxs, &nfds) < 0) { logger(LOG_FATAL, "Too many listening sockets (max %d)", MAX_S); return EXIT_FAILURE; } diff --git a/src/unix_socket.c b/src/unix_socket.c new file mode 100644 index 00000000..bb7f011e --- /dev/null +++ b/src/unix_socket.c @@ -0,0 +1,252 @@ +#include "unix_socket.h" +#include "rtp2httpd.h" +#include "utils.h" +#include +#include +#include +#include +#include +#include +#include + +#define MAX_UNIX_LISTENERS 32 + +static int unix_listen_sockets[MAX_UNIX_LISTENERS]; +static char *unix_listen_paths[MAX_UNIX_LISTENERS]; +static int unix_listen_count = 0; + +static int find_current_unix_listener(const char *path) { + for (int i = 0; i < unix_listen_count; i++) { + if (unix_listen_paths[i] && strcmp(unix_listen_paths[i], path) == 0) + return i; + } + return -1; +} + +static int temp_unix_listener_path_exists(char **paths, int count, const char *path) { + for (int i = 0; i < count; i++) { + if (paths[i] && strcmp(paths[i], path) == 0) + return 1; + } + return 0; +} + +static void cleanup_temp_unix_listeners(int *sockets, char **paths, int *owned, int count) { + for (int i = 0; i < count; i++) { + if (owned[i] && sockets[i] >= 0) { + close(sockets[i]); + sockets[i] = -1; + if (paths[i]) + unlink(paths[i]); + } + if (paths[i]) { + free(paths[i]); + paths[i] = NULL; + } + } +} + +static int build_unix_listener_set(bindaddr_t *bind_list, int *sockets, char **paths, int *owned, int *reused, + int *count) { + bindaddr_t *bind_addr; + + *count = 0; + for (int i = 0; i < MAX_UNIX_LISTENERS; i++) { + sockets[i] = -1; + paths[i] = NULL; + owned[i] = 0; + reused[i] = 0; + } + + for (bind_addr = bind_list; bind_addr; bind_addr = bind_addr->next) { + struct sockaddr_un addr; + struct stat st; + int sock = -1; + char *path_copy = NULL; + int current_idx; + + if (bind_addr->type != BIND_ADDR_UNIX) + continue; + + if (!bind_addr->path || bind_addr->path[0] != '/') { + logger(LOG_FATAL, "Invalid Unix socket path"); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + if (strlen(bind_addr->path) >= sizeof(addr.sun_path)) { + logger(LOG_FATAL, "Unix socket path is too long: %s", bind_addr->path); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + if (*count >= MAX_UNIX_LISTENERS) { + logger(LOG_FATAL, "Too many Unix socket listeners (max %d)", MAX_UNIX_LISTENERS); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + if (temp_unix_listener_path_exists(paths, *count, bind_addr->path)) { + logger(LOG_FATAL, "Duplicate Unix socket listener path: %s", bind_addr->path); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + current_idx = find_current_unix_listener(bind_addr->path); + if (current_idx >= 0) { + path_copy = strdup(bind_addr->path); + if (!path_copy) { + logger(LOG_FATAL, "Failed to allocate Unix socket path"); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + sockets[*count] = unix_listen_sockets[current_idx]; + paths[*count] = path_copy; + owned[*count] = 0; + reused[current_idx] = 1; + (*count)++; + logger(LOG_INFO, "Keeping Unix socket listener %s", bind_addr->path); + continue; + } + + if (lstat(bind_addr->path, &st) == 0) { + if (!S_ISSOCK(st.st_mode)) { + logger(LOG_FATAL, "Unix socket path exists and is not a socket: %s", bind_addr->path); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + if (unlink(bind_addr->path) < 0) { + logger(LOG_FATAL, "Failed to remove stale Unix socket %s: %s", bind_addr->path, strerror(errno)); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + } else if (errno != ENOENT) { + logger(LOG_FATAL, "Failed to inspect Unix socket path %s: %s", bind_addr->path, strerror(errno)); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + path_copy = strdup(bind_addr->path); + if (!path_copy) { + logger(LOG_FATAL, "Failed to allocate Unix socket path"); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + logger(LOG_FATAL, "Cannot create Unix socket %s: %s", bind_addr->path, strerror(errno)); + free(path_copy); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, bind_addr->path, sizeof(addr.sun_path) - 1); + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + logger(LOG_FATAL, "Cannot bind Unix socket %s: %s", bind_addr->path, strerror(errno)); + close(sock); + free(path_copy); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + if (listen(sock, 128) < 0) { + logger(LOG_FATAL, "Cannot listen on Unix socket %s: %s", bind_addr->path, strerror(errno)); + close(sock); + unlink(bind_addr->path); + free(path_copy); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + + sockets[*count] = sock; + paths[*count] = path_copy; + owned[*count] = 1; + (*count)++; + logger(LOG_INFO, "Listening on Unix socket %s", bind_addr->path); + } + + return 0; +} + +int unix_socket_listeners_replace(bindaddr_t *bind_list) { + int sockets[MAX_UNIX_LISTENERS]; + char *paths[MAX_UNIX_LISTENERS]; + int owned[MAX_UNIX_LISTENERS]; + int reused[MAX_UNIX_LISTENERS]; + int count = 0; + + if (build_unix_listener_set(bind_list, sockets, paths, owned, reused, &count) < 0) + return -1; + + for (int i = 0; i < unix_listen_count; i++) { + if (reused[i]) { + if (unix_listen_paths[i]) { + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + continue; + } + if (unix_listen_sockets[i] >= 0) { + close(unix_listen_sockets[i]); + unix_listen_sockets[i] = -1; + } + if (unix_listen_paths[i]) { + struct stat st; + if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { + if (unlink(unix_listen_paths[i]) < 0) { + logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); + } + } + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + } + + for (int i = 0; i < count; i++) { + unix_listen_sockets[i] = sockets[i]; + unix_listen_paths[i] = paths[i]; + } + unix_listen_count = count; + + return 0; +} + +void unix_socket_listeners_cleanup(void) { + for (int i = 0; i < unix_listen_count; i++) { + if (unix_listen_sockets[i] >= 0) { + close(unix_listen_sockets[i]); + unix_listen_sockets[i] = -1; + } + if (unix_listen_paths[i]) { + struct stat st; + if (lstat(unix_listen_paths[i], &st) == 0 && S_ISSOCK(st.st_mode)) { + if (unlink(unix_listen_paths[i]) < 0) { + logger(LOG_WARN, "Failed to unlink Unix socket %s: %s", unix_listen_paths[i], strerror(errno)); + } + } + free(unix_listen_paths[i]); + unix_listen_paths[i] = NULL; + } + } + unix_listen_count = 0; +} + +int unix_socket_listeners_count(void) { return unix_listen_count; } + +int unix_socket_listeners_append(int *sockets, int max_sockets, int *count, int *nfds) { + if (*count + unix_listen_count > max_sockets) + return -1; + + for (int i = 0; i < unix_listen_count; i++) { + sockets[*count] = unix_listen_sockets[i]; + if (sockets[*count] > *nfds) + *nfds = sockets[*count]; + (*count)++; + } + + return 0; +} diff --git a/src/unix_socket.h b/src/unix_socket.h new file mode 100644 index 00000000..88db5b3f --- /dev/null +++ b/src/unix_socket.h @@ -0,0 +1,11 @@ +#ifndef __UNIX_SOCKET_H__ +#define __UNIX_SOCKET_H__ + +#include "configuration.h" + +int unix_socket_listeners_replace(bindaddr_t *bind_list); +void unix_socket_listeners_cleanup(void); +int unix_socket_listeners_count(void); +int unix_socket_listeners_append(int *sockets, int max_sockets, int *count, int *nfds); + +#endif From b8bf90228bbc3423791583047d6a795277624c73 Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 04:58:37 +0800 Subject: [PATCH 5/6] fix(listener): address unix socket review feedback --- docs/en/reference/configuration.md | 2 +- docs/reference/configuration.md | 2 +- e2e/test_unix_socket.py | 47 +++++++++- src/configuration.c | 136 +++++++++++++++++++++++------ src/configuration.h | 24 +++++ src/service.c | 40 +++++++++ src/service.h | 31 +++++++ src/supervisor.c | 59 +++++++++++-- src/unix_socket.c | 41 ++++++++- 9 files changed, 342 insertions(+), 40 deletions(-) diff --git a/docs/en/reference/configuration.md b/docs/en/reference/configuration.md index 632faaa7..e44c0349 100644 --- a/docs/en/reference/configuration.md +++ b/docs/en/reference/configuration.md @@ -26,7 +26,7 @@ rtp2httpd [options] rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock ``` -Unix socket listen paths must be absolute and must not contain whitespace. At startup, if the same path already contains a socket file, rtp2httpd removes it automatically. If the path is a regular file, directory, or symbolic link, startup is rejected to avoid deleting user data. When any Unix socket listener is enabled, `zerocopy-on-send` is disabled globally. +Unix socket listen paths must be absolute and must not contain whitespace. At startup, if the same path already contains a socket file, rtp2httpd first probes whether the socket is still in use: if another process is listening on that path, startup is rejected; only confirmed stale socket files are removed automatically. If the path is a regular file, directory, or symbolic link, startup is rejected to avoid deleting user data. When any Unix socket listener is enabled, `zerocopy-on-send` is disabled globally. #### Upstream Network Interface Configuration diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 1d5ef798..363f2358 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -26,7 +26,7 @@ rtp2httpd [选项] rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock ``` -Unix socket 监听路径必须是绝对路径,且路径中不能包含空白字符。启动时如果同路径已存在 socket 文件,rtp2httpd 会自动清理;如果同路径是普通文件、目录或符号链接,则会拒绝启动以避免误删数据。启用任意 Unix socket 监听时,`zerocopy-on-send` 会被全局关闭。 +Unix socket 监听路径必须是绝对路径,且路径中不能包含空白字符。启动时如果同路径已存在 socket 文件,rtp2httpd 会先探测该 socket 是否仍在使用:如果已有进程正在监听该路径,则拒绝启动;只有确认是残留 socket 文件时才会自动清理。如果同路径是普通文件、目录或符号链接,则会拒绝启动以避免误删数据。启用任意 Unix socket 监听时,`zerocopy-on-send` 会被全局关闭。 #### 上游网络接口配置 diff --git a/e2e/test_unix_socket.py b/e2e/test_unix_socket.py index d7a4cc1d..4f533402 100644 --- a/e2e/test_unix_socket.py +++ b/e2e/test_unix_socket.py @@ -46,6 +46,21 @@ def _write_tmp(data: bytes, suffix: str = ".xml") -> str: return path +def _wait_unix_http_status(socket_path: str, path: str, expected_status: int = 200, timeout: float = 5.0) -> None: + deadline = time.time() + timeout + last_status = None + while time.time() < deadline: + try: + status, _, _ = unix_http_get(socket_path, path) + last_status = status + if status == expected_status: + return + except OSError: + pass + time.sleep(0.1) + assert last_status == expected_status + + class TestUnixSocketListen: def test_cli_unix_socket_serves_status(self, r2h_binary): with tempfile.TemporaryDirectory() as tmpdir: @@ -126,6 +141,25 @@ def test_stale_socket_path_is_cleaned(self, r2h_binary): finally: r2h.stop() + def test_active_socket_path_fails_startup(self, r2h_binary): + with tempfile.TemporaryDirectory() as tmpdir: + sock_path = _socket_path(tmpdir) + active = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + active.bind(sock_path) + active.listen(1) + + r2h = R2HProcess(r2h_binary, None, extra_args=["-v", "4"], capture_log=True, listen=sock_path) + try: + r2h.start(wait=False) + assert r2h.process is not None + r2h.process.wait(timeout=5) + assert r2h.process.returncode != 0 + log = r2h.read_log() + assert "already in use" in log + finally: + r2h.stop() + active.close() + def test_regular_file_socket_path_fails_startup(self, r2h_binary): with tempfile.TemporaryDirectory() as tmpdir: sock_path = _socket_path(tmpdir) @@ -153,6 +187,8 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): old_config = f"""\ [global] verbosity = 4 +workers = 1 +status-page-path = /oldstatus [bind] {old_sock_path} @@ -160,6 +196,8 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): bad_config = f"""\ [global] verbosity = 4 +workers = 2 +status-page-path = /newstatus [bind] {bad_sock_path} @@ -169,7 +207,7 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): ) try: r2h.start() - status, _, _ = unix_http_get(old_sock_path, "/status") + status, _, _ = unix_http_get(old_sock_path, "/oldstatus") assert status == 200 assert r2h._config_path is not None @@ -179,11 +217,16 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): os.kill(r2h.process.pid, signal.SIGHUP) time.sleep(0.5) - status, _, _ = unix_http_get(old_sock_path, "/status") + status, _, _ = unix_http_get(old_sock_path, "/oldstatus") assert status == 200 log = r2h.read_log() assert "Unix socket path exists and is not a socket" in log assert "keeping existing workers and listeners" in log + + os.kill(r2h.process.pid, signal.SIGUSR1) + _wait_unix_http_status(old_sock_path, "/oldstatus") + status, _, _ = unix_http_get(old_sock_path, "/newstatus") + assert status != 200 finally: r2h.stop() diff --git a/src/configuration.c b/src/configuration.c index 38695a4a..24ae2cf9 100644 --- a/src/configuration.c +++ b/src/configuration.c @@ -100,6 +100,50 @@ static void safe_free_string(char **str) { } } +static void free_config_strings(config_t *target, bool force_free) { + if (!cmd_hostname_set || force_free) + safe_free_string(&target->hostname); + if (!cmd_r2h_token_set || force_free) + safe_free_string(&target->r2h_token); + if (!cmd_ffmpeg_path_set || force_free) + safe_free_string(&target->ffmpeg_path); + if (!cmd_ffmpeg_args_set || force_free) + safe_free_string(&target->ffmpeg_args); + if (!cmd_status_page_path_set || force_free) { + safe_free_string(&target->status_page_path); + safe_free_string(&target->status_page_route); + } + if (!cmd_player_page_path_set || force_free) { + safe_free_string(&target->player_page_path); + safe_free_string(&target->player_page_route); + } + if (!cmd_external_m3u_url_set || force_free) + safe_free_string(&target->external_m3u_url); + if (!cmd_rtsp_stun_server_set || force_free) + safe_free_string(&target->rtsp_stun_server); + if (!cmd_http_proxy_user_agent_set || force_free) + safe_free_string(&target->http_proxy_user_agent); + if (!cmd_rtsp_user_agent_set || force_free) + safe_free_string(&target->rtsp_user_agent); + if (!cmd_cors_allow_origin_set || force_free) + safe_free_string(&target->cors_allow_origin); +} + +static int snapshot_string(char **dst, char *src, int keep_shallow) { + if (keep_shallow) { + *dst = src; + return 0; + } + + if (!src) { + *dst = NULL; + return 0; + } + + *dst = strdup(src); + return *dst ? 0 : -1; +} + static int bind_path_is_valid(const char *path) { if (!path || path[0] != '/') return 0; @@ -904,32 +948,7 @@ void config_cleanup(bool force_free) { epg_cleanup(); /* Free string config values */ - if (!cmd_hostname_set || force_free) - safe_free_string(&config.hostname); - if (!cmd_r2h_token_set || force_free) - safe_free_string(&config.r2h_token); - if (!cmd_ffmpeg_path_set || force_free) - safe_free_string(&config.ffmpeg_path); - if (!cmd_ffmpeg_args_set || force_free) - safe_free_string(&config.ffmpeg_args); - if (!cmd_status_page_path_set || force_free) { - safe_free_string(&config.status_page_path); - safe_free_string(&config.status_page_route); - } - if (!cmd_player_page_path_set || force_free) { - safe_free_string(&config.player_page_path); - safe_free_string(&config.player_page_route); - } - if (!cmd_external_m3u_url_set || force_free) - safe_free_string(&config.external_m3u_url); - if (!cmd_rtsp_stun_server_set || force_free) - safe_free_string(&config.rtsp_stun_server); - if (!cmd_http_proxy_user_agent_set || force_free) - safe_free_string(&config.http_proxy_user_agent); - if (!cmd_rtsp_user_agent_set || force_free) - safe_free_string(&config.rtsp_user_agent); - if (!cmd_cors_allow_origin_set || force_free) - safe_free_string(&config.cors_allow_origin); + free_config_strings(&config, force_free); /* Free bind addresses */ if (!cmd_bind_set || force_free) { @@ -938,6 +957,71 @@ void config_cleanup(bool force_free) { } } +int config_snapshot(config_t *snapshot) { + if (!snapshot) + return -1; + + *snapshot = config; + snapshot->hostname = NULL; + snapshot->r2h_token = NULL; + snapshot->ffmpeg_path = NULL; + snapshot->ffmpeg_args = NULL; + snapshot->status_page_path = NULL; + snapshot->status_page_route = NULL; + snapshot->player_page_path = NULL; + snapshot->player_page_route = NULL; + snapshot->external_m3u_url = NULL; + snapshot->rtsp_stun_server = NULL; + snapshot->http_proxy_user_agent = NULL; + snapshot->rtsp_user_agent = NULL; + snapshot->cors_allow_origin = NULL; + +#define SNAPSHOT_STRING(field, cmd_flag) \ + do { \ + if (snapshot_string(&snapshot->field, config.field, cmd_flag) < 0) \ + goto error; \ + } while (0) + + SNAPSHOT_STRING(hostname, cmd_hostname_set); + SNAPSHOT_STRING(r2h_token, cmd_r2h_token_set); + SNAPSHOT_STRING(ffmpeg_path, cmd_ffmpeg_path_set); + SNAPSHOT_STRING(ffmpeg_args, cmd_ffmpeg_args_set); + SNAPSHOT_STRING(status_page_path, cmd_status_page_path_set); + SNAPSHOT_STRING(status_page_route, cmd_status_page_path_set); + SNAPSHOT_STRING(player_page_path, cmd_player_page_path_set); + SNAPSHOT_STRING(player_page_route, cmd_player_page_path_set); + SNAPSHOT_STRING(external_m3u_url, cmd_external_m3u_url_set); + SNAPSHOT_STRING(rtsp_stun_server, cmd_rtsp_stun_server_set); + SNAPSHOT_STRING(http_proxy_user_agent, cmd_http_proxy_user_agent_set); + SNAPSHOT_STRING(rtsp_user_agent, cmd_rtsp_user_agent_set); + SNAPSHOT_STRING(cors_allow_origin, cmd_cors_allow_origin_set); + +#undef SNAPSHOT_STRING + + return 0; + +error: + config_snapshot_free(snapshot); + return -1; +} + +void config_snapshot_free(config_t *snapshot) { + if (!snapshot) + return; + + free_config_strings(snapshot, false); + memset(snapshot, 0, sizeof(*snapshot)); +} + +void config_restore_snapshot(config_t *snapshot) { + if (!snapshot) + return; + + free_config_strings(&config, false); + config = *snapshot; + memset(snapshot, 0, sizeof(*snapshot)); +} + /** * Initialize configuration with default values * Sets all config values to defaults. Respects cmd_*_set flags - diff --git a/src/configuration.h b/src/configuration.h index 28d45d01..11ec78d2 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -163,6 +163,30 @@ void config_init(void); */ void config_cleanup(bool force_free); +/** + * Create a restorable snapshot of the current scalar/string configuration. + * Does not include bind addresses or services; snapshot/free those separately. + * + * @param snapshot Destination snapshot + * @return 0 on success, -1 on allocation failure + */ +int config_snapshot(config_t *snapshot); + +/** + * Free resources owned by a configuration snapshot created by config_snapshot(). + * + * @param snapshot Snapshot to free + */ +void config_snapshot_free(config_t *snapshot); + +/** + * Restore global configuration from a snapshot created by config_snapshot(). + * The snapshot ownership is moved into the global config. + * + * @param snapshot Snapshot to restore + */ +void config_restore_snapshot(config_t *snapshot); + /** * Reload configuration from file * Sequence: config_cleanup() -> config_init() -> parse_config_file() diff --git a/src/service.c b/src/service.c index 1a40b0e7..10d97198 100644 --- a/src/service.c +++ b/src/service.c @@ -2035,6 +2035,24 @@ service_t *service_clone(service_t *service) { return NULL; } +service_t *service_clone_list(service_t *head) { + service_t *cloned_head = NULL; + service_t **tail = &cloned_head; + + for (service_t *current = head; current; current = current->next) { + service_t *cloned = service_clone(current); + if (!cloned) { + service_free_list(cloned_head); + return NULL; + } + + *tail = cloned; + tail = &cloned->next; + } + + return cloned_head; +} + void service_free(service_t *service) { if (!service) { return; @@ -2130,6 +2148,28 @@ void service_free(service_t *service) { free(service); } +void service_free_list(service_t *head) { + service_t *current; + + while (head) { + current = head; + head = head->next; + service_free(current); + } +} + +service_t *service_clone_all(void) { return service_clone_list(services); } + +void service_replace_all(service_t *new_services) { + service_free_all(); + services = new_services; + + service_hashmap_init(); + for (service_t *current = services; current; current = current->next) { + service_hashmap_add(current); + } +} + void service_free_external(void) { service_t **current_ptr = &services; service_t *current; diff --git a/src/service.h b/src/service.h index 48b781ec..5b44a508 100644 --- a/src/service.h +++ b/src/service.h @@ -229,6 +229,15 @@ service_t *service_create_with_query_merge(service_t *configured_service, const */ service_t *service_clone(service_t *service); +/** + * Clone a linked list of services. + * The cloned services are not added to the global services list. + * + * @param head First service in the list + * @return Pointer to cloned list or NULL if the input is NULL or cloning fails + */ +service_t *service_clone_list(service_t *head); + /** * Free service structure allocated by service creation functions * @@ -236,6 +245,28 @@ service_t *service_clone(service_t *service); */ void service_free(service_t *service); +/** + * Free a linked list of services. + * + * @param head First service in the list + */ +void service_free_list(service_t *head); + +/** + * Clone the global services list. + * + * @return Pointer to cloned global services list or NULL if there are no services + */ +service_t *service_clone_all(void); + +/** + * Replace the global services list with a previously cloned list. + * Rebuilds the service lookup hashmap for the new list. + * + * @param new_services New global services list, or NULL to clear services + */ +void service_replace_all(service_t *new_services); + /** * Free services from external M3U in the global services list * This preserves inline services from the configuration file diff --git a/src/supervisor.c b/src/supervisor.c index 7eff4a34..79ae8cf9 100644 --- a/src/supervisor.c +++ b/src/supervisor.c @@ -2,6 +2,7 @@ #include "configuration.h" #include "platform_compat.h" #include "rtp2httpd.h" +#include "service.h" #include "status.h" #include "unix_socket.h" #include "utils.h" @@ -45,6 +46,8 @@ static void supervisor_sighup_handler(int signum); static void supervisor_sigusr1_handler(int signum); static int spawn_worker(int worker_idx); static void cleanup_workers(void); +static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses); +static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses); /** * Signal handler for supervisor process (SIGTERM/SIGINT) @@ -190,6 +193,29 @@ static void cleanup_workers(void) { num_workers = 0; } +static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses) { + if (old_bind_addresses) { + free_bindaddr(bind_addresses); + bind_addresses = *old_bind_addresses; + *old_bind_addresses = NULL; + } + + if (old_services) { + service_replace_all(*old_services); + *old_services = NULL; + } + + config_restore_snapshot(old_config); +} + +static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses) { + if (old_bind_addresses) + free_bindaddr(old_bind_addresses); + if (old_services) + service_free_list(old_services); + config_snapshot_free(old_config); +} + int supervisor_run(void) { int i; @@ -318,23 +344,38 @@ int supervisor_run(void) { logger(LOG_INFO, "Received SIGHUP, reloading configuration"); int bind_changed = 0; + config_t old_config; + service_t *old_services; bindaddr_t *old_bind_addresses = bindaddr_copy(bind_addresses); + if (config_snapshot(&old_config) < 0) { + logger(LOG_ERROR, "Failed to snapshot configuration, not reloading"); + if (old_bind_addresses) + free_bindaddr(old_bind_addresses); + continue; + } + old_services = service_clone_all(); + if (services && !old_services) { + logger(LOG_ERROR, "Failed to snapshot services, not reloading"); + free_reload_snapshot(&old_config, old_services, old_bind_addresses); + continue; + } + if (bind_addresses && !old_bind_addresses) { + logger(LOG_ERROR, "Failed to snapshot bind addresses, not reloading"); + free_reload_snapshot(&old_config, old_services, old_bind_addresses); + continue; + } + if (config_reload(&bind_changed) == 0) { int reload_failed = 0; if (bind_changed) { if (unix_socket_listeners_replace(bind_addresses) < 0) { logger(LOG_ERROR, "Failed to set up Unix socket listeners after configuration reload"); - if (old_bind_addresses) { - free_bindaddr(bind_addresses); - bind_addresses = old_bind_addresses; - old_bind_addresses = NULL; - } + restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses); reload_failed = 1; } } - if (old_bind_addresses) - free_bindaddr(old_bind_addresses); + free_reload_snapshot(&old_config, old_services, old_bind_addresses); if (reload_failed) { logger(LOG_ERROR, "Configuration reload failed, keeping existing workers and listeners"); @@ -403,8 +444,8 @@ int supervisor_run(void) { } } } else { - if (old_bind_addresses) - free_bindaddr(old_bind_addresses); + restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses); + free_reload_snapshot(&old_config, old_services, old_bind_addresses); logger(LOG_ERROR, "Configuration reload failed, not forwarding SIGHUP to workers"); } } diff --git a/src/unix_socket.c b/src/unix_socket.c index bb7f011e..7e2480a4 100644 --- a/src/unix_socket.c +++ b/src/unix_socket.c @@ -31,6 +31,35 @@ static int temp_unix_listener_path_exists(char **paths, int count, const char *p return 0; } +static int unix_socket_path_is_active(const char *path) { + struct sockaddr_un addr; + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + int saved_errno; + + if (sock < 0) { + logger(LOG_FATAL, "Cannot create probe socket for Unix socket %s: %s", path, strerror(errno)); + return -1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1); + + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) { + close(sock); + return 1; + } + + saved_errno = errno; + close(sock); + + if (saved_errno == ECONNREFUSED || saved_errno == ENOENT) + return 0; + + logger(LOG_FATAL, "Cannot verify Unix socket path is stale: %s: %s", path, strerror(saved_errno)); + return -1; +} + static void cleanup_temp_unix_listeners(int *sockets, char **paths, int *owned, int count) { for (int i = 0; i < count; i++) { if (owned[i] && sockets[i] >= 0) { @@ -69,7 +98,7 @@ static int build_unix_listener_set(bindaddr_t *bind_list, int *sockets, char **p continue; if (!bind_addr->path || bind_addr->path[0] != '/') { - logger(LOG_FATAL, "Invalid Unix socket path"); + logger(LOG_FATAL, "Invalid Unix socket path: %s", bind_addr->path ? bind_addr->path : "(null)"); cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } @@ -115,6 +144,16 @@ static int build_unix_listener_set(bindaddr_t *bind_list, int *sockets, char **p cleanup_temp_unix_listeners(sockets, paths, owned, *count); return -1; } + int active = unix_socket_path_is_active(bind_addr->path); + if (active < 0) { + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } + if (active) { + logger(LOG_FATAL, "Unix socket path is already in use: %s", bind_addr->path); + cleanup_temp_unix_listeners(sockets, paths, owned, *count); + return -1; + } if (unlink(bind_addr->path) < 0) { logger(LOG_FATAL, "Failed to remove stale Unix socket %s: %s", bind_addr->path, strerror(errno)); cleanup_temp_unix_listeners(sockets, paths, owned, *count); From 9f7d74598d8f5857eb6eb9884503b55f4c94aabd Mon Sep 17 00:00:00 2001 From: Stackie Jia Date: Thu, 25 Jun 2026 05:05:51 +0800 Subject: [PATCH 6/6] fix(listener): restore media caches on reload failure --- e2e/test_unix_socket.py | 47 ++++++++++++++++++++++++++++++++ src/epg.c | 57 +++++++++++++++++++++++++++++++++++++++ src/epg.h | 14 ++++++++++ src/m3u.c | 59 ++++++++++++++++++++++++++++++++--------- src/m3u.h | 13 +++++++++ src/supervisor.c | 42 ++++++++++++++++++++++------- 6 files changed, 209 insertions(+), 23 deletions(-) diff --git a/e2e/test_unix_socket.py b/e2e/test_unix_socket.py index 4f533402..fca8786f 100644 --- a/e2e/test_unix_socket.py +++ b/e2e/test_unix_socket.py @@ -61,6 +61,22 @@ def _wait_unix_http_status(socket_path: str, path: str, expected_status: int = 2 assert last_status == expected_status +def _wait_unix_http_body_contains(socket_path: str, path: str, needle: bytes, timeout: float = 5.0) -> None: + deadline = time.time() + timeout + last_body = b"" + while time.time() < deadline: + try: + status, _, body = unix_http_get(socket_path, path) + if status == 200: + last_body = body + if needle in body: + return + except OSError: + pass + time.sleep(0.1) + assert needle in last_body + + class TestUnixSocketListen: def test_cli_unix_socket_serves_status(self, r2h_binary): with tempfile.TemporaryDirectory() as tmpdir: @@ -181,8 +197,14 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): with tempfile.TemporaryDirectory() as tmpdir: old_sock_path = os.path.join(tmpdir, "old.sock") bad_sock_path = os.path.join(tmpdir, "bad.sock") + old_epg_path = os.path.join(tmpdir, "old-epg.xml") + new_epg_path = os.path.join(tmpdir, "new-epg.xml") with open(bad_sock_path, "wb") as f: f.write(b"not a socket") + with open(old_epg_path, "wb") as f: + f.write(SAMPLE_EPG_XML.replace("Unix Socket Programme", "Old Programme").encode()) + with open(new_epg_path, "wb") as f: + f.write(SAMPLE_EPG_XML.replace("Unix Socket Programme", "New Programme").encode()) old_config = f"""\ [global] @@ -192,6 +214,11 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): [bind] {old_sock_path} + +[services] +#EXTM3U x-tvg-url="file://{old_epg_path}" +#EXTINF:-1,Old Channel +rtp://239.0.0.1:1234 """ bad_config = f"""\ [global] @@ -201,6 +228,11 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): [bind] {bad_sock_path} + +[services] +#EXTM3U x-tvg-url="file://{new_epg_path}" +#EXTINF:-1,New Channel +rtp://239.0.0.2:1234 """ r2h = R2HProcess( r2h_binary, None, config_content=old_config, capture_log=True, wait_socket_path=old_sock_path @@ -209,6 +241,12 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): r2h.start() status, _, _ = unix_http_get(old_sock_path, "/oldstatus") assert status == 200 + status, _, body = unix_http_get(old_sock_path, "/playlist.m3u") + assert status == 200 + playlist = body.decode() + assert "Old Channel" in playlist + assert "New Channel" not in playlist + _wait_unix_http_body_contains(old_sock_path, "/epg.xml", b"Old Programme") assert r2h._config_path is not None with open(r2h._config_path, "w") as f: @@ -225,6 +263,15 @@ def test_reload_keeps_old_unix_listener_when_new_path_fails(self, r2h_binary): os.kill(r2h.process.pid, signal.SIGUSR1) _wait_unix_http_status(old_sock_path, "/oldstatus") + status, _, body = unix_http_get(old_sock_path, "/playlist.m3u") + assert status == 200 + playlist = body.decode() + assert "Old Channel" in playlist + assert "New Channel" not in playlist + _wait_unix_http_body_contains(old_sock_path, "/epg.xml", b"Old Programme") + status, _, body = unix_http_get(old_sock_path, "/epg.xml") + assert b"Old Programme" in body + assert b"New Programme" not in body status, _, _ = unix_http_get(old_sock_path, "/newstatus") assert status != 200 finally: diff --git a/src/epg.c b/src/epg.c index 7b39e7b5..8bb74188 100644 --- a/src/epg.c +++ b/src/epg.c @@ -2,6 +2,7 @@ #include "http_fetch.h" #include "md5.h" #include "utils.h" +#include #include #include #include @@ -146,9 +147,65 @@ void epg_cleanup(void) { epg_cache.fetch_error_count = 0; epg_cache.etag_valid = 0; epg_cache.etag[0] = '\0'; + epg_cache.retry_count = 0; + epg_cache.next_retry_time = 0; logger(LOG_DEBUG, "EPG cache cleaned up"); } +int epg_cache_snapshot(epg_cache_t *snapshot) { + if (!snapshot) + return -1; + + *snapshot = epg_cache; + snapshot->url = NULL; + snapshot->data_fd = -1; + + if (epg_cache.url) { + snapshot->url = strdup(epg_cache.url); + if (!snapshot->url) { + epg_cache_snapshot_free(snapshot); + return -1; + } + } + + if (epg_cache.data_fd >= 0) { + snapshot->data_fd = dup(epg_cache.data_fd); + if (snapshot->data_fd < 0) { + logger(LOG_ERROR, "Failed to duplicate EPG cache fd: %s", strerror(errno)); + epg_cache_snapshot_free(snapshot); + return -1; + } + } + + return 0; +} + +void epg_cache_snapshot_free(epg_cache_t *snapshot) { + if (!snapshot) + return; + + if (snapshot->url) { + free(snapshot->url); + snapshot->url = NULL; + } + if (snapshot->data_fd >= 0) { + close(snapshot->data_fd); + snapshot->data_fd = -1; + } + memset(snapshot, 0, sizeof(*snapshot)); + snapshot->data_fd = -1; +} + +void epg_cache_restore_snapshot(epg_cache_t *snapshot) { + if (!snapshot) + return; + + epg_cleanup(); + epg_cache = *snapshot; + memset(snapshot, 0, sizeof(*snapshot)); + snapshot->data_fd = -1; +} + int epg_set_url(const char *url) { char *new_url = NULL; diff --git a/src/epg.h b/src/epg.h index ec6121c5..bf0ac864 100644 --- a/src/epg.h +++ b/src/epg.h @@ -22,6 +22,20 @@ typedef struct { */ void epg_cleanup(void); +/* Create a restorable snapshot of the EPG cache. + * Duplicates the cached data fd if present. + * Returns 0 on success, -1 on allocation or fd duplication failure. + */ +int epg_cache_snapshot(epg_cache_t *snapshot); + +/* Free resources owned by an EPG cache snapshot. */ +void epg_cache_snapshot_free(epg_cache_t *snapshot); + +/* Restore the global EPG cache from a snapshot. + * The snapshot ownership is moved into the global cache. + */ +void epg_cache_restore_snapshot(epg_cache_t *snapshot); + /* Set EPG source URL (without fetching) * url: EPG source URL (will be copied), or NULL to clear * Returns: 0 on success, -1 on error diff --git a/src/m3u.c b/src/m3u.c index 0f258b3c..732889b5 100644 --- a/src/m3u.c +++ b/src/m3u.c @@ -842,6 +842,19 @@ static int append_to_transformed_m3u(const char *str, service_source_t source) { return 0; } +static void free_transformed_m3u_buffer(m3u_cache_t *cache) { + if (cache->transformed_m3u) { + free(cache->transformed_m3u); + cache->transformed_m3u = NULL; + } + cache->transformed_m3u_size = 0; + cache->transformed_m3u_used = 0; + cache->transformed_m3u_inline_end = 0; + cache->transformed_m3u_has_header = 0; + cache->transformed_m3u_etag_valid = 0; + cache->transformed_m3u_etag[0] = '\0'; +} + /* Find a unique service name by adding numeric suffix if needed * Returns: malloc'd string containing unique name (caller must free), or NULL * on error @@ -1500,22 +1513,42 @@ const char *m3u_get_etag(void) { return m3u_cache.transformed_m3u_etag; } -void m3u_reset_transformed_playlist(void) { - /* Clear entire buffer */ - if (m3u_cache.transformed_m3u) { - free(m3u_cache.transformed_m3u); - m3u_cache.transformed_m3u = NULL; +void m3u_reset_transformed_playlist(void) { free_transformed_m3u_buffer(&m3u_cache); } + +int m3u_cache_snapshot(m3u_cache_t *snapshot) { + if (!snapshot) + return -1; + + *snapshot = m3u_cache; + snapshot->transformed_m3u = NULL; + + if (m3u_cache.transformed_m3u && m3u_cache.transformed_m3u_size > 0) { + snapshot->transformed_m3u = malloc(m3u_cache.transformed_m3u_size); + if (!snapshot->transformed_m3u) { + memset(snapshot, 0, sizeof(*snapshot)); + return -1; + } + memcpy(snapshot->transformed_m3u, m3u_cache.transformed_m3u, m3u_cache.transformed_m3u_size); } - m3u_cache.transformed_m3u_size = 0; - m3u_cache.transformed_m3u_used = 0; - m3u_cache.transformed_m3u_inline_end = 0; - /* Reset header flag */ - m3u_cache.transformed_m3u_has_header = 0; + return 0; +} - /* Invalidate ETag */ - m3u_cache.transformed_m3u_etag_valid = 0; - m3u_cache.transformed_m3u_etag[0] = '\0'; +void m3u_cache_snapshot_free(m3u_cache_t *snapshot) { + if (!snapshot) + return; + + free_transformed_m3u_buffer(snapshot); + memset(snapshot, 0, sizeof(*snapshot)); +} + +void m3u_cache_restore_snapshot(m3u_cache_t *snapshot) { + if (!snapshot) + return; + + free_transformed_m3u_buffer(&m3u_cache); + m3u_cache = *snapshot; + memset(snapshot, 0, sizeof(*snapshot)); } void m3u_reset_external_playlist(void) { diff --git a/src/m3u.h b/src/m3u.h index 6e504a11..368885fe 100644 --- a/src/m3u.h +++ b/src/m3u.h @@ -64,6 +64,19 @@ void m3u_reset_transformed_playlist(void); */ void m3u_reset_external_playlist(void); +/* Create a restorable snapshot of the M3U cache. + * Returns 0 on success, -1 on allocation failure. + */ +int m3u_cache_snapshot(m3u_cache_t *snapshot); + +/* Free resources owned by an M3U cache snapshot. */ +void m3u_cache_snapshot_free(m3u_cache_t *snapshot); + +/* Restore the global M3U cache from a snapshot. + * The snapshot ownership is moved into the global cache. + */ +void m3u_cache_restore_snapshot(m3u_cache_t *snapshot); + /* Get server address as complete URL * Priority: hostname config > non-upstream interface private IP > non-upstream * interface public IP > upstream interface IP > localhost Returns: malloc'd diff --git a/src/supervisor.c b/src/supervisor.c index 79ae8cf9..72b819b1 100644 --- a/src/supervisor.c +++ b/src/supervisor.c @@ -1,5 +1,7 @@ #include "supervisor.h" #include "configuration.h" +#include "epg.h" +#include "m3u.h" #include "platform_compat.h" #include "rtp2httpd.h" #include "service.h" @@ -46,8 +48,10 @@ static void supervisor_sighup_handler(int signum); static void supervisor_sigusr1_handler(int signum); static int spawn_worker(int worker_idx); static void cleanup_workers(void); -static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses); -static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses); +static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses, + m3u_cache_t *old_m3u_cache, epg_cache_t *old_epg_cache); +static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses, + m3u_cache_t *old_m3u_cache, epg_cache_t *old_epg_cache); /** * Signal handler for supervisor process (SIGTERM/SIGINT) @@ -193,7 +197,8 @@ static void cleanup_workers(void) { num_workers = 0; } -static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses) { +static void restore_reload_snapshot(config_t *old_config, service_t **old_services, bindaddr_t **old_bind_addresses, + m3u_cache_t *old_m3u_cache, epg_cache_t *old_epg_cache) { if (old_bind_addresses) { free_bindaddr(bind_addresses); bind_addresses = *old_bind_addresses; @@ -206,14 +211,19 @@ static void restore_reload_snapshot(config_t *old_config, service_t **old_servic } config_restore_snapshot(old_config); + m3u_cache_restore_snapshot(old_m3u_cache); + epg_cache_restore_snapshot(old_epg_cache); } -static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses) { +static void free_reload_snapshot(config_t *old_config, service_t *old_services, bindaddr_t *old_bind_addresses, + m3u_cache_t *old_m3u_cache, epg_cache_t *old_epg_cache) { if (old_bind_addresses) free_bindaddr(old_bind_addresses); if (old_services) service_free_list(old_services); config_snapshot_free(old_config); + m3u_cache_snapshot_free(old_m3u_cache); + epg_cache_snapshot_free(old_epg_cache); } int supervisor_run(void) { @@ -345,6 +355,8 @@ int supervisor_run(void) { int bind_changed = 0; config_t old_config; + m3u_cache_t old_m3u_cache; + epg_cache_t old_epg_cache; service_t *old_services; bindaddr_t *old_bind_addresses = bindaddr_copy(bind_addresses); if (config_snapshot(&old_config) < 0) { @@ -353,15 +365,25 @@ int supervisor_run(void) { free_bindaddr(old_bind_addresses); continue; } + if (m3u_cache_snapshot(&old_m3u_cache) < 0) { + logger(LOG_ERROR, "Failed to snapshot M3U cache, not reloading"); + free_reload_snapshot(&old_config, NULL, old_bind_addresses, &old_m3u_cache, NULL); + continue; + } + if (epg_cache_snapshot(&old_epg_cache) < 0) { + logger(LOG_ERROR, "Failed to snapshot EPG cache, not reloading"); + free_reload_snapshot(&old_config, NULL, old_bind_addresses, &old_m3u_cache, &old_epg_cache); + continue; + } old_services = service_clone_all(); if (services && !old_services) { logger(LOG_ERROR, "Failed to snapshot services, not reloading"); - free_reload_snapshot(&old_config, old_services, old_bind_addresses); + free_reload_snapshot(&old_config, old_services, old_bind_addresses, &old_m3u_cache, &old_epg_cache); continue; } if (bind_addresses && !old_bind_addresses) { logger(LOG_ERROR, "Failed to snapshot bind addresses, not reloading"); - free_reload_snapshot(&old_config, old_services, old_bind_addresses); + free_reload_snapshot(&old_config, old_services, old_bind_addresses, &old_m3u_cache, &old_epg_cache); continue; } @@ -370,12 +392,12 @@ int supervisor_run(void) { if (bind_changed) { if (unix_socket_listeners_replace(bind_addresses) < 0) { logger(LOG_ERROR, "Failed to set up Unix socket listeners after configuration reload"); - restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses); + restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses, &old_m3u_cache, &old_epg_cache); reload_failed = 1; } } - free_reload_snapshot(&old_config, old_services, old_bind_addresses); + free_reload_snapshot(&old_config, old_services, old_bind_addresses, &old_m3u_cache, &old_epg_cache); if (reload_failed) { logger(LOG_ERROR, "Configuration reload failed, keeping existing workers and listeners"); @@ -444,8 +466,8 @@ int supervisor_run(void) { } } } else { - restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses); - free_reload_snapshot(&old_config, old_services, old_bind_addresses); + restore_reload_snapshot(&old_config, &old_services, &old_bind_addresses, &old_m3u_cache, &old_epg_cache); + free_reload_snapshot(&old_config, old_services, old_bind_addresses, &old_m3u_cache, &old_epg_cache); logger(LOG_ERROR, "Configuration reload failed, not forwarding SIGHUP to workers"); } }