Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: split and run grpc_server, ota_core and otaclient main as separated processes #431

Merged
merged 121 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
121 commits
Select commit Hold shift + click to select a range
7c5e02d
otaclient._utils
Bodong-Yang Nov 22, 2024
141dba9
add otaclient_common.shm_status module
Bodong-Yang Nov 22, 2024
8617f5f
utils: add SharedOTAClientStatusWriter/Reader types
Bodong-Yang Nov 22, 2024
9564b52
ecu_tracker: use new shm_status
Bodong-Yang Nov 22, 2024
1ffbf7d
_status_monitor: integrate shm
Bodong-Yang Nov 22, 2024
dfb231f
ecu_tracker: cleanup at exit
Bodong-Yang Nov 22, 2024
82f945c
ota_core: control_flag now becomes mp_sync.Event, cleanup unused code
Bodong-Yang Nov 22, 2024
3a518b2
ota_core: use session_id from request
Bodong-Yang Nov 22, 2024
c83d17c
move gen_session_id to otaclient._utils
Bodong-Yang Nov 22, 2024
19e4e34
_types: add types for IPC
Bodong-Yang Nov 22, 2024
4d9b43d
api_v2.servicer: integrate IPC
Bodong-Yang Nov 22, 2024
747d247
minor fix
Bodong-Yang Nov 22, 2024
89e3ca0
types: cleanup unused
Bodong-Yang Nov 22, 2024
3ae0f4b
ota_core: implement IPC interface
Bodong-Yang Nov 22, 2024
8448628
minor update
Bodong-Yang Nov 22, 2024
e473dcc
api_v2.servicer: nolonger in charge of launching/shutting down otaproxy
Bodong-Yang Nov 22, 2024
dec832a
api_v2.ecu_status: expose any_requires_network and all_ecus_succeeded…
Bodong-Yang Nov 22, 2024
72cd46c
WIP: main
Bodong-Yang Nov 22, 2024
2009c09
re-implement _otaproxy_ctx, move it to otaclient package
Bodong-Yang Nov 22, 2024
544e356
_otaproxy_ctx: minor cleanup
Bodong-Yang Nov 22, 2024
a26a4e8
WIP: main
Bodong-Yang Nov 22, 2024
467de3f
implement grpc.api_v2.main
Bodong-Yang Nov 24, 2024
d6df546
move otaproxy control thread logic into otaproxy_ctx
Bodong-Yang Nov 24, 2024
256d822
move ota_core_process into ota_core module
Bodong-Yang Nov 24, 2024
bd7a183
otaproxy_ctx: simplify the implementation of otaproxy control
Bodong-Yang Nov 24, 2024
4149c39
finish up main
Bodong-Yang Nov 24, 2024
772c5ba
fix main
Bodong-Yang Nov 24, 2024
57d5947
status_monitor: not unlink the shm
Bodong-Yang Nov 24, 2024
3575406
api_v2.ecu_tracker: actively polling until we get the first valid res…
Bodong-Yang Nov 24, 2024
712b441
ecu_tracker: minor cleanup
Bodong-Yang Nov 24, 2024
83c4e49
status_monitor: minor cleanup
Bodong-Yang Nov 24, 2024
afc8ef5
status_monitor: minor cleanup
Bodong-Yang Nov 24, 2024
5ed8128
main: refine
Bodong-Yang Nov 24, 2024
a9e1770
add logging for grpc server startup
Bodong-Yang Nov 24, 2024
9b50fa9
do not use logger in signal handler
Bodong-Yang Nov 24, 2024
cfee25c
ota_core: use two channels for req and resp
Bodong-Yang Nov 24, 2024
445d38e
servicer: use two channels for req and resp
Bodong-Yang Nov 24, 2024
42a7cb2
finish up channel split
Bodong-Yang Nov 24, 2024
194c6ca
do not wait otaproxy at OTAUpdater.__init__ method
Bodong-Yang Nov 24, 2024
968afb6
ota_core: use thread for OTA operation executing
Bodong-Yang Nov 24, 2024
fef3a9c
minor fix
Bodong-Yang Nov 24, 2024
d265184
fix status_monitor
Bodong-Yang Nov 24, 2024
809311a
status_monitor: increase minimum shm write interval
Bodong-Yang Nov 24, 2024
b983b91
Squashed commit of the following:
Bodong-Yang Nov 24, 2024
03e5c93
main._on_shutdown: add sys_exit arg, if this func is called by atexit…
Bodong-Yang Nov 24, 2024
67462c2
split ensure_* series helpers into otaclient_common.cmdhelper module
Bodong-Yang Nov 25, 2024
21f47cf
remove unused test_subprocess_launch_otaproxy
Bodong-Yang Nov 25, 2024
667931f
move some otaproxy related settings to otaproxy.config
Bodong-Yang Nov 25, 2024
0e94cdc
minor fix to cmdhelper.ensure_mointpoint
Bodong-Yang Nov 25, 2024
8da3a33
otaproxy: implement external cache helper module
Bodong-Yang Nov 25, 2024
02a105c
OTACache now takes external_cache_mnt_point instead
Bodong-Yang Nov 25, 2024
681e02f
otaproxy.__main__: now otaproxy CLI will try to mount external cache …
Bodong-Yang Nov 25, 2024
5cbc91e
otaproxy: now run_otaproxy will mount/umount external cache dev
Bodong-Yang Nov 25, 2024
f602e25
_otaproxy_ctx: integrate
Bodong-Yang Nov 25, 2024
0ee2621
minor fix
Bodong-Yang Nov 25, 2024
a18db9e
minor fix
Bodong-Yang Nov 25, 2024
52cb243
minor fix
Bodong-Yang Nov 25, 2024
31b3169
slot_mnt_helper: register atexit hooks for umounting mounted devs
Bodong-Yang Nov 25, 2024
76b7ee2
slot_mnt_helper: keep the atexit hooks all the time
Bodong-Yang Nov 25, 2024
e31859e
revert src/ota_proxy to main branch's status for merge
Bodong-Yang Nov 27, 2024
a9bf9e5
revert src/otaclient_common/cmdhelper.py to main branch's status for …
Bodong-Yang Nov 27, 2024
95f43f9
Merge remote-tracking branch 'origin/main' into feat/otaclient_split_…
Bodong-Yang Nov 27, 2024
4648dc7
do not install SIGINT handler; SIGNTERM handler now only raises a Sys…
Bodong-Yang Nov 27, 2024
8275cf9
ota_core: do not replace the original SIGINT handler
Bodong-Yang Nov 27, 2024
fbb1272
signame -> signal_value
Bodong-Yang Nov 27, 2024
6934021
ota_core: not install SIGTERM handler for now
Bodong-Yang Nov 27, 2024
66ae925
main: call on_shutdown on sigint and sigterm
Bodong-Yang Nov 27, 2024
732b5e1
main: still use sys.exit
Bodong-Yang Nov 27, 2024
7f1c7e5
Revert "ota_core: not install SIGTERM handler for now"
Bodong-Yang Nov 27, 2024
187d8dc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 27, 2024
f1c187a
ready for merging main branch's shm_status
Bodong-Yang Nov 28, 2024
108d6e1
Merge remote-tracking branch 'origin/main' into feat/otaclient_split_…
Bodong-Yang Nov 28, 2024
5645585
_types: add MultipleECUStatusFlags
Bodong-Yang Nov 28, 2024
17a58c8
ecu_status: use MultipleECUStatusFlags
Bodong-Yang Nov 28, 2024
95e300b
otaproxy_ctx: use MultipleECUStatusFlags
Bodong-Yang Nov 28, 2024
d9d791e
api_v2.servicer: not manage otaclient control flags here
Bodong-Yang Nov 28, 2024
a0cee32
api_v2.main: cleanup accordingly
Bodong-Yang Nov 28, 2024
d45dd27
main: finish up integration
Bodong-Yang Nov 28, 2024
674ea3e
utils.wait_and_log: take a func that returns bool
Bodong-Yang Nov 28, 2024
f1be14b
ota_core: use MultipleECUStatusFlags instead
Bodong-Yang Nov 28, 2024
aaa1984
fix up test_utils
Bodong-Yang Nov 28, 2024
c2d8f5e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 28, 2024
6339bd3
Merge branch 'main' into feat/otaclient_split_processes
Bodong-Yang Nov 29, 2024
c075d90
status_monitor: reduce min collect interval to 0.5
Bodong-Yang Nov 29, 2024
77c4e94
grpc.api_v2.ECUTracker: reduce local active poll interval on startup …
Bodong-Yang Nov 29, 2024
1bd6c4d
Merge branch 'main' into feat/otaclient_split_processes
Bodong-Yang Nov 29, 2024
2b72d57
squashed merge from fix/ota_image_invalid_handling
Bodong-Yang Nov 30, 2024
ec8bd67
Merge remote-tracking branch 'origin/main' into feat/otaclient_split_…
Bodong-Yang Dec 2, 2024
e3be330
Merge remote-tracking branch 'origin/main' into feat/otaclient_split_…
Bodong-Yang Dec 5, 2024
b771c3f
ecu_tracker: log errors from failed local ecu status query
Bodong-Yang Dec 5, 2024
85de82e
_status_monitor: still record the exception during shm write with bur…
Bodong-Yang Dec 5, 2024
6e30cbb
minor update
Bodong-Yang Dec 5, 2024
d7cf32e
api_v2.servicer: use threadpool to execute local update/rollback
Bodong-Yang Dec 5, 2024
c2322a0
api_v2: use thread pool for blocking operations
Bodong-Yang Dec 5, 2024
a9e1293
minor update
Bodong-Yang Dec 5, 2024
9dcf67a
minor update
Bodong-Yang Dec 5, 2024
d85823f
minor update
Bodong-Yang Dec 5, 2024
9a61c79
limit the failure_traceback field's length
Bodong-Yang Dec 5, 2024
4de3d8e
conftest: fix up ota_status_collector
Bodong-Yang Dec 5, 2024
dd51d91
fix up test_status_monitor
Bodong-Yang Dec 5, 2024
36170db
fix up test_create_standby
Bodong-Yang Dec 5, 2024
d9d5fb2
remove test_main for now
Bodong-Yang Dec 5, 2024
137f952
fix up test_ota_core
Bodong-Yang Dec 5, 2024
422ec33
fix up test_ecu_status
Bodong-Yang Dec 5, 2024
0f330d7
fix up test_ecu_status
Bodong-Yang Dec 5, 2024
2d44b6c
ota_core: minor fix, now handler do the live_ota_status change, inste…
Bodong-Yang Dec 5, 2024
04f497a
ota_core: increase the minimum request interval to 16 seconds
Bodong-Yang Dec 5, 2024
17e7109
fix up test_ota_core.py
Bodong-Yang Dec 5, 2024
420557f
minor update
Bodong-Yang Dec 5, 2024
8b30cd7
minor fix
Bodong-Yang Dec 5, 2024
1488414
minor fix
Bodong-Yang Dec 5, 2024
25688e1
temporary remove test_servicer as api_v2.servicer module changes a lo…
Bodong-Yang Dec 5, 2024
0ebd9c6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 5, 2024
e46c966
Merge branch 'main' into feat/otaclient_split_processes
Bodong-Yang Dec 9, 2024
6868bd4
otaclient._utils: minor update to check_other_otaclient
Bodong-Yang Dec 9, 2024
d094d9f
minor update to _utils
Bodong-Yang Dec 9, 2024
96353cd
no need to write test code for main module
Bodong-Yang Dec 9, 2024
209a302
add some comments
Bodong-Yang Dec 9, 2024
e3b21f6
minor update
Bodong-Yang Dec 9, 2024
22c5d3e
_otaproxy_ctx: add a _global_shutdown flag
Bodong-Yang Dec 9, 2024
7ce694e
add some comments in the code
Bodong-Yang Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions src/otaclient/_otaproxy_ctx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Control of the otaproxy server startup/shutdown.

