diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index e56a4cbfb4e56..3e3434e0add72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -42,6 +43,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.regex.Pattern; @@ -83,6 +85,7 @@ public StreamsBuilder() { * * @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail */ + @Deprecated @SuppressWarnings("this-escape") public StreamsBuilder(final TopologyConfig topologyConfigs) { topology = newTopology(topologyConfigs); @@ -90,10 +93,25 @@ public StreamsBuilder(final TopologyConfig topologyConfigs) { internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); } + public StreamsBuilder(final Map configs) { + this(Utils.mkObjectProperties(configs)); + } + + public StreamsBuilder(final Properties properties) { + topology = newTopology(properties); + internalTopologyBuilder = topology.internalTopologyBuilder; + internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + } + + @Deprecated protected Topology newTopology(final TopologyConfig topologyConfigs) { return new Topology(topologyConfigs); } + protected Topology newTopology(final Properties properties) { + return new Topology(properties); + } + /** * Create a {@link KStream} from the specified topic. * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value @@ -574,7 +592,8 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< * @return the {@link Topology} that represents the specified processing logic */ public synchronized Topology build() { - return build(null); + internalStreamsBuilder.buildAndOptimizeTopology(); + return topology; } /** @@ -584,6 +603,7 @@ public synchronized Topology build() { * @param props the {@link Properties} used for building possibly optimized topology * @return the {@link Topology} that represents the specified processing logic */ + @Deprecated public synchronized Topology build(final Properties props) { internalStreamsBuilder.buildAndOptimizeTopology(props); return topology; diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 944446b4bec74..e37267ccb9654 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -49,6 +49,8 @@ import org.apache.kafka.streams.kstream.SessionWindowedSerializer; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindowedSerializer; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.assignment.TaskAssignor; @@ -57,6 +59,7 @@ import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; +import org.apache.kafka.streams.state.DslStoreSuppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +74,10 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -81,6 +86,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.common.config.ConfigDef.parseType; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize; /** * Configuration for a {@link KafkaStreams} instance. @@ -1515,6 +1521,18 @@ public StreamsConfig(final Map props) { this(props, true); } + + public final int maxBufferedSize; + public final long cacheSize; + public final long maxTaskIdleMs; + public final long taskTimeoutMs; + public final String storeType; + public final Class dslStoreSuppliers; + public final Supplier timestampExtractorSupplier; + public final Supplier deserializationExceptionHandlerSupplier; + public final Supplier processingExceptionHandlerSupplier; + public final boolean ensureExplicitInternalResourceNaming; + @SuppressWarnings("this-escape") protected StreamsConfig(final Map props, final boolean doLog) { @@ -1523,11 +1541,41 @@ protected StreamsConfig(final Map props, if (eosEnabled) { verifyEOSTransactionTimeoutCompatibility(); } + + this.maxBufferedSize = this.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); + this.cacheSize = totalCacheSize(this); + this.maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG); + this.taskTimeoutMs = getLong(TASK_TIMEOUT_MS_CONFIG); + this.timestampExtractorSupplier = () -> getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + + final String deserializationExceptionHandlerKey = (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) + || originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) ? + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG : + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; + this.deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class); + + this.processingExceptionHandlerSupplier = () -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); + this.storeType = getString(DEFAULT_DSL_STORE_CONFIG); + this.dslStoreSuppliers = getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG); + this.ensureExplicitInternalResourceNaming = getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG); + verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG)); verifyClientTelemetryConfigs(); verifyStreamsProtocolCompatibility(doLog); } + public TaskConfig getTaskConfig() { + return new TaskConfig( + maxTaskIdleMs, + taskTimeoutMs, + maxBufferedSize, + timestampExtractorSupplier.get(), + deserializationExceptionHandlerSupplier.get(), + processingExceptionHandlerSupplier.get(), + eosEnabled + ); + } + private void verifyStreamsProtocolCompatibility(final boolean doLog) { if (doLog && isStreamsProtocolEnabled()) { log.warn("The streams rebalance protocol is still in development and should not be used in production. " @@ -2146,6 +2194,20 @@ protected boolean isStreamsProtocolEnabled() { return getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name()); } + /** + * @return the DslStoreSuppliers if the value was explicitly configured (either by + * {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}) + */ + public Optional resolveDslStoreSuppliers() { + if (this.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) { + return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class)); + } else if (this.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) { + return Optional.of(MaterializedInternal.parse(storeType)); + } else { + return Optional.empty(); + } + } + /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index e032abc346fd4..3b0d3200432db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.AutoOffsetResetInternal; import org.apache.kafka.streams.kstream.KStream; @@ -38,7 +39,9 @@ import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.state.StoreBuilder; +import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -65,10 +68,19 @@ public Topology() { this(new InternalTopologyBuilder()); } + @Deprecated public Topology(final TopologyConfig topologyConfigs) { this(new InternalTopologyBuilder(topologyConfigs)); } + public Topology(final Map configs) { + this(Utils.mkObjectProperties(configs)); + } + + public Topology(final Properties properties) { + this(new InternalTopologyBuilder(new StreamsConfig(properties))); + } + protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { this.internalTopologyBuilder = internalTopologyBuilder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index da8c246b26d12..c7807ba3b45b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -84,7 +84,7 @@ * If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them * to be applied and the config will be ignored. */ -@SuppressWarnings("deprecation") +@Deprecated public final class TopologyConfig extends AbstractConfig { private static final ConfigDef CONFIG; static { @@ -294,6 +294,7 @@ public Materialized.StoreType parseStoreType() { * @return the DslStoreSuppliers if the value was explicitly configured (either by * {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}) */ + @Deprecated public Optional resolveDslStoreSuppliers() { if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) { return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 968276bd501b0..73148c29508a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -297,22 +297,30 @@ private void maybeAddNodeForVersionedSemanticsMetadata(final GraphNode node) { } } - // use this method for testing only public void buildAndOptimizeTopology() { - buildAndOptimizeTopology(null); + buildAndOptimizeTopologyInternal(null); } + @Deprecated public void buildAndOptimizeTopology(final Properties props) { + buildAndOptimizeTopologyInternal(props); + } + + private void buildAndOptimizeTopologyInternal(final Properties props) { mergeDuplicateSourceNodes(); - optimizeTopology(props); + if (props != null) { + optimizeTopology(props); + } else { + optimizeTopology(); + } enableVersionedSemantics(); - final PriorityQueue graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority)); - + final PriorityQueue graphNodePriorityQueue = + new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority)); graphNodePriorityQueue.offer(root); while (!graphNodePriorityQueue.isEmpty()) { - final GraphNode streamGraphNode = graphNodePriorityQueue.remove(); + final GraphNode streamGraphNode = graphNodePriorityQueue.poll(); if (LOG.isDebugEnabled()) { LOG.debug("Adding nodes to topology {} child nodes {}", streamGraphNode, streamGraphNode.children()); @@ -323,14 +331,13 @@ public void buildAndOptimizeTopology(final Properties props) { streamGraphNode.setHasWrittenToTopology(true); } - for (final GraphNode graphNode : streamGraphNode.children()) { - graphNodePriorityQueue.offer(graphNode); + for (final GraphNode child : streamGraphNode.children()) { + graphNodePriorityQueue.offer(child); } } - internalTopologyBuilder.validateCopartition(); + internalTopologyBuilder.validateCopartition(); internalTopologyBuilder.checkUnprovidedNames(); - } /** @@ -339,12 +346,36 @@ public void buildAndOptimizeTopology(final Properties props) { */ private void optimizeTopology(final Properties props) { final Set optimizationConfigs; + if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { optimizationConfigs = Collections.emptySet(); } else { optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs( - (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)); + (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG) + ); } + + applyOptimizations(optimizationConfigs); + } + + /** + * This method is called after the topology has been built, and it applies the optimizations + * based on the configuration set in the topology configs. + */ + private void optimizeTopology() { + final StreamsConfig topicSpecificConfigs = internalTopologyBuilder.topologySpecificConfigs(); + final String configValue = topicSpecificConfigs != null + ? topicSpecificConfigs.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG) + : null; + + final Set optimizationConfigs = configValue != null + ? StreamsConfig.verifyTopologyOptimizationConfigs(configValue) + : Collections.emptySet(); + + applyOptimizations(optimizationConfigs); + } + + private void applyOptimizations(final Set optimizationConfigs) { if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) { LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes"); reuseKTableSourceTopics(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index 7bd727b09be3b..e6bce92a7f8bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -43,6 +43,7 @@ public MaterializedInternal(final Materialized materialized, this(materialized, nameProvider, generatedStorePrefix, false); } + @SuppressWarnings("deprecation") public MaterializedInternal(final Materialized materialized, final InternalNameProvider nameProvider, final String generatedStorePrefix, @@ -68,9 +69,14 @@ public MaterializedInternal(final Materialized materialized, // is configured with the main StreamsConfig if (dslStoreSuppliers == null) { if (nameProvider instanceof InternalStreamsBuilder) { - final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs(); - if (topologyConfig != null) { - dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); + final StreamsConfig streamsConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologySpecificConfigs(); + if (streamsConfig != null) { + dslStoreSuppliers = streamsConfig.resolveDslStoreSuppliers().orElse(null); + } else { + final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs(); + if (topologyConfig != null) { + dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java index 0f245ba153ac1..ba955be98cec3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.state.DslStoreSuppliers; @@ -32,6 +33,7 @@ public class StreamJoinedInternal extends StreamJoined { // store in the desired order (see comments in OuterStreamJoinFactory) private final DslStoreSuppliers passedInDslStoreSuppliers; + @SuppressWarnings("deprecation") //Needs to be public for testing public StreamJoinedInternal( final StreamJoined streamJoined, @@ -40,9 +42,14 @@ public StreamJoinedInternal( super(streamJoined); passedInDslStoreSuppliers = dslStoreSuppliers; if (dslStoreSuppliers == null) { - final TopologyConfig topologyConfig = builder.internalTopologyBuilder().topologyConfigs(); - if (topologyConfig != null) { - dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); + final StreamsConfig streamsConfig = builder.internalTopologyBuilder.topologySpecificConfigs(); + if (streamsConfig != null) { + dslStoreSuppliers = streamsConfig.resolveDslStoreSuppliers().orElse(null); + } else { + final TopologyConfig topologyConfig = builder.internalTopologyBuilder().topologyConfigs(); + if (topologyConfig != null) { + dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TaskConfig.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TaskConfig.java new file mode 100644 index 0000000000000..c6419c8b4d7c3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TaskConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public class TaskConfig { + public final long maxTaskIdleMs; + public final long taskTimeoutMs; + public final int maxBufferedSize; + public final TimestampExtractor timestampExtractor; + public final DeserializationExceptionHandler deserializationExceptionHandler; + public final ProcessingExceptionHandler processingExceptionHandler; + public final boolean eosEnabled; + + public TaskConfig(final long maxTaskIdleMs, + final long taskTimeoutMs, + final int maxBufferedSize, + final TimestampExtractor timestampExtractor, + final DeserializationExceptionHandler deserializationExceptionHandler, + final ProcessingExceptionHandler processingExceptionHandler, + final boolean eosEnabled) { + this.maxTaskIdleMs = maxTaskIdleMs; + this.taskTimeoutMs = taskTimeoutMs; + this.maxBufferedSize = maxBufferedSize; + this.timestampExtractor = timestampExtractor; + this.deserializationExceptionHandler = deserializationExceptionHandler; + this.processingExceptionHandler = processingExceptionHandler; + this.eosEnabled = eosEnabled; + } + + public TaskConfig getTaskConfig() { + return new TaskConfig( + maxTaskIdleMs, + taskTimeoutMs, + maxBufferedSize, + timestampExtractor, + deserializationExceptionHandler, + processingExceptionHandler, + eosEnabled + ); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorWrapper.java index 22b80a35ecb3e..7ef376a23f2f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorWrapper.java @@ -20,12 +20,11 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedFixedKeyProcessorSupplierImpl; import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper.WrappedProcessorSupplierImpl; import java.util.Map; +import java.util.Properties; /** * Wrapper class that can be used to inject custom wrappers around the processors of their application topology. @@ -35,12 +34,12 @@ * while testing or debugging an application topology). *

* NOTE: in order to use this feature, you must set the {@link StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG} config and pass it - * in as a {@link TopologyConfig} when creating the {@link StreamsBuilder} or {@link Topology} by using the - * appropriate constructor (ie {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} or {@link Topology#Topology(TopologyConfig)}) + * in as a Properties or Map when creating the {@link StreamsBuilder} by using the + * appropriate constructor (ie {@link StreamsBuilder#StreamsBuilder(Properties)} or {@link StreamsBuilder#StreamsBuilder(Map)}) *

* Can be configured, if desired, by implementing the {@link #configure(Map)} method. This will be invoked when - * the {@code ProcessorWrapper} is instantiated, and will provide it with the TopologyConfigs that were passed in - * to the {@link StreamsBuilder} or {@link Topology} constructor. + * the {@code ProcessorWrapper} is instantiated, and will provide it with the StreamsConfig that were passed in + * to one of the {@link StreamsBuilder} constructors. */ public interface ProcessorWrapper extends Configurable { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 7f33c4693b8a4..a74c04f698299 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -20,9 +20,9 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 2503a35841963..77e07d919742f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.internals.AutoOffsetResetInternal; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; @@ -66,23 +67,28 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG; import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; +@SuppressWarnings("deprecation") public class InternalTopologyBuilder { public InternalTopologyBuilder() { this.topologyName = null; this.ensureExplicitInternalResourceNaming = false; this.processorWrapper = new NoOpProcessorWrapper(); + this.dslStoreSuppliers = null; } + @Deprecated public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { Objects.requireNonNull(topologyConfigs, "topologyConfigs cannot be null"); this.topologyConfigs = topologyConfigs; this.topologyName = topologyConfigs.topologyName; this.ensureExplicitInternalResourceNaming = topologyConfigs.ensureExplicitInternalResourceNaming; + this.dslStoreSuppliers = topologyConfigs.dslStoreSuppliers; try { processorWrapper = topologyConfigs.getConfiguredInstance( PROCESSOR_WRAPPER_CLASS_CONFIG, @@ -98,6 +104,28 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { } } + public InternalTopologyBuilder(final StreamsConfig topologySpecificConfigs) { + Objects.requireNonNull(topologySpecificConfigs, "streamConfigs cannot be null"); + + this.topologySpecificConfigs = topologySpecificConfigs; + this.topologyName = null; + this.ensureExplicitInternalResourceNaming = topologySpecificConfigs.ensureExplicitInternalResourceNaming; + this.dslStoreSuppliers = topologySpecificConfigs.dslStoreSuppliers; + try { + processorWrapper = topologySpecificConfigs.getConfiguredInstance( + PROCESSOR_WRAPPER_CLASS_CONFIG, + ProcessorWrapper.class, + topologySpecificConfigs.originals() + ); + } catch (final Exception e) { + final String errorMessage = String.format( + "Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class " + + "that implements the ProcessorWrapper interface.", PROCESSOR_WRAPPER_CLASS_CONFIG); + log.error(errorMessage, e); + throw new ConfigException(errorMessage, e); + } + } + private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class); private static final String[] NO_PREDECESSORS = {}; @@ -194,14 +222,21 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { // TODO KAFKA-13283: once we enforce all configs be passed in when constructing the topology builder then we can set // this up front and make it final, but for now we have to wait for the global app configs passed in to rewriteTopology + @SuppressWarnings("deprecation") private TopologyConfig topologyConfigs; // the configs for this topology, including overrides and global defaults + private StreamsConfig topologySpecificConfigs; + + private TaskConfig taskConfig; + private boolean hasPersistentStores = false; private final boolean ensureExplicitInternalResourceNaming; private final Set implicitInternalNames = new LinkedHashSet<>(); + public final Class dslStoreSuppliers; + public static class ReprocessFactory { private final ProcessorSupplier processorSupplier; @@ -411,6 +446,9 @@ public final synchronized void setStreamsConfig(final StreamsConfig applicationC ? new Properties() : topologyConfigs.topologyOverrides; topologyConfigs = new TopologyConfig(topologyName, applicationConfig, topologyOverrides); + + topologySpecificConfigs = applicationConfig; + taskConfig = applicationConfig.getTaskConfig(); } @SuppressWarnings("deprecation") @@ -418,10 +456,19 @@ public final synchronized void setNamedTopology(final NamedTopology namedTopolog this.namedTopology = namedTopology; } + @Deprecated public synchronized TopologyConfig topologyConfigs() { return topologyConfigs; } + public synchronized StreamsConfig topologySpecificConfigs() { + return topologySpecificConfigs; + } + + public synchronized TaskConfig taskConfigs() { + return taskConfig; + } + public String topologyName() { return topologyName; } @@ -438,7 +485,7 @@ public final synchronized InternalTopologyBuilder rewriteTopology(final StreamsC setStreamsConfig(config); // maybe strip out caching layers - if (topologyConfigs.cacheSize == 0L) { + if (topologyConfigs.cacheSize == 0L && topologySpecificConfigs.cacheSize == 0L) { for (final StoreFactory storeFactory : stateFactories.values()) { storeFactory.withCachingDisabled(); } @@ -457,6 +504,76 @@ public final synchronized InternalTopologyBuilder rewriteTopology(final StreamsC return this; } + /** + * Verify that the topology-specific configs are set correctly in the provided {@link StreamsConfig}. + * This method checks that the configs are set in both the topology and the StreamsConfig, and that they match. + * + * @param config the {@link StreamsConfig} to verify against + * @throws TopologyException if any of the topology-specific configs are not set correctly + */ + public void verifySpecificTopologyConfig(final StreamsConfig config) { + Objects.requireNonNull(config, "config cannot be null"); + + // check that the topology configs is set + if (topologySpecificConfigs == null) { + return; + } + + final StringBuilder resultException = new StringBuilder(); + final StringBuilder resultLog = new StringBuilder(); + + verifyConfig( + config.originals().get(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG), + topologySpecificConfigs.originals().get(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG), + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, + ensureExplicitInternalResourceNaming, + resultException, + resultLog + ); + verifyConfig( + config.originals().get(DEFAULT_DSL_STORE_CONFIG), + topologySpecificConfigs.originals().get(DEFAULT_DSL_STORE_CONFIG), + DEFAULT_DSL_STORE_CONFIG, + topologySpecificConfigs.originals().get(DEFAULT_DSL_STORE_CONFIG), + resultException, + resultLog + ); + verifyConfig( + config.originals().get(PROCESSOR_WRAPPER_CLASS_CONFIG), + topologySpecificConfigs.originals().get(PROCESSOR_WRAPPER_CLASS_CONFIG), + PROCESSOR_WRAPPER_CLASS_CONFIG, + processorWrapper, + resultException, + resultLog + ); + + if (resultLog.length() > 0) { + log.warn(resultLog.toString()); + } + if (resultException.length() > 0) { + throw new TopologyException(resultException.toString()); + } + } + + @SuppressWarnings("unchecked") + private void verifyConfig(final Object configValue, + final Object topologyValue, + final String configKey, + final Object expectedTopologyValue, + final StringBuilder exceptionBuilder, + final StringBuilder logBuilder + ) { + final T original = (T) configValue; + final T topology = (T) topologyValue; + + if (original != null && topology == null) { + exceptionBuilder.append(String.format("The topology-specific config %s is set in StreamsConfig but not applied to the topology.%n", configKey)); + } else if (original != null && !original.equals(expectedTopologyValue)) { + logBuilder.append(String.format("The topology-specific config %s has different values in the topology %s and in StreamsConfig %s.%n", + configKey, expectedTopologyValue, original)); + } + } + private void verifyName(final String name) { Objects.requireNonNull(name, "name cannot be null"); if (nodeFactories.containsKey(name)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4c6e6674bdbcf..f43fb8465ada2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -23,10 +23,10 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java index 2abdad5998677..1bad7034dfccb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.StoreBuilder; import java.util.Map; +import java.util.Properties; import java.util.Set; /** @@ -43,7 +45,8 @@ * creation of the Topology but before the stores themselves are created. * This allows Kafka Streams to respect configurations such as * {@link StreamsConfig#DEFAULT_DSL_STORE_CONFIG} even if it isn't passed - * to {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} + * to {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)}, + * {@link StreamsBuilder#StreamsBuilder(Properties)} or {@link StreamsBuilder#StreamsBuilder(Map)} * */ public interface StoreFactory extends ConfigurableStore { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 82e9c8d7fb110..1da60d62b33a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.LockException; @@ -38,6 +37,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index fe04f2c4613f2..ddaa69f59b27e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -22,11 +22,11 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.UnknownTopologyException; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; @@ -310,7 +310,7 @@ public KafkaFuture unregisterTopology(final KafkaFutureImpl removeTo public TaskConfig taskConfig(final TaskId taskId) { final InternalTopologyBuilder builder = lookupBuilderForTask(taskId); - return builder.topologyConfigs().getTaskConfig(); + return builder.taskConfigs().getTaskConfig(); } public void buildAndRewriteTopology() { @@ -318,6 +318,7 @@ public void buildAndRewriteTopology() { } private void buildAndVerifyTopology(final InternalTopologyBuilder builder) { + builder.verifySpecificTopologyConfig(config); builder.rewriteTopology(config); builder.buildTopology(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java index 014d1f4f9702e..c1c20b6345002 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java @@ -17,9 +17,10 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.Configurable; -import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.StreamsBuilder; import java.util.Map; +import java.util.Properties; /** * {@code DslStoreSuppliers} defines a grouping of factories to construct @@ -36,8 +37,8 @@ * {@link org.apache.kafka.streams.kstream.StreamJoined#withDslStoreSuppliers(DslStoreSuppliers)} * *

  • Passed in via a Topology configuration override (configured in a - * {@link org.apache.kafka.streams.TopologyConfig} and passed into the - * {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor
  • + * Properties or Map and passed into the + * {@link StreamsBuilder#StreamsBuilder(Properties)} or {@link StreamsBuilder#StreamsBuilder(Map)} constructors * *
  • Configured as a global default in {@link org.apache.kafka.streams.StreamsConfig} using * the {@link org.apache.kafka.streams.StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}
  • diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java index 24600c57fec1a..7a7a5d9b7c78e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java @@ -102,6 +102,22 @@ public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() { assertThat(materialized.dslStoreSuppliers(), equalTo(Optional.of(Materialized.StoreType.IN_MEMORY))); } + @SuppressWarnings("deprecation") + @Test + public void shouldUseStoreTypeWhenProvidedViaStreamsConfig() { + final Properties properties = StreamsTestUtils.getStreamsConfig(); + properties.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY); + final StreamsConfig config = new StreamsConfig(properties); + + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(config); + + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + + final MaterializedInternal> materialized = + new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); + assertThat(materialized.dslStoreSuppliers(), equalTo(Optional.of(Materialized.StoreType.IN_MEMORY))); + } + @SuppressWarnings("deprecation") @Test public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig() { @@ -121,6 +137,24 @@ public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig( assertThat(materialized.dslStoreSuppliers().get(), instanceOf(TestStoreSupplier.class)); } + @SuppressWarnings("deprecation") + @Test + public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaStreamsConfig() { + final Properties properties = StreamsTestUtils.getStreamsConfig(); + properties.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.ROCKS_DB); + properties.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, TestStoreSupplier.class); + final StreamsConfig config = new StreamsConfig(properties); + + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(new TopologyConfig(config)); + + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + + final MaterializedInternal> materialized = + new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); + assertThat(materialized.dslStoreSuppliers().isPresent(), is(true)); + assertThat(materialized.dslStoreSuppliers().get(), instanceOf(TestStoreSupplier.class)); + } + @Test public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified() { final Properties topologyOverrides = new Properties(); @@ -136,6 +170,19 @@ public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified assertThat(materialized.dslStoreSuppliers().isPresent(), is(false)); } + @Test + public void shouldReturnEmptyWhenOriginalsDontHaveSuppliersSpecified() { + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); + + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(config); + + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + + final MaterializedInternal> materialized = + new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); + assertThat(materialized.dslStoreSuppliers().isPresent(), is(false)); + } + public static class TestStoreSupplier implements DslStoreSuppliers { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 17eb27bb3d8f1..dc964e37f84c4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -26,8 +26,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -255,7 +255,7 @@ private void createTasks() { final ProcessorTopology topology = mock(ProcessorTopology.class); final SourceNode sourceNode = mock(SourceNode.class); - when(builder.topologyConfigs()).thenReturn(new TopologyConfig(new StreamsConfig(properties))); + when(builder.taskConfigs()).thenReturn(new TaskConfig(0, 0, 1000, null, null, null, false)); when(builder.buildSubtopology(0)).thenReturn(topology); when(topology.sinkTopics()).thenReturn(emptySet()); when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 86264b59bf712..7ee8ed145fb1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -84,6 +84,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -1452,6 +1453,182 @@ public void shouldThrowOnInvalidProcessorWrapperClassName() { ); } + @Test + public void shouldThrowNullPointerExceptionWhenConfigIsNull() { + assertThrows(NullPointerException.class, () -> builder.verifySpecificTopologyConfig(null)); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldNotThrowExceptionWhenConfigsMatch() { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class); + props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "rocksDB"); + + final StreamsConfig config = new StreamsConfig(props); + builder.setStreamsConfig(config); + + // Should not throw any exception + builder.verifySpecificTopologyConfig(config); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldNotThrowTopologyExceptionWhenStoreTypeConfigsDoNotMatch() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props1.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory"); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "rocksDB"); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + assertDoesNotThrow(() -> builder.verifySpecificTopologyConfig(streamsConfig)); + } + + @Test + public void shouldNotThrowTopologyExceptionWhenProcessorConfigsDoNotMatch() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props1.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, NoOpProcessorWrapper.class); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + assertDoesNotThrow(() -> builder.verifySpecificTopologyConfig(streamsConfig)); + } + + @Test + public void shouldThrowTopologyExceptionWhenExplicitNamingConfigsDoNotMatch() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props1.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, false); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + assertDoesNotThrow(() -> builder.verifySpecificTopologyConfig(streamsConfig)); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldNotThrowTopologyExceptionWhenStoreTypeConfigsIsMissing() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "rocksDB"); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + final TopologyException e = assertThrows(TopologyException.class, () -> builder.verifySpecificTopologyConfig(streamsConfig)); + assertTrue(e.getMessage().contains("Invalid topology: The topology-specific config default.dsl.store is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config processor.wrapper.class is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config ensure.explicit.internal.resource.naming is set in StreamsConfig but not applied to the topology.")); + + } + + @Test + public void shouldNotThrowTopologyExceptionWhenProcessorConfigsIsMissing() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + final TopologyException e = assertThrows(TopologyException.class, () -> builder.verifySpecificTopologyConfig(streamsConfig)); + assertTrue(e.getMessage().contains("Invalid topology: The topology-specific config processor.wrapper.class is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config default.dsl.store is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config ensure.explicit.internal.resource.naming is set in StreamsConfig but not applied to the topology.")); + } + + @Test + public void shouldThrowTopologyExceptionWhenExplicitNamingConfigsIsMissing() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + final TopologyException e = assertThrows(TopologyException.class, () -> builder.verifySpecificTopologyConfig(streamsConfig)); + assertTrue(e.getMessage().contains("Invalid topology: The topology-specific config ensure.explicit.internal.resource.naming is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config default.dsl.store is set in StreamsConfig but not applied to the topology.")); + assertFalse(e.getMessage().contains("The topology-specific config processor.wrapper.class is set in StreamsConfig but not applied to the topology.")); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldThrowTopologyExceptionWhenAllConfigsIsMissing() { + final Properties props1 = new Properties(); + props1.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props1.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + + final Properties props2 = new Properties(); + props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy-1234"); + props2.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + props2.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, ProcessorSkippingWrapper.class); + props2.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "rocksDB"); + + final StreamsConfig topologyConfig = new StreamsConfig(props1); + final StreamsConfig streamsConfig = new StreamsConfig(props2); + + builder.setStreamsConfig(topologyConfig); + + final TopologyException e = assertThrows(TopologyException.class, () -> builder.verifySpecificTopologyConfig(streamsConfig)); + assertTrue(e.getMessage().contains("Invalid topology: The topology-specific config ensure.explicit.internal.resource.naming is set in StreamsConfig but not applied to the topology.")); + assertTrue(e.getMessage().contains("The topology-specific config default.dsl.store is set in StreamsConfig but not applied to the topology.")); + assertTrue(e.getMessage().contains("The topology-specific config processor.wrapper.class is set in StreamsConfig but not applied to the topology.")); + } + public static class ProcessorSkippingWrapper implements ProcessorWrapper { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 6be27187f0fc7..2d24620ccc10d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; @@ -582,7 +581,7 @@ private StandbyTask createStandbyTask() { taskId, Collections.singleton(partition), topology, - new TopologyConfig(config).getTaskConfig(), + config.getTaskConfig(), streamsMetrics, stateManager, stateDirectory, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index f850093d389d4..88fb039698bfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -931,7 +930,6 @@ public void shouldNotCloseStartupTasksOnAutoCleanUp() { private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) { directory.initializeProcessId(); final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class); - final TopologyConfig topologyConfig = new TopologyConfig(config); final StateStore store = new MockKeyValueStore("test", true); @@ -952,7 +950,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean cre Collections.emptyMap() ); Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology); - Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig()); + Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(config.getTaskConfig()); directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 701f38eda0c4e..da044514b100b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -45,7 +45,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.LockException; @@ -97,7 +96,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1994,7 +1992,7 @@ public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean Set.of(partition1, repartition), topology, consumer, - new TopologyConfig(null, config, new Properties()).getTaskConfig(), + config.getTaskConfig(), streamsMetrics, stateDirectory, cache, @@ -2616,7 +2614,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { partitions, topology, consumer, - new TopologyConfig(null, createConfig("100"), new Properties()).getTaskConfig(), + createConfig("100").getTaskConfig(), metrics, stateDirectory, cache, @@ -3087,7 +3085,7 @@ private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Set.of(partition1), topology, consumer, - new TopologyConfig(null, config, new Properties()).getTaskConfig(), + config.getTaskConfig(), streamsMetrics, stateDirectory, cache, @@ -3129,7 +3127,7 @@ public Map committed(final Set partitions, final boolean active, final ProcessorStateManager processorStateManager) { - super(id, null, null, processorStateManager, partitions, (new TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task", StateMachineTask.class); + super(id, null, null, processorStateManager, partitions, new DummyStreamsConfig().getTaskConfig(), "test-task", StateMachineTask.class); this.active = active; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 212a65eacddf9..85308e3e25f1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -474,7 +473,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, partitions, topology, consumer, - new TopologyConfig(null, streamsConfig, new Properties()).getTaskConfig(), + streamsConfig.getTaskConfig(), streamsMetrics, stateDirectory, mock(ThreadCache.class), diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 81c90d043cec4..90064dab85fa3 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -41,11 +41,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.StreamsConfigUtils; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TaskConfig; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -355,7 +355,7 @@ public List partitionsFor(final String topic) { ); setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, cache); - setupTask(streamsConfig, streamsMetrics, cache, internalTopologyBuilder.topologyConfigs().getTaskConfig()); + setupTask(streamsConfig, streamsMetrics, cache, internalTopologyBuilder.topologySpecificConfigs().getTaskConfig()); } private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) { @@ -1304,7 +1304,7 @@ public KeyValueIterator, V> fetch(final K keyFrom, final long timeFrom, final long timeTo) { return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), - Instant.ofEpochMilli(timeTo)); + Instant.ofEpochMilli(timeTo)); } @Override diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 2dc0089990e69..930d057790da5 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -403,6 +403,7 @@ public void process(final Record record) { return topology; } + @SuppressWarnings("deprecation") private Topology setupTopologyWithInternalTopic(final String firstTableName, final String secondTableName, final String joinName) {