From c1d62b601b99e532c2096f5194fc05b07719d025 Mon Sep 17 00:00:00 2001 From: Emile Sonneveld Date: Tue, 5 Nov 2024 12:05:25 +0100 Subject: [PATCH] S3 output test by re-creating JVM. https://github.com/Open-EO/openeo-geotrellis-extensions/issues/329 --- openeogeotrellis/configparams.py | 6 ++- tests/conftest.py | 45 ++++++++++++++++++++- tests/deploy/test_batch_job.py | 67 +++++++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/openeogeotrellis/configparams.py b/openeogeotrellis/configparams.py index bbcb529e..b4f43171 100644 --- a/openeogeotrellis/configparams.py +++ b/openeogeotrellis/configparams.py @@ -45,7 +45,7 @@ def __init__(self, env=os.environ): self.layer_catalog_metadata_files = env.get("OPENEO_CATALOG_FILES", "layercatalog.json").split(",") # TODO #283 using this "is_kube_deploy" switch is an anti-pattern (induces hard to maintain code and make unit testing difficult) - self.is_kube_deploy = env.get("KUBE", False) + self._is_kube_deploy = env.get("KUBE", False) self.pod_namespace = env.get("POD_NAMESPACE", "spark-jobs") self.concurrent_pod_limit = int(env.get("CONCURRENT_POD_LIMIT", 0)) # 0 means no limit. @@ -70,6 +70,10 @@ def _as_boolean(envar_value: Optional[str]) -> bool: # TODO: use `openeo_driver.utils.smart_bool` instead? return envar_value is not None and envar_value.lower() == "true" + @property + def is_kube_deploy(self): + return self._is_kube_deploy + @property def use_object_storage(self): """Whether or not to get the result files / assets from object storage. 
diff --git a/tests/conftest.py b/tests/conftest.py index 02f323b0..e5a34d70 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -201,6 +201,48 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0): return context +# noinspection PyProtectedMember +def restart_spark_context(): + from pyspark import SparkContext + + with SparkContext._lock: + # Need to shut down before creating a new SparkConf (Before SparkContext is not enough) + # Like this, the new environment variables are available inside the JVM + if SparkContext._active_spark_context: + SparkContext._active_spark_context.stop() + SparkContext._gateway.shutdown() + SparkContext._gateway = None + SparkContext._jvm = None + + class TerminalReporterMock: + @staticmethod + def write_line(message): + print(message) + + # noinspection PyTypeChecker + _setup_local_spark(TerminalReporterMock(), 0) + + +@pytest.fixture +def custom_spark_context_restart_instant(): + """ + Add this fixture at the end of your argument list. + The restarted JVM will pick up your environment variables + https://docs.pytest.org/en/6.2.x/fixture.html#yield-fixtures-recommended + """ + restart_spark_context() + + +@pytest.fixture +def custom_spark_context_restart_delayed(): + """ + Add this fixture at the beginning of your argument list. + The JVM will be restarted when all mocking is cleaned up. 
+ """ + yield "Spark context is globally accessible now" + restart_spark_context() + + @pytest.fixture(params=["1.0.0"]) def api_version(request): return request.param @@ -361,8 +403,9 @@ def mock_s3_client(aws_credentials): @pytest.fixture(scope="function") -def mock_s3_bucket(mock_s3_resource): +def mock_s3_bucket(mock_s3_resource, monkeypatch): bucket_name = "openeo-fake-bucketname" + monkeypatch.setenv("SWIFT_BUCKET", bucket_name) with gps_config_overrides(s3_bucket_name=bucket_name): assert get_backend_config().s3_bucket_name == bucket_name diff --git a/tests/deploy/test_batch_job.py b/tests/deploy/test_batch_job.py index 7fbc584b..4529b89d 100644 --- a/tests/deploy/test_batch_job.py +++ b/tests/deploy/test_batch_job.py @@ -39,7 +39,7 @@ read_gdal_raster_metadata, ) from openeogeotrellis.testing import gps_config_overrides -from openeogeotrellis.utils import get_jvm, to_s3_url +from openeogeotrellis.utils import get_jvm, to_s3_url, s3_client EXPECTED_GRAPH = [{"expression": {"nop": {"process_id": "discard_result", "result": True}}, @@ -1301,6 +1301,71 @@ def test_run_job_get_projection_extension_metadata_assets_in_s3_multiple_assets( ) +@mock.patch( + "openeogeotrellis.configparams.ConfigParams.is_kube_deploy", + new_callable=mock.PropertyMock, +) +def test_run_job_to_s3( + mock_config_is_kube_deploy, + custom_spark_context_restart_delayed, + tmp_path, + mock_s3_bucket, + moto_server, + custom_spark_context_restart_instant, +): + mock_config_is_kube_deploy.return_value = True + process_graph = { + "lc": { + "process_id": "load_collection", + "arguments": { + "id": "TestCollection-LonLat4x4", + "temporal_extent": ["2021-01-01", "2021-01-10"], + "spatial_extent": { + "east": 5.08, + "north": 51.22, + "south": 51.215, + "west": 5.07, + }, + "bands": ["Longitude", "Latitude", "Day"], + }, + }, + "resamplespatial1": { + "process_id": "resample_spatial", + "arguments": { + "align": "upper-left", + "data": {"from_node": "lc"}, + "method": "bilinear", + "projection": 
4326, + "resolution": 0.000297619047619, + }, + }, + "save": { + "process_id": "save_result", + "arguments": {"data": {"from_node": "lc"}, "format": "GTiff"}, + "result": True, + }, + } + + run_job( + job_specification={ + "process_graph": process_graph, + }, + output_file=tmp_path / "out", + metadata_file=tmp_path / "metadata.json", + api_version="2.0.0", + job_dir=tmp_path, + dependencies=[], + user_id="jenkins", + ) + + s3_instance = s3_client() + from openeogeotrellis.config import get_backend_config + + files = {o["Key"] for o in s3_instance.list_objects(Bucket=get_backend_config().s3_bucket_name)["Contents"]} + files = {f[len(str(tmp_path)) :] for f in files} + assert files == {"collection.json", "metadata.json", "openEO_2021-01-05Z.tif", "openEO_2021-01-05Z.tif.json"} + + # TODO: Update this test to include statistics or not? Would need to update the json file. @pytest.mark.parametrize( ["json_file", "expected_metadata"],