Skip to content

Commit 1699ea9

Browse files
author
aman bansal
authored
chore | increase shutdown time for kafka streams app (#74)
* chore | increase shutdown time for kafka streams app
1 parent df777b6 commit 1699ea9

File tree

1 file changed

+11
-3
lines changed
  • kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework

1 file changed

+11
-3
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ public abstract class KafkaStreamsApp extends PlatformService {
6262
public static final String CLEANUP_LOCAL_STATE = "cleanup.local.state";
6363
public static final String PRE_CREATE_TOPICS = "precreate.topics";
6464
public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";
65+
private static final String SHUTDOWN_DURATION = "shutdown.duration";
6566
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class);
6667

6768
private final GrpcChannelRegistry grpcChannelRegistry;
6869

6970
protected KafkaStreams app;
7071
private KafkaStreamsMetrics metrics;
72+
private Duration shutdownDuration;
7173

7274
// Visible for testing only
7375
protected Topology topology;
@@ -93,7 +95,6 @@ protected void doInit() {
9395
try {
9496
// configure properties
9597
streamsConfig = mergeProperties(getBaseStreamsConfig(), getJobStreamsConfig(getAppConfig()));
96-
9798
// build topologies
9899
Map<String, KStream<?, ?>> sourceStreams = new HashMap<>();
99100
StreamsBuilder streamsBuilder = new StreamsBuilder();
@@ -140,7 +141,7 @@ protected void doInit() {
140141
ExceptionUtils.getStackTrace(exception));
141142
System.exit(1);
142143
});
143-
144+
this.shutdownDuration = getShutdownDuration();
144145
getLogger().info("kafka streams topologies: {}", topology.describe());
145146
} catch (Exception e) {
146147
getLogger().error("Error initializing - ", e);
@@ -165,7 +166,7 @@ protected void doStop() {
165166
if (metrics != null) {
166167
metrics.close();
167168
}
168-
app.close(Duration.ofSeconds(30));
169+
app.close(shutdownDuration);
169170
grpcChannelRegistry.shutdown(after(10, SECONDS));
170171
}
171172

@@ -294,4 +295,11 @@ private void preCreateTopics(Map<String, Object> properties) {
294295
(String) properties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), topics);
295296
}
296297
}
298+
299+
private Duration getShutdownDuration() {
300+
Config config = (Config) streamsConfig.get(getJobConfigKey());
301+
return config.hasPath(SHUTDOWN_DURATION)
302+
? config.getDuration(SHUTDOWN_DURATION)
303+
: Duration.ofSeconds(30);
304+
}
297305
}

0 commit comments

Comments
 (0)