diff --git a/utils/constants.py b/utils/constants.py index 73df30397d..e551a72cb1 100644 --- a/utils/constants.py +++ b/utils/constants.py @@ -18,7 +18,11 @@ log_path = os.path.join(output_path, "log.log") -url_pattern = r"((https?):\/\/)?(\[[0-9a-fA-F:]+\]|([\w-]+\.)+[\w-]+)(:[0-9]{1,5})?(\/[^\s]*)?(\$[^\s]+)?" +url_domain_pattern = r"((https?|rtmp)://)?(\[[0-9a-fA-F:]+]|([\w-]+\.)+[\w-]+)(:[0-9]{1,5})?" + +url_pattern = url_domain_pattern + r"(/\S*)?(\$\S+)?" + +rtmp_url_pattern = r"^rtmp://.*$" rtp_pattern = r"^([^,,]+)(?:[,,])?(rtp://.*)$" diff --git a/utils/speed.py b/utils/speed.py index c0b1e93202..3c9c0958ee 100644 --- a/utils/speed.py +++ b/utils/speed.py @@ -9,6 +9,7 @@ from aiohttp import ClientSession, TCPConnector from multidict import CIMultiDictProxy +import utils.constants as constants from utils.config import config from utils.tools import is_ipv6, remove_cache_info, get_resolution_value @@ -289,6 +290,11 @@ async def get_speed(url, ipv6_proxy=None, filter_resolution=config.open_filter_r data['speed'] = float("inf") data['delay'] = float("-inf") data['resolution'] = "1920x1080" + elif re.match(constants.rtmp_url_pattern, url) is not None: + start_time = time() + data['resolution'] = await get_resolution_ffprobe(url, timeout) + data['delay'] = int(round((time() - start_time) * 1000)) + data['speed'] = float("inf") if data['resolution'] is not None else 0 else: data.update(await get_speed_m3u8(url, filter_resolution, timeout)) if cache_key and cache_key not in cache: diff --git a/utils/tools.py b/utils/tools.py index ee7aaa494c..c519fa048d 100644 --- a/utils/tools.py +++ b/utils/tools.py @@ -443,8 +443,8 @@ def process_nested_dict(data, seen, flag=None, force_str=None): data[key] = remove_duplicates_from_tuple_list(value, seen, flag, force_str) -url_domain_pattern = re.compile( - r"\b((https?):\/\/)?(\[[0-9a-fA-F:]+\]|([\w-]+\.)+[\w-]+)(:[0-9]{1,5})?\b" +url_domain_compile = re.compile( + constants.url_domain_pattern ) @@ -452,7 +452,7 @@ def get_url_domain(url): """ Get the url domain """ - matcher = url_domain_pattern.search(url) + matcher = url_domain_compile.search(url) if matcher: return matcher.group() return None