The API exposed by this module is meant to be controlled by otaproxy managing thread only.
"""


from __future__ import annotations

import asyncio
import atexit
import logging
import multiprocessing as mp
import multiprocessing.context as mp_ctx
import shutil
import time
from functools import partial
from pathlib import Path

from ota_proxy import config as local_otaproxy_cfg
from ota_proxy import run_otaproxy
from ota_proxy.config import config as otaproxy_cfg
from otaclient._types import MultipleECUStatusFlags
from otaclient.configs.cfg import cfg, proxy_info
from otaclient_common.common import ensure_otaproxy_start

logger = logging.getLogger(__name__)

_otaproxy_p: mp_ctx.SpawnProcess | None = None
_global_shutdown: bool = False


def shutdown_otaproxy_server() -> None:
global _otaproxy_p, _global_shutdown
_global_shutdown = True
if _otaproxy_p:
_otaproxy_p.terminate()
_otaproxy_p.join()
_otaproxy_p = None


OTAPROXY_CHECK_INTERVAL = 3
OTAPROXY_MIN_STARTUP_TIME = 120
"""Keep otaproxy running at least 60 seconds after startup."""
OTA_CACHE_DIR_CHECK_INTERVAL = 60


def otaproxy_process(*, init_cache: bool) -> None:
from otaclient._logging import configure_logging

configure_logging()
logger.info("otaproxy process started")

external_cache_mnt_point = None
if cfg.OTAPROXY_ENABLE_EXTERNAL_CACHE:
external_cache_mnt_point = cfg.EXTERNAL_CACHE_DEV_MOUNTPOINT

host, port = (
str(proxy_info.local_ota_proxy_listen_addr),
proxy_info.local_ota_proxy_listen_port,
)

upper_proxy = str(proxy_info.upper_ota_proxy or "")
logger.info(f"will launch otaproxy at http://{host}:{port}, with {upper_proxy=}")
if upper_proxy:
logger.info(f"wait for {upper_proxy=} online...")
ensure_otaproxy_start(str(upper_proxy))

asyncio.run(
run_otaproxy(
host=host,
port=port,
init_cache=init_cache,
cache_dir=local_otaproxy_cfg.BASE_DIR,
cache_db_f=local_otaproxy_cfg.DB_FILE,
upper_proxy=upper_proxy,
enable_cache=proxy_info.enable_local_ota_proxy_cache,
enable_https=proxy_info.gateway_otaproxy,
external_cache_mnt_point=external_cache_mnt_point,
)
)


def otaproxy_control_thread(
ecu_status_flags: MultipleECUStatusFlags,
) -> None: # pragma: no cover
atexit.register(shutdown_otaproxy_server)

_mp_ctx = mp.get_context("spawn")

ota_cache_dir = Path(otaproxy_cfg.BASE_DIR)
next_ota_cache_dir_checkpoint = 0

global _otaproxy_p
while not _global_shutdown:
time.sleep(OTAPROXY_CHECK_INTERVAL)
_now = time.time()

_otaproxy_running = _otaproxy_p and _otaproxy_p.is_alive()
_otaproxy_should_run = ecu_status_flags.any_requires_network.is_set()
_all_success = ecu_status_flags.all_success.is_set()

if not _otaproxy_should_run and not _otaproxy_running:
if (
_now > next_ota_cache_dir_checkpoint
and _all_success
and ota_cache_dir.is_dir()
):
logger.info(
"all tracked ECUs are in SUCCESS OTA status, cleanup ota cache dir ..."
)
next_ota_cache_dir_checkpoint = _now + OTA_CACHE_DIR_CHECK_INTERVAL
shutil.rmtree(ota_cache_dir, ignore_errors=True)

elif _otaproxy_should_run and not _otaproxy_running:
# NOTE: always try to re-use cache. If the cache dir is empty, otaproxy
# will still init the cache even init_cache is False.
_otaproxy_p = _mp_ctx.Process(
target=partial(otaproxy_process, init_cache=False),
name="otaproxy",
)
_otaproxy_p.start()
next_ota_cache_dir_checkpoint = _now + OTAPROXY_MIN_STARTUP_TIME
time.sleep(OTAPROXY_MIN_STARTUP_TIME) # prevent pre-mature shutdown

elif _otaproxy_p and _otaproxy_running and not _otaproxy_should_run:
logger.info("shutting down otaproxy as not needed now ...")
shutdown_otaproxy_server()
89 changes: 69 additions & 20 deletions src/otaclient/_status_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from dataclasses import asdict, dataclass
from enum import Enum, auto
from threading import Thread
from typing import Union, cast
from typing import Literal, Union, cast

from otaclient._types import (
FailureType,
Expand All @@ -34,17 +34,25 @@
UpdateProgress,
UpdateTiming,
)
from otaclient._utils import SharedOTAClientStatusWriter
from otaclient_common.logging import BurstSuppressFilter

logger = logging.getLogger(__name__)
burst_suppressed_logger = logging.getLogger(f"{__name__}.shm_push")
# NOTE: for request_error, only allow max 6 lines of logging per 30 seconds
burst_suppressed_logger.addFilter(
BurstSuppressFilter(
f"{__name__}.shm_push",
upper_logger_name=__name__,
burst_round_length=30,
burst_max=6,
)
)

_otaclient_shutdown = False
_status_report_queue: queue.Queue | None = None


def _global_shutdown():
global _otaclient_shutdown
_otaclient_shutdown = True

if _status_report_queue:
_status_report_queue.put_nowait(TERMINATE_SENTINEL)

Expand Down Expand Up @@ -120,7 +128,7 @@ class StatusReport:
#
def _on_session_finished(
status_storage: OTAClientStatus, payload: OTAStatusChangeReport
):
) -> Literal[True]:
status_storage.session_id = ""
status_storage.update_phase = UpdatePhase.INITIALIZING
status_storage.update_meta = UpdateMeta()
Expand All @@ -137,10 +145,12 @@ def _on_session_finished(
status_storage.failure_reason = ""
status_storage.failure_traceback = ""

return True


def _on_new_ota_session(
status_storage: OTAClientStatus, payload: OTAStatusChangeReport
):
) -> Literal[True]:
status_storage.ota_status = payload.new_ota_status
status_storage.update_phase = UpdatePhase.INITIALIZING
status_storage.update_meta = UpdateMeta()
Expand All @@ -149,6 +159,8 @@ def _on_new_ota_session(
status_storage.failure_type = FailureType.NO_FAILURE
status_storage.failure_reason = ""

return True


def _on_update_phase_changed(
status_storage: OTAClientStatus, payload: OTAUpdatePhaseChangeReport
Expand All @@ -157,7 +169,7 @@ def _on_update_phase_changed(
logger.warning(
"attempt to update update_timing when no OTA update session on-going"
)
return
return False

phase, trigger_timestamp = payload.new_update_phase, payload.trigger_timestamp
if phase == UpdatePhase.PROCESSING_POSTUPDATE:
Expand All @@ -170,14 +182,17 @@ def _on_update_phase_changed(
update_timing.update_apply_start_timestamp = trigger_timestamp

status_storage.update_phase = phase
return True


def _on_update_progress(status_storage: OTAClientStatus, payload: UpdateProgressReport):
def _on_update_progress(
status_storage: OTAClientStatus, payload: UpdateProgressReport
) -> bool:
if (update_progress := status_storage.update_progress) is None:
logger.warning(
"attempt to update update_progress when no OTA update session on-going"
)
return
return False

op = payload.operation
if (
Expand All @@ -195,6 +210,7 @@ def _on_update_progress(status_storage: OTAClientStatus, payload: UpdateProgress
update_progress.downloading_errors += payload.errors
elif op == UpdateProgressReport.Type.APPLY_REMOVE_DELTA:
update_progress.removed_files_num += payload.processed_file_num
return True


def _on_update_meta(status_storage: OTAClientStatus, payload: SetUpdateMetaReport):
Expand All @@ -204,7 +220,7 @@ def _on_update_meta(status_storage: OTAClientStatus, payload: SetUpdateMetaRepor
logger.warning(
"attempt to update update_meta when no OTA update session on-going"
)
return
return False

_input = asdict(payload)
for k, v in _input.items():
Expand All @@ -213,31 +229,45 @@ def _on_update_meta(status_storage: OTAClientStatus, payload: SetUpdateMetaRepor
continue
if v:
setattr(update_meta, k, v)
return True


#
# ------ status monitor implementation ------ #
#

# A sentinel object to tell the thread stop
TERMINATE_SENTINEL = cast(StatusReport, object())
MIN_COLLECT_INTERVAL = 0.5 # seconds
SHM_PUSH_INTERVAL = 0.5 # seconds


class OTAClientStatusCollector:
"""NOTE: status_monitor will only be started once during whole otaclient lifecycle!"""

def __init__(
self,
msg_queue: queue.Queue[StatusReport],
shm_status: SharedOTAClientStatusWriter,
*,
min_collect_interval: int = 1,
min_push_interval: int = 1,
min_collect_interval: float = MIN_COLLECT_INTERVAL,
shm_push_interval: float = SHM_PUSH_INTERVAL,
max_traceback_size: int,
) -> None:
self.max_traceback_size = max_traceback_size
self.min_collect_interval = min_collect_interval
self.min_push_interval = min_push_interval
self.shm_push_interval = shm_push_interval

self._input_queue = msg_queue
global _status_report_queue
_status_report_queue = msg_queue

self._status = None
self._shm_status = shm_status

atexit.register(shm_status.atexit)

def load_report(self, report: StatusReport):
def load_report(self, report: StatusReport) -> bool:
if self._status is None:
self._status = OTAClientStatus()
status_storage = self._status
Expand All @@ -246,37 +276,56 @@ def load_report(self, report: StatusReport):
# ------ update otaclient meta ------ #
if isinstance(payload, SetOTAClientMetaReport):
status_storage.firmware_version = payload.firmware_version
return True

# ------ on session start/end ------ #
if isinstance(payload, OTAStatusChangeReport):
if (_traceback := payload.failure_traceback) and len(
_traceback
) > self.max_traceback_size:
payload.failure_traceback = _traceback[-self.max_traceback_size :]

new_ota_status = payload.new_ota_status
if new_ota_status in [OTAStatus.UPDATING, OTAStatus.ROLLBACKING]:
status_storage.session_id = report.session_id
return _on_new_ota_session(status_storage, payload)

status_storage.session_id = "" # clear session if we are not in an OTA
return _on_session_finished(status_storage, payload)

# ------ during OTA session ------ #
report_session_id = report.session_id
if report_session_id != status_storage.session_id:
logger.warning(f"drop reports from mismatched session: {report}")
return # drop invalid report
logger.warning(
f"drop reports from mismatched session (expect {status_storage.session_id=}): {report}"
)
return False
if isinstance(payload, OTAUpdatePhaseChangeReport):
return _on_update_phase_changed(status_storage, payload)
if isinstance(payload, UpdateProgressReport):
return _on_update_progress(status_storage, payload)
if isinstance(payload, SetUpdateMetaReport):
return _on_update_meta(status_storage, payload)
return False

def _status_collector_thread(self) -> None:
"""Main entry of status monitor working thread."""
while not _otaclient_shutdown:
_next_shm_push = 0
while True:
_now = time.time()
try:
report = self._input_queue.get_nowait()
if report is TERMINATE_SENTINEL:
break
self.load_report(report)

# ------ push status on load_report ------ #
if self.load_report(report) and self._status and _now > _next_shm_push:
try:
self._shm_status.write_msg(self._status)
_next_shm_push = _now + self.shm_push_interval
except Exception as e:
burst_suppressed_logger.debug(
f"failed to push status to shm: {e!r}"
)
except queue.Empty:
time.sleep(self.min_collect_interval)

Expand Down
Loading
Loading