From 7d56818c7ff7a9e92ec965397839f3a23b917086 Mon Sep 17 00:00:00 2001 From: Hui Song Date: Thu, 22 May 2025 16:41:27 -0400 Subject: [PATCH] Fix timeout when disabling an activation in ocp env --- .../services/activation/engine/kubernetes.py | 116 ++++--- .../activation/engine/test_kubernetes.py | 290 +++--------------- 2 files changed, 104 insertions(+), 302 deletions(-) diff --git a/src/aap_eda/services/activation/engine/kubernetes.py b/src/aap_eda/services/activation/engine/kubernetes.py index ed3a9ff8e..9f3a35800 100644 --- a/src/aap_eda/services/activation/engine/kubernetes.py +++ b/src/aap_eda/services/activation/engine/kubernetes.py @@ -199,9 +199,6 @@ def cleanup(self, container_id: str, log_handler: LogHandler) -> None: self._delete_services(log_handler) self._delete_job(log_handler) - self._wait_for_pod_to_be_deleted(container_id, log_handler) - log_handler.write(f"Job {container_id} is cleaned up.", flush=True) - def update_logs(self, container_id: str, log_handler: LogHandler) -> None: try: pod = self._get_job_pod(container_id) @@ -460,7 +457,7 @@ def _create_job( job_result = self.client.batch_api.create_namespaced_job( namespace=self.namespace, body=job_spec ) - LOGGER.info(f"Submitted Job template: {self.job_name},") + LOGGER.info(f"Submitted Job template: {self.job_name}") except ApiException as e: LOGGER.error(f"API Exception {e}") raise ContainerStartError(str(e)) from e @@ -475,27 +472,66 @@ def _delete_job(self, log_handler: LogHandler) -> None: timeout_seconds=0, ) - if activation_job.items and activation_job.items[0].metadata: - activation_job_name = activation_job.items[0].metadata.name - result = self.client.batch_api.delete_namespaced_job( - name=activation_job_name, - namespace=self.namespace, - propagation_policy="Background", - ) - - if result.status == "Failure": - raise ContainerCleanupError(f"{result}") - else: + if not (activation_job.items and activation_job.items[0].metadata): LOGGER.info(f"Job for {self.job_name} has been removed") - log_handler.write( - f"Job for {self.job_name} has been removed.", True - ) + return + + activation_job_name = activation_job.items[0].metadata.name + self._delete_job_resource(activation_job_name, log_handler) except ApiException as e: raise ContainerCleanupError( f"Stop of {self.job_name} Failed: \n {e}" ) from e + def _delete_job_resource( + self, job_name: str, log_handler: LogHandler + ) -> None: + result = self.client.batch_api.delete_namespaced_job( + name=job_name, + namespace=self.namespace, + propagation_policy="Background", + ) + + if result.status == "Failure": + raise ContainerCleanupError(f"{result}") + + watcher = watch.Watch() + try: + for event in watcher.stream( + self.client.core_api.list_namespaced_pod, + namespace=self.namespace, + label_selector=f"job-name={self.job_name}", + timeout_seconds=POD_DELETE_TIMEOUT, + ): + if event["type"] == "DELETED": + log_handler.write( + f"Pod '{self.job_name}' is deleted.", + flush=True, + ) + break + + log_handler.write( + f"Job {self.job_name} is cleaned up.", + flush=True, + ) + except ApiException as e: + if e.status == status.HTTP_404_NOT_FOUND: + message = ( + f"Pod '{self.job_name}' not found (404), " + "assuming it's already deleted." 
+ ) + log_handler.write(message, flush=True) + return + log_handler.write( + f"Error while waiting for deletion: {e}", flush=True + ) + raise ContainerCleanupError( + f"Error during cleanup: {str(e)}" + ) from e + finally: + watcher.stop() + def _wait_for_pod_to_start(self, log_handler: LogHandler) -> None: watcher = watch.Watch() LOGGER.info("Waiting for pod to start") @@ -532,50 +568,6 @@ def _wait_for_pod_to_start(self, log_handler: LogHandler) -> None: finally: watcher.stop() - def _wait_for_pod_to_be_deleted( - self, pod_name: str, log_handler: LogHandler - ) -> None: - log_handler.write( - f"Waiting for pod '{pod_name}' to be deleted...", - flush=True, - ) - watcher = watch.Watch() - try: - for event in watcher.stream( - self.client.core_api.list_namespaced_pod, - namespace=self.namespace, - label_selector=f"job-name={self.job_name}", - timeout_seconds=POD_DELETE_TIMEOUT, - ): - LOGGER.debug(f"Received event: {event}") - pod_name = event["object"].metadata.name - pod_phase = event["object"].status.phase - LOGGER.debug(f"Pod {pod_name} - {pod_phase}") - - if event["type"] == "DELETED": - # Pod successfully deleted - log_handler.write( - f"Pod '{pod_name}' has been deleted.", flush=True - ) - break - except ApiException as e: - if e.status == status.HTTP_404_NOT_FOUND: - message = ( - f"Pod '{pod_name}' not found (404), " - "assuming it's already deleted." - ) - log_handler.write(message, flush=True) - return - log_handler.write( - f"Error while waiting for deletion: {e}", flush=True - ) - raise ContainerCleanupError( - f"Error during cleanup: {str(e)}" - ) from e - - finally: - watcher.stop() - def _set_namespace(self) -> None: namespace_file = ( "/var/run/secrets/kubernetes.io/serviceaccount/namespace" diff --git a/tests/integration/services/activation/engine/test_kubernetes.py b/tests/integration/services/activation/engine/test_kubernetes.py index f784fbb10..c8616d234 100644 --- a/tests/integration/services/activation/engine/test_kubernetes.py +++ b/tests/integration/services/activation/engine/test_kubernetes.py @@ -18,7 +18,6 @@ from kubernetes import client as k8sclient from kubernetes.client.rest import ApiException from kubernetes.config.config_exception import ConfigException -from rest_framework import status from aap_eda.core import models from aap_eda.core.enums import ActivationStatus, ProcessParentType @@ -29,9 +28,7 @@ ContainerEngineError, ContainerEngineInitError, ContainerImagePullError, - ContainerNotFoundError, ContainerStartError, - ContainerUpdateLogsError, ) from aap_eda.services.activation.engine.kubernetes import ( IMAGE_PULL_BACK_OFF, @@ -535,8 +532,9 @@ def raise_api_error(*args, **kwargs): engine._delete_services(log_handler) +@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch") @pytest.mark.django_db -def test_delete_job(init_kubernetes_data, kubernetes_engine): +def test_delete_job(mock_watch, init_kubernetes_data, kubernetes_engine): engine = kubernetes_engine job_name = "eda-job" engine.job_name = job_name @@ -556,9 +554,20 @@ def test_delete_job(init_kubernetes_data, kubernetes_engine): engine._delete_job(log_handler) assert models.RulebookProcessLog.objects.last().log.endswith( - f"Job for {job_name} has been removed." + f"Job {job_name} is cleaned up." 
) + event = {"type": "DELETED"} + with mock.patch.object(engine.client, "batch_api") as batch_api_mock: + batch_api_mock.list_namespaced_job.return_value.items = [job_mock] + mock_watch.return_value.stream.return_value = [event] + engine._delete_job(log_handler) + + log_messages = [ + record.log for record in models.RulebookProcessLog.objects.all() + ] + assert f"Pod '{job_name}' is deleted." in log_messages + @pytest.mark.django_db def test_delete_job_with_exception(init_kubernetes_data, kubernetes_engine): @@ -587,95 +596,6 @@ def raise_api_error(*args, **kwargs): engine._delete_job(log_handler) -@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch") -@mock.patch( - "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api" -) -@pytest.mark.django_db -def test_wait_for_pod_to_be_deleted_success( - mock_core_api, mock_watch, kubernetes_engine, mock_log_handler -): - # Setup mock watch stream - mock_watch_instance = mock.MagicMock() - mock_watch.return_value = mock_watch_instance - - # Simulate a DELETE event - mock_metadata = mock.MagicMock() - mock_metadata.name = "test-pod" - - mock_watch_instance.stream.return_value = [ - { - "type": "DELETED", - "object": mock.MagicMock( - metadata=mock_metadata, - status=mock.MagicMock(phase="Terminating"), - ), - } - ] - - kubernetes_engine._wait_for_pod_to_be_deleted("test-pod", mock_log_handler) - - # Verify log handler was called correctly - mock_log_handler.write.assert_any_call( - "Waiting for pod 'test-pod' to be deleted...", flush=True - ) - mock_log_handler.write.assert_any_call( - "Pod 'test-pod' has been deleted.", flush=True - ) - mock_watch_instance.stop.assert_called_once() - - -@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch") -@mock.patch( - "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api" -) -@pytest.mark.django_db -def test_wait_for_pod_to_be_deleted_not_found( - mock_core_api, mock_watch, kubernetes_engine, mock_log_handler -): - # Setup mock to raise 404 error - mock_watch_instance = mock.MagicMock() - mock_watch.return_value = mock_watch_instance - mock_watch_instance.stream.side_effect = ApiException( - status=status.HTTP_404_NOT_FOUND - ) - - kubernetes_engine._wait_for_pod_to_be_deleted("test-pod", mock_log_handler) - - mock_log_handler.write.assert_any_call( - "Pod 'test-pod' not found (404), assuming it's already deleted.", - flush=True, - ) - mock_watch_instance.stop.assert_called_once() - - -@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch") -@mock.patch( - "aap_eda.services.activation.engine.kubernetes.k8sclient.CoreV1Api" -) -@pytest.mark.django_db -def test_wait_for_pod_to_be_deleted_error( - mock_core_api, mock_watch, kubernetes_engine, mock_log_handler -): - # Setup mock to raise other API error - mock_watch_instance = mock.MagicMock() - mock_watch.return_value = mock_watch_instance - mock_watch_instance.stream.side_effect = ApiException( - status=status.HTTP_500_INTERNAL_SERVER_ERROR, reason="test-failure" - ) - - with pytest.raises(ContainerCleanupError): - kubernetes_engine._wait_for_pod_to_be_deleted( - "test-pod", mock_log_handler - ) - - mock_log_handler.write.assert_any_call( - "Error while waiting for deletion: (500)\nReason: test-failure\n", - flush=True, - ) - mock_watch_instance.stop.assert_called_once() - - @pytest.mark.django_db def test_cleanup_orig(init_kubernetes_data, kubernetes_engine): engine = kubernetes_engine @@ -685,162 +605,52 @@ def test_cleanup_orig(init_kubernetes_data, kubernetes_engine): with 
mock.patch.object(engine, "_delete_secret") as secret_mock: with mock.patch.object(engine, "_delete_services") as services_mock: with mock.patch.object(engine, "_delete_job") as job_mock: - with mock.patch.object(engine, "_wait_for_pod_to_be_deleted"): - engine.cleanup(job_name, log_handler) - secret_mock.assert_called_once() - services_mock.assert_called_once() - job_mock.assert_called_once() - - assert models.RulebookProcessLog.objects.last().log.endswith( - f"Job {job_name} is cleaned up." - ) - - -@pytest.mark.django_db -def test_update_logs(init_kubernetes_data, kubernetes_engine): - engine = kubernetes_engine - log_handler = DBLogger(init_kubernetes_data.activation_instance.id) - init_log_read_at = init_kubernetes_data.activation_instance.log_read_at - job_name = "eda-job" - pod_mock = mock.Mock() - - with mock.patch.object( - engine, "_get_job_pod", mock.Mock(return_value=pod_mock) - ): - pod_mock.status.container_statuses = get_pod_statuses("running") - log_mock = mock.Mock() - message = "INFO Result is kept for 500 seconds" - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.read_namespaced_pod_log.return_value = log_mock - log_mock.splitlines.return_value = [ - ( - "2023-10-30T19:18:48.362883381Z 2023-10-30 19:18:48,362" - " INFO Task started: Monitor project tasks" - ), - ( - "2023-10-30T19:18:48.375144193Z 2023-10-30 19:18:48,374" - " INFO Task complete: Monitor project tasks" - ), - ( - "2023-10-30T19:18:48.376026733Z 2023-10-30 19:18:48,375" - " INFO default: Job OK (monitor_project_tasks)" - ), - f"2023-10-30T19:28:48.376034150Z {message}", - ] - engine.update_logs(job_name, log_handler) - - assert models.RulebookProcessLog.objects.last().log == f"{message}" - init_kubernetes_data.activation_instance.refresh_from_db() - assert ( - init_kubernetes_data.activation_instance.log_read_at - > init_log_read_at - ) - - def raise_api_error(*args, **kwargs): - raise ApiException("Container not found") - - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.read_namespaced_pod_log.side_effect = raise_api_error - - with pytest.raises(ContainerUpdateLogsError): - engine.update_logs(job_name, log_handler) - - with mock.patch.object( - engine, "_get_job_pod", mock.Mock(return_value=pod_mock) - ): - pod_mock.status.container_statuses = get_pod_statuses("unknown") - log_mock = mock.Mock() - with mock.patch.object(engine.client, "core_api") as core_api_mock: - engine.update_logs(job_name, log_handler) - msg = f"Pod with label {job_name} has unhandled state:" - assert msg in models.RulebookProcessLog.objects.last().log - - -@pytest.mark.django_db -def test_get_job_pod(init_kubernetes_data, kubernetes_engine): - engine = kubernetes_engine - pods_mock = mock.Mock() - pod = get_pod("running") - - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_pod.return_value = pods_mock - pods_mock.items = [pod] - - job_pod = engine._get_job_pod("eda-pod") - - assert job_pod is not None - assert job_pod == pod - - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_pod.return_value = pods_mock - pods_mock.items = None - - with pytest.raises(ContainerNotFoundError): - engine._get_job_pod("eda-pod") - - def raise_api_error(*args, **kwargs): - raise ApiException("Container not found") - - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_pod.side_effect = raise_api_error - - with 
pytest.raises(ContainerNotFoundError): - engine._get_job_pod("eda-pod") + engine.cleanup(job_name, log_handler) + secret_mock.assert_called_once() + services_mock.assert_called_once() + job_mock.assert_called_once() +@mock.patch("aap_eda.services.activation.engine.kubernetes.watch.Watch") @pytest.mark.django_db -def test_create_service( - init_kubernetes_data, - kubernetes_engine, - default_organization: models.Organization, +def test_pod_cleanup_exception_handling( + mock_watch, init_kubernetes_data, kubernetes_engine ): engine = kubernetes_engine - engine.job_name = "eda-job" - request = get_request( - init_kubernetes_data, - "admin", - default_organization, - k8s_service_name=init_kubernetes_data.activation.k8s_service_name, - ) - - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_service.return_value.items = None - engine._create_service(request) - - core_api_mock.create_namespaced_service.assert_called_once() + log_handler = DBLogger(init_kubernetes_data.activation_instance.id) + job_name = "test-job" + engine.job_name = job_name - def raise_api_error(*args, **kwargs): - raise ApiException("Service not found") + job_mock = mock.Mock() + job_mock.metadata.name = job_name - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_service.side_effect = raise_api_error + with mock.patch.object(engine.client, "batch_api") as batch_api_mock: + batch_api_mock.list_namespaced_job.return_value.items = [job_mock] - with pytest.raises(ContainerStartError): - engine._create_service(request) + # Test 404 handling + mock_watch.return_value.stream.side_effect = ApiException(status=404) + engine._delete_job(log_handler) + log_msg = ( + "Pod 'test-job' not found (404), assuming it's already deleted." + ) + assert models.RulebookProcessLog.objects.last().log.endswith(log_msg) -@pytest.mark.django_db -def test_create_secret( - init_kubernetes_data, - kubernetes_engine, - default_organization: models.Organization, -): - engine = kubernetes_engine - engine.job_name = "eda-job" - request = get_request( - init_kubernetes_data, - "admin", - default_organization, - k8s_service_name=init_kubernetes_data.activation.k8s_service_name, - ) - log_handler = DBLogger(init_kubernetes_data.activation_instance.id) + # Test generic API error + mock_watch.return_value.stream.side_effect = ApiException(status=500) + with pytest.raises(ContainerCleanupError): + engine._delete_job(log_handler) - def raise_api_error(*args, **kwargs): - raise ApiException("Secret create error") + assert models.RulebookProcessLog.objects.last().log.startswith( + "Error while waiting for deletion:" + ) - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.create_namespaced_secret.side_effect = raise_api_error + # Verify watcher.stop() always called + mock_watch.reset_mock() + mock_watch.return_value.stream.side_effect = ConfigException( + "Config error" + ) + with pytest.raises(ConfigException): + engine._delete_job(log_handler) - with pytest.raises(ContainerStartError): - engine._create_secret(request, log_handler) - core_api_mock.delete_namespaced_secret.assert_called_once() + mock_watch.return_value.stop.assert_called_once()
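
Note on the approach: the bounded wait introduced in _delete_job_resource() is the standard kubernetes-client watch pattern. It streams pod events under the job's label selector and returns once timeout_seconds elapses, so disabling an activation can no longer hang on a pod that disappeared before the wait began. Below is a minimal, self-contained sketch of that pattern; the kubeconfig loading, namespace, label selector, and 60-second timeout are illustrative assumptions, not values taken from this patch.

    # Sketch only: bounded wait for a pod to disappear after its Job is
    # deleted. Namespace, label selector, and timeout are illustrative.
    from kubernetes import client, config, watch

    config.load_kube_config()  # in-cluster code would use load_incluster_config()
    core_api = client.CoreV1Api()

    watcher = watch.Watch()
    deleted = False
    try:
        # stream() yields events for matching pods and returns when
        # timeout_seconds elapses, so this loop cannot block forever.
        for event in watcher.stream(
            core_api.list_namespaced_pod,
            namespace="eda",                    # illustrative
            label_selector="job-name=eda-job",  # illustrative
            timeout_seconds=60,                 # cf. POD_DELETE_TIMEOUT
        ):
            if event["type"] == "DELETED":
                deleted = True
                break
    finally:
        watcher.stop()

    if not deleted:
        # No DELETED event arrived before the timeout; the pod may have
        # vanished before the watch started, so treat it as gone rather
        # than failing the cleanup.
        print("no DELETED event before timeout; assuming pod is gone")

Because the watch starts right after delete_namespaced_job() returns with propagation_policy="Background", the cleanup path both confirms the pod's removal and stays bounded by the watch timeout; this is what lets the patch drop the separate _wait_for_pod_to_be_deleted() step.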