diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java index 9ef2855b..cfe9a007 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java @@ -1016,6 +1016,111 @@ void testLoggingNotification(String clientType) throws InterruptedException { mcpServer.close(); } + // --------------------------------------- + // Progress Tests + // --------------------------------------- + @ParameterizedTest(name = "{0} : {displayName} ") + @ValueSource(strings = { "httpclient", "webflux" }) + void testProgressNotification(String clientType) throws InterruptedException { + int expectedNotificationsCount = 4; // 3 notifications + 1 for another progress + // token + CountDownLatch latch = new CountDownLatch(expectedNotificationsCount); + // Create a list to store received logging notifications + List receivedNotifications = new CopyOnWriteArrayList<>(); + + var clientBuilder = clientBuilders.get(clientType); + + // Create server with a tool that sends logging notifications + McpServerFeatures.AsyncToolSpecification tool = McpServerFeatures.AsyncToolSpecification.builder() + .tool(McpSchema.Tool.builder() + .name("progress-test") + .description("Test progress notifications") + .inputSchema(emptyJsonSchema) + .build()) + .callHandler((exchange, request) -> { + + // Create and send notifications + var progressToken = (String) request.meta().get("progressToken"); + + return exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.0, 1.0, "Processing started")) + .then(exchange.progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Processing data"))) + .then(// Send a progress notification with another progress value + // should + exchange.progressNotification(new McpSchema.ProgressNotification("another-progress-token", + 0.0, 1.0, "Another processing started"))) + .then(exchange.progressNotification( + new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Processing completed"))) + .thenReturn(new CallToolResult(("Progress test completed"), false)); + }) + .build(); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().tools(true).build()) + .tools(tool) + .build(); + + try ( + // Create client with progress notification handler + var mcpClient = clientBuilder.progressConsumer(notification -> { + receivedNotifications.add(notification); + latch.countDown(); + }).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Call the tool that sends progress notifications + McpSchema.CallToolRequest callToolRequest = McpSchema.CallToolRequest.builder() + .name("progress-test") + .meta(Map.of("progressToken", "test-progress-token")) + .build(); + CallToolResult result = mcpClient.callTool(callToolRequest); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Progress test completed"); + + assertThat(latch.await(5, TimeUnit.SECONDS)).as("Should receive notifications in reasonable time").isTrue(); + + // Should have received 3 notifications + assertThat(receivedNotifications).hasSize(expectedNotificationsCount); + + Map notificationMap = receivedNotifications.stream() + .collect(Collectors.toMap(n -> n.message(), n -> n)); + + // First notification should be 0.0/1.0 progress + assertThat(notificationMap.get("Processing started").progressToken()).isEqualTo("test-progress-token"); + assertThat(notificationMap.get("Processing started").progress()).isEqualTo(0.0); + assertThat(notificationMap.get("Processing started").total()).isEqualTo(1.0); + assertThat(notificationMap.get("Processing started").message()).isEqualTo("Processing started"); + + // Second notification should be 0.5/1.0 progress + assertThat(notificationMap.get("Processing data").progressToken()).isEqualTo("test-progress-token"); + assertThat(notificationMap.get("Processing data").progress()).isEqualTo(0.5); + assertThat(notificationMap.get("Processing data").total()).isEqualTo(1.0); + assertThat(notificationMap.get("Processing data").message()).isEqualTo("Processing data"); + + // Third notification should be another progress token with 0.0/1.0 progress + assertThat(notificationMap.get("Another processing started").progressToken()) + .isEqualTo("another-progress-token"); + assertThat(notificationMap.get("Another processing started").progress()).isEqualTo(0.0); + assertThat(notificationMap.get("Another processing started").total()).isEqualTo(1.0); + assertThat(notificationMap.get("Another processing started").message()) + .isEqualTo("Another processing started"); + + // Fourth notification should be 1.0/1.0 progress + assertThat(notificationMap.get("Processing completed").progressToken()).isEqualTo("test-progress-token"); + assertThat(notificationMap.get("Processing completed").progress()).isEqualTo(1.0); + assertThat(notificationMap.get("Processing completed").total()).isEqualTo(1.0); + assertThat(notificationMap.get("Processing completed").message()).isEqualTo("Processing completed"); + } + mcpServer.close(); + } + // --------------------------------------- // Completion Tests // --------------------------------------- diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index cf8142c6..9e861deb 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -100,6 +100,9 @@ public class McpAsyncClient { public static final TypeReference LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeReference<>() { }; + public static final TypeReference PROGRESS_NOTIFICATION_TYPE_REF = new TypeReference<>() { + }; + /** * Client capabilities. */ @@ -253,6 +256,16 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, asyncLoggingNotificationHandler(loggingConsumersFinal)); + // Utility Progress Notification + List>> progressConsumersFinal = new ArrayList<>(); + progressConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); + if (!Utils.isEmpty(features.progressConsumers())) { + progressConsumersFinal.addAll(features.progressConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, + asyncProgressNotificationHandler(progressConsumersFinal)); + this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, List.of(McpSchema.LATEST_PROTOCOL_VERSION), initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers, @@ -846,6 +859,28 @@ public Mono setLoggingLevel(LoggingLevel loggingLevel) { }); } + /** + * Create a notification handler for progress notifications from the server. This + * handler automatically distributes progress notifications to all registered + * consumers. + * @param progressConsumers List of consumers that will be notified when a progress + * message is received. Each consumer receives the progress notification. + * @return A NotificationHandler that processes progress notifications by distributing + * the message to all registered consumers + */ + private NotificationHandler asyncProgressNotificationHandler( + List>> progressConsumers) { + + return params -> { + McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, + PROGRESS_NOTIFICATION_TYPE_REF); + + return Flux.fromIterable(progressConsumers) + .flatMap(consumer -> consumer.apply(progressNotification)) + .then(); + }; + } + /** * This method is package-private and used for test only. Should not be called by user * code. diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index d8925b00..c8af28ac 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -177,6 +177,8 @@ class SyncSpec { private final List> loggingConsumers = new ArrayList<>(); + private final List> progressConsumers = new ArrayList<>(); + private Function samplingHandler; private Function elicitationHandler; @@ -377,6 +379,36 @@ public SyncSpec loggingConsumers(List progressConsumer) { + Assert.notNull(progressConsumer, "Progress consumer must not be null"); + this.progressConsumers.add(progressConsumer); + return this; + } + + /** + * Adds a multiple consumers to be notified of progress notifications from the + * server. This allows the client to track long-running operations and provide + * feedback to users. + * @param progressConsumers A list of consumers that receives progress + * notifications. Must not be null. + * @return This builder instance for method chaining + * @throws IllegalArgumentException if progressConsumer is null + */ + public SyncSpec progressConsumers(List> progressConsumers) { + Assert.notNull(progressConsumers, "Progress consumers must not be null"); + this.progressConsumers.addAll(progressConsumers); + return this; + } + /** * Create an instance of {@link McpSyncClient} with the provided configurations or * sensible defaults. @@ -385,7 +417,8 @@ public SyncSpec loggingConsumers(List>> loggingConsumers = new ArrayList<>(); + private final List>> progressConsumers = new ArrayList<>(); + private Function> samplingHandler; private Function> elicitationHandler; @@ -654,6 +689,37 @@ public AsyncSpec loggingConsumers( return this; } + /** + * Adds a consumer to be notified of progress notifications from the server. This + * allows the client to track long-running operations and provide feedback to + * users. + * @param progressConsumer A consumer that receives progress notifications. Must + * not be null. + * @return This builder instance for method chaining + * @throws IllegalArgumentException if progressConsumer is null + */ + public AsyncSpec progressConsumer(Function> progressConsumer) { + Assert.notNull(progressConsumer, "Progress consumer must not be null"); + this.progressConsumers.add(progressConsumer); + return this; + } + + /** + * Adds a multiple consumers to be notified of progress notifications from the + * server. This allows the client to track long-running operations and provide + * feedback to users. + * @param progressConsumers A list of consumers that receives progress + * notifications. Must not be null. + * @return This builder instance for method chaining + * @throws IllegalArgumentException if progressConsumer is null + */ + public AsyncSpec progressConsumers( + List>> progressConsumers) { + Assert.notNull(progressConsumers, "Progress consumers must not be null"); + this.progressConsumers.addAll(progressConsumers); + return this; + } + /** * Create an instance of {@link McpAsyncClient} with the provided configurations * or sensible defaults. @@ -663,8 +729,8 @@ public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, - this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, - this.elicitationHandler)); + this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, + this.samplingHandler, this.elicitationHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index bd1a0985..3b655076 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -59,6 +59,7 @@ class McpClientFeatures { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -68,6 +69,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -79,6 +81,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -89,6 +92,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -106,10 +110,28 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } + /** + * @deprecated Only exists for backwards-compatibility purposes. + */ + public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, + Map roots, + List, Mono>> toolsChangeConsumers, + List, Mono>> resourcesChangeConsumers, + List, Mono>> resourcesUpdateConsumers, + List, Mono>> promptsChangeConsumers, + List>> loggingConsumers, + Function> samplingHandler, + Function> elicitationHandler) { + this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers, + resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler, + elicitationHandler); + } + /** * Convert a synchronous specification into an asynchronous one and provide * blocking code offloading to prevent accidental blocking of the non-blocking @@ -149,6 +171,12 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } + List>> progressConsumers = new ArrayList<>(); + for (Consumer consumer : syncSpec.progressConsumers()) { + progressConsumers.add(l -> Mono.fromRunnable(() -> consumer.accept(l)) + .subscribeOn(Schedulers.boundedElastic())); + } + Function> samplingHandler = r -> Mono .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); @@ -159,7 +187,7 @@ public static Async fromSync(Sync syncSpec) { return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, - loggingConsumers, samplingHandler, elicitationHandler); + loggingConsumers, progressConsumers, samplingHandler, elicitationHandler); } } @@ -174,6 +202,7 @@ public static Async fromSync(Sync syncSpec) { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -183,6 +212,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -196,6 +226,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -205,6 +236,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -222,9 +254,26 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } + + /** + * @deprecated Only exists for backwards-compatibility purposes. + */ + public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities, + Map roots, List>> toolsChangeConsumers, + List>> resourcesChangeConsumers, + List>> resourcesUpdateConsumers, + List>> promptsChangeConsumers, + List> loggingConsumers, + Function samplingHandler, + Function elicitationHandler) { + this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers, + resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler, + elicitationHandler); + } } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java index e56c695f..c0923e10 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java @@ -177,6 +177,19 @@ public Mono loggingNotification(LoggingMessageNotification loggingMessageN }); } + /** + * Sends a notification to the client that the current progress status has changed for + * long-running operations. + * @param progressNotification The progress notification to send + * @return A Mono that completes when the notification has been sent + */ + public Mono progressNotification(McpSchema.ProgressNotification progressNotification) { + if (progressNotification == null) { + return Mono.error(new McpError("Progress notification must not be null")); + } + return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_PROGRESS, progressNotification); + } + /** * Sends a ping request to the client. * @return A Mono that completes with clients's ping response diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java index 3ce599c8..12edfb34 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java @@ -209,7 +209,7 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se * represents a specific capability. * * @param tool The tool definition including name, description, and parameter schema - * @param call Deprecated. Uset he {@link AsyncToolSpecification#callHandler} instead. + * @param call Deprecated. Use the {@link AsyncToolSpecification#callHandler} instead. * @param callHandler The function that implements the tool's logic, receiving a * {@link McpAsyncServerExchange} and a * {@link io.modelcontextprotocol.spec.McpSchema.CallToolRequest} and returning diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java index 4b9a3777..dad1e4c1 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java @@ -107,6 +107,15 @@ public void loggingNotification(LoggingMessageNotification loggingMessageNotific this.exchange.loggingNotification(loggingMessageNotification).block(); } + /** + * Sends a notification to the client that the current progress status has changed for + * long-running operations. + * @param progressNotification The progress notification to send + */ + public void progressNotification(McpSchema.ProgressNotification progressNotification) { + this.exchange.progressNotification(progressNotification).block(); + } + /** * Sends a synchronous ping request to the client. * @return diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 4a570aea..4b003e81 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -60,6 +60,8 @@ private McpSchema() { public static final String METHOD_PING = "ping"; + public static final String METHOD_NOTIFICATION_PROGRESS = "notifications/progress"; + // Tool Methods public static final String METHOD_TOOLS_LIST = "tools/list"; @@ -1242,6 +1244,8 @@ private static JsonSchema parseSchema(String schema) { * tools/list. * @param arguments Arguments to pass to the tool. These must conform to the tool's * input schema. + * @param meta Optional metadata about the request. This can include additional + * information like `progressToken` */ @JsonInclude(JsonInclude.Include.NON_ABSENT) @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java index 66f33fb6..d8ff4c44 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java @@ -1020,6 +1020,80 @@ void testLoggingNotification() { mcpServer.close(); } + // --------------------------------------- + // Progress Tests + // --------------------------------------- + @Test + void testProgressNotification() { + // Create a list to store received progress notifications + List receivedNotifications = new ArrayList<>(); + + // Create server with a tool that sends progress notifications + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + McpSchema.Tool.builder() + .name("progress-test") + .description("Test progress notifications") + .inputSchema(emptyJsonSchema) + .build(), + null, (exchange, request) -> { + var progressToken = (String) request.meta().get("progressToken"); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.1, 1.0, "Test progress 1/10")) + .block(); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Test progress 5/10")) + .block(); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Test progress 10/10")) + .block(); + + return Mono.just(new CallToolResult("Progress test completed", false)); + }); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().logging().tools(true).build()) + .tools(tool) + .build(); + try ( + // Create client with progress notification handler + var mcpClient = clientBuilder.progressConsumer(receivedNotifications::add).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Call the tool that sends progress notifications + CallToolResult result = mcpClient.callTool( + new McpSchema.CallToolRequest("progress-test", Map.of(), Map.of("progressToken", "test-token"))); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Progress test completed"); + + // Wait for notifications to be processed + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + + System.out.println("Received notifications: " + receivedNotifications); + + // Should have received 3 notifications + assertThat(receivedNotifications).hasSize(3); + + // Check the progress notifications + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progressToken)) + .containsExactlyInAnyOrder("test-token", "test-token", "test-token"); + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progress)) + .containsExactlyInAnyOrder(0.1, 0.5, 1.0); + }); + } + mcpServer.close(); + } + // --------------------------------------- // Ping Tests // ---------------------------------------