diff --git a/.github/workflows/alist-sync.yaml b/.github/workflows/alist-sync.yaml index 5aca062..e0752e1 100644 --- a/.github/workflows/alist-sync.yaml +++ b/.github/workflows/alist-sync.yaml @@ -50,6 +50,22 @@ jobs: ./.alist-sync-cache/*.shelve ./alist/data + - name: Install Grafana Agent + if: secrets.GRAFANA_AGENT_CONFIG + run: | + cat > /etc/grafana-agent.yaml << EOF + ${{ secrets.GRAFANA_AGENT_CONFIG }} + EOF + + sed -i "s/instance: /instance: ${_ALIST_SYNC_NAME}/g" /etc/grafana-agent.yaml + sed -i "s##$(pwd)/logs/*.log#g" /etc/grafana-agent.yaml + + curl -fL# https://github.com/grafana/agent/releases/download/v0.39.1/grafana-agent-0.39.1-1.amd64.deb -o grafana-agent-0.39.1-1.amd64.deb + echo -n "19ba2f69ec14fae4812128475beea6e034e7e9c1183cd0c1f6814d04d0ba788b grafana-agent-0.39.1-1.amd64.deb" | sha256sum -c - 2>&1 | grep "OK" || fatal 'Failed sha256sum check' + sudo dpkg -i "/tmp/grafana-agent-0.39.1-1.amd64.deb" + sudo systemctl enable grafana-agent.service + sudo systemctl start grafana-agent.service + - name: Install and Init Alist Server run: | sudo ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime @@ -113,7 +129,7 @@ jobs: fi - name: Debugger - if: ${{ github.event.inputs.debug == 'true' && failure() }} + if: ${{ github.event.inputs.debug == 'true' && always() }} uses: csexton/debugger-action@master - name: Upload Logs diff --git a/alist_sync/a_worker.py b/alist_sync/a_worker.py index d418ea2..cd03622 100644 --- a/alist_sync/a_worker.py +++ b/alist_sync/a_worker.py @@ -74,8 +74,9 @@ async def backup(self, worker: Worker): async def downloader(self, worker: Worker): """""" self.tmp_files.auto_clear() - if self.tmp_files.pre_total_size() > 15 * GB: - await asyncio.sleep(5) + if worker.tmp_file.exists(): + if worker.tmp_file.stat().st_size == worker.source_path.stat().size: + worker.update(status="downloaded") return async with self.sp_downloader: self.tmp_files.add_tmp(worker.tmp_file, worker.source_path) @@ -94,11 +95,13 @@ async def downloader(self, worker: Worker): self.tmp_files.clear_file(worker.tmp_file) logger.error("下载失败, 返回码: %d", rt_code) raise DownloaderError(f"下载失败, 返回码: {rt_code}") + + logger.info("下载成功: %s", worker.source_path.as_uri()) worker.update(status="downloaded") async def uploader(self, worker: Worker): """""" - assert worker.tmp_file.exists(), "临时文件不存在" + assert worker.tmp_file.exists(), f"临时文件不存在: {self.tmp_files}" assert ( worker.tmp_file.stat().st_size == worker.source_path.stat().size ), f"临时文件大小不一致: {worker.tmp_file.stat().st_size} != {worker.source_path.stat().size}" diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index fd1e993..31b5d8b 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -174,7 +174,7 @@ def checker( ) -> "Worker|None": if not target_stat.exists(): logger.info( - f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}" + f"Checked: [COPY] {source_stat.path.as_uri()[4:]} -> {target_stat.path.as_uri()[4:]}" ) return self.create_worker( type_="copy", @@ -182,7 +182,7 @@ def checker( target_path=target_stat.path, ) - logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}") + logger.debug(f"Checked: [JUMP] {source_stat.path.as_uri()[4:]}") return None diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index 8b4ed4b..8d5db0a 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -188,158 +188,6 @@ def __retry( retry -= 1 continue - def downloader(self): - """HTTP多线程下载""" - # download - with self.tmp_file.open("wb") as _tmp: - with downloader_client.stream( - "GET", - self.source_path.get_download_uri(), - follow_redirects=True, - ) as _res: - logger.debug( - f"Worker[{self.short_id}] Downloading from {self.source_path}" - ) - for i in _res.iter_bytes(chunk_size=1024 * 1024): - _tmp.write(i) - assert ( - self.tmp_file.exists() - and self.tmp_file.stat().st_size == self.source_path.stat().size - ), "下载后文件大小不一致" - self.update(status="downloaded") - - def uploader(self): - import urllib.parse - - # upload - with self.tmp_file.open("rb") as fs: - res = self.target_path.client.verify_request( - "PUT", - "/api/fs/put", - headers={ - "As-Task": "false", - "Content-Type": "application/octet-stream", - "Last-Modified": str( - int(self.source_path.stat().modified.timestamp() * 1000) - ), - "File-Path": urllib.parse.quote(str(self.target_path.as_posix())), - }, - content=fs, - timeout=Timeout(300, read=300, write=300, connect=300), - ) - - assert res.code == 200 - logger.info( - f"Worker[{self.short_id}] Upload File " - f"[{self.target_path}] [{res.code}]{res.message}." - ) - self.update(status="uploaded") - - def copy_type(self): - """复制任务""" - logger.debug(f"Worker[{self.short_id}] Start Copping") - if self.status not in ["downloaded", "uploaded"]: - self.__retry( - 3, - (TimeoutException, AssertionError), - self.downloader, - ) - - if self.status != "uploaded": - self.target_path.unlink(missing_ok=True) - self.target_path.parent.mkdir(parents=True, exist_ok=True) - self.__retry( - 3, - (TimeoutException, AssertionError), - self.uploader, - ) - - return self.update(status="copied") - - def delete_type(self): - """删除任务""" - self.target_path.unlink(missing_ok=True) - assert not self.target_path.exists() - self.update(status="deleted") - - def recheck_copy(self, retry=5, re_time=2): - """再次检查当前Worker的结果是否符合预期。""" - try: - return ( - self.target_path.re_stat(retry=retry, timeout=re_time).size - == self.source_path.re_stat().size - ) - except FileNotFoundError: - if retry > 0: - return self.recheck_copy(retry=retry - 1, re_time=re_time) - logger.error( - f"Worker[{self.short_id}] Recheck Error: 文件不存在.({retry=})" - ) - return False - - def recheck(self) -> bool: - """再次检查当前Worker的结果是否符合预期。""" - if self.type == "copy": - return self.recheck_copy(retry=3, re_time=3) - elif self.type == "delete": - self.target_path.re_stat(retry=5, timeout=2) - return not self.target_path.exists() - else: - raise ValueError(f"Unknown Worker Type {self.type}.") - - def run(self, is_retry=False): - """启动Worker""" - if is_retry is False: - logger.info(f"worker[{self.short_id}] 已经开始工作.") - self.update() - - try: - if self.status in ["done", "failed"]: - self.update() - return - if self.need_backup and self.status in ["init"]: - self.backup() - - if self.type == "copy" and self.status in [ - "init", - "back-upped", - "downloaded", - "uploaded", - ]: - self.copy_type() - - elif self.type == "delete" and self.status in ["init", "back-upped"]: - self.delete_type() - - assert self.recheck() - self.update(status=f"done") - return - except TimeoutException as _e: - if is_retry: - return self.__error_exec(_e) - time.sleep(2) - return self.__retry( - 3, - (TimeoutException, AssertionError), - self.run, - True, - ) - except Exception as _e: - return self.__error_exec(_e) - - def __error_exec(self, _e: Exception): - logger.error( - f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}", - exc_info=_e, - ) - self.error_info = ( - f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n" - f"{traceback.format_exc()}" - ) - self.update(status="failed") - if sync_config.debug: - raise WorkerError from _e - class Workers: def __init__(self): diff --git a/alist_sync/downloader.py b/alist_sync/downloader.py index 30bebfc..373cf5f 100644 --- a/alist_sync/downloader.py +++ b/alist_sync/downloader.py @@ -7,6 +7,7 @@ 下载器使用一个单独线程启动,它创建一个事件循环并在其内部保持同步。 """ +import logging import os from pathlib import Path from typing import TYPE_CHECKING @@ -17,6 +18,9 @@ ... +logger = logging.getLogger("alist-sync.downloader") + + def find_aria2_bin(default=None): """""" paths = os.getenv("PATH").split(";" if os.name == "nt" else ":") @@ -44,12 +48,18 @@ def make_aria2_cmd( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", ) + logs_dir = Path.cwd().joinpath("logs") + if logs_dir.exists() and logs_dir.is_dir(): + log_file = logs_dir.joinpath(f"aria2c-{local.name}.log") + else: + log_file = local.as_posix() + ".log" + _cmd = [ aria2_bin, "-q", "-c", "-l", - local.as_posix() + ".log", + log_file, "--user-agent", ua, "-x", @@ -59,7 +69,7 @@ def make_aria2_cmd( str(local), remote.as_download_uri() if isinstance(remote, AlistPath) else remote, ] - print(_cmd) + logger.debug("DOWNLOAD CMD: %s", " ".join(str(_) for _ in _cmd)) return _cmd