Skip to content

Commit

Permalink
S3 output test by re-creating JVM. Open-EO/openeo-geotrellis-extensio…
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Nov 5, 2024
1 parent 0de7093 commit c1d62b6
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 3 deletions.
6 changes: 5 additions & 1 deletion openeogeotrellis/configparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, env=os.environ):
self.layer_catalog_metadata_files = env.get("OPENEO_CATALOG_FILES", "layercatalog.json").split(",")

# TODO #283 using this "is_kube_deploy" switch is an anti-pattern (induces hard to maintain code and make unit testing difficult)
self.is_kube_deploy = env.get("KUBE", False)
self._is_kube_deploy = env.get("KUBE", False)
self.pod_namespace = env.get("POD_NAMESPACE", "spark-jobs")
self.concurrent_pod_limit = int(env.get("CONCURRENT_POD_LIMIT", 0)) # 0 means no limit.

Expand All @@ -70,6 +70,10 @@ def _as_boolean(envar_value: Optional[str]) -> bool:
# TODO: use `openeo_driver.utils.smart_bool` instead?
return envar_value is not None and envar_value.lower() == "true"

@property
def is_kube_deploy(self):
    """Raw ``KUBE`` environment value captured at construction (``False`` when unset).

    Read-only accessor over the private ``_is_kube_deploy`` attribute.
    """
    kube_flag = self._is_kube_deploy
    return kube_flag

@property
def use_object_storage(self):
"""Whether or not to get the result files / assets from object storage.
Expand Down
45 changes: 44 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,48 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
return context


# noinspection PyProtectedMember
def restart_spark_context():
    """Stop the active SparkContext and its JVM gateway, then start a fresh local one.

    Shutting down the gateway (not just the SparkContext) forces a brand-new JVM,
    which re-reads the current process environment variables — useful for tests
    that monkeypatch env vars the JVM must observe.
    """
    from pyspark import SparkContext

    with SparkContext._lock:
        # Need to shut down before creating a new SparkConf (Before SparkContext is not enough)
        # Like this, the new environment variables are available inside the JVM
        if SparkContext._active_spark_context:
            SparkContext._active_spark_context.stop()
            SparkContext._gateway.shutdown()
            # Clear cached gateway/JVM handles so the next SparkContext spawns a fresh JVM.
            SparkContext._gateway = None
            SparkContext._jvm = None

    class TerminalReporterMock:
        # Minimal stand-in for pytest's TerminalReporter: _setup_local_spark
        # only needs a write_line() method (see its signature above).
        @staticmethod
        def write_line(message):
            print(message)

    # noinspection PyTypeChecker
    _setup_local_spark(TerminalReporterMock(), 0)


@pytest.fixture
def custom_spark_context_restart_instant():
    """
    Restart the Spark JVM immediately, before the test body runs.

    Add this fixture at the end of your argument list.
    The restarted JVM will pick up your environment variables
    https://docs.pytest.org/en/6.2.x/fixture.html#yield-fixtures-recommended
    """
    restart_spark_context()


@pytest.fixture
def custom_spark_context_restart_delayed():
    """
    Restart the Spark JVM during teardown, i.e. after the test's mocking is cleaned up.

    Add this fixture at the beginning of your argument list.
    The JVM will be restarted when all mocking is cleaned up.
    """
    # Fix: typo "accesible" -> "accessible" in the informational fixture value.
    yield "Spark context is globally accessible now"
    restart_spark_context()


@pytest.fixture(params=["1.0.0"])
def api_version(request):
    """Parametrized openEO API version under test (currently only "1.0.0")."""
    return request.param
Expand Down Expand Up @@ -361,8 +403,9 @@ def mock_s3_client(aws_credentials):


@pytest.fixture(scope="function")
def mock_s3_bucket(mock_s3_resource):
def mock_s3_bucket(mock_s3_resource, monkeypatch):
bucket_name = "openeo-fake-bucketname"
monkeypatch.setenv("SWIFT_BUCKET", bucket_name)

with gps_config_overrides(s3_bucket_name=bucket_name):
assert get_backend_config().s3_bucket_name == bucket_name
Expand Down
67 changes: 66 additions & 1 deletion tests/deploy/test_batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
read_gdal_raster_metadata,
)
from openeogeotrellis.testing import gps_config_overrides
from openeogeotrellis.utils import get_jvm, to_s3_url
from openeogeotrellis.utils import get_jvm, to_s3_url, s3_client

EXPECTED_GRAPH = [{"expression": {"nop": {"process_id": "discard_result",
"result": True}},
Expand Down Expand Up @@ -1301,6 +1301,71 @@ def test_run_job_get_projection_extension_metadata_assets_in_s3_multiple_assets(
)


@mock.patch(
    "openeogeotrellis.configparams.ConfigParams.is_kube_deploy",
    new_callable=mock.PropertyMock,
)
def test_run_job_to_s3(
    mock_config_is_kube_deploy,
    custom_spark_context_restart_delayed,
    tmp_path,
    mock_s3_bucket,
    moto_server,
    custom_spark_context_restart_instant,
):
    """End-to-end batch job run that should upload its result assets to (mocked) S3.

    ``ConfigParams.is_kube_deploy`` is patched to True so the S3/object-storage
    output path is taken. NOTE(review): fixture order looks deliberate —
    ``custom_spark_context_restart_delayed`` first and
    ``custom_spark_context_restart_instant`` last, so the JVM is restarted after
    the S3 mocks/env vars are in place and again on teardown — confirm against
    the fixture docstrings in conftest.py.
    """
    mock_config_is_kube_deploy.return_value = True
    process_graph = {
        "lc": {
            "process_id": "load_collection",
            "arguments": {
                "id": "TestCollection-LonLat4x4",
                "temporal_extent": ["2021-01-01", "2021-01-10"],
                "spatial_extent": {
                    "east": 5.08,
                    "north": 51.22,
                    "south": 51.215,
                    "west": 5.07,
                },
                "bands": ["Longitude", "Latitude", "Day"],
            },
        },
        # NOTE(review): "resamplespatial1" is not referenced by any other node
        # ("save" consumes "lc" directly), so this resample is a dead graph node —
        # confirm whether "save" was meant to read from "resamplespatial1" instead.
        "resamplespatial1": {
            "process_id": "resample_spatial",
            "arguments": {
                "align": "upper-left",
                "data": {"from_node": "lc"},
                "method": "bilinear",
                "projection": 4326,
                "resolution": 0.000297619047619,
            },
        },
        "save": {
            "process_id": "save_result",
            "arguments": {"data": {"from_node": "lc"}, "format": "GTiff"},
            "result": True,
        },
    }

    run_job(
        job_specification={
            "process_graph": process_graph,
        },
        output_file=tmp_path / "out",
        metadata_file=tmp_path / "metadata.json",
        api_version="2.0.0",
        job_dir=tmp_path,
        dependencies=[],
        user_id="jenkins",
    )

    s3_instance = s3_client()
    from openeogeotrellis.config import get_backend_config

    files = {o["Key"] for o in s3_instance.list_objects(Bucket=get_backend_config().s3_bucket_name)["Contents"]}
    # Strip the local job-dir prefix from the S3 keys so only bare file names are
    # compared (assumes every key starts with str(tmp_path) — TODO confirm).
    files = {f[len(str(tmp_path)) :] for f in files}
    assert files == {"collection.json", "metadata.json", "openEO_2021-01-05Z.tif", "openEO_2021-01-05Z.tif.json"}


# TODO: Update this test to include statistics or not? Would need to update the json file.
@pytest.mark.parametrize(
["json_file", "expected_metadata"],
Expand Down

0 comments on commit c1d62b6

Please sign in to comment.