Skip to content

Commit 3b0da3f

Browse files
authored
revert: going back to kstream to older version (#36)
* revert: going back to ksteram to older version * Revert "Fix RocksDB native memory leak in stateful applications (#32)" This reverts commit e2e26cf.
1 parent b8cdae7 commit 3b0da3f

File tree

4 files changed

+9
-19
lines changed

4 files changed

+9
-19
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ tasks.test {
1212
dependencies {
1313
api(project(":kafka-streams-serdes"))
1414
api("com.typesafe:config:1.4.1")
15-
api("org.apache.kafka:kafka-streams:6.1.0-ccs")
16-
api("io.confluent:kafka-streams-avro-serde:6.1.0")
15+
api("org.apache.kafka:kafka-streams:6.0.1-ccs")
16+
api("io.confluent:kafka-streams-avro-serde:6.0.1")
1717

1818
implementation("com.google.guava:guava:30.1-jre")
1919
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.23")
2020
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.23")
21-
implementation("org.apache.kafka:kafka-clients:6.1.0-ccs")
21+
implementation("org.apache.kafka:kafka-clients:6.0.1-ccs")
2222

23-
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.1.0-ccs")
23+
testImplementation("org.apache.kafka:kafka-streams-test-utils:6.0.1-ccs")
2424
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
2525
testImplementation("org.junit-pioneer:junit-pioneer:1.1.0")
2626
testImplementation("org.mockito:mockito-core:3.6.28")

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBCacheProvider.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.google.common.annotations.VisibleForTesting;
1414
import java.util.Map;
1515
import org.apache.kafka.common.config.ConfigException;
16-
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
1716
import org.rocksdb.BlockBasedTableConfig;
1817
import org.rocksdb.Cache;
1918
import org.rocksdb.LRUCache;
@@ -108,7 +107,7 @@ protected synchronized void initCache(Options options, Map<String, Object> confi
108107
cacheTotalCapacity / (1024 * 1024), writeBuffersRatio, highPriorityPoolRatio);
109108
}
110109

111-
final BlockBasedTableConfigWithAccessibleCache tableConfig = (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
110+
final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
112111

113112
// ######### Block cache (Read buffers) #########
114113
if (configs.containsKey(BLOCK_SIZE)) {
@@ -134,12 +133,7 @@ protected synchronized void initCache(Options options, Map<String, Object> confi
134133

135134
options.setWriteBufferManager(writeBufferManager);
136135

137-
final Cache oldCache = tableConfig.blockCache();
138-
if(oldCache != null) {
139-
LOG.info("Releasing old rocksdb cache before setting shared cache.");
140-
oldCache.close();
141-
}
142-
tableConfig.setBlockCache(this.cache);
136+
tableConfig.setBlockCache(cache);
143137
options.setTableFormatConfig(tableConfig);
144138
}
145139

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetterTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.stream.Stream;
1818
import org.apache.kafka.common.config.ConfigException;
1919
import org.apache.kafka.streams.state.RocksDBConfigSetter;
20-
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
2120
import org.junit.jupiter.api.AfterEach;
2221
import org.junit.jupiter.api.BeforeEach;
2322
import org.junit.jupiter.api.Test;
@@ -27,7 +26,6 @@
2726
import org.rocksdb.CompactionStyle;
2827
import org.rocksdb.CompressionType;
2928
import org.rocksdb.InfoLogLevel;
30-
import org.rocksdb.LRUCache;
3129
import org.rocksdb.Options;
3230

3331
class BoundedMemoryConfigSetterTest {
@@ -43,9 +41,7 @@ public void setUp() {
4341
RocksDBCacheProvider.get().testDestroy();
4442
options = new Options();
4543
configs = new HashMap<>();
46-
tableConfig = new BlockBasedTableConfigWithAccessibleCache();
47-
// mimic kafka streams
48-
tableConfig.setBlockCache(new LRUCache(32 * 1024));
44+
tableConfig = new BlockBasedTableConfig();
4945
options.setTableFormatConfig(tableConfig);
5046
configSetter = new BoundedMemoryConfigSetter();
5147
}

kafka-streams-serdes/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ tasks.test {
1111
}
1212

1313
dependencies {
14-
api("org.apache.kafka:kafka-streams:6.1.0-ccs")
14+
api("org.apache.kafka:kafka-streams:6.0.1-ccs")
1515
implementation("org.apache.avro:avro:1.10.2")
16-
implementation("org.apache.kafka:kafka-clients:6.1.0-ccs")
16+
implementation("org.apache.kafka:kafka-clients:6.0.1-ccs")
1717
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
1818
constraints {
1919
api("com.fasterxml.jackson.core:jackson-databind:2.11.0") {

0 commit comments

Comments
 (0)