# Review notes for this patch. They are kept above the first "diff --git"
# header so that no hunk content is altered.
#
# NOTE(review): the line breaks of this patch appear to have been collapsed;
# each physical line below holds many diff lines, and blank context lines can
# no longer be told apart. Regenerate the patch before trying to apply it.
#
# .github/workflows/alist-sync.yaml
#   * Comments out `export TERM=dumb` in the step that runs
#     `./bootstrap.sh main test-config`.
#   * Replaces the inline "Debugger" shell script (sshd install/restart, gist
#     clone + setup.sh, /tmp/keepalive wait loop under `timeout 21000`) with
#     `uses: csexton/debugger-action@master`.
#     NOTE(review): `@master` is an unpinned ref -- consider a tag or commit SHA.
#
# alist_sync/d_worker.py
#   * Imports: adds `traceback`, `typing.Type`, `httpx.TimeoutException` and
#     `alist_sync.err.WorkerError`.
#   * Adds `__retry(self, retry, excepts, func, *args, **kwargs)`: while
#     `retry > 0` it calls `func(*args, **kwargs)` and returns the result; on an
#     exception listed in `excepts` it logs a warning and decrements `retry`.
#     NOTE(review): the `if retry <= 0:` branch is inside `while retry > 0:` and
#     runs before the decrement, so its `logger.error(...)` / `raise _e` look
#     unreachable. When the retries are used up the loop simply ends and the
#     method returns None, so `copy_type` would continue to `uploader` after a
#     failed download. Suggest re-raising the last exception after the loop.
#   * The removed `copy_single_stream` is split in two: `downloader` now holds
#     the streaming download (temp file opened via `with`), and a new
#     `uploader` holds the `import urllib.parse` and the upload part.
#   * `downloader` asserts that the temp file exists and that its size equals
#     `self.source_path.stat().size` before setting status "downloaded".
#     NOTE(review): `assert` is skipped under `python -O`, and the retry below
#     depends on AssertionError -- an explicit check + raise would be safer.
#   * `copy_type` replaces `self.copy_single_stream()` with
#     `self.__retry(3, (TimeoutException, AssertionError), ...)` for
#     `self.downloader` and then `self.uploader`.
#   * `run`: drops the "Updated to DB." debug log; the error log now includes
#     `type(_e)` and passes `exc_info=_e`; `error_info` additionally stores
#     `traceback.format_exc()`; with `sync_config.debug` set it raises
#     `WorkerError from _e` instead of re-raising `_e`.
diff --git a/.github/workflows/alist-sync.yaml b/.github/workflows/alist-sync.yaml index 960475a..96ac574 100644 --- a/.github/workflows/alist-sync.yaml +++ b/.github/workflows/alist-sync.yaml @@ -93,7 +93,7 @@ jobs: ${{ secrets.SYNC_CONFIG }} EOF set -ex - export TERM=dumb + # export TERM=dumb export FORCE_COLOR=true export COLUMNS=160 ./bootstrap.sh main test-config @@ -101,27 +101,6 @@ jobs: - name: Debugger if: ${{ github.event.inputs.debug == 'true' && failure() }} - run: | - echo "Debugging" - if [ -z "$(sudo netstat -ntlp |grep -v grep |grep :22 |grep sshd)" ]; then - echo "sshd is not running." - if [ ! -f /lib/systemd/system/ssh.service ]; then - echo sshd is not installed. will install ... - sudo apt-get update; - sudo apt-get install -y openssh-server; - fi - echo "starting sshd ..." - sudo systemctl restart sshd - fi - git clone https://gist.github.com/lee-cq/ed0f7e2bf9e039f1797c5b700ba118ec - mkdir -p ~/.ssh - cd ed0f7e2bf9e039f1797c5b700ba118ec || exit 1 - chmod +x setup.sh && ./setup.sh - - touch /tmp/keepalive - echo 'created keepalive file.' - echo 'timeout 21000s(5h 50m).' - timeout 21000 bash -c 'stopout=21000; while true;do echo $((stopout=$stopout-3)) > /tmp/stopout ; test -f /tmp/keepalive || break; sleep 3; done ' || echo Timeouted. - echo 'The VM will be shutdown in 10 minutes.' 
+ uses: csexton/debugger-action@master diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index 75bdede..c14035a 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -3,17 +3,19 @@ import logging import threading import time +import traceback from pathlib import Path from queue import Queue, Empty -from typing import Literal, Any +from typing import Literal, Any, Type from pydantic import BaseModel, computed_field, Field from pymongo.collection import Collection -from httpx import Client, Timeout +from httpx import Client, TimeoutException, Timeout from alist_sdk.path_lib import AbsAlistPathType, AlistPath from alist_sync.config import create_config from alist_sync.common import sha1, prefix_in_threads +from alist_sync.err import WorkerError from alist_sync.thread_pool import MyThreadPoolExecutor from alist_sync.version import __version__ @@ -132,24 +134,52 @@ def backup(self): self.update(status="back-upped") logger.info(f"Worker[{self.short_id}] Backup Success.") - def downloader(self): - """HTTP多线程下载""" + def __retry( + self, + retry: int, + excepts: tuple[Type[Exception], ...], + func, + *args, + **kwargs, + ): - def copy_single_stream(self): - import urllib.parse + while retry > 0: + try: + return func(*args, **kwargs) + except excepts as _e: + if retry <= 0: + logger.error( + f"Worker[{self.short_id}] Retry Error [{func.__name__}]: " + f"{type(_e)} - {_e}" + ) + raise _e + logger.warning( + f"Worker[{self.short_id}] {retry = } [{func.__name__}]: " + f"{type(_e)} - {_e}" + ) + retry -= 1 + continue + def downloader(self): + """HTTP多线程下载""" # download - _tmp = self.tmp_file.open("wb") - with downloader_client.stream( - "GET", - self.source_path.get_download_uri(), - follow_redirects=True, - ) as _res: - for i in _res.iter_bytes(chunk_size=1024 * 1024): - _tmp.write(i) - _tmp.close() + with self.tmp_file.open("wb") as _tmp: + with downloader_client.stream( + "GET", + self.source_path.get_download_uri(), + follow_redirects=True, + ) as 
_res: + for i in _res.iter_bytes(chunk_size=1024 * 1024): + _tmp.write(i) + assert ( + self.tmp_file.exists() + and self.tmp_file.stat().st_size == self.source_path.stat().size + ), "下载后文件大小不一致" self.update(status="downloaded") + def uploader(self): + import urllib.parse + # upload with self.tmp_file.open("rb") as fs: res = self.target_path.client.verify_request( @@ -180,7 +210,8 @@ def copy_type(self): self.target_path.unlink(missing_ok=True) self.target_path.parent.mkdir(parents=True, exist_ok=True) - self.copy_single_stream() + self.__retry(3, (TimeoutException, AssertionError), self.downloader) + self.__retry(3, (TimeoutException, AssertionError), self.uploader) return self.update(status="copied") @@ -219,7 +250,7 @@ def run(self): """启动Worker""" logger.info(f"worker[{self.short_id}] 已经开始工作.") self.update() - logger.debug(f"Worker[{self.short_id}] Updated to DB.") + try: if self.status in ["done", "failed"]: return @@ -235,11 +266,17 @@ def run(self): assert self.recheck() self.update(status=f"done") except Exception as _e: - logger.error(f"worker[{self.short_id}] 出现错误: {_e}") - self.error_info = str(_e) + logger.error( + f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}", + exc_info=_e, + ) + self.error_info = ( + f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n" + f"{traceback.format_exc()}" + ) self.update(status="failed") if sync_config.debug: - raise _e + raise WorkerError from _e class Workers: