forked from xai-org/x-algorithm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathengine.py
More file actions
137 lines (123 loc) · 5.02 KB
/
Copy pathengine.py
File metadata and controls
137 lines (123 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import time
import asyncio
import logging
import traceback
import multiprocessing
from queue import Empty, Queue
from threading import Event
from multiprocessing import Process
from monitor.logging import Logging
from monitor.metrics import Metrics
from grox.config.config import grox_config
from grox.schedules.init import init_proc
from grox.schedules.types import TaskResult, TaskPayload
from grox.plans.plan_master import PlanMaster
from grox.schedules.context import ScheduleContext
from grox.data_loaders.media_processor import MediaProcessor
from grox.data_loaders.asr_processor import ASRProcessor
# Module-level logger, named after this module, shared by everything in this file.
logger = logging.getLogger(__name__)
class Engine:
    """Run the Grox task loop inside a dedicated child process.

    The parent process calls :meth:`start` / :meth:`stop`; the child process
    executes :meth:`run`, which polls ``task_queue`` from the schedule context,
    executes every payload through ``PlanMaster.exec`` and pushes the outcome
    onto ``resp_queue``.
    """

    def __init__(self, context: ScheduleContext):
        """Bind the engine to the queues and shutdown event held by *context*.

        Args:
            context: Mapping-like schedule context; the keys ``"task_queue"``,
                ``"resp_queue"`` and ``"shutdown_event"`` are read from it.
        """
        self.config = grox_config.engine
        self.context = context
        self._task_queue: Queue[TaskPayload] = self.context["task_queue"]
        self._resp_queue: Queue[TaskResult] = self.context["resp_queue"]
        self._shutdown_event: Event = self.context["shutdown_event"]
        # Child process handle; populated by start().
        self._process = None

    def _is_shutdown(self) -> bool:
        """Return True when shutdown was requested or the event is unreachable.

        Any failure while reading the event (including a broken pipe) is
        logged and treated as a shutdown request so the loop can terminate.
        """
        try:
            return self._shutdown_event.is_set()
        except BrokenPipeError:
            logger.error("Broken pipe error, assuming shutdown")
            return True
        except Exception:
            logger.error(
                f"Error checking shutdown event, assuming shutdown: {traceback.format_exc()}"
            )
            return True

    async def _init_run(self):
        """Initialise the engine process and start the media/ASR processors."""
        await init_proc("engine")
        MediaProcessor.start()
        ASRProcessor.start()

    async def _process_task(self, task: TaskPayload) -> TaskResult:
        """Execute *task* through ``PlanMaster`` and record how long it took.

        Returns:
            Whatever ``PlanMaster.exec`` returns for the payload.

        Raises:
            Any exception raised by ``PlanMaster.exec`` propagates unchanged.
        """
        logger.debug("engine started processing task")
        start = time.perf_counter()
        res = await PlanMaster.exec(task)
        end = time.perf_counter()
        logger.debug(f"engine finished processing task in {end - start:.2f} seconds")
        Metrics.histogram("engine.task.processing_time").record(end - start)
        return res

    async def _run_task(self, task: TaskPayload):
        """Process one payload and always publish a result to ``resp_queue``.

        On success the result of :meth:`_process_task` is enqueued. On any
        ``Exception`` a failed ``TaskResult`` carrying the error text is
        enqueued instead; the exception is not re-raised.
        """
        start = time.perf_counter()
        with Metrics.tracer("engine").start_as_current_span("task.root"):
            Logging.set_context(task=task.payload_id)
            if task.post:
                Logging.set_context(post=task.post.id)
            if task.user:
                Logging.set_context(user=task.user.id)
            if task.user_context:
                Logging.set_context(user=task.user_context.user.id)
            try:
                res = await self._process_task(task)
                self._resp_queue.put(res)
                Metrics.counter("engine.task.success.count").add(1)
            except Exception as e:
                logger.error(f"failed to process task, error: {traceback.format_exc()}")
                self._resp_queue.put(
                    TaskResult(
                        task=task,
                        success=False,
                        error=str(e),
                        # The task began at `start` and ends now; both values
                        # come from the same perf_counter clock.
                        task_started_at=start,
                        task_finished_at=time.perf_counter(),
                    )
                )
                Metrics.counter("engine.task.failed.count").add(1)

    async def _poll_task(self) -> TaskPayload | None:
        """Fetch the next payload without blocking.

        Returns:
            The next ``TaskPayload``, or ``None`` when the queue is empty or
            reading from it failed (failures are logged).
        """
        # Only ask the queue for its size when the message will be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"engine polling task, queue size: {self._task_queue.qsize()}")
        try:
            task = self._task_queue.get(block=False)
            logger.debug(f"engine received task: {task.payload_id}")
            Metrics.counter("engine.task.received.count").add(1)
            return task
        except Empty:
            logger.debug("engine polling task returned None")
            return None
        except BrokenPipeError:
            logger.error("Broken pipe error, shutting down")
            return None
        except Exception:
            logger.error(f"failed to poll task: {traceback.format_exc()}")
            return None

    async def _run(self, started_event: Event):
        """Main loop of the engine process.

        Signals *started_event* once initialisation is done, then keeps
        scheduling payloads until shutdown is requested and the task queue is
        empty. Tasks that are still running at that point are awaited so
        their results reach ``resp_queue`` before the loop returns.
        """
        await self._init_run()
        started_event.set()

        # The event loop only keeps weak references to tasks, so hold strong
        # ones here until each task finishes.
        in_flight: set[asyncio.Task] = set()

        while not self._is_shutdown() or not self._task_queue.empty():
            task = await self._poll_task()
            if task is None:
                await asyncio.sleep(0.1)
                continue
            job = asyncio.create_task(self._run_task(task))
            in_flight.add(job)
            job.add_done_callback(in_flight.discard)
            # _poll_task never suspends, so yield once to let scheduled tasks
            # make progress while the queue is still non-empty.
            await asyncio.sleep(0)

        if in_flight:
            # Let outstanding work finish instead of having asyncio.run()
            # cancel it when this coroutine returns.
            await asyncio.gather(*in_flight, return_exceptions=True)
        logger.warning("engine stopped")

    def run(self, started_event: Event):
        """Process entry point: run the loop to completion, then exit.

        ``os._exit(0)`` terminates the process immediately once the loop has
        returned, without running interpreter cleanup handlers.
        """
        asyncio.run(self._run(started_event))
        os._exit(0)

    async def start(self):
        """Spawn the engine process and wait until it reports it has started.

        Raises:
            RuntimeError: If the engine process exits before signalling that
                its initialisation finished.
        """
        logger.info("Starting Grox engine...")
        started_event = multiprocessing.Event()
        self._process = Process(
            target=self.run, args=(started_event,), name="grox-engine"
        )
        self._process.start()
        # Wait in a worker thread so the caller's event loop stays responsive,
        # and wake up periodically to notice a child that died during init.
        while not await asyncio.to_thread(started_event.wait, 0.5):
            if not self._process.is_alive() and not started_event.is_set():
                raise RuntimeError(
                    "Grox engine process exited before it finished starting"
                )
        logger.info("Grox engine started")

    async def stop(self):
        """Wait for the engine process to exit, then stop the processors.

        The join is bounded by ``config.graceful_shutdown_timeout``. This
        method does not set the shutdown event itself.
        NOTE(review): presumably the owner of the schedule context sets it
        before calling stop() - confirm.
        """
        logger.warning("Stopping Grox engine...")
        if self._process and self._process.is_alive():
            # join() blocks, so run it off the event loop thread.
            await asyncio.to_thread(
                self._process.join, self.config.graceful_shutdown_timeout
            )
            if self._process.is_alive():
                logger.warning(
                    "Engine process still alive after graceful shutdown timeout"
                )
        else:
            logger.warning("Engine process is not alive, skipping join")
        await MediaProcessor.stop()
        await ASRProcessor.stop()
        logger.warning("Engine stopped")