Skip to content

Commit 9a2f9e4

Browse files
committed
Implement start_activity
1 parent ee613b5 commit 9a2f9e4

File tree

7 files changed

+314
-23
lines changed

7 files changed

+314
-23
lines changed

temporalio/activity.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,20 @@ def must_from_callable(fn: Callable) -> _Definition:
588588
f"Activity {fn_name} missing attributes, was it decorated with @activity.defn?"
589589
)
590590

591+
@classmethod
592+
def get_name_and_result_type(
593+
cls, name_or_run_fn: Union[str, Callable[..., Any]]
594+
) -> Tuple[str, Optional[Type]]:
595+
if isinstance(name_or_run_fn, str):
596+
return name_or_run_fn, None
597+
elif callable(name_or_run_fn):
598+
defn = cls.must_from_callable(name_or_run_fn)
599+
if not defn.name:
600+
raise ValueError(f"Activity {name_or_run_fn} definition has no name")
601+
return defn.name, defn.ret_type
602+
else:
603+
raise TypeError("Activity must be a string or callable")
604+
591605
@staticmethod
592606
def _apply_to_callable(
593607
fn: Callable,

temporalio/client.py

Lines changed: 238 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from google.protobuf.internal.containers import MessageMap
4444
from typing_extensions import Concatenate, Required, Self, TypedDict
4545

46+
import temporalio.activity
4647
import temporalio.api.activity.v1
4748
import temporalio.api.common.v1
4849
import temporalio.api.enums.v1
@@ -65,6 +66,7 @@
6566
import temporalio.workflow
6667
from temporalio.activity import ActivityCancellationDetails
6768
from temporalio.converter import (
69+
ActivitySerializationContext,
6870
DataConverter,
6971
SerializationContext,
7072
WithSerializationContext,
@@ -1287,18 +1289,19 @@ async def count_workflows(
12871289
# - TODO: Support sync and async activity functions
12881290
async def start_activity(
12891291
self,
1290-
activity: Callable[..., ReturnType],
1292+
activity: Union[str, Callable[..., ReturnType]],
12911293
*,
1292-
args: Sequence[Any],
1294+
args: Sequence[Any] = [],
12931295
id: str,
12941296
task_queue: str,
1297+
result_type: Optional[Type] = None,
12951298
# Either schedule_to_close_timeout or start_to_close_timeout must be present
12961299
schedule_to_close_timeout: Optional[timedelta] = None,
12971300
start_to_close_timeout: Optional[timedelta] = None,
12981301
schedule_to_start_timeout: Optional[timedelta] = None,
12991302
heartbeat_timeout: Optional[timedelta] = None,
1300-
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
1301-
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.FAIL,
1303+
id_reuse_policy: temporalio.common.IdReusePolicy = temporalio.common.IdReusePolicy.ALLOW_DUPLICATE,
1304+
id_conflict_policy: temporalio.common.IdConflictPolicy = temporalio.common.IdConflictPolicy.FAIL,
13021305
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
13031306
search_attributes: Optional[
13041307
Union[
@@ -1315,41 +1318,118 @@ async def start_activity(
13151318
"""Start an activity and return its handle.
13161319
13171320
Args:
1318-
activity: The activity function to execute.
1321+
activity: String name or callable activity function to execute.
13191322
args: Arguments to pass to the activity.
13201323
id: Unique identifier for the activity. Required.
13211324
task_queue: Task queue to send the activity to.
1325+
result_type: For string name activities, optional type to deserialize result into.
13221326
schedule_to_close_timeout: Total time allowed for the activity from schedule to completion.
13231327
start_to_close_timeout: Time allowed for a single execution attempt.
13241328
schedule_to_start_timeout: Time allowed for the activity to sit in the task queue.
13251329
heartbeat_timeout: Time between heartbeats before the activity is considered failed.
13261330
id_reuse_policy: How to handle reusing activity IDs from closed activities.
1331+
Default is ALLOW_DUPLICATE.
13271332
id_conflict_policy: How to handle activity ID conflicts with running activities.
1333+
Default is FAIL.
13281334
retry_policy: Retry policy for the activity.
1329-
search_attributes: Search attributes to attach to the activity.
1330-
static_summary: A single-line fixed summary for this workflow execution that may appear
1335+
search_attributes: Search attributes for the activity.
1336+
static_summary: A single-line fixed summary for this activity that may appear
13311337
in the UI/CLI. This can be in single-line Temporal markdown format.
1332-
static_details: General fixed details for this workflow execution that may appear in
1333-
UI/CLI. This can be in Temporal markdown format and can span multiple lines. This is
1334-
a fixed value on the workflow that cannot be updated. For details that can be
1335-
updated, use :py:meth:`temporalio.workflow.get_current_details` within the workflow.
1336-
priority: Priority metadata.
1338+
static_details: General fixed details for this activity that may appear in
1339+
UI/CLI. This can be in Temporal markdown format and can span multiple lines.
1340+
priority: Priority of the activity execution.
13371341
rpc_metadata: Headers used on the RPC call.
13381342
rpc_timeout: Optional RPC deadline to set for the RPC call.
13391343
13401344
Returns:
13411345
A handle to the started activity.
13421346
"""
1343-
# Issues workflowservice StartActivityExecution
1344-
raise NotImplementedError
1347+
name, result_type_from_type_annotation = (
1348+
temporalio.activity._Definition.get_name_and_result_type(activity)
1349+
)
1350+
return await self._impl.start_activity(
1351+
StartActivityInput(
1352+
activity_type=name,
1353+
args=args,
1354+
id=id,
1355+
task_queue=task_queue,
1356+
ret_type=result_type or result_type_from_type_annotation,
1357+
schedule_to_close_timeout=schedule_to_close_timeout,
1358+
start_to_close_timeout=start_to_close_timeout,
1359+
schedule_to_start_timeout=schedule_to_start_timeout,
1360+
heartbeat_timeout=heartbeat_timeout,
1361+
id_reuse_policy=id_reuse_policy,
1362+
id_conflict_policy=id_conflict_policy,
1363+
retry_policy=retry_policy,
1364+
search_attributes=search_attributes,
1365+
static_summary=static_summary,
1366+
static_details=static_details,
1367+
headers={},
1368+
rpc_metadata=rpc_metadata,
1369+
rpc_timeout=rpc_timeout,
1370+
priority=priority,
1371+
)
1372+
)
13451373

1346-
# Same parameters as start_activity
1347-
# (*args **kwargs is just temporary to avoid duplicating parameter lists while they're being designed)
1348-
async def execute_activity(self, *args, **kwargs) -> ReturnType:
1349-
"""
1350-
Start an activity, wait for it to complete, and return its result.
1374+
async def execute_activity(
1375+
self,
1376+
activity: Union[str, Callable[..., ReturnType]],
1377+
*,
1378+
args: Sequence[Any] = [],
1379+
id: str,
1380+
task_queue: str,
1381+
result_type: Optional[Type] = None,
1382+
# Either schedule_to_close_timeout or start_to_close_timeout must be present
1383+
schedule_to_close_timeout: Optional[timedelta] = None,
1384+
start_to_close_timeout: Optional[timedelta] = None,
1385+
schedule_to_start_timeout: Optional[timedelta] = None,
1386+
heartbeat_timeout: Optional[timedelta] = None,
1387+
id_reuse_policy: temporalio.common.IdReusePolicy = temporalio.common.IdReusePolicy.ALLOW_DUPLICATE,
1388+
id_conflict_policy: temporalio.common.IdConflictPolicy = temporalio.common.IdConflictPolicy.FAIL,
1389+
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
1390+
search_attributes: Optional[
1391+
Union[
1392+
temporalio.common.SearchAttributes,
1393+
temporalio.common.TypedSearchAttributes,
1394+
]
1395+
] = None,
1396+
static_summary: Optional[str] = None,
1397+
static_details: Optional[str] = None,
1398+
priority: temporalio.common.Priority = temporalio.common.Priority.default,
1399+
rpc_metadata: Mapping[str, Union[str, bytes]] = {},
1400+
rpc_timeout: Optional[timedelta] = None,
1401+
) -> ReturnType:
1402+
"""Start an activity, wait for it to complete, and return its result.
1403+
1404+
This is a convenience method that combines :py:meth:`start_activity` and
1405+
:py:meth:`ActivityHandle.result`.
1406+
1407+
Returns:
1408+
The result of the activity.
1409+
1410+
Raises:
1411+
ActivityFailedError: If the activity completed with a failure.
13511412
"""
1352-
handle = await self.start_activity(*args, **kwargs)
1413+
handle = await self.start_activity(
1414+
activity,
1415+
args=args,
1416+
id=id,
1417+
task_queue=task_queue,
1418+
result_type=result_type,
1419+
schedule_to_close_timeout=schedule_to_close_timeout,
1420+
start_to_close_timeout=start_to_close_timeout,
1421+
schedule_to_start_timeout=schedule_to_start_timeout,
1422+
heartbeat_timeout=heartbeat_timeout,
1423+
id_reuse_policy=id_reuse_policy,
1424+
id_conflict_policy=id_conflict_policy,
1425+
retry_policy=retry_policy,
1426+
search_attributes=search_attributes,
1427+
static_summary=static_summary,
1428+
static_details=static_details,
1429+
priority=priority,
1430+
rpc_metadata=rpc_metadata,
1431+
rpc_timeout=rpc_timeout,
1432+
)
13531433
return await handle.result()
13541434

13551435
def list_activities(
@@ -6473,6 +6553,36 @@ class TerminateWorkflowInput:
64736553
rpc_timeout: Optional[timedelta]
64746554

64756555

6556+
@dataclass
6557+
class StartActivityInput:
6558+
"""Input for :py:meth:`OutboundInterceptor.start_activity`."""
6559+
6560+
activity_type: str
6561+
args: Sequence[Any]
6562+
id: str
6563+
task_queue: str
6564+
ret_type: Optional[Type]
6565+
schedule_to_close_timeout: Optional[timedelta]
6566+
start_to_close_timeout: Optional[timedelta]
6567+
schedule_to_start_timeout: Optional[timedelta]
6568+
heartbeat_timeout: Optional[timedelta]
6569+
id_reuse_policy: temporalio.common.IdReusePolicy
6570+
id_conflict_policy: temporalio.common.IdConflictPolicy
6571+
retry_policy: Optional[temporalio.common.RetryPolicy]
6572+
priority: temporalio.common.Priority
6573+
search_attributes: Optional[
6574+
Union[
6575+
temporalio.common.SearchAttributes,
6576+
temporalio.common.TypedSearchAttributes,
6577+
]
6578+
]
6579+
static_summary: Optional[str]
6580+
static_details: Optional[str]
6581+
headers: Mapping[str, temporalio.api.common.v1.Payload]
6582+
rpc_metadata: Mapping[str, Union[str, bytes]]
6583+
rpc_timeout: Optional[timedelta]
6584+
6585+
64766586
@dataclass
64776587
class CancelActivityInput:
64786588
"""Input for :py:meth:`OutboundInterceptor.cancel_activity`."""
@@ -6866,6 +6976,10 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
68666976

68676977
### Activity calls
68686978

6979+
async def start_activity(self, input: StartActivityInput) -> ActivityHandle[Any]:
6980+
"""Called for every :py:meth:`Client.start_activity` call."""
6981+
return await self.next.start_activity(input)
6982+
68696983
async def cancel_activity(self, input: CancelActivityInput) -> None:
68706984
"""Called for every :py:meth:`ActivityHandle.cancel` call."""
68716985
await self.next.cancel_activity(input)
@@ -7343,6 +7457,110 @@ async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
73437457
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
73447458
)
73457459

7460+
async def start_activity(self, input: StartActivityInput) -> ActivityHandle[Any]:
7461+
"""Start an activity and return a handle to it."""
7462+
if not (input.start_to_close_timeout or input.schedule_to_close_timeout):
7463+
raise ValueError(
7464+
"Activity must have start_to_close_timeout or schedule_to_close_timeout"
7465+
)
7466+
req = await self._build_start_activity_execution_request(input)
7467+
7468+
# TODO(dan): any counterpart of WorkflowExecutionAlreadyStartedFailure?
7469+
# If RPCError with err.status == RPCStatusCode.ALREADY_EXISTS
7470+
7471+
resp = await self._client.workflow_service.start_activity_execution(
7472+
req,
7473+
retry=True,
7474+
metadata=input.rpc_metadata,
7475+
timeout=input.rpc_timeout,
7476+
)
7477+
return ActivityHandle(
7478+
self._client,
7479+
id=input.id,
7480+
run_id=resp.run_id,
7481+
result_type=input.ret_type,
7482+
)
7483+
7484+
async def _build_start_activity_execution_request(
7485+
self, input: StartActivityInput
7486+
) -> temporalio.api.workflowservice.v1.StartActivityExecutionRequest:
7487+
"""Build StartActivityExecutionRequest from input."""
7488+
data_converter = self._client.data_converter.with_context(
7489+
ActivitySerializationContext(
7490+
namespace=self._client.namespace,
7491+
activity_id=input.id,
7492+
activity_type=input.activity_type,
7493+
activity_task_queue=input.task_queue,
7494+
is_local=False,
7495+
workflow_id=None,
7496+
workflow_type=None,
7497+
)
7498+
)
7499+
7500+
req = temporalio.api.workflowservice.v1.StartActivityExecutionRequest(
7501+
namespace=self._client.namespace,
7502+
identity=self._client.identity,
7503+
activity_id=input.id,
7504+
activity_type=temporalio.api.common.v1.ActivityType(
7505+
name=input.activity_type
7506+
),
7507+
id_reuse_policy=cast(
7508+
"temporalio.api.enums.v1.IdReusePolicy.ValueType",
7509+
int(input.id_reuse_policy),
7510+
),
7511+
id_conflict_policy=cast(
7512+
"temporalio.api.enums.v1.IdConflictPolicy.ValueType",
7513+
int(input.id_conflict_policy),
7514+
),
7515+
)
7516+
7517+
# Build ActivityOptions
7518+
options = temporalio.api.activity.v1.ActivityOptions(
7519+
task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue),
7520+
)
7521+
if input.schedule_to_close_timeout is not None:
7522+
options.schedule_to_close_timeout.FromTimedelta(
7523+
input.schedule_to_close_timeout
7524+
)
7525+
if input.start_to_close_timeout is not None:
7526+
options.start_to_close_timeout.FromTimedelta(input.start_to_close_timeout)
7527+
if input.schedule_to_start_timeout is not None:
7528+
options.schedule_to_start_timeout.FromTimedelta(
7529+
input.schedule_to_start_timeout
7530+
)
7531+
if input.heartbeat_timeout is not None:
7532+
options.heartbeat_timeout.FromTimedelta(input.heartbeat_timeout)
7533+
if input.retry_policy is not None:
7534+
input.retry_policy.apply_to_proto(options.retry_policy)
7535+
req.options.CopyFrom(options)
7536+
7537+
# Set input payloads
7538+
if input.args:
7539+
req.input.payloads.extend(await data_converter.encode(input.args))
7540+
7541+
# Set search attributes
7542+
if input.search_attributes is not None:
7543+
temporalio.converter.encode_search_attributes(
7544+
input.search_attributes, req.search_attributes
7545+
)
7546+
7547+
# Set user metadata
7548+
metadata = await _encode_user_metadata(
7549+
data_converter, input.static_summary, input.static_details
7550+
)
7551+
if metadata is not None:
7552+
req.user_metadata.CopyFrom(metadata)
7553+
7554+
# Set headers
7555+
if input.headers is not None:
7556+
await self._apply_headers(input.headers, req.header.fields)
7557+
7558+
# Set priority
7559+
if input.priority is not None:
7560+
req.priority.CopyFrom(input.priority._to_proto())
7561+
7562+
return req
7563+
73467564
async def cancel_activity(self, input: CancelActivityInput) -> None:
73477565
"""Cancel a standalone activity."""
73487566
await self._client.workflow_service.request_cancel_activity_execution(

temporalio/common.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,42 @@ class WorkflowIDConflictPolicy(IntEnum):
154154
)
155155

156156

157+
class IdReusePolicy(IntEnum):
158+
"""How already-closed entity IDs are handled on start.
159+
160+
See :py:class:`temporalio.api.enums.v1.IdReusePolicy`.
161+
"""
162+
163+
UNSPECIFIED = int(temporalio.api.enums.v1.IdReusePolicy.ID_REUSE_POLICY_UNSPECIFIED)
164+
ALLOW_DUPLICATE = int(
165+
temporalio.api.enums.v1.IdReusePolicy.ID_REUSE_POLICY_ALLOW_DUPLICATE
166+
)
167+
ALLOW_DUPLICATE_FAILED_ONLY = int(
168+
temporalio.api.enums.v1.IdReusePolicy.ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
169+
)
170+
REJECT_DUPLICATE = int(
171+
temporalio.api.enums.v1.IdReusePolicy.ID_REUSE_POLICY_REJECT_DUPLICATE
172+
)
173+
174+
175+
class IdConflictPolicy(IntEnum):
176+
"""How already-running entity IDs are handled on start.
177+
178+
See :py:class:`temporalio.api.enums.v1.IdConflictPolicy`.
179+
"""
180+
181+
UNSPECIFIED = int(
182+
temporalio.api.enums.v1.IdConflictPolicy.ID_CONFLICT_POLICY_UNSPECIFIED
183+
)
184+
FAIL = int(temporalio.api.enums.v1.IdConflictPolicy.ID_CONFLICT_POLICY_FAIL)
185+
USE_EXISTING = int(
186+
temporalio.api.enums.v1.IdConflictPolicy.ID_CONFLICT_POLICY_USE_EXISTING
187+
)
188+
TERMINATE_EXISTING = int(
189+
temporalio.api.enums.v1.IdConflictPolicy.ID_CONFLICT_POLICY_TERMINATE_EXISTING
190+
)
191+
192+
157193
class ActivityExecutionStatus(IntEnum):
158194
"""Status of a standalone activity execution.
159195

0 commit comments

Comments
 (0)