Skip to content

Commit

Permalink
feat: use rich.progress as progress bar
Browse files Browse the repository at this point in the history
Signed-off-by: jingfelix <[email protected]>
  • Loading branch information
jingfelix committed Aug 25, 2024
1 parent 773a9b4 commit 0fc1c03
Showing 1 changed file with 75 additions and 43 deletions.
118 changes: 75 additions & 43 deletions src/bilifm/audio.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
import sys
import time

import requests
import typer
from rich.console import Console
from rich.panel import Panel
from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn

from .util import AudioQualityEnums, get_signed_params

console = Console()


class Audio:
bvid = ""
Expand Down Expand Up @@ -64,7 +67,13 @@ def download(self):

# 如果文件已存在,则跳过下载
if os.path.exists(file_path):
typer.echo(f"{self.title} already exists, skip for now")
console.print(
Panel(
f"{self.title} 已存在,跳过下载",
style="yellow",
expand=False,
)
)
return

params = get_signed_params(
Expand All @@ -79,15 +88,27 @@ def download(self):
).json()

if json["data"] is None:
typer.echo(
f" `data` field is not valid with url: {self.playUrl} and params : {params}"
console.print(
Panel(
f"[bold red]数据字段无效[/bold red]\n"
f"URL: {self.playUrl}\n"
f"参数: {params}",
title="错误",
expand=False,
)
)
return

audio = json["data"]["dash"]["audio"]
if not audio:
typer.echo(
f" `audio` field is empty with url: {self.playUrl} and params : {params}"
console.print(
Panel(
f"[bold red]音频字段为空[/bold red]\n"
f"URL: {self.playUrl}\n"
f"参数: {params}",
title="错误",
expand=False,
)
)
return

Expand All @@ -103,38 +124,45 @@ def download(self):
response = requests.get(url=base_url, headers=self.headers, stream=True)

total_size = int(response.headers.get("content-length", 0))
temp_size = 0

with open(file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()

temp_size += len(chunk)
done = int(50 * temp_size / total_size)
sys.stdout.write(
"\r[%s%s] %s/%s %s"
% (
"#" * done,
"-" * (50 - done),
temp_size,
total_size,
self.title,
)
)
sys.stdout.flush()

except Exception as e:
typer.echo("Download failed")
typer.echo("Error: " + str(e))
pass
with Progress(
"[progress.description]{task.description}",
BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
"•",
DownloadColumn(),
"•",
TransferSpeedColumn(),
console=console,
) as progress:
task = progress.add_task(
f"[cyan]下载 {self.title}", total=total_size
)

end_time = time.time()
with open(file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
progress.update(task, advance=len(chunk))

# 添加下载完成的提示
end_time = time.time()
download_time = round(end_time - start_time, 2)
console.print(
Panel(
f"[bold green]下载完成![/bold green]用时 {download_time} 秒",
expand=False,
)
)

sys.stdout.write(
" " + str(round(end_time - start_time, 2)) + " seconds download finish\n"
)
except Exception as e:
console.print(
Panel(
f"[bold red]下载失败[/bold red]\n错误: {str(e)}",
title="异常",
expand=False,
)
)

def __get_cid_title(self, bvid: str):
url = "https://api.bilibili.com/x/web-interface/view"
Expand All @@ -149,17 +177,21 @@ def __get_cid_title(self, bvid: str):
data = response.json().get("data")
self.title = self.__title_process(data.get("title"))

# 这里是否也应该也使用get方法?
self.cid_list = [str(page["cid"]) for page in data["pages"]]
self.cid_list = [str(page.get("cid")) for page in data.get("pages", [])]
self.part_list = [
self.__title_process(str(page["part"])) for page in data["pages"]
self.__title_process(str(page.get("part")))
for page in data.get("pages", [])
]

except ValueError as e:
raise e

except Exception as e:
raise e
console.print(
Panel(
f"[bold red]获取视频信息失败[/bold red]\n错误: {str(e)}",
title="异常",
expand=False,
)
)
raise

def __title_process(self, title: str):
replaceList = ["?", "\\", "*", "|", "<", ">", ":", "/", " "]
Expand Down

0 comments on commit 0fc1c03

Please sign in to comment.