Skip to content

Commit 3c9e9b5

Browse files
authored
Merge pull request #187 from Maplemx/dev
v3.4.1.1
2 parents 6988ae8 + a0dd3a2 commit 3c9e9b5

File tree

13 files changed

+215
-57
lines changed

13 files changed

+215
-57
lines changed

Agently/Agent/Agent.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ def export(self):
172172
return self.register_agent_component(name, AttachedWorkflow)
173173

174174
async def start_async(self, request_type: str=None, *, return_generator:bool=False):
175+
if return_generator:
176+
return self.get_delta_generator()
175177
try:
176178
is_debug = self.settings.get_trace_back("is_debug")
177179
# Auto Save Agent runtime_ctx
@@ -265,12 +267,8 @@ async def handle_response(response):
265267

266268
await handle_response({ "event": "response:finally", "data": self.request.response_cache })
267269

268-
if return_generator:
269-
self.request_runtime_ctx.empty()
270-
return self.response_generator.start()
271-
else:
272-
self.request_runtime_ctx.empty()
273-
return self.request.response_cache["reply"]
270+
self.request_runtime_ctx.empty()
271+
return self.request.response_cache["reply"]
274272
except Exception as e:
275273
retry_time = self.settings.get_trace_back("request.retry_times")
276274
if not isinstance(retry_time, int):
@@ -288,6 +286,8 @@ async def handle_response(response):
288286
return await self.start_async(request_type, return_generator=return_generator)
289287

290288
def start(self, request_type: str=None, *, return_generator:bool=False):
289+
if return_generator:
290+
return self.get_delta_generator()
291291
reply_queue = queue.Queue()
292292
is_debug = self.settings.get_trace_back("is_debug")
293293
def start_in_theard():
@@ -307,15 +307,12 @@ def start_in_theard():
307307
loop.close()
308308
theard = threading.Thread(target=start_in_theard)
309309
theard.start()
310-
if return_generator:
311-
return self.response_generator.start()
312-
else:
313-
theard.join()
314-
try:
315-
reply = reply_queue.get_nowait()
316-
except:
317-
reply = None
318-
return reply
310+
theard.join()
311+
try:
312+
reply = reply_queue.get_nowait()
313+
except:
314+
reply = None
315+
return reply
319316

320317
def start_websocket_server(self, port:int=15365):
321318
is_debug = self.settings.get_trace_back("is_debug")

Agently/Stage/EventEmitter.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from .Stage import Stage
2+
3+
class EventEmitter:
4+
def __init__(self, private_max_workers=None, max_concurrent_tasks=None, on_error=None):
5+
self._listeners = {}
6+
self._once = {}
7+
self._stage = Stage(
8+
private_max_workers=private_max_workers,
9+
max_concurrent_tasks=max_concurrent_tasks,
10+
on_error=on_error,
11+
is_daemon=True,
12+
)
13+
14+
def add_listener(self, event, listener):
15+
if event not in self._listeners:
16+
self._listeners.update({ event: [] })
17+
if listener not in self._listeners[event]:
18+
self._listeners[event].append(listener)
19+
return listener
20+
21+
def remove_listener(self, event, listener):
22+
if event in self._listeners and listener in self._listeners[event]:
23+
self._listeners[event].remove(listener)
24+
25+
def remove_all_listeners(self, event_list):
26+
if isinstance(event_list, str):
27+
event_list = [event_list]
28+
for event in event_list:
29+
self._listeners.update({ event: [] })
30+
31+
def on(self, event, listener):
32+
return self.add_listener(event, listener)
33+
34+
def off(self, event, listener):
35+
return self.remove_listener(event, listener)
36+
37+
def once(self, event, listener):
38+
if event not in self._once:
39+
self._once.update({ event: [] })
40+
if listener not in self._listeners[event] and listener not in self._once[event]:
41+
self._once[event].append(listener)
42+
return listener
43+
44+
def listener_count(self, event):
45+
return len(self._listeners[event]) + len(self._once[event])
46+
47+
def emit(self, event, *args, **kwargs):
48+
listeners_to_execute = []
49+
if event in self._listeners:
50+
for listener in self._listeners[event]:
51+
listeners_to_execute.append((listener, args, kwargs))
52+
if event in self._once:
53+
for listener in self._once[event]:
54+
listeners_to_execute.append((listener, args, kwargs))
55+
self._once.update({ event: [] })
56+
for listener, args, kwargs in listeners_to_execute:
57+
self._stage.go(listener, *args, **kwargs)
58+
return len(listeners_to_execute)

