feat: Prunable shard specs for streaming published segments#19571
feat: Prunable shard specs for streaming published segments#19571abhishekrb19 wants to merge 8 commits into
Conversation
…gestion
Adds a new core ShardSpec (stream_range) that lets Kafka streaming tasks
declare, per published segment, the distinct values observed for configured
partitionFilterDimensions. The broker uses these to prune segments whose
declared values cannot match a query filter — enabling near-realtime pruning
without waiting for compaction.
Highlights:
- StreamRangeShardSpec extends NumberedShardSpec; possibleInDomain prunes by
per-value range intersection. Null is declared as a first-class value
(encoded as Range.lessThan("")) so IS NULL queries are never wrongly pruned,
and is kept distinct from the empty string.
- Opt-in via partitionFilterDimensions on the Kafka supervisor/IOConfig
(null by default; segments otherwise get a plain NumberedShardSpec). Kafka
only for now; backward-compatible config (old specs/constructors unchanged).
- Per-segment value accumulation at ingest time; each segment is stamped with
only its own observed values at publish.
- Correctness guards: restart-spanning segments fall back to NumberedShardSpec
(pre-restart rows are not re-read, so their values can't be fully observed);
dimensions that observed a null/missing value declare null so IS NULL is not
pruned.
- BaseAppenderatorDriver reconciles the returned SegmentsAndCommitMetadata to
the published shard specs so handoff/publish logs report the real spec.
Tests:
- StreamRangeShardSpecTest: possibleInDomain matrix incl. null vs "" and serde.
- SeekableStreamIndexTaskRunnerTest: annotator unit tests (restart fallback,
null handling).
- EmbeddedStreamRangeShardSpecTest: end-to-end pruning verified via the
query/segment/time scan metric across a predicate matrix (=, !=, IN, NOT IN,
IS NULL, IS NOT NULL, multi-value, untracked dimension, non-existent value),
plus a no-partitioning control twin and in-memory/graceful-widening cases.
- StreamAppenderatorDriverTest: returned metadata carries the published spec.
|
|
||
| // annotateSegmentWithPartitionFilters is a no-op (returns the segment unchanged) when partition filters are not | ||
| // configured, so it is always safe to apply here. | ||
| final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> shardSpecAnnotator = |
There was a problem hiding this comment.
nit: can add Function to imports
|
|
||
| for (DataSegment segment : publishedSegmentsAndCommitMetadata.getSegments()) { | ||
| observedDimensionValuesBySegment.remove( | ||
| SegmentIdWithShardSpec.fromDataSegment(segment).toString() |
There was a problem hiding this comment.
Should we also clean up restartSpannedSegments.remove here?
| return s; | ||
| } | ||
| final Map<String, List<String>> snapshotFilters = new HashMap<>(); | ||
| for (String dim : filterDims) { |
There was a problem hiding this comment.
for (String dim : filterDims) {
segObserved.computeIfPresent(dim, (k, vals) -> {
synchronized (vals) {
if (!vals.isEmpty()) {
snapshotFilters.put(dim, new ArrayList<>(vals));
}
}
return vals; // Return unchanged - we're just reading
});
}
There was a problem hiding this comment.
Claude recommendation for race condition
| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| | ||
| |`useEarliestOffset`|Boolean|If a supervisor is managing a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether the supervisor retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.|No|`false`| | ||
| |`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle configuration](#idle-configuration) for more details.|No|null| | ||
| |`partitionFilterDimensions`|List of String|Dimensions to track for query-time segment pruning. See [Partition filter dimensions](#partition-filter-dimensions) for details.|No|null| |
There was a problem hiding this comment.
What do you think about naming this partitionDimensions to align with the compaction config? That may make it more clear that those values should be in sync
There was a problem hiding this comment.
Yes, good call - thanks for the suggestion! I also moved this into the tuningConfig for consistency. It's nested under a streamingPartitionsSpec container to avoid naming ambiguity and so we can extend it with additional properties and/or add new types in the future if needed.
| - Use only low-to-medium cardinality dimensions (for example, `tenant_id`, `region`, `environment`). High-cardinality dimensions bloat segment metadata with no pruning benefit. | ||
| - Most effective when Kafka partitions are keyed by the tracked dimension (for example, using tenant ID as the message key). Each task naturally sees a subset of values, and segments get tight filter annotations. | ||
| - Also works with multiple supervisors reading from separate topics into one datasource. | ||
| - After compaction, the `StreamRangeShardSpec` annotations are replaced by the compaction output's shard spec (hash or range partitioning), which provides its own pruning. |
There was a problem hiding this comment.
Maybe worth mentioning that when using partitionFilterDimensions, dynamic compaction strategy should not be used
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 1 |
| Total | 2 |
Reviewed 16 of 16 changed files.
I found two issues: restart-spanning segments can mix shard spec classes within one publish interval and fail publishing, and Kafka backfill specs drop partitionFilterDimensions.
This is an automated review by Codex GPT-5.5
| return s; | ||
| } | ||
| final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString(); | ||
| if (restartSpannedSegments.contains(lookupKey)) { |
There was a problem hiding this comment.
P1 Mixed shard specs can fail publish after restart
Restart-spanned segments return unchanged as NumberedShardSpec, while new same-interval segments in the same publish batch can be annotated as StreamRangeShardSpec. TransactionalSegmentPublisher then runs SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec classes per interval, so a restarted task can fail publish/handoff. Make the fallback interval-wide, or stamp restored segments with a non-pruning stream_range spec.
There was a problem hiding this comment.
Thanks, this is fixed with some test coverage added.
| emitTimeLagMetrics, | ||
| serverPriorityToReplicas, | ||
| boundedStreamConfig, | ||
| null |
There was a problem hiding this comment.
P3 Backfill specs drop partitionFilterDimensions
This compatibility constructor always forwards null for partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses this overload when deriving bounded backfill specs, so a supervisor configured with partitionFilterDimensions silently creates backfill tasks without the pruning annotations. Pass the existing dimension list through for backfill specs.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 0 |
| P3 | 1 |
| Total | 1 |
Reviewed 19 of 19 changed files.
This is an automated review by Codex GPT-5.5
| if (vals.isEmpty()) { | ||
| continue; | ||
| } | ||
| snapshot = new ArrayList<>(vals); |
There was a problem hiding this comment.
[P3] Sort observed partition values before publishing
The list stored in partitionDimensionValues is created directly from a HashSet, so its order is unspecified. The embedded test already asserts a concrete order for these values, and equivalent published segment metadata can vary by JVM or run even when the value set is identical. Sort the snapshot deterministically, with explicit null handling, before putting it into the shard spec.
Reviewed 19 of 19 changed files.
Relates to #12929
Streaming-published segments currently have
numberedshard specs, which aren't prunable by design. Compaction must reindex the data withrangeorhashedpartition strategy once the data is handed off — even if the topic is partitioned, which is easy to do with multiplesupervisors. For multi-tenant datasources this means every tenant-filtered query hits every recent segment regardless of the partitioning strategy for
numberedshard specs.This PR lets streaming tasks record, per published segment, the distinct values observed for a configured set of dimensions, and declare them on a new shard spec so the broker can prune near-realtime data without waiting for compaction to reindex handed off segments. Concurrent compaction cannot always keep up with the incoming data and additionally the compaction process itself takes time to reindex; so the benefits of
rangeorhashedshard specs may not be fully realized for however long it takes to reindex (30-45 minutes in our case), and doesn't help with high concurrent query workloads that are only querying more recent data.So this PR adds a way to publish prunable shards right off the bat when they’re handed off by streaming tasks, if configured. This functionality is opt-in, Kafka-only, and disabled by default.
Design
dimension and none of its values intersect the domain; a dimension not in partitionFilters is never pruned on. Set-based (not min/max), so it prunes precisely for sparse values and tolerates overlapping value sets across tasks/restarts.
numberedshard) or bloated shard specs (this caveat can be addressed with a guard rail noted below).Configuration
New optional
tuningConfigfieldpartitionDimensionson the Kafka supervisor/task (default null). When unset, behavior is unchanged. Documented indocs/ingestion/kafka-ingestion.md.Compatibility
Backward-compatible and opt-in. But
stream_rangeis a new core ShardSpec type with no defaultImpl fallback, so it is not forward-compatible: upgrade all services before enablingpartitionDimensions, and note that oncestream_rangesegments are published, downgrade isn't supported until they're compacted away.
Results
Tested in a cluster, where I saw up to ~40% reduction in segment scans on the historicals for a few low to medium cardinal partition dimensions. In a follow-up, I want to extend this to also prune tasks, for reduced peon buffers and better query performance at the task layer.
Caveats
There's currently no limit on the number of observed values stamped into a segment's partitionFilters. It may make sense, in a follow-up, to add a configurable guardrail that falls back to NumberedShardSpec when the count exceeds a threshold, so shard specs don't get bloated.
Release note
Kafka ingestion can now publish segments that the broker prunes at query time, without waiting for compaction. Set
tuningConfig.streamingPartitionsSpec.partitionDimensionsto a list of low-to-medium cardinality dimensions; each task records the distinct values it observes per dimension and stamps them onto a new stream_range shard spec. Queries that filter on a declared dimension then skip segments whose values can't match. The feature is opt-in, Kafka-only, and disabled by default; when unset, behavior is unchanged.Compatibility: stream_range is a new core shard spec type with no fallback, so it is not forward-compatible. Upgrade all services before enabling
partitionDimensions. Oncestream_rangesegments are published, downgrade is unsupported until they are compacted away orpartitionDimensionsis removed.This PR has: