From c2f5a1463d7d9c8550e45550da3ac67eca44ec42 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Thu, 17 Apr 2025 14:38:55 -0700 Subject: [PATCH 1/3] Add some support types for snapshot orchestration. This patch adds a number of support types that are used to coordinate execution of a snapshot. They correspond to the entities defined in https://www.notion.so/responsivedev/Snapshot-Protocol-1a627549be4c8075a909cfd05adae3af SnapshotStore defines an interface for a store for Snapshot metadata. Its essentially a table of all the snapshots taken of an application keyed by generation. TopicSnapshotStore implements SnapshotStore by writing snapshot metadata to a topic and using a Kafka transaction to do transactional updates to the current snapshot. SnapshotOrchestrator defines the interface between Kafka Streams tasks and the snapshot orchestration process. Its how the tasks discover what the target generation should be, and how they report their task snapshot metadata (offsets+checkpoints). LocalSnapshotOrchestrator implements SnapshotOrchestrator for settings where the orchestrator runs embedded in the application. SnapshotApi defines the interface for interacting with snapshots from outside the application. LocalSnapshotApi implements SnapshotApi for settings where the api interacts directly with the SnapshotStore (rather than going through another api service like we might for responsive cloud) - e.g. in settings where we're just orchestrating snapshots locally by storing metadata in a topic using TopicSnapshotStore. --- .../e2e-test/build.gradle.kts | 2 +- kafka-client/build.gradle.kts | 2 +- .../internal/snapshot/GenerationStorage.java | 20 ++ .../internal/snapshot/LocalSnapshotApi.java | 41 +++ .../snapshot/LocalSnapshotOrchestrator.java | 99 ++++++ .../kafka/internal/snapshot/Snapshot.java | 272 ++++++++++++++++ .../kafka/internal/snapshot/SnapshotApi.java | 18 ++ .../snapshot/SnapshotOrchestrator.java | 40 +++ .../internal/snapshot/SnapshotStore.java | 39 +++ .../SnapshotStoreBasedGenerationStorage.java | 33 ++ .../snapshot/topic/SnapshotStoreRecord.java | 56 ++++ .../topic/SnapshotStoreRecordKey.java | 56 ++++ .../topic/SnapshotStoreRecordType.java | 5 + .../snapshot/topic/SnapshotStoreSerdes.java | 99 ++++++ .../topic/SynchronizedConsumerPosition.java | 20 ++ .../snapshot/topic/TopicSnapshotStore.java | 258 +++++++++++++++ .../LocalSnapshotOrchestratorTest.java | 195 +++++++++++ .../SnapshotOrchestrationIntegrationTest.java | 303 ++++++++++++++++++ ...apshotStoreBasedGenerationStorageTest.java | 59 ++++ .../internal/snapshot/TestSnapshotStore.java | 37 +++ .../topic/SnapshotStoreSerdesTest.java | 78 +++++ .../kafka/testutils/TestConstants.java | 2 +- operator/build.gradle.kts | 2 +- settings.gradle.kts | 4 +- 24 files changed, 1735 insertions(+), 5 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java diff --git a/kafka-client-examples/e2e-test/build.gradle.kts b/kafka-client-examples/e2e-test/build.gradle.kts index a554a07da..eaed1938f 100644 --- a/kafka-client-examples/e2e-test/build.gradle.kts +++ b/kafka-client-examples/e2e-test/build.gradle.kts @@ -31,7 +31,7 @@ dependencies { implementation(libs.guava) implementation(libs.slf4j.log4j2) implementation(libs.bundles.scylla) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.mongodb.driver.core) testImplementation(testlibs.bundles.base) diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 08efad9c3..f93de5e66 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -137,7 +137,7 @@ dependencies { implementation("dev.responsive:controller-api:0.16.0") implementation(libs.bundles.scylla) implementation(libs.bundles.commons) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.mongodb.driver.sync) implementation(libs.bundles.otel) implementation(libs.bundles.grpc) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java new file mode 100644 index 000000000..4cea01ea0 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java @@ -0,0 +1,20 @@ +package dev.responsive.kafka.internal.snapshot; + +import org.apache.kafka.streams.processor.TaskId; + +/** + * Interface that abstracts away how we lookup a task's current generation. + * For synchronized snapshots we want to do one of the following so we can record a task's + * generation metadata transactionally alongside the rows with new generation markers when + * we bump generations: + * (1) store this information in the offset metadata as part of the transaction that + * bumps the generation + * (2) if/when kafka supports 2pc store this information in another store (like rs3 or + * the snapshot store) + * + * For simple uncoordinated snapshots we'll support looking up the generation by looking + * at the snapshot's state in the generation store. + */ +public interface GenerationStorage { + long lookupGeneration(final TaskId taskId); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java new file mode 100644 index 000000000..8ce602b8d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java @@ -0,0 +1,41 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; +import java.util.Objects; + +/** + * Implementation of SnapshotApi that directly interacts with the Snapshot Store rather + * than calling out to an API service. + */ +public class LocalSnapshotApi implements SnapshotApi { + private final SnapshotStore snapshotStore; + + public LocalSnapshotApi(final SnapshotStore snapshotStore) { + this.snapshotStore = Objects.requireNonNull(snapshotStore); + } + + @Override + public Snapshot createSnapshot() { + return snapshotStore.updateCurrentSnapshot(snapshot -> { + if (snapshot.state().equals(Snapshot.State.CREATED)) { + throw new RuntimeException("Snapshot is currently in progress"); + } + return snapshot.nextSnapshot(); + }); + } + + @Override + public Snapshot getCurrentSnapshot() { + return snapshotStore.currentSnapshot(true); + } + + @Override + public List getSnapshots() { + return snapshotStore.listSnapshots(true); + } + + @Override + public void close() { + snapshotStore.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java new file mode 100644 index 000000000..3282266ed --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java @@ -0,0 +1,99 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.streams.processor.TaskId; + +/** + * An implementation of the orchestrator that runs within the application and interacts + * directly with the snapshot store. + */ +public class LocalSnapshotOrchestrator implements SnapshotOrchestrator { + private final SnapshotStore snapshotStore; + private final Set allTasks; + + public LocalSnapshotOrchestrator( + final SnapshotStore snapshotStore, + final Set allTasks + ) { + this.snapshotStore = Objects.requireNonNull(snapshotStore); + this.allTasks = Objects.requireNonNull(allTasks); + } + + @Override + public long getCurrentGeneration() { + return snapshotStore.currentSnapshot(false).generation(); + } + + @Override + public void reportTaskSnapshotMetadata( + final long generation, + final List taskSnapshots + ) { + snapshotStore.updateCurrentSnapshot(snapshot -> { + // check that we're still working on this snapshot + if (snapshot.generation() != generation) { + throw new RuntimeException( + String.format("generation too old: %d > %d", snapshot.generation(), generation)); + } + if (!snapshot.state().equals(Snapshot.State.CREATED)) { + throw new RuntimeException("Snapshot is currently completed. Cannot update"); + } + + // check that for all the specified tasks, we either haven't collected its metadata + // or the metadata is the same + final List newlyCompletedTaskSnapshots = new ArrayList<>(); + for (final var taskSnapshot : taskSnapshots) { + final var found = snapshot.taskSnapshots() + .stream() + .filter(s -> s.taskId().equals(taskSnapshot.taskId())) + .collect(Collectors.toList()); + if (found.size() > 1) { + throw new IllegalStateException( + "found multiple snapshots for task " + taskSnapshot.taskId()); + } + if (found.isEmpty()) { + newlyCompletedTaskSnapshots.add(taskSnapshot); + } else if (!found.get(0).equals(taskSnapshot)) { + throw new IllegalStateException( + "found conflicting snapshots for task" + taskSnapshot.taskId()); + } + } + + // if we've collected snapshots for all tasks, mark the snapshot as completed + final Set completedTasks = Stream.concat( + newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId), + snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId) + ).collect(Collectors.toSet()); + Snapshot.State state = snapshot.state(); + if (completedTasks.equals(this.allTasks)) { + state = Snapshot.State.COMPLETED; + } + + return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state); + }); + } + + @Override + public void failSnapshot(long snapshotGeneration) { + snapshotStore.updateCurrentSnapshot(snapshot -> { + if (snapshot.generation() != snapshotGeneration) { + // todo: do something more reasonable here + throw new RuntimeException("Generation mismatch"); + } + if (snapshot.state().equals(Snapshot.State.COMPLETED)) { + throw new RuntimeException("Cannot fail completed snapshot"); + } + return snapshot.withStateFailed(); + }); + } + + @Override + public void close() { + snapshotStore.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java new file mode 100644 index 000000000..4b5aa7de2 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java @@ -0,0 +1,272 @@ +package dev.responsive.kafka.internal.snapshot; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.streams.processor.TaskId; + +/** + * POJO representing an application snapshot + */ +public class Snapshot { + public enum State { + CREATED, + COMPLETED, + // TODO: add state between CREATED and COMPLETED to reflect a snapshot + // that needs to be finalized by converting expiring checkpoints to + // nonexpiring checkpoints + FAILED + // TODO: add state beteen CREATED and FAILED to reflect a snapshot that has + // failed but is not yet cleaned up + } + + private final Instant createdAt; + private final long generation; + private final State state; + private final List taskSnapshots; + + public static Snapshot initial() { + return new Snapshot( + Instant.EPOCH, + 0, + State.COMPLETED, + List.of() + ); + } + + @JsonCreator + public Snapshot( + @JsonProperty("createdAt") final Instant createdAt, + @JsonProperty("generation") final long generation, + @JsonProperty("state") final State state, + @JsonProperty("taskSnapshots") final List taskSnapshots + ) { + this.createdAt = createdAt; + this.generation = generation; + this.state = state; + this.taskSnapshots = List.copyOf(taskSnapshots); + } + + @JsonProperty("createdAt") + public Instant createdAt() { + return createdAt; + } + + @JsonProperty("generation") + public long generation() { + return generation; + } + + @JsonProperty("state") + public State state() { + return state; + } + + @JsonProperty("taskSnapshots") + public List taskSnapshots() { + return taskSnapshots; + } + + public Snapshot nextSnapshot() { + return new Snapshot( + Instant.now(), + generation + 1, + State.CREATED, + List.of() + ); + } + + public Snapshot withTaskSnapshots(List taskSnapshots, State state) { + return new Snapshot( + createdAt, + generation, + state, + Stream.concat(this.taskSnapshots.stream(), taskSnapshots.stream()) + .collect(Collectors.toList()) + ); + } + + public Snapshot withStateFailed() { + return new Snapshot( + createdAt, + generation, + State.FAILED, + taskSnapshots + ); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Snapshot)) { + return false; + } + final Snapshot snapshot = (Snapshot) o; + return generation == snapshot.generation + && Objects.equals(createdAt, snapshot.createdAt) + && state == snapshot.state && Objects.equals(taskSnapshots, snapshot.taskSnapshots); + } + + @Override + public int hashCode() { + return Objects.hash(createdAt, generation, state, taskSnapshots); + } + + @Override + public String toString() { + return "Snapshot{" + + "createdAt=" + createdAt + + ", generation=" + generation + + ", state=" + state + + ", taskSnapshots=" + taskSnapshots + + '}'; + } + + public static class TaskSnapshotMetadata { + private final TaskId taskId; + private final List committedOffsets; + private final Map checkpoints; + private final Instant timestamp; + + @JsonCreator + public TaskSnapshotMetadata( + @JsonProperty("taskId") final TaskId taskId, + @JsonProperty("committedOffsets") final List committedOffsets, + @JsonProperty("checkpoints") final Map checkpoints, + @JsonProperty("timestamp") final Instant timestamp + ) { + this.taskId = taskId; + this.committedOffsets = List.copyOf(committedOffsets); + this.checkpoints = Map.copyOf(checkpoints); + this.timestamp = Objects.requireNonNull(timestamp); + } + + @JsonProperty("taskId") + public TaskId taskId() { + return taskId; + } + + @JsonProperty("committedOffsets") + public List committedOffsets() { + return committedOffsets; + } + + @JsonProperty("checkpoints") + public Map checkpoints() { + return checkpoints; + } + + @JsonProperty("timestamp") + public Instant timestamp() { + return timestamp; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TaskSnapshotMetadata)) { + return false; + } + final TaskSnapshotMetadata that = (TaskSnapshotMetadata) o; + if (checkpoints == null ^ that.checkpoints == null) { + return false; + } + if (checkpoints != null) { + if (checkpoints.size() != that.checkpoints.size()) { + return false; + } + for (final Map.Entry entry : checkpoints.entrySet()) { + if (!Arrays.equals(entry.getValue(), that.checkpoints.get(entry.getKey()))) { + return false; + } + } + } + return Objects.equals(taskId, that.taskId) + && Objects.equals(committedOffsets, that.committedOffsets); + } + + @Override + public int hashCode() { + return Objects.hash(taskId, committedOffsets, checkpoints); + } + + @Override + public String toString() { + return "TaskSnapshotMetadata{" + + "taskId=" + taskId + + ", committedOffsets=" + committedOffsets + + ", checkpoints=" + checkpoints + + '}'; + } + } + + public static class CommittedOffset { + private final String topic; + private final int partition; + private final long offset; + + @JsonCreator + public CommittedOffset( + @JsonProperty("topic") final String topic, + @JsonProperty("partition") final int partition, + @JsonProperty("offset") final long offset + ) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + + @JsonProperty("topic") + public String topic() { + return topic; + } + + @JsonProperty("partition") + public int partition() { + return partition; + } + + @JsonProperty("offset") + public long offset() { + return offset; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CommittedOffset)) { + return false; + } + final CommittedOffset that = (CommittedOffset) o; + return partition == that.partition + && offset == that.offset + && Objects.equals(topic, that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(topic, partition, offset); + } + + @Override + public String toString() { + return "CommittedOffset{" + + "topic='" + topic + '\'' + + ", partition=" + partition + + ", offset=" + offset + + '}'; + } + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java new file mode 100644 index 000000000..f0c2a3bf7 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java @@ -0,0 +1,18 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; + +/** + * Interface for interacting with Snapshots from outside an application. Supports + * reading the current/past snapshots, and creating a new snapshot. + */ +public interface SnapshotApi extends AutoCloseable { + Snapshot createSnapshot(); + + List getSnapshots(); + + Snapshot getCurrentSnapshot(); + + @Override + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java new file mode 100644 index 000000000..4482c7ba0 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java @@ -0,0 +1,40 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; + +/** + * SnapshotOrchestrator is responsible for coordinating snapshot execution by initiating + * task-level snapshots and updating the snapshot's state as they complete. + */ +public interface SnapshotOrchestrator { + /** + * Gets the application's current snapshot generation + * + * @return The current snapshot generation. + */ + long getCurrentGeneration(); + + /** + * Called by the stream thread to report task snapshot metadata for a given task. + * + * @param snapshotGeneration The generation of the task snapshot(s) being reported. + * @param metadata The task snapshot(s) being reported. + */ + void reportTaskSnapshotMetadata( + long snapshotGeneration, + List metadata + ); + + /** + * Called by the stream thread to report a failed task snapshot. This should only + * be called to report terminal failures. + * + * @param snapshotGeneration The generation of the snapshot to fail. + */ + void failSnapshot(long snapshotGeneration); + + /** + * Clean up any resources held by the orchestrator + */ + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java new file mode 100644 index 000000000..e9b76b2fd --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java @@ -0,0 +1,39 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; +import java.util.function.Function; + +/** + * A store for the Snapshot metadata for a given applicaiton + */ +public interface SnapshotStore extends AutoCloseable { + /** + * Returns the current snapshot + * + * @param block if true, then the call blocks until the store has observed the latest update + * @return the current snapshot of the application + */ + Snapshot currentSnapshot(boolean block); + + /** + * List all snapshots + * + * @param block if true, then the call will block until the store has observed the latest update + * @return a list of all snapshots of the application + */ + List listSnapshots(boolean block); + + /** + * Updates a snapshot given an updater function. The update is guaranteed to be isolated from + * other concurrent updates. + * + * @param updater is passed the current snapshot and returns an updated snapshot. + * @return the updated snapshot + */ + Snapshot updateCurrentSnapshot( + Function updater + ); + + @Override + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java new file mode 100644 index 000000000..572ff9a86 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java @@ -0,0 +1,33 @@ +package dev.responsive.kafka.internal.snapshot; + +import org.apache.kafka.streams.processor.TaskId; + +/** + * Computes a task's generation from the current Snapshot metadata. If the current snapshot + * contains a task snapshot for a task, the task is considered to be on the current snapshot's + * generation. Otherwise, it's considered to be on the previous generation. + */ +public class SnapshotStoreBasedGenerationStorage implements GenerationStorage { + private final SnapshotStore snapshotStore; + + public SnapshotStoreBasedGenerationStorage(final SnapshotStore snapshotStore) { + this.snapshotStore = snapshotStore; + } + + @Override + public long lookupGeneration(final TaskId taskId) { + final var currentSnapshot = snapshotStore.currentSnapshot(false); + // todo: move to a fn + if (currentSnapshot.state() == Snapshot.State.COMPLETED + || currentSnapshot.state() == Snapshot.State.FAILED) { + return currentSnapshot.generation(); + } + if (currentSnapshot.taskSnapshots().stream() + .anyMatch(s -> s.taskId().equals(taskId))) { + return currentSnapshot.generation(); + } + // this task has not completed. set generation to previous generation + // todo: make previous generation a field rather than computing it here + return currentSnapshot.generation() - 1; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java new file mode 100644 index 000000000..0ff5fa0a5 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java @@ -0,0 +1,56 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import dev.responsive.kafka.internal.snapshot.Snapshot; +import java.util.Objects; +import java.util.Optional; + +public class SnapshotStoreRecord { + private final SnapshotStoreRecordType type; + private final Snapshot snapshot; + + @JsonCreator + public SnapshotStoreRecord( + @JsonProperty("type") final SnapshotStoreRecordType type, + @JsonProperty("snapshot") final Snapshot snapshot + ) { + this.type = Objects.requireNonNull(type); + this.snapshot = snapshot; + } + + @JsonProperty("type") + public SnapshotStoreRecordType type() { + return type; + } + + @JsonProperty("snapshot") + public Optional snapshot() { + return Optional.ofNullable(snapshot); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SnapshotStoreRecord)) { + return false; + } + final SnapshotStoreRecord that = (SnapshotStoreRecord) o; + return type == that.type && Objects.equals(snapshot, that.snapshot); + } + + @Override + public int hashCode() { + return Objects.hash(type, snapshot); + } + + @Override + public String toString() { + return "SnapshotStoreRecord{" + + "type=" + type + + ", snapshot=" + snapshot + + '}'; + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java new file mode 100644 index 000000000..161fa3781 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java @@ -0,0 +1,56 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import java.util.Optional; + +public class SnapshotStoreRecordKey { + + private final SnapshotStoreRecordType type; + private final Long generation; + + @JsonCreator + public SnapshotStoreRecordKey( + @JsonProperty("type") final SnapshotStoreRecordType type, + @JsonProperty("generation") final Long generation + ) { + this.type = Objects.requireNonNull(type); + this.generation = generation; + } + + @JsonProperty("type") + public SnapshotStoreRecordType type() { + return type; + } + + @JsonProperty("generation") + public Optional generation() { + return Optional.ofNullable(generation); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SnapshotStoreRecordKey)) { + return false; + } + final SnapshotStoreRecordKey that = (SnapshotStoreRecordKey) o; + return type == that.type && Objects.equals(generation, that.generation); + } + + @Override + public int hashCode() { + return Objects.hash(type, generation); + } + + @Override + public String toString() { + return "SnapshotStoreRecordKey{" + + "type=" + type + + ", generation=" + generation + + '}'; + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java new file mode 100644 index 000000000..d14cfc6bf --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java @@ -0,0 +1,5 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +public enum SnapshotStoreRecordType { + Snapshot +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java new file mode 100644 index 000000000..df8483255 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java @@ -0,0 +1,99 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.io.IOException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.TaskId; + +public class SnapshotStoreSerdes { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + static { + MAPPER.registerModule(new JavaTimeModule()); + MAPPER.registerModule(new Jdk8Module()); + final SimpleModule module = new SimpleModule(); + module.addSerializer(TaskId.class, new TaskIDJacksonSerializer()); + module.addDeserializer(TaskId.class, new TaskIDJacksonDeserializer()); + MAPPER.registerModule(module); + MAPPER.setSerializationInclusion(JsonInclude.Include.NON_ABSENT); + } + + public static class SnapshotStoreRecordKeySerializer + implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecordKey data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordKeyDeserializer + implements Deserializer { + @Override + public SnapshotStoreRecordKey deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecordKey.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordSerializer implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecord data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordDeserializer implements Deserializer { + @Override + public SnapshotStoreRecord deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecord.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class TaskIDJacksonSerializer extends JsonSerializer { + @Override + public void serialize( + final TaskId taskId, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider + ) throws IOException { + jsonGenerator.writeString(taskId.toString()); + } + } + + public static class TaskIDJacksonDeserializer extends JsonDeserializer { + + @Override + public TaskId deserialize( + final JsonParser jsonParser, + final DeserializationContext deserializationContext + ) throws IOException { + return TaskId.parse(jsonParser.getValueAsString()); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java new file mode 100644 index 000000000..a26f6309f --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java @@ -0,0 +1,20 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +class SynchronizedConsumerPosition { + private long position = 0; + + synchronized void waitTillConsumerPosition(long targetPosition) { + while (position < targetPosition) { + try { + wait(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + synchronized void updateConsumerPosition(long position) { + this.position = position; + notifyAll(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java new file mode 100644 index 000000000..150275e87 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java @@ -0,0 +1,258 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import dev.responsive.kafka.internal.snapshot.Snapshot; +import dev.responsive.kafka.internal.snapshot.SnapshotStore; +import java.time.Duration; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements SnapshotStore by storing Snapshot metadata for a single application on a Kafka topic. + * The topic has a single partition. Each record represents an update to a single snapshot of an + * application. The key includes the generation of the snapshot the update applies to. The + * value contains an instance of Snapshot. + */ +public class TopicSnapshotStore implements SnapshotStore { + private static final Logger LOG = LoggerFactory.getLogger(TopicSnapshotStore.class); + + private final TopicPartition topicPartition; + private final Supplier> producerSupplier; + private final Supplier> consumerSupplier; + private final Thread readerThread; + private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicReference currentSnapshot + = new AtomicReference<>(Snapshot.initial()); + private final ConcurrentMap snapshots = new ConcurrentHashMap<>(); + private final SynchronizedConsumerPosition consumedOffset = new SynchronizedConsumerPosition(); + private final Consumer endOffsetConsumer; + + public TopicSnapshotStore( + final String topic, + final short replicas, + final Supplier> consumerSupplier, + final Supplier> producerSupplier, + final Admin admin + ) { + this.topicPartition = new TopicPartition(topic, 0); + this.producerSupplier = producerSupplier; + this.consumerSupplier = consumerSupplier; + createTopic(admin, replicas); + final var consumer = consumerSupplier.get(); + consumer.assign(List.of(topicPartition)); + consumer.seekToBeginning(List.of(topicPartition)); + readerThread = new Thread(() -> runReader( + consumer, + currentSnapshot, + snapshots, + consumedOffset, + running + )); + readerThread.start(); + this.endOffsetConsumer = consumerSupplier.get(); + this.endOffsetConsumer.assign(List.of(topicPartition)); + waitTillConsumedAll(); + } + + public static TopicSnapshotStore create( + final String topic, + final short replicas, + final Map config + ) { + final Map consumerConfig = new HashMap<>(config); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, null); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + consumerConfig.put( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); + final Supplier> consumerSupplier = () -> + new KafkaConsumer<>( + consumerConfig, + new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer() + ); + final Map producerConfig = new HashMap<>(config); + final String appId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString(); + producerConfig.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + String.format("__responsive-%s-snapshot-store", appId) + ); + final Supplier> producerSupplier = () -> + new KafkaProducer<>( + producerConfig, + new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordSerializer() + ); + try (final Admin admin = Admin.create(config)) { + return new TopicSnapshotStore( + topic, replicas, consumerSupplier, producerSupplier, admin); + } + } + + @Override + public Snapshot currentSnapshot(boolean block) { + if (block) { + waitTillConsumedAll(); + } + return currentSnapshot.get(); + } + + @Override + public List listSnapshots(boolean block) { + if (block) { + waitTillConsumedAll(); + } + return snapshots.values().stream() + .sorted(Comparator.comparingLong(Snapshot::generation)) + .collect(Collectors.toList()); + } + + @Override + public Snapshot updateCurrentSnapshot( + final Function updater + ) { + final Future sendFut; + final Snapshot updated; + try (final var producer = producerSupplier.get()) { + producer.initTransactions(); + producer.beginTransaction(); + try { + waitTillConsumedAll(); + updated = updater.apply(currentSnapshot.get()); + final var record = createRecord(updated); + sendFut = producer.send(record); + producer.commitTransaction(); + } catch (final RuntimeException e) { + producer.abortTransaction(); + throw e; + } + } + final RecordMetadata recordMetadata; + try { + recordMetadata = sendFut.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + waitTillConsumerPosition(recordMetadata.offset() + 1); + return updated; + } + + @Override + public void close() { + try { + endOffsetConsumer.close(); + } catch (final RuntimeException e) { + LOG.warn("error closing end offset consumer", e); + } + running.set(false); + try { + readerThread.join(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + private ProducerRecord createRecord( + final Snapshot snapshot + ) { + return new ProducerRecord<>( + topicPartition.topic(), + topicPartition.partition(), + new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, snapshot.generation()), + new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot) + ); + } + + private void createTopic(final Admin admin, final short replicas) { + try { + final var result = admin.createTopics(List.of( + new NewTopic(topicPartition.topic(), 1, replicas) + )); + result.all().get(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + if (e.getCause() instanceof TopicExistsException) { + LOG.info("snapshot store topic already exists."); + } else { + throw new RuntimeException(e); + } + } catch (final TopicExistsException e) { + LOG.info("snapshot store topic already exists."); + } + } + + private void waitTillConsumedAll() { + waitTillConsumerPosition(endOffset()); + } + + private void waitTillConsumerPosition(final long offset) { + consumedOffset.waitTillConsumerPosition(offset); + } + + private long endOffset() { + synchronized (endOffsetConsumer) { + return endOffsetConsumer.endOffsets(List.of(topicPartition)).get(topicPartition); + } + } + + private void runReader( + final Consumer consumer, + final AtomicReference currentSnapshot, + final ConcurrentMap allSnapshots, + final SynchronizedConsumerPosition consumedOffset, + final AtomicBoolean running + ) { + while (running.get()) { + final ConsumerRecords records + = consumer.poll(Duration.ofMillis(100)); + for (final ConsumerRecord record : records) { + switch (record.key().type()) { + case Snapshot: { + final Snapshot update = record.value().snapshot().get(); + currentSnapshot.getAndUpdate(c -> { + if (update.generation() >= c.generation()) { + return update; + } else { + return c; + } + }); + allSnapshots.put(update.generation(), update); + break; + } + default: { + throw new IllegalStateException(); + } + } + } + consumedOffset.updateConsumerPosition(consumer.position(topicPartition)); + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java new file mode 100644 index 000000000..32f6a394f --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java @@ -0,0 +1,195 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class LocalSnapshotOrchestratorTest { + private static final TaskId TASK_0 = new TaskId(0, 0); + private static final TaskId TASK_1 = new TaskId(0, 1); + private static final TaskId TASK_2 = new TaskId(0, 2); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private long generation; + private final LocalSnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + snapshotStore, + Set.of(TASK_0, TASK_1, TASK_2) + ); + + @BeforeEach + public void setup() { + final var snapshot = api.createSnapshot(); + generation = snapshot.generation(); + } + + @Test + public void shouldFailUpdateWithConflictingTaskSnapshot() { + // given: + orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + )) + ); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 456L)), + Map.of(), + Instant.now() + )) + ) + ); + } + + @Test + public void shouldDoIdempotentTaskSnapshotUpdate() { + // given: + final var taskSnapshots = List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + )); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // then: + final var snapshot = api.getCurrentSnapshot(); + assertThat(snapshot.taskSnapshots(), is(taskSnapshots)); + } + + @Test + public void shouldFailUpdateForWrongGeneration() { + // when/then: + assertThrows(RuntimeException.class, + () -> orchestrator.reportTaskSnapshotMetadata(generation - 1, List.of())); + } + + @Test + public void shouldCompleteSnapshot() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // then: + final var snapshot = api.getCurrentSnapshot(); + assertThat(snapshot.state(), is(Snapshot.State.COMPLETED)); + assertThat(snapshot.taskSnapshots(), is(taskSnapshots)); + } + + @Test + public void shouldCompleteSnapshotWhenFinishedTasksInSeparateUpdates() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ) + ); + final var taskSnapshots2 = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots2); + + // then: + final var snapshot = api.getCurrentSnapshot(); + assertThat(snapshot.state(), is(Snapshot.State.COMPLETED)); + final var allTaskSnapshots = new ArrayList<>(taskSnapshots); + allTaskSnapshots.addAll(taskSnapshots2); + assertThat(snapshot.taskSnapshots(), is(allTaskSnapshots)); + } + + @Test + public void shouldFailUpdateForCompletedSnapshot() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when/then: + assertThrows( + RuntimeException.class, + () -> orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots)); + } + + @Test + public void shouldGetCurrentGeneration() { + assertThat(orchestrator.getCurrentGeneration(), is(generation)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java new file mode 100644 index 000000000..e48bda4cc --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java @@ -0,0 +1,303 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecord; +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecordKey; +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreSerdes; +import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore; +import dev.responsive.kafka.testutils.TestConstants; +import java.nio.charset.Charset; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.lifecycle.Startables; + +@ExtendWith(MockitoExtension.class) +class SnapshotOrchestrationIntegrationTest { + private static KafkaContainer KAFKA = new KafkaContainer(TestConstants.KAFKA) + .withEnv("KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS", "1000") + .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "60000"); + private static final Set TASKS = Set.of( + new TaskId(0, 0), + new TaskId(0, 1), + new TaskId(0, 2) + ); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + KAFKA.stop(); + })); + } + + private String topic; + private TestCtx ctx; + + @BeforeAll + public static void setupAll() { + final var fut = Startables.deepStart(KAFKA); + try { + fut.get(); + } catch (final InterruptedException | ExecutionException e) { + System.out.println("LOGS: " + KAFKA.getLogs()); + throw new RuntimeException(e); + } + } + + @AfterAll + public static void teardownAll() { + KAFKA.stop(); + } + + @BeforeEach + public void setup(final TestInfo testInfo) { + topic = testInfo.getTestMethod().get().getName(); + ctx = createCtx(); + } + + @Test + public void shouldReturnGenerationZeroSnapshot() { + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + assertThat(snapshot, is(Snapshot.initial())); + } + + @Test + public void shouldCreateInitialSnapshot() { + // when: + final var snapshot = ctx.api.createSnapshot(); + + // then: + MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true))); + assertThat(snapshot.generation(), is(1L)); + assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldRefreshStore() { + // given: + final var store2 = createCtx(); + store2.api.createSnapshot(); + + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true))); + MatcherAssert.assertThat(snapshot.generation(), is(1L)); + MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldFailCreateNextSnapshotIfNotCompleted() { + // given: + ctx.api.createSnapshot(); + + // when/then: + assertThrows(RuntimeException.class, () -> ctx.api.createSnapshot()); + } + + @Test + public void shouldCreatNextSnapshot() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + + // when: + final var snapshot = ctx.api.createSnapshot(); + + // then: + assertThat(snapshot.generation(), is(2L)); + assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldAddTaskSnapshotToSnapshot() { + // given: + ctx.api.createSnapshot(); + final var metadata = new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata)); + + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + MatcherAssert.assertThat(snapshot.taskSnapshots(), contains(metadata)); + } + + @Test + public void shouldFailAddTaskSnapshotIfConflictingMetadata() { + // given: + ctx.api.createSnapshot(); + final var metadata = new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), + List.of(), + Map.of("foo", "bar".getBytes(Charset.defaultCharset())), + Instant.now() + ); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata)); + + // when/then: + assertThrows( + RuntimeException.class, + () -> ctx.orchestrator.reportTaskSnapshotMetadata( + 1, + List.of(new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now())) + ) + ); + } + + @Test + public void shouldFailAddTaskSnapshotIfCompleted() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + + // when/then: + assertThrows( + RuntimeException.class, + () -> ctx.orchestrator.reportTaskSnapshotMetadata( + 1, + List.of(new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now())) + ) + ); + } + + @Test + public void shouldCompleteSnapshot() { + // given: + ctx.api.createSnapshot(); + + // when: + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + + // then: + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED)); + } + + @Test + public void shouldFailSnapshot() { + // given: + ctx.api.createSnapshot(); + + // when: + ctx.orchestrator.failSnapshot(1); + + // then: + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.FAILED)); + } + + @Test + public void shouldFailFailSnapshotIfCompleted() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED)); + + // when/then: + assertThrows(RuntimeException.class, () -> ctx.orchestrator.failSnapshot(1)); + } + + private TestCtx createCtx() { + return new TestCtx(topic); + } + + private static class TestCtx { + private final TopicSnapshotStore store; + private final SnapshotOrchestrator orchestrator; + private final SnapshotApi api; + + private TestCtx(final String topic) { + final Admin admin + = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers())); + final Map consumerProps = Map.of( + BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(), + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false + ); + final Supplier> consumerSupplier = () -> + new KafkaConsumer<>( + consumerProps, + new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer() + ); + final Map producerProps = Map.of( + BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(), + ProducerConfig.TRANSACTIONAL_ID_CONFIG, String.format("%s-snapshot-store", topic) + ); + final Supplier> producerSupplier = () -> + new KafkaProducer<>( + producerProps, + new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordSerializer() + ); + this.store = new TopicSnapshotStore( + topic, + (short) 1, + consumerSupplier, + producerSupplier, + admin + ); + this.orchestrator = new LocalSnapshotOrchestrator(this.store, TASKS); + this.api = new LocalSnapshotApi(this.store); + } + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java new file mode 100644 index 000000000..438287d89 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java @@ -0,0 +1,59 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class SnapshotStoreBasedGenerationStorageTest { + private static final TaskId TASK_0 = new TaskId(0, 0); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private final GenerationStorage generationStorage + = new SnapshotStoreBasedGenerationStorage(snapshotStore); + private long oldGeneration; + private long generation; + + @BeforeEach + public void setup() { + oldGeneration = snapshotStore.currentSnapshot(true).generation(); + final var snapshot = api.createSnapshot(); + generation = snapshot.generation(); + } + + @Test + public void shouldReturnNewGenerationForCompletedSnapshot() { + // given: + snapshotStore + .updateCurrentSnapshot(s -> s.withTaskSnapshots(List.of(), Snapshot.State.COMPLETED)); + + // when/then: + assertThat(generationStorage.lookupGeneration(TASK_0), is(generation)); + } + + @Test + public void shouldReturnNewGenerationForTaskWithCompletedTaskSnapshot() { + // given: + snapshotStore + .updateCurrentSnapshot( + s -> s.withTaskSnapshots( + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, List.of(), Map.of(), Instant.now())), + s.state() + )); + + // when/then: + assertThat(generationStorage.lookupGeneration(TASK_0), is(generation)); + } + + @Test + public void shouldReturnOldGenerationForTaskWithoutCompletedSnapshot() { + assertThat(generationStorage.lookupGeneration(TASK_0), is(oldGeneration)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java new file mode 100644 index 000000000..4777b6414 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java @@ -0,0 +1,37 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class TestSnapshotStore implements SnapshotStore { + private final Map snapshots = new HashMap<>(); + + public TestSnapshotStore() { + final var initial = Snapshot.initial(); + snapshots.put(initial.generation(), initial); + } + + @Override + public synchronized Snapshot currentSnapshot(final boolean block) { + return snapshots.get(snapshots.keySet().stream().max(Long::compare).get()); + } + + @Override + public synchronized List listSnapshots(final boolean block) { + return new ArrayList<>(snapshots.values()); + } + + @Override + public synchronized Snapshot updateCurrentSnapshot(final Function updater) { + final var updated = updater.apply(currentSnapshot(false)); + snapshots.put(updated.generation(), updated); + return updated; + } + + @Override + public void close() { + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java new file mode 100644 index 000000000..fd1b7496d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java @@ -0,0 +1,78 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.internal.snapshot.Snapshot; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.Test; + +class SnapshotStoreSerdesTest { + private final SnapshotStoreSerdes.SnapshotStoreRecordSerializer recordSerializer + = new SnapshotStoreSerdes.SnapshotStoreRecordSerializer(); + private final SnapshotStoreSerdes.SnapshotStoreRecordDeserializer recordDeserializer + = new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer(); + private final SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer keySerializer + = new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(); + private final SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer keyDeserializer + = new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(); + + @Test + public void shouldSerializeRecord() { + // given: + final Snapshot snapshot = new Snapshot( + Instant.now(), + 123, + Snapshot.State.COMPLETED, + List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), + List.of( + new Snapshot.CommittedOffset("foo", 0, 100), + new Snapshot.CommittedOffset("bar", 0, 200) + ), + Map.of( + "store1", "store1-cp".getBytes(StandardCharsets.UTF_8), + "store2", "store2-cp".getBytes(StandardCharsets.UTF_8) + ), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + new TaskId(1, 0), + List.of(), + Map.of(), + Instant.now() + ) + ) + ); + final SnapshotStoreRecord record + = new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot); + + // when: + final byte[] serialized = recordSerializer.serialize("", record); + System.out.println(new String(serialized)); + final SnapshotStoreRecord deserialized = recordDeserializer.deserialize("", serialized); + + // then: + assertThat(deserialized, is(record)); + } + + @Test + public void shouldSerializeKey() { + // given: + final SnapshotStoreRecordKey key + = new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, 100L); + + // when; + final byte[] serialized = keySerializer.serialize("", key); + System.out.println(new String(serialized)); + final SnapshotStoreRecordKey deserialized = keyDeserializer.deserialize("", serialized); + + // then: + assertThat(deserialized, is(key)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java index 7aae1d30e..535f035c0 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java @@ -17,7 +17,7 @@ public class TestConstants { public static final DockerImageName CASSANDRA = DockerImageName.parse("cassandra:4.1.0"); - public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.3.2"); + public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.9.0"); public static final DockerImageName MONGODB = DockerImageName.parse("mongo:7.0.2"); } \ No newline at end of file diff --git a/operator/build.gradle.kts b/operator/build.gradle.kts index 0536dfd6a..1bf08b090 100644 --- a/operator/build.gradle.kts +++ b/operator/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation(project(":controller-api")) implementation(libs.crd.generator.atp) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.javaoperatorsdk) implementation(libs.bundles.commons) diff --git a/settings.gradle.kts b/settings.gradle.kts index ea9325b38..d70615b5d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -52,7 +52,9 @@ dependencyResolutionManagement { version("mongoDB", "4.10.2") version("fabric8", "6.13.4") - library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson") + library("jackson-jdk8", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson") + library("jackson-jsr310", "com.fasterxml.jackson.datatype", "jackson-datatype-jsr310").versionRef("jackson") + bundle("jackson", listOf("jackson-jdk8", "jackson-jsr310")) library("kafka-clients", "org.apache.kafka", "kafka-clients").versionRef("kafka") library("kafka-streams", "org.apache.kafka", "kafka-streams").versionRef("kafka") From c0f82e37c0990f98db85e8adf23fabdbb33f37df Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Thu, 1 May 2025 01:15:53 -0700 Subject: [PATCH 2/3] add topology task info utility Adds TopologyTaskInfo which builds a mapping between partitions and tasks given a TopologyDescription. --- .../internal/utils/TopologyTaskInfo.java | 114 +++++++++++++++++ .../internal/utils/TopologyTaskInfoTest.java | 120 ++++++++++++++++++ .../internal/utils/TopologyTaskInfoUtils.java | 18 +++ 3 files changed, 252 insertions(+) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java new file mode 100644 index 000000000..c0ed37e7d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java @@ -0,0 +1,114 @@ +package dev.responsive.kafka.internal.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.processor.TaskId; + +public class TopologyTaskInfo { + private final Map tasksByPartition; + private final Map> partitionsByTask; + // todo: add info about internal topics + + @VisibleForTesting + TopologyTaskInfo( + final Map tasksByPartition, + final Map> partitionsByTask + ) { + this.tasksByPartition = Map.copyOf(tasksByPartition); + this.partitionsByTask = Map.copyOf(partitionsByTask); + } + + public Map tasksByPartition() { + return tasksByPartition; + } + + public Map> partitionsByTask() { + return partitionsByTask; + } + + public static TopologyTaskInfo forTopology( + final TopologyDescription topology, + final Admin admin + ) { + final Map tasksByPartition = new HashMap<>(); + final Map> partitionsByTask = new HashMap<>(); + final Set sinkTopics = sinkTopics(topology); + for (final var st : topology.subtopologies()) { + final Set topics = new HashSet<>(); + for (final TopologyDescription.Node node : st.nodes()) { + if (node instanceof TopologyDescription.Source) { + topics.addAll(((TopologyDescription.Source) node).topicSet()); + if (((TopologyDescription.Source) node).topicPattern() != null) { + throw new TopologyTaskInfoException( + "topic patterns are not supported for snapshots"); + } + } + } + if (!Sets.intersection(topics, sinkTopics).isEmpty()) { + throw new TopologyTaskInfoException( + "internal topics are not supported for snapshots" + ); + } + final Map descriptions; + try { + descriptions = admin.describeTopics(topics).allTopicNames().get(); + } catch (final ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + final Set partitionCounts = descriptions.values().stream() + .map(d -> d.partitions().size()) + .collect(Collectors.toSet()); + if (partitionCounts.size() != 1) { + throw new TopologyTaskInfoException( + "unexpected topics with different partition counts"); + } + final int nPartitions = partitionCounts.iterator().next(); + for (int i = 0; i < nPartitions; i++) { + final var taskId = new TaskId(st.id(), i); + partitionsByTask.put(taskId, new ArrayList<>(nPartitions)); + for (final var topic : topics) { + final var tp = new TopicPartition(topic, i); + tasksByPartition.put(tp, taskId); + partitionsByTask.get(taskId).add(tp); + } + } + } + return new TopologyTaskInfo(tasksByPartition, partitionsByTask); + } + + private static Set sinkTopics(TopologyDescription topology) { + final Set sinkTopics = new HashSet<>(); + for (final var st : topology.subtopologies()) { + for (final TopologyDescription.Node node : st.nodes()) { + if (node instanceof TopologyDescription.Sink) { + final String sinkTopic = ((TopologyDescription.Sink) node).topic(); + if (sinkTopic == null) { + throw new TopologyTaskInfoException("non-explicit sink topics not yet supported"); + } + sinkTopics.add(sinkTopic); + } + } + } + return sinkTopics; + } + + public static class TopologyTaskInfoException extends RuntimeException { + private static final long serialVersionUID = 0L; + + public TopologyTaskInfoException(final String message) { + super(message); + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java new file mode 100644 index 000000000..17ebdd70b --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java @@ -0,0 +1,120 @@ +package dev.responsive.kafka.internal.utils; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TopologyTaskInfoTest { + @Mock + private Admin admin; + + @SuppressWarnings("unchecked") + @BeforeEach + public void setup() { + when(admin.describeTopics(any(Collection.class))).thenAnswer( + i -> { + final var topics = i.>getArgument(0); + final Map> futures = topics.stream() + .collect(Collectors.toMap( + t -> t, + t -> KafkaFuture.completedFuture(new TopicDescription( + t, + false, + List.of( + new TopicPartitionInfo(0, null, List.of(), List.of()), + new TopicPartitionInfo(1, null, List.of(), List.of()) + ) + )) + )); + return new TestDescribeTopicsResult(futures); + } + ); + } + + @Test + public void shouldComputeTaskAndPartitionMappings() { + // given: + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long())) + .groupByKey(Grouped.as("groupBy")) + .count(Named.as("count")) + .toStream(Named.as("toStream")) + .to("sink"); + final TopologyDescription description = builder.build().describe(); + + // when: + final var tti = TopologyTaskInfo.forTopology(description, admin); + + // then: + assertThat( + tti.partitionsByTask(), + is( + Map.of( + new TaskId(0, 0), List.of(new TopicPartition("source", 0)), + new TaskId(0, 1), List.of(new TopicPartition("source", 1)) + ) + ) + ); + assertThat( + tti.tasksByPartition(), + is( + Map.of( + new TopicPartition("source", 0), new TaskId(0, 0), + new TopicPartition("source", 1), new TaskId(0, 1) + ) + ) + ); + } + + @Test + public void shouldThrowIfRepartition() { + // given: + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long())) + .groupBy((k, v) -> v, Grouped.as("groupBy")) + .count(Named.as("count")) + .toStream(Named.as("toStream")) + .to("sink"); + final TopologyDescription description = builder.build().describe(); + + // when/then: + assertThrows( + TopologyTaskInfo.TopologyTaskInfoException.class, + () -> TopologyTaskInfo.forTopology(description, admin) + ); + } + + private static class TestDescribeTopicsResult extends DescribeTopicsResult { + protected TestDescribeTopicsResult( + final Map> nameFutures + ) { + super(null, nameFutures); + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java new file mode 100644 index 000000000..428e01a5a --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java @@ -0,0 +1,18 @@ +package dev.responsive.kafka.internal.utils; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +public final class TopologyTaskInfoUtils { + public static TopologyTaskInfo createWith( + final Map tasksByPartition, + final Map> partitionsByTask + ) { + return new TopologyTaskInfo(tasksByPartition, partitionsByTask); + } + + private TopologyTaskInfoUtils() { + } +} From ebbfc9a1def6adc02615fe4e696cf02c3ce3dda5 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Thu, 1 May 2025 01:22:20 -0700 Subject: [PATCH 3/3] support taking snapshots from ResponsiveKafkaStreams - Adds SnapshotCommitListener, which listens on commits and takes task snapshots for any tasks represented in a commit that are not yet on the current generation. - Hooks up all the support types and commit listener to the responsive clients. - Adds some config for enabling/disabling snapshots --- .../kafka/api/ResponsiveKafkaStreams.java | 9 +- .../kafka/api/config/ResponsiveConfig.java | 42 +++ .../ResponsiveKafkaClientSupplier.java | 44 ++- .../snapshot/KafkaStreamsSnapshotContext.java | 75 +++++ .../snapshot/SnapshotCommitListener.java | 180 +++++++++++ .../internal/snapshot/SnapshotSupport.java | 7 + .../snapshot/topic/TopicSnapshotStore.java | 15 + .../kafka/internal/stores/CommitBuffer.java | 44 ++- .../stores/PartitionedOperations.java | 7 +- .../stores/RemoteWindowOperations.java | 3 +- .../stores/ResponsiveStoreRegistration.java | 13 +- .../stores/ResponsiveStoreRegistry.java | 49 ++- .../stores/SessionOperationsImpl.java | 3 +- .../ResponsiveKafkaClientSupplierTest.java | 4 +- .../clients/StoreCommitListenerTest.java | 7 +- .../snapshot/SnapshotCommitListenerTest.java | 295 ++++++++++++++++++ .../internal/stores/CommitBufferTest.java | 90 ++++++ .../stores/ResponsiveStoreRegistryTest.java | 7 +- 18 files changed, 860 insertions(+), 34 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 68cabc143..acec9c989 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -60,6 +60,7 @@ import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService; import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService; import dev.responsive.kafka.internal.metrics.exporter.otel.OtelMetricsService; +import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; @@ -471,6 +472,11 @@ public Params build() { ) : innerClientSupplier; this.oeReporter = reporter(responsiveConfig, license); + final var snapshotCtx = KafkaStreamsSnapshotContext.create( + responsiveConfig, + streamsConfig, + topology.describe() + ); this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier( delegateKafkaClientSupplier, responsiveConfig, @@ -478,7 +484,8 @@ public Params build() { storeRegistry, metrics, oeReporter, - storageBackend + storageBackend, + snapshotCtx ); final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 6916380d2..6c1e9fd4b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.internal.db.partitioning.Murmur3Hasher; +import dev.responsive.kafka.internal.snapshot.SnapshotSupport; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -329,6 +330,24 @@ public class ResponsiveConfig extends AbstractConfig { private static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC = "How often to report origin event usage information. This should generally not be changed in production environments"; + // ------------------ Snapshot Configs + public static final String SNAPSHOTS_CONFIG = "responsive.snapshots"; + private static final String SNAPSHOTS_DEFAULT = SnapshotSupport.DISABLED.name(); + private static final String SNAPSHOTS_DOC + = "Set to LOCAL enable snapshot support. This feature is experimental"; + + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX + = "responsive.snapshots.local.store.topic.suffix"; + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT + = "snapshots"; + private static final String SNAPSHOT_LOCAL_STORE_TOPIC_DOC + = "The topic to store snapshot metadata on when using local snapshot coordination."; + + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR + = "responsive.snapshots.local.store.topic.replication.factor"; + public static final short SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT = (short) 3; + private static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC + = "replication factor for the snapshot store topic when using local snapshot coordination."; // ------------------ StreamsConfig overrides ---------------------- // These configuration values are required by Responsive, and a ConfigException will @@ -660,6 +679,29 @@ public class ResponsiveConfig extends AbstractConfig { ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT, Importance.LOW, ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC + ).define( + SNAPSHOTS_CONFIG, + Type.STRING, + SNAPSHOTS_DEFAULT, + ConfigDef.CaseInsensitiveValidString.in( + Arrays.stream(SnapshotSupport.values()) + .map(Enum::name) + .toArray(String[]::new) + ), + Importance.LOW, + SNAPSHOTS_DOC + ).define( + SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX, + Type.STRING, + SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT, + Importance.LOW, + SNAPSHOT_LOCAL_STORE_TOPIC_DOC + ).define( + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR, + Type.SHORT, + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT, + Importance.LOW, + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC ); /** diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java index 8c944ac8e..e3832a5b1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java @@ -22,6 +22,8 @@ import dev.responsive.kafka.internal.metrics.EndOffsetsPoller; import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext; +import dev.responsive.kafka.internal.snapshot.SnapshotCommitListener; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import java.io.Closeable; import java.io.IOException; @@ -30,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalLong; import java.util.function.Function; import org.apache.kafka.clients.CommonClientConfigs; @@ -68,6 +71,7 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier private final boolean eos; private final StorageBackend storageBackend; private final boolean repairRestoreOffsetOutOfRange; + private final Optional snapshotCtx; public ResponsiveKafkaClientSupplier( final KafkaClientSupplier clientSupplier, @@ -76,7 +80,8 @@ public ResponsiveKafkaClientSupplier( final ResponsiveStoreRegistry storeRegistry, final ResponsiveMetrics metrics, final OriginEventReporter oeReporter, - final StorageBackend storageBackend + final StorageBackend storageBackend, + final Optional snapshotCtx ) { this( new Factories() { @@ -87,7 +92,8 @@ public ResponsiveKafkaClientSupplier( metrics, storageBackend, oeReporter, - responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG) + responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG), + snapshotCtx ); } @@ -99,7 +105,8 @@ public ResponsiveKafkaClientSupplier( final ResponsiveMetrics metrics, final StorageBackend storageBackend, final OriginEventReporter oeReporter, - final boolean repairRestoreOffsetOutOfRange + final boolean repairRestoreOffsetOutOfRange, + final Optional snapshotCtx ) { this.factories = factories; this.wrapped = wrapped; @@ -117,6 +124,7 @@ public ResponsiveKafkaClientSupplier( this ); applicationId = configs.getString(StreamsConfig.APPLICATION_ID_CONFIG); + this.snapshotCtx = snapshotCtx; } @Override @@ -138,7 +146,8 @@ public Producer getProducer(final Map config) { config, endOffsetsPoller, storeRegistry, - factories + factories, + snapshotCtx ); return factories.createResponsiveProducer( (String) config.get(CommonClientConfigs.CLIENT_ID_CONFIG), @@ -169,7 +178,8 @@ public Consumer getConsumer(final Map config) { config, endOffsetsPoller, storeRegistry, - factories + factories, + snapshotCtx ); // TODO: the end offsets poller call is kind of heavy for a synchronized block return factories.createResponsiveConsumer( @@ -253,7 +263,8 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( final Map configs, final EndOffsetsPoller endOffsetsPoller, final ResponsiveStoreRegistry storeRegistry, - final Factories factories + final Factories factories, + final Optional snapshotContext ) { if (threadListeners.containsKey(threadId)) { final var tl = threadListeners.get(threadId); @@ -262,6 +273,16 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( } final var offsetRecorder = factories.createOffsetRecorder(eos, threadId); final var originEventRecorder = factories.createOriginEventRecorder(oeReporter, eos); + final var storeCommitListener = new StoreCommitListener(storeRegistry, offsetRecorder); + final var snapshotCommitListener = snapshotContext.map( + ctx -> new SnapshotCommitListener( + ctx.orchestrator(), + ctx.generationStorage(), + storeRegistry, + ctx.topologyTaskInfo(), + offsetRecorder + ) + ); final var tl = new ReferenceCounted<>( String.format("ListenersForThread(%s)", threadId), new ListenersForThread( @@ -272,9 +293,10 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( threadId, offsetRecorder ), - new StoreCommitListener(storeRegistry, offsetRecorder), + storeCommitListener, endOffsetsPoller.addForThread(threadId), - originEventRecorder + originEventRecorder, + snapshotCommitListener ) ); threadListeners.put(threadId, tl); @@ -306,6 +328,7 @@ private static class ListenersForThread implements Closeable { final StoreCommitListener storeCommitListener; final EndOffsetsPoller.Listener endOffsetsPollerListener; final OriginEventTracker originEventTracker; + final Optional snapshotCommitListener; public ListenersForThread( final String threadId, @@ -313,7 +336,8 @@ public ListenersForThread( final MetricPublishingCommitListener committedOffsetMetricListener, final StoreCommitListener storeCommitListener, final EndOffsetsPoller.Listener endOffsetsPollerListener, - final OriginEventTracker originEventTracker + final OriginEventTracker originEventTracker, + final Optional snapshotCommitListener ) { this.threadId = threadId; this.offsetRecorder = offsetRecorder; @@ -321,12 +345,14 @@ public ListenersForThread( this.storeCommitListener = storeCommitListener; this.endOffsetsPollerListener = endOffsetsPollerListener; this.originEventTracker = originEventTracker; + this.snapshotCommitListener = snapshotCommitListener; } @Override public void close() { committedOffsetMetricListener.close(); endOffsetsPollerListener.close(); + snapshotCommitListener.ifPresent(SnapshotCommitListener::close); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java new file mode 100644 index 000000000..86f8c86dc --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java @@ -0,0 +1,75 @@ +package dev.responsive.kafka.internal.snapshot; + +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import java.util.Optional; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyDescription; + +public class KafkaStreamsSnapshotContext { + private final SnapshotOrchestrator orchestrator; + private final GenerationStorage generationStorage; + private final TopologyTaskInfo topologyTaskInfo; + private final SnapshotStore snapshotStore; + + private KafkaStreamsSnapshotContext( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final SnapshotStore snapshotStore, + final TopologyTaskInfo topologyTaskInfo + ) { + this.orchestrator = orchestrator; + this.generationStorage = generationStorage; + this.snapshotStore = snapshotStore; + this.topologyTaskInfo = topologyTaskInfo; + } + + public SnapshotOrchestrator orchestrator() { + return orchestrator; + } + + public GenerationStorage generationStorage() { + return generationStorage; + } + + public TopologyTaskInfo topologyTaskInfo() { + return topologyTaskInfo; + } + + public static Optional create( + final ResponsiveConfig config, + final StreamsConfig streamsConfig, + final TopologyDescription topologyDescription + ) { + final SnapshotSupport support = SnapshotSupport + .valueOf(config.getString(ResponsiveConfig.SNAPSHOTS_CONFIG)); + switch (support) { + case LOCAL: { + final SnapshotStore store = TopicSnapshotStore.create(config.originals()); + final TopologyTaskInfo tti; + try (final Admin admin = Admin.create(config.originals())) { + tti = TopologyTaskInfo.forTopology( + topologyDescription, + admin + ); + } + final SnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + store, + tti.partitionsByTask().keySet() + ); + final GenerationStorage generationStorage = new SnapshotStoreBasedGenerationStorage(store); + return Optional.of(new KafkaStreamsSnapshotContext( + orchestrator, + generationStorage, + store, + tti + )); + } + default: { + return Optional.empty(); + } + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java new file mode 100644 index 000000000..b60fde43d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java @@ -0,0 +1,180 @@ +package dev.responsive.kafka.internal.snapshot; + +import com.google.common.annotations.VisibleForTesting; +import dev.responsive.kafka.internal.clients.OffsetRecorder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapshotCommitListener { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommitListener.class); + + private final SnapshotOrchestrator orchestrator; + private final GenerationStorage generationStorage; + private final ResponsiveStoreRegistry storeRegistry; + private final TopologyTaskInfo topologyTaskInfo; + private final Supplier clock; + + public SnapshotCommitListener( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final ResponsiveStoreRegistry storeRegistry, + final TopologyTaskInfo topologyTaskInfo, + final OffsetRecorder offsetRecorder + ) { + this( + orchestrator, + generationStorage, + storeRegistry, + topologyTaskInfo, + offsetRecorder, + Instant::now + ); + } + + SnapshotCommitListener( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final ResponsiveStoreRegistry storeRegistry, + final TopologyTaskInfo topologyTaskInfo, + final OffsetRecorder offsetRecorder, + final Supplier clock + ) { + this.orchestrator = Objects.requireNonNull(orchestrator); + this.generationStorage = Objects.requireNonNull(generationStorage); + this.storeRegistry = Objects.requireNonNull(storeRegistry); + this.topologyTaskInfo = Objects.requireNonNull(topologyTaskInfo); + this.clock = Objects.requireNonNull(clock); + offsetRecorder.addCommitCallback(this::onCommit); + } + + @VisibleForTesting + void onCommit( + final String threadId, + final Map recordedCommittedOffsetsFromCommit, + final Map writtenOffsetsFromCommit + ) { + try { + maybeTakeTaskSnapshots( + threadId, + recordedCommittedOffsetsFromCommit, + writtenOffsetsFromCommit + ); + } catch (final RuntimeException e) { + LOG.warn("error taking task snapshots", e); + } + } + + private void maybeTakeTaskSnapshots( + final String threadId, + final Map recordedCommittedOffsetsFromCommit, + final Map writtenOffsetsFromCommit + ) { + final Map committedOffsetsFromCommit = recordedCommittedOffsetsFromCommit + .entrySet() + .stream() + .collect(Collectors.toMap(e -> e.getKey().getPartition(), Map.Entry::getValue)); + // get all tasks represented in the commit and snapshot them + // TODO: this means we won't ever snapshot a task if it's idle, or if its partitions + // don't commit together. This means that (in the former case for certain) we + // we may not ever finish the snapshot. We should do a few things: + // (1) trigger this logic when either commits happen or when we observe the app is + // totally idle. Alternatively we could trigger a commit for all tasks when the + // generation bumps, but that requires a kip to trigger commits. + // (2) derive the set of tasks to snapshot from the assignment rather than whats + // in the commit. + // (3) to do the above we need to make sure to filter out restoring tasks + final Set tasksInCommit = committedOffsetsFromCommit.keySet() + .stream() + .map(p -> topologyTaskInfo.tasksByPartition().get(p)) + .collect(Collectors.toSet()); + + final long generation = orchestrator.getCurrentGeneration(); + + // go task by task + final List taskSnapshots = new ArrayList<>(tasksInCommit.size()); + for (final TaskId taskId : tasksInCommit) { + final long taskGeneration = generationStorage.lookupGeneration(taskId); + if (taskGeneration >= generation) { + // we already have a snapshot for this task, skip + continue; + } + final List partitionsInTask = topologyTaskInfo.partitionsByTask().get(taskId); + final List snapshotOffsets + = new ArrayList<>(partitionsInTask.size()); + final Set partitionsNotInCommit = new HashSet<>(); + for (final TopicPartition p : partitionsInTask) { + if (committedOffsetsFromCommit.containsKey(p)) { + snapshotOffsets.add(new Snapshot.CommittedOffset( + p.topic(), + p.partition(), + committedOffsetsFromCommit.get(p) + )); + } else { + partitionsNotInCommit.add(p); + } + } + if (!partitionsNotInCommit.isEmpty()) { + // we can only take the snapshot if the commit includes all partitions from the task + // if it doesn't we don't know the source offset. In that case, wait for a later + // commit + LOG.warn("commit missing partitions from task {}: {}. Skip snapshot for this commit", + taskId, + partitionsNotInCommit.stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", ")) + ); + continue; + } + // checkpoint stores + final Map storeCheckpoints = new HashMap<>(); + final var stores = storeRegistry.getRegisteredStoresForTask(taskId, threadId); + for (final var store : stores) { + // get the store's changelog offset from either the written or committed offsets + Optional changelogOffset; + if (committedOffsetsFromCommit.containsKey(store.changelogTopicPartition())) { + // for stores sourced from a source topic + changelogOffset + = Optional.of(committedOffsetsFromCommit.get(store.changelogTopicPartition())); + } else if (writtenOffsetsFromCommit.containsKey(store.changelogTopicPartition())) { + // for stores with an internal changelog topic + changelogOffset + = Optional.of(writtenOffsetsFromCommit.get(store.changelogTopicPartition())); + } else { + // the task's source offset advanced, but it didn't write anything + changelogOffset = Optional.empty(); + } + final var checkpoint = store.callbacks().checkpoint(changelogOffset); + storeCheckpoints.put(store.storeName(), checkpoint); + } + final Snapshot.TaskSnapshotMetadata tsm = new Snapshot.TaskSnapshotMetadata( + taskId, + snapshotOffsets, + storeCheckpoints, + clock.get() + ); + taskSnapshots.add(tsm); + } + if (!taskSnapshots.isEmpty()) { + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + } + } + + public void close() { + orchestrator.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java new file mode 100644 index 000000000..4a0bc8049 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java @@ -0,0 +1,7 @@ +package dev.responsive.kafka.internal.snapshot; + +public enum SnapshotSupport { + DISABLED, + LOCAL, + // eventually add an option for responsive platform +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java index 150275e87..3274b15c8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java @@ -1,5 +1,6 @@ package dev.responsive.kafka.internal.snapshot.topic; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.snapshot.Snapshot; import dev.responsive.kafka.internal.snapshot.SnapshotStore; import java.time.Duration; @@ -82,6 +83,20 @@ public TopicSnapshotStore( waitTillConsumedAll(); } + public static TopicSnapshotStore create(final Map config) { + final ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(config); + final String applicationId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString(); + final String topicSuffix = responsiveConfig + .getString(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX); + final String snapshotStoreTopic + = String.format("_responsive-%s-%s", applicationId, topicSuffix); + return create( + snapshotStoreTopic, + responsiveConfig.getShort(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR), + config + ); + } + public static TopicSnapshotStore create( final String topic, final short replicas, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index 85c900400..eb1084cb7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; @@ -93,6 +94,7 @@ public class CommitBuffer, P> implements Closeable { private final Sensor flushSensor; private final Sensor flushLatencySensor; private final Sensor flushErrorsSensor; + private Optional lastKnownConsumedOffset = Optional.empty(); private KafkaFuture deleteRecordsFuture = KafkaFuture.completedFuture(null); @@ -261,10 +263,12 @@ private static boolean hasSourceTopicChangelog(final String changelogTopicName) } public void put(final K key, final byte[] value, long timestamp) { + lastKnownConsumedOffset = Optional.empty(); buffer.put(key, Result.value(key, value, timestamp)); } public void tombstone(final K key, long timestamp) { + lastKnownConsumedOffset = Optional.empty(); buffer.put(key, Result.tombstone(key, timestamp)); } @@ -382,8 +386,45 @@ private boolean triggerFlush() { return recordsTrigger || bytesTrigger || timeTrigger; } + /** + * Force a flush of the commit buffer. When forcing a flush of the commit buffer, the + * caller can specify the offset in the changelog topic up to which the commit buffer + * has data. If not specified, then the commit buffer will use the last offset specified + * in a call to flush or forceFlush, unless there were writes done since the last flush. + * In that case, this call throws an IllegalStateException. + */ + public void forceFlush(final Optional maybeConsumedOffset) { + if (buffer.sizeInRecords() == 0) { + return; + } + final long consumedOffset; + if (maybeConsumedOffset.isPresent()) { + consumedOffset = maybeConsumedOffset.get(); + } else if (lastKnownConsumedOffset.isPresent()) { + consumedOffset = lastKnownConsumedOffset.get(); + } else { + throw new IllegalStateException( + "tried to force-flush buffer w/ new writes since last flush without specifying" + + " a consumed offset" + ); + } + flush(consumedOffset, true); + } + public void flush(final long consumedOffset) { - if (!triggerFlush()) { + flush(consumedOffset, false); + } + + private void flush(final long consumedOffset, final boolean force) { + if (lastKnownConsumedOffset.isPresent() && lastKnownConsumedOffset.get() > consumedOffset) { + throw new IllegalStateException(String.format( + "tried to flush to an earlier offset (%d) than last known consumed offset (%d", + consumedOffset, + lastKnownConsumedOffset.get() + )); + } + lastKnownConsumedOffset = Optional.of(consumedOffset); + if (!force && !triggerFlush()) { return; } @@ -471,6 +512,7 @@ private void restoreCassandraBatch(final Collection= 0) { + lastKnownConsumedOffset = Optional.of(consumedOffset); doFlush(consumedOffset, records.size()); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 9496d33da..df73d01a0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -81,7 +81,6 @@ public static PartitionedOperations create( final StateStoreContext storeContext, final ResponsiveKeyValueParams params ) throws InterruptedException, TimeoutException { - final var log = new LogContext( String.format("store [%s] ", name.kafkaName()) ).logger(PartitionedOperations.class); @@ -177,11 +176,13 @@ public void notifyCommit(long committedOffset) { } @Override - public byte[] checkpoint() { + public byte[] checkpoint(final Optional committedOffset) { + initializedBuffer.forceFlush(committedOffset); return table.checkpoint(); } }, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java index 54b6907c1..be47a1325 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java @@ -169,7 +169,8 @@ public static RemoteWindowOperations create( ? OptionalLong.empty() : OptionalLong.of(restoreStartOffset), buffer::flush, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java index 7d4b50a55..329d7654c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java @@ -14,9 +14,11 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; public final class ResponsiveStoreRegistration { @@ -27,6 +29,7 @@ public final class ResponsiveStoreRegistration { private final TopicPartition changelogTopicPartition; private final StoreCallbacks callbacks; private final String threadId; + private final TaskId taskId; private final InjectedStoreArgs injectedStoreArgs = new InjectedStoreArgs(); private final OptionalLong startOffset; // stored offset during init, (where restore should start) @@ -37,13 +40,15 @@ public ResponsiveStoreRegistration( final TopicPartition changelogTopicPartition, final OptionalLong startOffset, final StoreCallbacks callbacks, - final String threadId + final String threadId, + final TaskId taskId ) { this.storeName = Objects.requireNonNull(storeName); this.changelogTopicPartition = Objects.requireNonNull(changelogTopicPartition); this.startOffset = startOffset; this.callbacks = Objects.requireNonNull(callbacks); this.threadId = Objects.requireNonNull(threadId); + this.taskId = Objects.requireNonNull(taskId); this.log = new LogContext( String.format("changelog [%s]", changelogTopicPartition) ).logger(ResponsiveStoreRegistration.class); @@ -74,10 +79,14 @@ public String threadId() { return threadId; } + public TaskId taskId() { + return taskId; + } + public interface StoreCallbacks { void notifyCommit(long committedOffset); - default byte[] checkpoint() { + default byte[] checkpoint(Optional committedOffset) { throw new UnsupportedOperationException("checkpoints not supported for store type"); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java index c409c99b0..8db52bd77 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java @@ -18,6 +18,7 @@ import java.util.OptionalLong; import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,25 @@ public synchronized OptionalLong getCommittedOffset( .max(); } + private List filterStoresForThread( + final List stores, + final String threadId, + final String context + ) { + if (stores.isEmpty()) { + return stores; + } + final List storesForThread = stores.stream() + .filter(s -> s.threadId().equals(threadId)) + .collect(Collectors.toList()); + if (storesForThread.isEmpty()) { + throw new IllegalStateException(String.format( + "there should always be a store for the thread (%s) if there are stores registered " + + "for this %s", threadId, context)); + } + return stores; + } + public synchronized List getRegisteredStoresForChangelog( final TopicPartition topicPartition ) { @@ -53,18 +73,25 @@ public synchronized List getRegisteredStoresForChan final List storesForTopicPartition = stores.stream() .filter(s -> s.changelogTopicPartition().equals(topicPartition)) .collect(Collectors.toList()); - if (storesForTopicPartition.isEmpty()) { - return storesForTopicPartition; - } - final List storesForThread = storesForTopicPartition.stream() - .filter(s -> s.threadId().equals(threadId)) + return filterStoresForThread( + storesForTopicPartition, + threadId, + String.format("topic partition (%s)", topicPartition) + ); + } + + public synchronized List getRegisteredStoresForTask( + final TaskId taskId, + final String threadId + ) { + final List storesForTask = stores.stream() + .filter(s -> s.taskId().equals(taskId)) .collect(Collectors.toList()); - if (storesForThread.isEmpty()) { - throw new IllegalStateException(String.format( - "there should always be a store for the thread (%s) if there are stores registered " - + "for this topic partition (%s)", threadId, topicPartition)); - } - return storesForThread; + return filterStoresForThread( + storesForTask, + threadId, + String.format("task (%s)", taskId) + ); } public synchronized void registerStore(final ResponsiveStoreRegistration registration) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java index 3d3e554af..7d7df378b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java @@ -125,7 +125,8 @@ public static SessionOperationsImpl create( ? OptionalLong.empty() : OptionalLong.of(restoreStartOffset), buffer::flush, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java index 459911bbc..61800697d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; @@ -308,7 +309,8 @@ private ResponsiveKafkaClientSupplier supplier( metrics, storageBackend, oeReporter, - false + false, + Optional.empty() ); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java index 10cef9fcf..e3f7f1a13 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -50,14 +51,16 @@ public void setup() { PARTITION1, OptionalLong.of(0), store1Callbacks, - "thread1" + "thread1", + new TaskId(0, 0) )); registry.registerStore(new ResponsiveStoreRegistration( "store2", PARTITION2, OptionalLong.of(0), store2Callbacks, - "thread1" + "thread1", + new TaskId(0, 1) )); commitListener = new StoreCommitListener(registry, offsetRecorder); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java new file mode 100644 index 000000000..12011e31d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java @@ -0,0 +1,295 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import dev.responsive.kafka.internal.clients.OffsetRecorder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import dev.responsive.kafka.internal.utils.TopologyTaskInfoUtils; +import java.nio.charset.Charset; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SnapshotCommitListenerTest { + private static final int SUBTOPOLOGIES = 1; + private static final int TOPICS = 2; + private static final int PARTITIONS = 2; + private static final String STORE0_NAME = "store0"; + private static final byte[] TASK0_STORE0_CHECKPOINT + = "task0topic0cp".getBytes(Charset.defaultCharset()); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final TopologyTaskInfo topologyTaskInfo = createTopologyTaskInfoWith( + SUBTOPOLOGIES, + TOPICS, + PARTITIONS + ); + private final GenerationStorage generationStorage + = new SnapshotStoreBasedGenerationStorage(snapshotStore); + private final LocalSnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + snapshotStore, + Set.copyOf(topologyTaskInfo.partitionsByTask().keySet()) + ); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private final AtomicReference clock = new AtomicReference<>(Instant.now()); + + @Mock + private ResponsiveStoreRegistry storeRegistry; + @Spy + private SnapshotOrchestrator orchestratorSpy = orchestrator; + @Mock + private ResponsiveStoreRegistration task0t0storeRegistration; + @Mock + private ResponsiveStoreRegistration.StoreCallbacks task0t0storeCallbacks; + @Mock + private OffsetRecorder offsetRecorder; + private long generation; + + private SnapshotCommitListener listener; + + @BeforeEach + public void setup() { + listener = new SnapshotCommitListener( + orchestratorSpy, + generationStorage, + storeRegistry, + topologyTaskInfo, + offsetRecorder, + clock::get + ); + generation = api.createSnapshot().generation(); + lenient().when(task0t0storeRegistration.storeName()).thenReturn(STORE0_NAME); + lenient().when(task0t0storeRegistration.callbacks()).thenReturn(task0t0storeCallbacks); + lenient().when(task0t0storeCallbacks.checkpoint(any())).thenReturn(TASK0_STORE0_CHECKPOINT); + lenient().when(storeRegistry.getRegisteredStoresForTask(eq(taskId(0, 0)), any())) + .thenReturn(List.of(task0t0storeRegistration)); + } + + @Test + public void shouldSkipTasksOnTheCurrentGeneration() { + // given: + orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), List.of(), Map.of(), Instant.now())) + ); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition(0, 0, 0)), 100L, + recordingKey(partition(0, 1, 0)), 200L + ), + Map.of() + ); + + // then: + verify(orchestratorSpy, times(0)) + .reportTaskSnapshotMetadata(anyLong(), any()); + } + + @Test + public void shouldSkipTasksWithPartitionsThatDidNotCommit() { + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition(0, 0, 0)), 100L + ), + Map.of() + ); + + // then: + verify(orchestratorSpy, times(0)) + .reportTaskSnapshotMetadata(anyLong(), any()); + } + + @Test + public void shouldTakeCheckpointOfStoreUsingSourceTopicOffset() { + // given: + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(partition(0, 0, 0)); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.of(100L)); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldTakeCheckpointOfStoreUsingChangelogOffset() { + // given: + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of(changelog, 300L) + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.of(300L)); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldTakeCheckpointOfStoreWithoutSpecifyingOffsetIfChangelogNotInCommit() { + // given: + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.empty()); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldNotThrowOnErrorFromSnapshotting() { + // given: + doThrow(new RuntimeException("oops")) + .when(orchestratorSpy) + .reportTaskSnapshotMetadata(anyLong(), any()); + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when/then (no throw0: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + verify(orchestratorSpy).reportTaskSnapshotMetadata(anyLong(), any()); + } + + private OffsetRecorder.RecordingKey recordingKey(final TopicPartition topicPartition) { + return new OffsetRecorder.RecordingKey(topicPartition, ""); + } + + private TaskId taskId(final int subtopology, final int partition) { + return new TaskId(subtopology, partition); + } + + private static TopicPartition partition( + final int subtopology, + final int topic, + final int partition + ) { + return new TopicPartition(String.format("%s-%s", subtopology, topic), partition); + } + + private static TopologyTaskInfo createTopologyTaskInfoWith( + final int subtoplogies, + final int topics, + final int partitions + ) { + final Map partitionToTask = new HashMap<>(); + final Map> taskToPartitions = new HashMap<>(); + for (int s = 0; s < subtoplogies; s++) { + for (int t = 0; t < topics; t++) { + for (int p = 0; p < partitions; p++) { + final TaskId taskId = new TaskId(s, p); + taskToPartitions.putIfAbsent(new TaskId(s, p), new ArrayList<>(t)); + final TopicPartition tp = partition(s, t, p); + taskToPartitions.get(taskId).add(tp); + partitionToTask.put(tp, taskId); + } + } + } + return TopologyTaskInfoUtils.createWith(partitionToTask, taskToPartitions); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index 2b14e27ee..944f61bbc 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -503,6 +503,96 @@ public void shouldOnlyFlushWhenIntervalTriggerElapsed() { } } + @Test + public void shouldForceFlush() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + + // when: + buffer.forceFlush(Optional.of(9L)); + + // Then: + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(9L)); + } + } + + @Test + public void shouldForceFlushUsingLastKnownConsumedOffset() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + + // when: + buffer.forceFlush(Optional.empty()); + + // Then: + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(9L)); + } + } + + @Test + public void shouldFailForceFlushIfPutsSinceLastFlushAndNoOffsetSpecified() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + buffer.put(Bytes.wrap(new byte[]{0}), VALUE, CURRENT_TS); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> buffer.forceFlush(Optional.empty())); + } + } + + @Test + public void shouldFailForceFlushIfDeletesSinceLastFlushAndNoOffsetSpecified() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + buffer.tombstone(Bytes.wrap(new byte[]{0}), CURRENT_TS); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> buffer.forceFlush(Optional.empty())); + } + } + @Test public void shouldUpdateOffsetWhenNoRecordsInBuffer() { // Given: diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java index 0c07d289a..d14fd0b96 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java @@ -19,6 +19,7 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import java.util.OptionalLong; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,7 +32,8 @@ class ResponsiveStoreRegistryTest { TOPIC_PARTITION, OptionalLong.of(123L), o -> {}, - "thread" + "thread", + new TaskId(0, 5) ); private static final ResponsiveStoreRegistration UNINIT_REGISTRATION = @@ -40,7 +42,8 @@ class ResponsiveStoreRegistryTest { UNINIT_TOPIC_PARTITION, OptionalLong.empty(), o -> { }, - "thread" + "thread", + new TaskId(0, 2) ); private final ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry();