Make task and asset store row size limits configurable #68133
amoghrajesh wants to merge 2 commits into
    limit = conf.getint("state_store", "max_value_storage_bytes")
    if limit > 0 and len(serialized) > limit:
        raise ValueError(
            f"value exceeds max_value_storage_bytes ({limit}); "
            "for large payloads configure a custom [state_store] backend"
        )
Why should we limit this? Do we do so for XCom or ObjectStorage?
Worth checking
XCom does not enforce a size limit in the model, and the ObjectStorage XCom backend has xcom_objectstorage_threshold, which tiers large values to object storage rather than rejecting them.
For task state, we have a clearer use case than XCom had at design time: coordination state (job IDs, cursors, retry counts), not data payloads. A configurable default cap lets us nudge operators toward a custom backend before they accidentally flood the DB with large values, a lesson XCom only learned retroactively. The 65535 default can be raised as needed via [state_store] max_value_storage_bytes.
I'd lean towards putting a limit on this. I think we want to use it as a "state store" rather than a "data store". I'd be worried about folks using it as a place to dump huge amounts of data, which I don't think is the intention.
        raise ValueError("value contains non-finite numbers; NaN and Inf are not JSON representable")
    if len(serialized) > _MAX_SERIALIZED_BYTES:
        raise ValueError(f"value exceeds maximum serialized size of {_MAX_SERIALIZED_BYTES} bytes")
    limit = conf.getint("state_store", "max_value_storage_bytes")
We don't want to throw this in a validate_payload_size function or something to reuse like that, correct? I could be way off, but thought I'd throw it out there.
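For illustration only, a shared helper along the lines suggested here could look roughly like the sketch below. The name validate_payload_size comes from the comment above; its signature, the choice to pass the limit in as an argument (rather than reading it from conf), and measuring the size in UTF-8 bytes are all assumptions, not what the PR does.

```python
import json


def validate_payload_size(value, limit):
    """Serialize `value` to JSON and enforce a byte limit.

    Hypothetical helper: the signature is assumed for this sketch.
    A limit of 0 (or less) disables the size check.
    """
    # allow_nan=False makes json.dumps raise ValueError for NaN and Inf,
    # which are not valid JSON.
    serialized = json.dumps(value, allow_nan=False)

    # Assumption: the limit is compared against the UTF-8 encoded size.
    size = len(serialized.encode("utf-8"))
    if limit > 0 and size > limit:
        raise ValueError(
            f"value exceeds max_value_storage_bytes ({limit}); "
            "for large payloads configure a custom [state_store] backend"
        )
    return serialized
```

Both the core API and the worker path could then call the same function, so the two code paths cannot drift apart on how the size is measured.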
@kaxil appreciate another round of review from you here? WDYT?
What's being solved?
The task and asset store had a hardcoded 64 KB size limit in the core API datamodels that could not be adjusted without code changes. Users with legitimate large-payload use cases had no way to raise it, and the limit was not enforced consistently either: the core API validated it, but the execution API (worker path) did not.
Current behaviour
Proposed change
Adds [state_store] max_value_storage_bytes (default 65535) to control the limit:
- 0 to disable the limit entirely. DB column limits then apply (~1 GB on PostgreSQL, 16 MB on MySQL).
- task_store.set() and asset_store.set(): logs a warning in task logs when the serialized value exceeds the limit, but allows the write through so as not to interrupt the execution mid-run. The warning suggests configuring a custom [state_store] backend for large payloads.
Testing
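As a rough illustration of the behaviour being tested, the sketch below contrasts a strict path that rejects an oversized value (as the ValueError in the diff hunk does) with a lenient path that only logs a warning and lets the write proceed. The function names strict_set and lenient_set, the logger name, and the in-memory dict standing in for the store are hypothetical, not the PR's code.

```python
import json
import logging

log = logging.getLogger("state_store_sketch")  # hypothetical logger name


def exceeds_limit(serialized, limit):
    # A limit of 0 disables the check entirely.
    return limit > 0 and len(serialized) > limit


def strict_set(store, key, value, limit):
    """Reject oversized values before anything is written."""
    serialized = json.dumps(value)
    if exceeds_limit(serialized, limit):
        raise ValueError(f"value exceeds max_value_storage_bytes ({limit})")
    store[key] = serialized


def lenient_set(store, key, value, limit):
    """Warn about oversized values but still perform the write."""
    serialized = json.dumps(value)
    if exceeds_limit(serialized, limit):
        log.warning(
            "value for %r exceeds max_value_storage_bytes (%d); "
            "for large payloads configure a custom [state_store] backend",
            key,
            limit,
        )
    store[key] = serialized
```

With a limit of 10, lenient_set stores a 100-character string and emits a warning, while strict_set raises and leaves the store untouched; with a limit of 0, both write without complaint.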
Running example:
Task Store:
Asset store: