Skip to content

Commit 23aac48

Browse files
meatballhatevilstreakphilandstuff
authored
Tag worker events with prediction (#2020)
* Formatting, moving `Envelope`, and using "tag" * Include tag in Cancel event and clean up TODO comments * Add pydantic v2 support to child worker `_apredict` * Keep a map of thread IDs to tags on _ChildWorker * Use context var to pass tag instead of dict since context vars are meant to be async-safe * don't use the contextvar in sync StreamRedirector * fix StreamPredictor working with tags * fix record_event with tags We were unintentionally dropping the tag from record_event() with sync predictions. This is fixed by introducing a self._current_tag property which Does The Right Thing whether in sync or async context. As part of this, we can also go back to a single _stream_write_hook() implementation. --------- Co-authored-by: Dominic Baggott <[email protected]> Co-authored-by: Philip Potter <[email protected]>
1 parent e46c4f3 commit 23aac48

File tree

7 files changed

+282
-93
lines changed

7 files changed

+282
-93
lines changed

python/cog/server/eventtypes.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Dict, Union
1+
from typing import Any, Dict, Optional, Union
22

33
from attrs import define, field, validators
44

@@ -7,7 +7,6 @@
77
#
88
@define
99
class Cancel:
10-
# TODO: identify which prediction!
1110
pass
1211

1312

@@ -50,3 +49,23 @@ class Done:
5049
canceled: bool = False
5150
error: bool = False
5251
error_detail: str = ""
52+
53+
54+
@define
55+
class Envelope:
56+
"""
57+
Envelope contains an arbitrary event along with an optional tag used to
58+
tangle/untangle concurrent work.
59+
"""
60+
61+
event: Union[
62+
Cancel,
63+
PredictionInput,
64+
Shutdown,
65+
Log,
66+
PredictionMetric,
67+
PredictionOutput,
68+
PredictionOutputType,
69+
Done,
70+
]
71+
tag: Optional[str] = None

0 commit comments

Comments
 (0)