From b22345886f9b05eeebff560d4e82f1a64a6e2544 Mon Sep 17 00:00:00 2001
From: Ori Hoch
Date: Wed, 11 Jun 2025 10:36:06 +0300
Subject: [PATCH] Add unit tests for yaml loader and dags generator

---
 tests/test_dags_generator.py |  62 ++++++++++++++++++++++
 tests/test_yaml_loader.py    | 100 +++++++++++++++++++++++++++++++++++
 2 files changed, 162 insertions(+)
 create mode 100644 tests/test_dags_generator.py
 create mode 100644 tests/test_yaml_loader.py

diff --git a/tests/test_dags_generator.py b/tests/test_dags_generator.py
new file mode 100644
index 0000000..d902d5d
--- /dev/null
+++ b/tests/test_dags_generator.py
@@ -0,0 +1,62 @@
+from pathlib import Path
+
+from open_bus_pipelines.dags_generator import dags_generator
+from open_bus_pipelines.operators.api_bash_operator import ApiBashOperator
+from open_bus_pipelines import config
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+
+
+def write_yaml(path: Path, text: str):
+    path.write_text(text)
+
+
+def test_basic_dag_generation(monkeypatch, tmp_path):
+    monkeypatch.setattr(config, "OPEN_BUS_PIPELINES_ALERT_EMAILS", [])
+
+    write_yaml(tmp_path / "airflow.yaml", "dag_files:\n - dags.yaml\n")
+    dags_yaml = """
+- name: my-dag
+  schedule_interval: '@daily'
+  tasks:
+    - id: first
+      config:
+        type: api
+    - id: second
+      config:
+        type: trigger_dag
+        trigger_dag_id: other
+        pass_conf_params: [foo]
+      depends_on:
+        - first
+"""
+    write_yaml(tmp_path / "dags.yaml", dags_yaml)
+
+    dags = list(dags_generator(str(tmp_path) + "/"))
+    assert len(dags) == 1
+    dag_id, dag = dags[0]
+    assert dag_id == "my_dag"
+
+    assert set(dag.task_dict.keys()) == {"first", "second", "monitor_task"}
+    assert isinstance(dag.get_task("first"), ApiBashOperator)
+    second = dag.get_task("second")
+    assert isinstance(second, TriggerDagRunOperator)
+    assert second.conf == {"foo": "{{ dag_run.conf.get('foo') }}"}
+    assert "first" in second.upstream_task_ids
+
+
+def test_disable_alert(monkeypatch, tmp_path):
+    monkeypatch.setattr(config, "OPEN_BUS_PIPELINES_ALERT_EMAILS", [])
+    write_yaml(tmp_path / "airflow.yaml", "dag_files:\n - d.yaml\n")
+    dags_yaml = """
+- name: no-alert-dag
+  disable_alert_after_minutes: true
+  tasks:
+    - id: t
+      config:
+        type: api
+"""
+    write_yaml(tmp_path / "d.yaml", dags_yaml)
+    dags = list(dags_generator(str(tmp_path) + "/"))
+    dag_id, dag = dags[0]
+    assert dag_id == "no_alert_dag"
+    assert set(dag.task_dict.keys()) == {"t"}
diff --git a/tests/test_yaml_loader.py b/tests/test_yaml_loader.py
new file mode 100644
index 0000000..9e59bf9
--- /dev/null
+++ b/tests/test_yaml_loader.py
@@ -0,0 +1,100 @@
+import os
+import json
+import datetime
+import hashlib
+import builtins
+
+import pytest
+
+from open_bus_pipelines import yaml_loader
+
+
+class DummyResponse:
+    def __init__(self, text, status=200):
+        self.text = text
+        self.status_code = status
+
+    def raise_for_status(self):
+        if self.status_code >= 400:
+            raise Exception("HTTP error")
+
+
+def test_get_from_url_success(monkeypatch, tmp_path):
+    url = "http://example.com/data"
+    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
+
+    def fake_get(requested_url, timeout=15):
+        assert requested_url == url
+        return DummyResponse("hello")
+
+    monkeypatch.setattr(yaml_loader.requests, "get", fake_get)
+
+    result = yaml_loader.get_from_url(url)
+    assert result == "hello"
+    cache_key = hashlib.sha256(url.encode()).hexdigest()
+    cache_file = tmp_path / f"{cache_key}.json"
+    assert cache_file.exists()
+    data = json.loads(cache_file.read_text())
+    assert data["text"] == "hello"
+
+
+def _write_cache(tmp_path, url, text, delta_days):
+    cache_key = hashlib.sha256(url.encode()).hexdigest()
+    cache_file = tmp_path / f"{cache_key}.json"
+    dt = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=delta_days)
+    cache_file.write_text(json.dumps({
+        "url": url,
+        "datetime": dt.strftime('%Y-%m-%dT%H:%M:%S%z'),
+        "text": text
+    }))
+    return cache_file
+
+
+def test_get_from_url_use_cache(monkeypatch, tmp_path):
+    url = "http://example.com/data"
+    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
+    _write_cache(tmp_path, url, "cached", 2)
+
+    def fake_get(*args, **kwargs):
+        raise Exception("network error")
+
+    monkeypatch.setattr(yaml_loader.requests, "get", fake_get)
+    result = yaml_loader.get_from_url(url)
+    assert result == "cached"
+
+
+def test_get_from_url_cache_too_old(monkeypatch, tmp_path):
+    url = "http://example.com/data"
+    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
+    _write_cache(tmp_path, url, "old", 6)
+
+    def fake_get(*args, **kwargs):
+        raise Exception("network error")
+
+    monkeypatch.setattr(yaml_loader.requests, "get", fake_get)
+    with pytest.raises(Exception):
+        yaml_loader.get_from_url(url)
+
+
+def test_get_from_url_no_cache(monkeypatch, tmp_path):
+    url = "http://example.com/data"
+    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
+
+    def fake_get(*args, **kwargs):
+        raise Exception("network error")
+
+    monkeypatch.setattr(yaml_loader.requests, "get", fake_get)
+    with pytest.raises(Exception):
+        yaml_loader.get_from_url(url)
+
+
+def test_yaml_safe_load_local(tmp_path):
+    f = tmp_path / "file.yaml"
+    f.write_text("a: 1")
+    assert yaml_loader.yaml_safe_load(str(f)) == {"a": 1}
+
+
+def test_yaml_safe_load_url(monkeypatch):
+    url = "http://example.com/data.yaml"
+    monkeypatch.setattr(yaml_loader, "get_from_url", lambda u: "b: 2")
+    assert yaml_loader.yaml_safe_load(url) == {"b": 2}
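
Note (outside the patch, not part of the diff above): the new tests only need pytest plus the project's existing Airflow dependencies to be importable. A minimal sketch for running just these two modules from a CI step is shown below; the two file paths are the ones added by this patch, while the script name run_new_tests.py and the -v flag are illustrative assumptions, not part of the repository.

# run_new_tests.py -- convenience sketch, not included in this patch.
# Assumes pytest and the open_bus_pipelines package (with Airflow) are installed.
import sys

import pytest

if __name__ == "__main__":
    # pytest.main() returns an exit code; forward it so CI fails when a test fails.
    sys.exit(pytest.main([
        "tests/test_yaml_loader.py",
        "tests/test_dags_generator.py",
        "-v",
    ]))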