Make task and asset store row size limits configurable #68133

Open
amoghrajesh wants to merge 2 commits into apache:main from astronomer:aip-103-backlog-max-length-cap
Conversation

@amoghrajesh amoghrajesh commented Jun 6, 2026

Contributor

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

What's being solved?

The task and asset stores had a hardcoded 64 KB size limit in the core API datamodels that could not be adjusted without code changes. Users with legitimate large-payload use cases had no way to raise it, and the limit was not enforced consistently either: the core API validated it, but the execution API (worker path) did not.

Current behaviour

  • Core API rejected values over 64 KB with a hardcoded limit.
  • Execution API (worker writes) had no size check at all.
  • No config key existed to adjust the limit.

Proposed change

Adds [state_store] max_value_storage_bytes (default 65535) to control the limit:

  • Core API (task and asset store): rejects oversized values with HTTP 400. Set the option to 0 to disable the limit entirely; the DB column limits then apply (~1 GB on PostgreSQL, 16 MB on MySQL).
  • Task SDK (task_store.set() and asset_store.set()): logs a warning in the task logs when the serialized value exceeds the limit, but lets the write through so that execution is not interrupted mid-run. The warning suggests configuring a custom [state_store] backend for large payloads.
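
As a rough illustration of the behaviour described in the two bullets above, the outcome for a given serialized size could be modelled like this. It is a sketch, not the PR's code: the function name `outcome` and the path labels `"core_api"` and `"task_sdk"` are invented here, and treating a limit of 0 as "disabled" on the Task SDK path as well is an assumption (the description only states it for the core API).

```python
DEFAULT_LIMIT = 65535  # default of [state_store] max_value_storage_bytes per the PR description


def outcome(size_bytes: int, limit: int, path: str) -> str:
    """Model what happens to a write of `size_bytes` on the given path.

    `path` is "core_api" or "task_sdk" (labels made up for this sketch).
    """
    # A limit of 0 disables the check; values within the limit always pass.
    if limit <= 0 or size_bytes <= limit:
        return "accepted"
    if path == "core_api":
        # The core API rejects oversized values.
        return "rejected (HTTP 400)"
    # The Task SDK only warns, so the running task is not interrupted.
    return "accepted with warning"


for path in ("core_api", "task_sdk"):
    for limit in (DEFAULT_LIMIT, 0):
        print(path, limit, outcome(700_002, limit, path))
```

With the default limit, a value of roughly 700 KB is rejected on the core API path and only warned about on the Task SDK path; with the limit set to 0 it is accepted on both.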

Testing

Running example:

Task Store:

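    from airflow.sdk import Context, task


    @task
    def my_task(**context: Context):
        task_state = context["task_store"]
        # 700,000 characters, well above the 65535-byte default limit
        value = "x" * 700000
        task_state.set("str_value", value)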

Asset store:

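    import pendulum
    from airflow.sdk import DAG, task

    # aip103_test_asset is an Asset defined elsewhere in the test file (its definition is not shown here).
    with DAG(
        dag_id="aip103_asset_producer",
        start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
        tags=["aip-103", "asset-state-test"],
    ):

        @task(outlets=[aip103_test_asset])
        def produce(**context):
            print(f"Producer running for {aip103_test_asset.uri!r}")
            # 700,000 characters, well above the 65535-byte default limit
            w = "w" * 700000
            context["asset_store"].set("my_asset_value_watermark", w)

        produce()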
  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Comment on lines +69 to +74
    limit = conf.getint("state_store", "max_value_storage_bytes")
    if limit > 0 and len(serialized) > limit:
        raise ValueError(
            f"value exceeds max_value_storage_bytes ({limit}); "
            "for large payloads configure a custom [state_store] backend"
        )

Member

Why should we limit this? Do we do so for XCom or ObjectStorage?

Worth checking

Contributor Author

XCom does not enforce a size limit in the model, and the ObjectStorage XCom backend has xcom_objectstorage_threshold, which tiers large values to object storage rather than rejecting them.

For task state, we have a clearer use case than XCom had at design time: coordination state (job IDs, cursors, retry counts), not data payloads. A configurable default cap lets us nudge operators toward a custom backend before they accidentally flood the DB with large values, a lesson XCom only learned retroactively. The 65535 default can be raised as needed via [state_store] max_value_storage_bytes.
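
To put the 65535-byte default in perspective, here is a small size comparison between typical coordination state and a data payload. The dictionary keys and values are made up for illustration, and measuring the UTF-8 length of the JSON text is an assumption about how the size is counted.

```python
import json

DEFAULT_LIMIT = 65535  # default of [state_store] max_value_storage_bytes

# Hypothetical coordination state: a job ID, a cursor, and a retry count.
coordination_state = {
    "job_id": "job-12345",
    "cursor": "2026-01-01T00:00:00+00:00",
    "retry_count": 3,
}

# A data payload like the one used in the PR's test DAGs.
payload = "x" * 700000

state_size = len(json.dumps(coordination_state).encode("utf-8"))
payload_size = len(json.dumps(payload).encode("utf-8"))

print(state_size <= DEFAULT_LIMIT)    # True: coordination state fits easily
print(payload_size <= DEFAULT_LIMIT)  # False: 700002 bytes, over ten times the default
```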

Collaborator

I'd lean towards putting a limit on this. I think we want to use it as a "state store" rather than a "data store". I'd be worried about folks using it as a place to dump huge amounts of data, which I don't think is the intention.

@amoghrajesh amoghrajesh requested a review from kaxil June 7, 2026 06:36

        raise ValueError("value contains non-finite numbers; NaN and Inf are not JSON representable")
    if len(serialized) > _MAX_SERIALIZED_BYTES:
        raise ValueError(f"value exceeds maximum serialized size of {_MAX_SERIALIZED_BYTES} bytes")
    limit = conf.getint("state_store", "max_value_storage_bytes")

Collaborator

We don't want to throw this in a validate_payload_size function or something to reuse like that, correct? I could be way off, but thought I'd throw it out there.
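
For reference, a shared helper along the lines suggested here might look roughly like the following. Only the name validate_payload_size comes from the comment; the signature, the `PayloadTooLargeError` class, and the `sdk_set` caller are hypothetical, and the limit is passed in as an argument rather than read from configuration so the sketch stays self-contained.

```python
import logging

log = logging.getLogger(__name__)


class PayloadTooLargeError(ValueError):
    """Hypothetical error raised when a serialized value exceeds the limit."""


def validate_payload_size(serialized: str, limit: int) -> None:
    """Raise if `serialized` is larger than `limit` bytes; a limit <= 0 disables the check."""
    if limit <= 0:
        return
    # Assumption: size is measured in UTF-8 bytes rather than characters.
    size = len(serialized.encode("utf-8"))
    if size > limit:
        raise PayloadTooLargeError(
            f"value is {size} bytes, which exceeds max_value_storage_bytes ({limit})"
        )


def sdk_set(serialized: str, limit: int) -> bool:
    """Hypothetical worker-side caller: warn on oversize but still allow the write."""
    try:
        validate_payload_size(serialized, limit)
    except PayloadTooLargeError as err:
        log.warning("%s; consider a custom [state_store] backend", err)
    return True  # the write proceeds either way
```

An API-side caller would let the exception propagate and turn it into an error response, while the worker-side caller catches it and only logs, so both paths share one size check.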

@amoghrajesh amoghrajesh self-assigned this Jun 8, 2026
Contributor Author

@kaxil, I'd appreciate another round of review from you here. WDYT?

@amoghrajesh amoghrajesh requested a review from jroachgolf84 June 10, 2026 05:58
3 participants