K8S tests are flaky with test_integration_run_dag_with_scheduler_failure test #45145

Open
potiuk opened this issue Dec 22, 2024 · 0 comments

Labels
area:CI Airflow's tests and continuous integration · provider:cncf-kubernetes Kubernetes provider related issues

Comments

potiuk (Member) commented Dec 22, 2024

The test_integration_run_dag_with_scheduler_failure test is flaky and fails sometimes.

For example https://github.com/apache/airflow/actions/runs/12452312988/job/34761457306

kubernetes_tests/test_other_executors.py .F                              [100%]
  
  =================================== FAILURES ===================================
  __ TestCeleryAndLocalExecutor.test_integration_run_dag_with_scheduler_failure __
  
  self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
  
      @pytest.mark.xfail(
          EXECUTOR == "LocalExecutor",
          reason="https://github.com/apache/airflow/issues/44481 needs to be implemented",
      )
      def test_integration_run_dag_with_scheduler_failure(self):
          dag_id = "example_xcom"
      
          dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
      
          self._delete_airflow_pod("scheduler")
      
          time.sleep(10)  # give time for pod to restart
      
          # Wait some time for the operator to complete
  >       self.monitor_task(
              host=self.host,
              dag_run_id=dag_run_id,
              dag_id=dag_id,
              task_id="push",
              expected_final_state="success",
              timeout=40,  # This should fail fast if failing
          )
  
  kubernetes_tests/test_other_executors.py:71: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
  host = 'localhost:41460'
  dag_run_id = 'manual__2024-12-22T08:49:39.130207+00:00', dag_id = 'example_xcom'
  task_id = 'push', expected_final_state = 'success', timeout = 40
  
      def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout):
          tries = 0
          state = ""
          max_tries = max(int(timeout / 5), 1)
          # Wait some time for the operator to complete
          while tries < max_tries:
              time.sleep(5)
              # Check task state
              try:
                  get_string = (
                      f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
                  )
                  print(f"Calling [monitor_task]#1 {get_string}")
                  result = self.session.get(get_string)
                  if result.status_code == 404:
                      check_call(["echo", "api returned 404."])
                      tries += 1
                      continue
                  assert result.status_code == 200, "Could not get the status"
                  result_json = result.json()
                  print(f"Received [monitor_task]#2: {result_json}")
                  state = result_json["state"]
                  print(f"Attempt {tries}: Current state of operator is {state}")
      
                  if state == expected_final_state:
                      break
                  if state in {"failed", "upstream_failed", "removed"}:
                      # If the TI is in failed state (and that's not the state we want) there's no point
                      # continuing to poll, it won't change
                      break
                  self._describe_resources(namespace="airflow")
                  self._describe_resources(namespace="default")
                  tries += 1
              except requests.exceptions.ConnectionError as e:
                  check_call(["echo", f"api call failed. trying again. error {e}"])
          if state != expected_final_state:
              print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
  >       assert state == expected_final_state
  E       AssertionError: assert equals failed
  E         None       'success'
  
  kubernetes_tests/test_base.py:197: AssertionError
  ---------------------------- Captured stdout setup -----------------------------
potiuk converted this from a draft issue Dec 22, 2024
dosubot bot added area:CI Airflow's tests and continuous integration · provider:cncf-kubernetes Kubernetes provider related issues labels Dec 22, 2024
Projects
Status: Flaky stuff