Agently/Stage/MessageCenter.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import uuid
2+
from .Stage import Stage
3+
4+
class MessageCenter:
5+
def __init__(self, private_max_workers=None, max_concurrent_tasks=None, on_error=None):
6+
self._stage = Stage(
7+
private_max_workers=private_max_workers,
8+
max_concurrent_tasks=max_concurrent_tasks,
9+
on_error=on_error,
10+
is_daemon=True
11+
)
12+
self._consumers = {}
13+
14+
def register_consumer(self, handler):
15+
consumer_id = uuid.uuid4()
16+
self._consumers.update({ consumer_id: handler })
17+
return consumer_id
18+
19+
def remove_consumer(self, consumer_id):
20+
del self._consumers[consumer_id]
21+
22+
def put(self, data):
23+
for _, consumer_handler in self._consumers.items():
24+
self._stage.go(consumer_handler, data)
25+
26+
def close(self):
27+
self._stage.close()

Agently/utils/Stage/Stage.py renamed to Agently/Stage/Stage.py

Lines changed: 80 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import atexit
12
import inspect
23
import threading
34
import asyncio
@@ -7,42 +8,92 @@
78
from .StageFunction import StageFunctionMixin
89

910
class BaseStage:
10-
def __init__(self, max_workers=5, max_concurrent_tasks=None, on_error=None, close_when_exception=False):
11-
self._max_workers = max_workers
11+
_global_executor = None
12+
_global_max_workers = 5
13+
_executor_lock = threading.RLock()
14+
15+
@staticmethod
16+
def _get_global_executor():
17+
if BaseStage._global_executor is None:
18+
with BaseStage._executor_lock:
19+
BaseStage._global_executor = ThreadPoolExecutor(max_workers=BaseStage._global_max_workers)
20+
return BaseStage._global_executor
21+
22+
@staticmethod
23+
def set_global_max_workers(global_max_workers):
24+
BaseStage._global_max_workers = global_max_workers
25+
with BaseStage._executor_lock:
26+
if BaseStage._global_executor:
27+
BaseStage._global_executor.shutdown(wait=True)
28+
BaseStage._global_executor = ThreadPoolExecutor(max_workers=BaseStage._global_max_workers)
29+
atexit.register(BaseStage._global_executor.shutdown)
30+
31+
def __init__(self, private_max_workers=5, max_concurrent_tasks=None, on_error=None, is_daemon=False):
32+
self._private_max_workers = private_max_workers
1233
self._max_concurrent_tasks = max_concurrent_tasks
1334
self._on_error = on_error
35+
self._is_daemon = is_daemon
1436
self._semaphore = None
1537
self._loop_thread = None
1638
self._loop = None
17-
self._executor = None
18-
self._responses = []
19-
#self._initialize()
20-
21-
def _initialize(self):
39+
self._current_executor = None
2240
self._loop_ready = threading.Event()
23-
self._loop_thread = threading.Thread(target=self._start_loop)
24-
self._loop_thread.start()
25-
self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
26-
self._loop_ready.wait()
27-
del self._loop_ready
41+
self._responses = set()
42+
self._closed = False
43+
if self._is_daemon:
44+
atexit.register(self.close)
45+
self._initialize()
46+
47+
def __enter__(self):
48+
self._initialize()
49+
return self
50+
51+
def __exit__(self, type, value, traceback):
52+
self.close()
53+
if type is not None and self._on_error is not None:
54+
self._on_error(value)
55+
return False
2856

29-
def _loop_exception_handler(self, loop, context):
30-
if self._on_error is not None:
31-
loop.call_soon_threadsafe(self._on_error, context["exception"])
57+
@property
58+
def _executor(self):
59+
if self._current_executor is not None:
60+
return self._current_executor
61+
if self._private_max_workers:
62+
self._current_executor = ThreadPoolExecutor(max_workers=self._private_max_workers)
63+
return self._current_executor
3264
else:
33-
raise context["exception"]
65+
self._current_executor = BaseStage._get_global_executor()
66+
return self._current_executor
3467

68+
def _initialize(self):
69+
self._closed = False
70+
if (
71+
not self._loop_thread
72+
or not self._loop_thread.is_alive()
73+
or not self._loop
74+
or not self._loop.is_running()
75+
):
76+
self._loop_thread = threading.Thread(target=self._start_loop, daemon=self._is_daemon)
77+
self._loop_thread.start()
78+
self._loop_ready.wait()
79+
3580
def _start_loop(self):
3681
self._loop = asyncio.new_event_loop()
3782
self._loop.set_exception_handler(self._loop_exception_handler)
38-
asyncio.set_event_loop(self._loop)
3983
if self._max_concurrent_tasks:
4084
self._semaphore = asyncio.Semaphore(self._max_concurrent_tasks)
41-
self._loop_ready.set()
85+
asyncio.set_event_loop(self._loop)
86+
self._loop.call_soon(lambda: self._loop_ready.set())
4287
self._loop.run_forever()
4388

