Skip to content

Commit

Permalink
重构缓存的 cached_task_undone & cached_task_undone
Browse files Browse the repository at this point in the history
移除SyncJob - copy_tasks remove_tasks checker 及相关内容
Common 添加 get_alist_client  clear_path clear_cache
job_copy.py 完成
重构run_copy
更新测试
  • Loading branch information
lee-cq committed Dec 16, 2023
1 parent 4debc92 commit 25b1371
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 269 deletions.
127 changes: 73 additions & 54 deletions alist_sync/alist_client.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,66 @@
import asyncio
import builtins
import logging
import time
from functools import lru_cache
from typing import Literal

from alist_sdk import AsyncClient as _AsyncClient, Task

from alist_sync.common import async_all_task_names
from alist_sync.common import get_alist_client

logger = logging.getLogger("alist-sync.client")

__all__ = ["AlistClient"]


class TaskStat:
copy_task_done = 0, []
copy_task_undone = 0, []
CopyStatusModify = Literal[
"init",
"created",
"waiting",
"getting src object",
"",
"running",
"success",
"failed",
"checked_done",
]


async def get_status(task_name, client: 'AlistClient' = None) -> tuple[CopyStatusModify, int | float]:
"""获取任务状态
:param client: AlistClient
:param task_name: 任务名称
:return: 任务状态(状态名称,状态进展)
"""
client = client or get_alist_client()
_task_done, _task_undone = await asyncio.gather(
client.cached_task_done('copy', dict),
client.cached_task_undone('copy', dict),
)
if task_name in _task_done and task_name not in _task_undone:
return 'success', 100
elif task_name in _task_undone:
return _task_undone[task_name].status, _task_undone[task_name].progress
else:
raise ValueError(f"任务不存在: {task_name}")


class AlistClient(_AsyncClient):

def __init__(self, base_url, max_connect=30, *,
token=None, username=None, password=None, has_opt=False, **kwargs):
super().__init__(
base_url, token, username=username,
password=password, has_opt=has_opt, **kwargs)
base_url,
token,
username=username,
password=password,
has_opt=has_opt,
**kwargs)

self._max_connect = asyncio.Semaphore(max_connect)
self.__closed = False
setattr(builtins, 'alist_client', self)

def close(self):
self.__closed = True
Expand All @@ -37,50 +73,33 @@ async def request(self, *args, **kwargs):
async with self._max_connect:
return await super().request(*args, **kwargs)

async def _cache(self, attr_name, task_type):
wait_time = 0.01
while True:
await asyncio.sleep(wait_time)
if self.__closed:
break
res = await self.__getattribute__(attr_name)(task_type)
if res.code == 200:
setattr(
TaskStat, f"{task_type}_{attr_name}",
(
int(time.time()),
res.data or []
)
)
wait_time = 5
else:
wait_time = 0.1

@property
def cached_copy_task_done(self) -> tuple[int, list[Task]]:
"""
:return 更新时间, {task_name, Task, ...}
"""
if "cached_copy_task_done" not in async_all_task_names():
asyncio.create_task(
self._cache('task_done', 'copy'),
name='cached_copy_task_done'
)
logger.info("cached_copy_task_done 已经创建 ")

return TaskStat.copy_task_done

@property
def cached_copy_task_undone(self) -> tuple[int, list[Task]]:
"""
:return 更新时间, {task_name, Task, ...}
"""
if "cached_copy_task_undone" not in async_all_task_names():
asyncio.create_task(
self._cache('task_undone', 'copy'),
name='cached_copy_task_undone'
)
logger.info("cached_copy_task_undone 已经创建 ")
return getattr(TaskStat, f"copy_task_undone",)
@staticmethod
def __task_rtype(rtype, data):
if rtype == list:
return data or []
return {i.name: i for i in data or []}

@lru_cache(maxsize=1)
async def cached_task_undone(
self,
task_type,
rtype: object = list,
timestamp=0
) -> tuple[int, list[Task] | dict[str, Task]]:
"""获取已完成的任务"""
res = await self.task_done(task_type)
if res.code == 200:
return timestamp * 5, self.__task_rtype(rtype, res.data)
raise ValueError(f"获取已完成的任务失败: {res.code = } {res.message = }")

