Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tests/test_dags_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from pathlib import Path

from open_bus_pipelines.dags_generator import dags_generator
from open_bus_pipelines.operators.api_bash_operator import ApiBashOperator
from open_bus_pipelines import config
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def write_yaml(path: Path, text: str) -> None:
    """Write ``text`` to the file at ``path``.

    Thin helper so the tests can create YAML fixtures in one call.
    """
    path.write_text(text)


def test_basic_dag_generation(monkeypatch, tmp_path):
    """Generate one DAG from YAML files and check its tasks and wiring.

    Writes an ``airflow.yaml`` index that points at ``dags.yaml``, runs
    ``dags_generator`` against the temporary directory and verifies the DAG
    id, the set of task ids, the operator types, the templated ``conf`` of
    the trigger task and its upstream dependency.
    """
    monkeypatch.setattr(config, "OPEN_BUS_PIPELINES_ALERT_EMAILS", [])

    # Index file listing the DAG definition files to load.
    write_yaml(tmp_path / "airflow.yaml", "dag_files:\n - dags.yaml\n")

    # NOTE(review): the YAML below shows no nesting indentation in this view;
    # confirm the literal against the repository copy before relying on it.
    dag_definitions = """
- name: my-dag
schedule_interval: '@daily'
tasks:
- id: first
config:
type: api
- id: second
config:
type: trigger_dag
trigger_dag_id: other
pass_conf_params: [foo]
depends_on:
- first
"""
    write_yaml(tmp_path / "dags.yaml", dag_definitions)

    # The generator receives the base location with a trailing slash.
    base_url = str(tmp_path) + "/"
    generated = list(dags_generator(base_url))
    assert len(generated) == 1
    dag_id, dag = generated[0]
    # The DAG id is the YAML name with the dash replaced by an underscore.
    assert dag_id == "my_dag"

    # Besides the two declared tasks, a "monitor_task" is expected here.
    assert set(dag.task_dict.keys()) == {"first", "second", "monitor_task"}

    first_task = dag.get_task("first")
    assert isinstance(first_task, ApiBashOperator)

    trigger_task = dag.get_task("second")
    assert isinstance(trigger_task, TriggerDagRunOperator)
    assert trigger_task.conf == {"foo": "{{ dag_run.conf.get('foo') }}"}
    assert "first" in trigger_task.upstream_task_ids


def test_disable_alert(monkeypatch, tmp_path):
    """Check that ``disable_alert_after_minutes`` leaves only declared tasks.

    Writes an ``airflow.yaml`` index pointing at ``d.yaml``, whose single DAG
    sets ``disable_alert_after_minutes: true``, and verifies that the
    generated DAG contains only the task ``t`` (no extra task is added).
    """
    monkeypatch.setattr(config, "OPEN_BUS_PIPELINES_ALERT_EMAILS", [])
    write_yaml(tmp_path / "airflow.yaml", "dag_files:\n - d.yaml\n")

    # NOTE(review): the YAML below shows no nesting indentation in this view;
    # confirm the literal against the repository copy before relying on it.
    dags_yaml = """
- name: no-alert-dag
disable_alert_after_minutes: true
tasks:
- id: t
config:
type: api
"""
    write_yaml(tmp_path / "d.yaml", dags_yaml)

    dags = list(dags_generator(str(tmp_path) + "/"))
    # Same guard as test_basic_dag_generation: without it, unexpected extra
    # DAGs would go unnoticed because only dags[0] is inspected.
    assert len(dags) == 1
    dag_id, dag = dags[0]
    assert dag_id == "no_alert_dag"
    assert set(dag.task_dict.keys()) == {"t"}
100 changes: 100 additions & 0 deletions tests/test_yaml_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os
import json
import datetime
import hashlib
import builtins

import pytest

from open_bus_pipelines import yaml_loader


