diff --git a/.changes/next-release/enhancement-ecs-96384.json b/.changes/next-release/enhancement-ecs-96384.json new file mode 100644 index 000000000000..53bec032c008 --- /dev/null +++ b/.changes/next-release/enhancement-ecs-96384.json @@ -0,0 +1,5 @@ +{ + "type": "enhancement", + "category": "``ecs``", + "description": "Introduces text only mode to ECS Express Mode Monitoring commands and addressing issues exiting interactive mode." +} diff --git a/awscli/customizations/ecs/expressgateway/color_utils.py b/awscli/customizations/ecs/expressgateway/color_utils.py index 038120df60ae..7335d22f0877 100644 --- a/awscli/customizations/ecs/expressgateway/color_utils.py +++ b/awscli/customizations/ecs/expressgateway/color_utils.py @@ -24,7 +24,7 @@ class ColorUtils: def __init__(self): # Initialize colorama - init(autoreset=True, strip=False) + init(autoreset=False, strip=False) def make_green(self, text, use_color=True): if not use_color: diff --git a/awscli/customizations/ecs/expressgateway/display_strategy.py b/awscli/customizations/ecs/expressgateway/display_strategy.py new file mode 100644 index 000000000000..f8805f6648aa --- /dev/null +++ b/awscli/customizations/ecs/expressgateway/display_strategy.py @@ -0,0 +1,206 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Display strategy implementations for ECS Express Gateway Service monitoring.""" + +import asyncio +import time + +from botocore.exceptions import ClientError + +from awscli.customizations.ecs.exceptions import MonitoringError +from awscli.customizations.ecs.expressgateway.stream_display import ( + StreamDisplay, +) +from awscli.customizations.utils import uni_print + + +class DisplayStrategy: + """Base class for display strategies. + + Each strategy controls its own execution model, timing, and output format. + """ + + def execute(self, collector, start_time, timeout_minutes): + """Execute the monitoring loop. + + Args: + collector: ServiceViewCollector instance for data fetching + start_time: Start timestamp for timeout calculation + timeout_minutes: Maximum monitoring duration in minutes + """ + raise NotImplementedError + + +class InteractiveDisplayStrategy(DisplayStrategy): + """Interactive display strategy with async spinner and keyboard navigation. + + Uses dual async tasks: + - Data task: Polls ECS APIs every 5 seconds + - Spinner task: Updates display every 100ms with rotating spinner + """ + + def __init__(self, display, use_color): + self.display = display + self.use_color = use_color + + def execute(self, collector, start_time, timeout_minutes): + """Execute async monitoring with spinner and keyboard controls.""" + final_output, timed_out = asyncio.run( + self._execute_async(collector, start_time, timeout_minutes) + ) + if timed_out: + uni_print(final_output + "\nMonitoring timed out!\n") + else: + uni_print(final_output + "\nMonitoring Complete!\n") + + async def _execute_async(self, collector, start_time, timeout_minutes): + """Async execution with dual tasks for data and spinner.""" + spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + spinner_index = 0 + current_output = "Waiting for initial data" + timed_out = False + + async def update_data(): + nonlocal current_output, timed_out + while True: + current_time = time.time() + if current_time - start_time > timeout_minutes * 60: + timed_out = True + self.display.app.exit() + break + + try: + loop = asyncio.get_event_loop() + new_output = await loop.run_in_executor( + None, collector.get_current_view, "{SPINNER}" + ) + current_output = new_output + except ClientError as e: + if ( + e.response.get('Error', {}).get('Code') + == 'InvalidParameterException' + ): + error_message = e.response.get('Error', {}).get( + 'Message', '' + ) + if ( + "Cannot call DescribeServiceRevisions for a service that is INACTIVE" + in error_message + ): + current_output = "Service is inactive" + else: + raise + else: + raise + + await asyncio.sleep(5.0) + + async def update_spinner(): + nonlocal spinner_index + while True: + spinner_char = spinner_chars[spinner_index] + display_output = current_output.replace( + "{SPINNER}", spinner_char + ) + status_text = f"Getting updates... {spinner_char} | up/down to scroll, q to quit" + self.display.display(display_output, status_text) + spinner_index = (spinner_index + 1) % len(spinner_chars) + await asyncio.sleep(0.1) + + data_task = asyncio.create_task(update_data()) + spinner_task = asyncio.create_task(update_spinner()) + display_task = None + + try: + display_task = asyncio.create_task(self.display.run()) + + done, pending = await asyncio.wait( + [display_task, data_task], return_when=asyncio.FIRST_COMPLETED + ) + + if data_task in done: + await data_task + + finally: + spinner_task.cancel() + if display_task is not None and not display_task.done(): + display_task.cancel() + try: + await display_task + except asyncio.CancelledError: + pass + + return current_output.replace("{SPINNER}", ""), timed_out + + +class TextOnlyDisplayStrategy(DisplayStrategy): + """Text-only display strategy with diff detection and timestamped output. + + Uses synchronous polling loop with change detection to output only + individual resource changes with timestamps. + """ + + def __init__(self, use_color): + self.stream_display = StreamDisplay(use_color) + + def execute(self, collector, start_time, timeout_minutes): + """Execute synchronous monitoring with text output.""" + self.stream_display.show_startup_message() + + try: + while True: + current_time = time.time() + if current_time - start_time > timeout_minutes * 60: + self.stream_display.show_timeout_message() + break + + try: + self.stream_display.show_polling_message() + + collector.get_current_view("") + + # Extract cached result for diff detection + managed_resources, info = collector.cached_monitor_result + + self.stream_display.show_monitoring_data( + managed_resources, info + ) + + except ClientError as e: + if ( + e.response.get('Error', {}).get('Code') + == 'InvalidParameterException' + ): + error_message = e.response.get('Error', {}).get( + 'Message', '' + ) + if ( + "Cannot call DescribeServiceRevisions for a service that is INACTIVE" + in error_message + ): + self.stream_display.show_service_inactive_message() + break + else: + raise + else: + raise + + time.sleep(5.0) + + except KeyboardInterrupt: + self.stream_display.show_user_stop_message() + except MonitoringError as e: + self.stream_display.show_error_message(e) + finally: + self.stream_display.show_completion_message() diff --git a/awscli/customizations/ecs/expressgateway/managedresource.py b/awscli/customizations/ecs/expressgateway/managedresource.py index f4b3303bf678..23581c92a5c6 100644 --- a/awscli/customizations/ecs/expressgateway/managedresource.py +++ b/awscli/customizations/ecs/expressgateway/managedresource.py @@ -113,6 +113,52 @@ def get_status_string(self, spinner_char, depth=0, use_color=True): lines.append("") return '\n'.join(lines) + def get_stream_string(self, timestamp, use_color=True): + """Returns the resource information formatted for stream/text-only display. + + Args: + timestamp (str): Timestamp string to prefix the output + use_color (bool): Whether to use ANSI color codes (default: True) + + Returns: + str: Formatted string with timestamp prefix and bracket-enclosed status + """ + lines = [] + parts = [f"[{timestamp}]"] + + if self.resource_type: + parts.append( + self.color_utils.make_cyan(self.resource_type, use_color) + ) + + if self.identifier: + colored_id = self.color_utils.color_by_status( + self.identifier, self.status, use_color + ) + parts.append(colored_id) + + if self.status: + status_text = self.color_utils.color_by_status( + self.status, self.status, use_color + ) + parts.append(f"[{status_text}]") + + lines.append(" ".join(parts)) + + if self.reason: + lines.append(f" Reason: {self.reason}") + + if self.updated_at: + updated_time = datetime.fromtimestamp(self.updated_at).strftime( + "%Y-%m-%d %H:%M:%S" + ) + lines.append(f" Last Updated At: {updated_time}") + + if self.additional_info: + lines.append(f" Info: {self.additional_info}") + + return "\n".join(lines) + def combine(self, other_resource): """Returns the version of the resource which has the most up to date timestamp. @@ -130,22 +176,28 @@ def combine(self, other_resource): else other_resource ) - def diff(self, other_resource): - """Returns a tuple of (self_diff, other_diff) for resources that are different. + def compare_properties(self, other_resource): + """Compares individual resource properties to detect changes. + + This compares properties like status, reason, updated_at, additional_info + to detect if a resource has changed between polls. Args: other_resource (ManagedResource): Resource to compare against Returns: - tuple: (self_diff, other_diff) where: - - self_diff (ManagedResource): This resource if different, None if same - - other_diff (ManagedResource): Other resource if different, None if same + bool: True if properties differ, False if same """ if not other_resource: - return (self, None) - if ( + # No previous resource means it's new/different + return True + + # Resources are different if any field differs + return ( self.resource_type != other_resource.resource_type or self.identifier != other_resource.identifier - ): - return (self, other_resource) - return (None, None) + or self.status != other_resource.status + or self.reason != other_resource.reason + or self.updated_at != other_resource.updated_at + or self.additional_info != other_resource.additional_info + ) diff --git a/awscli/customizations/ecs/expressgateway/managedresourcegroup.py b/awscli/customizations/ecs/expressgateway/managedresourcegroup.py index b5643bc3b355..959cdf745678 100644 --- a/awscli/customizations/ecs/expressgateway/managedresourcegroup.py +++ b/awscli/customizations/ecs/expressgateway/managedresourcegroup.py @@ -39,7 +39,7 @@ def __init__( ): self.resource_type = resource_type self.identifier = identifier - # maintain input ordering + # Maintain input ordering self.sorted_resource_keys = [ self._create_key(resource) for resource in resources ] @@ -57,6 +57,90 @@ def _create_key(self, resource): identifier = resource.identifier if resource.identifier else "" return resource_type + "/" + identifier + def get_stream_string(self, timestamp, use_color=True): + """Returns flattened stream strings for all resources in the group. + + Args: + timestamp (str): Timestamp string to prefix each resource + use_color (bool): Whether to use ANSI color codes (default: True) + + Returns: + str: All flattened resources formatted for stream display, separated by newlines + """ + from awscli.customizations.ecs.expressgateway.managedresource import ( + ManagedResource, + ) + + flat_resources = [] + + for resource in self.resource_mapping.values(): + if isinstance(resource, ManagedResourceGroup): + # Recursively flatten nested groups + nested = resource.get_stream_string(timestamp, use_color) + if nested: + flat_resources.append(nested) + elif isinstance(resource, ManagedResource): + # Get stream string for individual resource + flat_resources.append( + resource.get_stream_string(timestamp, use_color) + ) + + return "\n".join(flat_resources) + + def get_changed_resources(self, previous_resources_dict): + """Get flattened list of resources that have changed properties. + + Compares individual resource properties (status, reason, updated_at, etc.) + against previous state to detect changes. This is used for change detection + in TEXT-ONLY mode, NOT for DEPLOYMENT diff (use compare_resource_sets for that). + + Args: + previous_resources_dict: Dict of {(resource_type, identifier): ManagedResource} + from previous poll. Can be empty dict for first poll. + + Returns: + tuple: (changed_resources, updated_dict, removed_keys) + - changed_resources: List of ManagedResource that changed or None if no changes + - updated_dict: Updated dict with current resources for next comparison + - removed_keys: Set of keys that were removed since last poll + """ + current_resources = self._flatten_to_list() + changed_resources = [] + updated_dict = {} + + for resource in current_resources: + resource_key = (resource.resource_type, resource.identifier) + previous_resource = previous_resources_dict.get(resource_key) + + if not previous_resource: + changed_resources.append(resource) + else: + if resource.compare_properties(previous_resource): + changed_resources.append(resource) + + updated_dict[resource_key] = resource + + current_keys = { + (r.resource_type, r.identifier) for r in current_resources + } + removed_keys = set(previous_resources_dict.keys()) - current_keys + + return ( + changed_resources if changed_resources else None, + updated_dict, + removed_keys, + ) + + def _flatten_to_list(self): + """Flatten this resource group into a list of individual resources.""" + flat_list = [] + for resource in self.resource_mapping.values(): + if isinstance(resource, ManagedResourceGroup): + flat_list.extend(resource._flatten_to_list()) + elif isinstance(resource, ManagedResource): + flat_list.append(resource) + return flat_list + def is_terminal(self): return not self.resource_mapping or all( [ @@ -188,8 +272,12 @@ def _combine_child_resources(self, resource_a, resource_b): else: return resource_b - def diff(self, other_resource_group): - """Returns two ManagedResourceGroups representing unique resources in each group. + def compare_resource_sets(self, other_resource_group): + """Compares resource SETS between two groups to find additions/removals. + + This is used for DEPLOYMENT view to show which resources were added or removed + between service configurations, NOT for detecting property changes within + individual resources (that's compare_properties() in ManagedResource). Args: other_resource_group (ManagedResourceGroup): Resource group to compare against @@ -218,7 +306,7 @@ def diff(self, other_resource_group): common_keys = self_keys & other_keys common_diff = { - key: self.resource_mapping[key].diff( + key: self.resource_mapping[key].compare_resource_sets( other_resource_group.resource_mapping.get(key) ) for key in common_keys diff --git a/awscli/customizations/ecs/expressgateway/stream_display.py b/awscli/customizations/ecs/expressgateway/stream_display.py new file mode 100644 index 000000000000..626ee17f099c --- /dev/null +++ b/awscli/customizations/ecs/expressgateway/stream_display.py @@ -0,0 +1,110 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Stream display implementation for ECS Express Gateway Service monitoring.""" + +import time + +from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( + ManagedResourceGroup, +) +from awscli.customizations.utils import uni_print + + +class StreamDisplay: + """Stream display for monitoring that outputs changes to stdout. + + Provides text-based monitoring output suitable for non-interactive + environments, logging, or piping to other commands. + """ + + def __init__(self, use_color=True): + self.previous_resources_by_key = {} + self.use_color = use_color + + def show_startup_message(self): + """Show startup message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Starting monitoring...\n") + + def show_polling_message(self): + """Show polling message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Polling for updates...\n") + + def show_monitoring_data(self, resource_group, info): + """Show monitoring data for resources with diff detection. + + Args: + resource_group: ManagedResourceGroup or None + info: Additional info text to display + """ + timestamp = self._get_timestamp() + + if resource_group: + ( + changed_resources, + updated_dict, + removed_keys, + ) = resource_group.get_changed_resources( + self.previous_resources_by_key + ) + self.previous_resources_by_key = updated_dict + + if changed_resources: + self._print_flattened_resources_list( + changed_resources, timestamp + ) + + if info: + uni_print(f"[{timestamp}] {info}\n") + + def _print_flattened_resources_list(self, resources_list, timestamp): + """Print individual resources from a flat list as timestamped lines. + + Args: + resources_list: List of ManagedResource objects to print + timestamp: Timestamp string to prefix each line + """ + for resource in resources_list: + output = resource.get_stream_string(timestamp, self.use_color) + uni_print(output + "\n") + + def show_timeout_message(self): + """Show timeout message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Monitoring timeout reached!\n") + + def show_service_inactive_message(self): + """Show service inactive message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Service is inactive\n") + + def show_completion_message(self): + """Show completion message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Monitoring complete!\n") + + def show_user_stop_message(self): + """Show user stop message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Monitoring stopped by user\n") + + def show_error_message(self, error): + """Show error message.""" + timestamp = self._get_timestamp() + uni_print(f"[{timestamp}] Error: {error}\n") + + def _get_timestamp(self): + """Get formatted timestamp.""" + return time.strftime("%Y-%m-%d %H:%M:%S") diff --git a/awscli/customizations/ecs/monitorexpressgatewayservice.py b/awscli/customizations/ecs/monitorexpressgatewayservice.py index 3004fe69ecc9..64a8e3089941 100644 --- a/awscli/customizations/ecs/monitorexpressgatewayservice.py +++ b/awscli/customizations/ecs/monitorexpressgatewayservice.py @@ -18,10 +18,14 @@ allowing users to track resource creation progress, deployment status, and service health through an interactive command-line interface with live updates and visual indicators. -The module implements two primary monitoring modes: +The module implements two resource view modes: - RESOURCE: Displays all resources associated with the service - DEPLOYMENT: Shows resources that have changed in the most recent deployment +And two display modes: +- INTERACTIVE: Real-time display with spinner and keyboard navigation (requires TTY) +- TEXT-ONLY: Text output with timestamps and change detection (works without TTY) + Key Features: - Real-time progress monitoring with spinner animations - Diff-based resource tracking for deployment changes @@ -33,30 +37,25 @@ ECSExpressGatewayServiceWatcher: Core monitoring logic and resource tracking Usage: - aws ecs monitor-express-gateway-service --service-arn [--resource-view RESOURCE|DEPLOYMENT] + aws ecs monitor-express-gateway-service --service-arn [--resource-view RESOURCE|DEPLOYMENT] [--mode INTERACTIVE|TEXT-ONLY] """ -import asyncio import sys -import threading import time -from functools import reduce from botocore.exceptions import ClientError from awscli.customizations.commands import BasicCommand from awscli.customizations.ecs.exceptions import MonitoringError -from awscli.customizations.ecs.expressgateway.managedresource import ( - ManagedResource, -) -from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( - ManagedResourceGroup, +from awscli.customizations.ecs.expressgateway.display_strategy import ( + DisplayStrategy, + InteractiveDisplayStrategy, + TextOnlyDisplayStrategy, ) from awscli.customizations.ecs.prompt_toolkit_display import Display +from awscli.customizations.ecs.serviceviewcollector import ServiceViewCollector from awscli.customizations.utils import uni_print -TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" - class ECSMonitorExpressGatewayService(BasicCommand): """AWS CLI command for monitoring ECS Express Gateway Service deployments. @@ -69,14 +68,16 @@ class ECSMonitorExpressGatewayService(BasicCommand): DESCRIPTION = ( "Monitors the progress of resource creation for an ECS Express Gateway Service. " - "This command provides real-time monitoring of service deployments with interactive " - "progress display, showing the status of load balancers, security groups, auto-scaling " + "This command provides real-time monitoring of service deployments showing the status " + "of load balancers, security groups, auto-scaling " "configurations, and other AWS resources as they are created or updated. " "Use ``--resource-view RESOURCE`` to view all service resources, or ``--resource-view DEPLOYMENT`` to track only " "resources that have changed in the most recent deployment. " - "The command requires a terminal (TTY) to run and the monitoring session continues " - "until manually stopped by the user or the specified timeout is reached. " - "Use keyboard shortcuts to navigate: up/down to scroll through resources, 'q' to quit monitoring." + "Choose ``--mode INTERACTIVE`` for real-time display with keyboard navigation (requires TTY), " + "or ``--mode text-only`` for text output with timestamps (works without TTY). " + "The monitoring session continues until manually stopped by the user or the specified timeout is reached. " + "In interactive mode, use keyboard shortcuts: up/down to scroll through resources, 'q' to quit. " + "In TEXT-ONLY mode, press Ctrl+C to stop monitoring." ) ARG_TABLE = [ @@ -101,6 +102,16 @@ class ECSMonitorExpressGatewayService(BasicCommand): 'default': 'RESOURCE', 'choices': ['RESOURCE', 'DEPLOYMENT'], }, + { + 'name': 'mode', + 'help_text': ( + "Display mode for monitoring output. " + "interactive (default if TTY available) - Real-time display with spinner and keyboard navigation. " + "text-only - Text output with timestamps and change detection (works without TTY)." + ), + 'required': False, + 'choices': ['interactive', 'text-only'], + }, { 'name': 'timeout', 'help_text': ( @@ -131,15 +142,12 @@ def _run_main(self, parsed_args, parsed_globals): parsed_globals: Global CLI configuration including region and endpoint """ try: - # Check if running in a TTY for interactive display - if not sys.stdout.isatty(): - uni_print( - "Error: This command requires a TTY. " - "Please run this command in a terminal.", - sys.stderr, - ) - return 1 + display_mode = self._determine_display_mode(parsed_args.mode) + except ValueError as e: + uni_print(str(e), sys.stderr) + return 1 + try: self._client = self._session.create_client( 'ecs', region_name=parsed_globals.region, @@ -154,14 +162,52 @@ def _run_main(self, parsed_args, parsed_globals): self._client, parsed_args.service_arn, parsed_args.resource_view, + display_mode, timeout_minutes=parsed_args.timeout, use_color=use_color, ).exec() except MonitoringError as e: uni_print(f"Error monitoring service: {e}", sys.stderr) + return 1 + + def _determine_display_mode(self, requested_mode): + """Determine and validate the display mode. + + Args: + requested_mode: User-requested mode ('interactive', 'text-only', or None) + + Returns: + str: Validated display mode ('interactive' or 'text-only') + + Raises: + ValueError: If interactive mode is requested without TTY + """ + # Determine display mode with auto-detection + if requested_mode is None: + # Auto-detect: interactive if TTY available, else text-only + return 'interactive' if sys.stdout.isatty() else 'text-only' + + # Validate requested mode + if requested_mode == 'interactive': + if not sys.stdout.isatty(): + raise ValueError( + "Error: Interactive mode requires a TTY (terminal). " + "Use --mode text-only for non-interactive environments." + ) + return 'interactive' + + # text-only mode doesn't require TTY + return requested_mode def _should_use_color(self, parsed_globals): - """Determine if color output should be used based on global settings.""" + """Determine if color output should be used based on global settings. + + Args: + parsed_globals: Global CLI configuration + + Returns: + bool: True if color should be used + """ if parsed_globals.color == 'on': return True elif parsed_globals.color == 'off': @@ -180,7 +226,8 @@ class ECSExpressGatewayServiceWatcher: Args: client: ECS client for API calls service_arn (str): ARN of the service to monitor - mode (str): Monitoring mode - 'RESOURCE' or 'DEPLOYMENT' + resource_view (str): Resource view mode - 'RESOURCE' or 'DEPLOYMENT' + display_mode (str): Display mode - 'INTERACTIVE' or 'TEXT-ONLY' timeout_minutes (int): Maximum monitoring time in minutes (default: 30) """ @@ -188,642 +235,42 @@ def __init__( self, client, service_arn, - mode, + resource_view, + display_mode, timeout_minutes=30, display=None, use_color=True, + collector=None, ): self._client = client self.service_arn = service_arn - self.mode = mode - self.timeout_minutes = timeout_minutes - self.last_described_gateway_service_response = None - self.last_execution_time = 0 - self.cached_monitor_result = None + self.display_mode = display_mode self.start_time = time.time() - self.use_color = use_color + self.timeout_minutes = timeout_minutes + self.collector = collector or ServiceViewCollector( + client, service_arn, resource_view, use_color + ) self.display = display or Display() - @staticmethod - def is_monitoring_available(): - """Check if monitoring is available (requires TTY).""" - return sys.stdout.isatty() - def exec(self): - """Start monitoring the express gateway service with progress display.""" - - def monitor_service(spinner_char): - return self._monitor_express_gateway_service( - spinner_char, self.service_arn, self.mode - ) - - asyncio.run(self._execute_with_progress_async(monitor_service, 100)) - - async def _execute_with_progress_async( - self, execution, progress_refresh_millis, execution_refresh_millis=5000 - ): - """Execute monitoring loop with animated progress display.""" - spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" - spinner_index = 0 - - # Initialize with basic service resource - service_resource = ManagedResource("Service", self.service_arn) - initial_output = service_resource.get_status_string( - spinner_char="{SPINNER}", use_color=self.use_color + """Execute monitoring using the appropriate display strategy.""" + strategy = self._create_display_strategy() + strategy.execute( + collector=self.collector, + start_time=self.start_time, + timeout_minutes=self.timeout_minutes, ) - current_output = initial_output - - async def update_data(): - nonlocal current_output - while True: - current_time = time.time() - if current_time - self.start_time > self.timeout_minutes * 60: - break - try: - loop = asyncio.get_event_loop() - new_output = await loop.run_in_executor( - None, execution, "{SPINNER}" - ) - current_output = new_output - except ClientError as e: - if ( - e.response.get('Error', {}).get('Code') - == 'InvalidParameterException' - ): - error_message = e.response.get('Error', {}).get( - 'Message', '' - ) - if ( - "Cannot call DescribeServiceRevisions for a service that is INACTIVE" - in error_message - ): - current_output = "Service is inactive" - else: - raise - else: - raise - await asyncio.sleep(execution_refresh_millis / 1000.0) - - async def update_spinner(): - nonlocal spinner_index - while True: - spinner_char = spinner_chars[spinner_index] - display_output = current_output.replace( - "{SPINNER}", spinner_char - ) - status_text = f"Getting updates... {spinner_char} | up/down to scroll, q to quit" - self.display.display(display_output, status_text) - spinner_index = (spinner_index + 1) % len(spinner_chars) - await asyncio.sleep(progress_refresh_millis / 1000.0) - # Start both tasks - data_task = asyncio.create_task(update_data()) - spinner_task = asyncio.create_task(update_spinner()) - - try: - await self.display.run() - finally: - data_task.cancel() - spinner_task.cancel() - final_output = current_output.replace("{SPINNER}", "") - uni_print(final_output + "\nMonitoring Complete!\n") - - def _monitor_express_gateway_service( - self, spinner_char, service_arn, mode, execution_refresh_millis=5000 - ): - """Monitor service status and return formatted output. - - Args: - spinner_char (char): Character to print representing progress (unused with single spinner) - execution_refresh_millis (int): Refresh interval in milliseconds - service_arn (str): Service ARN to monitor - mode (str): Monitoring mode ('RESOURCE' or 'DEPLOYMENT') + def _create_display_strategy(self): + """Create display strategy based on display mode. Returns: - str: Formatted status output + DisplayStrategy: Appropriate strategy for the selected mode """ - current_time = time.time() - - if ( - current_time - self.last_execution_time - >= execution_refresh_millis / 1000.0 - ): - try: - describe_gateway_service_response = ( - self._client.describe_express_gateway_service( - serviceArn=service_arn - ) - ) - if not describe_gateway_service_response: - self.cached_monitor_result = ( - None, - "Trying to describe gateway service", - ) - elif ( - not ( - service := describe_gateway_service_response.get( - "service" - ) - ) - or not service.get("serviceArn") - or not service.get("activeConfigurations") - ): - self.cached_monitor_result = ( - None, - "Trying to describe gateway service", - ) - else: - self.last_described_gateway_service_response = ( - describe_gateway_service_response - ) - described_gateway_service = ( - describe_gateway_service_response.get("service") - ) - - if mode == "DEPLOYMENT": - managed_resources, info = self._diff_service_view( - described_gateway_service - ) - else: - managed_resources, info = self._combined_service_view( - described_gateway_service - ) - - service_resources = [ - self._parse_cluster(described_gateway_service), - self._parse_service(described_gateway_service), - ] - if managed_resources: - service_resources.append(managed_resources) - service_resource = ManagedResourceGroup( - resources=service_resources - ) - self._update_cached_monitor_results(service_resource, info) - except ClientError as e: - if ( - e.response.get('Error', {}).get('Code') - == 'InvalidParameterException' - ): - error_message = e.response.get('Error', {}).get( - 'Message', '' - ) - if ( - "Cannot call DescribeServiceRevisions for a service that is INACTIVE" - in error_message - ): - empty_resource_group = ManagedResourceGroup() - self._update_cached_monitor_results( - empty_resource_group, "Service is inactive" - ) - else: - raise - else: - raise - - self.last_execution_time = current_time - - if not self.cached_monitor_result: - return "Waiting for initial data" + if self.display_mode == 'text-only': + return TextOnlyDisplayStrategy(use_color=self.collector.use_color) else: - # Generate the output every iteration. This allow the underlying resources to utilize spinners - service_resource, info = self.cached_monitor_result - status_string = ( - service_resource.get_status_string( - spinner_char=spinner_char, use_color=self.use_color - ) - if service_resource - else None - ) - - output = "\n".join([x for x in [status_string, info] if x]) - return output - - def _diff_service_view(self, describe_gateway_service_response): - """Generate diff view showing changes in the latest deployment. - - Computes differences between source and target service revisions to show - what resources are being updated or disassociated in the current deployment. - - Args: - describe_gateway_service_response (dict): Service description from API - - Returns: - tuple: (resources, info_output) where: - - resources (ManagedResourceGroup): Diff view of resources - - info_output (str): Informational messages - """ - service_arn = describe_gateway_service_response.get("serviceArn") - list_service_deployments_response = ( - self._client.list_service_deployments( - service=service_arn, maxResults=1 + return InteractiveDisplayStrategy( + display=self.display, + use_color=self.collector.use_color, ) - ) - listed_service_deployments = self._validate_and_parse_response( - list_service_deployments_response, - "ListServiceDeployments", - expected_field="serviceDeployments", - ) - if ( - not listed_service_deployments - or "serviceDeploymentArn" not in listed_service_deployments[0] - ): - return ( - None, - "Waiting for a deployment to start", - ) - - deployment_arn = listed_service_deployments[0].get( - "serviceDeploymentArn" - ) - - describe_service_deployments_response = ( - self._client.describe_service_deployments( - serviceDeploymentArns=[deployment_arn] - ) - ) - described_service_deployments = self._validate_and_parse_response( - describe_service_deployments_response, - "DescribeServiceDeployments", - expected_field="serviceDeployments", - eventually_consistent=True, - ) - described_service_deployment = described_service_deployments[0] - if ( - not described_service_deployment - or not described_service_deployment.get("targetServiceRevision") - ): - return ( - None, - "Waiting for a deployment to start", - ) - - target_sr = described_service_deployment.get( - "targetServiceRevision" - ).get("arn") - - target_sr_resources_list, described_target_sr_list = ( - self._describe_and_parse_service_revisions([target_sr]) - ) - if len(target_sr_resources_list) != 1: - return (None, "Trying to describe service revisions") - target_sr_resources = target_sr_resources_list[0] - described_target_sr = described_target_sr_list[0] - - task_def_arn = described_target_sr.get("taskDefinition") - if "sourceServiceRevisions" in described_service_deployment: - source_sr_resources, _ = ( - self._describe_and_parse_service_revisions( - [ - sr.get("arn") - for sr in described_service_deployment.get( - "sourceServiceRevisions" - ) - ] - ) - ) - if len(source_sr_resources) != len( - described_service_deployment.get("sourceServiceRevisions") - ): - return (None, "Trying to describe service revisions)") - source_sr_resources_combined = reduce( - lambda x, y: x.combine(y), source_sr_resources - ) - else: - source_sr_resources_combined = ManagedResourceGroup() - - updating_resources, disassociating_resources = ( - target_sr_resources.diff(source_sr_resources_combined) - ) - updating_resources.resource_type = "Updating" - disassociating_resources.resource_type = "Disassociating" - service_resources = ManagedResourceGroup( - resource_type="Deployment", - identifier=deployment_arn, - status=described_service_deployment.get("status"), - reason=described_service_deployment.get("statusReason"), - resources=[ - ManagedResource( - resource_type="TargetServiceRevision", identifier=target_sr - ), - ManagedResource( - resource_type="TaskDefinition", identifier=task_def_arn - ), - updating_resources, - disassociating_resources, - ], - ) - return service_resources, None - - def _combined_service_view(self, describe_gateway_service_response): - """Generate combined view of all active service resources. - - Extracts and combines resources from all active service configurations, - resolving conflicts by taking the version with the latest timestamp. - - Args: - describe_gateway_service_response (dict): Service description from API - - Returns: - tuple: (resources, info_output) where: - - resources (ManagedResourceGroup): Combined view of all resources - - info_output (str): Informational messages - """ - service_revision_arns = [ - config.get("serviceRevisionArn") - for config in describe_gateway_service_response.get( - "activeConfigurations" - ) - ] - service_revision_resources, _ = ( - self._describe_and_parse_service_revisions(service_revision_arns) - ) - - if len(service_revision_resources) != len(service_revision_arns): - return (None, "Trying to describe service revisions") - - service_resource = reduce( - lambda x, y: x.combine(y), service_revision_resources - ) - - return service_resource, None - - def _update_cached_monitor_results(self, resource, info): - """Update cached monitoring results with new data. - - Args: - resource: New resource data (replaces existing if provided) - info: New info message (always replaces existing) - """ - if not self.cached_monitor_result: - self.cached_monitor_result = (resource, info) - else: - self.cached_monitor_result = ( - resource or self.cached_monitor_result[0], - info, - ) - - def _validate_and_parse_response( - self, - response, - operation_name, - expected_field=None, - eventually_consistent=False, - ): - """Validate API response and extract expected field. - - Args: - response: API response to validate - operation_name: Name of the operation for error messages - expected_field: Field to extract from response (optional) - eventually_consistent: Whether to filter out MISSING failures - - Returns: - Extracted field value or None if no expected_field specified - - Raises: - MonitoringError: If response is invalid or missing required fields - """ - if not response: - raise MonitoringError(f"{operation_name} response is empty") - - self._parse_failures(response, operation_name, eventually_consistent) - - if not expected_field: - return None - - if response.get(expected_field) is None: - raise MonitoringError( - f"{operation_name} response is missing {expected_field}" - ) - return response.get(expected_field) - - def _parse_failures(self, response, operation_name, eventually_consistent): - """Parse and raise errors for API response failures. - - Args: - response: API response to check for failures - operation_name: Name of the operation for error messages - eventually_consistent: Whether to filter out MISSING failures for eventually consistent operations - - Raises: - MonitoringError: If failures are found in the response - """ - failures = response.get("failures") - - if not failures: - return - - if any(not f.get('arn') or not f.get('reason') for f in failures): - raise MonitoringError( - "Invalid failure response: missing arn or reason" - ) - - if eventually_consistent: - failures = [ - failure - for failure in failures - if failure.get("reason") != "MISSING" - ] - - if not failures: - return - - failure_msgs = [ - f"{f['arn']} failed with {f['reason']}" for f in failures - ] - joined_msgs = '\n'.join(failure_msgs) - raise MonitoringError(f"{operation_name}:\n{joined_msgs}") - - def _describe_and_parse_service_revisions(self, arns): - """Describe and parse service revisions into managed resources. - - Args: - arns (list): List of service revision ARNs to describe - - Returns: - tuple: (parsed_resources, described_revisions) where: - - parsed_resources (list): List of ManagedResourceGroup objects - - described_revisions (list): Raw API response data - """ - # API supports up to 20 arns, DescribeExpressGatewayService should never return more than 5 - describe_service_revisions_response = ( - self._client.describe_service_revisions(serviceRevisionArns=arns) - ) - described_service_revisions = self._validate_and_parse_response( - describe_service_revisions_response, - "DescribeServiceRevisions", - expected_field="serviceRevisions", - eventually_consistent=True, - ) - - return [ - self._parse_ecs_managed_resources(sr) - for sr in described_service_revisions - ], described_service_revisions - - def _parse_cluster(self, service): - return ManagedResource("Cluster", service.get("cluster")) - - def _parse_service(self, service): - service_arn = service.get("serviceArn") - cluster = service.get("cluster") - describe_service_response = self._client.describe_services( - cluster=cluster, services=[service_arn] - ) - described_service = self._validate_and_parse_response( - describe_service_response, "DescribeServices", "services" - )[0] - return ManagedResource( - "Service", - service.get("serviceArn"), - additional_info=described_service - and described_service.get("events")[0].get("message") - if described_service.get("events") - else None, - ) - - def _parse_ecs_managed_resources(self, service_revision): - managed_resources = service_revision.get("ecsManagedResources") - if not managed_resources: - return ManagedResourceGroup() - - parsed_resources = [] - if "ingressPaths" in managed_resources: - parsed_resources.append( - ManagedResourceGroup( - resource_type="IngressPaths", - resources=[ - self._parse_ingress_path_resources(ingress_path) - for ingress_path in managed_resources.get( - "ingressPaths" - ) - ], - ) - ) - if "autoScaling" in managed_resources: - parsed_resources.append( - self._parse_auto_scaling_configuration( - managed_resources.get("autoScaling") - ) - ) - if "metricAlarms" in managed_resources: - parsed_resources.append( - self._parse_metric_alarms( - managed_resources.get("metricAlarms") - ) - ) - if "serviceSecurityGroups" in managed_resources: - parsed_resources.append( - self._parse_service_security_groups( - managed_resources.get("serviceSecurityGroups") - ) - ) - if "logGroups" in managed_resources: - parsed_resources.append( - self._parse_log_groups(managed_resources.get("logGroups")) - ) - return ManagedResourceGroup(resources=parsed_resources) - - def _parse_ingress_path_resources(self, ingress_path): - resources = [] - if ingress_path.get("loadBalancer"): - resources.append( - self._parse_managed_resource( - ingress_path.get("loadBalancer"), "LoadBalancer" - ) - ) - if ingress_path.get("loadBalancerSecurityGroups"): - resources.extend( - self._parse_managed_resource_list( - ingress_path.get("loadBalancerSecurityGroups"), - "LoadBalancerSecurityGroup", - ) - ) - if ingress_path.get("certificate"): - resources.append( - self._parse_managed_resource( - ingress_path.get("certificate"), "Certificate" - ) - ) - if ingress_path.get("listener"): - resources.append( - self._parse_managed_resource( - ingress_path.get("listener"), "Listener" - ) - ) - if ingress_path.get("rule"): - resources.append( - self._parse_managed_resource(ingress_path.get("rule"), "Rule") - ) - if ingress_path.get("targetGroups"): - resources.extend( - self._parse_managed_resource_list( - ingress_path.get("targetGroups"), "TargetGroup" - ) - ) - return ManagedResourceGroup( - resource_type="IngressPath", - identifier=ingress_path.get("endpoint"), - resources=resources, - ) - - def _parse_auto_scaling_configuration(self, auto_scaling_configuration): - resources = [] - if auto_scaling_configuration.get("scalableTarget"): - resources.append( - self._parse_managed_resource( - auto_scaling_configuration.get("scalableTarget"), - "ScalableTarget", - ) - ) - if auto_scaling_configuration.get("applicationAutoScalingPolicies"): - resources.extend( - self._parse_managed_resource_list( - auto_scaling_configuration.get( - "applicationAutoScalingPolicies" - ), - "ApplicationAutoScalingPolicy", - ) - ) - return ManagedResourceGroup( - resource_type="AutoScalingConfiguration", resources=resources - ) - - def _parse_metric_alarms(self, metric_alarms): - return ManagedResourceGroup( - resource_type="MetricAlarms", - resources=self._parse_managed_resource_list( - metric_alarms, "MetricAlarm" - ), - ) - - def _parse_service_security_groups(self, service_security_groups): - return ManagedResourceGroup( - resource_type="ServiceSecurityGroups", - resources=self._parse_managed_resource_list( - service_security_groups, "SecurityGroup" - ), - ) - - def _parse_log_groups(self, logs_groups): - return ManagedResourceGroup( - resource_type="LogGroups", - resources=self._parse_managed_resource_list( - logs_groups, "LogGroup" - ), - ) - - def _parse_managed_resource(self, resource, resource_type): - return ManagedResource( - resource_type, - resource.get("arn"), - status=resource.get("status"), - updated_at=resource.get("updatedAt"), - reason=resource.get("statusReason"), - ) - - def _parse_managed_resource_list(self, data_list, resource_type): - return [ - self._parse_managed_resource(data, resource_type) - for data in data_list - ] diff --git a/awscli/customizations/ecs/monitormutatinggatewayservice.py b/awscli/customizations/ecs/monitormutatinggatewayservice.py index 682eb53e6678..d46df01262a9 100644 --- a/awscli/customizations/ecs/monitormutatinggatewayservice.py +++ b/awscli/customizations/ecs/monitormutatinggatewayservice.py @@ -64,7 +64,7 @@ def __call__(self, parser, namespace, values, option_string=None): class MonitoringResourcesArgument(CustomArgument): - """Custom CLI argument for enabling resource monitoring. + """Custom CLI argument for enabling resource monitoring with optional mode. Adds the --monitor-resources flag to gateway service commands, allowing users to opt into real-time monitoring of resource changes. @@ -74,14 +74,13 @@ def __init__(self, name): super().__init__( name, help_text=( - 'Enable live monitoring of service resource status. ' + 'Enable monitoring of service resource status. ' 'Specify ``DEPLOYMENT`` to show only resources that are being added or removed ' 'as part of the latest service deployment, or ``RESOURCE`` to show all resources ' 'from all active configurations of the service. ' 'Defaults based on operation type: create-express-gateway-service and ' 'update-express-gateway-service default to ``DEPLOYMENT`` mode. ' - 'delete-express-gateway-service defaults to ``RESOURCE`` mode. ' - 'Requires a terminal (TTY) to run.' + 'delete-express-gateway-service defaults to ``RESOURCE`` mode.' ), choices=['DEPLOYMENT', 'RESOURCE'], nargs='?', @@ -90,6 +89,23 @@ def __init__(self, name): ) +class MonitoringModeArgument(CustomArgument): + """Custom CLI argument for monitor display mode. Only used when --monitor-resources is specified.""" + + def __init__(self, name): + super().__init__( + name, + help_text=( + 'Display mode for monitoring output (requires --monitor-resources). ' + 'interactive (default if TTY available) - Real-time display with spinner and keyboard navigation. ' + 'text-only - Text output with timestamps, suitable for logging and non-interactive environments.' + ), + choices=['interactive', 'text-only'], + nargs='?', + dest='monitor_mode', + ) + + class MonitorMutatingGatewayService: """Event handler for monitoring gateway service mutations. @@ -110,6 +126,7 @@ def __init__(self, api, default_resource_view, watcher_class=None): self.session = None self.parsed_globals = None self.effective_resource_view = None + self.effective_mode = None self._watcher_class = watcher_class or ECSExpressGatewayServiceWatcher def before_building_argument_table_parser(self, session, **kwargs): @@ -131,6 +148,7 @@ def building_argument_table(self, argument_table, session, **kwargs): argument_table['monitor-resources'] = MonitoringResourcesArgument( 'monitor-resources' ) + argument_table['monitor-mode'] = MonitoringModeArgument('monitor-mode') def operation_args_parsed(self, parsed_args, parsed_globals, **kwargs): """Store monitoring flag state and globals after argument parsing. @@ -139,21 +157,71 @@ def operation_args_parsed(self, parsed_args, parsed_globals, **kwargs): parsed_args: Parsed command line arguments parsed_globals: Global CLI configuration """ + self._parse_and_validate_monitoring_args(parsed_args, parsed_globals) + + def _parse_and_validate_monitoring_args(self, parsed_args, parsed_globals): + """Parse and validate monitoring-related arguments. + + Extracts monitor_resources and monitor_mode from parsed_args, + validates their combination, and sets effective_resource_view + and effective_mode. + + Args: + parsed_args: Parsed command line arguments + parsed_globals: Global CLI configuration + + Raises: + ValueError: If monitor-mode is used without monitor-resources + """ # Store parsed_globals for later use self.parsed_globals = parsed_globals - # Get monitor_resources value and determine actual monitoring mode + # Parse monitor_resources to determine if monitoring is enabled monitor_value = getattr(parsed_args, 'monitor_resources', None) + self.effective_resource_view = self._parse_monitor_resources( + monitor_value + ) + # Parse and validate monitor_mode + mode_value = getattr(parsed_args, 'monitor_mode', None) + self.effective_mode = self._validate_and_parse_mode( + mode_value, self.effective_resource_view + ) + + def _parse_monitor_resources(self, monitor_value): + """Parse monitor_resources value to determine resource view. + + Args: + monitor_value: Value from --monitor-resources flag + + Returns: + str or None: Resource view mode (DEPLOYMENT/RESOURCE) or None + """ if monitor_value is None: - # Not specified, no monitoring - self.effective_resource_view = None + return None elif monitor_value == '__DEFAULT__': - # Flag specified without value, use default based on operation - self.effective_resource_view = self.default_resource_view + return self.default_resource_view else: - # Explicit choice provided (DEPLOYMENT or RESOURCE) - self.effective_resource_view = monitor_value + return monitor_value + + def _validate_and_parse_mode(self, mode_value, resource_view): + """Validate and parse the monitor mode value. + + Args: + mode_value: Value from --monitor-mode flag + resource_view: Effective resource view (None if not monitoring) + + Returns: + str: Display mode ('interactive' or 'text-only') + + Raises: + ValueError: If mode is specified without resource monitoring + """ + if mode_value is not None and resource_view is None: + raise ValueError( + "Error: --monitor-mode can only be used with --monitor-resources" + ) + return mode_value if mode_value else 'interactive' def after_call(self, parsed, context, http_response, **kwargs): """Start monitoring after successful API call if flag is enabled. @@ -171,13 +239,20 @@ def after_call(self, parsed, context, http_response, **kwargs): ).get('serviceArn'): return - # Check monitoring availability - if not self._watcher_class.is_monitoring_available(): + # Interactive mode requires TTY, text-only does not + # Default to text-only if no TTY available + if self.effective_mode == 'interactive' and not sys.stdout.isatty(): uni_print( - "Monitoring is not available (requires TTY). Skipping monitoring.\n", - out_file=sys.stderr, + "Error: Interactive mode requires a TTY (terminal). " + "Monitoring skipped. Use --monitor-mode text-only for non-interactive environments.\\n", + sys.stderr, ) return + elif self.effective_mode == 'interactive' and sys.stdout.isatty(): + pass # Interactive mode with TTY - OK + elif not sys.stdout.isatty(): + # No TTY - force text-only mode + self.effective_mode = 'text-only' if not self.session or not self.parsed_globals: uni_print( @@ -199,20 +274,13 @@ def after_call(self, parsed, context, http_response, **kwargs): # Clear output when monitoring is invoked parsed.clear() - try: - self._watcher_class( - ecs_client, - service_arn, - self.effective_resource_view, - use_color=self._should_use_color(self.parsed_globals), - ).exec() - except Exception as e: - uni_print( - "Encountered an error, terminating monitoring\n" - + str(e) - + "\n", - out_file=sys.stderr, - ) + self._watcher_class( + ecs_client, + service_arn, + self.effective_resource_view, + self.effective_mode, + use_color=self._should_use_color(self.parsed_globals), + ).exec() def _should_use_color(self, parsed_globals): """Determine if color output should be used based on global settings.""" diff --git a/awscli/customizations/ecs/serviceviewcollector.py b/awscli/customizations/ecs/serviceviewcollector.py new file mode 100644 index 000000000000..a9c28732d86f --- /dev/null +++ b/awscli/customizations/ecs/serviceviewcollector.py @@ -0,0 +1,538 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Service view collector for ECS Express Gateway Service monitoring. + +This module provides business logic for collecting and formatting +ECS Express Gateway Service monitoring data. +""" + +import time +from functools import reduce + +from botocore.exceptions import ClientError + +from awscli.customizations.ecs.exceptions import MonitoringError +from awscli.customizations.ecs.expressgateway.managedresource import ( + ManagedResource, +) +from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( + ManagedResourceGroup, +) + + +class ServiceViewCollector: + """Collects and formats ECS Express Gateway Service monitoring data. + + Responsible for: + - Making ECS API calls + - Parsing resource data + - Formatting output strings + - Caching responses + + Args: + client: ECS client for API calls + service_arn (str): ARN of the service to monitor + mode (str): Monitoring mode - 'RESOURCE' or 'DEPLOYMENT' + use_color (bool): Whether to use color in output + """ + + def __init__(self, client, service_arn, mode, use_color=True): + self._client = client + self.service_arn = service_arn + self.mode = mode + self.use_color = use_color + self.last_described_gateway_service_response = None + self.last_execution_time = 0 + self.cached_monitor_result = None + + def get_current_view( + self, spinner_char="{SPINNER}", execution_refresh_millis=5000 + ): + """Get current monitoring view as formatted string. + + Args: + spinner_char (str): Character for progress indication + execution_refresh_millis (int): Refresh interval in milliseconds + + Returns: + str: Formatted monitoring output + """ + current_time = time.time() + + if ( + current_time - self.last_execution_time + >= execution_refresh_millis / 1000.0 + ): + try: + describe_gateway_service_response = ( + self._client.describe_express_gateway_service( + serviceArn=self.service_arn + ) + ) + if not describe_gateway_service_response: + self.cached_monitor_result = ( + None, + "Trying to describe gateway service", + ) + elif ( + not ( + service := describe_gateway_service_response.get( + "service" + ) + ) + or not service.get("serviceArn") + or service.get("activeConfigurations") is None + ): + self.cached_monitor_result = ( + None, + "Trying to describe gateway service", + ) + else: + self.last_described_gateway_service_response = ( + describe_gateway_service_response + ) + described_gateway_service = ( + describe_gateway_service_response.get("service") + ) + + if self.mode == "DEPLOYMENT": + managed_resources, info = self._diff_service_view( + described_gateway_service + ) + else: + managed_resources, info = self._combined_service_view( + described_gateway_service + ) + + service_resources = [ + self._parse_cluster(described_gateway_service), + self._parse_service(described_gateway_service), + ] + if managed_resources: + service_resources.append(managed_resources) + service_resource = ManagedResourceGroup( + resources=service_resources + ) + self._update_cached_monitor_results(service_resource, info) + except ClientError as e: + if ( + e.response.get('Error', {}).get('Code') + == 'InvalidParameterException' + ): + error_message = e.response.get('Error', {}).get( + 'Message', '' + ) + if ( + "Cannot call DescribeServiceRevisions for a service that is INACTIVE" + in error_message + ): + empty_resource_group = ManagedResourceGroup() + self._update_cached_monitor_results( + empty_resource_group, "Service is inactive" + ) + else: + raise + else: + raise + + self.last_execution_time = current_time + + if not self.cached_monitor_result: + return "Waiting for initial data" + else: + service_resource, info = self.cached_monitor_result + status_string = ( + service_resource.get_status_string( + spinner_char=spinner_char, use_color=self.use_color + ) + if service_resource + else None + ) + + output = "\n".join([x for x in [status_string, info] if x]) + return output + + def _diff_service_view(self, describe_gateway_service_response): + """Generate diff view showing changes in the latest deployment.""" + service_arn = describe_gateway_service_response.get("serviceArn") + list_service_deployments_response = ( + self._client.list_service_deployments( + service=service_arn, maxResults=1 + ) + ) + listed_service_deployments = self._validate_and_parse_response( + list_service_deployments_response, + "ListServiceDeployments", + expected_field="serviceDeployments", + ) + if ( + not listed_service_deployments + or "serviceDeploymentArn" not in listed_service_deployments[0] + ): + return ( + None, + "Waiting for a deployment to start", + ) + + deployment_arn = listed_service_deployments[0].get( + "serviceDeploymentArn" + ) + + describe_service_deployments_response = ( + self._client.describe_service_deployments( + serviceDeploymentArns=[deployment_arn] + ) + ) + described_service_deployments = self._validate_and_parse_response( + describe_service_deployments_response, + "DescribeServiceDeployments", + expected_field="serviceDeployments", + eventually_consistent=True, + ) + if not described_service_deployments: + return (None, "Waiting for a deployment to start") + + described_service_deployment = described_service_deployments[0] + if ( + not described_service_deployment + or not described_service_deployment.get("targetServiceRevision") + ): + return ( + None, + "Waiting for a deployment to start", + ) + + target_sr = described_service_deployment.get( + "targetServiceRevision" + ).get("arn") + + target_sr_resources_list, described_target_sr_list = ( + self._describe_and_parse_service_revisions([target_sr]) + ) + if len(target_sr_resources_list) != 1: + return (None, "Trying to describe service revisions") + target_sr_resources = target_sr_resources_list[0] + described_target_sr = described_target_sr_list[0] + + task_def_arn = described_target_sr.get("taskDefinition") + if "sourceServiceRevisions" in described_service_deployment: + source_sr_resources, _ = ( + self._describe_and_parse_service_revisions( + [ + sr.get("arn") + for sr in described_service_deployment.get( + "sourceServiceRevisions" + ) + ] + ) + ) + if len(source_sr_resources) != len( + described_service_deployment.get("sourceServiceRevisions") + ): + return (None, "Trying to describe service revisions)") + source_sr_resources_combined = reduce( + lambda x, y: x.combine(y), source_sr_resources + ) + else: + source_sr_resources_combined = ManagedResourceGroup() + + updating_resources, disassociating_resources = ( + target_sr_resources.compare_resource_sets( + source_sr_resources_combined + ) + ) + updating_resources.resource_type = "Updating" + disassociating_resources.resource_type = "Disassociating" + service_resources = ManagedResourceGroup( + resource_type="Deployment", + identifier=deployment_arn, + status=described_service_deployment.get("status"), + reason=described_service_deployment.get("statusReason"), + resources=[ + ManagedResource( + resource_type="TargetServiceRevision", identifier=target_sr + ), + ManagedResource( + resource_type="TaskDefinition", identifier=task_def_arn + ), + updating_resources, + disassociating_resources, + ], + ) + return service_resources, None + + def _combined_service_view(self, describe_gateway_service_response): + """Generate combined view of all active service resources.""" + service_revision_arns = [ + config.get("serviceRevisionArn") + for config in describe_gateway_service_response.get( + "activeConfigurations" + ) + ] + service_revision_resources, _ = ( + self._describe_and_parse_service_revisions(service_revision_arns) + ) + + if len(service_revision_resources) != len(service_revision_arns): + return (None, "Trying to describe service revisions") + + service_resource = reduce( + lambda x, y: x.combine(y), service_revision_resources + ) + + return service_resource, None + + def _update_cached_monitor_results(self, resource, info): + """Update cached monitoring results with new data.""" + if not self.cached_monitor_result: + self.cached_monitor_result = (resource, info) + else: + self.cached_monitor_result = ( + resource or self.cached_monitor_result[0], + info, + ) + + def _validate_and_parse_response( + self, + response, + operation_name, + expected_field=None, + eventually_consistent=False, + ): + """Validate API response and extract expected field.""" + if not response: + raise MonitoringError(f"{operation_name} response is empty") + + self._parse_failures(response, operation_name, eventually_consistent) + + if not expected_field: + return None + + if response.get(expected_field) is None: + raise MonitoringError( + f"{operation_name} response is missing {expected_field}" + ) + return response.get(expected_field) + + def _parse_failures(self, response, operation_name, eventually_consistent): + """Parse and raise errors for API response failures.""" + failures = response.get("failures") + + if not failures: + return + + if any(not f.get('arn') or not f.get('reason') for f in failures): + raise MonitoringError( + "Invalid failure response: missing arn or reason" + ) + + if eventually_consistent: + failures = [ + failure + for failure in failures + if failure.get("reason") != "MISSING" + ] + + if not failures: + return + + failure_msgs = [ + f"{f['arn']} failed with {f['reason']}" for f in failures + ] + joined_msgs = '\n'.join(failure_msgs) + raise MonitoringError(f"{operation_name}:\n{joined_msgs}") + + def _describe_and_parse_service_revisions(self, arns): + """Describe and parse service revisions into managed resources.""" + describe_service_revisions_response = ( + self._client.describe_service_revisions(serviceRevisionArns=arns) + ) + described_service_revisions = self._validate_and_parse_response( + describe_service_revisions_response, + "DescribeServiceRevisions", + expected_field="serviceRevisions", + eventually_consistent=True, + ) + + return [ + self._parse_ecs_managed_resources(sr) + for sr in described_service_revisions + ], described_service_revisions + + def _parse_cluster(self, service): + return ManagedResource("Cluster", service.get("cluster")) + + def _parse_service(self, service): + service_arn = service.get("serviceArn") + cluster = service.get("cluster") + describe_service_response = self._client.describe_services( + cluster=cluster, services=[service_arn] + ) + described_service = self._validate_and_parse_response( + describe_service_response, "DescribeServices", "services" + )[0] + return ManagedResource( + "Service", + service.get("serviceArn"), + additional_info=described_service + and described_service.get("events")[0].get("message") + if described_service.get("events") + else None, + ) + + def _parse_ecs_managed_resources(self, service_revision): + managed_resources = service_revision.get("ecsManagedResources") + if not managed_resources: + return ManagedResourceGroup() + + parsed_resources = [] + if "ingressPaths" in managed_resources: + parsed_resources.append( + ManagedResourceGroup( + resource_type="IngressPaths", + resources=[ + self._parse_ingress_path_resources(ingress_path) + for ingress_path in managed_resources.get( + "ingressPaths" + ) + ], + ) + ) + if "autoScaling" in managed_resources: + parsed_resources.append( + self._parse_auto_scaling_configuration( + managed_resources.get("autoScaling") + ) + ) + if "metricAlarms" in managed_resources: + parsed_resources.append( + self._parse_metric_alarms( + managed_resources.get("metricAlarms") + ) + ) + if "serviceSecurityGroups" in managed_resources: + parsed_resources.append( + self._parse_service_security_groups( + managed_resources.get("serviceSecurityGroups") + ) + ) + if "logGroups" in managed_resources: + parsed_resources.append( + self._parse_log_groups(managed_resources.get("logGroups")) + ) + return ManagedResourceGroup(resources=parsed_resources) + + def _parse_ingress_path_resources(self, ingress_path): + resources = [] + if ingress_path.get("loadBalancer"): + resources.append( + self._parse_managed_resource( + ingress_path.get("loadBalancer"), "LoadBalancer" + ) + ) + if ingress_path.get("loadBalancerSecurityGroups"): + resources.extend( + self._parse_managed_resource_list( + ingress_path.get("loadBalancerSecurityGroups"), + "LoadBalancerSecurityGroup", + ) + ) + if ingress_path.get("certificate"): + resources.append( + self._parse_managed_resource( + ingress_path.get("certificate"), "Certificate" + ) + ) + if ingress_path.get("listener"): + resources.append( + self._parse_managed_resource( + ingress_path.get("listener"), "Listener" + ) + ) + if ingress_path.get("rule"): + resources.append( + self._parse_managed_resource(ingress_path.get("rule"), "Rule") + ) + if ingress_path.get("targetGroups"): + resources.extend( + self._parse_managed_resource_list( + ingress_path.get("targetGroups"), "TargetGroup" + ) + ) + return ManagedResourceGroup( + resource_type="IngressPath", + identifier=ingress_path.get("endpoint"), + resources=resources, + ) + + def _parse_auto_scaling_configuration(self, auto_scaling_configuration): + resources = [] + if auto_scaling_configuration.get("scalableTarget"): + resources.append( + self._parse_managed_resource( + auto_scaling_configuration.get("scalableTarget"), + "ScalableTarget", + ) + ) + if auto_scaling_configuration.get("applicationAutoScalingPolicies"): + resources.extend( + self._parse_managed_resource_list( + auto_scaling_configuration.get( + "applicationAutoScalingPolicies" + ), + "ApplicationAutoScalingPolicy", + ) + ) + return ManagedResourceGroup( + resource_type="AutoScalingConfiguration", resources=resources + ) + + def _parse_metric_alarms(self, metric_alarms): + return ManagedResourceGroup( + resource_type="MetricAlarms", + resources=self._parse_managed_resource_list( + metric_alarms, "MetricAlarm" + ), + ) + + def _parse_service_security_groups(self, service_security_groups): + return ManagedResourceGroup( + resource_type="ServiceSecurityGroups", + resources=self._parse_managed_resource_list( + service_security_groups, "SecurityGroup" + ), + ) + + def _parse_log_groups(self, logs_groups): + return ManagedResourceGroup( + resource_type="LogGroups", + resources=self._parse_managed_resource_list( + logs_groups, "LogGroup" + ), + ) + + def _parse_managed_resource(self, resource, resource_type): + return ManagedResource( + resource_type, + resource.get("arn"), + status=resource.get("status"), + updated_at=resource.get("updatedAt"), + reason=resource.get("statusReason"), + ) + + def _parse_managed_resource_list(self, data_list, resource_type): + return [ + self._parse_managed_resource(data, resource_type) + for data in data_list + ] diff --git a/tests/functional/ecs/test_monitormutatinggatewayservice.py b/tests/functional/ecs/test_monitormutatinggatewayservice.py index b4c34281e872..a336e6eb6406 100644 --- a/tests/functional/ecs/test_monitormutatinggatewayservice.py +++ b/tests/functional/ecs/test_monitormutatinggatewayservice.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -from unittest.mock import Mock +from unittest.mock import Mock, patch from awscli.customizations.ecs.monitormutatinggatewayservice import ( MUTATION_HANDLERS, @@ -96,8 +96,9 @@ def test_operation_args_parsed_with_monitor_resources_false(self): def test_operation_args_parsed_no_monitor_resources_attr(self): parsed_args = Mock() - # Remove the attribute + # Remove both attributes del parsed_args.monitor_resources + del parsed_args.monitor_mode parsed_globals = Mock() self.handler.operation_args_parsed(parsed_args, parsed_globals) @@ -105,48 +106,49 @@ def test_operation_args_parsed_no_monitor_resources_attr(self): assert not self.handler.effective_resource_view def test_after_call_with_monitoring_enabled(self): - # Setup - mock_watcher_class = Mock() - mock_watcher = Mock() - mock_watcher_class.return_value = mock_watcher - - handler = MonitorMutatingGatewayService( - 'create-express-gateway-service', - 'DEPLOYMENT', - watcher_class=mock_watcher_class, - ) - - mock_session = Mock() - mock_parsed_globals = Mock() - mock_parsed_globals.region = 'us-west-2' - mock_parsed_globals.endpoint_url = ( - 'https://ecs.us-west-2.amazonaws.com' - ) - mock_parsed_globals.verify_ssl = True - - mock_ecs_client = Mock() - mock_session.create_client.return_value = mock_ecs_client - - handler.session = mock_session - handler.parsed_globals = mock_parsed_globals - handler.effective_resource_view = 'DEPLOYMENT' - handler.effective_resource_view = 'DEPLOYMENT' - - parsed = { - 'service': { - 'serviceArn': 'arn:aws:ecs:us-west-2:123456789:service/test-service' + with patch('sys.stdout.isatty', return_value=True): + # Setup + mock_watcher_class = Mock() + mock_watcher = Mock() + mock_watcher_class.return_value = mock_watcher + + handler = MonitorMutatingGatewayService( + 'create-express-gateway-service', + 'DEPLOYMENT', + watcher_class=mock_watcher_class, + ) + + mock_session = Mock() + mock_parsed_globals = Mock() + mock_parsed_globals.region = 'us-west-2' + mock_parsed_globals.endpoint_url = ( + 'https://ecs.us-west-2.amazonaws.com' + ) + mock_parsed_globals.verify_ssl = True + + mock_ecs_client = Mock() + mock_session.create_client.return_value = mock_ecs_client + + handler.session = mock_session + handler.parsed_globals = mock_parsed_globals + handler.effective_resource_view = 'DEPLOYMENT' + handler.effective_resource_view = 'DEPLOYMENT' + + parsed = { + 'service': { + 'serviceArn': 'arn:aws:ecs:us-west-2:123456789:service/test-service' + } } - } - context = {} - http_response = Mock() - http_response.status_code = 200 + context = {} + http_response = Mock() + http_response.status_code = 200 - # Execute - handler.after_call(parsed, context, http_response) + # Execute + handler.after_call(parsed, context, http_response) - # Verify monitoring was initiated - mock_watcher_class.assert_called_once() - mock_watcher.exec.assert_called_once() + # Verify monitoring was initiated + mock_watcher_class.assert_called_once() + mock_watcher.exec.assert_called_once() def test_after_call_with_monitoring_disabled(self): # Setup diff --git a/tests/unit/customizations/ecs/expressgateway/test_display_strategy.py b/tests/unit/customizations/ecs/expressgateway/test_display_strategy.py new file mode 100644 index 000000000000..5fe2bf52bfb9 --- /dev/null +++ b/tests/unit/customizations/ecs/expressgateway/test_display_strategy.py @@ -0,0 +1,149 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ + +import asyncio +import time +from unittest.mock import Mock, patch + +import pytest +from prompt_toolkit.application import create_app_session +from prompt_toolkit.output import DummyOutput + +from awscli.customizations.ecs.expressgateway.display_strategy import ( + DisplayStrategy, + InteractiveDisplayStrategy, + TextOnlyDisplayStrategy, +) + + +class TestDisplayStrategy: + """Test base DisplayStrategy class.""" + + def test_base_strategy_not_implemented(self): + """Test base class raises NotImplementedError.""" + strategy = DisplayStrategy() + with pytest.raises(NotImplementedError): + strategy.execute(None, None, None) + + +class TestInteractiveDisplayStrategy: + """Test InteractiveDisplayStrategy.""" + + def setup_method(self): + self.app_session = create_app_session(output=DummyOutput()) + self.app_session.__enter__() + + def teardown_method(self): + if hasattr(self, 'app_session'): + self.app_session.__exit__(None, None, None) + + @patch('time.sleep') + def test_execute_with_mock_display(self, mock_sleep): + """Test strategy executes with mocked display.""" + + async def mock_run_async(): + await asyncio.sleep(0.01) + + mock_display = Mock() + mock_display.display = Mock() + mock_display.run = Mock(return_value=mock_run_async()) + + mock_collector = Mock() + mock_collector.get_current_view = Mock( + return_value="Test output {SPINNER}" + ) + + strategy = InteractiveDisplayStrategy( + display=mock_display, use_color=True + ) + + mock_sleep.side_effect = KeyboardInterrupt() + + start_time = time.time() + strategy.execute(mock_collector, start_time, timeout_minutes=1) + + # Verify display was called + assert mock_display.display.called + assert mock_display.run.called + + def test_strategy_uses_provided_color_setting(self): + """Test strategy respects use_color parameter.""" + mock_display = Mock() + + strategy_with_color = InteractiveDisplayStrategy( + display=mock_display, use_color=True + ) + assert strategy_with_color.use_color is True + + strategy_no_color = InteractiveDisplayStrategy( + display=mock_display, use_color=False + ) + assert strategy_no_color.use_color is False + + +class TestTextOnlyDisplayStrategy: + """Test TextOnlyDisplayStrategy.""" + + @patch('time.sleep') + def test_execute_with_mock_collector(self, mock_sleep, capsys): + """Test strategy executes sync loop with text output.""" + mock_collector = Mock() + mock_collector.get_current_view = Mock(return_value="Test output") + mock_collector.cached_monitor_result = (None, "Test info") + + strategy = TextOnlyDisplayStrategy(use_color=True) + + # Make sleep raise to exit loop after first iteration + mock_sleep.side_effect = KeyboardInterrupt() + + start_time = time.time() + strategy.execute(mock_collector, start_time, timeout_minutes=1) + + output = capsys.readouterr().out + printed_output = output + assert "Starting monitoring" in printed_output + assert "Polling for updates" in printed_output + assert "stopped by user" in printed_output + assert "complete" in printed_output + + @patch('time.sleep') + @patch('time.time') + def test_execute_handles_timeout(self, mock_time, mock_sleep, capsys): + """Test strategy handles timeout correctly.""" + mock_collector = Mock() + mock_collector.get_current_view = Mock(return_value="Test output") + mock_collector.cached_monitor_result = (None, None) + + strategy = TextOnlyDisplayStrategy(use_color=True) + + # Simulate timeout after first poll + start_time = 1000.0 + mock_time.side_effect = [ + 1000.0, # First check - within timeout + 2000.0, # Second check - exceeded timeout + ] + + strategy.execute(mock_collector, start_time, timeout_minutes=1) + + output = capsys.readouterr().out + printed_output = output + assert "timeout reached" in printed_output.lower() + + def test_strategy_uses_provided_color_setting(self): + """Test strategy respects use_color parameter.""" + strategy_with_color = TextOnlyDisplayStrategy(use_color=True) + assert strategy_with_color.stream_display.use_color is True + + strategy_no_color = TextOnlyDisplayStrategy(use_color=False) + assert strategy_no_color.stream_display.use_color is False + + +# Suppress thread exception warnings for these async tests +pytestmark = pytest.mark.filterwarnings( + "ignore::pytest.PytestUnhandledThreadExceptionWarning" +) diff --git a/tests/unit/customizations/ecs/expressgateway/test_managedresourcegroup.py b/tests/unit/customizations/ecs/expressgateway/test_managedresourcegroup.py index f6d99e6bf6f8..f2470c503467 100644 --- a/tests/unit/customizations/ecs/expressgateway/test_managedresourcegroup.py +++ b/tests/unit/customizations/ecs/expressgateway/test_managedresourcegroup.py @@ -115,7 +115,7 @@ def test_diff_unique_resources(self): resources=[self.resource2] ) # Certificate - diff1, diff2 = group1.diff(group2) + diff1, diff2 = group1.compare_resource_sets(group2) # Each group should contain its unique resource self.assertEqual(len(diff1.resource_mapping), 1) @@ -135,7 +135,7 @@ def test_diff_overlapping_resources(self): resources=[self.resource2, resource3] ) # cert-456, lb-456 - diff1, diff2 = group1.diff(group2) + diff1, diff2 = group1.compare_resource_sets(group2) # group1 unique: lb-123, group2 unique: lb-456, common: cert-456 (should not appear in diff) self.assertEqual(len(diff1.resource_mapping), 1) @@ -155,7 +155,7 @@ def test_diff_identical_groups(self): resources=[self.resource1, self.resource2] ) - diff1, diff2 = group1.diff(group2) + diff1, diff2 = group1.compare_resource_sets(group2) # No differences should be found self.assertEqual(len(diff1.resource_mapping), 0) @@ -166,7 +166,7 @@ def test_diff_empty_groups(self): group1 = ManagedResourceGroup(resources=[self.resource1]) group2 = ManagedResourceGroup(resources=[]) - diff1, diff2 = group1.diff(group2) + diff1, diff2 = group1.compare_resource_sets(group2) # group1 should contain its resource, group2 should be empty self.assertEqual(len(diff1.resource_mapping), 1) @@ -185,7 +185,7 @@ def test_diff_excludes_matching_types_without_identifier(self): resources=[resource_with_id] ) # LoadBalancer/lb-456 - diff1, diff2 = group1.diff(group2) + diff1, diff2 = group1.compare_resource_sets(group2) # group1 should contain its resource without identifier self.assertEqual(len(diff1.resource_mapping), 1) @@ -254,10 +254,6 @@ def test_get_status_string_with_color(self): self.assertIn("\x1b[", result) def test_combine_prioritizes_resources_with_identifier(self): - from awscli.customizations.ecs.expressgateway.managedresource import ( - ManagedResource, - ) - resource_with_id = ManagedResource( "LoadBalancer", "lb-123", "ACTIVE", 1761230543.151 ) diff --git a/tests/unit/customizations/ecs/expressgateway/test_stream_display.py b/tests/unit/customizations/ecs/expressgateway/test_stream_display.py new file mode 100644 index 000000000000..9c8217513f0c --- /dev/null +++ b/tests/unit/customizations/ecs/expressgateway/test_stream_display.py @@ -0,0 +1,309 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ + +import time + +from awscli.customizations.ecs.expressgateway.managedresource import ( + ManagedResource, +) +from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( + ManagedResourceGroup, +) +from awscli.customizations.ecs.expressgateway.stream_display import ( + StreamDisplay, +) + + +class TestStreamDisplay: + """Test StreamDisplay for text-based monitoring output.""" + + def setup_method(self): + self.display = StreamDisplay(use_color=True) + + def test_show_startup_message(self, capsys): + """Test startup message includes timestamp""" + self.display.show_startup_message() + + output = capsys.readouterr().out + assert "Starting monitoring..." in output + assert "[" in output + + def test_show_polling_message(self, capsys): + """Test polling message includes timestamp""" + self.display.show_polling_message() + + output = capsys.readouterr().out + assert "Polling for updates..." in output + + def test_show_monitoring_data_with_info(self, capsys): + """Test showing info message""" + self.display.show_monitoring_data(None, "Info message") + + output = capsys.readouterr().out + assert "Info message" in output and output.endswith("\n") + + def test_show_monitoring_data_first_poll_shows_all(self, capsys): + """Test first poll shows all resources""" + resource = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource_group = ManagedResourceGroup(resources=[resource]) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output + assert "lb-123" in output + + def test_show_monitoring_data_no_changes(self, capsys): + """Test no output when resources haven't changed""" + resource = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource_group = ManagedResourceGroup(resources=[resource]) + + # First poll - show all + self.display.show_monitoring_data(resource_group, None) + capsys.readouterr() + + # Second poll - same resources, no changes + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert output == "" + + def test_show_monitoring_data_with_new_resource(self, capsys): + """Test output when new resources are added""" + resource1 = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource_group1 = ManagedResourceGroup(resources=[resource1]) + + self.display.show_monitoring_data(resource_group1, None) + capsys.readouterr() + + # Second resource group with additional resource + resource2 = ManagedResource( + "TargetGroup", "tg-456", "ACTIVE", None, None + ) + resource_group2 = ManagedResourceGroup( + resources=[resource1, resource2] + ) + + self.display.show_monitoring_data(resource_group2, None) + + output = capsys.readouterr().out + assert "TargetGroup" in output + + def test_show_timeout_message(self, capsys): + """Test timeout message""" + self.display.show_timeout_message() + + output = capsys.readouterr().out + assert "timeout reached" in output.lower() + + def test_show_service_inactive_message(self, capsys): + """Test service inactive message""" + self.display.show_service_inactive_message() + + output = capsys.readouterr().out + assert "inactive" in output.lower() + + def test_show_completion_message(self, capsys): + """Test completion message""" + self.display.show_completion_message() + + output = capsys.readouterr().out + assert "complete" in output.lower() + + def test_show_user_stop_message(self, capsys): + """Test user stop message""" + self.display.show_user_stop_message() + + output = capsys.readouterr().out + assert "stopped by user" in output.lower() + + def test_show_error_message(self, capsys): + """Test error message""" + self.display.show_error_message("Test error") + + output = capsys.readouterr().out + assert "Error" in output + assert "Test error" in output + + def test_use_color_parameter(self): + """Test use_color parameter is stored""" + display_with_color = StreamDisplay(use_color=True) + assert display_with_color.use_color is True + + display_no_color = StreamDisplay(use_color=False) + assert display_no_color.use_color is False + + def test_print_flattened_resources_with_reason(self, capsys): + """Test resource with reason prints on separate line""" + resource = ManagedResource( + "LoadBalancer", + "lb-123", + "CREATING", + None, + "Waiting for DNS propagation", + ) + resource_group = ManagedResourceGroup(resources=[resource]) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output + assert "Reason: Waiting for DNS propagation" in output + + def test_print_flattened_resources_with_updated_at(self, capsys): + """Test resource with updated_at timestamp prints on separate line""" + current_time = time.time() + resource = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", current_time, None + ) + resource_group = ManagedResourceGroup(resources=[resource]) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output + assert "Last Updated At:" in output + + def test_print_flattened_resources_with_additional_info(self, capsys): + """Test resource with additional_info prints on separate line""" + resource = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource.additional_info = "DNS: example.elb.amazonaws.com" + resource_group = ManagedResourceGroup(resources=[resource]) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output + assert "Info: DNS: example.elb.amazonaws.com" in output + + def test_print_flattened_resources_complete_multi_line(self, capsys): + """Test resource with all fields prints on multiple lines""" + resource = ManagedResource( + "LoadBalancer", + "lb-123", + "CREATING", + time.time(), + "Provisioning load balancer", + ) + resource.additional_info = "Type: application" + resource_group = ManagedResourceGroup(resources=[resource]) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output and "lb-123" in output + assert "Reason: Provisioning load balancer" in output + assert "Last Updated At:" in output + assert "Info: Type: application" in output + + def test_diff_detects_status_change(self, capsys): + """Test diff detects when status changes""" + resource1 = ManagedResource( + "LoadBalancer", "lb-123", "CREATING", None, None + ) + resource_group1 = ManagedResourceGroup(resources=[resource1]) + + # First poll + self.display.show_monitoring_data(resource_group1, None) + capsys.readouterr() + + # Second poll - same resource but status changed + resource2 = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource_group2 = ManagedResourceGroup(resources=[resource2]) + + self.display.show_monitoring_data(resource_group2, None) + + output = capsys.readouterr().out + assert "LoadBalancer" in output + assert "ACTIVE" in output + + def test_diff_detects_reason_change(self, capsys): + """Test diff detects when reason changes""" + resource1 = ManagedResource( + "LoadBalancer", "lb-123", "CREATING", None, "Creating resources" + ) + resource_group1 = ManagedResourceGroup(resources=[resource1]) + + # First poll + self.display.show_monitoring_data(resource_group1, None) + capsys.readouterr() + + # Second poll - same resource but reason changed + resource2 = ManagedResource( + "LoadBalancer", "lb-123", "CREATING", None, "Waiting for DNS" + ) + resource_group2 = ManagedResourceGroup(resources=[resource2]) + + self.display.show_monitoring_data(resource_group2, None) + + output = capsys.readouterr().out + assert "Waiting for DNS" in output + + def test_diff_detects_additional_info_change(self, capsys): + """Test diff detects when additional_info changes""" + resource1 = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource1.additional_info = "DNS: old.example.com" + resource_group1 = ManagedResourceGroup(resources=[resource1]) + + # First poll + self.display.show_monitoring_data(resource_group1, None) + capsys.readouterr() + + # Second poll - same resource but additional_info changed + resource2 = ManagedResource( + "LoadBalancer", "lb-123", "ACTIVE", None, None + ) + resource2.additional_info = "DNS: new.example.com" + resource_group2 = ManagedResourceGroup(resources=[resource2]) + + self.display.show_monitoring_data(resource_group2, None) + + output = capsys.readouterr().out + assert "new.example.com" in output + + def test_resource_with_none_type_shows_identifier(self, capsys): + """Test resources with resource_type=None show identifier without type""" + resource = ManagedResource( + None, + "mystery-resource-123", + "FAILED", + reason="Something went wrong", + ) + resource_group = ManagedResourceGroup( + resource_type="TestGroup", resources=[resource] + ) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "mystery-resource-123" in output + assert "FAILED" in output + + def test_resource_with_none_type_and_none_identifier(self, capsys): + """Test resources with both resource_type=None and identifier=None show only status""" + resource = ManagedResource(None, None, "ACTIVE") + resource_group = ManagedResourceGroup( + resource_type="TestGroup", resources=[resource] + ) + + self.display.show_monitoring_data(resource_group, None) + + output = capsys.readouterr().out + assert "ACTIVE" in output diff --git a/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py b/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py index 37d9fa0a63e1..ae11648d57ac 100644 --- a/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py +++ b/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py @@ -6,6 +6,7 @@ # # http://aws.amazon.com/apache2.0/ +import asyncio from unittest.mock import Mock, patch import pytest @@ -49,10 +50,16 @@ def test_monitoring_error_handled_gracefully(self, mock_isatty, capsys): ) parsed_args = Mock( - service_arn="test-arn", resource_view="RESOURCE", timeout=30 + service_arn="test-arn", + resource_view="RESOURCE", + mode="INTERACTIVE", + timeout=30, ) parsed_globals = Mock( - region="us-west-2", endpoint_url=None, verify_ssl=True + region="us-west-2", + endpoint_url=None, + verify_ssl=True, + color="auto", ) command._run_main(parsed_args, parsed_globals) @@ -72,10 +79,16 @@ def test_non_monitoring_error_bubbles_up(self, mock_isatty): mock_session.create_client.side_effect = ValueError("Unexpected error") parsed_args = Mock( - service_arn="test-arn", resource_view="RESOURCE", timeout=30 + service_arn="test-arn", + resource_view="RESOURCE", + mode="INTERACTIVE", + timeout=30, ) parsed_globals = Mock( - region="us-west-2", endpoint_url=None, verify_ssl=True + region="us-west-2", + endpoint_url=None, + verify_ssl=True, + color="auto", ) with pytest.raises(ValueError): @@ -83,7 +96,7 @@ def test_non_monitoring_error_bubbles_up(self, mock_isatty): @patch('sys.stdout.isatty') def test_interactive_mode_requires_tty(self, mock_isatty, capsys): - """Test command fails when not in TTY""" + """Test command fails when INTERACTIVE mode without TTY""" # Not in TTY mock_isatty.return_value = False @@ -91,38 +104,60 @@ def test_interactive_mode_requires_tty(self, mock_isatty, capsys): command = ECSMonitorExpressGatewayService(mock_session) parsed_args = Mock( - service_arn="test-arn", resource_view="RESOURCE", timeout=30 + service_arn="test-arn", + resource_view="RESOURCE", + mode="interactive", + timeout=30, ) parsed_globals = Mock( - region="us-west-2", endpoint_url=None, verify_ssl=True + region="us-west-2", + endpoint_url=None, + verify_ssl=True, + color="auto", ) result = command._run_main(parsed_args, parsed_globals) captured = capsys.readouterr() assert result == 1 - assert "This command requires a TTY" in captured.err + assert "Interactive mode requires a TTY" in captured.err + @patch('sys.stdout.isatty') + def test_run_main_with_text_only_mode_no_tty(self, mock_isatty): + """Test text-only mode works without TTY.""" + mock_isatty.return_value = False -class TestECSExpressGatewayServiceWatcher: - """Test the watcher class through public interface""" + mock_session = Mock() + mock_watcher_class = Mock() + mock_watcher = Mock() + mock_watcher_class.return_value = mock_watcher - @patch('sys.stdout.isatty') - def test_is_monitoring_available_with_tty(self, mock_isatty): - """Test is_monitoring_available returns True when TTY is available""" - mock_isatty.return_value = True - assert ( - ECSExpressGatewayServiceWatcher.is_monitoring_available() is True + command = ECSMonitorExpressGatewayService( + mock_session, watcher_class=mock_watcher_class ) - @patch('sys.stdout.isatty') - def test_is_monitoring_available_without_tty(self, mock_isatty): - """Test is_monitoring_available returns False when TTY is not available""" - mock_isatty.return_value = False - assert ( - ECSExpressGatewayServiceWatcher.is_monitoring_available() is False + parsed_args = Mock( + service_arn="test-arn", + resource_view="RESOURCE", + mode="text-only", + timeout=30, + ) + parsed_globals = Mock( + region="us-west-2", + endpoint_url=None, + verify_ssl=True, + color="auto", ) + command._run_main(parsed_args, parsed_globals) + + # Watcher should be created with text-only mode + mock_watcher_class.assert_called_once() + + +class TestECSExpressGatewayServiceWatcher: + """Test the watcher class through public interface""" + def setup_method(self): self.app_session = create_app_session(output=DummyOutput()) self.app_session.__enter__() @@ -136,117 +171,52 @@ def teardown_method(self): self.app_session.__exit__(None, None, None) def _create_watcher_with_mocks(self, resource_view="RESOURCE", timeout=1): - """Helper to create watcher with mocked display""" + """Helper to create watcher with mocked display and collector""" + + async def mock_run_async(): + # Mock async display.run() - just wait briefly then exit + await asyncio.sleep(0.01) + mock_display = Mock() mock_display.has_terminal.return_value = True mock_display._check_keypress.return_value = None mock_display._restore_terminal.return_value = None mock_display.display.return_value = None + mock_display.run.return_value = mock_run_async() + + # Mock the collector to avoid all the API complexity + mock_collector = Mock() + mock_collector.get_current_view = Mock(return_value="Mocked view") watcher = ECSExpressGatewayServiceWatcher( self.mock_client, self.service_arn, resource_view, + 'INTERACTIVE', timeout_minutes=timeout, display=mock_display, + collector=mock_collector, ) - # Mock exec to call the monitoring method once and print output - original_monitor = watcher._monitor_express_gateway_service - - def mock_exec(): - try: - output = original_monitor("⠋", self.service_arn, resource_view) - print(output) - print("Monitoring Complete!") - except Exception as e: - # Re-raise expected exceptions - if isinstance(e, (ClientError, MonitoringError)): - raise - # For other exceptions, just print and complete - print("Monitoring Complete!") - - watcher.exec = mock_exec + # Make collector accessible for tests + watcher.mock_collector = mock_collector + return watcher @patch('time.sleep') def test_exec_successful_all_mode_monitoring(self, mock_sleep, capsys): """Test successful monitoring in RESOURCE mode with resource parsing""" watcher = self._create_watcher_with_mocks() + watcher.mock_collector.get_current_view.return_value = ( + "Cluster\nService\nIngressPath\nLoadBalancer\nTargetGroup" + ) mock_sleep.side_effect = KeyboardInterrupt() - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://api.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-lb/1234567890abcdef", - "status": "ACTIVE", - }, - "targetGroups": [ - { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:targetgroup/my-tg/1234567890abcdef", - "status": "HEALTHY", - } - ], - } - ], - "serviceSecurityGroups": [ - { - "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-1234567890abcdef0", - "status": "ACTIVE", - } - ], - "logGroups": [ - { - "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/my-service", - "status": "ACTIVE", - } - ], - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Running"}]}] - } - watcher.exec() captured = capsys.readouterr() output_text = captured.out - # Verify parsed resources appear in output assert "Cluster" in output_text - assert "Service" in output_text - assert "IngressPath" in output_text - assert "LoadBalancer" in output_text - assert "TargetGroup" in output_text - assert "SecurityGroup" in output_text - assert "LogGroup" in output_text - - # Specific identifiers - assert "https://api.example.com" in output_text # IngressPath endpoint - assert "my-lb" in output_text # LoadBalancer identifier - assert "my-tg" in output_text # TargetGroup identifier - assert ( - "sg-1234567890abcdef0" in output_text - ) # SecurityGroup identifier - assert "/aws/ecs/my-service" in output_text # LogGroup identifier - - # Status values - assert "ACTIVE" in output_text # LoadBalancer and SecurityGroup status - assert "HEALTHY" in output_text # TargetGroup status @patch('time.sleep') def test_exec_successful_delta_mode_with_deployment( @@ -273,467 +243,6 @@ def test_exec_successful_delta_mode_with_deployment( # Verify DEPLOYMENT mode executes successfully assert captured.out - @patch('time.sleep') - def test_exec_combined_view_multiple_revisions(self, mock_sleep, capsys): - """Test RESOURCE mode combines multiple service revisions correctly""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - # Multiple active configurations (combined view) - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [ - {"serviceRevisionArn": "rev-1"}, - {"serviceRevisionArn": "rev-2"}, - ], - } - } - - # Mock multiple revisions with different resources - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-1", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://api.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/api-lb/1234", - "status": "ACTIVE", - }, - } - ], - "serviceSecurityGroups": [ - { - "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-api123", - "status": "ACTIVE", - } - ], - }, - }, - { - "arn": "rev-2", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://web.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/web-lb/5678", - "status": "CREATING", - }, - } - ], - "logGroups": [ - { - "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/web-logs", - "status": "ACTIVE", - } - ], - }, - }, - ] - } - - self.mock_client.describe_services.return_value = { - "services": [ - {"events": [{"message": "Multiple revisions active"}]} - ] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Verify combined view shows resources from both revisions - # Resource types from both revisions - assert "IngressPath" in output_text - assert "LoadBalancer" in output_text - assert "SecurityGroup" in output_text # From rev-1 - assert "LogGroup" in output_text # From rev-2 - - # Specific identifiers from both revisions - assert "https://api.example.com" in output_text # From rev-1 - assert "https://web.example.com" in output_text # From rev-2 - assert "api-lb" in output_text # From rev-1 - assert "web-lb" in output_text # From rev-2 - assert "sg-api123" in output_text # From rev-1 - assert "/aws/ecs/web-logs" in output_text # From rev-2 - - # Status values from both revisions - assert "ACTIVE" in output_text # From both revisions - assert "CREATING" in output_text # From rev-2 - - @patch('time.sleep') - def test_exec_keyboard_interrupt_handling(self, mock_sleep, capsys): - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - - watcher.exec() - captured = capsys.readouterr() - - # Verify completion message is printed - assert "Monitoring Complete!" in captured.out - - @patch('time.sleep') - def test_exec_with_service_not_found_error(self, mock_sleep): - """Test exec() with service not found error bubbles up""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - error = ClientError( - error_response={ - 'Error': { - 'Code': 'ServiceNotFoundException', - 'Message': 'Service not found', - } - }, - operation_name='DescribeExpressGatewayService', - ) - self.mock_client.describe_express_gateway_service.side_effect = error - - with pytest.raises(ClientError) as exc_info: - watcher.exec() - - # Verify the specific error is raised - assert ( - exc_info.value.response['Error']['Code'] - == 'ServiceNotFoundException' - ) - assert ( - exc_info.value.response['Error']['Message'] == 'Service not found' - ) - - @patch('time.sleep') - def test_exec_with_inactive_service_handled_gracefully( - self, mock_sleep, capsys - ): - """Test exec() handles inactive service gracefully""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.side_effect = ClientError( - error_response={ - 'Error': { - 'Code': 'InvalidParameterException', - 'Message': 'Cannot call DescribeServiceRevisions for a service that is INACTIVE', - } - }, - operation_name='DescribeExpressGatewayService', - ) - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Service is inactive"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - - # Verify inactive service is handled and appropriate message shown - assert "inactive" in captured.out.lower() - - @patch('time.sleep') - def test_exec_with_empty_resources(self, mock_sleep, capsys): - """Test parsing edge case: empty/null resources""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Empty ecsManagedResources - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "No resources"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle empty resources gracefully but still show basic structure - assert "Cluster" in output_text - assert "Service" in output_text - # Should NOT contain resource types since ecsManagedResources is empty - assert "IngressPath" not in output_text - assert "LoadBalancer" not in output_text - - @patch('time.sleep') - def test_exec_with_autoscaling_resources(self, mock_sleep, capsys): - """Test autoscaling resource parsing with scalableTarget and policies""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "autoScaling": { - "scalableTarget": { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scalable-target/1234567890abcdef", - "status": "ACTIVE", - }, - "applicationAutoScalingPolicies": [ - { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/cpu-policy", - "status": "ACTIVE", - }, - { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/memory-policy", - "status": "ACTIVE", - }, - ], - } - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Autoscaling active"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - assert "AutoScaling" in output_text - assert "ScalableTarget" in output_text - assert "AutoScalingPolicy" in output_text - # ScalableTarget identifier - assert "1234567890abcdef" in output_text - # Policy identifiers - assert "cpu-policy" in output_text - assert "memory-policy" in output_text - - @patch('time.sleep') - def test_exec_with_malformed_resource_data(self, mock_sleep, capsys): - """Test parsing edge case: malformed resource data""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Malformed resources - missing required fields - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "ingressPaths": [ - {"endpoint": "https://example.com"} - ], # Missing loadBalancer - "serviceSecurityGroups": [ - {"status": "ACTIVE"} - ], # Missing arn - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Malformed data"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle malformed data gracefully and show what it can parse - assert "IngressPath" in output_text - assert "https://example.com" in output_text - # Should show SecurityGroup type even with missing arn - assert "SecurityGroup" in output_text - # Should NOT show LoadBalancer since it's missing from IngressPath - assert "LoadBalancer" not in output_text - - @patch('time.sleep') - def test_exec_eventually_consistent_missing_deployment( - self, mock_sleep, capsys - ): - """Test eventually consistent behavior: deployment missing after list""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - # List shows deployment exists - self.mock_client.list_service_deployments.return_value = { - "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] - } - # But describe fails (eventually consistent) - self.mock_client.describe_service_deployments.return_value = { - "serviceDeployments": [], - "failures": [{"arn": "deploy-arn", "reason": "MISSING"}], - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Eventually consistent"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle eventually consistent missing deployment gracefully - # Should show waiting state when deployment is missing - assert "Trying to describe gateway service" in output_text - assert "Monitoring Complete" in output_text - - @patch('time.sleep') - def test_exec_eventually_consistent_missing_revision( - self, mock_sleep, capsys - ): - """Test eventually consistent behavior: service revision missing after deployment describe""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - self.mock_client.list_service_deployments.return_value = { - "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] - } - self.mock_client.describe_service_deployments.return_value = { - "serviceDeployments": [ - { - "serviceDeploymentArn": "deploy-arn", - "status": "IN_PROGRESS", - "targetServiceRevision": {"arn": "target-rev"}, - } - ] - } - # Service revision missing (eventually consistent) - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"arn": "target-rev", "reason": "MISSING"}], - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Revision missing"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle eventually consistent missing revision gracefully - # Should show waiting state when revision is missing - assert "Trying to describe gateway service" in output_text - assert "Monitoring Complete" in output_text - - @patch('time.sleep') - def test_exec_with_api_failures(self, mock_sleep): - """Test failure parsing: API returns failures""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # API returns failures - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"arn": "rev-arn", "reason": "ServiceNotFound"}], - } - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError with failure details - error_message = str(exc_info.value) - assert "DescribeServiceRevisions" in error_message - assert "rev-arn" in error_message - assert "ServiceNotFound" in error_message - - @patch('time.sleep') - def test_exec_with_malformed_api_failures(self, mock_sleep): - """Test failure parsing: malformed failure responses""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Malformed failures - missing arn or reason - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"reason": "ServiceNotFound"}], # Missing arn - } - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError about invalid failure response - error_message = str(exc_info.value) - assert "Invalid failure response" in error_message - assert "missing arn or reason" in error_message - - @patch('time.sleep') - def test_exec_with_missing_response_fields(self, mock_sleep): - """Test response validation: missing required fields""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Missing serviceRevisions field - self.mock_client.describe_service_revisions.return_value = {} - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError about empty response - error_message = str(exc_info.value) - assert "DescribeServiceRevisions" in error_message - assert "empty" in error_message - class TestMonitoringError: """Test MonitoringError exception class""" @@ -791,25 +300,3 @@ def test_should_use_color_auto_no_tty(self, mock_isatty): parsed_globals.color = 'auto' assert command._should_use_color(parsed_globals) is False - - def test_watcher_accepts_use_color_parameter(self): - """Test ECSExpressGatewayServiceWatcher accepts use_color parameter""" - mock_client = Mock() - - # Test with use_color=True - watcher = ECSExpressGatewayServiceWatcher( - mock_client, - "arn:aws:ecs:us-east-1:123456789012:service/test-service", - "ALL", - use_color=True, - ) - assert watcher.use_color is True - - # Test with use_color=False - watcher = ECSExpressGatewayServiceWatcher( - mock_client, - "arn:aws:ecs:us-east-1:123456789012:service/test-service", - "ALL", - use_color=False, - ) - assert watcher.use_color is False diff --git a/tests/unit/customizations/ecs/test_monitormutatinggatewayservice.py b/tests/unit/customizations/ecs/test_monitormutatinggatewayservice.py index 37afe0a78324..6ea7b472cdc6 100644 --- a/tests/unit/customizations/ecs/test_monitormutatinggatewayservice.py +++ b/tests/unit/customizations/ecs/test_monitormutatinggatewayservice.py @@ -142,16 +142,42 @@ def test_operation_args_parsed_with_explicit_choice(self): assert self.handler.effective_resource_view == 'RESOURCE' - def test_operation_args_parsed_without_flag(self): - """Test storing monitoring flag when disabled.""" + def test_operation_args_parsed_without_monitor_resources(self): + """Test that no monitoring settings are stored without flag.""" parsed_args = Mock() parsed_args.monitor_resources = None + parsed_args.monitor_mode = None parsed_globals = Mock() self.handler.operation_args_parsed(parsed_args, parsed_globals) assert self.handler.effective_resource_view is None + def test_operation_args_parsed_mode_without_resources_raises_error(self): + """Test that using --monitor-mode without --monitor-resources raises ValueError.""" + parsed_args = Mock() + parsed_args.monitor_resources = None + parsed_args.monitor_mode = ( + 'text-only' # Mode specified but no resources + ) + parsed_globals = Mock() + + with pytest.raises( + ValueError, match="can only be used with --monitor-resources" + ): + self.handler.operation_args_parsed(parsed_args, parsed_globals) + + def test_operation_args_parsed_with_interactive_mode(self): + """Test operation_args_parsed with explicit interactive mode.""" + parsed_args = Mock() + parsed_args.monitor_resources = 'DEPLOYMENT' + parsed_args.monitor_mode = 'interactive' + parsed_globals = Mock() + + self.handler.operation_args_parsed(parsed_args, parsed_globals) + + assert self.handler.effective_mode == 'interactive' + def test_operation_args_parsed_missing_attribute(self): """Test handling missing monitor_resources attribute.""" # Mock without monitor_resources attribute @@ -204,7 +230,8 @@ def test_after_call_missing_service_arn(self): # No assertions needed - just verify no exceptions - def test_after_call_missing_session(self, capsys): + @patch("sys.stdout.isatty", return_value=True) + def test_after_call_missing_session(self, mock_isatty, capsys): """Test handling when session is not available.""" self.handler.effective_resource_view = 'DEPLOYMENT' self.handler.session = None @@ -224,8 +251,9 @@ def test_after_call_missing_session(self, capsys): "Unable to create ECS client. Skipping monitoring." in captured.err ) - def test_after_call_successful_monitoring(self): - """Test successful monitoring initiation.""" + @patch('sys.stdout.isatty', return_value=True) + def test_after_call_successful_monitoring(self, mock_isatty): + """Test successful monitoring invocation after API call.""" # Setup handler state mock_watcher_class = Mock() mock_watcher = Mock() @@ -236,8 +264,8 @@ def test_after_call_successful_monitoring(self): 'DEPLOYMENT', watcher_class=mock_watcher_class, ) - handler.monitor_resources = '__DEFAULT__' handler.effective_resource_view = 'DEPLOYMENT' + handler.effective_mode = 'TEXT-ONLY' # TEXT-ONLY mode for testing mock_session = Mock() mock_parsed_globals = Mock() @@ -264,38 +292,31 @@ def test_after_call_successful_monitoring(self): # Execute handler.after_call(parsed, context, http_response) - # Verify client creation - mock_session.create_client.assert_called_once_with( - 'ecs', - region_name='us-west-2', - endpoint_url='https://ecs.us-west-2.amazonaws.com', - verify=True, - ) - - # Verify watcher was created and executed + # Verify watcher was called correctly with display_mode parameter mock_watcher_class.assert_called_once_with( mock_client, - service_arn, + 'arn:aws:ecs:us-west-2:123456789012:service/test-service', 'DEPLOYMENT', + 'TEXT-ONLY', use_color=False, ) mock_watcher.exec.assert_called_once() + # Verify parsed response was cleared assert parsed == {} - def test_after_call_monitoring_not_available(self, capsys): - """Test that monitoring is skipped when not available (no TTY).""" - # Setup handler state + @patch('sys.stdout.isatty', return_value=False) # Not TTY + def test_after_call_monitoring_requires_tty(self, mock_isatty, capsys): + """Test after_call fails when monitoring without TTY""" mock_watcher_class = Mock() - mock_watcher_class.is_monitoring_available.return_value = False - handler = MonitorMutatingGatewayService( - 'create-gateway-service', + 'create-express-gateway-service', 'DEPLOYMENT', watcher_class=mock_watcher_class, ) handler.effective_resource_view = 'DEPLOYMENT' - + handler.effective_mode = 'TEXT-ONLY' # Try TEXT-ONLY without TTY + handler.use_color = False mock_session = Mock() mock_parsed_globals = Mock() mock_parsed_globals.region = 'us-west-2' @@ -307,14 +328,9 @@ def test_after_call_monitoring_not_available(self, capsys): handler.session = mock_session handler.parsed_globals = mock_parsed_globals - # Setup mocks - mock_client = Mock() - mock_session.create_client.return_value = mock_client - # Setup call parameters service_arn = 'arn:aws:ecs:us-west-2:123456789012:service/test-service' parsed = {'service': {'serviceArn': service_arn}} - original_parsed = dict(parsed) context = Mock() http_response = Mock() http_response.status_code = 200 @@ -322,18 +338,15 @@ def test_after_call_monitoring_not_available(self, capsys): # Execute handler.after_call(parsed, context, http_response) - # Verify parsed response was not cleared - assert parsed == original_parsed + # TEXT-ONLY mode now works without TTY, so watcher WAS called + mock_watcher_class.assert_called_once() - # Verify warning message was printed - captured = capsys.readouterr() - assert ( - "Monitoring is not available (requires TTY). Skipping monitoring.\n" - in captured.err - ) + # Verify parsed response was NOT cleared (error return early) + assert 'service' not in parsed # Cleared because monitoring ran - def test_after_call_exception_handling(self, capsys): - """Test exception handling in after_call method.""" + @patch('sys.stdout.isatty', return_value=True) + def test_after_call_exception_propagates(self, mock_isatty): + """Test exceptions propagate from after_call method.""" # Setup handler state mock_watcher_class = Mock() mock_watcher = Mock() @@ -346,6 +359,7 @@ def test_after_call_exception_handling(self, capsys): watcher_class=mock_watcher_class, ) handler.effective_resource_view = 'DEPLOYMENT' + handler.effective_mode = 'TEXT-ONLY' mock_session = Mock() mock_parsed_globals = Mock() @@ -369,12 +383,9 @@ def test_after_call_exception_handling(self, capsys): http_response = Mock() http_response.status_code = 200 - # Execute - should not raise exception - handler.after_call(parsed, context, http_response) - - captured = capsys.readouterr() - assert "Encountered an error, terminating monitoring" in captured.err - assert "Test exception" in captured.err + # Execute - should raise exception + with pytest.raises(Exception, match="Test exception"): + handler.after_call(parsed, context, http_response) def test_events(self): """Test that correct events are returned for CLI integration.""" diff --git a/tests/unit/customizations/ecs/test_serviceviewcollector.py b/tests/unit/customizations/ecs/test_serviceviewcollector.py new file mode 100644 index 000000000000..997405e9b97f --- /dev/null +++ b/tests/unit/customizations/ecs/test_serviceviewcollector.py @@ -0,0 +1,830 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ + +from unittest.mock import Mock + +import pytest +from botocore.exceptions import ClientError + +from awscli.customizations.ecs.exceptions import MonitoringError +from awscli.customizations.ecs.serviceviewcollector import ( + ServiceViewCollector, +) + + +class TestServiceViewCollector: + """Test ServiceViewCollector business logic""" + + def setup_method(self): + self.mock_client = Mock() + self.service_arn = ( + "arn:aws:ecs:us-west-2:123456789012:service/my-cluster/my-service" + ) + + def test_get_current_view_resource_mode(self): + """Test get_current_view in RESOURCE mode parses resources""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-lb/1234567890abcdef", + "status": "ACTIVE", + }, + } + ], + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Running"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Cluster" in output + assert "Service" in output + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "https://api.example.com" in output + assert "ACTIVE" in output + + def test_get_current_view_handles_inactive_service(self): + """Test get_current_view handles inactive service gracefully""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.side_effect = ClientError( + error_response={ + 'Error': { + 'Code': 'InvalidParameterException', + 'Message': 'Cannot call DescribeServiceRevisions for a service that is INACTIVE', + } + }, + operation_name='DescribeExpressGatewayService', + ) + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Service is inactive"}]}] + } + + output = collector.get_current_view("⠋") + + assert "inactive" in output.lower() + + def test_get_current_view_with_api_failures(self): + """Test get_current_view raises MonitoringError on API failures""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"arn": "rev-arn", "reason": "ServiceNotFound"}], + } + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + error_message = str(exc_info.value) + assert "DescribeServiceRevisions" in error_message + assert "rev-arn" in error_message + assert "ServiceNotFound" in error_message + + def test_get_current_view_caches_results(self): + """Test get_current_view caches results within refresh interval""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Running"}]}] + } + + # First call + collector.get_current_view("⠋") + call_count_first = ( + self.mock_client.describe_express_gateway_service.call_count + ) + + # Second call within refresh interval (default 5000ms) + collector.get_current_view("⠙") + # Should use cached result, not call API again + call_count_second = ( + self.mock_client.describe_express_gateway_service.call_count + ) + assert call_count_first == call_count_second # Cached, no new API call + + def test_combined_view_multiple_revisions(self): + """Test RESOURCE mode combines multiple service revisions correctly""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + # Multiple active configurations (combined view) + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [ + {"serviceRevisionArn": "rev-1"}, + {"serviceRevisionArn": "rev-2"}, + ], + } + } + + # Mock multiple revisions with different resources + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-1", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/api-lb/1234", + "status": "ACTIVE", + }, + } + ], + "serviceSecurityGroups": [ + { + "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-api123", + "status": "ACTIVE", + } + ], + }, + }, + { + "arn": "rev-2", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://web.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/web-lb/5678", + "status": "CREATING", + }, + } + ], + "logGroups": [ + { + "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/web-logs", + "status": "ACTIVE", + } + ], + }, + }, + ] + } + + self.mock_client.describe_services.return_value = { + "services": [ + {"events": [{"message": "Multiple revisions active"}]} + ] + } + + output = collector.get_current_view("⠋") + + # Verify combined view shows resources from both revisions + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "SecurityGroup" in output # From rev-1 + assert "LogGroup" in output # From rev-2 + assert "https://api.example.com" in output # From rev-1 + assert "https://web.example.com" in output # From rev-2 + assert "api-lb" in output # From rev-1 + assert "web-lb" in output # From rev-2 + assert "sg-api123" in output # From rev-1 + assert "/aws/ecs/web-logs" in output # From rev-2 + assert "ACTIVE" in output # From both revisions + assert "CREATING" in output # From rev-2 + + def test_get_current_view_with_empty_resources(self): + """Test parsing edge case: empty/null resources""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Empty ecsManagedResources + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "No resources"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle empty resources gracefully but still show basic structure + assert "Cluster" in output + assert "Service" in output + # Should NOT contain resource types since ecsManagedResources is empty + assert "IngressPath" not in output + assert "LoadBalancer" not in output + + def test_get_current_view_with_autoscaling_resources(self): + """Test autoscaling resource parsing with scalableTarget and policies""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "autoScaling": { + "scalableTarget": { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scalable-target/1234567890abcdef", + "status": "ACTIVE", + }, + "applicationAutoScalingPolicies": [ + { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/cpu-policy", + "status": "ACTIVE", + }, + { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/memory-policy", + "status": "ACTIVE", + }, + ], + } + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Autoscaling active"}]}] + } + + output = collector.get_current_view("⠋") + + assert "AutoScaling" in output + assert "ScalableTarget" in output + assert "AutoScalingPolicy" in output + assert "1234567890abcdef" in output + assert "cpu-policy" in output + assert "memory-policy" in output + + def test_get_current_view_with_malformed_resource_data(self): + """Test parsing edge case: malformed resource data""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Malformed resources - missing required fields + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + {"endpoint": "https://example.com"} + ], # Missing loadBalancer + "serviceSecurityGroups": [ + {"status": "ACTIVE"} + ], # Missing arn + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Malformed data"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle malformed data gracefully and show what it can parse + assert "IngressPath" in output + assert "https://example.com" in output + # Should show SecurityGroup type even with missing arn + assert "SecurityGroup" in output + # Should NOT show LoadBalancer since it's missing from IngressPath + assert "LoadBalancer" not in output + + def test_eventually_consistent_missing_deployment(self): + """Test eventually consistent behavior: deployment missing after list""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + # List shows deployment exists + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + # But describe fails (eventually consistent) + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [], + "failures": [{"arn": "deploy-arn", "reason": "MISSING"}], + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Eventually consistent"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle eventually consistent missing deployment gracefully + assert "Waiting for a deployment to start" in output + + def test_eventually_consistent_missing_revision(self): + """Test eventually consistent behavior: service revision missing""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + "targetServiceRevision": {"arn": "target-rev"}, + } + ] + } + # Service revision missing (eventually consistent) + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"arn": "target-rev", "reason": "MISSING"}], + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Revision missing"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle eventually consistent missing revision gracefully + assert "Trying to describe service revisions" in output + + def test_with_malformed_api_failures(self): + """Test failure parsing: malformed failure responses""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Malformed failures - missing arn or reason + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"reason": "ServiceNotFound"}], # Missing arn + } + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + # Should raise MonitoringError about invalid failure response + error_message = str(exc_info.value) + assert "Invalid failure response" in error_message + assert "missing arn or reason" in error_message + + def test_with_missing_response_fields(self): + """Test response validation: missing required fields""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Missing serviceRevisions field + self.mock_client.describe_service_revisions.return_value = {} + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + # Should raise MonitoringError about missing field + error_message = str(exc_info.value) + assert "DescribeServiceRevisions" in error_message + assert ( + "response is" in error_message + ) # "response is missing" or "response is empty" + + def test_deployment_mode_diff_view(self): + """Test DEPLOYMENT mode shows diff of target vs source revisions""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + "targetServiceRevision": {"arn": "target-rev"}, + "sourceServiceRevisions": [{"arn": "source-rev"}], + } + ] + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "target-rev", + "taskDefinition": "task-def-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://new-api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/new-lb/1234", + "status": "CREATING", + }, + } + ], + }, + }, + { + "arn": "source-rev", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://old-api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/old-lb/5678", + "status": "ACTIVE", + }, + } + ], + }, + }, + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Deployment in progress"}]}] + } + + output = collector.get_current_view("⠋") + + # Should show deployment diff + # Initially will show "Trying to describe service revisions" due to mismatch + # But implementation still shows Cluster/Service + assert "Trying to describe service revisions" in output + + def test_waiting_for_deployment_to_start(self): + """Test DEPLOYMENT mode when no deployment exists yet""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + # No deployments + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "No deployment"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Waiting for a deployment to start" in output + + def test_deployment_missing_target_revision(self): + """Test DEPLOYMENT mode when deployment is missing target revision""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + # Missing targetServiceRevision + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Deployment starting"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Waiting for a deployment to start" in output + + def test_empty_describe_gateway_service_response(self): + """Test handling of empty describe_express_gateway_service response""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = None + + output = collector.get_current_view("⠋") + + assert "Trying to describe gateway service" in output + + def test_missing_service_in_response(self): + """Test handling when service field is missing""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = {} + + output = collector.get_current_view("⠋") + + assert "Trying to describe gateway service" in output + + def test_service_missing_required_fields(self): + """Test handling when service is missing required fields""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + # Missing activeConfigurations + self.mock_client.describe_express_gateway_service.return_value = { + "service": {"serviceArn": self.service_arn} + } + + output = collector.get_current_view("⠋") + + assert "Trying to describe gateway service" in output + + def test_empty_response_error(self): + """Test _validate_and_parse_response with empty response""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + with pytest.raises(MonitoringError) as exc_info: + collector._validate_and_parse_response( + None, "TestOperation", "expectedField" + ) + + assert "TestOperation response is empty" in str(exc_info.value) + + def test_missing_expected_field_error(self): + """Test _validate_and_parse_response with missing expected field""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + with pytest.raises(MonitoringError) as exc_info: + collector._validate_and_parse_response( + {"otherField": "value"}, "TestOperation", "expectedField" + ) + + error_message = str(exc_info.value) + assert "TestOperation" in error_message + assert "expectedField" in error_message + + def test_parse_failures_filters_missing_reason(self): + """Test _parse_failures filters MISSING failures for eventually consistent""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + response = { + "failures": [ + {"arn": "arn1", "reason": "MISSING"}, + {"arn": "arn2", "reason": "ServiceNotFound"}, + ] + } + + # Eventually consistent - should only raise for non-MISSING + with pytest.raises(MonitoringError) as exc_info: + collector._parse_failures( + response, "TestOp", eventually_consistent=True + ) + + error_message = str(exc_info.value) + assert "arn2" in error_message + assert "ServiceNotFound" in error_message + # Should NOT include arn1 (MISSING reason) + assert "arn1" not in error_message + + def test_parse_failures_all_missing_no_error(self): + """Test _parse_failures doesn't raise when all failures are MISSING""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + response = { + "failures": [ + {"arn": "arn1", "reason": "MISSING"}, + {"arn": "arn2", "reason": "MISSING"}, + ] + } + + # Should not raise when all failures are MISSING + collector._parse_failures( + response, "TestOp", eventually_consistent=True + ) + + def test_parse_failures_malformed(self): + """Test _parse_failures with malformed failure data""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + response = { + "failures": [ + {"reason": "ServiceNotFound"} # Missing arn + ] + } + + with pytest.raises(MonitoringError) as exc_info: + collector._parse_failures( + response, "TestOp", eventually_consistent=False + ) + + assert "Invalid failure response" in str(exc_info.value) + + def test_parse_all_resource_types(self): + """Test parsing all supported resource types""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "lb-arn", + "status": "ACTIVE", + }, + "loadBalancerSecurityGroups": [ + {"arn": "lb-sg-arn", "status": "ACTIVE"} + ], + "certificate": { + "arn": "cert-arn", + "status": "ACTIVE", + }, + "listener": { + "arn": "listener-arn", + "status": "ACTIVE", + }, + "rule": { + "arn": "rule-arn", + "status": "ACTIVE", + }, + "targetGroups": [ + {"arn": "tg-arn", "status": "ACTIVE"} + ], + } + ], + "autoScaling": { + "scalableTarget": { + "arn": "st-arn", + "status": "ACTIVE", + }, + "applicationAutoScalingPolicies": [ + {"arn": "policy-arn", "status": "ACTIVE"} + ], + }, + "metricAlarms": [ + {"arn": "alarm-arn", "status": "ACTIVE"} + ], + "serviceSecurityGroups": [ + {"arn": "sg-arn", "status": "ACTIVE"} + ], + "logGroups": [{"arn": "log-arn", "status": "ACTIVE"}], + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "All resources"}]}] + } + + output = collector.get_current_view("⠋") + + # Verify all resource types are parsed + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "LoadBalancerSecurityGroup" in output + assert "Certificate" in output + assert "Listener" in output + assert "Rule" in output + assert "TargetGroup" in output + assert "AutoScalingConfiguration" in output + assert "ScalableTarget" in output + assert "ApplicationAutoScalingPolicy" in output + assert "MetricAlarms" in output + assert "ServiceSecurityGroups" in output + assert "LogGroups" in output