@lru_cache(maxsize=1)
async def cached_task_undone(
self,
task_type,
rtype: object = list,
timestamp=0) -> tuple[int, list[Task] | dict[str, Task]]:
"""获取未完成的任务"""
res = await self.task_undone(task_type)
if res.code == 200:
return timestamp * 5, self.__task_rtype(rtype, res.data)
raise ValueError(f"获取未完成的任务失败: {res.code = } {res.message = }")
9 changes: 2 additions & 7 deletions alist_sync/base_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from alist_sync.alist_client import AlistClient
from alist_sync.config import cache_dir
from alist_sync.models import SyncJob, AlistServer, Checker
from alist_sync.models import SyncJob, AlistServer
from alist_sync.checker import Checker
from alist_sync.scan_dir import scan_dir
from alist_sync.common import sha1_6, is_task_all_success, timeout_input

Expand Down Expand Up @@ -56,9 +57,6 @@ def clear_cache(self, force=False):
"""清除缓存"""
if force:
self.sync_task_cache_file.unlink(missing_ok=True)
if all((is_task_all_success(self.sync_job.copy_tasks),
is_task_all_success(self.sync_job.remove_tasks)
)):
self.sync_task_cache_file.unlink(missing_ok=True)

async def create_storages(self, storages):
Expand Down Expand Up @@ -105,9 +103,6 @@ async def async_run(self):
if not self.sync_job.sync_dirs.values():
await self.scans()
self.save_to_cache()
self.sync_job.checker = Checker.checker(
*self.sync_job.sync_dirs.values()
)
else:
logger.info(f"一件从缓存中找到 %d 个 SyncDir",
len(self.sync_job.sync_dirs))
4 changes: 2 additions & 2 deletions alist_sync/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pydantic import BaseModel

from alist_sdk import Item
from alist_sync.models import SyncDir


class Checker(BaseModel):
Expand All @@ -11,7 +12,7 @@ class Checker(BaseModel):
cols: list[PurePosixPath]

@classmethod
def checker(cls, *scanned_dirs):
def checker(cls, *scanned_dirs: SyncDir):
_result = {}
for scanned_dir in scanned_dirs:
for item in scanned_dir.items:
Expand Down Expand Up @@ -55,7 +56,6 @@ def model_dump_table(self):
if __name__ == '__main__':
import json
from pathlib import Path
from alist_sync.models import SyncDir

