Merged
46 changes: 46 additions & 0 deletions docs/ingestion/kafka-ingestion.md
@@ -263,6 +263,51 @@ The following example shows a supervisor spec with idle configuration enabled:
```
</details>

#### Streaming partitions spec

When you set `streamingPartitionsSpec.partitionDimensions` in the tuning config, the supervisor tracks the distinct values observed for each listed dimension during ingestion. At segment publish time, each segment is published with a `dim_value_set` shard spec that records, for each tracked dimension, only the values that segment actually ingested. The broker then uses these annotations at query time to skip segments whose declared values don't intersect the query filter.

This enables segment pruning for streaming-ingested data without waiting for compaction to produce hash- or range-partitioned segments. Keep `partitionDimensions` in sync with the `partitionDimensions` in the compaction config for the same datasource.

**Usage guidelines:**

- Only string-typed dimensions are currently supported.
- Use only low-to-medium cardinality dimensions (for example, `tenant_id`, `region`, `environment`). High-cardinality dimensions bloat segment metadata with no pruning benefit.
- Most effective when Kafka partitions are keyed by the tracked dimension (for example, using tenant ID as the message key). Each task naturally sees a subset of values, and segments get tight filter annotations.
- Also works with multiple supervisors reading from separate topics into one datasource.
- Use a range or hashed compaction `partitionsSpec`, not the dynamic strategy: dynamic compaction does not partition by dimension, so it cannot preserve pruning after compaction.
- After compaction, the streaming pruning annotations are replaced by the compaction output's partitioning (hash or range), which provides its own pruning.

The following example configures a supervisor to track the `tenant` dimension:

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "multi_tenant_events",
      "timestampSpec": {"column": "timestamp", "format": "iso"},
      "dimensionsSpec": {"dimensions": ["tenant", "region", "event_type"]},
      "granularitySpec": {"type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE"}
    },
    "ioConfig": {
      "type": "kafka",
      "topic": "events",
      "consumerProperties": {"bootstrap.servers": "localhost:9092"},
      "inputFormat": {"type": "json"},
      "taskCount": 4,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "streamingPartitionsSpec": {"partitionDimensions": ["tenant"]}
    }
  }
}
```

Review comment from aho135 (Contributor), Jun 11, 2026, on the `streamingPartitionsSpec` line of the example:

Nice! Makes sense to have this in tuningConfig, and in the future we can add cardinality guardrails into streamingPartitionsSpec as well

With this configuration, a query like `SELECT * FROM multi_tenant_events WHERE tenant = 'acme'` skips segments that contain no rows for `acme`, reducing the number of segments scanned.
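
The pruning decision described above can be sketched in plain Java. This is a minimal illustration, not Druid's implementation: `DimValueSetPruningSketch`, `SegmentInfo`, `mayContain`, and `segmentsToScan` are hypothetical names, and the sketch assumes that a segment with no declared values for a dimension can never be skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these types are Druid classes.
class DimValueSetPruningSketch
{
  /** A segment id plus the values the segment declared for each tracked dimension. */
  static final class SegmentInfo
  {
    final String id;
    final Map<String, Set<String>> declaredValues;

    SegmentInfo(String id, Map<String, Set<String>> declaredValues)
    {
      this.id = id;
      this.declaredValues = declaredValues;
    }
  }

  /** Returns false only when the segment declared values for the dimension and none of them match. */
  static boolean mayContain(SegmentInfo segment, String dimension, String value)
  {
    final Set<String> declared = segment.declaredValues.get(dimension);
    // Assumption: a segment without annotations for this dimension must always be scanned.
    return declared == null || declared.contains(value);
  }

  /** Ids of the segments that an equality filter `dimension = value` still has to scan. */
  static List<String> segmentsToScan(List<SegmentInfo> segments, String dimension, String value)
  {
    final List<String> ids = new ArrayList<>();
    for (SegmentInfo segment : segments) {
      if (mayContain(segment, dimension, value)) {
        ids.add(segment.id);
      }
    }
    return ids;
  }

  public static void main(String[] args)
  {
    final List<SegmentInfo> segments = List.of(
        new SegmentInfo("seg-1", Map.of("tenant", Set.of("acme", "globex"))),
        new SegmentInfo("seg-2", Map.of("tenant", Set.of("initech"))),
        new SegmentInfo("seg-3", Map.of()) // no annotations, for example an older segment
    );
    System.out.println(segmentsToScan(segments, "tenant", "acme"));
  }
}
```

Running `main` prints `[seg-1, seg-3]`: `seg-2` is skipped because its declared tenants do not include `acme`, while the unannotated `seg-3` is kept because nothing can be proven about its contents.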

#### Data format

The Kafka indexing service supports [`inputFormat`](data-formats.md#input-format). For more information, see [Source input formats](data-formats.md).
@@ -440,6 +485,7 @@ For configuration properties shared across all streaming ingestion methods, refe
|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
|`numPersistThreads`|Integer|The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available.|No|1|
|`streamingPartitionsSpec`|Object|Configures query-time segment pruning for streaming-ingested segments. Contains a single property, `partitionDimensions` (List of String), the dimensions whose observed values each segment records so the broker can skip segments that can't match a query filter. See [Streaming partitions spec](#streaming-partitions-spec) for details.|No|null|

## Deployment notes on Kafka partitions and Druid segments

@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -54,7 +55,8 @@ public KafkaIndexTaskTuningConfig(
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads,
@Nullable Integer maxColumnsToMerge,
- @Nullable Boolean releaseLocksOnHandoff
+ @Nullable Boolean releaseLocksOnHandoff,
+ @Nullable StreamingPartitionsSpec streamingPartitionsSpec
)
{
super(
@@ -80,7 +82,8 @@ public KafkaIndexTaskTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}

@@ -106,7 +109,8 @@ private KafkaIndexTaskTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
- @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff,
+ @JsonProperty("streamingPartitionsSpec") @Nullable StreamingPartitionsSpec streamingPartitionsSpec
)
{
this(
@@ -131,7 +135,8 @@ private KafkaIndexTaskTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}

@@ -160,7 +165,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
- isReleaseLocksOnHandoff()
+ isReleaseLocksOnHandoff(),
+ getStreamingPartitionsSpec()
);
}

@@ -188,6 +194,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", getMaxColumnsToMerge=" + getMaxColumnsToMerge() +
", streamingPartitionsSpec=" + getStreamingPartitionsSpec() +
'}';
}

@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -67,6 +68,7 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
@@ -97,7 +99,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
- @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff,
+ @JsonProperty("streamingPartitionsSpec") @Nullable StreamingPartitionsSpec streamingPartitionsSpec
)
{
super(
@@ -122,7 +125,8 @@ public KafkaSupervisorTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
@@ -237,7 +241,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
- isReleaseLocksOnHandoff()
+ isReleaseLocksOnHandoff(),
+ getStreamingPartitionsSpec()
);
}
}
@@ -2901,6 +2901,7 @@ private KafkaIndexTask createTask(
maxSavedParseExceptions,
null,
null,
null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
@@ -24,6 +24,7 @@
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaTuningConfigBuilder;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
@@ -36,6 +37,9 @@
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KafkaIndexTaskTuningConfigTest
{
@@ -128,6 +132,86 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}

@Test
public void testSerdeWithStreamingPartitionsSpec() throws Exception
{
final String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"streamingPartitionsSpec\": {\"partitionDimensions\": [\"tenant\", \"region\"]}\n"
+ "}";

final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(jsonStr, TuningConfig.class)),
TuningConfig.class
);

Assert.assertEquals(
new StreamingPartitionsSpec(List.of("tenant", "region")),
config.getStreamingPartitionsSpec()
);
Assert.assertEquals(List.of("tenant", "region"), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

Review comment (Contributor):

Might be worth having a test case that covers edge-case configurations for partitionDimensions (e.g. null values, empty strings, integers).

Reply (Contributor Author):

> Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string

I've added some tests in this unit test, and there is existing end-to-end coverage for the null and empty cases in the embedded tests too.

Regarding numeric types: they're currently unsupported for the range shard spec as well, given how the filters work (some details in #19415). For now this is only documented for the new shard spec, but it would be nice to expand that functionality generally so that all shard specs benefit from it. If that's not trivial, we could at least block them at creation time for this new shard spec in the future.
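
For readers following the edge cases discussed in this thread, here is a minimal sketch of the normalization that the serde tests in this file assert. It is not the PR's `StreamingPartitionsSpec`, whose source is not shown on this page; `PartitionDimensionsSketch` is a hypothetical stand-in that assumes only what the tests check: a null list coalesces to an empty list, while empty-string names and null elements are preserved verbatim.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in; the real StreamingPartitionsSpec is not shown on this page.
final class PartitionDimensionsSketch
{
  private final List<String> partitionDimensions;

  PartitionDimensionsSketch(List<String> partitionDimensions)
  {
    // A missing (null) list becomes an empty list, matching the "coalesces to empty" test.
    // ArrayList is used for the defensive copy because List.copyOf would reject null elements,
    // and the tests expect a null element to survive a round trip.
    this.partitionDimensions = partitionDimensions == null
                               ? Collections.emptyList()
                               : Collections.unmodifiableList(new ArrayList<>(partitionDimensions));
  }

  List<String> getPartitionDimensions()
  {
    return partitionDimensions;
  }

  public static void main(String[] args)
  {
    System.out.println(new PartitionDimensionsSketch(null).getPartitionDimensions());
    System.out.println(new PartitionDimensionsSketch(Arrays.asList("tenant", null)).getPartitionDimensions());
  }
}
```

A stricter variant could reject null or empty names in the constructor, which is the kind of creation-time check the reply above mentions as a possible follow-up.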

@Test
public void testSerdeWithoutStreamingPartitionsSpecIsNull() throws Exception
{
final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(mapper.readValue("{\"type\": \"kafka\"}", TuningConfig.class)),
TuningConfig.class
);
Assert.assertNull(config.getStreamingPartitionsSpec());
}

@Test
public void testSerdeWithEmptyPartitionDimensions() throws Exception
{
final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[]");
Assert.assertEquals(Collections.emptyList(), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

@Test
public void testSerdeWithNullPartitionDimensionsCoalescesToEmpty() throws Exception
{
final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("null");
Assert.assertEquals(Collections.emptyList(), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

@Test
public void testSerdeWithEmptyStringPartitionDimension() throws Exception
{
// An empty-string dimension name is preserved verbatim (it simply never matches an ingested value).
final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"\"]");
Assert.assertEquals(List.of(""), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

@Test
public void testSerdeWithNumericLookingPartitionDimension() throws Exception
{
// Dimension names are plain strings; a numeric-looking name is just a string.
final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"123\"]");
Assert.assertEquals(List.of("123"), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

@Test
public void testSerdeWithNullElementInPartitionDimensions() throws Exception
{
final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"tenant\", null]");
Assert.assertEquals(Arrays.asList("tenant", null), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

private KafkaIndexTaskTuningConfig roundTripWithStreamingPartitionsSpec(String partitionDimensionsJson)
throws IOException
{
final String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"streamingPartitionsSpec\": {\"partitionDimensions\": " + partitionDimensionsJson + "}\n"
+ "}";
return (KafkaIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(jsonStr, TuningConfig.class)),
TuningConfig.class
);
}

@Test
public void testConvert()
{
@@ -186,7 +270,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
42,
2,
-1,
- false
+ false,
+ null
);

String serialized = mapper.writeValueAsString(base);