Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(COMMON_SOURCES
src/status.c
src/connection.c
src/worker.c
src/unix_socket.c
src/buffer_pool.c
src/zerocopy.c
src/m3u.c
Expand Down
11 changes: 8 additions & 3 deletions docs/en/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ rtp2httpd [options]

### Network Configuration

- `-l, --listen [address:]port` - Bind address and port (default: \*:5140)
- `-l, --listen [address:]port|/path/to/rtp2httpd.sock` - Bind a TCP listen address/port, or listen on a Unix domain socket (default: \*:5140)
- `-m, --maxclients <number>` - Maximum concurrent clients (default: 5)
Comment thread
stackia marked this conversation as resolved.
- `-w, --workers <number>` - Number of worker processes (default: 1)

`--listen` can be specified multiple times to listen on multiple addresses or ports:
`--listen` can be specified multiple times to listen on multiple TCP addresses/ports or Unix sockets:
Comment thread
stackia marked this conversation as resolved.

```bash
rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140'
rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock
```

Unix socket listen paths must be absolute and must not contain whitespace. At startup, if the same path already contains a socket file, rtp2httpd first probes whether the socket is still in use: if another process is listening on that path, startup is rejected; only confirmed stale socket files are removed automatically. If the path is a regular file, directory, or symbolic link, startup is rejected to avoid deleting user data. When any Unix socket listener is enabled, `zerocopy-on-send` is disabled globally.

Comment thread
stackia marked this conversation as resolved.
#### Upstream Network Interface Configuration

- `-i, --upstream-interface <interface>` - Default upstream interface (applies to all traffic types, lowest priority)
Expand Down Expand Up @@ -249,6 +251,9 @@ ffmpeg-args = -hwaccel none
# Listen on an IPv6 address (brackets optional)
2001:db8::1 5140

# Listen on a Unix domain socket (path must be absolute)
/var/run/rtp2httpd.sock

# Multiple listen addresses are supported

# The [services] section can contain M3U playlists starting with #EXTM3U
Expand Down
11 changes: 8 additions & 3 deletions docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ rtp2httpd [选项]

### 网络配置

- `-l, --listen [地址:]端口` - 绑定监听地址和端口 (默认: \*:5140)
- `-l, --listen [地址:]端口|/path/to/rtp2httpd.sock` - 绑定 TCP 监听地址/端口,或监听 Unix domain socket (默认: \*:5140)
- `-m, --maxclients <数量>` - 最大并发客户端数 (默认: 5)
- `-w, --workers <数量>` - 工作进程数 (默认: 1)

`--listen` 可以重复指定,用于同时监听多个地址或端口
`--listen` 可以重复指定,用于同时监听多个 TCP 地址/端口或 Unix socket

```bash
rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140'
rtp2httpd --listen 5140 --listen 192.168.1.1:8081 --listen '[::1]:5140' --listen /var/run/rtp2httpd.sock
```

Unix socket 监听路径必须是绝对路径,且路径中不能包含空白字符。启动时如果同路径已存在 socket 文件,rtp2httpd 会先探测该 socket 是否仍在使用:如果已有进程正在监听该路径,则拒绝启动;只有确认是残留 socket 文件时才会自动清理。如果同路径是普通文件、目录或符号链接,则会拒绝启动以避免误删数据。启用任意 Unix socket 监听时,`zerocopy-on-send` 会被全局关闭。

#### 上游网络接口配置

- `-i, --upstream-interface <接口>` - 默认上游接口(作用于所有流量类型,优先级最低)
Expand Down Expand Up @@ -249,6 +251,9 @@ ffmpeg-args = -hwaccel none
# 监听 IPv6 地址(可省略方括号)
2001:db8::1 5140

# 监听 Unix domain socket(路径必须是绝对路径)
/var/run/rtp2httpd.sock

# 支持多个监听地址

# [services] 内可以直接编写以 #EXTM3U 开头的 m3u 节目清单
Expand Down
6 changes: 5 additions & 1 deletion e2e/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
MCAST_ADDR,
PROJECT_ROOT,
)
from .http import extract_catchup_source, http_get, http_request, stream_get
from .http import extract_catchup_source, http_get, http_request, stream_get, unix_http_get, unix_http_request
from .mock_fcc import MockFCCServer
from .mock_http import MockHTTPUpstream, MockHTTPUpstreamSilent
from .mock_rtsp import (
Expand All @@ -40,6 +40,7 @@
find_free_udp_port_pair,
ipv6_loopback_available,
wait_for_port,
wait_for_unix_socket,
)
from .r2h_process import R2HProcess, make_m3u_rtsp_config
from .rtp import MulticastSender, make_rtp_packet
Expand Down Expand Up @@ -71,5 +72,8 @@
"make_m3u_rtsp_config",
"make_rtp_packet",
"stream_get",
"unix_http_get",
"unix_http_request",
"wait_for_port",
"wait_for_unix_socket",
]
91 changes: 74 additions & 17 deletions e2e/helpers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,79 @@ def http_request(
conn.close()


def _parse_raw_http_response(data: bytes, lower_header_names: bool = False) -> tuple[int, dict, bytes]:
header_end = data.find(b"\r\n\r\n")
if header_end < 0:
return 0, {}, b""

header_text = data[:header_end].decode(errors="replace")
body = data[header_end + 4 :]
parts = header_text.split("\r\n")
status_line = parts[0].split()
if len(status_line) < 2:
return 0, {}, b""
try:
status_code = int(status_line[1])
except ValueError:
return 0, {}, b""

hdrs: dict[str, str] = {}
for line in parts[1:]:
if ":" in line:
k, v = line.split(":", 1)
key = k.strip().lower() if lower_header_names else k.strip()
hdrs[key] = v.strip()

Comment thread
stackia marked this conversation as resolved.
return status_code, hdrs, body


def unix_http_request(
socket_path: str,
method: str,
path: str,
timeout: float = 5.0,
headers: dict | None = None,
body: bytes | None = None,
) -> tuple[int, dict, bytes]:
"""HTTP request over a Unix domain socket. Returns (status, headers, body)."""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect(socket_path)
req_lines = ["%s %s HTTP/1.0" % (method, path), "Host: localhost"]
payload = body or b""
for k, v in (headers or {}).items():
req_lines.append("%s: %s" % (k, v))
if payload:
req_lines.append("Content-Length: %d" % len(payload))
req_lines.append("")
req_lines.append("")
sock.sendall("\r\n".join(req_lines).encode() + payload)

data = b""
while True:
try:
chunk = sock.recv(4096)
except socket.timeout:
break
if not chunk:
break
data += chunk
return _parse_raw_http_response(data)
Comment thread
stackia marked this conversation as resolved.
finally:
sock.close()


def unix_http_get(
socket_path: str,
path: str,
timeout: float = 5.0,
headers: dict | None = None,
) -> tuple[int, dict, bytes]:
"""HTTP GET over a Unix domain socket. Returns (status, headers, body)."""
return unix_http_request(socket_path, "GET", path, timeout=timeout, headers=headers)


def extract_catchup_source(playlist_text, channel_name):
"""Extract catchup-source URL from the EXTINF line for a channel.

Expand Down Expand Up @@ -108,23 +181,7 @@ def stream_get(
break
data += chunk

header_end = data.find(b"\r\n\r\n")
if header_end < 0:
return 0, {}, b""

header_text = data[:header_end].decode(errors="replace")
body = data[header_end + 4 :]

parts = header_text.split("\r\n")
status_code = int(parts[0].split()[1])

hdrs: dict[str, str] = {}
for line in parts[1:]:
if ":" in line:
k, v = line.split(":", 1)
hdrs[k.strip().lower()] = v.strip()

return status_code, hdrs, body
return _parse_raw_http_response(data, lower_header_names=True)
except socket.timeout, OSError:
return 0, {}, b""
finally:
Expand Down
14 changes: 14 additions & 0 deletions e2e/helpers/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,17 @@ def wait_for_port(port: int, host: str = "127.0.0.1", timeout: float = 5.0) -> b
except ConnectionRefusedError, OSError, socket.timeout:
time.sleep(0.05)
return False


def wait_for_unix_socket(path: str, timeout: float = 5.0) -> bool:
"""Block until *path* is accepting Unix stream socket connections."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.settimeout(0.5)
sock.connect(path)
return True
except OSError:
time.sleep(0.05)
return False
33 changes: 27 additions & 6 deletions e2e/helpers/r2h_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tempfile
from pathlib import Path

from .ports import wait_for_port
from .ports import wait_for_port, wait_for_unix_socket


def make_m3u_rtsp_config(r2h_port: int, rtsp_port: int, channel_name: str, configured_url_query: str = "") -> str:
Expand All @@ -26,16 +26,20 @@ class R2HProcess:
def __init__(
self,
binary: str | Path,
port: int,
port: int | None,
extra_args: list[str] | None = None,
config_content: str | None = None,
capture_log: bool = False,
listen: str | None = None,
wait_socket_path: str | None = None,
):
self.binary = str(binary)
self.port = port
self.extra_args = list(extra_args or [])
self.config_content = config_content
self.capture_log = capture_log
self.listen = listen
self.wait_socket_path = wait_socket_path
self.process: subprocess.Popen | None = None
self._config_path: str | None = None
self._log_path: str | None = None
Expand All @@ -51,9 +55,23 @@ def start(self, wait: bool = True) -> None:
self.process = subprocess.Popen(args, stdout=self._log_handle, stderr=self._log_handle)
else:
self.process = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if wait and not wait_for_port(self.port, timeout=6.0):
self.stop()
raise RuntimeError("rtp2httpd did not start on port %d.\nCommand: %s" % (self.port, " ".join(args)))
if wait:
if self.wait_socket_path:
if not wait_for_unix_socket(self.wait_socket_path, timeout=6.0):
self.stop()
raise RuntimeError(
"rtp2httpd did not start on Unix socket %s.\nCommand: %s"
% (self.wait_socket_path, " ".join(args))
)
elif self.listen and self.listen.startswith("/"):
if not wait_for_unix_socket(self.listen, timeout=6.0):
self.stop()
raise RuntimeError(
"rtp2httpd did not start on Unix socket %s.\nCommand: %s" % (self.listen, " ".join(args))
)
elif self.port is not None and not wait_for_port(self.port, timeout=6.0):
self.stop()
raise RuntimeError("rtp2httpd did not start on port %d.\nCommand: %s" % (self.port, " ".join(args)))

def stop(self) -> None:
if self.process and self.process.poll() is None:
Expand Down Expand Up @@ -97,6 +115,9 @@ def _build_args(self) -> list[str]:
else:
args = [self.binary, "-C"]

args.extend(["-l", str(self.port)])
if self.listen is not None:
args.extend(["-l", self.listen])
elif self.port is not None:
args.extend(["-l", str(self.port)])
args.extend(self.extra_args)
return args
1 change: 1 addition & 0 deletions e2e/test_fcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def test_huawei_fcc_socket_pair_stays_inside_listen_range(self, r2h_binary):
r2h.start()
fcc.start()
try:
assert r2h.port is not None
url = f"/rtp/{MCAST_ADDR}:{mcast_port}?fcc=127.0.0.1:{fcc.port}&fcc-type=huawei"
status, _, body = stream_get(
"127.0.0.1",
Expand Down
Loading
Loading