diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/README.md b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/README.md
new file mode 100644
index 000000000..10c53de86
--- /dev/null
+++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/README.md
@@ -0,0 +1,238 @@
+
+
+# MCP Server Load Testing
+
+This utility simulates concurrent users making tool calls to MCP servers hosted by the NVIDIA NeMo Agent toolkit and generates detailed performance reports.
+
+## Requirements
+
+Before running load tests, ensure you have the following:
+
+- NeMo Agent toolkit with MCP support installed through `nvidia-nat[mcp]`
+- NeMo Agent toolkit with Test support installed through `nvidia-nat[test]`
+- A valid NeMo Agent toolkit workflow configuration with MCP-compatible tools
+
+The `psutil` package is required for monitoring server memory usage during load tests. Install it using the following command:
+
+```bash
+uv pip install psutil
+```
+
+## Quick Start
+
+Run a load test from the project root:
+
+```bash
+python packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py \
+    --config_file=packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml
+```
+
+Get help:
+
+```bash
+python packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py --help
+```
+
+## Configuration
+
+Configure load test options using YAML files stored in the `configs/` directory.
+
+### Example Configuration
+
+```yaml
+# Path to NeMo Agent toolkit workflow configuration file
+config_file: "examples/getting_started/simple_calculator/configs/config.yml"
+
+# Server configuration
+server:
+  host: "localhost"
+  port: 9901
+  transport: "streamable-http"  # Options: "streamable-http" or "sse"
+
+# Load test parameters
+load_test:
+  num_concurrent_users: 10
+  duration_seconds: 30
+  warmup_seconds: 5
+
+# Output configuration
+output:
+  directory: "load_test_results"
+
+# Tool calls to execute during load testing
+tool_calls:
+  - tool_name: "calculator_multiply"
+    args:
+      text: "2 * 3"
+    weight: 2.0  # Called twice as often as weight 1.0 tools
+
+  - tool_name: "calculator_divide"
+    args:
+      text: "10 / 2"
+    weight: 1.0
+```
+
+### Configuration Parameters
+
+#### Required Parameters
+
+**`config_file`** (string)
+: Path to the NeMo Agent toolkit workflow configuration file.
+
+#### Server Configuration
+
+Configure the MCP server settings in the `server` section:
+
+**`host`** (string, default: `"localhost"`)
+: Host address where the MCP server will run.
+
+**`port`** (integer, default: `9901`)
+: Port number for the MCP server.
+
+**`transport`** (string, default: `"streamable-http"`)
+: Transport protocol type. Options: `"streamable-http"` or `"sse"`.
+
+#### Load Test Parameters
+
+Configure load test behavior in the `load_test` section:
+
+**`num_concurrent_users`** (integer, default: `10`)
+: Number of concurrent users to simulate.
+
+**`duration_seconds`** (integer, default: `60`)
+: Duration of the load test in seconds.
+
+**`warmup_seconds`** (integer, default: `5`)
+: Warmup period before measurements begin, in seconds.
+
+#### Output Configuration
+
+Configure report output in the `output` section:
+
+**`directory`** (string, default: `"load_test_results"`)
+: Directory where test reports will be saved.
+
+#### Tool Calls
+
+Define tool calls to execute in the `tool_calls` list. Each tool call includes:
+
+**`tool_name`** (string, required)
+: Name of the MCP tool to call.
+
+**`args`** (dictionary, optional)
+: Arguments to pass to the tool.
+
+**`weight`** (float, default: `1.0`)
+: Relative call frequency. Tools with higher weights are called more frequently. A tool with weight 2.0 is called twice as often as a tool with weight 1.0 (see the sketch below).
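+
+To picture how `weight` translates into call frequency, here is a minimal, standalone sketch of cumulative-weight selection. The `select_tool_call` helper below is illustrative only and is not part of the package API; the load tester uses the same cumulative-weight selection internally.
+
+```python
+import random
+
+# Illustrative stand-ins for the tool_calls entries defined in the YAML above.
+tool_calls = [
+    {"tool_name": "calculator_multiply", "weight": 2.0},
+    {"tool_name": "calculator_divide", "weight": 1.0},
+]
+
+
+def select_tool_call(calls: list[dict]) -> dict:
+    """Pick one entry with probability proportional to its weight."""
+    total = sum(call["weight"] for call in calls)
+    threshold = random.uniform(0, total)
+    cumulative = 0.0
+    for call in calls:
+        cumulative += call["weight"]
+        if threshold <= cumulative:
+            return call
+    return calls[0]  # Fallback for floating-point edge cases
+
+
+# With weights 2.0 and 1.0, expect roughly a 2:1 split over many draws.
+counts = {"calculator_multiply": 0, "calculator_divide": 0}
+for _ in range(9_000):
+    counts[select_tool_call(tool_calls)["tool_name"]] += 1
+print(counts)  # approximately {'calculator_multiply': 6000, 'calculator_divide': 3000}
+```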
+
+## Running Load Tests
+
+### Command Line
+
+Run load tests from the project root using the command-line interface:
+
+```bash
+# Basic usage
+python packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py \
+    --config_file=packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml
+
+# With verbose logging
+python packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py \
+    --config_file=packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml \
+    --verbose
+
+# Short form
+python packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py \
+    -c packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml
+```
+
+### Python API
+
+#### Using YAML Configuration
+
+```python
+from nat.test.mcp.load_test_utils import run_load_test_from_yaml
+
+results = run_load_test_from_yaml(
+    "packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml"
+)
+```
+
+#### Programmatic Usage
+
+```python
+from nat.test.mcp.load_test_utils import run_load_test
+
+results = run_load_test(
+    config_file="examples/getting_started/simple_calculator/configs/config.yml",
+    tool_calls=[
+        {
+            "tool_name": "calculator_multiply",
+            "args": {"text": "2 * 3"},
+            "weight": 2.0,
+        },
+        {
+            "tool_name": "calculator_divide",
+            "args": {"text": "10 / 2"},
+            "weight": 1.0,
+        },
+    ],
+    num_concurrent_users=10,
+    duration_seconds=30,
+)
+```
+
+## Output Reports
+
+The load test generates two report files in the output directory:
+
+### CSV Report
+
+**File name**: `load_test_YYYYMMDD_HHMMSS.csv`
+
+Detailed per-request data with the following columns (a post-processing sketch follows the list):
+
+- `timestamp`: Request timestamp
+- `tool_name`: Name of the tool called
+- `success`: Boolean success status
+- `latency_ms`: Request latency in milliseconds
+- `memory_rss_mb`: Resident Set Size (RSS) memory in MB at request time
+- `memory_vms_mb`: Virtual Memory Size (VMS) in MB at request time
+- `memory_percent`: Memory usage percentage at request time
+- `error`: Error message if the request failed
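+
+The CSV can be post-processed with standard tooling. As a minimal sketch (standard library only; the file name below is a placeholder for the timestamped report produced by your run), per-tool mean and P95 latency can be computed directly from these columns:
+
+```python
+import csv
+import statistics
+from collections import defaultdict
+
+# Placeholder path: substitute the timestamped report generated by your run.
+report_path = "load_test_results/load_test_20250101_120000.csv"
+
+latencies: dict[str, list[float]] = defaultdict(list)
+with open(report_path, newline="") as f:
+    for row in csv.DictReader(f):
+        if row["success"] == "True":  # booleans are written as "True"/"False"
+            latencies[row["tool_name"]].append(float(row["latency_ms"]))
+
+for tool, values in sorted(latencies.items()):
+    values.sort()
+    p95 = values[min(int(len(values) * 0.95), len(values) - 1)]
+    print(f"{tool}: n={len(values)} mean={statistics.mean(values):.1f} ms p95={p95:.1f} ms")
+```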
+
+### Summary Report
+
+**File name**: `load_test_YYYYMMDD_HHMMSS_summary.txt`
+
+Human-readable summary with the following statistics:
+
+**Summary Metrics**
+: Total requests, success rate, requests per second
+
+**Latency Statistics**
+: Mean, median, P95, P99, minimum, and maximum latencies
+
+**Memory Statistics**
+: RSS and VMS memory usage (mean and max), memory percentage (mean and max)
+
+**Per-Tool Statistics**
+: Individual performance metrics for each tool
+
+**Error Analysis**
+: Breakdown of failed requests by error type
diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/__init__.py b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/__init__.py
new file mode 100644
index 000000000..87f735d8c
--- /dev/null
+++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/__init__.py
@@ -0,0 +1,24 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""MCP Server Load Testing Utilities.
+
+This module provides utilities for load testing MCP servers.
+"""
+
+from nat.test.mcp.load_test_utils.load_tester import MCPLoadTest
+from nat.test.mcp.load_test_utils.load_tester import run_load_test
+from nat.test.mcp.load_test_utils.load_tester import run_load_test_from_yaml
+
+__all__ = ["MCPLoadTest", "run_load_test", "run_load_test_from_yaml"]
diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py
new file mode 100755
index 000000000..5651e8b2a
--- /dev/null
+++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/cli.py
@@ -0,0 +1,165 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""MCP Load Test Entry Point Script.
+
+Run load tests against MCP servers using YAML configuration files.
+"""
+
+import argparse
+import logging
+import sys
+from pathlib import Path
+
+from nat.test.mcp.load_test_utils import run_load_test_from_yaml
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+)
+
+logger = logging.getLogger(__name__)
+
+
+def parse_args():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Run MCP server load tests using YAML configuration",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+
+    parser.add_argument(
+        "-c",
+        "--config_file",
+        type=str,
+        required=True,
+        help="Path to YAML configuration file",
+    )
+
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        action="store_true",
+        help="Enable verbose debug logging",
+    )
+
+    return parser.parse_args()
+
+
+def print_summary(results: dict):
+    """Print formatted summary of load test results.
+ + Args: + results: Load test results dictionary + """ + logger.info("=" * 70) + logger.info("LOAD TEST RESULTS SUMMARY") + logger.info("=" * 70) + + summary = results.get("summary", {}) + logger.info("\nSummary Metrics:") + logger.info(" Total Requests: %d", summary.get("total_requests", 0)) + logger.info(" Successful: %d", summary.get("successful_requests", 0)) + logger.info(" Failed: %d", summary.get("failed_requests", 0)) + logger.info(" Success Rate: %.2f%%", summary.get("success_rate", 0)) + logger.info(" Test Duration: %.2f seconds", summary.get("test_duration_seconds", 0)) + logger.info(" Requests/Second: %.2f", summary.get("requests_per_second", 0)) + + latency = results.get("latency_statistics", {}) + logger.info("\nLatency Statistics:") + logger.info(" Mean: %.2f ms", latency.get("mean_ms", 0)) + logger.info(" Median: %.2f ms", latency.get("median_ms", 0)) + logger.info(" P95: %.2f ms", latency.get("p95_ms", 0)) + logger.info(" P99: %.2f ms", latency.get("p99_ms", 0)) + logger.info(" Min: %.2f ms", latency.get("min_ms", 0)) + logger.info(" Max: %.2f ms", latency.get("max_ms", 0)) + + memory = results.get("memory_statistics", {}) + if memory: + logger.info("\nMemory Statistics:") + logger.info(" Mean RSS: %.2f MB", memory.get("rss_mean_mb", 0)) + logger.info(" Max RSS: %.2f MB", memory.get("rss_max_mb", 0)) + logger.info(" Mean VMS: %.2f MB", memory.get("vms_mean_mb", 0)) + logger.info(" Max VMS: %.2f MB", memory.get("vms_max_mb", 0)) + logger.info(" Mean Memory Usage: %.2f%%", memory.get("percent_mean", 0)) + logger.info(" Max Memory Usage: %.2f%%", memory.get("percent_max", 0)) + + tool_stats = results.get("per_tool_statistics", {}) + if tool_stats: + logger.info("\nPer-Tool Statistics:") + for tool_name, stats in sorted(tool_stats.items()): + logger.info(" %s:", tool_name) + logger.info(" Calls: %d", stats.get("total_calls", 0)) + logger.info(" Success Rate: %.2f%%", stats.get("success_rate", 0)) + logger.info(" Mean Latency: %.2f ms", stats.get("mean_latency_ms", 0)) + + errors = results.get("errors", {}) + if errors: + logger.info("\nErrors:") + for error_msg, count in sorted(errors.items(), key=lambda x: x[1], reverse=True)[:5]: + logger.info(" [%d] %s", count, error_msg[:80]) + + test_config = results.get("test_configuration", {}) + output_dir = test_config.get("output_dir", "load_test_results") + logger.info("\n" + "=" * 70) + logger.info("Reports saved to: %s/", output_dir) + logger.info("=" * 70) + + +def main(): + """Main entry point.""" + args = parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug("Verbose logging enabled") + + config_path = Path(args.config_file) + + if not config_path.exists(): + logger.error("Configuration file not found: %s", config_path) + script_dir = Path(__file__).parent + alternative_path = script_dir / args.config_file + + if alternative_path.exists(): + logger.info("Found config at: %s", alternative_path) + config_path = alternative_path + else: + return 1 + + logger.info("Starting MCP load test") + logger.info("Configuration file: %s", config_path.absolute()) + + try: + results = run_load_test_from_yaml(str(config_path)) + + print_summary(results) + + return 0 + + except KeyboardInterrupt: + logger.warning("\nLoad test interrupted by user") + return 130 + + except Exception as e: + logger.error("Load test failed: %s", e, exc_info=args.verbose) + if not args.verbose: + logger.info("Use --verbose flag for detailed error information") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff 
--git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/config_loader.py b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/config_loader.py new file mode 100644 index 000000000..07181813d --- /dev/null +++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/config_loader.py @@ -0,0 +1,133 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Configuration loader for MCP load tests.""" + +import logging +from pathlib import Path + +import yaml + +from nat.test.mcp.load_test_utils.load_tester import LoadTestConfig +from nat.test.mcp.load_test_utils.load_tester import ToolCallConfig + +logger = logging.getLogger(__name__) + + +def load_config_from_yaml(config_path: str | Path) -> LoadTestConfig: + """Load load test configuration from a YAML file. + + Args: + config_path: Path to the YAML configuration file + + Returns: + LoadTestConfig object + + Raises: + FileNotFoundError: If the config file doesn't exist + ValueError: If the config file is invalid + """ + config_path = Path(config_path) + + if not config_path.exists(): + raise FileNotFoundError(f"Config file not found: {config_path}") + + logger.info("Loading config from: %s", config_path) + + with open(config_path) as f: + config_data = yaml.safe_load(f) + + if not config_data: + raise ValueError("Config file is empty") + + config_file = config_data.get("config_file") + if not config_file: + raise ValueError("config_file is required in the config") + + server_config = config_data.get("server", {}) + server_host = server_config.get("host", "localhost") + server_port = server_config.get("port", 9901) + transport = server_config.get("transport", "streamable-http") + + load_test_config = config_data.get("load_test", {}) + num_concurrent_users = load_test_config.get("num_concurrent_users", 10) + duration_seconds = load_test_config.get("duration_seconds", 60) + warmup_seconds = load_test_config.get("warmup_seconds", 5) + + output_config = config_data.get("output", {}) + output_dir = output_config.get("directory", None) + + tool_calls_data = config_data.get("tool_calls", []) + if not tool_calls_data: + raise ValueError("At least one tool call must be specified in tool_calls") + + tool_calls = [] + for tc in tool_calls_data: + if "tool_name" not in tc: + raise ValueError("Each tool call must have 'tool_name'") + + tool_calls.append( + ToolCallConfig( + tool_name=tc["tool_name"], + args=tc.get("args", {}), + weight=tc.get("weight", 1.0), + )) + + return LoadTestConfig( + config_file=config_file, + tool_calls=tool_calls, + num_concurrent_users=num_concurrent_users, + duration_seconds=duration_seconds, + server_host=server_host, + server_port=server_port, + transport=transport, + warmup_seconds=warmup_seconds, + output_dir=output_dir, + ) + + +def validate_config(config: LoadTestConfig) -> None: + """Validate load test configuration. 
+ + Args: + config: LoadTestConfig to validate + + Raises: + ValueError: If configuration is invalid + """ + if not config.config_file: + raise ValueError("config_file must be specified") + + if not Path(config.config_file).exists(): + raise ValueError(f"NAT workflow config file not found: {config.config_file}") + + if config.num_concurrent_users < 1: + raise ValueError("num_concurrent_users must be at least 1") + + if config.duration_seconds < 1: + raise ValueError("duration_seconds must be at least 1") + + if config.warmup_seconds < 0: + raise ValueError("warmup_seconds must be non-negative") + + if not config.tool_calls: + raise ValueError("At least one tool call must be specified") + + if config.transport not in ["streamable-http", "sse"]: + raise ValueError("transport must be streamable-http or sse") + + if config.server_port < 1 or config.server_port > 65535: + raise ValueError("server_port must be between 1 and 65535") + + logger.info("Configuration validated successfully") diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml new file mode 100644 index 000000000..b682a0d86 --- /dev/null +++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/configs/config.yml @@ -0,0 +1,61 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# MCP Load Test Configuration Example + +# Path to NAT workflow config file +config_file: "examples/getting_started/simple_calculator/configs/config.yml" + +# Server configuration +server: + host: "localhost" + port: 9901 + transport: "streamable-http" # Options: "streamable-http" or "sse" + +# Load test parameters +load_test: + num_concurrent_users: 10 + duration_seconds: 30 + warmup_seconds: 5 + +# Output configuration +output: + directory: "load_test_results" + +# Tool calls to execute during load testing +# Each tool call can have: +# - tool_name (required): Name of the MCP tool +# - args (optional): Dictionary of arguments to pass to the tool +# - weight (optional): Relative frequency weight (default: 1.0) +tool_calls: + - tool_name: "calculator_multiply" + args: + text: "2 * 3" + weight: 2.0 # Called twice as often as weight 1.0 tools + + - tool_name: "calculator_divide" + args: + text: "10 / 2" + weight: 1.0 + + - tool_name: "calculator_subtract" + args: + text: "10 - 3" + weight: 1.0 + + - tool_name: "calculator_inequality" + args: + text: "5 > 3" + weight: 1.0 diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/load_tester.py b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/load_tester.py new file mode 100644 index 000000000..e91b2db75 --- /dev/null +++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/load_tester.py @@ -0,0 +1,563 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""MCP Server Load Testing.""" + +import asyncio +import logging +import random +import subprocess +import time +from dataclasses import dataclass +from dataclasses import field +from datetime import datetime +from pathlib import Path +from typing import Any + +import psutil +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from mcp.types import TextContent + +from nat.test.mcp.load_test_utils.report_generator import generate_summary_report + +logger = logging.getLogger(__name__) + + +@dataclass +class ToolCallConfig: + """Configuration for a tool call in load testing.""" + + tool_name: str + """Name of the tool to call""" + + args: dict[str, Any] = field(default_factory=dict) + """Arguments to pass to the tool""" + + weight: float = 1.0 + """Relative weight for this tool call""" + + +@dataclass +class LoadTestConfig: + """Configuration for MCP load testing.""" + + config_file: str + """Path to the NAT workflow config file""" + + tool_calls: list[ToolCallConfig] + """List of tool calls to execute during load testing""" + + num_concurrent_users: int = 10 + """Number of concurrent users to simulate""" + + duration_seconds: int = 60 + """Duration of the load test in seconds""" + + server_host: str = "localhost" + """MCP server host""" + + server_port: int = 9901 + """MCP server port""" + + transport: str = "streamable-http" + """Transport type""" + + warmup_seconds: int = 5 + """Warmup period before starting measurements""" + + output_dir: str | None = None + """Output directory for reports""" + + +@dataclass +class ToolCallResult: + """Result of a single tool call.""" + + tool_name: str + success: bool + latency_ms: float + timestamp: float + error: str | None = None + response: str | None = None + + +@dataclass +class MemorySample: + """Memory usage sample at a point in time.""" + + timestamp: float + rss_mb: float + vms_mb: float + percent: float + + +class MCPLoadTest: + """MCP Server Load Test Runner.""" + + def __init__(self, config: LoadTestConfig): + """Initialize load test runner. + + Args: + config: Load test configuration + """ + self.config = config + self.results: list[ToolCallResult] = [] + self.memory_samples: list[MemorySample] = [] + self.server_process: subprocess.Popen | None = None + self.server_url = self._get_server_url() + self._memory_monitor_task: asyncio.Task | None = None + + if config.output_dir: + self.output_dir = Path(config.output_dir) + else: + self.output_dir = Path("load_test_results") + self.output_dir.mkdir(parents=True, exist_ok=True) + + def _get_server_url(self) -> str: + """Get the MCP server URL based on transport type.""" + endpoint = "mcp" if self.config.transport == "streamable-http" else "sse" + return f"http://{self.config.server_host}:{self.config.server_port}/{endpoint}" + + def _client_ctx(self): + """Get the appropriate MCP client context manager based on transport type. 
+ + Returns: + Client context manager for the configured transport + """ + if self.config.transport == "streamable-http": + return streamablehttp_client(url=self.server_url) + else: + from mcp.client.sse import sse_client + return sse_client(url=self.server_url) + + async def _start_server(self) -> None: + """Start the MCP server.""" + logger.info("Starting MCP server with config: %s", self.config.config_file) + + cmd = [ + "nat", + "mcp", + "serve", + "--config_file", + self.config.config_file, + "--host", + self.config.server_host, + "--port", + str(self.config.server_port), + "--transport", + self.config.transport, + ] + + self.server_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + max_retries = 30 + for i in range(max_retries): + try: + async with self._client_ctx() as ctx: + read, write = (ctx[0], ctx[1]) if isinstance(ctx, tuple) else ctx + async with ClientSession(read, write) as session: + await session.initialize() + logger.info("MCP server is ready") + return + except Exception as e: + if i == max_retries - 1: + raise RuntimeError(f"Failed to start MCP server after {max_retries} retries") from e + await asyncio.sleep(1) + + async def _stop_server(self) -> None: + """Stop the MCP server.""" + if self.server_process: + logger.info("Stopping MCP server") + self.server_process.terminate() + try: + self.server_process.wait(timeout=10) + except subprocess.TimeoutExpired: + logger.warning("Server didn't stop gracefully, killing it") + self.server_process.kill() + self.server_process.wait() + + def _select_tool_call(self) -> ToolCallConfig: + """Select a tool call based on weights.""" + total_weight = sum(tc.weight for tc in self.config.tool_calls) + r = random.uniform(0, total_weight) + cumulative = 0.0 + + for tool_call in self.config.tool_calls: + cumulative += tool_call.weight + if r <= cumulative: + return tool_call + + return self.config.tool_calls[0] + + async def _call_tool(self, tool_call: ToolCallConfig) -> ToolCallResult: + """Execute a single tool call and measure latency. + + Args: + tool_call: Tool call configuration + + Returns: + ToolCallResult with timing and success information + """ + start_time = time.time() + timestamp = start_time + + try: + async with self._client_ctx() as ctx: + read, write = (ctx[0], ctx[1]) if isinstance(ctx, tuple) else ctx + async with ClientSession(read, write) as session: + await session.initialize() + result = await session.call_tool(tool_call.tool_name, tool_call.args) + + outputs: list[str] = [] + for content in result.content: + if isinstance(content, TextContent): + outputs.append(content.text) + else: + outputs.append(str(content)) + + response = "\n".join(outputs) + is_error = getattr(result, "isError", False) + + end_time = time.time() + latency_ms = (end_time - start_time) * 1000 + + return ToolCallResult( + tool_name=tool_call.tool_name, + success=not is_error, + latency_ms=latency_ms, + timestamp=timestamp, + response=response if not is_error else None, + error=response if is_error else None, + ) + + except Exception as e: + end_time = time.time() + latency_ms = (end_time - start_time) * 1000 + + return ToolCallResult( + tool_name=tool_call.tool_name, + success=False, + latency_ms=latency_ms, + timestamp=timestamp, + error=str(e), + ) + + async def _user_simulation(self, user_id: int, end_time: float) -> None: + """Simulate a single user making repeated tool calls. 
+ + Args: + user_id: User identifier for logging + end_time: Timestamp when to stop making calls + """ + logger.debug("User %d starting simulation", user_id) + + while time.time() < end_time: + tool_call = self._select_tool_call() + result = await self._call_tool(tool_call) + self.results.append(result) + + await asyncio.sleep(random.uniform(0.1, 0.2)) + + logger.debug("User %d finished simulation", user_id) + + async def _monitor_memory(self, end_time: float) -> None: + """Monitor server memory usage during the test. + + Args: + end_time: Timestamp when to stop monitoring + """ + if not self.server_process: + return + + try: + process = psutil.Process(self.server_process.pid) + except (psutil.NoSuchProcess, psutil.AccessDenied): + logger.warning("Cannot monitor memory for server process") + return + + while time.time() < end_time: + try: + mem_info = process.memory_info() + mem_percent = process.memory_percent() + + self.memory_samples.append( + MemorySample( + timestamp=time.time(), + rss_mb=mem_info.rss / (1024 * 1024), + vms_mb=mem_info.vms / (1024 * 1024), + percent=mem_percent, + )) + except (psutil.NoSuchProcess, psutil.AccessDenied): + break + + await asyncio.sleep(1.0) + + async def run(self) -> dict[str, Any]: + """Run the load test. + + Returns: + Dictionary containing test results and statistics + """ + logger.info("Starting load test with config: %s", self.config) + + await self._start_server() + + try: + if self.config.warmup_seconds > 0: + logger.info("Warming up for %d seconds", self.config.warmup_seconds) + warmup_end = time.time() + self.config.warmup_seconds + warmup_tasks = [ + asyncio.create_task(self._user_simulation(i, warmup_end)) + for i in range(min(3, self.config.num_concurrent_users)) + ] + await asyncio.gather(*warmup_tasks) + self.results.clear() + + logger.info( + "Starting load test: %d concurrent users for %d seconds", + self.config.num_concurrent_users, + self.config.duration_seconds, + ) + + test_start_time = time.time() + test_end_time = test_start_time + self.config.duration_seconds + + self._memory_monitor_task = asyncio.create_task(self._monitor_memory(test_end_time)) + + tasks = [ + asyncio.create_task(self._user_simulation(i, test_end_time)) + for i in range(self.config.num_concurrent_users) + ] + + await asyncio.gather(*tasks) + + if self._memory_monitor_task: + await self._memory_monitor_task + + test_duration = time.time() - test_start_time + + logger.info("Load test completed. Total calls: %d", len(self.results)) + + summary = generate_summary_report(self.results, test_duration, self.config, self.memory_samples) + self._save_reports(summary) + + return summary + + finally: + await self._stop_server() + + def _save_reports(self, summary: dict[str, Any]) -> None: + """Save test reports to files. + + Args: + summary: Test summary statistics + """ + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + csv_file = self.output_dir / f"load_test_{timestamp}.csv" + self._save_csv(csv_file) + logger.info("Saved CSV report: %s", csv_file) + + summary_file = self.output_dir / f"load_test_{timestamp}_summary.txt" + self._save_summary_text(summary_file, summary) + logger.info("Saved summary report: %s", summary_file) + + def _save_csv(self, file_path: Path) -> None: + """Save detailed results as CSV. 
+ + Args: + file_path: Path to save CSV file + """ + import csv + + with open(file_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow([ + "timestamp", + "tool_name", + "success", + "latency_ms", + "memory_rss_mb", + "memory_vms_mb", + "memory_percent", + "error", + ]) + + for result in self.results: + memory_rss = "" + memory_vms = "" + memory_percent = "" + + if self.memory_samples: + closest_sample = min( + self.memory_samples, + key=lambda sample: abs(sample.timestamp - result.timestamp), + ) + memory_rss = f"{closest_sample.rss_mb:.2f}" + memory_vms = f"{closest_sample.vms_mb:.2f}" + memory_percent = f"{closest_sample.percent:.2f}" + + writer.writerow([ + result.timestamp, + result.tool_name, + result.success, + result.latency_ms, + memory_rss, + memory_vms, + memory_percent, + result.error or "", + ]) + + def _save_summary_text(self, file_path: Path, summary: dict[str, Any]) -> None: + """Save summary report as text file. + + Args: + file_path: Path to save summary file + summary: Summary statistics dictionary + """ + with open(file_path, "w") as f: + f.write("=" * 70 + "\n") + f.write("MCP LOAD TEST SUMMARY\n") + f.write("=" * 70 + "\n\n") + + f.write("TEST CONFIGURATION\n") + f.write("-" * 70 + "\n") + config_data = summary.get("test_configuration", {}) + for key, value in config_data.items(): + f.write(f"{key.replace('_', ' ').title()}: {value}\n") + f.write("\n") + + f.write("SUMMARY METRICS\n") + f.write("-" * 70 + "\n") + summary_data = summary.get("summary", {}) + for key, value in summary_data.items(): + f.write(f"{key.replace('_', ' ').title()}: {value}\n") + f.write("\n") + + f.write("LATENCY STATISTICS\n") + f.write("-" * 70 + "\n") + latency_data = summary.get("latency_statistics", {}) + for key, value in latency_data.items(): + f.write(f"{key.upper()}: {value:.2f} ms\n") + f.write("\n") + + memory_data = summary.get("memory_statistics", {}) + if memory_data: + f.write("MEMORY STATISTICS\n") + f.write("-" * 70 + "\n") + for key, value in memory_data.items(): + if isinstance(value, float): + f.write(f"{key.replace('_', ' ').title()}: {value:.2f} MB\n") + else: + f.write(f"{key.replace('_', ' ').title()}: {value}\n") + f.write("\n") + + f.write("PER-TOOL STATISTICS\n") + f.write("-" * 70 + "\n") + tool_stats = summary.get("per_tool_statistics", {}) + for tool_name, stats in tool_stats.items(): + f.write(f"\nTool: {tool_name}\n") + for key, value in stats.items(): + if isinstance(value, float): + f.write(f" {key.replace('_', ' ').title()}: {value:.2f}\n") + else: + f.write(f" {key.replace('_', ' ').title()}: {value}\n") + + errors = summary.get("errors", {}) + if errors: + f.write("\nERRORS\n") + f.write("-" * 70 + "\n") + for error_msg, count in sorted(errors.items(), key=lambda x: x[1], reverse=True): + f.write(f"{error_msg}: {count}\n") + + f.write("\n" + "=" * 70 + "\n") + + +def run_load_test( + config_file: str, + tool_calls: list[dict[str, Any]] | None = None, + num_concurrent_users: int = 10, + duration_seconds: int = 60, + server_host: str = "localhost", + server_port: int = 9901, + transport: str = "streamable-http", + warmup_seconds: int = 5, + output_dir: str | None = None, +) -> dict[str, Any]: + """Run an MCP load test with the specified configuration. 
+ + Args: + config_file: Path to NAT workflow config file + tool_calls: List of tool call configurations + num_concurrent_users: Number of concurrent users to simulate + duration_seconds: Duration of the load test + server_host: MCP server host + server_port: MCP server port + transport: Transport type + warmup_seconds: Warmup period before measurements + output_dir: Output directory for reports + + Returns: + Dictionary containing test results and statistics + """ + if tool_calls is None: + raise ValueError("tool_calls must be provided") + + tool_call_configs = [ + ToolCallConfig( + tool_name=tc["tool_name"], + args=tc.get("args", {}), + weight=tc.get("weight", 1.0), + ) for tc in tool_calls + ] + + config = LoadTestConfig( + config_file=config_file, + tool_calls=tool_call_configs, + num_concurrent_users=num_concurrent_users, + duration_seconds=duration_seconds, + server_host=server_host, + server_port=server_port, + transport=transport, + warmup_seconds=warmup_seconds, + output_dir=output_dir, + ) + + load_test = MCPLoadTest(config) + return asyncio.run(load_test.run()) + + +def run_load_test_from_yaml(yaml_config_path: str) -> dict[str, Any]: + """Run an MCP load test using a YAML configuration file. + + Args: + yaml_config_path: Path to YAML config file + + Returns: + Dictionary containing test results and statistics + """ + from nat.test.mcp.load_test_utils.config_loader import load_config_from_yaml + from nat.test.mcp.load_test_utils.config_loader import validate_config + + config = load_config_from_yaml(yaml_config_path) + validate_config(config) + + load_test = MCPLoadTest(config) + return asyncio.run(load_test.run()) diff --git a/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/report_generator.py b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/report_generator.py new file mode 100644 index 000000000..bfcb41542 --- /dev/null +++ b/packages/nvidia_nat_test/src/nat/test/mcp/load_test_utils/report_generator.py @@ -0,0 +1,185 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Report generation for MCP load tests.""" + +import statistics +from typing import TYPE_CHECKING +from typing import Any + +if TYPE_CHECKING: + from nat.test.mcp.load_test_utils.load_tester import LoadTestConfig + from nat.test.mcp.load_test_utils.load_tester import MemorySample + from nat.test.mcp.load_test_utils.load_tester import ToolCallResult + + +def generate_summary_report( + results: list['ToolCallResult'], + test_duration: float, + config: 'LoadTestConfig', + memory_samples: list['MemorySample'] | None = None, +) -> dict[str, Any]: + """Generate summary statistics from load test results. 
+ + Args: + results: List of tool call results + test_duration: Actual duration of the test in seconds + config: Load test configuration + memory_samples: Optional list of memory usage samples + + Returns: + Dictionary containing summary statistics + """ + if not results: + return { + "total_requests": 0, + "successful_requests": 0, + "failed_requests": 0, + "success_rate": 0.0, + "test_duration_seconds": test_duration, + "requests_per_second": 0.0, + } + + total_requests = len(results) + successful = [r for r in results if r.success] + failed = [r for r in results if not r.success] + + successful_count = len(successful) + failed_count = len(failed) + success_rate = (successful_count / total_requests) * 100 if total_requests > 0 else 0.0 + + latencies = [r.latency_ms for r in successful] + + if latencies: + latency_stats = { + "min_ms": min(latencies), + "max_ms": max(latencies), + "mean_ms": statistics.mean(latencies), + "median_ms": statistics.median(latencies), + "p95_ms": _percentile(latencies, 0.95), + "p99_ms": _percentile(latencies, 0.99), + "stdev_ms": statistics.stdev(latencies) if len(latencies) > 1 else 0.0, + } + else: + latency_stats = { + "min_ms": 0.0, + "max_ms": 0.0, + "mean_ms": 0.0, + "median_ms": 0.0, + "p95_ms": 0.0, + "p99_ms": 0.0, + "stdev_ms": 0.0, + } + + tool_stats: dict[str, dict[str, Any]] = {} + tool_names = set(r.tool_name for r in results) + + for tool_name in tool_names: + tool_results = [r for r in results if r.tool_name == tool_name] + tool_successful = [r for r in tool_results if r.success] + tool_latencies = [r.latency_ms for r in tool_successful] + + tool_stats[tool_name] = { + "total_calls": len(tool_results), + "successful_calls": len(tool_successful), + "failed_calls": len(tool_results) - len(tool_successful), + "success_rate": (len(tool_successful) / len(tool_results)) * 100 if tool_results else 0.0, + "mean_latency_ms": statistics.mean(tool_latencies) if tool_latencies else 0.0, + "median_latency_ms": statistics.median(tool_latencies) if tool_latencies else 0.0, + "p95_latency_ms": _percentile(tool_latencies, 0.95) if tool_latencies else 0.0, + } + + error_counts: dict[str, int] = {} + for result in failed: + error_msg = result.error or "Unknown error" + error_key = error_msg[:100] + error_counts[error_key] = error_counts.get(error_key, 0) + 1 + + memory_stats: dict[str, Any] = {} + if memory_samples: + rss_values = [sample.rss_mb for sample in memory_samples] + vms_values = [sample.vms_mb for sample in memory_samples] + percent_values = [sample.percent for sample in memory_samples] + + if rss_values: + memory_stats = { + "samples_count": len(memory_samples), + "rss_mean_mb": statistics.mean(rss_values), + "rss_max_mb": max(rss_values), + "rss_min_mb": min(rss_values), + "vms_mean_mb": statistics.mean(vms_values), + "vms_max_mb": max(vms_values), + "percent_mean": statistics.mean(percent_values), + "percent_max": max(percent_values), + } + + report = { + "test_configuration": { + "config_file": config.config_file, + "num_concurrent_users": config.num_concurrent_users, + "duration_seconds": config.duration_seconds, + "server_url": f"http://{config.server_host}:{config.server_port}", + "transport": config.transport, + "warmup_seconds": config.warmup_seconds, + "tool_calls_configured": len(config.tool_calls), + "output_dir": config.output_dir or "load_test_results", + }, + "summary": { + "total_requests": + total_requests, + "successful_requests": + successful_count, + "failed_requests": + failed_count, + "success_rate": + round(success_rate, 2), + 
"test_duration_seconds": + round(test_duration, 2), + "requests_per_second": + round(total_requests / test_duration, 2) if test_duration > 0 else 0.0, + "avg_concurrent_rps": + round(total_requests / test_duration / + config.num_concurrent_users, 2) if test_duration > 0 and config.num_concurrent_users > 0 else 0.0, + }, + "latency_statistics": { + k: round(v, 2) + for k, v in latency_stats.items() + }, + "per_tool_statistics": tool_stats, + "errors": error_counts, + } + + if memory_stats: + report["memory_statistics"] = {k: round(v, 2) if isinstance(v, float) else v for k, v in memory_stats.items()} + + return report + + +def _percentile(data: list[float], percentile: float) -> float: + """Calculate percentile value. + + Args: + data: List of values + percentile: Percentile to calculate + + Returns: + Percentile value + """ + if not data: + return 0.0 + + sorted_data = sorted(data) + index = int(len(sorted_data) * percentile) + index = min(index, len(sorted_data) - 1) + return sorted_data[index]