|
| 1 | +<!-- |
| 2 | +SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. |
| 3 | +SPDX-License-Identifier: Apache-2.0 |
| 4 | +
|
| 5 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | +you may not use this file except in compliance with the License. |
| 7 | +You may obtain a copy of the License at |
| 8 | +
|
| 9 | +http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +
|
| 11 | +Unless required by applicable law or agreed to in writing, software |
| 12 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +See the License for the specific language governing permissions and |
| 15 | +limitations under the License. |
| 16 | +--> |
| 17 | + |
| 18 | +# Adding a Custom MCP Server Worker |
| 19 | + |
| 20 | +:::{note} |
| 21 | +We recommend reading the [MCP Server Guide](../workflows/mcp/mcp-server.md) before proceeding with this documentation, to understand how MCP servers work in NVIDIA NeMo Agent toolkit. |
| 22 | +::: |
| 23 | + |
| 24 | +The NVIDIA NeMo Agent toolkit provides a default MCP server worker that publishes your workflow functions as MCP tools. However, you may need to customize the server behavior for enterprise requirements such as authentication, custom endpoints, or telemetry. This guide shows you how to create custom MCP server workers that extend the default implementation. |
| 25 | + |
| 26 | +## When to Create a Custom Worker |
| 27 | + |
| 28 | +Create a custom MCP worker when you need to: |
| 29 | +- **Add authentication/authorization**: OAuth, API keys, JWT tokens, or custom auth flows |
| 30 | +- **Integrate custom transport protocols**: WebSocket, gRPC, or other communication methods |
| 31 | +- **Add logging and telemetry**: Custom logging, metrics collection, or distributed tracing |
| 32 | +- **Modify server behavior**: Custom middleware, error handling, or protocol extensions |
| 33 | +- **Integrate with enterprise systems**: SSO, audit logging, or compliance requirements |
| 34 | + |
| 35 | +## Creating and Registering a Custom MCP Worker |
| 36 | + |
| 37 | +To extend the NeMo Agent toolkit with custom MCP workers, you need to create a worker class that inherits from {py:class}`~nat.front_ends.mcp.mcp_front_end_plugin_worker.MCPFrontEndPluginWorker` and override the methods you want to customize. |
| 38 | + |
| 39 | +This section provides a step-by-step guide to create and register a custom MCP worker with the NeMo Agent toolkit. A custom status endpoint worker is used as an example to demonstrate the process. |
| 40 | + |
| 41 | +## Step 1: Implement the Worker Class |
| 42 | + |
| 43 | +Create a new Python file for your worker implementation. The following example shows a minimal worker that adds a custom status endpoint to the MCP server. |
| 44 | + |
| 45 | +Each worker is instantiated once when `nat mcp serve` runs. The `create_mcp_server()` method executes during initialization, and `add_routes()` runs after the workflow is built. |
| 46 | + |
| 47 | +<!-- path-check-skip-next-line --> |
| 48 | +`src/my_package/custom_worker.py`: |
| 49 | +```python |
| 50 | +import logging |
| 51 | + |
| 52 | +from mcp.server.fastmcp import FastMCP |
| 53 | + |
| 54 | +from nat.builder.workflow_builder import WorkflowBuilder |
| 55 | +from nat.front_ends.mcp.mcp_front_end_plugin_worker import MCPFrontEndPluginWorker |
| 56 | + |
| 57 | +logger = logging.getLogger(__name__) |
| 58 | + |
| 59 | + |
| 60 | +class CustomStatusWorker(MCPFrontEndPluginWorker): |
| 61 | + """MCP worker that adds a custom status endpoint.""" |
| 62 | + |
| 63 | + async def add_routes(self, mcp: FastMCP, builder: WorkflowBuilder): |
| 64 | + """Register tools and add custom server behavior. |
| 65 | +
|
| 66 | + This method calls the parent implementation to get all default behavior, |
| 67 | + then adds custom routes. |
| 68 | +
|
| 69 | + Args: |
| 70 | + mcp: The FastMCP server instance |
| 71 | + builder: The workflow builder containing functions to expose |
| 72 | + """ |
| 73 | + # Get all default routes and tool registration |
| 74 | + await super().add_routes(mcp, builder) |
| 75 | + |
| 76 | + # Add a custom status endpoint |
| 77 | + @mcp.custom_route("/custom/status", methods=["GET"]) |
| 78 | + async def custom_status(_request): |
| 79 | + """Custom status endpoint with additional server information.""" |
| 80 | + from starlette.responses import JSONResponse |
| 81 | + |
| 82 | + logger.info("Custom status endpoint called") |
| 83 | + return JSONResponse({ |
| 84 | + "status": "ok", |
| 85 | + "server": mcp.name, |
| 86 | + "custom_worker": "CustomStatusWorker" |
| 87 | + }) |
| 88 | +``` |
| 89 | + |
| 90 | +**Key components**: |
| 91 | +- **Inheritance**: Extend {py:class}`~nat.front_ends.mcp.mcp_front_end_plugin_worker.MCPFrontEndPluginWorker` |
| 92 | +- **`super().add_routes()`**: Calls parent to get standard tool registration and default routes |
| 93 | +- **`@mcp.custom_route()`**: Adds custom HTTP endpoints to the server |
| 94 | +- **Clean inheritance**: Use standard Python `super()` pattern to extend behavior |
| 95 | + |
| 96 | +## Step 2: Use the Worker in Your Workflow |
| 97 | + |
| 98 | +Configure your workflow to use the custom worker by specifying the fully qualified class name in the `runner_class` field. |
| 99 | + |
| 100 | +<!-- path-check-skip-next-line --> |
| 101 | +`custom_mcp_server_workflow.yml`: |
| 102 | +```yaml |
| 103 | +general: |
| 104 | + front_end: |
| 105 | + _type: mcp |
| 106 | + runner_class: "my_package.custom_worker.CustomStatusWorker" |
| 107 | + name: "my_custom_server" |
| 108 | + host: "localhost" |
| 109 | + port: 9000 |
| 110 | + |
| 111 | + |
| 112 | +llms: |
| 113 | + nim_llm: |
| 114 | + _type: nim |
| 115 | + model_name: meta/llama-3.3-70b-instruct |
| 116 | + |
| 117 | +functions: |
| 118 | + search: |
| 119 | + _type: tavily_internet_search |
| 120 | + |
| 121 | +workflow: |
| 122 | + _type: react_agent |
| 123 | + llm_name: nim_llm |
| 124 | + tool_names: [search] |
| 125 | +``` |
| 126 | +
|
| 127 | +## Step 3: Run and Test Your Server |
| 128 | +
|
| 129 | +Start your server using the NeMo Agent toolkit CLI: |
| 130 | +
|
| 131 | +```bash |
| 132 | +nat mcp serve --config_file custom_mcp_server_workflow.yml |
| 133 | +``` |
| 134 | + |
| 135 | +**Expected output**: |
| 136 | +``` |
| 137 | +INFO: Started server process [12345] |
| 138 | +INFO: Waiting for application startup. |
| 139 | +INFO: Application startup complete. |
| 140 | +INFO: Uvicorn running on http://localhost:9000 (Press CTRL+C to quit) |
| 141 | +``` |
| 142 | + |
| 143 | +**Test the server** with the MCP client: |
| 144 | + |
| 145 | +```bash |
| 146 | +# List available tools |
| 147 | +nat mcp client tool list --url http://localhost:9000/mcp |
| 148 | + |
| 149 | +# Call a tool |
| 150 | +nat mcp client tool call search \ |
| 151 | + --url http://localhost:9000/mcp \ |
| 152 | + --json-args '{"question": "When is the next GTC event?"}' |
| 153 | + |
| 154 | +# Test the custom status endpoint |
| 155 | +curl http://localhost:9000/custom/status |
| 156 | +``` |
| 157 | + |
| 158 | +**Expected response from custom endpoint**: |
| 159 | +```json |
| 160 | +{ |
| 161 | + "status": "ok", |
| 162 | + "server": "my_custom_server", |
| 163 | + "custom_worker": "CustomStatusWorker" |
| 164 | +} |
| 165 | +``` |
| 166 | + |
| 167 | +## Understanding Inheritance and Extension |
| 168 | + |
| 169 | +### Using `super().add_routes()` |
| 170 | + |
| 171 | +When extending {py:class}`~nat.front_ends.mcp.mcp_front_end_plugin_worker.MCPFrontEndPluginWorker`, call `super().add_routes()` to get all default functionality: |
| 172 | + |
| 173 | +- **Health endpoint**: `/health` for server status checks |
| 174 | +- **Workflow building**: Processes your workflow configuration |
| 175 | +- **Function-to-tool conversion**: Registers NeMo Agent toolkit functions as MCP tools |
| 176 | +- **Debug endpoints**: Additional routes for development |
| 177 | + |
| 178 | +Most workers call `super().add_routes()` first to ensure all standard NeMo Agent toolkit tools are registered, then add custom features: |
| 179 | + |
| 180 | +```python |
| 181 | +async def add_routes(self, mcp: FastMCP, builder: WorkflowBuilder): |
| 182 | + # Get all default behavior from parent |
| 183 | + await super().add_routes(mcp, builder) |
| 184 | + |
| 185 | + # Add your custom features |
| 186 | + @mcp.custom_route("/my/endpoint", methods=["GET"]) |
| 187 | + async def my_endpoint(_request): |
| 188 | + return JSONResponse({"custom": "data"}) |
| 189 | +``` |
| 190 | + |
| 191 | +### Overriding `create_mcp_server()` |
| 192 | + |
| 193 | +Override `create_mcp_server()` when you need to use a different MCP server implementation: |
| 194 | + |
| 195 | +```python |
| 196 | +async def create_mcp_server(self) -> FastMCP: |
| 197 | + from my_custom_mcp import CustomFastMCP |
| 198 | + |
| 199 | + return CustomFastMCP( |
| 200 | + name=self.front_end_config.name, |
| 201 | + host=self.front_end_config.host, |
| 202 | + port=self.front_end_config.port, |
| 203 | + # Custom parameters |
| 204 | + auth_provider=self.get_auth_provider(), |
| 205 | + ) |
| 206 | +``` |
| 207 | + |
| 208 | +**Authentication ownership**: When you override `create_mcp_server()`, your worker controls authentication. If you need custom auth (JWT, OAuth2, API keys), configure it inside `create_mcp_server()`. Any front-end config auth settings are optional hints and may be ignored by your worker. |
| 209 | + |
| 210 | +### Accessing Configuration |
| 211 | + |
| 212 | +Your worker has access to configuration through instance variables: |
| 213 | + |
| 214 | +- **`self.front_end_config`**: MCP server configuration |
| 215 | + - `name`: Server name |
| 216 | + - `host`: Server host address |
| 217 | + - `port`: Server port number |
| 218 | + - `debug`: Debug mode flag |
| 219 | + |
| 220 | +- **`self.full_config`**: Complete NeMo Agent toolkit configuration |
| 221 | + - `general`: General settings including front end config |
| 222 | + - `llms`: LLM configurations |
| 223 | + - `functions`: Function configurations |
| 224 | + - `workflow`: Workflow configuration |
| 225 | + |
| 226 | +**Example using configuration**: |
| 227 | + |
| 228 | +```python |
| 229 | +async def create_mcp_server(self) -> FastMCP: |
| 230 | + # Access server name from config |
| 231 | + server_name = self.front_end_config.name |
| 232 | + |
| 233 | + # Customize based on debug mode |
| 234 | + if self.front_end_config.debug: |
| 235 | + logger.info(f"Creating debug server: {server_name}") |
| 236 | + |
| 237 | + return FastMCP( |
| 238 | + name=server_name, |
| 239 | + host=self.front_end_config.host, |
| 240 | + port=self.front_end_config.port, |
| 241 | + debug=self.front_end_config.debug, |
| 242 | + ) |
| 243 | +``` |
| 244 | + |
| 245 | +## Summary |
| 246 | + |
| 247 | +This guide provides a step-by-step process to create custom MCP server workers in the NeMo Agent toolkit. The custom status worker demonstrates how to: |
| 248 | + |
| 249 | +1. Extend {py:class}`~nat.front_ends.mcp.mcp_front_end_plugin_worker.MCPFrontEndPluginWorker` |
| 250 | +2. Override `add_routes()` and use `super()` to get default behavior |
| 251 | +3. Override `create_mcp_server()` to use a different server implementation. When doing so, implement your own authentication and authorization logic within that server. |
| 252 | + |
| 253 | +Custom workers enable enterprise features like authentication, telemetry, and integration with existing infrastructure without modifying NeMo Agent toolkit core code. |
0 commit comments