diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f70936c7e..1e130db9d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -63,8 +63,24 @@ jobs:
with:
java-version: ${{ matrix.java-version }}
distribution: 'adopt'
+ - name: Install python
+ uses: actions/setup-python@v4
+ with:
+ python-version: '3.11'
+ - name: Install uv
+ uses: astral-sh/setup-uv@v4
+ with:
+ version: "latest"
+ - name: Install Python dependencies for MCP tests
+ run: |
+ cd python
+ uv sync --all-extras
- name: Run Java Tests
- run: tools/ut.sh -j
+ run: |
+ # Add Python venv to PATH so Java tests can find MCP dependencies
+ export PATH="${{ github.workspace }}/python/.venv/bin:$PATH"
+ export PYTHONPATH="${{ github.workspace }}/python/.venv/lib/python3.11/site-packages:$PYTHONPATH"
+ tools/ut.sh -j
python_tests:
name: ut-python [${{ matrix.os }}] [python-${{ matrix.python-version}}]
diff --git a/api/pom.xml b/api/pom.xml
index a540361f6..9f5203316 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -65,4 +65,26 @@ under the License.
+
+
+ java-11
+
+ [11,17)
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ org/apache/flink/agents/api/annotation/MCPServer.java
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java b/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
index 37a5d5327..506b2848b 100644
--- a/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
@@ -91,7 +91,7 @@ public ReActAgent(
"Output schema must be RowTypeInfo or Pojo class.");
}
Prompt schemaPrompt =
- new Prompt(
+ Prompt.fromText(
String.format(
"The final response should be json format, and match the schema %s",
jsonSchema));
diff --git a/api/src/main/java/org/apache/flink/agents/api/annotation/MCPServer.java b/api/src/main/java/org/apache/flink/agents/api/annotation/MCPServer.java
new file mode 100644
index 000000000..f427c96c3
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/annotation/MCPServer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to mark a method as an MCP server resource that should be managed by the agent plan.
+ *
+ *
Methods annotated with @MCPServer will be scanned during agent plan creation. The agent plan
+ * will automatically:
+ *
+ *
+ *
Discover all tools exposed by the MCP server via {@code listTools()}
+ *
Discover all prompts exposed by the MCP server via {@code listPrompts()}
+ *
Register each tool and prompt as individual resources
+ *
Close the MCP server connection after discovery
+ *
+ *
+ *
Example usage:
+ *
+ *
{@code
+ * public class MyAgent extends Agent {
+ * @MCPServer
+ * public static MCPServer myMcpServer() {
+ * return MCPServer.builder("http://localhost:8000/mcp")
+ * .timeout(Duration.ofSeconds(30))
+ * .build();
+ * }
+ *
+ * @ChatModelSetup
+ * public static ChatModel chatModel() {
+ * return new ChatModel.Builder()
+ * .prompt("greeting") // MCP prompt from server
+ * .tools(List.of("calculator")) // MCP tool from server
+ * .build();
+ * }
+ * }
+ * }
+ *
+ *
This is the Java equivalent of Python's {@code @mcp_server} decorator.
+ *
+ * @see org.apache.flink.agents.integrations.mcp.MCPServer
+ * @see org.apache.flink.agents.integrations.mcp.MCPTool
+ * @see org.apache.flink.agents.integrations.mcp.MCPPrompt
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MCPServer {}
diff --git a/api/src/main/java/org/apache/flink/agents/api/prompt/Prompt.java b/api/src/main/java/org/apache/flink/agents/api/prompt/Prompt.java
index bb0be8bb8..28651c286 100644
--- a/api/src/main/java/org/apache/flink/agents/api/prompt/Prompt.java
+++ b/api/src/main/java/org/apache/flink/agents/api/prompt/Prompt.java
@@ -25,6 +25,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.ArrayList;
@@ -33,66 +34,85 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
- * Prompt for a language model
+ * Abstract base class for prompts.
*
- *
The template can be either a string or a sequence of ChatMessage objects.
+ *
This is the base class for all prompt implementations in Flink Agents. Subclasses must
+ * implement the formatting methods to generate text or messages from templates.
+ *
+ *
Common implementations:
+ *
+ *
+ *
Template-based prompts with placeholders (via {@link #fromText} or {@link #fromMessages})
+ *
+ *
+ * @see org.apache.flink.agents.integrations.mcp.MCPPrompt
*/
-public class Prompt extends SerializableResource {
- private static final String FIELD_TEMPLATE = "template";
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({@JsonSubTypes.Type(value = Prompt.LocalPrompt.class, name = "LocalPrompt")})
+public abstract class Prompt extends SerializableResource {
- @JsonProperty(FIELD_TEMPLATE)
- private final PromptTemplate template;
-
- @JsonCreator
- private Prompt(@JsonProperty(FIELD_TEMPLATE) PromptTemplate promptTemplate) {
- this.template = promptTemplate;
- }
-
- public Prompt(String template) {
- this.template = PromptTemplate.fromString(template);
+ /**
+ * Create a prompt from a text string template.
+ *
+ * @param text The text template with placeholders like {variable}
+ * @return A Prompt instance
+ */
+ public static Prompt fromText(String text) {
+ return new LocalPrompt(text);
}
- public Prompt(List template) {
- this.template = PromptTemplate.fromMessages(template);
+ /**
+ * Create a prompt from a sequence of chat messages.
+ *
+ * @param messages The list of chat messages forming the prompt template
+ * @return A Prompt instance
+ */
+ public static Prompt fromMessages(List messages) {
+ return new LocalPrompt(messages);
}
- public String formatString(Map kwargs) {
- return template.match(
- // Handle string template
- content -> format(content, kwargs),
- // Handle messages template
- messages -> {
- List formattedMessages = new ArrayList<>();
- for (ChatMessage message : messages) {
- String formattedContent = format(message.getContent(), kwargs);
- String formatted = message.getRole().getValue() + ": " + formattedContent;
- formattedMessages.add(formatted);
- }
- return String.join("\n", formattedMessages);
- });
- }
+ /**
+ * Generate a text string from the prompt template with additional arguments.
+ *
+ * @param kwargs Key-value pairs to substitute in the template
+ * @return The formatted prompt as a string
+ */
+ public abstract String formatString(Map kwargs);
- public List formatMessages(MessageRole defaultRole, Map kwargs) {
- return template.match(
- // Handle string template
- content ->
- new ArrayList<>(
- Collections.singletonList(
- new ChatMessage(defaultRole, format(content, kwargs)))),
- // Handle messages template
- messages ->
- messages.stream()
- .map(
- message ->
- new ChatMessage(
- message.getRole(),
- format(message.getContent(), kwargs)))
- .collect(Collectors.toList()));
- }
+ /**
+ * Generate a list of ChatMessage from the prompt template with additional arguments.
+ *
+ * @param defaultRole The default message role (usually SYSTEM)
+ * @param kwargs Key-value pairs to substitute in the template
+ * @return List of formatted chat messages
+ */
+ public abstract List formatMessages(
+ MessageRole defaultRole, Map kwargs);
@JsonIgnore
@Override
@@ -100,98 +120,165 @@ public ResourceType getResourceType() {
return ResourceType.PROMPT;
}
- private static final Pattern BRACE_PATTERN = Pattern.compile("\\{([^}]+)\\}");
+ /**
+ * Local prompt implementation for language models.
+ *
+ *
This prompt implementation uses a local template that can be either a string or a sequence
+ * of ChatMessage objects. The template supports placeholder substitution using {variable}
+ * syntax.
+ *
+ *
While this class is public, users should prefer using {@link Prompt#fromText(String)} or
+ * {@link Prompt#fromMessages(List)} factory methods to create prompt instances instead of
+ * directly instantiating this class.
+ */
+ public static class LocalPrompt extends Prompt {
+ private static final String FIELD_TEMPLATE = "template";
- /** Format template string with keyword arguments */
- private static String format(String template, Map kwargs) {
- if (template == null) {
- return "";
- }
+ @JsonProperty(FIELD_TEMPLATE)
+ private final transient PromptTemplate template;
- String result = template;
- for (Map.Entry entry : kwargs.entrySet()) {
- String placeholder = "{" + entry.getKey() + "}";
- String value = entry.getValue() != null ? entry.getValue() : "";
- result = result.replace(placeholder, value);
+ @JsonCreator
+ private LocalPrompt(@JsonProperty(FIELD_TEMPLATE) PromptTemplate promptTemplate) {
+ this.template = promptTemplate;
}
- return result;
- }
- @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
- private abstract static class PromptTemplate {
- public static PromptTemplate fromString(String content) {
- return new StringTemplate(content);
+ public LocalPrompt(String template) {
+ this.template = PromptTemplate.fromString(template);
}
- public static PromptTemplate fromMessages(List messages) {
- return new MessagesTemplate(messages);
+ public LocalPrompt(List template) {
+ this.template = PromptTemplate.fromMessages(template);
}
- /**
- * Pattern matching method for type-safe operations. This replaces instanceof checks and
- * casting.
- */
- public abstract T match(
- Function onString, Function, T> onMessages);
- }
+ @Override
+ public String formatString(Map kwargs) {
+ return template.match(
+ // Handle string template
+ content -> format(content, kwargs),
+ // Handle messages template
+ messages -> {
+ List formattedMessages = new ArrayList<>();
+ for (ChatMessage message : messages) {
+ String formattedContent = format(message.getContent(), kwargs);
+ String formatted =
+ message.getRole().getValue() + ": " + formattedContent;
+ formattedMessages.add(formatted);
+ }
+ return String.join("\n", formattedMessages);
+ });
+ }
- /** String template implementation. */
- private static class StringTemplate extends PromptTemplate {
- private static final String FIELD_CONTENT = "content";
+ @Override
+ public List formatMessages(
+ MessageRole defaultRole, Map kwargs) {
+ return template.match(
+ // Handle string template
+ content ->
+ new ArrayList<>(
+ Collections.singletonList(
+ new ChatMessage(defaultRole, format(content, kwargs)))),
+ // Handle messages template
+ messages ->
+ messages.stream()
+ .map(
+ message ->
+ new ChatMessage(
+ message.getRole(),
+ format(message.getContent(), kwargs)))
+ .collect(Collectors.toList()));
+ }
- @JsonProperty(FIELD_CONTENT)
- private final String content;
+ /** Format template string with keyword arguments */
+ private static String format(String template, Map kwargs) {
+ if (template == null) {
+ return "";
+ }
- @JsonCreator
- public StringTemplate(@JsonProperty(FIELD_CONTENT) String content) {
- this.content = Objects.requireNonNull(content, "content cannot be null");
+ String result = template;
+ for (Map.Entry entry : kwargs.entrySet()) {
+ String placeholder = "{" + entry.getKey() + "}";
+ String value = entry.getValue() != null ? entry.getValue() : "";
+ result = result.replace(placeholder, value);
+ }
+ return result;
}
- public String getContent() {
- return content;
- }
+ @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
+ private abstract static class PromptTemplate {
+ public static PromptTemplate fromString(String content) {
+ return new StringTemplate(content);
+ }
- @Override
- public T match(
- Function onString, Function, T> onMessages) {
- return onString.apply(content);
- }
+ public static PromptTemplate fromMessages(List messages) {
+ return new MessagesTemplate(messages);
+ }
- @Override
- public String toString() {
- return "StringTemplate{content='" + content + "'}";
+ /**
+ * Pattern matching method for type-safe operations. This replaces instanceof checks and
+ * casting.
+ */
+ public abstract T match(
+ Function onString, Function, T> onMessages);
}
- }
- /** Messages template implementation. */
- private static class MessagesTemplate extends PromptTemplate {
- private static final String FIELD_MESSAGES = "messages";
+ /** String template implementation. */
+ private static class StringTemplate extends PromptTemplate {
+ private static final String FIELD_CONTENT = "content";
- @JsonProperty(FIELD_MESSAGES)
- private final List messages;
+ @JsonProperty(FIELD_CONTENT)
+ private final String content;
- @JsonCreator
- public MessagesTemplate(@JsonProperty(FIELD_MESSAGES) List messages) {
- Objects.requireNonNull(messages, "messages cannot be null");
- if (messages.isEmpty()) {
- throw new IllegalArgumentException("Messages cannot be empty");
+ @JsonCreator
+ public StringTemplate(@JsonProperty(FIELD_CONTENT) String content) {
+ this.content = Objects.requireNonNull(content, "content cannot be null");
}
- this.messages = new ArrayList<>(messages);
- }
- public List getMessages() {
- return new ArrayList<>(messages);
- }
+ public String getContent() {
+ return content;
+ }
- @Override
- public T match(
- Function onString, Function, T> onMessages) {
- return onMessages.apply(new ArrayList<>(messages));
+ @Override
+ public T match(
+ Function onString, Function, T> onMessages) {
+ return onString.apply(content);
+ }
+
+ @Override
+ public String toString() {
+ return "StringTemplate{content='" + content + "'}";
+ }
}
- @Override
- public String toString() {
- return "MessagesTemplate{messages=" + messages.size() + " items}";
+ /** Messages template implementation. */
+ private static class MessagesTemplate extends PromptTemplate {
+ private static final String FIELD_MESSAGES = "messages";
+
+ @JsonProperty(FIELD_MESSAGES)
+ private final List messages;
+
+ @JsonCreator
+ public MessagesTemplate(@JsonProperty(FIELD_MESSAGES) List messages) {
+ Objects.requireNonNull(messages, "messages cannot be null");
+ if (messages.isEmpty()) {
+ throw new IllegalArgumentException("Messages cannot be empty");
+ }
+ this.messages = new ArrayList<>(messages);
+ }
+
+ public List getMessages() {
+ return new ArrayList<>(messages);
+ }
+
+ @Override
+ public T match(
+ Function onString, Function, T> onMessages) {
+ return onMessages.apply(new ArrayList<>(messages));
+ }
+
+ @Override
+ public String toString() {
+ return "MessagesTemplate{messages=" + messages.size() + " items}";
+ }
}
}
}
diff --git a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelTest.java b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelTest.java
index 9a11503d4..65c83152a 100644
--- a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelTest.java
@@ -94,14 +94,14 @@ void setUp() {
null);
// Create simple prompt
- simplePrompt = new Prompt("You are a helpful assistant. User says: {user_input}");
+ simplePrompt = Prompt.fromText("You are a helpful assistant. User says: {user_input}");
// Create conversation prompt
List conversationTemplate =
Arrays.asList(
new ChatMessage(MessageRole.SYSTEM, "You are a helpful AI assistant."),
new ChatMessage(MessageRole.USER, "{user_message}"));
- conversationPrompt = new Prompt(conversationTemplate);
+ conversationPrompt = Prompt.fromMessages(conversationTemplate);
}
@Test
@@ -118,7 +118,7 @@ void testBasicChat() {
// Format the prompt with variables
Prompt formattedPrompt =
- new Prompt(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
+ Prompt.fromMessages(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
ChatMessage response =
chatModel.chat(formattedPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -135,7 +135,8 @@ void testChatWithConversationPrompt() {
variables.put("user_message", "What's the weather like?");
Prompt formattedPrompt =
- new Prompt(conversationPrompt.formatMessages(MessageRole.SYSTEM, variables));
+ Prompt.fromMessages(
+ conversationPrompt.formatMessages(MessageRole.SYSTEM, variables));
ChatMessage response =
chatModel.chat(formattedPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -148,7 +149,7 @@ void testChatWithConversationPrompt() {
@Test
@DisplayName("Test chat with empty prompt")
void testChatWithEmptyPrompt() {
- Prompt emptyPrompt = new Prompt("");
+ Prompt emptyPrompt = Prompt.fromText("");
ChatMessage response =
chatModel.chat(emptyPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -169,7 +170,7 @@ void testChatWithMultipleUserMessages() {
new ChatMessage(
MessageRole.USER, "Second message - this should be the response"));
- Prompt multiPrompt = new Prompt(multipleMessages);
+ Prompt multiPrompt = Prompt.fromMessages(multipleMessages);
ChatMessage response =
chatModel.chat(multiPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -187,7 +188,7 @@ void testChatModelConfiguration() {
variables.put("user_input", "Test message");
Prompt formattedPrompt =
- new Prompt(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
+ Prompt.fromMessages(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
ChatMessage response =
chatModel.chat(formattedPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -199,7 +200,7 @@ void testChatModelConfiguration() {
@DisplayName("Test chat with system-only prompt")
void testChatWithSystemOnlyPrompt() {
Prompt systemOnlyPrompt =
- new Prompt(
+ Prompt.fromMessages(
Arrays.asList(
new ChatMessage(MessageRole.SYSTEM, "System instruction only")));
@@ -218,7 +219,7 @@ void testChatResponseFormat() {
variables.put("user_input", "Format test");
Prompt formattedPrompt =
- new Prompt(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
+ Prompt.fromMessages(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
ChatMessage response =
chatModel.chat(formattedPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
@@ -243,7 +244,7 @@ void testChatWithLongInput() {
variables.put("user_input", longInput.toString());
Prompt formattedPrompt =
- new Prompt(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
+ Prompt.fromMessages(simplePrompt.formatMessages(MessageRole.SYSTEM, variables));
ChatMessage response =
chatModel.chat(formattedPrompt.formatMessages(MessageRole.USER, new HashMap<>()));
diff --git a/api/src/test/java/org/apache/flink/agents/api/prompt/PromptTest.java b/api/src/test/java/org/apache/flink/agents/api/prompt/PromptTest.java
index 454c496f4..ddd1852ec 100644
--- a/api/src/test/java/org/apache/flink/agents/api/prompt/PromptTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/prompt/PromptTest.java
@@ -53,7 +53,7 @@ void setUp() {
String textTemplate =
"You are a product review analyzer, please generate a score and the dislike reasons "
+ "(if any) for the review. The product {product_id} is {description}, and user review is '{review}'.";
- textPrompt = new Prompt(textTemplate);
+ textPrompt = Prompt.fromText(textTemplate);
// Create message-based prompt template
List messageTemplate =
@@ -65,7 +65,7 @@ void setUp() {
new ChatMessage(
MessageRole.USER,
"The product {product_id} is {description}, and user review is '{review}'."));
- messagesPrompt = new Prompt(messageTemplate);
+ messagesPrompt = Prompt.fromMessages(messageTemplate);
// Set up test variables
variables = new HashMap<>();
@@ -170,7 +170,7 @@ void testPromptResourceType() {
@Test
@DisplayName("Test empty prompt")
void testEmptyPrompt() {
- Prompt emptyPrompt = new Prompt("");
+ Prompt emptyPrompt = Prompt.fromText("");
String result = emptyPrompt.formatString(new HashMap<>());
assertEquals("", result);
@@ -183,7 +183,7 @@ void testEmptyPrompt() {
@DisplayName("Test prompt with special characters")
void testPromptWithSpecialCharacters() {
String specialTemplate = "Handle special chars: {text} with symbols like @#$%^&*()";
- Prompt specialPrompt = new Prompt(specialTemplate);
+ Prompt specialPrompt = Prompt.fromText(specialTemplate);
Map specialVars = new HashMap<>();
specialVars.put("text", "Hello & Welcome!");
@@ -197,7 +197,7 @@ void testPromptWithSpecialCharacters() {
@DisplayName("Test prompt with nested braces")
void testPromptWithNestedBraces() {
String nestedTemplate = "JSON example: {{\"key\": \"{value}\"}}";
- Prompt nestedPrompt = new Prompt(nestedTemplate);
+ Prompt nestedPrompt = Prompt.fromText(nestedTemplate);
Map nestedVars = new HashMap<>();
nestedVars.put("value", "test");
@@ -220,7 +220,7 @@ void testComplexConversationPrompt() {
"I'd be happy to help with {task}. Let me know what specifically you need."),
new ChatMessage(MessageRole.USER, "{user_request}"));
- Prompt conversationPrompt = new Prompt(conversationTemplate);
+ Prompt conversationPrompt = Prompt.fromMessages(conversationTemplate);
Map conversationVars = new HashMap<>();
conversationVars.put("assistant_type", "an AI assistant");
diff --git a/docs/content/docs/development/prompts.md b/docs/content/docs/development/prompts.md
index 17ffe3cc9..591fabf8e 100644
--- a/docs/content/docs/development/prompts.md
+++ b/docs/content/docs/development/prompts.md
@@ -103,7 +103,7 @@ String PRODUCT_SUGGESTION_PROMPT_STR =
+ "{input}";
-Prompt productSuggestionPrompt = new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
+Prompt productSuggestionPrompt = Prompt.fromText(PRODUCT_SUGGESTION_PROMPT_STR);
```
{{< /tab >}}
@@ -158,7 +158,7 @@ review_analysis_prompt = Prompt.from_messages(
{{< tab "Java" >}}
```java
Prompt reviewAnalysisPrompt =
- new Prompt(
+ new Prompt.fromMessages(
Arrays.asList(
new ChatMessage(
MessageRole.SYSTEM,
diff --git a/docs/content/docs/development/react_agent.md b/docs/content/docs/development/react_agent.md
index a22054f32..a8de5072d 100644
--- a/docs/content/docs/development/react_agent.md
+++ b/docs/content/docs/development/react_agent.md
@@ -139,7 +139,7 @@ String systemPromptString =
+ "...";
// Prompt for review analysis react agent.
-Prompt myPrompt = new Prompt(
+Prompt myPrompt = Prompt.fromMessages(
Arrays.asList(
new ChatMessage(MessageRole.SYSTEM, systemPromptString),
new ChatMessage(
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
index 10905fc54..510c451d3 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
@@ -166,7 +166,7 @@ private static Agent getAgent() {
.build();
Prompt prompt =
- new Prompt(
+ Prompt.fromMessages(
List.of(
new ChatMessage(
MessageRole.SYSTEM,
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
index f18b45392..fd3b8aefe 100644
--- a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
@@ -56,14 +56,14 @@ public class CustomTypesAndResources {
+ "no need to disclose whether the tool was used.";
public static final Prompt REVIEW_ANALYSIS_PROMPT =
- new Prompt(
+ Prompt.fromMessages(
Arrays.asList(
new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
new ChatMessage(MessageRole.USER, "\"input\":\n" + "{input}")));
// Prompt for review analysis react agent
public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT =
- new Prompt(
+ Prompt.fromMessages(
Arrays.asList(
new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
new ChatMessage(
@@ -91,7 +91,7 @@ public class CustomTypesAndResources {
+ "{input}";
public static final Prompt PRODUCT_SUGGESTION_PROMPT =
- new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
+ Prompt.fromText(PRODUCT_SUGGESTION_PROMPT_STR);
/**
* Tool for notifying the shipping manager when product received a negative review due to
diff --git a/integrations/mcp/pom.xml b/integrations/mcp/pom.xml
new file mode 100644
index 000000000..040cf78c8
--- /dev/null
+++ b/integrations/mcp/pom.xml
@@ -0,0 +1,95 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.flink
+ flink-agents-integrations
+ 0.2-SNAPSHOT
+ ../pom.xml
+
+
+ flink-agents-integrations-mcp
+ Flink Agents : Integrations: MCP
+ jar
+
+
+
+ org.apache.flink
+ flink-agents-api
+ ${project.version}
+
+
+ org.apache.flink
+ flink-shaded-jackson
+ ${flink.shaded.jackson.version}-${flink.shaded.version}
+
+
+
+
+
+ java-17
+
+ [17,)
+
+
+
+ io.modelcontextprotocol.sdk
+ mcp
+ 0.16.0
+
+
+
+
+
+ java-11
+
+ [11,17)
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ org/apache/flink/agents/integrations/mcp/**/*.java
+
+
+ org/apache/flink/agents/integrations/mcp/**/*.java
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ **/mcp/**/*Test.java
+
+
+
+
+
+
+
+
+
diff --git a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPContentExtractor.java b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPContentExtractor.java
new file mode 100644
index 000000000..50e62428f
--- /dev/null
+++ b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPContentExtractor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.mcp;
+
+import io.modelcontextprotocol.spec.McpSchema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for extracting and normalizing MCP content items.
+ *
+ *
MCP servers can return various content types (text, images, embedded resources). This utility
+ * converts them to Java-friendly objects.
+ */
+public class MCPContentExtractor {
+
+ /**
+ * Extract and normalize a single MCP content item.
+ *
+ * @param contentItem A content item from MCP (TextContent, ImageContent, etc.)
+ * @return Normalized content as String or Map
+ */
+ public static Object extractContentItem(Object contentItem) {
+ if (contentItem instanceof McpSchema.TextContent) {
+ return extractTextContent((McpSchema.TextContent) contentItem);
+ } else if (contentItem instanceof McpSchema.ImageContent) {
+ return extractImageContent((McpSchema.ImageContent) contentItem);
+ } else if (contentItem instanceof McpSchema.EmbeddedResource) {
+ return extractEmbeddedResource((McpSchema.EmbeddedResource) contentItem);
+ } else {
+ // Handle unknown content types as string
+ return contentItem != null ? contentItem.toString() : "";
+ }
+ }
+
+ /**
+ * Extract text content from MCP TextContent.
+ *
+ * @param textContent The text content
+ * @return The text as a string
+ */
+ private static String extractTextContent(McpSchema.TextContent textContent) {
+ return textContent.text();
+ }
+
+ /**
+ * Extract image content from MCP ImageContent.
+ *
+ * @param imageContent The image content
+ * @return A map with image details
+ */
+ private static Map extractImageContent(McpSchema.ImageContent imageContent) {
+ Map result = new HashMap<>();
+ result.put("type", "image");
+ result.put("data", imageContent.data());
+ result.put("mimeType", imageContent.mimeType());
+ return result;
+ }
+
+ /**
+ * Extract embedded resource from MCP EmbeddedResource.
+ *
+ * @param embeddedResource The embedded resource
+ * @return A map with resource details
+ */
+ private static Map extractEmbeddedResource(
+ McpSchema.EmbeddedResource embeddedResource) {
+ Map result = new HashMap<>();
+ result.put("type", "resource");
+
+ var resource = embeddedResource.resource();
+ if (resource instanceof McpSchema.TextResourceContents) {
+ McpSchema.TextResourceContents textResource = (McpSchema.TextResourceContents) resource;
+ result.put("uri", textResource.uri());
+ result.put("text", textResource.text());
+ } else if (resource instanceof McpSchema.BlobResourceContents) {
+ McpSchema.BlobResourceContents blobResource = (McpSchema.BlobResourceContents) resource;
+ result.put("uri", blobResource.uri());
+ result.put("blob", blobResource.blob());
+ }
+
+ return result;
+ }
+
+ /** Private constructor to prevent instantiation. */
+ private MCPContentExtractor() {
+ throw new UnsupportedOperationException("Utility class");
+ }
+}
diff --git a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
new file mode 100644
index 000000000..a456be486
--- /dev/null
+++ b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.mcp;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * MCP prompt definition that extends the base Prompt class.
+ *
+ *
This represents a prompt managed by an MCP server. Unlike static prompts, MCP prompts are
+ * fetched dynamically from the server and can accept arguments.
+ */
+public class MCPPrompt extends Prompt {
+
+ private static final String FIELD_NAME = "name";
+ private static final String FIELD_DESCRIPTION = "description";
+ private static final String FIELD_ARGUMENTS = "arguments";
+ private static final String FIELD_MCP_SERVER = "mcpServer";
+
+ @JsonProperty(FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_DESCRIPTION)
+ private final String description;
+
+ @JsonProperty(FIELD_ARGUMENTS)
+ private final Map promptArguments;
+
+ @JsonProperty(FIELD_MCP_SERVER)
+ private final MCPServer mcpServer;
+
+ /** Represents an argument that can be passed to an MCP prompt. */
+ public static class PromptArgument {
+ @JsonProperty("name")
+ private final String name;
+
+ @JsonProperty("description")
+ private final String description;
+
+ @JsonProperty("required")
+ private final boolean required;
+
+ @JsonCreator
+ public PromptArgument(
+ @JsonProperty("name") String name,
+ @JsonProperty("description") String description,
+ @JsonProperty("required") boolean required) {
+ this.name = Objects.requireNonNull(name, "name cannot be null");
+ this.description = description;
+ this.required = required;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PromptArgument that = (PromptArgument) o;
+ return required == that.required
+ && Objects.equals(name, that.name)
+ && Objects.equals(description, that.description);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, description, required);
+ }
+ }
+
+ /**
+ * Create a new MCPPrompt.
+ *
+ * @param name The prompt name
+ * @param description The prompt description
+ * @param promptArguments Map of argument names to argument definitions
+ * @param mcpServer The MCP server reference
+ */
+ @JsonCreator
+ public MCPPrompt(
+ @JsonProperty(FIELD_NAME) String name,
+ @JsonProperty(FIELD_DESCRIPTION) String description,
+ @JsonProperty(FIELD_ARGUMENTS) Map promptArguments,
+ @JsonProperty(FIELD_MCP_SERVER) MCPServer mcpServer) {
+ this.name = Objects.requireNonNull(name, "name cannot be null");
+ this.description = description;
+ this.promptArguments =
+ promptArguments != null ? new HashMap<>(promptArguments) : new HashMap<>();
+ this.mcpServer = Objects.requireNonNull(mcpServer, "mcpServer cannot be null");
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Map getPromptArguments() {
+ return new HashMap<>(promptArguments);
+ }
+
+ @JsonIgnore
+ public MCPServer getMcpServer() {
+ return mcpServer;
+ }
+
+ /**
+ * Format the prompt as a string with the given arguments. Overrides the base Prompt class to
+ * fetch prompts from the MCP server.
+ *
+ * @param arguments Arguments to pass to the prompt (String keys and values)
+ * @return The formatted prompt as a string
+ */
+ @Override
+ public String formatString(Map arguments) {
+ List messages = formatMessages(MessageRole.SYSTEM, arguments);
+ return messages.stream()
+ .map(msg -> msg.getRole().getValue() + ": " + msg.getContent())
+ .collect(Collectors.joining("\n"));
+ }
+
+ /**
+ * Format the prompt as a list of chat messages with the given arguments. Overrides the base
+ * Prompt class to fetch prompts from the MCP server.
+ *
+ * @param defaultRole The default role for messages (usually SYSTEM)
+ * @param kwargs Arguments to pass to the prompt (String keys and values)
+ * @return List of formatted chat messages
+ */
+ @Override
+ public List formatMessages(MessageRole defaultRole, Map kwargs) {
+ Map objectArgs = new HashMap<>(kwargs);
+ return formatMessages(objectArgs);
+ }
+
+ /**
+ * Format the prompt as a list of chat messages with the given arguments.
+ *
+ * @param arguments Arguments to pass to the prompt (Object values)
+ * @return List of formatted chat messages
+ */
+ private List formatMessages(Map arguments) {
+ return mcpServer.getPrompt(name, validateAndPrepareArguments(arguments));
+ }
+
+ /**
+ * Validate that all required arguments are present and prepare the arguments map.
+ *
+ * @param arguments The provided arguments
+ * @return A validated map of arguments
+ * @throws IllegalArgumentException if required arguments are missing
+ */
+ private Map validateAndPrepareArguments(Map arguments) {
+ Map result = new HashMap<>();
+
+ for (PromptArgument arg : promptArguments.values()) {
+ if (arg.isRequired()) {
+ if (arguments == null || !arguments.containsKey(arg.getName())) {
+ throw new IllegalArgumentException(
+ "Missing required argument: " + arg.getName());
+ }
+ result.put(arg.getName(), arguments.get(arg.getName()));
+ } else if (arguments != null && arguments.containsKey(arg.getName())) {
+ result.put(arg.getName(), arguments.get(arg.getName()));
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ MCPPrompt mcpPrompt = (MCPPrompt) o;
+ return Objects.equals(name, mcpPrompt.name)
+ && Objects.equals(description, mcpPrompt.description)
+ && Objects.equals(promptArguments, mcpPrompt.promptArguments)
+ && Objects.equals(mcpServer, mcpPrompt.mcpServer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, description, promptArguments, mcpServer);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("MCPPrompt{name='%s', server='%s'}", name, mcpServer.getEndpoint());
+ }
+}
diff --git a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
new file mode 100644
index 000000000..a116c4bdc
--- /dev/null
+++ b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integrations.mcp;
+
+import io.modelcontextprotocol.client.McpClient;
+import io.modelcontextprotocol.client.McpSyncClient;
+import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
+import io.modelcontextprotocol.spec.McpSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+import org.apache.flink.agents.integrations.mcp.auth.ApiKeyAuth;
+import org.apache.flink.agents.integrations.mcp.auth.Auth;
+import org.apache.flink.agents.integrations.mcp.auth.BasicAuth;
+import org.apache.flink.agents.integrations.mcp.auth.BearerTokenAuth;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.net.URI;
+import java.net.http.HttpRequest;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Resource representing an MCP server and exposing its tools/prompts.
+ *
+ *
This is a logical container for MCP tools and prompts; it is not directly invokable. It uses
+ * the official MCP Java SDK to communicate with MCP servers via HTTP/SSE.
+ *
+ *
Authentication is supported through the {@link Auth} interface with multiple implementations:
+ *
+ *
+ *
{@link BearerTokenAuth} - For OAuth 2.0 and JWT tokens
+ *
{@link BasicAuth} - For username/password authentication
+ *
{@link ApiKeyAuth} - For API key authentication via custom headers
+ *