checker = Checker.checker(*[SyncDir(**s) for s in json.load(
Path(__file__).parent.parent.joinpath('tests/resource/SyncDirs-m.json').open())
Expand Down
44 changes: 41 additions & 3 deletions alist_sync/common.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
import asyncio
import hashlib

import builtins
import logging
import selectors
import sys
from pathlib import Path

from alist_sync.config import cache_dir

logger = logging.getLogger("alist-sync.common")

__all__ = [
"sha1", "sha1_6", "async_all_task_names", "is_task_all_success", "timeout_input"
"get_alist_client",
"sha1",
"sha1_6",

"async_all_task_names",
"is_task_all_success",
"timeout_input",

"clear_cache",
"clear_path"
]


# noinspection PyUnresolvedReferences
def get_alist_client() -> 'AlistClient':
"""获取AlistClient"""
if not hasattr(builtins, 'alist_client'):
raise ValueError("AlistClient未初始化")
return getattr(builtins, 'alist_client', None)


def clear_path(path: Path):
"""清除缓存"""
for i in path.iterdir():
if i.is_file():
i.unlink()
elif i.is_dir():
clear_path(i)
i.rmdir()


def clear_cache():
"""清除缓存"""
clear_path(cache_dir)


def sha1(s):
return hashlib.sha1(str(s).encode()).hexdigest()

Expand Down Expand Up @@ -52,8 +87,11 @@ def timeout_input(msg, default, timeout=3):
if __name__ == "__main__":
from pydantic import BaseModel


class Task(BaseModel):
status: str = "init"


print(is_task_all_success(
[Task(status='success'), Task(status='running'),]
[Task(status='success'), Task(status='running'), ]
))
73 changes: 63 additions & 10 deletions alist_sync/job_copy.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import logging
from pathlib import Path, PurePosixPath
from typing import Literal, Optional
from pydantic import BaseModel, computed_field

from alist_sync.alist_client import AlistClient
from alist_sync.alist_client import AlistClient, get_status
from alist_sync.checker import Checker
from alist_sync.common import get_alist_client
from alist_sync.config import cache_dir

logger = logging.getLogger("alist-sync.job_copy")

CopyStatusModify = Literal[
"init",
"created",
"waiting",
"getting src object",
"",
"running",
"success"
"success",
"failed",
"checked_done",
]


Expand Down Expand Up @@ -42,26 +48,61 @@ def name(self) -> str:

return f"copy [{source_provider}](/{source_path}) to [{target_provider}](/{_t})"

async def check_status(self, client: AlistClient, ):
async def create(self, client: AlistClient = None):
"""创建复制任务"""
client = client or get_alist_client()
if self.status != 'init':
raise ValueError(f"任务状态错误: {self.status}")
_res = await client.copy(
name=self.name,
source=self.copy_source,
target=self.copy_target,
)
if _res.code == 200:
self.status = 'created'

async def recheck_done(self, client: AlistClient = None) -> bool:
"""检查任务复制到文件是否已经存在与目标位置"""
client = client or get_alist_client()
_res = await client.get_item_info(str(self.copy_target.joinpath(self.copy_name)))
if _res.code == 200 and _res.data.name == self.copy_name:
self.status = 'checked_done'
return True
else:
logger.error(f"复查任务失败: [{_res.code}]{_res.message}: {self.name}")
self.status = 'failed'
return False

async def check_status(self, client: AlistClient = None):
"""异步运行类 - 检查该Task的状态
在异步循环中反复检查
1. 在alist中创建 复制任务 :: init -> created
2. 检查复制任务已经存在于 undone_list :: created -> running
3. 检查任务已经失败 (重试3次) :: running -> failed
1. 在alist中创建 复制任务 :: init -> created
2. 检查复制任务已经存在于 undone_list :: created -> running
3. 检查任务已经失败 (重试3次) :: running -> failed
4. 检查任务已经完成 :: running -> success
5. 复查任务是否已经完成 :: success -> checked_done
"""
if self.name in client.cached_copy_task_undone[-1]:
self.status = 'created'
if self.name in client.cached_copy_task_done[-1]:
self.status = 'success'
client = client or get_alist_client()

if self.status == 'init':
return self.create(client)
elif self.status == 'checked_done':
return True
elif self.status == 'success':
return await self.recheck_done(client)

_status, _p = await get_status(self.name)
self.status = _status
return False


class CopyJob(BaseModel):
"""复制工作 - 从Checker中找出需要被复制的任务并创建"""
# tasks: task_name -> task
tasks: dict[str, CopyTask]
done_tasks: dict[str, CopyTask] = {}

@classmethod
def from_json_file(cls, file: Path):
Expand Down Expand Up @@ -116,3 +157,15 @@ def _n_n(sps, tps):
self = cls(tasks=_tasks)
self.save_to_cache()
return cls(tasks=_tasks)

async def start(self, client: AlistClient = None):
"""开始复制任务"""
client = client or get_alist_client()
while self.tasks:
for t_name in [k for k in self.tasks.keys()]:
task = self.tasks[t_name]
if await task.check_status(client=client):
self.done_tasks[task.name] = task
self.tasks.pop(task.name)
continue
self.save_to_cache()
1 change: 0 additions & 1 deletion alist_sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class SyncJob(BaseModel):
"""同步任务"""
alist_info: AlistServer # Alist Info
sync_dirs: dict[str, SyncDir] = {} # 同步目录
checker: Optional['Checker'] = None


class Config(BaseModel):
Expand Down
Loading

0 comments on commit 25b1371

Please sign in to comment.