-
Notifications
You must be signed in to change notification settings - Fork 3.8k
feat: Prunable shard specs for streaming published segments #19571
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9ea1fcc
58eaeb8
c0ef019
c2925d3
c9186d5
f1153e5
2eaf560
0db49c2
1e7c55c
b9df2b6
fcfc04f
f742476
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; | ||
| import org.apache.druid.indexing.kafka.supervisor.KafkaTuningConfigBuilder; | ||
| import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; | ||
| import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec; | ||
| import org.apache.druid.jackson.DefaultObjectMapper; | ||
| import org.apache.druid.segment.IndexSpec; | ||
| import org.apache.druid.segment.data.CompressionStrategy; | ||
|
|
@@ -36,6 +37,9 @@ | |
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| public class KafkaIndexTaskTuningConfigTest | ||
| { | ||
|
|
@@ -128,6 +132,86 @@ public void testSerdeWithNonDefaults() throws Exception | |
| Assert.assertEquals(-1, config.getMaxColumnsToMerge()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithStreamingPartitionsSpec() throws Exception | ||
| { | ||
| final String jsonStr = "{\n" | ||
| + " \"type\": \"kafka\",\n" | ||
| + " \"streamingPartitionsSpec\": {\"partitionDimensions\": [\"tenant\", \"region\"]}\n" | ||
| + "}"; | ||
|
|
||
| final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( | ||
| mapper.writeValueAsString(mapper.readValue(jsonStr, TuningConfig.class)), | ||
| TuningConfig.class | ||
| ); | ||
|
|
||
| Assert.assertEquals( | ||
| new StreamingPartitionsSpec(List.of("tenant", "region")), | ||
| config.getStreamingPartitionsSpec() | ||
| ); | ||
| Assert.assertEquals(List.of("tenant", "region"), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string, integers)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I've added some tests in this unit test and there's some existing end-to-end coverage in the embedded ones too for null, empty cases Re numeric types, they're currently unsupported for the |
||
| @Test | ||
| public void testSerdeWithoutStreamingPartitionsSpecIsNull() throws Exception | ||
| { | ||
| final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( | ||
| mapper.writeValueAsString(mapper.readValue("{\"type\": \"kafka\"}", TuningConfig.class)), | ||
| TuningConfig.class | ||
| ); | ||
| Assert.assertNull(config.getStreamingPartitionsSpec()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithEmptyPartitionDimensions() throws Exception | ||
| { | ||
| final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[]"); | ||
| Assert.assertEquals(Collections.emptyList(), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithNullPartitionDimensionsCoalescesToEmpty() throws Exception | ||
| { | ||
| final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("null"); | ||
| Assert.assertEquals(Collections.emptyList(), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithEmptyStringPartitionDimension() throws Exception | ||
| { | ||
| // An empty-string dimension name is preserved verbatim (it simply never matches an ingested value). | ||
| final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"\"]"); | ||
| Assert.assertEquals(List.of(""), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithNumericLookingPartitionDimension() throws Exception | ||
| { | ||
| // Dimension names are plain strings; a numeric-looking name is just a string. | ||
| final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"123\"]"); | ||
| Assert.assertEquals(List.of("123"), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSerdeWithNullElementInPartitionDimensions() throws Exception | ||
| { | ||
| final KafkaIndexTaskTuningConfig config = roundTripWithStreamingPartitionsSpec("[\"tenant\", null]"); | ||
| Assert.assertEquals(Arrays.asList("tenant", null), config.getStreamingPartitionsSpec().getPartitionDimensions()); | ||
| } | ||
|
|
||
| private KafkaIndexTaskTuningConfig roundTripWithStreamingPartitionsSpec(String partitionDimensionsJson) | ||
| throws IOException | ||
| { | ||
| final String jsonStr = "{\n" | ||
| + " \"type\": \"kafka\",\n" | ||
| + " \"streamingPartitionsSpec\": {\"partitionDimensions\": " + partitionDimensionsJson + "}\n" | ||
| + "}"; | ||
| return (KafkaIndexTaskTuningConfig) mapper.readValue( | ||
| mapper.writeValueAsString(mapper.readValue(jsonStr, TuningConfig.class)), | ||
| TuningConfig.class | ||
| ); | ||
| } | ||
|
|
||
| @Test | ||
| public void testConvert() | ||
| { | ||
|
|
@@ -186,7 +270,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException | |
| 42, | ||
| 2, | ||
| -1, | ||
| false | ||
| false, | ||
| null | ||
| ); | ||
|
|
||
| String serialized = mapper.writeValueAsString(base); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Makes sense to have this in tuningConfig, and in the future we can add cardinality guardrails into
streamingPartitionsSpecas well