Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions docs/v3/concepts/deployments.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ class Deployment:

# concurrency limiting
concurrency_limit: int | None = None
concurrency_options: ConcurrencyOptions(collision_strategy=Literal['ENQUEUE', 'CANCEL_NEW']) | None = None
concurrency_options: ConcurrencyOptions(
collision_strategy=Literal['ENQUEUE', 'CANCEL_NEW'],
grace_period_seconds=int # 60-86400, default 600
) | None = None

# metadata for bookkeeping
version: str | None = None
Expand Down Expand Up @@ -255,6 +258,9 @@ deployment can be active at once. To enable this behavior, deployments have the
Falls back to `ENQUEUE` if unset.
- `ENQUEUE`: new runs transition to `AwaitingConcurrencySlot` and execute as slots become available.
- `CANCEL_NEW`: new runs are canceled until a slot becomes available.
- **`grace_period_seconds`**: the time in seconds to allow infrastructure to start before the concurrency
slot is released. This is useful for deployments with slow-starting infrastructure. Must be between 60 and
86400 seconds. If not set, falls back to the server setting (default 300 seconds / 5 minutes).

<CodeGroup>

Expand All @@ -273,7 +279,9 @@ my_flow.deploy(..., concurrency_limit=3)
my_flow.deploy(
...,
concurrency_limit=ConcurrencyLimitConfig(
limit=3, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW
limit=3,
collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
grace_period_seconds=120, # 2 minutes
),
)
```
Expand All @@ -289,7 +297,9 @@ my_flow.serve(..., global_limit=3)
my_flow.serve(
...,
global_limit=ConcurrencyLimitConfig(
limit=3, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW
limit=3,
collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
grace_period_seconds=120, # 2 minutes
),
)
```
Expand Down
1 change: 1 addition & 0 deletions docs/v3/how-to-guides/deployments/prefect-yaml.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ These are fields you can add to a deployment declaration's `concurrency_limit` s
| ------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `limit` | The maximum number of concurrent flow runs for the deployment. |
| `collision_strategy` | Configure the behavior for runs once the concurrency limit is reached. Options are `ENQUEUE`, and `CANCEL_NEW`. Defaults to `ENQUEUE`. |
| `grace_period_seconds` | The time in seconds to allow infrastructure to start before the concurrency slot is released. Must be between 60 and 86400 seconds. If not set, falls back to the server setting (default 300 seconds / 5 minutes). |


