Skip to content

Commit

Permalink
删除d_worker中一些同步方法
Browse files Browse the repository at this point in the history
fixbug: Downloader重复下载
添加Grafana指标检控
  • Loading branch information
lee-cq committed Mar 24, 2024
1 parent 822b67d commit 54cf2f8
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 160 deletions.
18 changes: 17 additions & 1 deletion .github/workflows/alist-sync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ jobs:
./.alist-sync-cache/*.shelve
./alist/data
- name: Install Grafana Agent
if: secrets.GRAFANA_AGENT_CONFIG
run: |
cat > /etc/grafana-agent.yaml << EOF
${{ secrets.GRAFANA_AGENT_CONFIG }}
EOF
sed -i "s/instance: <your-instance-name>/instance: ${_ALIST_SYNC_NAME}/g" /etc/grafana-agent.yaml
sed -i "s#<your-log-path>#$(pwd)/logs/*.log#g" /etc/grafana-agent.yaml
curl -fL# https://github.com/grafana/agent/releases/download/v0.39.1/grafana-agent-0.39.1-1.amd64.deb -o grafana-agent-0.39.1-1.amd64.deb
echo -n "19ba2f69ec14fae4812128475beea6e034e7e9c1183cd0c1f6814d04d0ba788b grafana-agent-0.39.1-1.amd64.deb" | sha256sum -c - 2>&1 | grep "OK" || fatal 'Failed sha256sum check'
sudo dpkg -i "/tmp/grafana-agent-0.39.1-1.amd64.deb"
sudo systemctl enable grafana-agent.service
sudo systemctl start grafana-agent.service
- name: Install and Init Alist Server
run: |
sudo ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
Expand Down Expand Up @@ -113,7 +129,7 @@ jobs:
fi
- name: Debugger
if: ${{ github.event.inputs.debug == 'true' && failure() }}
if: ${{ github.event.inputs.debug == 'true' && always() }}
uses: csexton/debugger-action@master

- name: Upload Logs
Expand Down
9 changes: 6 additions & 3 deletions alist_sync/a_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ async def backup(self, worker: Worker):
async def downloader(self, worker: Worker):
""""""
self.tmp_files.auto_clear()
if self.tmp_files.pre_total_size() > 15 * GB:
await asyncio.sleep(5)
if worker.tmp_file.exists():
if worker.tmp_file.stat().st_size == worker.source_path.stat().size:
worker.update(status="downloaded")
return
async with self.sp_downloader:
self.tmp_files.add_tmp(worker.tmp_file, worker.source_path)
Expand All @@ -94,11 +95,13 @@ async def downloader(self, worker: Worker):
self.tmp_files.clear_file(worker.tmp_file)
logger.error("下载失败, 返回码: %d", rt_code)
raise DownloaderError(f"下载失败, 返回码: {rt_code}")

logger.info("下载成功: %s", worker.source_path.as_uri())
worker.update(status="downloaded")

async def uploader(self, worker: Worker):
""""""
assert worker.tmp_file.exists(), "临时文件不存在"
assert worker.tmp_file.exists(), f"临时文件不存在: {self.tmp_files}"
assert (
worker.tmp_file.stat().st_size == worker.source_path.stat().size
), f"临时文件大小不一致: {worker.tmp_file.stat().st_size} != {worker.source_path.stat().size}"
Expand Down
4 changes: 2 additions & 2 deletions alist_sync/d_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ def checker(
) -> "Worker|None":
if not target_stat.exists():
logger.info(
f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}"
f"Checked: [COPY] {source_stat.path.as_uri()[4:]} -> {target_stat.path.as_uri()[4:]}"
)
return self.create_worker(
type_="copy",
source_path=source_stat.path,
target_path=target_stat.path,
)

logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}")
logger.debug(f"Checked: [JUMP] {source_stat.path.as_uri()[4:]}")
return None


Expand Down
152 changes: 0 additions & 152 deletions alist_sync/d_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,158 +188,6 @@ def __retry(
retry -= 1
continue

def downloader(self):
"""HTTP多线程下载"""
# download
with self.tmp_file.open("wb") as _tmp:
with downloader_client.stream(
"GET",
self.source_path.get_download_uri(),
follow_redirects=True,
) as _res:
logger.debug(
f"Worker[{self.short_id}] Downloading from {self.source_path}"
)
for i in _res.iter_bytes(chunk_size=1024 * 1024):
_tmp.write(i)
assert (
self.tmp_file.exists()
and self.tmp_file.stat().st_size == self.source_path.stat().size
), "下载后文件大小不一致"
self.update(status="downloaded")

def uploader(self):
import urllib.parse

# upload
with self.tmp_file.open("rb") as fs:
res = self.target_path.client.verify_request(
"PUT",
"/api/fs/put",
headers={
"As-Task": "false",
"Content-Type": "application/octet-stream",
"Last-Modified": str(
int(self.source_path.stat().modified.timestamp() * 1000)
),
"File-Path": urllib.parse.quote(str(self.target_path.as_posix())),
},
content=fs,
timeout=Timeout(300, read=300, write=300, connect=300),
)

assert res.code == 200
logger.info(
f"Worker[{self.short_id}] Upload File "
f"[{self.target_path}] [{res.code}]{res.message}."
)
self.update(status="uploaded")

def copy_type(self):
"""复制任务"""
logger.debug(f"Worker[{self.short_id}] Start Copping")
if self.status not in ["downloaded", "uploaded"]:
self.__retry(
3,
(TimeoutException, AssertionError),
self.downloader,
)

if self.status != "uploaded":
self.target_path.unlink(missing_ok=True)
self.target_path.parent.mkdir(parents=True, exist_ok=True)
self.__retry(
3,
(TimeoutException, AssertionError),
self.uploader,
)

return self.update(status="copied")

def delete_type(self):
"""删除任务"""
self.target_path.unlink(missing_ok=True)
assert not self.target_path.exists()
self.update(status="deleted")

def recheck_copy(self, retry=5, re_time=2):
"""再次检查当前Worker的结果是否符合预期。"""
try:
return (
self.target_path.re_stat(retry=retry, timeout=re_time).size
== self.source_path.re_stat().size
)
except FileNotFoundError:
if retry > 0:
return self.recheck_copy(retry=retry - 1, re_time=re_time)
logger.error(
f"Worker[{self.short_id}] Recheck Error: 文件不存在.({retry=})"
)
return False

def recheck(self) -> bool:
"""再次检查当前Worker的结果是否符合预期。"""
if self.type == "copy":
return self.recheck_copy(retry=3, re_time=3)
elif self.type == "delete":
self.target_path.re_stat(retry=5, timeout=2)
return not self.target_path.exists()
else:
raise ValueError(f"Unknown Worker Type {self.type}.")

def run(self, is_retry=False):
"""启动Worker"""
if is_retry is False:
logger.info(f"worker[{self.short_id}] 已经开始工作.")
self.update()

try:
if self.status in ["done", "failed"]:
self.update()
return
if self.need_backup and self.status in ["init"]:
self.backup()

if self.type == "copy" and self.status in [
"init",
"back-upped",
"downloaded",
"uploaded",
]:
self.copy_type()

elif self.type == "delete" and self.status in ["init", "back-upped"]:
self.delete_type()

assert self.recheck()
self.update(status=f"done")
return
except TimeoutException as _e:
if is_retry:
return self.__error_exec(_e)
time.sleep(2)
return self.__retry(
3,
(TimeoutException, AssertionError),
self.run,
True,
)
except Exception as _e:
return self.__error_exec(_e)

def __error_exec(self, _e: Exception):
logger.error(
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}",
exc_info=_e,
)
self.error_info = (
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n"
f"{traceback.format_exc()}"
)
self.update(status="failed")
if sync_config.debug:
raise WorkerError from _e


class Workers:
def __init__(self):
Expand Down
14 changes: 12 additions & 2 deletions alist_sync/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
下载器使用一个单独线程启动,它创建一个事件循环并在其内部保持同步。
"""
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING
Expand All @@ -17,6 +18,9 @@
...


logger = logging.getLogger("alist-sync.downloader")


def find_aria2_bin(default=None):
""""""
paths = os.getenv("PATH").split(";" if os.name == "nt" else ":")
Expand Down Expand Up @@ -44,12 +48,18 @@ def make_aria2_cmd(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
)

logs_dir = Path.cwd().joinpath("logs")
if logs_dir.exists() and logs_dir.is_dir():
log_file = logs_dir.joinpath(f"aria2c-{local.name}.log")
else:
log_file = local.as_posix() + ".log"

_cmd = [
aria2_bin,
"-q",
"-c",
"-l",
local.as_posix() + ".log",
log_file,
"--user-agent",
ua,
"-x",
Expand All @@ -59,7 +69,7 @@ def make_aria2_cmd(
str(local),
remote.as_download_uri() if isinstance(remote, AlistPath) else remote,
]
print(_cmd)
logger.debug("DOWNLOAD CMD: %s", " ".join(str(_) for _ in _cmd))
return _cmd


Expand Down

0 comments on commit 54cf2f8

Please sign in to comment.