diff --git a/awscli/customizations/ecs/monitorexpressgatewayservice.py b/awscli/customizations/ecs/monitorexpressgatewayservice.py index 3004fe69ecc9..e946c010dce3 100644 --- a/awscli/customizations/ecs/monitorexpressgatewayservice.py +++ b/awscli/customizations/ecs/monitorexpressgatewayservice.py @@ -18,7 +18,10 @@ allowing users to track resource creation progress, deployment status, and service health through an interactive command-line interface with live updates and visual indicators. -The module implements two primary monitoring modes: +The data collection logic is handled by ServiceViewCollector, which parses AWS resources and +formats monitoring output. This module focuses on display and user interaction. + +The module implements two resource view modes: - RESOURCE: Displays all resources associated with the service - DEPLOYMENT: Shows resources that have changed in the most recent deployment @@ -40,23 +43,15 @@ import sys import threading import time -from functools import reduce from botocore.exceptions import ClientError from awscli.customizations.commands import BasicCommand from awscli.customizations.ecs.exceptions import MonitoringError -from awscli.customizations.ecs.expressgateway.managedresource import ( - ManagedResource, -) -from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( - ManagedResourceGroup, -) from awscli.customizations.ecs.prompt_toolkit_display import Display +from awscli.customizations.ecs.serviceviewcollector import ServiceViewCollector from awscli.customizations.utils import uni_print -TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ" - class ECSMonitorExpressGatewayService(BasicCommand): """AWS CLI command for monitoring ECS Express Gateway Service deployments. @@ -192,17 +187,18 @@ def __init__( timeout_minutes=30, display=None, use_color=True, + collector=None, ): self._client = client self.service_arn = service_arn self.mode = mode self.timeout_minutes = timeout_minutes - self.last_described_gateway_service_response = None - self.last_execution_time = 0 - self.cached_monitor_result = None self.start_time = time.time() self.use_color = use_color self.display = display or Display() + self.collector = collector or ServiceViewCollector( + client, service_arn, mode, use_color + ) @staticmethod def is_monitoring_available(): @@ -213,9 +209,7 @@ def exec(self): """Start monitoring the express gateway service with progress display.""" def monitor_service(spinner_char): - return self._monitor_express_gateway_service( - spinner_char, self.service_arn, self.mode - ) + return self.collector.get_current_view(spinner_char) asyncio.run(self._execute_with_progress_async(monitor_service, 100)) @@ -226,12 +220,7 @@ async def _execute_with_progress_async( spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" spinner_index = 0 - # Initialize with basic service resource - service_resource = ManagedResource("Service", self.service_arn) - initial_output = service_resource.get_status_string( - spinner_char="{SPINNER}", use_color=self.use_color - ) - current_output = initial_output + current_output = "Waiting for initial data" async def update_data(): nonlocal current_output @@ -287,543 +276,3 @@ async def update_spinner(): spinner_task.cancel() final_output = current_output.replace("{SPINNER}", "") uni_print(final_output + "\nMonitoring Complete!\n") - - def _monitor_express_gateway_service( - self, spinner_char, service_arn, mode, execution_refresh_millis=5000 - ): - """Monitor service status and return formatted output. - - Args: - spinner_char (char): Character to print representing progress (unused with single spinner) - execution_refresh_millis (int): Refresh interval in milliseconds - service_arn (str): Service ARN to monitor - mode (str): Monitoring mode ('RESOURCE' or 'DEPLOYMENT') - - Returns: - str: Formatted status output - """ - current_time = time.time() - - if ( - current_time - self.last_execution_time - >= execution_refresh_millis / 1000.0 - ): - try: - describe_gateway_service_response = ( - self._client.describe_express_gateway_service( - serviceArn=service_arn - ) - ) - if not describe_gateway_service_response: - self.cached_monitor_result = ( - None, - "Trying to describe gateway service", - ) - elif ( - not ( - service := describe_gateway_service_response.get( - "service" - ) - ) - or not service.get("serviceArn") - or not service.get("activeConfigurations") - ): - self.cached_monitor_result = ( - None, - "Trying to describe gateway service", - ) - else: - self.last_described_gateway_service_response = ( - describe_gateway_service_response - ) - described_gateway_service = ( - describe_gateway_service_response.get("service") - ) - - if mode == "DEPLOYMENT": - managed_resources, info = self._diff_service_view( - described_gateway_service - ) - else: - managed_resources, info = self._combined_service_view( - described_gateway_service - ) - - service_resources = [ - self._parse_cluster(described_gateway_service), - self._parse_service(described_gateway_service), - ] - if managed_resources: - service_resources.append(managed_resources) - service_resource = ManagedResourceGroup( - resources=service_resources - ) - self._update_cached_monitor_results(service_resource, info) - except ClientError as e: - if ( - e.response.get('Error', {}).get('Code') - == 'InvalidParameterException' - ): - error_message = e.response.get('Error', {}).get( - 'Message', '' - ) - if ( - "Cannot call DescribeServiceRevisions for a service that is INACTIVE" - in error_message - ): - empty_resource_group = ManagedResourceGroup() - self._update_cached_monitor_results( - empty_resource_group, "Service is inactive" - ) - else: - raise - else: - raise - - self.last_execution_time = current_time - - if not self.cached_monitor_result: - return "Waiting for initial data" - else: - # Generate the output every iteration. This allow the underlying resources to utilize spinners - service_resource, info = self.cached_monitor_result - status_string = ( - service_resource.get_status_string( - spinner_char=spinner_char, use_color=self.use_color - ) - if service_resource - else None - ) - - output = "\n".join([x for x in [status_string, info] if x]) - return output - - def _diff_service_view(self, describe_gateway_service_response): - """Generate diff view showing changes in the latest deployment. - - Computes differences between source and target service revisions to show - what resources are being updated or disassociated in the current deployment. - - Args: - describe_gateway_service_response (dict): Service description from API - - Returns: - tuple: (resources, info_output) where: - - resources (ManagedResourceGroup): Diff view of resources - - info_output (str): Informational messages - """ - service_arn = describe_gateway_service_response.get("serviceArn") - list_service_deployments_response = ( - self._client.list_service_deployments( - service=service_arn, maxResults=1 - ) - ) - listed_service_deployments = self._validate_and_parse_response( - list_service_deployments_response, - "ListServiceDeployments", - expected_field="serviceDeployments", - ) - if ( - not listed_service_deployments - or "serviceDeploymentArn" not in listed_service_deployments[0] - ): - return ( - None, - "Waiting for a deployment to start", - ) - - deployment_arn = listed_service_deployments[0].get( - "serviceDeploymentArn" - ) - - describe_service_deployments_response = ( - self._client.describe_service_deployments( - serviceDeploymentArns=[deployment_arn] - ) - ) - described_service_deployments = self._validate_and_parse_response( - describe_service_deployments_response, - "DescribeServiceDeployments", - expected_field="serviceDeployments", - eventually_consistent=True, - ) - described_service_deployment = described_service_deployments[0] - if ( - not described_service_deployment - or not described_service_deployment.get("targetServiceRevision") - ): - return ( - None, - "Waiting for a deployment to start", - ) - - target_sr = described_service_deployment.get( - "targetServiceRevision" - ).get("arn") - - target_sr_resources_list, described_target_sr_list = ( - self._describe_and_parse_service_revisions([target_sr]) - ) - if len(target_sr_resources_list) != 1: - return (None, "Trying to describe service revisions") - target_sr_resources = target_sr_resources_list[0] - described_target_sr = described_target_sr_list[0] - - task_def_arn = described_target_sr.get("taskDefinition") - if "sourceServiceRevisions" in described_service_deployment: - source_sr_resources, _ = ( - self._describe_and_parse_service_revisions( - [ - sr.get("arn") - for sr in described_service_deployment.get( - "sourceServiceRevisions" - ) - ] - ) - ) - if len(source_sr_resources) != len( - described_service_deployment.get("sourceServiceRevisions") - ): - return (None, "Trying to describe service revisions)") - source_sr_resources_combined = reduce( - lambda x, y: x.combine(y), source_sr_resources - ) - else: - source_sr_resources_combined = ManagedResourceGroup() - - updating_resources, disassociating_resources = ( - target_sr_resources.diff(source_sr_resources_combined) - ) - updating_resources.resource_type = "Updating" - disassociating_resources.resource_type = "Disassociating" - service_resources = ManagedResourceGroup( - resource_type="Deployment", - identifier=deployment_arn, - status=described_service_deployment.get("status"), - reason=described_service_deployment.get("statusReason"), - resources=[ - ManagedResource( - resource_type="TargetServiceRevision", identifier=target_sr - ), - ManagedResource( - resource_type="TaskDefinition", identifier=task_def_arn - ), - updating_resources, - disassociating_resources, - ], - ) - return service_resources, None - - def _combined_service_view(self, describe_gateway_service_response): - """Generate combined view of all active service resources. - - Extracts and combines resources from all active service configurations, - resolving conflicts by taking the version with the latest timestamp. - - Args: - describe_gateway_service_response (dict): Service description from API - - Returns: - tuple: (resources, info_output) where: - - resources (ManagedResourceGroup): Combined view of all resources - - info_output (str): Informational messages - """ - service_revision_arns = [ - config.get("serviceRevisionArn") - for config in describe_gateway_service_response.get( - "activeConfigurations" - ) - ] - service_revision_resources, _ = ( - self._describe_and_parse_service_revisions(service_revision_arns) - ) - - if len(service_revision_resources) != len(service_revision_arns): - return (None, "Trying to describe service revisions") - - service_resource = reduce( - lambda x, y: x.combine(y), service_revision_resources - ) - - return service_resource, None - - def _update_cached_monitor_results(self, resource, info): - """Update cached monitoring results with new data. - - Args: - resource: New resource data (replaces existing if provided) - info: New info message (always replaces existing) - """ - if not self.cached_monitor_result: - self.cached_monitor_result = (resource, info) - else: - self.cached_monitor_result = ( - resource or self.cached_monitor_result[0], - info, - ) - - def _validate_and_parse_response( - self, - response, - operation_name, - expected_field=None, - eventually_consistent=False, - ): - """Validate API response and extract expected field. - - Args: - response: API response to validate - operation_name: Name of the operation for error messages - expected_field: Field to extract from response (optional) - eventually_consistent: Whether to filter out MISSING failures - - Returns: - Extracted field value or None if no expected_field specified - - Raises: - MonitoringError: If response is invalid or missing required fields - """ - if not response: - raise MonitoringError(f"{operation_name} response is empty") - - self._parse_failures(response, operation_name, eventually_consistent) - - if not expected_field: - return None - - if response.get(expected_field) is None: - raise MonitoringError( - f"{operation_name} response is missing {expected_field}" - ) - return response.get(expected_field) - - def _parse_failures(self, response, operation_name, eventually_consistent): - """Parse and raise errors for API response failures. - - Args: - response: API response to check for failures - operation_name: Name of the operation for error messages - eventually_consistent: Whether to filter out MISSING failures for eventually consistent operations - - Raises: - MonitoringError: If failures are found in the response - """ - failures = response.get("failures") - - if not failures: - return - - if any(not f.get('arn') or not f.get('reason') for f in failures): - raise MonitoringError( - "Invalid failure response: missing arn or reason" - ) - - if eventually_consistent: - failures = [ - failure - for failure in failures - if failure.get("reason") != "MISSING" - ] - - if not failures: - return - - failure_msgs = [ - f"{f['arn']} failed with {f['reason']}" for f in failures - ] - joined_msgs = '\n'.join(failure_msgs) - raise MonitoringError(f"{operation_name}:\n{joined_msgs}") - - def _describe_and_parse_service_revisions(self, arns): - """Describe and parse service revisions into managed resources. - - Args: - arns (list): List of service revision ARNs to describe - - Returns: - tuple: (parsed_resources, described_revisions) where: - - parsed_resources (list): List of ManagedResourceGroup objects - - described_revisions (list): Raw API response data - """ - # API supports up to 20 arns, DescribeExpressGatewayService should never return more than 5 - describe_service_revisions_response = ( - self._client.describe_service_revisions(serviceRevisionArns=arns) - ) - described_service_revisions = self._validate_and_parse_response( - describe_service_revisions_response, - "DescribeServiceRevisions", - expected_field="serviceRevisions", - eventually_consistent=True, - ) - - return [ - self._parse_ecs_managed_resources(sr) - for sr in described_service_revisions - ], described_service_revisions - - def _parse_cluster(self, service): - return ManagedResource("Cluster", service.get("cluster")) - - def _parse_service(self, service): - service_arn = service.get("serviceArn") - cluster = service.get("cluster") - describe_service_response = self._client.describe_services( - cluster=cluster, services=[service_arn] - ) - described_service = self._validate_and_parse_response( - describe_service_response, "DescribeServices", "services" - )[0] - return ManagedResource( - "Service", - service.get("serviceArn"), - additional_info=described_service - and described_service.get("events")[0].get("message") - if described_service.get("events") - else None, - ) - - def _parse_ecs_managed_resources(self, service_revision): - managed_resources = service_revision.get("ecsManagedResources") - if not managed_resources: - return ManagedResourceGroup() - - parsed_resources = [] - if "ingressPaths" in managed_resources: - parsed_resources.append( - ManagedResourceGroup( - resource_type="IngressPaths", - resources=[ - self._parse_ingress_path_resources(ingress_path) - for ingress_path in managed_resources.get( - "ingressPaths" - ) - ], - ) - ) - if "autoScaling" in managed_resources: - parsed_resources.append( - self._parse_auto_scaling_configuration( - managed_resources.get("autoScaling") - ) - ) - if "metricAlarms" in managed_resources: - parsed_resources.append( - self._parse_metric_alarms( - managed_resources.get("metricAlarms") - ) - ) - if "serviceSecurityGroups" in managed_resources: - parsed_resources.append( - self._parse_service_security_groups( - managed_resources.get("serviceSecurityGroups") - ) - ) - if "logGroups" in managed_resources: - parsed_resources.append( - self._parse_log_groups(managed_resources.get("logGroups")) - ) - return ManagedResourceGroup(resources=parsed_resources) - - def _parse_ingress_path_resources(self, ingress_path): - resources = [] - if ingress_path.get("loadBalancer"): - resources.append( - self._parse_managed_resource( - ingress_path.get("loadBalancer"), "LoadBalancer" - ) - ) - if ingress_path.get("loadBalancerSecurityGroups"): - resources.extend( - self._parse_managed_resource_list( - ingress_path.get("loadBalancerSecurityGroups"), - "LoadBalancerSecurityGroup", - ) - ) - if ingress_path.get("certificate"): - resources.append( - self._parse_managed_resource( - ingress_path.get("certificate"), "Certificate" - ) - ) - if ingress_path.get("listener"): - resources.append( - self._parse_managed_resource( - ingress_path.get("listener"), "Listener" - ) - ) - if ingress_path.get("rule"): - resources.append( - self._parse_managed_resource(ingress_path.get("rule"), "Rule") - ) - if ingress_path.get("targetGroups"): - resources.extend( - self._parse_managed_resource_list( - ingress_path.get("targetGroups"), "TargetGroup" - ) - ) - return ManagedResourceGroup( - resource_type="IngressPath", - identifier=ingress_path.get("endpoint"), - resources=resources, - ) - - def _parse_auto_scaling_configuration(self, auto_scaling_configuration): - resources = [] - if auto_scaling_configuration.get("scalableTarget"): - resources.append( - self._parse_managed_resource( - auto_scaling_configuration.get("scalableTarget"), - "ScalableTarget", - ) - ) - if auto_scaling_configuration.get("applicationAutoScalingPolicies"): - resources.extend( - self._parse_managed_resource_list( - auto_scaling_configuration.get( - "applicationAutoScalingPolicies" - ), - "ApplicationAutoScalingPolicy", - ) - ) - return ManagedResourceGroup( - resource_type="AutoScalingConfiguration", resources=resources - ) - - def _parse_metric_alarms(self, metric_alarms): - return ManagedResourceGroup( - resource_type="MetricAlarms", - resources=self._parse_managed_resource_list( - metric_alarms, "MetricAlarm" - ), - ) - - def _parse_service_security_groups(self, service_security_groups): - return ManagedResourceGroup( - resource_type="ServiceSecurityGroups", - resources=self._parse_managed_resource_list( - service_security_groups, "SecurityGroup" - ), - ) - - def _parse_log_groups(self, logs_groups): - return ManagedResourceGroup( - resource_type="LogGroups", - resources=self._parse_managed_resource_list( - logs_groups, "LogGroup" - ), - ) - - def _parse_managed_resource(self, resource, resource_type): - return ManagedResource( - resource_type, - resource.get("arn"), - status=resource.get("status"), - updated_at=resource.get("updatedAt"), - reason=resource.get("statusReason"), - ) - - def _parse_managed_resource_list(self, data_list, resource_type): - return [ - self._parse_managed_resource(data, resource_type) - for data in data_list - ] diff --git a/awscli/customizations/ecs/serviceviewcollector.py b/awscli/customizations/ecs/serviceviewcollector.py new file mode 100644 index 000000000000..21a28e19a01f --- /dev/null +++ b/awscli/customizations/ecs/serviceviewcollector.py @@ -0,0 +1,538 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Service view collector for ECS Express Gateway Service monitoring. + +This module provides business logic for collecting and formatting +ECS Express Gateway Service monitoring data. +""" + +import time +from functools import reduce + +from botocore.exceptions import ClientError + +from awscli.customizations.ecs.exceptions import MonitoringError +from awscli.customizations.ecs.expressgateway.managedresource import ( + ManagedResource, +) +from awscli.customizations.ecs.expressgateway.managedresourcegroup import ( + ManagedResourceGroup, +) + + +class ServiceViewCollector: + """Collects and formats ECS Express Gateway Service monitoring data. + + Responsible for: + - Making ECS API calls + - Parsing resource data + - Formatting output strings + - Caching responses + + Args: + client: ECS client for API calls + service_arn (str): ARN of the service to monitor + mode (str): Monitoring mode - 'RESOURCE' or 'DEPLOYMENT' + use_color (bool): Whether to use color in output + """ + + def __init__(self, client, service_arn, mode, use_color=True): + self._client = client + self.service_arn = service_arn + self.mode = mode + self.use_color = use_color + self.last_described_gateway_service_response = None + self.last_execution_time = 0 + self.cached_monitor_result = None + + def get_current_view( + self, spinner_char="{SPINNER}", execution_refresh_millis=5000 + ): + """Get current monitoring view as formatted string. + + Args: + spinner_char (str): Character for progress indication + execution_refresh_millis (int): Refresh interval in milliseconds + + Returns: + str: Formatted monitoring output + """ + current_time = time.time() + + if ( + current_time - self.last_execution_time + >= execution_refresh_millis / 1000.0 + ): + try: + describe_gateway_service_response = ( + self._client.describe_express_gateway_service( + serviceArn=self.service_arn + ) + ) + if not describe_gateway_service_response: + self.cached_monitor_result = ( + None, + "Trying to describe gateway service", + ) + elif ( + not ( + service := describe_gateway_service_response.get( + "service" + ) + ) + or not service.get("serviceArn") + or service.get("activeConfigurations") is None + ): + self.cached_monitor_result = ( + None, + "Trying to describe gateway service", + ) + else: + self.last_described_gateway_service_response = ( + describe_gateway_service_response + ) + described_gateway_service = ( + describe_gateway_service_response.get("service") + ) + + if self.mode == "DEPLOYMENT": + managed_resources, info = self._diff_service_view( + described_gateway_service + ) + else: + managed_resources, info = self._combined_service_view( + described_gateway_service + ) + + service_resources = [ + self._parse_cluster(described_gateway_service), + self._parse_service(described_gateway_service), + ] + if managed_resources: + service_resources.append(managed_resources) + service_resource = ManagedResourceGroup( + resources=service_resources + ) + self._update_cached_monitor_results(service_resource, info) + except ClientError as e: + if ( + e.response.get('Error', {}).get('Code') + == 'InvalidParameterException' + ): + error_message = e.response.get('Error', {}).get( + 'Message', '' + ) + if ( + "Cannot call DescribeServiceRevisions for a service that is INACTIVE" + in error_message + ): + empty_resource_group = ManagedResourceGroup() + self._update_cached_monitor_results( + empty_resource_group, "Service is inactive" + ) + else: + raise + else: + raise + + self.last_execution_time = current_time + + if not self.cached_monitor_result: + return "Waiting for initial data" + else: + service_resource, info = self.cached_monitor_result + status_string = ( + service_resource.get_status_string( + spinner_char=spinner_char, use_color=self.use_color + ) + if service_resource + else None + ) + + output = "\n".join([x for x in [status_string, info] if x]) + return output + + def _diff_service_view(self, describe_gateway_service_response): + """Generate diff view showing changes in the latest deployment.""" + service_arn = describe_gateway_service_response.get("serviceArn") + list_service_deployments_response = ( + self._client.list_service_deployments( + service=service_arn, maxResults=1 + ) + ) + listed_service_deployments = self._validate_and_parse_response( + list_service_deployments_response, + "ListServiceDeployments", + expected_field="serviceDeployments", + ) + if ( + not listed_service_deployments + or "serviceDeploymentArn" not in listed_service_deployments[0] + ): + return ( + None, + "Waiting for a deployment to start", + ) + + deployment_arn = listed_service_deployments[0].get( + "serviceDeploymentArn" + ) + + describe_service_deployments_response = ( + self._client.describe_service_deployments( + serviceDeploymentArns=[deployment_arn] + ) + ) + described_service_deployments = self._validate_and_parse_response( + describe_service_deployments_response, + "DescribeServiceDeployments", + expected_field="serviceDeployments", + eventually_consistent=True, + ) + if not described_service_deployments: + return (None, "Waiting for a deployment to start") + + described_service_deployment = described_service_deployments[0] + if ( + not described_service_deployment + or not described_service_deployment.get("targetServiceRevision") + ): + return ( + None, + "Waiting for a deployment to start", + ) + + target_sr = described_service_deployment.get( + "targetServiceRevision" + ).get("arn") + + target_sr_resources_list, described_target_sr_list = ( + self._describe_and_parse_service_revisions([target_sr]) + ) + if len(target_sr_resources_list) != 1: + return (None, "Trying to describe service revisions") + target_sr_resources = target_sr_resources_list[0] + described_target_sr = described_target_sr_list[0] + + task_def_arn = described_target_sr.get("taskDefinition") + if "sourceServiceRevisions" in described_service_deployment: + source_sr_resources, _ = ( + self._describe_and_parse_service_revisions( + [ + sr.get("arn") + for sr in described_service_deployment.get( + "sourceServiceRevisions" + ) + ] + ) + ) + if len(source_sr_resources) != len( + described_service_deployment.get("sourceServiceRevisions") + ): + return (None, "Trying to describe service revisions") + source_sr_resources_combined = reduce( + lambda x, y: x.combine(y), source_sr_resources + ) + else: + source_sr_resources_combined = ManagedResourceGroup() + + updating_resources, disassociating_resources = ( + target_sr_resources.diff( + source_sr_resources_combined + ) + ) + updating_resources.resource_type = "Updating" + disassociating_resources.resource_type = "Disassociating" + service_resources = ManagedResourceGroup( + resource_type="Deployment", + identifier=deployment_arn, + status=described_service_deployment.get("status"), + reason=described_service_deployment.get("statusReason"), + resources=[ + ManagedResource( + resource_type="TargetServiceRevision", identifier=target_sr + ), + ManagedResource( + resource_type="TaskDefinition", identifier=task_def_arn + ), + updating_resources, + disassociating_resources, + ], + ) + return service_resources, None + + def _combined_service_view(self, describe_gateway_service_response): + """Generate combined view of all active service resources.""" + service_revision_arns = [ + config.get("serviceRevisionArn") + for config in describe_gateway_service_response.get( + "activeConfigurations" + ) + ] + service_revision_resources, _ = ( + self._describe_and_parse_service_revisions(service_revision_arns) + ) + + if len(service_revision_resources) != len(service_revision_arns): + return (None, "Trying to describe service revisions") + + service_resource = reduce( + lambda x, y: x.combine(y), service_revision_resources + ) + + return service_resource, None + + def _update_cached_monitor_results(self, resource, info): + """Update cached monitoring results with new data.""" + if not self.cached_monitor_result: + self.cached_monitor_result = (resource, info) + else: + self.cached_monitor_result = ( + resource or self.cached_monitor_result[0], + info, + ) + + def _validate_and_parse_response( + self, + response, + operation_name, + expected_field=None, + eventually_consistent=False, + ): + """Validate API response and extract expected field.""" + if not response: + raise MonitoringError(f"{operation_name} response is empty") + + self._parse_failures(response, operation_name, eventually_consistent) + + if not expected_field: + return None + + if response.get(expected_field) is None: + raise MonitoringError( + f"{operation_name} response is missing {expected_field}" + ) + return response.get(expected_field) + + def _parse_failures(self, response, operation_name, eventually_consistent): + """Parse and raise errors for API response failures.""" + failures = response.get("failures") + + if not failures: + return + + if any(not f.get('arn') or not f.get('reason') for f in failures): + raise MonitoringError( + "Invalid failure response: missing arn or reason" + ) + + if eventually_consistent: + failures = [ + failure + for failure in failures + if failure.get("reason") != "MISSING" + ] + + if not failures: + return + + failure_msgs = [ + f"{f['arn']} failed with {f['reason']}" for f in failures + ] + joined_msgs = '\n'.join(failure_msgs) + raise MonitoringError(f"{operation_name}:\n{joined_msgs}") + + def _describe_and_parse_service_revisions(self, arns): + """Describe and parse service revisions into managed resources.""" + describe_service_revisions_response = ( + self._client.describe_service_revisions(serviceRevisionArns=arns) + ) + described_service_revisions = self._validate_and_parse_response( + describe_service_revisions_response, + "DescribeServiceRevisions", + expected_field="serviceRevisions", + eventually_consistent=True, + ) + + return [ + self._parse_ecs_managed_resources(sr) + for sr in described_service_revisions + ], described_service_revisions + + def _parse_cluster(self, service): + return ManagedResource("Cluster", service.get("cluster")) + + def _parse_service(self, service): + service_arn = service.get("serviceArn") + cluster = service.get("cluster") + describe_service_response = self._client.describe_services( + cluster=cluster, services=[service_arn] + ) + described_service = self._validate_and_parse_response( + describe_service_response, "DescribeServices", "services" + )[0] + return ManagedResource( + "Service", + service.get("serviceArn"), + additional_info=described_service + and described_service.get("events")[0].get("message") + if described_service.get("events") + else None, + ) + + def _parse_ecs_managed_resources(self, service_revision): + managed_resources = service_revision.get("ecsManagedResources") + if not managed_resources: + return ManagedResourceGroup() + + parsed_resources = [] + if "ingressPaths" in managed_resources: + parsed_resources.append( + ManagedResourceGroup( + resource_type="IngressPaths", + resources=[ + self._parse_ingress_path_resources(ingress_path) + for ingress_path in managed_resources.get( + "ingressPaths" + ) + ], + ) + ) + if "autoScaling" in managed_resources: + parsed_resources.append( + self._parse_auto_scaling_configuration( + managed_resources.get("autoScaling") + ) + ) + if "metricAlarms" in managed_resources: + parsed_resources.append( + self._parse_metric_alarms( + managed_resources.get("metricAlarms") + ) + ) + if "serviceSecurityGroups" in managed_resources: + parsed_resources.append( + self._parse_service_security_groups( + managed_resources.get("serviceSecurityGroups") + ) + ) + if "logGroups" in managed_resources: + parsed_resources.append( + self._parse_log_groups(managed_resources.get("logGroups")) + ) + return ManagedResourceGroup(resources=parsed_resources) + + def _parse_ingress_path_resources(self, ingress_path): + resources = [] + if ingress_path.get("loadBalancer"): + resources.append( + self._parse_managed_resource( + ingress_path.get("loadBalancer"), "LoadBalancer" + ) + ) + if ingress_path.get("loadBalancerSecurityGroups"): + resources.extend( + self._parse_managed_resource_list( + ingress_path.get("loadBalancerSecurityGroups"), + "LoadBalancerSecurityGroup", + ) + ) + if ingress_path.get("certificate"): + resources.append( + self._parse_managed_resource( + ingress_path.get("certificate"), "Certificate" + ) + ) + if ingress_path.get("listener"): + resources.append( + self._parse_managed_resource( + ingress_path.get("listener"), "Listener" + ) + ) + if ingress_path.get("rule"): + resources.append( + self._parse_managed_resource(ingress_path.get("rule"), "Rule") + ) + if ingress_path.get("targetGroups"): + resources.extend( + self._parse_managed_resource_list( + ingress_path.get("targetGroups"), "TargetGroup" + ) + ) + return ManagedResourceGroup( + resource_type="IngressPath", + identifier=ingress_path.get("endpoint"), + resources=resources, + ) + + def _parse_auto_scaling_configuration(self, auto_scaling_configuration): + resources = [] + if auto_scaling_configuration.get("scalableTarget"): + resources.append( + self._parse_managed_resource( + auto_scaling_configuration.get("scalableTarget"), + "ScalableTarget", + ) + ) + if auto_scaling_configuration.get("applicationAutoScalingPolicies"): + resources.extend( + self._parse_managed_resource_list( + auto_scaling_configuration.get( + "applicationAutoScalingPolicies" + ), + "ApplicationAutoScalingPolicy", + ) + ) + return ManagedResourceGroup( + resource_type="AutoScalingConfiguration", resources=resources + ) + + def _parse_metric_alarms(self, metric_alarms): + return ManagedResourceGroup( + resource_type="MetricAlarms", + resources=self._parse_managed_resource_list( + metric_alarms, "MetricAlarm" + ), + ) + + def _parse_service_security_groups(self, service_security_groups): + return ManagedResourceGroup( + resource_type="ServiceSecurityGroups", + resources=self._parse_managed_resource_list( + service_security_groups, "SecurityGroup" + ), + ) + + def _parse_log_groups(self, logs_groups): + return ManagedResourceGroup( + resource_type="LogGroups", + resources=self._parse_managed_resource_list( + logs_groups, "LogGroup" + ), + ) + + def _parse_managed_resource(self, resource, resource_type): + return ManagedResource( + resource_type, + resource.get("arn"), + status=resource.get("status"), + updated_at=resource.get("updatedAt"), + reason=resource.get("statusReason"), + ) + + def _parse_managed_resource_list(self, data_list, resource_type): + return [ + self._parse_managed_resource(data, resource_type) + for data in data_list + ] diff --git a/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py b/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py index 37d9fa0a63e1..e0677a8e3451 100644 --- a/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py +++ b/tests/unit/customizations/ecs/test_monitorexpressgatewayservice.py @@ -123,617 +123,6 @@ def test_is_monitoring_available_without_tty(self, mock_isatty): ECSExpressGatewayServiceWatcher.is_monitoring_available() is False ) - def setup_method(self): - self.app_session = create_app_session(output=DummyOutput()) - self.app_session.__enter__() - self.mock_client = Mock() - self.service_arn = ( - "arn:aws:ecs:us-west-2:123456789012:service/my-cluster/my-service" - ) - - def teardown_method(self): - if hasattr(self, 'app_session'): - self.app_session.__exit__(None, None, None) - - def _create_watcher_with_mocks(self, resource_view="RESOURCE", timeout=1): - """Helper to create watcher with mocked display""" - mock_display = Mock() - mock_display.has_terminal.return_value = True - mock_display._check_keypress.return_value = None - mock_display._restore_terminal.return_value = None - mock_display.display.return_value = None - - watcher = ECSExpressGatewayServiceWatcher( - self.mock_client, - self.service_arn, - resource_view, - timeout_minutes=timeout, - display=mock_display, - ) - - # Mock exec to call the monitoring method once and print output - original_monitor = watcher._monitor_express_gateway_service - - def mock_exec(): - try: - output = original_monitor("⠋", self.service_arn, resource_view) - print(output) - print("Monitoring Complete!") - except Exception as e: - # Re-raise expected exceptions - if isinstance(e, (ClientError, MonitoringError)): - raise - # For other exceptions, just print and complete - print("Monitoring Complete!") - - watcher.exec = mock_exec - return watcher - - @patch('time.sleep') - def test_exec_successful_all_mode_monitoring(self, mock_sleep, capsys): - """Test successful monitoring in RESOURCE mode with resource parsing""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://api.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-lb/1234567890abcdef", - "status": "ACTIVE", - }, - "targetGroups": [ - { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:targetgroup/my-tg/1234567890abcdef", - "status": "HEALTHY", - } - ], - } - ], - "serviceSecurityGroups": [ - { - "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-1234567890abcdef0", - "status": "ACTIVE", - } - ], - "logGroups": [ - { - "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/my-service", - "status": "ACTIVE", - } - ], - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Running"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Verify parsed resources appear in output - assert "Cluster" in output_text - assert "Service" in output_text - assert "IngressPath" in output_text - assert "LoadBalancer" in output_text - assert "TargetGroup" in output_text - assert "SecurityGroup" in output_text - assert "LogGroup" in output_text - - # Specific identifiers - assert "https://api.example.com" in output_text # IngressPath endpoint - assert "my-lb" in output_text # LoadBalancer identifier - assert "my-tg" in output_text # TargetGroup identifier - assert ( - "sg-1234567890abcdef0" in output_text - ) # SecurityGroup identifier - assert "/aws/ecs/my-service" in output_text # LogGroup identifier - - # Status values - assert "ACTIVE" in output_text # LoadBalancer and SecurityGroup status - assert "HEALTHY" in output_text # TargetGroup status - - @patch('time.sleep') - def test_exec_successful_delta_mode_with_deployment( - self, mock_sleep, capsys - ): - """Test DEPLOYMENT mode executes successfully""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Service running"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - - # Verify DEPLOYMENT mode executes successfully - assert captured.out - - @patch('time.sleep') - def test_exec_combined_view_multiple_revisions(self, mock_sleep, capsys): - """Test RESOURCE mode combines multiple service revisions correctly""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - # Multiple active configurations (combined view) - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [ - {"serviceRevisionArn": "rev-1"}, - {"serviceRevisionArn": "rev-2"}, - ], - } - } - - # Mock multiple revisions with different resources - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-1", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://api.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/api-lb/1234", - "status": "ACTIVE", - }, - } - ], - "serviceSecurityGroups": [ - { - "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-api123", - "status": "ACTIVE", - } - ], - }, - }, - { - "arn": "rev-2", - "ecsManagedResources": { - "ingressPaths": [ - { - "endpoint": "https://web.example.com", - "loadBalancer": { - "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/web-lb/5678", - "status": "CREATING", - }, - } - ], - "logGroups": [ - { - "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/web-logs", - "status": "ACTIVE", - } - ], - }, - }, - ] - } - - self.mock_client.describe_services.return_value = { - "services": [ - {"events": [{"message": "Multiple revisions active"}]} - ] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Verify combined view shows resources from both revisions - # Resource types from both revisions - assert "IngressPath" in output_text - assert "LoadBalancer" in output_text - assert "SecurityGroup" in output_text # From rev-1 - assert "LogGroup" in output_text # From rev-2 - - # Specific identifiers from both revisions - assert "https://api.example.com" in output_text # From rev-1 - assert "https://web.example.com" in output_text # From rev-2 - assert "api-lb" in output_text # From rev-1 - assert "web-lb" in output_text # From rev-2 - assert "sg-api123" in output_text # From rev-1 - assert "/aws/ecs/web-logs" in output_text # From rev-2 - - # Status values from both revisions - assert "ACTIVE" in output_text # From both revisions - assert "CREATING" in output_text # From rev-2 - - @patch('time.sleep') - def test_exec_keyboard_interrupt_handling(self, mock_sleep, capsys): - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - - watcher.exec() - captured = capsys.readouterr() - - # Verify completion message is printed - assert "Monitoring Complete!" in captured.out - - @patch('time.sleep') - def test_exec_with_service_not_found_error(self, mock_sleep): - """Test exec() with service not found error bubbles up""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - error = ClientError( - error_response={ - 'Error': { - 'Code': 'ServiceNotFoundException', - 'Message': 'Service not found', - } - }, - operation_name='DescribeExpressGatewayService', - ) - self.mock_client.describe_express_gateway_service.side_effect = error - - with pytest.raises(ClientError) as exc_info: - watcher.exec() - - # Verify the specific error is raised - assert ( - exc_info.value.response['Error']['Code'] - == 'ServiceNotFoundException' - ) - assert ( - exc_info.value.response['Error']['Message'] == 'Service not found' - ) - - @patch('time.sleep') - def test_exec_with_inactive_service_handled_gracefully( - self, mock_sleep, capsys - ): - """Test exec() handles inactive service gracefully""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.side_effect = ClientError( - error_response={ - 'Error': { - 'Code': 'InvalidParameterException', - 'Message': 'Cannot call DescribeServiceRevisions for a service that is INACTIVE', - } - }, - operation_name='DescribeExpressGatewayService', - ) - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Service is inactive"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - - # Verify inactive service is handled and appropriate message shown - assert "inactive" in captured.out.lower() - - @patch('time.sleep') - def test_exec_with_empty_resources(self, mock_sleep, capsys): - """Test parsing edge case: empty/null resources""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Empty ecsManagedResources - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "No resources"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle empty resources gracefully but still show basic structure - assert "Cluster" in output_text - assert "Service" in output_text - # Should NOT contain resource types since ecsManagedResources is empty - assert "IngressPath" not in output_text - assert "LoadBalancer" not in output_text - - @patch('time.sleep') - def test_exec_with_autoscaling_resources(self, mock_sleep, capsys): - """Test autoscaling resource parsing with scalableTarget and policies""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "autoScaling": { - "scalableTarget": { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scalable-target/1234567890abcdef", - "status": "ACTIVE", - }, - "applicationAutoScalingPolicies": [ - { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/cpu-policy", - "status": "ACTIVE", - }, - { - "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/memory-policy", - "status": "ACTIVE", - }, - ], - } - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Autoscaling active"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - assert "AutoScaling" in output_text - assert "ScalableTarget" in output_text - assert "AutoScalingPolicy" in output_text - # ScalableTarget identifier - assert "1234567890abcdef" in output_text - # Policy identifiers - assert "cpu-policy" in output_text - assert "memory-policy" in output_text - - @patch('time.sleep') - def test_exec_with_malformed_resource_data(self, mock_sleep, capsys): - """Test parsing edge case: malformed resource data""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Malformed resources - missing required fields - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [ - { - "arn": "rev-arn", - "ecsManagedResources": { - "ingressPaths": [ - {"endpoint": "https://example.com"} - ], # Missing loadBalancer - "serviceSecurityGroups": [ - {"status": "ACTIVE"} - ], # Missing arn - }, - } - ] - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Malformed data"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle malformed data gracefully and show what it can parse - assert "IngressPath" in output_text - assert "https://example.com" in output_text - # Should show SecurityGroup type even with missing arn - assert "SecurityGroup" in output_text - # Should NOT show LoadBalancer since it's missing from IngressPath - assert "LoadBalancer" not in output_text - - @patch('time.sleep') - def test_exec_eventually_consistent_missing_deployment( - self, mock_sleep, capsys - ): - """Test eventually consistent behavior: deployment missing after list""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - # List shows deployment exists - self.mock_client.list_service_deployments.return_value = { - "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] - } - # But describe fails (eventually consistent) - self.mock_client.describe_service_deployments.return_value = { - "serviceDeployments": [], - "failures": [{"arn": "deploy-arn", "reason": "MISSING"}], - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Eventually consistent"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle eventually consistent missing deployment gracefully - # Should show waiting state when deployment is missing - assert "Trying to describe gateway service" in output_text - assert "Monitoring Complete" in output_text - - @patch('time.sleep') - def test_exec_eventually_consistent_missing_revision( - self, mock_sleep, capsys - ): - """Test eventually consistent behavior: service revision missing after deployment describe""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [], - } - } - self.mock_client.list_service_deployments.return_value = { - "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] - } - self.mock_client.describe_service_deployments.return_value = { - "serviceDeployments": [ - { - "serviceDeploymentArn": "deploy-arn", - "status": "IN_PROGRESS", - "targetServiceRevision": {"arn": "target-rev"}, - } - ] - } - # Service revision missing (eventually consistent) - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"arn": "target-rev", "reason": "MISSING"}], - } - self.mock_client.describe_services.return_value = { - "services": [{"events": [{"message": "Revision missing"}]}] - } - - watcher.exec() - captured = capsys.readouterr() - output_text = captured.out - - # Should handle eventually consistent missing revision gracefully - # Should show waiting state when revision is missing - assert "Trying to describe gateway service" in output_text - assert "Monitoring Complete" in output_text - - @patch('time.sleep') - def test_exec_with_api_failures(self, mock_sleep): - """Test failure parsing: API returns failures""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # API returns failures - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"arn": "rev-arn", "reason": "ServiceNotFound"}], - } - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError with failure details - error_message = str(exc_info.value) - assert "DescribeServiceRevisions" in error_message - assert "rev-arn" in error_message - assert "ServiceNotFound" in error_message - - @patch('time.sleep') - def test_exec_with_malformed_api_failures(self, mock_sleep): - """Test failure parsing: malformed failure responses""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Malformed failures - missing arn or reason - self.mock_client.describe_service_revisions.return_value = { - "serviceRevisions": [], - "failures": [{"reason": "ServiceNotFound"}], # Missing arn - } - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError about invalid failure response - error_message = str(exc_info.value) - assert "Invalid failure response" in error_message - assert "missing arn or reason" in error_message - - @patch('time.sleep') - def test_exec_with_missing_response_fields(self, mock_sleep): - """Test response validation: missing required fields""" - watcher = self._create_watcher_with_mocks() - mock_sleep.side_effect = KeyboardInterrupt() - - self.mock_client.describe_express_gateway_service.return_value = { - "service": { - "serviceArn": self.service_arn, - "cluster": "my-cluster", - "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], - } - } - # Missing serviceRevisions field - self.mock_client.describe_service_revisions.return_value = {} - - with pytest.raises(MonitoringError) as exc_info: - watcher.exec() - - # Should raise MonitoringError about empty response - error_message = str(exc_info.value) - assert "DescribeServiceRevisions" in error_message - assert "empty" in error_message - class TestMonitoringError: """Test MonitoringError exception class""" diff --git a/tests/unit/customizations/ecs/test_serviceviewcollector.py b/tests/unit/customizations/ecs/test_serviceviewcollector.py new file mode 100644 index 000000000000..36ba679c39f3 --- /dev/null +++ b/tests/unit/customizations/ecs/test_serviceviewcollector.py @@ -0,0 +1,774 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ + +from unittest.mock import Mock + +import pytest +from botocore.exceptions import ClientError + +from awscli.customizations.ecs.exceptions import MonitoringError +from awscli.customizations.ecs.serviceviewcollector import ( + ServiceViewCollector, +) + + +class TestServiceViewCollector: + """Test ServiceViewCollector business logic""" + + def setup_method(self): + self.mock_client = Mock() + self.service_arn = ( + "arn:aws:ecs:us-west-2:123456789012:service/my-cluster/my-service" + ) + + def test_get_current_view_resource_mode(self): + """Test get_current_view in RESOURCE mode parses resources""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-lb/1234567890abcdef", + "status": "ACTIVE", + }, + } + ], + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Running"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Cluster" in output + assert "Service" in output + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "https://api.example.com" in output + assert "ACTIVE" in output + + def test_get_current_view_handles_inactive_service(self): + """Test get_current_view handles inactive service gracefully""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.side_effect = ClientError( + error_response={ + 'Error': { + 'Code': 'InvalidParameterException', + 'Message': 'Cannot call DescribeServiceRevisions for a service that is INACTIVE', + } + }, + operation_name='DescribeExpressGatewayService', + ) + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Service is inactive"}]}] + } + + output = collector.get_current_view("⠋") + + assert "inactive" in output.lower() + + def test_get_current_view_with_api_failures(self): + """Test get_current_view raises MonitoringError on API failures""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"arn": "rev-arn", "reason": "ServiceNotFound"}], + } + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + error_message = str(exc_info.value) + assert "DescribeServiceRevisions" in error_message + assert "rev-arn" in error_message + assert "ServiceNotFound" in error_message + + def test_get_current_view_caches_results(self): + """Test get_current_view caches results within refresh interval""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Running"}]}] + } + + # First call + collector.get_current_view("⠋") + call_count_first = ( + self.mock_client.describe_express_gateway_service.call_count + ) + + # Second call within refresh interval (default 5000ms) + collector.get_current_view("⠙") + # Should use cached result, not call API again + call_count_second = ( + self.mock_client.describe_express_gateway_service.call_count + ) + assert call_count_first == call_count_second # Cached, no new API call + + def test_combined_view_multiple_revisions(self): + """Test RESOURCE mode combines multiple service revisions correctly""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + # Multiple active configurations (combined view) + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [ + {"serviceRevisionArn": "rev-1"}, + {"serviceRevisionArn": "rev-2"}, + ], + } + } + + # Mock multiple revisions with different resources + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-1", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/api-lb/1234", + "status": "ACTIVE", + }, + } + ], + "serviceSecurityGroups": [ + { + "arn": "arn:aws:ec2:us-west-2:123456789012:security-group/sg-api123", + "status": "ACTIVE", + } + ], + }, + }, + { + "arn": "rev-2", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://web.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/web-lb/5678", + "status": "CREATING", + }, + } + ], + "logGroups": [ + { + "arn": "arn:aws:logs:us-west-2:123456789012:log-group:/aws/ecs/web-logs", + "status": "ACTIVE", + } + ], + }, + }, + ] + } + + self.mock_client.describe_services.return_value = { + "services": [ + {"events": [{"message": "Multiple revisions active"}]} + ] + } + + output = collector.get_current_view("⠋") + + # Verify combined view shows resources from both revisions + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "SecurityGroup" in output # From rev-1 + assert "LogGroup" in output # From rev-2 + assert "https://api.example.com" in output # From rev-1 + assert "https://web.example.com" in output # From rev-2 + assert "api-lb" in output # From rev-1 + assert "web-lb" in output # From rev-2 + assert "sg-api123" in output # From rev-1 + assert "/aws/ecs/web-logs" in output # From rev-2 + assert "ACTIVE" in output # From both revisions + assert "CREATING" in output # From rev-2 + + def test_get_current_view_with_empty_resources(self): + """Test parsing edge case: empty/null resources""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Empty ecsManagedResources + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [{"arn": "rev-arn", "ecsManagedResources": {}}] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "No resources"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle empty resources gracefully but still show basic structure + assert "Cluster" in output + assert "Service" in output + # Should NOT contain resource types since ecsManagedResources is empty + assert "IngressPath" not in output + assert "LoadBalancer" not in output + + def test_get_current_view_with_autoscaling_resources(self): + """Test autoscaling resource parsing with scalableTarget and policies""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "autoScaling": { + "scalableTarget": { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scalable-target/1234567890abcdef", + "status": "ACTIVE", + }, + "applicationAutoScalingPolicies": [ + { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/cpu-policy", + "status": "ACTIVE", + }, + { + "arn": "arn:aws:application-autoscaling:us-west-2:123456789012:scaling-policy/memory-policy", + "status": "ACTIVE", + }, + ], + } + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Autoscaling active"}]}] + } + + output = collector.get_current_view("⠋") + + assert "AutoScaling" in output + assert "ScalableTarget" in output + assert "AutoScalingPolicy" in output + assert "1234567890abcdef" in output + assert "cpu-policy" in output + assert "memory-policy" in output + + def test_get_current_view_with_malformed_resource_data(self): + """Test parsing edge case: malformed resource data""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Malformed resources - missing required fields + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + {"endpoint": "https://example.com"} + ], # Missing loadBalancer + "serviceSecurityGroups": [ + {"status": "ACTIVE"} + ], # Missing arn + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Malformed data"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle malformed data gracefully and show what it can parse + assert "IngressPath" in output + assert "https://example.com" in output + # Should show SecurityGroup type even with missing arn + assert "SecurityGroup" in output + # Should NOT show LoadBalancer since it's missing from IngressPath + assert "LoadBalancer" not in output + + def test_eventually_consistent_missing_deployment(self): + """Test eventually consistent behavior: deployment missing after list""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + # List shows deployment exists + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + # But describe fails (eventually consistent) + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [], + "failures": [{"arn": "deploy-arn", "reason": "MISSING"}], + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Eventually consistent"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle eventually consistent missing deployment gracefully + assert "Waiting for a deployment to start" in output + + def test_eventually_consistent_missing_revision(self): + """Test eventually consistent behavior: service revision missing""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + "targetServiceRevision": {"arn": "target-rev"}, + } + ] + } + # Service revision missing (eventually consistent) + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"arn": "target-rev", "reason": "MISSING"}], + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Revision missing"}]}] + } + + output = collector.get_current_view("⠋") + + # Should handle eventually consistent missing revision gracefully + assert "Trying to describe service revisions" in output + + def test_eventually_consistent_mixed_failures(self): + """Test eventually consistent behavior: filters MISSING but raises for other failures""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [ + {"serviceDeploymentArn": "deploy-arn"} + ] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + "targetServiceRevision": {"arn": "target-rev"}, + } + ] + } + # Mixed failures: MISSING (should be filtered) and ServiceNotFound (should raise) + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [ + {"arn": "target-rev", "reason": "MISSING"}, + {"arn": "other-rev", "reason": "ServiceNotFound"}, + ], + } + + # Should raise error for non-MISSING failure + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + error_message = str(exc_info.value) + # Should include non-MISSING failure + assert "other-rev" in error_message + assert "ServiceNotFound" in error_message + # Should NOT include MISSING failure + assert "target-rev" not in error_message + + def test_with_malformed_api_failures(self): + """Test failure parsing: malformed failure responses""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Malformed failures - missing arn or reason + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [], + "failures": [{"reason": "ServiceNotFound"}], # Missing arn + } + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + # Should raise MonitoringError about invalid failure response + error_message = str(exc_info.value) + assert "Invalid failure response" in error_message + assert "missing arn or reason" in error_message + + def test_with_missing_response_fields(self): + """Test response validation: missing required fields""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + # Missing serviceRevisions field + self.mock_client.describe_service_revisions.return_value = {} + + with pytest.raises(MonitoringError) as exc_info: + collector.get_current_view("⠋") + + # Should raise MonitoringError about missing field + error_message = str(exc_info.value) + assert "DescribeServiceRevisions" in error_message + assert ( + "response is" in error_message + ) # "response is missing" or "response is empty" + + def test_deployment_mode_diff_view(self): + """Test DEPLOYMENT mode shows diff of target vs source revisions""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + "targetServiceRevision": {"arn": "target-rev"}, + "sourceServiceRevisions": [{"arn": "source-rev"}], + } + ] + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "target-rev", + "taskDefinition": "task-def-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://new-api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/new-lb/1234", + "status": "CREATING", + }, + } + ], + }, + }, + { + "arn": "source-rev", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://old-api.example.com", + "loadBalancer": { + "arn": "arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/old-lb/5678", + "status": "ACTIVE", + }, + } + ], + }, + }, + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Deployment in progress"}]}] + } + + output = collector.get_current_view("⠋") + + # Should show deployment diff + # Initially will show "Trying to describe service revisions" due to mismatch + # But implementation still shows Cluster/Service + assert "Trying to describe service revisions" in output + + def test_waiting_for_deployment_to_start(self): + """Test DEPLOYMENT mode when no deployment exists yet""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + # No deployments + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "No deployment"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Waiting for a deployment to start" in output + + def test_deployment_missing_target_revision(self): + """Test DEPLOYMENT mode when deployment is missing target revision""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "DEPLOYMENT" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [], + } + } + self.mock_client.list_service_deployments.return_value = { + "serviceDeployments": [{"serviceDeploymentArn": "deploy-arn"}] + } + self.mock_client.describe_service_deployments.return_value = { + "serviceDeployments": [ + { + "serviceDeploymentArn": "deploy-arn", + "status": "IN_PROGRESS", + # Missing targetServiceRevision + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "Deployment starting"}]}] + } + + output = collector.get_current_view("⠋") + + assert "Waiting for a deployment to start" in output + + def test_missing_service_in_response(self): + """Test handling when service field is missing""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = {} + + output = collector.get_current_view("⠋") + + assert "Trying to describe gateway service" in output + + def test_service_missing_required_fields(self): + """Test handling when service is missing required fields""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + # Missing activeConfigurations + self.mock_client.describe_express_gateway_service.return_value = { + "service": {"serviceArn": self.service_arn} + } + + output = collector.get_current_view("⠋") + + assert "Trying to describe gateway service" in output + + def test_parse_all_resource_types(self): + """Test parsing all supported resource types""" + collector = ServiceViewCollector( + self.mock_client, self.service_arn, "RESOURCE" + ) + + self.mock_client.describe_express_gateway_service.return_value = { + "service": { + "serviceArn": self.service_arn, + "cluster": "my-cluster", + "activeConfigurations": [{"serviceRevisionArn": "rev-arn"}], + } + } + self.mock_client.describe_service_revisions.return_value = { + "serviceRevisions": [ + { + "arn": "rev-arn", + "ecsManagedResources": { + "ingressPaths": [ + { + "endpoint": "https://api.example.com", + "loadBalancer": { + "arn": "lb-arn", + "status": "ACTIVE", + }, + "loadBalancerSecurityGroups": [ + {"arn": "lb-sg-arn", "status": "ACTIVE"} + ], + "certificate": { + "arn": "cert-arn", + "status": "ACTIVE", + }, + "listener": { + "arn": "listener-arn", + "status": "ACTIVE", + }, + "rule": { + "arn": "rule-arn", + "status": "ACTIVE", + }, + "targetGroups": [ + {"arn": "tg-arn", "status": "ACTIVE"} + ], + } + ], + "autoScaling": { + "scalableTarget": { + "arn": "st-arn", + "status": "ACTIVE", + }, + "applicationAutoScalingPolicies": [ + {"arn": "policy-arn", "status": "ACTIVE"} + ], + }, + "metricAlarms": [ + {"arn": "alarm-arn", "status": "ACTIVE"} + ], + "serviceSecurityGroups": [ + {"arn": "sg-arn", "status": "ACTIVE"} + ], + "logGroups": [{"arn": "log-arn", "status": "ACTIVE"}], + }, + } + ] + } + self.mock_client.describe_services.return_value = { + "services": [{"events": [{"message": "All resources"}]}] + } + + output = collector.get_current_view("⠋") + + # Verify all resource types are parsed + assert "IngressPath" in output + assert "LoadBalancer" in output + assert "LoadBalancerSecurityGroup" in output + assert "Certificate" in output + assert "Listener" in output + assert "Rule" in output + assert "TargetGroup" in output + assert "AutoScalingConfiguration" in output + assert "ScalableTarget" in output + assert "ApplicationAutoScalingPolicy" in output + assert "MetricAlarms" in output + assert "ServiceSecurityGroups" in output + assert "LogGroups" in output