89+
def _loop_exception_handler(self, loop, context):
90+
if self._on_error is not None:
91+
loop.call_soon_threadsafe(self._on_error, context["exception"])
92+
else:
93+
raise context["exception"]
94+
4495
def go(self, task, *args, on_success=None, on_error=None, lazy=False, async_gen_interval=0.1, **kwargs):
45-
if not self._executor or not self._loop or not self._loop.is_running():
96+
if not self._loop or self._loop.is_running():
4697
self._initialize()
4798
response_kwargs = {
4899
"on_success": on_success,
@@ -93,7 +144,7 @@ async def async_generator():
93144
elif inspect.isfunction(task) or inspect.ismethod(task):
94145
return StageResponse(self, self._loop.run_in_executor(self._executor, lambda: task(*args, **kwargs)), **response_kwargs)
95146
else:
96-
return task
147+
raise TypeError(f"Task seems like a value or an executed function not an executable task: { task }")
97148

98149
def go_all(self, *task_list):
99150
response_list = []
@@ -134,7 +185,11 @@ def on_error(self, handler):
134185
self._on_error = handler
135186

136187
def close(self):
137-
for response in self._responses:
188+
if self._closed:
189+
return
190+
self._closed = True
191+
192+
for response in self._responses.copy():
138193
response._result_ready.wait()
139194

140195
if self._loop and self._loop.is_running():
@@ -143,15 +198,15 @@ def close(self):
143198
pending = asyncio.all_tasks(self._loop)
144199
if pending:
145200
self._loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
201+
if self._private_max_workers and self._current_executor is not None:
202+
self._current_executor.shutdown(wait=True)
203+
self._current_executor = None
146204
if self._loop_thread and self._loop_thread.is_alive():
147205
self._loop_thread.join()
148206
self._loop_thread = None
149-
if self._loop and not self._loop.is_closed:
207+
if self._loop and not self._loop.is_closed():
150208
self._loop.close()
151209
self._loop = None
152-
if self._executor:
153-
self._executor.shutdown(wait=True)
154-
self._executor = None
155210

156211
class Stage(BaseStage, StageFunctionMixin):
157212
pass
File renamed without changes.

Agently/utils/Stage/StageHybridGenerator.py renamed to Agently/Stage/StageHybridGenerator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
class StageHybridGenerator:
66
def __init__(self, stage, task, on_success=None, on_error=None, lazy=None, async_gen_interval=0.1):
77
self._stage = stage
8-
self._stage._responses.append(self)
8+
self._stage._responses.add(self)
99
self._loop = stage._loop
1010
self._on_success = on_success
1111
self._on_error = on_error
@@ -29,11 +29,12 @@ def _on_consume_async_gen_done(self, future):
2929
future.result()
3030
if self._error is not None:
3131
def raise_error():
32-
raise Exception(self._error)
32+
raise self._error
3333
self._loop.call_soon_threadsafe(raise_error)
3434
if self._on_success:
3535
self._final_result = self._on_success(self._result)
3636
self._result_ready.set()
37+
self._stage._responses.discard(self)
3738

3839
async def _consume_async_gen(self, task):
3940
try:

Agently/utils/Stage/StageResponse.py renamed to Agently/Stage/StageResponse.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
class StageResponse:
55
def __init__(self, stage, task, on_success=None, on_error=None):
66
self._stage = stage
7-
self._stage._responses.append(self)
7+
self._stage._responses.add(self)
88
self._loop = self._stage._loop
99
self._on_success = on_success
1010
self._on_error = on_error
@@ -26,14 +26,17 @@ def _on_task_done(self, future):
2626
if self._on_success:
2727
self._final_result = self._on_success(self._result)
2828
self._result_ready.set()
29+
self._stage._responses.discard(self)
2930
except Exception as e:
3031
self._status = False
3132
self._error = e
3233
if self._on_error:
3334
self._final_result = self._on_error(self._error)
3435
self._result_ready.set()
36+
self._stage._responses.discard(self)
3537
else:
3638
self._result_ready.set()
39+
self._stage._responses.discard(self)
3740
raise self._error
3841

3942
def get(self):

0 commit comments

Comments
 (0)