Skip to content

Commit

Permalink
Merge pull request #75 from mkouba/issue-74-auto-ping
Browse files Browse the repository at this point in the history
core: introduce auto ping feature
  • Loading branch information
mkouba authored Jan 20, 2025
2 parents 03907e6 + a07dc42 commit 13c6830
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,56 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@Singleton
public class ConnectionManager {

private ConcurrentMap<String, McpConnectionBase> connections = new ConcurrentHashMap<>();
@Inject
Vertx vertx;

private final ConcurrentMap<String, ConnectionTimerId> connections = new ConcurrentHashMap<>();

// TODO we might need to extract this in a global component in the future
private final AtomicInteger idGenerator = new AtomicInteger();

public McpConnectionBase get(String id) {
return connections.get(id);
ConnectionTimerId connectionTimerId = connections.get(id);
return connectionTimerId != null ? connectionTimerId.connection() : null;
}

public void add(McpConnectionBase connection) {
connections.put(connection.id(), connection);
Long timerId = null;
if (connection.autoPingInterval().isPresent()) {
timerId = vertx.setPeriodic(connection.autoPingInterval().get().toMillis(), new Handler<Long>() {
@Override
public void handle(Long timerId) {
connection.send(Messages.newPing(idGenerator.incrementAndGet()));
}
});
}
connections.put(connection.id(), new ConnectionTimerId(connection, timerId));

}

public boolean remove(String id) {
return connections.remove(id) != null;
ConnectionTimerId connection = connections.remove(id);
if (connection != null) {
if (connection.timerId() != null) {
vertx.cancelTimer(connection.timerId());
}
return true;
}
return false;
}

record ConnectionTimerId(McpConnectionBase connection, Long timerId) {
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkiverse.mcp.server.runtime;

import static io.quarkiverse.mcp.server.runtime.Messages.isResponse;

import io.vertx.core.json.JsonObject;

public class JsonRPC {
Expand All @@ -21,9 +23,11 @@ public static boolean validate(JsonObject message, Responder responder) {
responder.sendError(id, INVALID_REQUEST, "Invalid jsonrpc version: " + jsonrpc);
return false;
}
if (message.getString("method") == null) {
responder.sendError(id, METHOD_NOT_FOUND, "Method not set");
return false;
if (!isResponse(message)) {
if (message.getString("method") == null) {
responder.sendError(id, METHOD_NOT_FOUND, "Method not set");
return false;
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
package io.quarkiverse.mcp.server.runtime;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import io.quarkiverse.mcp.server.InitializeRequest;
import io.quarkiverse.mcp.server.McpConnection;
import io.quarkiverse.mcp.server.McpLog.LogLevel;

public abstract class McpConnectionBase implements McpConnection {
public abstract class McpConnectionBase implements McpConnection, Responder {

private final String id;
protected final String id;

private final AtomicReference<Status> status;
protected final AtomicReference<Status> status;

private final AtomicReference<InitializeRequest> initializeRequest;
protected final AtomicReference<InitializeRequest> initializeRequest;

private final AtomicReference<LogLevel> logLevel;
protected final AtomicReference<LogLevel> logLevel;

protected McpConnectionBase(String id, LogLevel defaultLogLevel) {
protected final TrafficLogger trafficLogger;

protected final Optional<Duration> autoPingInterval;

protected McpConnectionBase(String id, LogLevel defaultLogLevel, TrafficLogger trafficLogger,
Optional<Duration> autoPingInterval) {
this.id = id;
this.status = new AtomicReference<>(Status.NEW);
this.initializeRequest = new AtomicReference<>();
this.logLevel = new AtomicReference<>(defaultLogLevel);
this.trafficLogger = trafficLogger;
this.autoPingInterval = autoPingInterval;
}

@Override
Expand Down Expand Up @@ -59,4 +68,12 @@ public boolean setInitialized() {
return status.compareAndSet(Status.INITIALIZING, Status.IN_OPERATION);
}

public TrafficLogger trafficLogger() {
return trafficLogger;
}

public Optional<Duration> autoPingInterval() {
return autoPingInterval;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ protected McpMessageHandler(McpRuntimeConfig config, ConnectionManager connectio
}

public void handle(JsonObject message, McpConnection connection, Responder responder) {
switch (connection.status()) {
case NEW -> initializeNew(message, responder, connection);
case INITIALIZING -> initializing(message, responder, connection);
case IN_OPERATION -> operation(message, responder, connection);
case SHUTDOWN -> responder.send(
Messages.newError(message.getValue("id"), JsonRPC.INTERNAL_ERROR, "Connection was already shut down"));
if (Messages.isResponse(message)) {
// Reponse from a client
// Currently we discard all responses, including pong responses
LOG.debugf("Discard client response: %s", message);
} else {
switch (connection.status()) {
case NEW -> initializeNew(message, responder, connection);
case INITIALIZING -> initializing(message, responder, connection);
case IN_OPERATION -> operation(message, responder, connection);
case SHUTDOWN -> responder.send(
Messages.newError(message.getValue("id"), JsonRPC.INTERNAL_ERROR, "Connection was already shut down"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

public class Messages {

static JsonObject newResult(Object id, Object result) {
public static JsonObject newResult(Object id, Object result) {
JsonObject response = new JsonObject();
response.put("jsonrpc", JsonRPC.VERSION);
response.put("id", id);
response.put("result", result);
return response;
}

static JsonObject newError(Object id, int code, String message) {
public static JsonObject newError(Object id, int code, String message) {
JsonObject response = new JsonObject();
response.put("jsonrpc", JsonRPC.VERSION);
response.put("id", id);
Expand All @@ -22,11 +22,22 @@ static JsonObject newError(Object id, int code, String message) {
return response;
}

static JsonObject newNotification(String method, Object params) {
public static JsonObject newNotification(String method, Object params) {
return new JsonObject()
.put("jsonrpc", JsonRPC.VERSION)
.put("method", method)
.put("params", params);
}

public static JsonObject newPing(Object id) {
return new JsonObject()
.put("jsonrpc", JsonRPC.VERSION)
.put("id", id)
.put("method", "ping");
}

public static boolean isResponse(JsonObject message) {
return message.containsKey("result") && message.containsKey("error");
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkiverse.mcp.server.runtime.config;

import java.time.Duration;
import java.util.Optional;

import io.quarkiverse.mcp.server.McpLog.LogLevel;
Expand Down Expand Up @@ -30,6 +31,13 @@ public interface McpRuntimeConfig {
*/
ClientLogging clientLogging();

/**
* The interval after which, when set, the server sends a ping message to the connected client automatically.
* <p>
* Ping messages are not sent automatically by default.
*/
Optional<Duration> autoPingInterval();

public interface TrafficLogging {

/**
Expand Down
38 changes: 38 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-mcp-server-core.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,43 @@ endif::add-copy-button-to-env-var[]
a|`debug`, `info`, `notice`, `warning`, `error`, `critical`, `alert`, `emergency`
|`info`

a| [[quarkus-mcp-server-core_quarkus-mcp-server-auto-ping-interval]] [.property-path]##link:#quarkus-mcp-server-core_quarkus-mcp-server-auto-ping-interval[`quarkus.mcp.server.auto-ping-interval`]##

[.description]
--
The interval after which, when set, the server sends a ping message to the connected client automatically.

Ping messages are not sent automatically by default.


ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_MCP_SERVER_AUTO_PING_INTERVAL+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_MCP_SERVER_AUTO_PING_INTERVAL+++`
endif::add-copy-button-to-env-var[]
--
|link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html[Duration] link:#duration-note-anchor-quarkus-mcp-server-core_quarkus-mcp[icon:question-circle[title=More information about the Duration format]]
|

|===

ifndef::no-duration-note[]
[NOTE]
[id=duration-note-anchor-quarkus-mcp-server-core_quarkus-mcp]
.About the Duration format
====
To write duration values, use the standard `java.time.Duration` format.
See the link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html#parse(java.lang.CharSequence)[Duration#parse() Java API documentation] for more information.

You can also use a simplified format, starting with a number:

* If the value is only a number, it represents time in seconds.
* If the value is a number followed by `ms`, it represents time in milliseconds.
In other cases, the simplified format is translated to the `java.time.Duration` format for parsing:

* If the value is a number followed by `h`, `m`, or `s`, it is prefixed with `PT`.
* If the value is a number followed by `d`, it is prefixed with `P`.
====
endif::no-duration-note[]
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,46 @@ ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_CLIENT_LOGGING_DEFAULT_LEVEL+++`
endif::add-copy-button-to-env-var[]
--
a|`debug`, `info`, `notice`, `warning`, `error`, `critical`, `alert`, `emergency`
a|`alert`, `critical`, `debug`, `emergency`, `error`, `info`, `notice`, `warning`
|`info`

a|icon:lock[title=Fixed at build time] [[quarkus-mcp-server-core_quarkus-auto-ping-interval]] [.property-path]##link:#quarkus-mcp-server-core_quarkus-auto-ping-interval[`quarkus.auto-ping-interval`]##

[.description]
--
The interval after which, when set, the server sends a ping message to the connected client automatically.

Ping messages are not sent automatically by default.


ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_AUTO_PING_INTERVAL+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_AUTO_PING_INTERVAL+++`
endif::add-copy-button-to-env-var[]
--
|link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html[Duration] link:#duration-note-anchor-quarkus-mcp-server-core_quarkus[icon:question-circle[title=More information about the Duration format]]
|

|===

ifndef::no-duration-note[]
[NOTE]
[id=duration-note-anchor-quarkus-mcp-server-core_quarkus]
.About the Duration format
====
To write duration values, use the standard `java.time.Duration` format.
See the link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html#parse(java.lang.CharSequence)[Duration#parse() Java API documentation] for more information.

You can also use a simplified format, starting with a number:

* If the value is only a number, it represents time in seconds.
* If the value is a number followed by `ms`, it represents time in milliseconds.
In other cases, the simplified format is translated to the `java.time.Duration` format for parsing:

* If the value is a number followed by `h`, `m`, or `s`, it is prefixed with `PT`.
* If the value is a number followed by `d`, it is prefixed with `P`.
====
endif::no-duration-note[]
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,43 @@ endif::add-copy-button-to-env-var[]
a|`debug`, `info`, `notice`, `warning`, `error`, `critical`, `alert`, `emergency`
|`info`

a| [[quarkus-mcp-server-core_quarkus-mcp-server-auto-ping-interval]] [.property-path]##link:#quarkus-mcp-server-core_quarkus-mcp-server-auto-ping-interval[`quarkus.mcp.server.auto-ping-interval`]##

[.description]
--
The interval after which, when set, the server sends a ping message to the connected client automatically.

Ping messages are not sent automatically by default.


ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_MCP_SERVER_AUTO_PING_INTERVAL+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_MCP_SERVER_AUTO_PING_INTERVAL+++`
endif::add-copy-button-to-env-var[]
--
|link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html[Duration] link:#duration-note-anchor-quarkus-mcp-server-core_quarkus-mcp[icon:question-circle[title=More information about the Duration format]]
|

|===

ifndef::no-duration-note[]
[NOTE]
[id=duration-note-anchor-quarkus-mcp-server-core_quarkus-mcp]
.About the Duration format
====
To write duration values, use the standard `java.time.Duration` format.
See the link:https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/Duration.html#parse(java.lang.CharSequence)[Duration#parse() Java API documentation] for more information.

You can also use a simplified format, starting with a number:

* If the value is only a number, it represents time in seconds.
* If the value is a number followed by `ms`, it represents time in milliseconds.
In other cases, the simplified format is translated to the `java.time.Duration` format for parsing:

* If the value is a number followed by `h`, `m`, or `s`, it is prefixed with `PT`.
* If the value is a number followed by `d`, it is prefixed with `P`.
====
endif::no-duration-note[]
Loading

0 comments on commit 13c6830

Please sign in to comment.