class DummyResponse:
    """Minimal stand-in for an HTTP response object used by these tests.

    Exposes only ``text``, ``status_code`` and ``raise_for_status()``.
    """

    def __init__(self, text, status=200):
        self.status_code = status
        self.text = text

    def raise_for_status(self):
        """Raise ``Exception("HTTP error")`` when the status code is 400 or above."""
        if self.status_code < 400:
            return
        raise Exception("HTTP error")


def test_get_from_url_success(monkeypatch, tmp_path):
    """A successful fetch returns the body and stores it in a cache file.

    The cache file is expected at ``<CACHE_PATH>/<sha256(url)>.json`` and
    must contain the fetched text under the ``"text"`` key.
    """
    url = "http://example.com/data"

    def fake_get(requested_url, timeout=15):
        # Ensure the loader asks for exactly the URL under test.
        assert requested_url == url
        return DummyResponse("hello")

    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
    monkeypatch.setattr(yaml_loader.requests, "get", fake_get)

    fetched = yaml_loader.get_from_url(url)
    assert fetched == "hello"

    digest = hashlib.sha256(url.encode()).hexdigest()
    expected_cache_file = tmp_path / f"{digest}.json"
    assert expected_cache_file.exists()
    cached = json.loads(expected_cache_file.read_text())
    assert cached["text"] == "hello"


def _write_cache(tmp_path, url, text, delta_days):
cache_key = hashlib.sha256(url.encode()).hexdigest()
cache_file = tmp_path / f"{cache_key}.json"
dt = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=delta_days)
cache_file.write_text(json.dumps({
"url": url,
"datetime": dt.strftime('%Y-%m-%dT%H:%M:%S%z'),
"text": text
}))
return cache_file


def test_get_from_url_use_cache(monkeypatch, tmp_path):
    """When the request fails, a 2-day-old cache entry is returned instead."""
    url = "http://example.com/data"

    def failing_get(*args, **kwargs):
        # Simulate the network being unavailable.
        raise Exception("network error")

    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
    _write_cache(tmp_path, url, "cached", 2)
    monkeypatch.setattr(yaml_loader.requests, "get", failing_get)

    assert yaml_loader.get_from_url(url) == "cached"


def test_get_from_url_cache_too_old(monkeypatch, tmp_path):
    """When the request fails and the cache entry is 6 days old, an error is raised."""
    url = "http://example.com/data"

    def failing_get(*args, **kwargs):
        # Simulate the network being unavailable.
        raise Exception("network error")

    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
    _write_cache(tmp_path, url, "old", 6)
    monkeypatch.setattr(yaml_loader.requests, "get", failing_get)

    with pytest.raises(Exception):
        yaml_loader.get_from_url(url)


def test_get_from_url_no_cache(monkeypatch, tmp_path):
    """When the request fails and no cache file exists, an error is raised."""
    url = "http://example.com/data"

    def failing_get(*args, **kwargs):
        # Simulate the network being unavailable.
        raise Exception("network error")

    # The cache directory is left empty on purpose.
    monkeypatch.setattr(yaml_loader, "CACHE_PATH", str(tmp_path))
    monkeypatch.setattr(yaml_loader.requests, "get", failing_get)

    with pytest.raises(Exception):
        yaml_loader.get_from_url(url)


def test_yaml_safe_load_local(tmp_path):
    """A local file path is read and its YAML content is returned parsed."""
    yaml_file = tmp_path / "file.yaml"
    yaml_file.write_text("a: 1")

    loaded = yaml_loader.yaml_safe_load(str(yaml_file))
    assert loaded == {"a": 1}


def test_yaml_safe_load_url(monkeypatch):
    """An ``http://`` location is loaded via ``get_from_url`` and parsed as YAML."""
    url = "http://example.com/data.yaml"

    def fake_get_from_url(u):
        # Stub out the fetch so no network access happens.
        return "b: 2"

    monkeypatch.setattr(yaml_loader, "get_from_url", fake_get_from_url)

    loaded = yaml_loader.yaml_safe_load(url)
    assert loaded == {"b": 2}