format with black
EmileSonneveld committed Nov 6, 2024
1 parent c1d62b6 commit 9bb8a6b
Showing 2 changed files with 71 additions and 64 deletions.
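For reference, a pass like this is usually produced by running black over the touched files and verified in check mode. A minimal sketch using black's Python API; the 120-character line length is an assumption (the repository's actual configuration is not shown in this diff):

    # Hypothetical reproduction of the formatting pass; assumes `black` is
    # installed and that the project formats with a 120-character line length.
    import black

    src = "conf.set(key='spark.driver.memory', value='2G')\n"
    formatted = black.format_str(src, mode=black.Mode(line_length=120))
    print(formatted, end="")  # -> conf.set(key="spark.driver.memory", value="2G")

The equivalent command line would be `black --line-length 120 openeogeotrellis/ tests/`, with `black --check` to verify without rewriting files.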
56 changes: 29 additions & 27 deletions openeogeotrellis/deploy/local.py
@@ -22,11 +22,11 @@ def setup_local_spark(additional_jar_dirs=[]):
     # TODO: make this more reusable (e.g. also see `_setup_local_spark` in tests/conftest.py)
     from pyspark import SparkContext, find_spark_home
 
-    spark_python = os.path.join(find_spark_home._find_spark_home(), 'python')
+    spark_python = os.path.join(find_spark_home._find_spark_home(), "python")
     logging.info(f"spark_python: {spark_python}")
-    py4j = glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
+    py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
     sys.path[:0] = [spark_python, py4j]
-    _log.debug('sys.path: {p!r}'.format(p=sys.path))
+    _log.debug("sys.path: {p!r}".format(p=sys.path))
     master_str = "local[2]"
 
     OPENEO_LOCAL_DEBUGGING = smart_bool(os.environ.get("OPENEO_LOCAL_DEBUGGING", "false"))
@@ -82,28 +82,28 @@ def setup_local_spark(additional_jar_dirs=[]):
     conf.set("spark.driver.extraJavaOptions", extra_options)
     # conf.set('spark.executor.extraJavaOptions', extra_options) # Seems not needed
 
-    conf.set(key='spark.driver.memory', value='2G')
-    conf.set(key='spark.executor.memory', value='2G')
+    conf.set(key="spark.driver.memory", value="2G")
+    conf.set(key="spark.executor.memory", value="2G")
 
-    if 'PYSPARK_PYTHON' not in os.environ:
-        os.environ['PYSPARK_PYTHON'] = sys.executable
+    if "PYSPARK_PYTHON" not in os.environ:
+        os.environ["PYSPARK_PYTHON"] = sys.executable
 
-    _log.info('Creating Spark context with config:')
+    _log.info("Creating Spark context with config:")
     for k, v in conf.getAll():
         _log.info("Spark config: {k!r}: {v!r}".format(k=k, v=v))
     pysc = SparkContext.getOrCreate(conf)
     pysc.setLogLevel("INFO")
-    _log.info('Created Spark Context {s}'.format(s=pysc))
+    _log.info("Created Spark Context {s}".format(s=pysc))
     if OPENEO_LOCAL_DEBUGGING:
         _log.info("Spark web UI: http://localhost:{p}/".format(p=pysc.getConf().get("spark.ui.port") or 4040))
 
     return pysc
 
 
 def on_started() -> None:
-    show_log_level(logging.getLogger('gunicorn.error'))
-    show_log_level(logging.getLogger('flask'))
-    show_log_level(logging.getLogger('werkzeug'))
+    show_log_level(logging.getLogger("gunicorn.error"))
+    show_log_level(logging.getLogger("flask"))
+    show_log_level(logging.getLogger("werkzeug"))
 
 
 def setup_environment():
@@ -132,18 +132,20 @@ def setup_environment():
     if smart_bool(os.environ.get("OPENEO_DRIVER_SIMPLE_LOGGING")):
         root_handlers = None
 
-    setup_logging(get_logging_config(
-        root_handlers=root_handlers,
-        loggers={
-            "openeo": {"level": "DEBUG"},
-            "openeo_driver": {"level": "DEBUG"},
-            'openeogeotrellis': {'level': 'DEBUG'},
-            "flask": {"level": "DEBUG"},
-            "werkzeug": {"level": "DEBUG"},
-            "gunicorn": {"level": "INFO"},
-            'kazoo': {'level': 'WARN'},
-        },
-    ))
+    setup_logging(
+        get_logging_config(
+            root_handlers=root_handlers,
+            loggers={
+                "openeo": {"level": "DEBUG"},
+                "openeo_driver": {"level": "DEBUG"},
+                "openeogeotrellis": {"level": "DEBUG"},
+                "flask": {"level": "DEBUG"},
+                "werkzeug": {"level": "DEBUG"},
+                "gunicorn": {"level": "INFO"},
+                "kazoo": {"level": "WARN"},
+            },
+        )
+    )
 
     setup_environment()
 
@@ -159,9 +161,9 @@
     )
     app = build_app(backend_implementation=backend_implementation)
 
-    show_log_level(logging.getLogger('openeo'))
-    show_log_level(logging.getLogger('openeo_driver'))
-    show_log_level(logging.getLogger('openeogeotrellis'))
+    show_log_level(logging.getLogger("openeo"))
+    show_log_level(logging.getLogger("openeo_driver"))
+    show_log_level(logging.getLogger("openeogeotrellis"))
     show_log_level(app.logger)
 
     host = os.environ.get("OPENEO_DEV_GUNICORN_HOST", "127.0.0.1")
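The hunks above only reformat openeogeotrellis/deploy/local.py; its behavior is unchanged. For orientation, a minimal usage sketch of the function being touched — the import path follows the file header in this diff, and the OPENEO_LOCAL_DEBUGGING value is purely illustrative:

    # Hypothetical local-development session; `setup_local_spark` returns the
    # SparkContext it creates with the local[2] master configured above.
    import os

    os.environ["OPENEO_LOCAL_DEBUGGING"] = "true"  # also enables the Spark web UI

    from openeogeotrellis.deploy.local import setup_local_spark

    sc = setup_local_spark()
    print(sc.master)  # local[2]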
79 changes: 42 additions & 37 deletions tests/conftest.py
@@ -55,7 +55,7 @@ def backend_config_path() -> Path:
 @pytest.hookimpl(trylast=True)
 def pytest_configure(config):
     """Pytest configuration hook"""
-    os.environ['PYTEST_CONFIGURE'] = (os.environ.get('PYTEST_CONFIGURE', '') + ':' + __file__).lstrip(':')
+    os.environ["PYTEST_CONFIGURE"] = (os.environ.get("PYTEST_CONFIGURE", "") + ":" + __file__).lstrip(":")
 
     # Load test GpsBackendConfig by default
     os.environ["OPENEO_BACKEND_CONFIG"] = str(_BACKEND_CONFIG_PATH)
@@ -75,14 +75,18 @@ def _ensure_geopyspark(out: TerminalReporter):
     """Make sure GeoPySpark knows where to find Spark (SPARK_HOME) and py4j"""
     try:
         import geopyspark
+
         out.write_line("[conftest.py] Succeeded to import geopyspark automatically: {p!r}".format(p=geopyspark))
     except KeyError as e:
         # Geopyspark failed to detect Spark home and py4j, let's fix that.
         from pyspark import find_spark_home
+
         pyspark_home = Path(find_spark_home._find_spark_home())
-        out.write_line("[conftest.py] Failed to import geopyspark automatically. "
-                       "Will set up py4j path using Spark home: {h}".format(h=pyspark_home))
-        py4j_zip = next((pyspark_home / 'python' / 'lib').glob('py4j-*-src.zip'))
+        out.write_line(
+            "[conftest.py] Failed to import geopyspark automatically. "
+            "Will set up py4j path using Spark home: {h}".format(h=pyspark_home)
+        )
+        py4j_zip = next((pyspark_home / "python" / "lib").glob("py4j-*-src.zip"))
         out.write_line("[conftest.py] py4j zip: {z!r}".format(z=py4j_zip))
         sys.path.append(str(py4j_zip))
 
@@ -99,8 +103,8 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
     out.write_line("[conftest.py] Setting up local Spark")
     master_str = "local[2]"
 
-    if 'PYSPARK_PYTHON' not in os.environ:
-        os.environ['PYSPARK_PYTHON'] = sys.executable
+    if "PYSPARK_PYTHON" not in os.environ:
+        os.environ["PYSPARK_PYTHON"] = sys.executable
 
     from geopyspark import geopyspark_conf
     from pyspark import SparkContext
@@ -126,38 +130,39 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
     conf.set("spark.sql.session.timeZone", "UTC")
 
     conf.set("spark.kryoserializer.buffer.max", value="1G")
-    conf.set(key='spark.kryo.registrator', value='geotrellis.spark.store.kryo.KryoRegistrator')
+    conf.set("spark.kryo.registrator", "geotrellis.spark.store.kryo.KryoRegistrator")
     conf.set(
         key="spark.kryo.classesToRegister",
         value="ar.com.hjg.pngj.ImageInfo,ar.com.hjg.pngj.ImageLineInt,geotrellis.raster.RasterRegion$GridBoundsRasterRegion",
     )
     # Only show spark progress bars for high verbosity levels
-    conf.set('spark.ui.showConsoleProgress', verbosity >= 3)
+    conf.set("spark.ui.showConsoleProgress", verbosity >= 3)
 
     conf.set(key="spark.driver.memory", value="2G")
     conf.set(key="spark.executor.memory", value="2G")
     OPENEO_LOCAL_DEBUGGING = smart_bool(os.environ.get("OPENEO_LOCAL_DEBUGGING", "false"))
-    conf.set('spark.ui.enabled', OPENEO_LOCAL_DEBUGGING)
+    conf.set("spark.ui.enabled", OPENEO_LOCAL_DEBUGGING)
 
     jars = []
     for jar_dir in additional_jar_dirs:
         for jar_path in Path(jar_dir).iterdir():
             if jar_path.match("openeo-logging-*.jar"):
                 jars.append(str(jar_path))
     extraClassPath = ":".join(jars)
-    conf.set('spark.driver.extraClassPath', extraClassPath)
-    conf.set('spark.executor.extraClassPath', extraClassPath)
+    conf.set("spark.driver.extraClassPath", extraClassPath)
+    conf.set("spark.executor.extraClassPath", extraClassPath)
 
     sparkSubmitLog4jConfigurationFile = Path(__file__).parent.parent / "scripts/batch_job_log4j2.xml"
-    with open(sparkSubmitLog4jConfigurationFile, 'r') as read_file:
+    with open(sparkSubmitLog4jConfigurationFile, "r") as read_file:
         content = read_file.read()
     sparkSubmitLog4jConfigurationFile = "/tmp/sparkSubmitLog4jConfigurationFile.xml"
-    with open(sparkSubmitLog4jConfigurationFile, 'w') as write_file:
+    with open(sparkSubmitLog4jConfigurationFile, "w") as write_file:
         # There could be a more elegant way to fill in this variable during testing:
-        write_file.write(content
-                         .replace("${sys:spark.yarn.app.container.log.dir}/", "")
-                         .replace("${sys:openeo.logging.threshold}", "DEBUG")
-                         )
+        write_file.write(
+            content.replace("${sys:spark.yarn.app.container.log.dir}/", "").replace(
+                "${sys:openeo.logging.threshold}", "DEBUG"
+            )
+        )
     # got some options from 'sparkDriverJavaOptions'
     sparkDriverJavaOptions = f"-Dlog4j2.configurationFile=file:{sparkSubmitLog4jConfigurationFile}\
         -Dscala.concurrent.context.numThreads=6 \
@@ -180,14 +185,20 @@ def _setup_local_spark(out: TerminalReporter, verbosity=0):
     out.write_line("[conftest.py] SparkContext.getOrCreate with {c!r}".format(c=conf.getAll()))
     context = SparkContext.getOrCreate(conf)
     context.setLogLevel("DEBUG")
-    out.write_line("[conftest.py] JVM info: {d!r}".format(d={
-        f: context._jvm.System.getProperty(f)
-        for f in [
-            "java.version", "java.vendor", "java.home",
-            "java.class.version",
-            # "java.class.path",
-        ]
-    }))
+    out.write_line(
+        "[conftest.py] JVM info: {d!r}".format(
+            d={
+                f: context._jvm.System.getProperty(f)
+                for f in [
+                    "java.version",
+                    "java.vendor",
+                    "java.home",
+                    "java.class.version",
+                    # "java.class.path",
+                ]
+            }
+        )
+    )
 
     if OPENEO_LOCAL_DEBUGGING:
         # TODO: Activate default logging for this message
@@ -247,6 +258,7 @@ def custom_spark_context_restart_delayed():
 def api_version(request):
     return request.param
 
+
 # TODO: Deduplicate code with openeo-python-client
 class _Sleeper:
     def __init__(self):
Expand Down Expand Up @@ -286,14 +298,9 @@ def udf_noop():

noop_udf_callback = {
"udf_process": {
"arguments": {
"data": {
"from_parameter": "data"
},
"udf": udf_code
},
"arguments": {"data": {"from_parameter": "data"}, "udf": udf_code},
"process_id": "run_udf",
"result": True
"result": True,
},
}
return noop_udf_callback
@@ -319,9 +326,7 @@ def job_registry() -> InMemoryJobRegistry:
 
 
 @pytest.fixture
-def backend_implementation(
-    request, batch_job_output_root, job_registry
-) -> "GeoPySparkBackendImplementation":
+def backend_implementation(request, batch_job_output_root, job_registry) -> "GeoPySparkBackendImplementation":
     from openeogeotrellis.backend import GeoPySparkBackendImplementation
 
     backend = GeoPySparkBackendImplementation(
@@ -341,8 +346,8 @@ def flask_app(backend_implementation) -> flask.Flask:
         backend_implementation=backend_implementation,
         # error_handling=False,
     )
-    app.config['TESTING'] = True
-    app.config['SERVER_NAME'] = 'oeo.net'
+    app.config["TESTING"] = True
+    app.config["SERVER_NAME"] = "oeo.net"
     return app
 
 