Skip to content

Commit

Permalink
对超时和断言错误进行重试,默认3次
Browse files Browse the repository at this point in the history
  • Loading branch information
lee-cq committed Mar 3, 2024
1 parent 4fdefb6 commit d38ad48
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 43 deletions.
25 changes: 2 additions & 23 deletions .github/workflows/alist-sync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,35 +93,14 @@ jobs:
${{ secrets.SYNC_CONFIG }}
EOF
set -ex
export TERM=dumb
# export TERM=dumb
export FORCE_COLOR=true
export COLUMNS=160
./bootstrap.sh main test-config
./bootstrap.sh main sync
- name: Debugger
if: ${{ github.event.inputs.debug == 'true' && failure() }}
run: |
echo "Debugging"
if [ -z "$(sudo netstat -ntlp |grep -v grep |grep :22 |grep sshd)" ]; then
echo "sshd is not running."
if [ ! -f /lib/systemd/system/ssh.service ]; then
echo sshd is not installed. will install ...
sudo apt-get update;
sudo apt-get install -y openssh-server;
fi
echo "starting sshd ..."
sudo systemctl restart sshd
fi
git clone https://gist.github.com/lee-cq/ed0f7e2bf9e039f1797c5b700ba118ec
mkdir -p ~/.ssh
cd ed0f7e2bf9e039f1797c5b700ba118ec || exit 1
chmod +x setup.sh && ./setup.sh
touch /tmp/keepalive
echo 'created keepalive file.'
echo 'timeout 21000s(5h 50m).'
timeout 21000 bash -c 'stopout=21000; while true;do echo $((stopout=$stopout-3)) > /tmp/stopout ; test -f /tmp/keepalive || break; sleep 3; done ' || echo Timeouted.
echo 'The VM will be shutdown in 10 minutes.'
uses: csexton/debugger-action@master


77 changes: 57 additions & 20 deletions alist_sync/d_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import logging
import threading
import time
import traceback
from pathlib import Path
from queue import Queue, Empty
from typing import Literal, Any
from typing import Literal, Any, Type

from pydantic import BaseModel, computed_field, Field
from pymongo.collection import Collection
from httpx import Client, Timeout
from httpx import Client, TimeoutException, Timeout
from alist_sdk.path_lib import AbsAlistPathType, AlistPath

from alist_sync.config import create_config
from alist_sync.common import sha1, prefix_in_threads
from alist_sync.err import WorkerError
from alist_sync.thread_pool import MyThreadPoolExecutor
from alist_sync.version import __version__

Expand Down Expand Up @@ -132,24 +134,52 @@ def backup(self):
self.update(status="back-upped")
logger.info(f"Worker[{self.short_id}] Backup Success.")

def downloader(self):
"""HTTP多线程下载"""
def __retry(
    self,
    retry: int,
    excepts: tuple[Type[Exception], ...],
    func,
    *args,
    **kwargs,
):
    """Call ``func(*args, **kwargs)``, retrying on the given exceptions.

    Args:
        retry: Total number of attempts allowed. If it is not positive,
            ``func`` is never called and ``None`` is returned.
        excepts: Exception types that trigger another attempt. Any other
            exception propagates immediately.
        func: The callable to run.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on its first successful attempt.

    Raises:
        Exception: The last exception from ``excepts`` once every attempt
            has failed. It is re-raised rather than swallowed, so the
            caller cannot mistake an exhausted retry for success.
    """
    # Callables such as functools.partial have no __name__; fall back to
    # repr() so that logging can never mask the real error.
    func_name = getattr(func, "__name__", repr(func))

    while retry > 0:
        try:
            return func(*args, **kwargs)
        except excepts as _e:
            # This was the last allowed attempt: report and propagate.
            # The check has to happen before the counter is decremented,
            # otherwise the loop ends and the failure is silently lost.
            if retry <= 1:
                logger.error(
                    f"Worker[{self.short_id}] Retry Error [{func_name}]: "
                    f"{type(_e)} - {_e}"
                )
                raise
            logger.warning(
                f"Worker[{self.short_id}] {retry = } [{func_name}]: "
                f"{type(_e)} - {_e}"
            )
            retry -= 1

def downloader(self):
    """Download the source file into the worker's temporary file.

    The body of the source download URI is streamed into ``self.tmp_file``
    in 1 MiB chunks. Afterwards the temporary file must exist and its size
    must equal the size reported by ``self.source_path.stat()``. On success
    the worker status is updated to "downloaded".

    Raises:
        AssertionError: If the temporary file is missing or its size
            differs from the source size.
    """
    one_mib = 1024 * 1024

    # The file is opened first and the HTTP stream second, so the stream is
    # closed before the file when the block exits.
    with self.tmp_file.open("wb") as sink, downloader_client.stream(
        "GET",
        self.source_path.get_download_uri(),
        follow_redirects=True,
    ) as response:
        for chunk in response.iter_bytes(chunk_size=one_mib):
            sink.write(chunk)

    # Size check runs after the file has been closed and flushed.
    assert (
        self.tmp_file.exists()
        and self.tmp_file.stat().st_size == self.source_path.stat().size
    ), "下载后文件大小不一致"

    self.update(status="downloaded")

def uploader(self):
import urllib.parse

# upload
with self.tmp_file.open("rb") as fs:
res = self.target_path.client.verify_request(
Expand Down Expand Up @@ -180,7 +210,8 @@ def copy_type(self):

self.target_path.unlink(missing_ok=True)
self.target_path.parent.mkdir(parents=True, exist_ok=True)
self.copy_single_stream()
self.__retry(3, (TimeoutException, AssertionError), self.downloader)
self.__retry(3, (TimeoutException, AssertionError), self.uploader)

return self.update(status="copied")

Expand Down Expand Up @@ -219,7 +250,7 @@ def run(self):
"""启动Worker"""
logger.info(f"worker[{self.short_id}] 已经开始工作.")
self.update()
logger.debug(f"Worker[{self.short_id}] Updated to DB.")

try:
if self.status in ["done", "failed"]:
return
Expand All @@ -235,11 +266,17 @@ def run(self):
assert self.recheck()
self.update(status=f"done")
except Exception as _e:
logger.error(f"worker[{self.short_id}] 出现错误: {_e}")
self.error_info = str(_e)
logger.error(
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}",
exc_info=_e,
)
self.error_info = (
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n"
f"{traceback.format_exc()}"
)
self.update(status="failed")
if sync_config.debug:
raise _e
raise WorkerError from _e


class Workers:
Expand Down

0 comments on commit d38ad48

Please sign in to comment.