diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java index e55b24a31..9adb40ec3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/StoreCommitListener.java @@ -41,6 +41,7 @@ private void onCommit( final TopicPartition p = e.getKey().getPartition(); for (final ResponsiveStoreRegistration storeRegistration : storeRegistry.getRegisteredStoresForChangelog(p, threadId)) { + // Committed offsets are already exclusive (one more than last consumed offset) storeRegistration.callbacks().notifyCommit(e.getValue()); } } @@ -48,7 +49,8 @@ private void onCommit( final TopicPartition p = e.getKey(); for (final ResponsiveStoreRegistration storeRegistration : storeRegistry.getRegisteredStoresForChangelog(p, threadId)) { - storeRegistration.callbacks().notifyCommit(e.getValue()); + // Add one since the written offset is inclusive + storeRegistration.callbacks().notifyCommit(e.getValue() + 1); } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java index 79ca898ee..33720fce8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTable.java @@ -62,11 +62,12 @@ S delete( /** * Get the offset corresponding to the last write to the table for a specific - * Kafka partition. + * Kafka partition. This offset is exclusive (one more than the offset of + * the last processed record). * * @param kafkaPartition the kafka partition - * @return the current offset fetched from the metadata table - * partition for the given kafka partition + * @return the last written offset (exclusive) from the table for + * the given kafka partition */ long lastWrittenOffset(final int kafkaPartition); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index 85c900400..1ccb7b777 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -449,17 +449,17 @@ public long restoreBatch( streamTimeMs = Math.max(streamTimeMs, r.timestamp()); batch.add(r); if (batch.size() >= maxBatchSize) { - restoreCassandraBatch(batch); + writeBatch(batch); batch.clear(); } } if (batch.size() > 0) { - restoreCassandraBatch(batch); + writeBatch(batch); } return streamTimeMs; } - private void restoreCassandraBatch(final Collection> records) { + private void writeBatch(final Collection> records) { long consumedOffset = -1L; for (ConsumerRecord record : records) { consumedOffset = record.offset(); @@ -471,7 +471,7 @@ private void restoreCassandraBatch(final Collection= 0) { - doFlush(consumedOffset, records.size()); + doFlush(consumedOffset + 1, records.size()); } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java index d3d6ffcfc..e94fe4d54 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java @@ -12,7 +12,6 @@ package dev.responsive.kafka.integration; -import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_CONNECTION_STRING_CONFIG; import static dev.responsive.kafka.testutils.IntegrationTestUtils.getCassandraValidName; import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeInput; import static dev.responsive.kafka.testutils.IntegrationTestUtils.slurpPartition; @@ -57,14 +56,12 @@ import dev.responsive.kafka.internal.db.CassandraClientFactory; import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory; import dev.responsive.kafka.internal.db.RemoteKVTable; -import dev.responsive.kafka.internal.db.mongo.CollectionCreationOptions; -import dev.responsive.kafka.internal.db.mongo.MongoKVTable; +import dev.responsive.kafka.internal.db.partitioning.SubPartitioner; import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; import dev.responsive.kafka.internal.db.spec.DefaultTableSpec; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema; import dev.responsive.kafka.internal.stores.TtlResolver; -import dev.responsive.kafka.internal.utils.SessionUtil; import dev.responsive.kafka.testutils.IntegrationTestUtils; import dev.responsive.kafka.testutils.IntegrationTestUtils.MockResponsiveKafkaStreams; import dev.responsive.kafka.testutils.ResponsiveConfigParam; @@ -75,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -107,6 +105,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -185,7 +184,8 @@ public void shouldFlushStoresBeforeClose(final KVSchema type) throws Exception { final List> changelogRecords = slurpPartition(changelog, properties); final long last = changelogRecords.get(changelogRecords.size() - 1).offset(); - assertThat(cassandraOffset, equalTo(last)); + // Written offset is exclusive + assertThat(cassandraOffset, equalTo(last + 1)); } } @@ -259,9 +259,9 @@ public void shouldRepairOffsetsIfOutOfRangeAndConfigured(final KVSchema type) th } } - @ParameterizedTest - @EnumSource(KVSchema.class) - public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exception { + @Test + public void shouldRestoreUnflushedChangelog() throws Exception { + final KVSchema type = KVSchema.KEY_VALUE; final Map properties = getMutableProperties(type); final KafkaProducer producer = new KafkaProducer<>(properties); final KafkaClientSupplier defaultClientSupplier = new DefaultKafkaClientSupplier(); @@ -280,6 +280,12 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exceptio waitTillFullyConsumed(admin, input, name, Duration.ofSeconds(120)); } + final TopicPartition changelog = new TopicPartition(name + "-" + aggName() + "-changelog", 0); + final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties); + + final RemoteKVTable changelogTable = remoteKVTable(type, defaultFactory, config, changelog); + assertThat(changelogTable.lastWrittenOffset(0), greaterThan(0L)); + // restart with fault injecting cassandra client final FaultInjectingCassandraClientSupplier cassandraFaultInjector = new FaultInjectingCassandraClientSupplier(); @@ -299,22 +305,12 @@ public void shouldRestoreUnflushedChangelog(final KVSchema type) throws Exceptio IntegrationTestUtils .waitTillConsumedPast(admin, input, name, endInput + 1, Duration.ofSeconds(30)); } - final TopicPartition changelog = new TopicPartition(name + "-" + aggName() + "-changelog", 0); // Make sure changelog is ahead of remote - final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(properties); - final RemoteKVTable table; - - table = remoteKVTable(type, defaultFactory, config, changelog); - - final long remoteOffset = table.lastWrittenOffset(0); + final long remoteOffset = changelogTable.lastWrittenOffset(0); assertThat(remoteOffset, greaterThan(0L)); - - final long changelogOffset = admin.listOffsets(Map.of(changelog, OffsetSpec.latest())).all() - .get() - .get(changelog) - .offset(); - assertThat(remoteOffset, lessThan(changelogOffset)); + final long changelogEndOffset = IntegrationTestUtils.endOffset(admin, changelog); + assertThat(remoteOffset, lessThan(changelogEndOffset)); // Restart with restore recorder final TestKafkaClientSupplier recordingClientSupplier = new TestKafkaClientSupplier(); @@ -351,11 +347,11 @@ private RemoteKVTable remoteKVTable( ) throws InterruptedException, TimeoutException { final RemoteKVTable table; - if (type == KVSchema.FACT) { - final CassandraClient cassandraClient = defaultFactory.createClient( - defaultFactory.createCqlSession(config, null), - config); + final CassandraClient cassandraClient = defaultFactory.createClient( + defaultFactory.createCqlSession(config, null), + config); + if (type == KVSchema.FACT) { table = cassandraClient.factFactory() .create(new DefaultTableSpec( aggName(), @@ -363,18 +359,21 @@ private RemoteKVTable remoteKVTable( TtlResolver.NO_TTL, config )); - } else if (type == KVSchema.KEY_VALUE) { - final var connectionString = config.getPassword(MONGO_CONNECTION_STRING_CONFIG).value(); - final var mongoClient = SessionUtil.connect(connectionString, "", null); - table = new MongoKVTable( - mongoClient, + final SubPartitioner partitioner = SubPartitioner.create( + OptionalInt.empty(), + NUM_PARTITIONS, aggName(), - CollectionCreationOptions.fromConfig(config), - TtlResolver.NO_TTL, - config + config, + changelog.topic() ); - table.init(0); + table = cassandraClient.kvFactory() + .create(new DefaultTableSpec( + aggName(), + partitioner, + TtlResolver.NO_TTL, + config + )); } else { throw new IllegalArgumentException("Unsupported type: " + type); } @@ -482,7 +481,7 @@ public CqlSession createCqlSession( final var spy = spy(wrapped); doAnswer(a -> { final Fault fault = this.fault.get(); - if (fault != null && a.getArgument(0) instanceof BatchStatement) { + if (fault != null && (a.getArgument(0) instanceof BatchStatement)) { fault.fire(); } return wrapped.execute((Statement) a.getArgument(0)); @@ -537,14 +536,7 @@ public ConsumerRecords record(final ConsumerRecords getMutableProperties(final KVSchema type) { final Map properties = new HashMap<>(responsiveProps); - if (type == KVSchema.FACT) { - properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name()); - } else if (type == KVSchema.KEY_VALUE) { - properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name()); - } else { - throw new IllegalArgumentException("Unexpected schema type: " + type.name()); - } - + properties.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.CASSANDRA.name()); properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java index 10cef9fcf..00531ea66 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java @@ -78,7 +78,7 @@ public void shouldNotifyStoresOnCommittedChangelogWrites() { sendWrittenOffsets(Map.of(PARTITION2, 456L)); // then: - verify(store2Callbacks).notifyCommit(456L); + verify(store2Callbacks).notifyCommit(457L); verifyNoInteractions(store1Callbacks); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index 2b14e27ee..7bec244ce 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -613,7 +613,7 @@ public void shouldRestoreRecords() { buffer.restoreBatch(List.of(record), -1L); // Then: - assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(100L)); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(101L)); final byte[] value = table.get(KAFKA_PARTITION, KEY, MIN_VALID_TS); assertThat(value, is(VALUE)); } @@ -657,7 +657,7 @@ public void shouldNotRestoreRecordsWhenFencedByEpoch() { // Then: final String errorMsg = "commit-buffer [" + tableName.tableName() + "-2] " + "Failed table partition [8]: " - + ", " + + ", " + ""; assertThat(e.getMessage(), equalTo(errorMsg)); }