diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 06160d3fce..6255066545 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -74,12 +74,9 @@ jobs: DAPR_REF: TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64 steps: - - name: Install Stable Docker - id: setup_docker - uses: docker/setup-docker-action@v4 - - name: Check Docker version - run: docker version - uses: actions/checkout@v5 + - name: Check Docker version + run: docker version - name: Set up OpenJDK ${{ env.JDK_VER }} uses: actions/setup-java@v5 with: @@ -146,8 +143,6 @@ jobs: - name: Integration tests using spring boot version ${{ matrix.spring-boot-version }} id: integration_tests run: PRODUCT_SPRING_BOOT_VERSION=${{ matrix.spring-boot-version }} ./mvnw -B -Pintegration-tests dependency:copy-dependencies verify - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Upload failsafe test report for sdk-tests on failure if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }} uses: actions/upload-artifact@v4 diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 07f0b74dd7..20fb00478a 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -49,9 +49,6 @@ jobs: with: distribution: 'temurin' java-version: ${{ env.JDK_VER }} - - name: Install Stable Docker - id: setup_docker - uses: docker/setup-docker-action@v4 - name: Check Docker version run: docker version - name: Set up Dapr CLI @@ -109,114 +106,75 @@ jobs: run: sleep 30 && docker logs dapr_scheduler && nc -vz localhost 50006 - name: Install jars run: ./mvnw clean install -DskipTests -q - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate workflows example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/workflows/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate Spring Boot examples working-directory: ./spring-boot-examples run: | mm.py README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate Spring Boot Workflow examples working-directory: ./spring-boot-examples/workflows run: | mm.py README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate Jobs example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/jobs/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate conversation ai example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/conversation/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate invoke http example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/invoke/http/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate invoke grpc example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/invoke/grpc/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate tracing example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/tracing/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate expection handling example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/exception/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate state example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/state/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate pubsub example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/pubsub/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate bindings HTTP example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/bindings/http/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate secrets example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/secrets/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate unit testing example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/unittesting/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate Configuration API example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/configuration/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate actors example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/actors/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate query state HTTP example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/querystate/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate streaming subscription example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md - env: - DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index d9d41b3759..da3e4e2482 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. -In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription. +In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. ```java public class Subscriber { @@ -59,25 +59,19 @@ public class Subscriber { public static void main(String[] args) throws Exception { String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - var subscription = client.subscribeToEvents( + // Subscribe to events using the Flux-based reactive API + // The stream will emit CloudEvent objects as they arrive + client.subscribeToEvents( PUBSUB_NAME, topicName, - new SubscriptionListener<>() { - - @Override - public Mono onEvent(CloudEvent event) { - System.out.println("Subscriber got: " + event.getData()); - return Mono.just(Status.SUCCESS); - } - - @Override - public void onError(RuntimeException exception) { - System.out.println("Subscriber got exception: " + exception.getMessage()); - } - }, - TypeRef.STRING); - - subscription.awaitTermination(); + TypeRef.STRING) + .doOnNext(event -> { + System.out.println("Subscriber got: " + event.getData()); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) } } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java index 31678dce08..763bb436ce 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java @@ -14,10 +14,7 @@ package io.dapr.examples.pubsub.stream; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.SubscriptionListener; -import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; -import reactor.core.publisher.Mono; /** * Subscriber using bi-directional gRPC streaming, which does not require an app port. @@ -44,25 +41,19 @@ public class Subscriber { public static void main(String[] args) throws Exception { String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - var subscription = client.subscribeToEvents( + // Subscribe to events using the Flux-based reactive API + // The stream will emit CloudEvent objects as they arrive + client.subscribeToEvents( PUBSUB_NAME, topicName, - new SubscriptionListener<>() { - - @Override - public Mono onEvent(CloudEvent event) { - System.out.println("Subscriber got: " + event.getData()); - return Mono.just(Status.SUCCESS); - } - - @Override - public void onError(RuntimeException exception) { - System.out.println("Subscriber got exception: " + exception.getMessage()); - } - }, - TypeRef.STRING); - - subscription.awaitTermination(); + TypeRef.STRING) + .doOnNext(event -> { + System.out.println("Subscriber got: " + event.getData()); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/crossapp/WorkflowsCrossAppCallActivityIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/crossapp/WorkflowsCrossAppCallActivityIT.java index 868d0adec1..8b55fce1fd 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/crossapp/WorkflowsCrossAppCallActivityIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/crossapp/WorkflowsCrossAppCallActivityIT.java @@ -28,6 +28,7 @@ import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; @@ -55,6 +56,7 @@ public class WorkflowsCrossAppCallActivityIT { private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final DockerImageName JAVA_WORKER_IMAGE = DockerImageName.parse("eclipse-temurin:17-jdk"); @Container private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG) @@ -113,7 +115,7 @@ public class WorkflowsCrossAppCallActivityIT { // TestContainers for each app @Container - private static GenericContainer crossappWorker = new GenericContainer<>("openjdk:17-jdk-slim") + private static GenericContainer crossappWorker = new GenericContainer<>(JAVA_WORKER_IMAGE) .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") .withWorkingDirectory("/app") .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", @@ -127,7 +129,7 @@ public class WorkflowsCrossAppCallActivityIT { .withLogConsumer(outputFrame -> System.out.println("CrossAppWorker: " + outputFrame.getUtf8String())); @Container - private final static GenericContainer app2Worker = new GenericContainer<>("openjdk:17-jdk-slim") + private final static GenericContainer app2Worker = new GenericContainer<>(JAVA_WORKER_IMAGE) .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") .withWorkingDirectory("/app") .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", @@ -141,7 +143,7 @@ public class WorkflowsCrossAppCallActivityIT { .withLogConsumer(outputFrame -> System.out.println("App2Worker: " + outputFrame.getUtf8String())); @Container - private final static GenericContainer app3Worker = new GenericContainer<>("openjdk:17-jdk-slim") + private final static GenericContainer app3Worker = new GenericContainer<>(JAVA_WORKER_IMAGE) .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") .withWorkingDirectory("/app") .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 012921a89e..0dfb1b644b 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -91,6 +91,7 @@ import io.dapr.internal.grpc.DaprClientGrpcInterceptors; import io.dapr.internal.resiliency.RetryPolicy; import io.dapr.internal.resiliency.TimeoutPolicy; +import io.dapr.internal.subscription.EventSubscriberStreamObserver; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.DefaultContentTypeConverter; @@ -475,6 +476,42 @@ public Subscription subscribeToEvents( return buildSubscription(listener, type, request); } + /** + * {@inheritDoc} + */ + @Override + public Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type) { + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() + .setTopic(topic) + .setPubsubName(pubsubName) + .build(); + DaprProtos.SubscribeTopicEventsRequestAlpha1 request = + DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setInitialRequest(initialRequest) + .build(); + + return Flux.create(sink -> { + DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); + EventSubscriberStreamObserver eventSubscriber = new EventSubscriberStreamObserver<>( + interceptedStub, + sink, + type, + this.objectSerializer + ); + StreamObserver requestStream = eventSubscriber.start(request); + + // Cleanup when Flux is cancelled or completed + sink.onDispose(() -> { + try { + requestStream.onCompleted(); + } catch (Exception e) { + logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage()); + } + }); + }, FluxSink.OverflowStrategy.BUFFER); + } + @Nonnull private Subscription buildSubscription( SubscriptionListener listener, diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 92c6a61c3e..545b8e5dc5 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -17,6 +17,7 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.ConversationRequest; import io.dapr.client.domain.ConversationRequestAlpha2; import io.dapr.client.domain.ConversationResponse; @@ -32,6 +33,7 @@ import io.dapr.client.domain.UnlockResponseStatus; import io.dapr.client.domain.query.Query; import io.dapr.utils.TypeRef; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; @@ -271,12 +273,24 @@ Mono> publishEvents(String pubsubName, String topicNa * @param topic Name of the topic to subscribe to. * @param listener Callback methods to process events. * @param type Type for object deserialization. - * @return An active subscription. * @param Type of object deserialization. + * @return An active subscription. + * @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach. */ + @Deprecated Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux. + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param type Type for object deserialization. + * @return A Flux of CloudEvents containing deserialized event payloads and metadata. + * @param Type of the event payload. + */ + Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type); + /** * Schedules a job using the provided job request details. * diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java index 53e89e8456..2cbd1e9b30 100644 --- a/sdk/src/main/java/io/dapr/client/Subscription.java +++ b/sdk/src/main/java/io/dapr/client/Subscription.java @@ -35,6 +35,7 @@ * Streaming subscription of events for Dapr's pubsub. * @param Application's object type. */ +@Deprecated public class Subscription implements Closeable { private final BlockingQueue ackQueue = new LinkedBlockingQueue<>(50); diff --git a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java index 5a467d69f4..c5420af602 100644 --- a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java +++ b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java @@ -20,6 +20,7 @@ * Callback interface to receive events from a streaming subscription of events. * @param Object type for deserialization. */ +@Deprecated public interface SubscriptionListener { /** diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java new file mode 100644 index 0000000000..56131882b8 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -0,0 +1,223 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.subscription; + +import io.dapr.client.domain.CloudEvent; +import io.dapr.exceptions.DaprException; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.FluxSink; + +import java.io.IOException; + +/** + * StreamObserver implementation for subscribing to Dapr pub/sub events. + * Thread Safety: This class relies on gRPC's StreamObserver contract, which guarantees that + * onNext(), onError(), and onCompleted() are never called concurrently and always from the + * same thread. Therefore, no additional synchronization is needed. + * + * @param The type of the event payload + */ +public class EventSubscriberStreamObserver implements StreamObserver { + + private static final Logger logger = LoggerFactory.getLogger(EventSubscriberStreamObserver.class); + + private final DaprGrpc.DaprStub stub; + private final FluxSink> sink; + private final TypeRef type; + private final DaprObjectSerializer objectSerializer; + + private StreamObserver requestStream; + + /** + * Creates a new EventSubscriberStreamObserver. + * + * @param stub The gRPC stub for making Dapr service calls + * @param sink The FluxSink to emit CloudEvents to + * @param type The TypeRef for deserializing event payloads + * @param objectSerializer The serializer to use for deserialization + */ + public EventSubscriberStreamObserver( + DaprGrpc.DaprStub stub, + FluxSink> sink, + TypeRef type, + DaprObjectSerializer objectSerializer) { + this.stub = stub; + this.sink = sink; + this.type = type; + this.objectSerializer = objectSerializer; + } + + /** Starts the subscription by sending the initial request. + * + * @param request The subscription request + * @return The StreamObserver to send further requests (acknowledgments) + */ + public StreamObserver start( + DaprProtos.SubscribeTopicEventsRequestAlpha1 request + ) { + requestStream = stub.subscribeTopicEventsAlpha1(this); + + requestStream.onNext(request); + + return requestStream; + } + + @Override + public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { + if (!isValidEventMessage(response)) { + return; + } + + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + String eventId = message.getId(); + + try { + T data = deserializeEventData(message); + CloudEvent cloudEvent = buildCloudEvent(message, data); + emitEventAndAcknowledge(cloudEvent, eventId); + } catch (IOException e) { + // Deserialization failure - send DROP ack + handleDeserializationError(eventId, e); + } catch (Exception e) { + // Processing failure - send RETRY ack + handleProcessingError(eventId, e); + } + } + + @Override + public void onError(Throwable throwable) { + sink.error(DaprException.propagate(throwable)); + } + + @Override + public void onCompleted() { + sink.complete(); + } + + private boolean isValidEventMessage(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { + if (response.getEventMessage() == null) { + logger.debug("Received response with null event message, skipping"); + return false; + } + + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + + if (message.getPubsubName() == null || message.getPubsubName().isEmpty()) { + logger.debug("Received event with empty pubsub name, skipping"); + return false; + } + + if (message.getId() == null || message.getId().isEmpty()) { + logger.debug("Received event with empty ID, skipping"); + return false; + } + + return true; + } + + private T deserializeEventData(DaprAppCallbackProtos.TopicEventRequest message) throws IOException { + if (type == null) { + logger.debug("Type is null, skipping deserialization for event ID: {}", message.getId()); + return null; + } + + return objectSerializer.deserialize(message.getData().toByteArray(), type); + } + + private CloudEvent buildCloudEvent(DaprAppCallbackProtos.TopicEventRequest message, T data) { + CloudEvent cloudEvent = new CloudEvent<>(); + + cloudEvent.setId(message.getId()); + cloudEvent.setType(message.getType()); + cloudEvent.setSpecversion(message.getSpecVersion()); + cloudEvent.setDatacontenttype(message.getDataContentType()); + cloudEvent.setTopic(message.getTopic()); + cloudEvent.setPubsubName(message.getPubsubName()); + cloudEvent.setData(data); + + return cloudEvent; + } + + private void emitEventAndAcknowledge(CloudEvent cloudEvent, String eventId) { + sink.next(cloudEvent); + + // Send SUCCESS acknowledgment + requestStream.onNext(buildSuccessAck(eventId)); + } + + private void handleDeserializationError(String eventId, IOException cause) { + logger.error("Deserialization failed for event ID: {}, sending DROP ack", eventId, cause); + + // Send DROP ack - cannot process malformed data + requestStream.onNext(buildDropAck(eventId)); + + // Propagate error to sink + sink.error(new DaprException("DESERIALIZATION_ERROR", + "Failed to deserialize event with ID: " + eventId, cause)); + } + + private void handleProcessingError(String eventId, Exception cause) { + logger.error("Processing error for event ID: {}, attempting to send RETRY ack", eventId, cause); + + try { + // Try to send RETRY acknowledgment + requestStream.onNext(buildRetryAck(eventId)); + } catch (Exception ackException) { + // Failed to send ack - this is critical + logger.error("Failed to send RETRY ack for event ID: {}", eventId, ackException); + sink.error(DaprException.propagate(ackException)); + + return; + } + + // Propagate the original processing error + sink.error(DaprException.propagate(cause)); + } + + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS); + } + + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY); + } + + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP); + } + + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( + String eventId, + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) { + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder() + .setId(eventId) + .setStatus( + DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(status) + .build()) + .build(); + + return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setEventProcessed(eventProcessed) + .build(); + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index f7b5584cc7..a42c4f946c 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -586,6 +586,87 @@ public void onError(RuntimeException exception) { assertEquals(numErrors, errors.size()); } + @Test + public void subscribeEventFluxTest() throws Exception { + var numEvents = 100; + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + + for (int i = 0; i < numEvents; i++) { + DaprProtos.SubscribeTopicEventsResponseAlpha1 reponse = + DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build(); + observer.onNext(reponse); + } + + observer.onCompleted(); + }); + + emitterThread.start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + // No-op + } + + @Override + public void onCompleted() { + // No-op + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final AtomicInteger eventCount = new AtomicInteger(0); + final Semaphore gotAll = new Semaphore(0); + var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING) + .doOnNext(cloudEvent -> { + assertEquals(data, cloudEvent.getData()); + assertEquals(pubsubName, cloudEvent.getPubsubName()); + assertEquals(topicName, cloudEvent.getTopic()); + assertNotNull(cloudEvent.getId()); + + int count = eventCount.incrementAndGet(); + + if (count >= numEvents) { + gotAll.release(); + } + }) + .subscribe(); + + gotAll.acquire(); + disposable.dispose(); + + assertEquals(numEvents, eventCount.get()); + } + @Test public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception { List inputs = new ArrayList<>(); diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java new file mode 100644 index 0000000000..7328f79e51 --- /dev/null +++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java @@ -0,0 +1,506 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package io.dapr.internal.subscription; + +import com.google.protobuf.ByteString; +import io.dapr.client.domain.CloudEvent; +import io.dapr.exceptions.DaprException; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * Unit tests for EventSubscriberStreamObserver. + */ +class EventSubscriberStreamObserverTest { + + public static final String PUBSUB_NAME = "pubsub"; + public static final String TOPIC_NAME = "topic"; + private DaprGrpc.DaprStub mockStub; + private DaprObjectSerializer objectSerializer; + private StreamObserver mockRequestStream; + + @BeforeEach + @SuppressWarnings("unchecked") + void setUp() { + mockStub = mock(DaprGrpc.DaprStub.class); + objectSerializer = new DefaultObjectSerializer(); + mockRequestStream = mock(StreamObserver.class); + + when(mockStub.subscribeTopicEventsAlpha1(any())).thenReturn(mockRequestStream); + } + + @Test + @DisplayName("Should successfully process events and send SUCCESS acks") + void testSuccessfulEventProcessing() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + // Start the subscription + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + // Simulate receiving an event + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse( + "event-1", + "Hello World" + ); + observer.onNext(response); + + // Complete the stream + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .assertNext(cloudEvent -> { + assertEquals("Hello World", cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); + assertEquals(TOPIC_NAME, cloudEvent.getTopic()); + }) + .verifyComplete(); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + + verify(mockRequestStream, times(2)).onNext(requestCaptor.capture()); + + List requests = requestCaptor.getAllValues(); + + assertEquals(2, requests.size()); + assertTrue(requests.get(0).hasInitialRequest()); + assertTrue(requests.get(1).hasEventProcessed()); + assertEquals("event-1", requests.get(1).getEventProcessed().getId()); + assertEquals( + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS, + requests.get(1).getEventProcessed().getStatus().getStatus() + ); + } + + @Test + @DisplayName("Should handle multiple consecutive events correctly") + void testMultipleEvents() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + observer.onNext(buildEventResponse("event-1", "Message 1")); + observer.onNext(buildEventResponse("event-2", "Message 2")); + observer.onNext(buildEventResponse("event-3", "Message 3")); + + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .assertNext(cloudEvent -> { + assertEquals("Message 1", cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + }) + .assertNext(cloudEvent -> { + assertEquals("Message 2", cloudEvent.getData()); + assertEquals("event-2", cloudEvent.getId()); + }) + .assertNext(cloudEvent -> { + assertEquals("Message 3", cloudEvent.getData()); + assertEquals("event-3", cloudEvent.getId()); + }) + .verifyComplete(); + + verify(mockRequestStream, times(4)).onNext(any()); + } + + @Test + @DisplayName("Should send DROP ack when deserialization fails") + void testDeserializationError() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + // Send an event with invalid data (can't deserialize to String) + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) + .setData(ByteString.copyFrom(new byte[]{(byte) 0xFF, (byte) 0xFE})) // Invalid UTF-8 + .build() + ) + .build(); + + observer.onNext(response); + }); + + StepVerifier.create(flux) + .expectErrorMatches(error -> + error instanceof DaprException + && error.getMessage().contains("DESERIALIZATION_ERROR") + && error.getMessage().contains("event-1")) + .verify(); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + + verify(mockRequestStream, atLeast(2)).onNext(requestCaptor.capture()); + + List ackRequests = requestCaptor.getAllValues().stream() + .filter(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed) + .collect(Collectors.toList()); + + assertEquals(1, ackRequests.size()); + assertEquals("event-1", ackRequests.get(0).getEventProcessed().getId()); + assertEquals( + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP, + ackRequests.get(0).getEventProcessed().getStatus().getStatus() + ); + } + + @Test + @DisplayName("Should send RETRY ack when non-deserialization error occurs") + void testProcessingError() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); + observer.start(initialRequest); + + // Simulate a processing error by throwing during sink.next() + sink.onRequest(n -> { + throw new RuntimeException("Processing error"); + }); + + observer.onNext(buildEventResponse("event-1", "Hello")); + }); + + StepVerifier.create(flux) + .expectError(RuntimeException.class) + .verify(); + + // Note: When error occurs in onRequest callback (before processing), + // no ack is sent as the error happens before we can handle the event + verify(mockRequestStream, times(1)).onNext(any()); // Only initial request sent + } + + @Test + @DisplayName("Should propagate gRPC errors as DaprException") + void testGrpcError() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); + observer.start(initialRequest); + + // Simulate gRPC error + observer.onError(new RuntimeException("gRPC connection failed")); + }); + + StepVerifier.create(flux) + .expectError(DaprException.class) + .verify(); + } + + @Test + @DisplayName("Should handle null event messages gracefully without emitting events") + void testNullEventMessage() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .verifyComplete(); + + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + @DisplayName("Should skip events with empty pubsub name") + void testEmptyPubsubName() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName("") + .setTopic(TOPIC_NAME) + .setData(ByteString.copyFromUtf8("\"Hello\"")) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .verifyComplete(); + + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + @DisplayName("Should skip events with empty event ID") + void testEmptyEventId() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) + .setData(ByteString.copyFromUtf8("\"Hello\"")) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .verifyComplete(); + + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + @DisplayName("Should handle null type parameter and emit CloudEvent with null data") + void testNullData() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + null, // null type + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + + observer.start(initialRequest); + observer.onNext(buildEventResponse("event-1", "Hello")); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .assertNext(cloudEvent -> { + assertNull(cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); + assertEquals(TOPIC_NAME, cloudEvent.getTopic()); + }) + .verifyComplete(); + + verify(mockRequestStream, times(2)).onNext(any()); + } + + @Test + @DisplayName("Should deserialize and emit complex objects correctly") + void testComplexObjectSerialization() throws IOException { + TestEvent testEvent = new TestEvent("test-name", 42); + byte[] serializedEvent = objectSerializer.serialize(testEvent); + + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.get(TestEvent.class), + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) + .setData(ByteString.copyFrom(serializedEvent)) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .assertNext(cloudEvent -> { + TestEvent event = cloudEvent.getData(); + assertEquals("test-name", event.name); + assertEquals(42, event.value); + assertEquals("event-1", cloudEvent.getId()); + }) + .verifyComplete(); + } + + @Test + @DisplayName("Should propagate errors when ack sending fails") + void testErrorDuringSendingAck() { + doThrow(new RuntimeException("Failed to send ack")) + .when(mockRequestStream) + .onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)); + + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); + observer.start(initialRequest); + + observer.onNext(buildEventResponse("event-1", "Hello")); + }); + + StepVerifier.create(flux) + .assertNext(cloudEvent -> assertEquals("Hello", cloudEvent.getData())) // Event is emitted before ack + .expectError(DaprException.class) // Then error when sending ack + .verify(); + } + + private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest() { + return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setInitialRequest( + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) + .build() + ) + .build(); + } + + private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(String eventId, String data) { + + try { + byte[] serializedData = objectSerializer.serialize(data); + return DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(eventId) + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) + .setData(ByteString.copyFrom(serializedData)) + .build() + ) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static class TestEvent { + public String name; + public int value; + + public TestEvent() { + } + + public TestEvent(String name, int value) { + this.name = name; + this.value = value; + } + } +}