### Work pool fields
Expand Down
8 changes: 7 additions & 1 deletion src/prefect/cli/deploy/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,17 @@ async def _run_single_deploy(
triggers = []

if isinstance(deploy_config.get("concurrency_limit"), dict):
deploy_config["concurrency_options"] = {
concurrency_options = {
"collision_strategy": get_from_dict(
deploy_config, "concurrency_limit.collision_strategy"
)
}
grace_period_seconds = get_from_dict(
deploy_config, "concurrency_limit.grace_period_seconds"
)
if grace_period_seconds is not None:
concurrency_options["grace_period_seconds"] = grace_period_seconds
deploy_config["concurrency_options"] = concurrency_options
deploy_config["concurrency_limit"] = get_from_dict(
deploy_config, "concurrency_limit.limit"
)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/cli/deploy/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class ConcurrencyLimitSpec(BaseModel):

limit: Optional[int] = None
collision_strategy: Optional[str] = None
grace_period_seconds: Optional[int] = None


class RawScheduleConfig(BaseModel):
Expand Down
12 changes: 12 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ class ConcurrencyOptions(PrefectBaseModel):
"""

collision_strategy: ConcurrencyLimitStrategy
grace_period_seconds: Optional[int] = Field(
default=None,
ge=60,
le=86400,
description="Grace period in seconds for infrastructure to start before concurrency slots are revoked. If not set, falls back to server setting.",
)


class ConcurrencyLimitConfig(PrefectBaseModel):
Expand All @@ -164,6 +170,12 @@ class ConcurrencyLimitConfig(PrefectBaseModel):

limit: int
collision_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.ENQUEUE
grace_period_seconds: Optional[int] = Field(
default=None,
ge=60,
le=86400,
description="Grace period in seconds for infrastructure to start before concurrency slots are revoked",
)


class ConcurrencyLeaseHolder(PrefectBaseModel):
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/deployments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ def _format_deployment_for_saving_to_prefect_file(
concurrency_limit["collision_strategy"] = str(
concurrency_limit["collision_strategy"].value
)
concurrency_limit = {
k: v for k, v in concurrency_limit.items() if v is not None
}
deployment["concurrency_limit"] = concurrency_limit

return deployment
Expand Down
67 changes: 39 additions & 28 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,33 @@ def fast_flow():
__all__ = ["RunnerDeployment"]


def _extract_concurrency_options(
concurrency_limit: Union[int, ConcurrencyLimitConfig, None],
) -> tuple[Optional[int], Optional[dict[str, Any]]]:
"""
Extract concurrency limit and options from a ConcurrencyLimitConfig or int.

Args:
concurrency_limit: Either an int (just the limit), a ConcurrencyLimitConfig
(with limit, collision_strategy, and optional grace_period_seconds),
or None.

Returns:
A tuple of (limit, concurrency_options_dict) where concurrency_options_dict
is None if concurrency_limit is not a ConcurrencyLimitConfig.
"""
if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options: dict[str, Any] = {
"collision_strategy": concurrency_limit.collision_strategy
}
if concurrency_limit.grace_period_seconds is not None:
concurrency_options["grace_period_seconds"] = (
concurrency_limit.grace_period_seconds
)
return concurrency_limit.limit, concurrency_options
return concurrency_limit, None


class DeploymentApplyError(RuntimeError):
"""
Raised when an error occurs while applying a deployment.
Expand Down Expand Up @@ -708,13 +735,9 @@ def from_flow(

job_variables = job_variables or {}

if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options = {
"collision_strategy": concurrency_limit.collision_strategy
}
concurrency_limit = concurrency_limit.limit
else:
concurrency_options = None
concurrency_limit, concurrency_options = _extract_concurrency_options(
concurrency_limit
)

deployment = cls(
name=name,
Expand Down Expand Up @@ -852,13 +875,9 @@ def from_entrypoint(
schedule=schedule,
)

if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options = {
"collision_strategy": concurrency_limit.collision_strategy
}
concurrency_limit = concurrency_limit.limit
else:
concurrency_options = None
concurrency_limit, concurrency_options = _extract_concurrency_options(
concurrency_limit
)

deployment = cls(
name=name,
Expand Down Expand Up @@ -962,13 +981,9 @@ async def afrom_storage(
schedule=schedule,
)

if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options = {
"collision_strategy": concurrency_limit.collision_strategy
}
concurrency_limit = concurrency_limit.limit
else:
concurrency_options = None
concurrency_limit, concurrency_options = _extract_concurrency_options(
concurrency_limit
)

job_variables = job_variables or {}

Expand Down Expand Up @@ -1088,13 +1103,9 @@ def from_storage(
schedule=schedule,
)

if isinstance(concurrency_limit, ConcurrencyLimitConfig):
concurrency_options = {
"collision_strategy": concurrency_limit.collision_strategy
}
concurrency_limit = concurrency_limit.limit
else:
concurrency_options = None
concurrency_limit, concurrency_options = _extract_concurrency_options(
concurrency_limit
)

job_variables = job_variables or {}

Expand Down
19 changes: 16 additions & 3 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,27 @@ async def before_transition(
if acquired:
lease_storage = get_concurrency_lease_storage()
settings = get_current_settings()

concurrency_options = deployment.concurrency_options
grace_period = None
if concurrency_options is not None:
if isinstance(concurrency_options, dict):
concurrency_options = core.ConcurrencyOptions.model_validate(
concurrency_options
)
grace_period = concurrency_options.grace_period_seconds
# Fall back to server setting if grace_period_seconds is not explicitly set
if grace_period is None:
grace_period = (
settings.server.concurrency.initial_deployment_lease_duration
)

lease = await lease_storage.create_lease(
resource_ids=[deployment.concurrency_limit_id],
metadata=ConcurrencyLimitLeaseMetadata(
slots=1,
),
ttl=datetime.timedelta(
seconds=settings.server.concurrency.initial_deployment_lease_duration
),
ttl=datetime.timedelta(seconds=grace_period),
)
proposed_state.state_details.deployment_concurrency_lease_id = lease.id

Expand Down
6 changes: 6 additions & 0 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ class ConcurrencyOptions(BaseModel):
"""

collision_strategy: ConcurrencyLimitStrategy
grace_period_seconds: Optional[int] = Field(
default=None,
ge=60,
le=86400,
description="Grace period in seconds for infrastructure to start before concurrency slots are revoked. If not set, falls back to server setting.",
)


class FlowRun(TimeSeriesBaseModel, ORMBaseModel):
Expand Down
Loading
Loading