diff --git a/README.md b/README.md index 0b78dff..9b2ae3c 100644 --- a/README.md +++ b/README.md @@ -88,13 +88,38 @@ bilifm season $uid $sid [OPTIONS] - uid, sid 的获取: 打开视频合集网页, 从 URL 中获取 - https://space.bilibili.com/23263470/channel/collectiondetail?sid=1855309 +https://space.bilibili.com/23263470/channel/collectiondetail?sid=1855309 + +例如上面链接, uid 为 23263470, sid 为 1855309 (目前 uid 可以随意填写) + +```bash +bilifm season 23263470 1855309 +``` - 例如上面链接, uid 为 23263470, sid 为 1855309 (目前 uid 可以随意填写) - Options: - -o, --directory 选择音频保存地址 +### series 模式 + +下载视频列表 +```bash +bilifm series $uid $sid [OPTIONS] +``` + +- uid, sid 的获取: + 打开用户空间中的合集和列表, 找到列表点击更多, 然后从URL中获取 + +https://space.bilibili.com/488978908/channel/seriesdetail?sid=888434 + +例如上面链接, uid 为 488978908, sid 为 888434. 使用下面命令 + +```bash +bilifm series 488978908 888434 +``` + +- Options: + - -o, --directory 选择音频保存地址 ## Features diff --git a/src/bilifm/command.py b/src/bilifm/command.py index 76827cb..f0cdf1b 100644 --- a/src/bilifm/command.py +++ b/src/bilifm/command.py @@ -5,6 +5,7 @@ from .audio import Audio from .fav import Fav from .season import Season +from .series import Series from .user import User from .util import Directory, Path @@ -50,8 +51,8 @@ def fav( @app.command() def season(uid: str, sid: str, directory: Directory = None): sea = Season(uid, sid) - ret = sea.get_videos() - if not ret: + audio_generator = sea.get_videos() + if not audio_generator: typer.Exit(1) return @@ -59,7 +60,28 @@ def season(uid: str, sid: str, directory: Directory = None): os.makedirs(sea.name) os.chdir(sea.name) - for id in sea.videos: - audio = Audio(id) - audio.download() + for audios in audio_generator: + for id in audios: + audio = Audio(id) + audio.download() + typer.echo("Download complete") + + +@app.command() +def series(uid: str, sid: str, directory: Directory = None): + """Download bilibili video series + + The api of series lacks the series name, executing + this command will not create a folder for the series + """ + ser = Series(uid, sid) + audio_generator = ser.get_videos() + if not audio_generator: + typer.Exit(1) + return + + for audios in audio_generator: + for id in audios: + audio = Audio(id) + audio.download() typer.echo("Download complete") diff --git a/src/bilifm/season.py b/src/bilifm/season.py index 489df64..4273627 100644 --- a/src/bilifm/season.py +++ b/src/bilifm/season.py @@ -2,7 +2,12 @@ import typer -from .util import request +from .util import Retry, request + +headers: dict[str, str] = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com", +} class Season: @@ -10,6 +15,7 @@ class Season: season_url: str = ( "https://api.bilibili.com/x/polymer/web-space/seasons_archives_list" ) + retry = 3 def __init__(self, uid: str, sid: str, page_size=30) -> None: self.uid = uid @@ -26,33 +32,50 @@ def get_videos(self): "page_size": self.page_size, } - res = request( - method="get", url=self.season_url, params=params, wbi=True, dm=True - ).json() - - code = res.get("code", -404) - if code != 0: - # uid 错误好像无影响 - if code == "-404": - typer.echo(f"Error: uid {self.uid} or sid {self.season_id} error.") - else: - type.echo("Error: unknown error") - typer.echo(f"code: {res['code']}") - if res.get("message", None): - typer.echo(f"msg: {res['message']}") + @Retry(self.__response_succeed, self.__handle_error_response) + def wrapped_request(): + """wrap request with retry""" + return request( + method="get", url=self.season_url, params=params, headers=headers + ).json() + + res = wrapped_request() + if res is None: return False self.total = res["data"]["meta"]["total"] self.name = res["data"]["meta"]["name"] - max_pn = self.total // 50 - for i in range(1, max_pn + 2): - params["page_num"] = i + def bvid_generator(): + max_pn = self.total // self.page_size + for i in range(1, max_pn + 2): + params["page_num"] = i + res = wrapped_request() + if res: + bvids = [d["bvid"] for d in res["data"]["archives"]] + # self.videos.extend(bvids) + yield bvids + else: + typer.echo( + f"skip audios from {(i-1)* self.page_size} to {i * self.page_size}" + ) - res = request( - method="get", url=self.season_url, params=params, wbi=True, dm=True - ).json() - bvids = [d["bvid"] for d in res["data"]["archives"]] - self.videos.extend(bvids) + return bvid_generator() + + def __handle_error_response(self, response): + code = response.get("code", -404) + if code == -404: + typer.echo(f"Error: uid {self.uid} or sid {self.season_id} error.") + elif code == -352: + typer.echo( + "Error: Authentication problem or too many requests, please try again later." + ) + else: + typer.echo("Error: Unknown problem.") + + typer.echo(f"code: {response['code']}") + if response.get("message", None): + typer.echo(f"msg: {response['message']}") - return True + def __response_succeed(self, response): + return response.get("code", -404) == 0 diff --git a/src/bilifm/series.py b/src/bilifm/series.py new file mode 100644 index 0000000..20aed3b --- /dev/null +++ b/src/bilifm/series.py @@ -0,0 +1,70 @@ +"""download bilibili video series, 视频列表""" + +import typer + +from .util import Retry, request + + +class Series: + series_url: str = "https://api.bilibili.com/x/series/archives" + retry: int = 3 + + def __init__(self, uid: str, series_id: str, page_size=30) -> None: + self.uid = uid + self.series_id = series_id + self.page_size = page_size + self.videos = [] + self.total = 0 + + def get_videos(self): + """return a generator that contain page_size videos""" + params = { + "mid": self.uid, + "series_id": self.series_id, + "pn": 1, + "ps": self.page_size, + "current_id": self.uid, + } + + @Retry(self.__response_succeed, self.__handle_error_response) + def wrapped_request(): + """wrap request with retry""" + return request(method="get", url=self.series_url, params=params).json() + + res = wrapped_request() + if res is None: + return 0 + + self.total = res["data"]["page"]["total"] + + def bvid_generator(): + for i in range(1, self.total // self.page_size + 2): + params["pn"] = i + res = wrapped_request() + if res: + bvids = [ar["bvid"] for ar in res["data"]["archives"]] + # self.videos.extend(bvids) + yield bvids + else: + typer.echo( + f"skip audios from {(i-1)* self.page_size} to {i * self.page_size}" + ) + + return bvid_generator() + + def __handle_error_response(self, response): + try: + archives = response["data"]["archives"] + except KeyError: + archives = 0 # something null not none + if archives is None: + typer.echo(f"Error: uid {self.uid} or sid {self.series_id} error.") + else: + typer.echo("Error: Unknown problem.") + typer.echo(f"resp: {response}") + + def __response_succeed(self, response) -> bool: + try: + return response["data"]["archives"] is not None + except KeyError: + return False diff --git a/src/bilifm/util.py b/src/bilifm/util.py index d42b3fe..7f4d6dd 100644 --- a/src/bilifm/util.py +++ b/src/bilifm/util.py @@ -4,6 +4,7 @@ import urllib.parse from functools import reduce from hashlib import md5 +from typing import Callable import requests import typer @@ -192,3 +193,24 @@ def check_path(path: str): Directory = Annotated[str, typer.Option("-o", "--directory", callback=change_directory)] Path = typer.Argument(callback=check_path) + + +class Retry: + """Retry decorator""" + + def __init__(self, response_succeed, handle_error_response, total=3) -> None: + self.total = total + self.__response_succeed = response_succeed + self.__handle_error_response = handle_error_response + pass + + def __call__(self, request_func: Callable) -> Callable: + def wrapped_request(*args, **kwargs): + for _ in range(self.total): + res = request_func(*args, **kwargs) + if self.__response_succeed(res): + return res + self.__handle_error_response(res) + return None + + return wrapped_request