Hotfix v0.1a1 #36

Merged on Aug 5, 2024 · 53 commits

Commits
fc33278
Fixed Geokube Version
gtramonte Jul 15, 2024
af6884c
fix bug on dataset with no cache
gtramonte Jul 15, 2024
8e10e75
adding new driver for Active Fire Monitoring dataset
gtramonte Jul 15, 2024
1d54021
adding driver to setup.py
gtramonte Jul 15, 2024
946e0f2
changing from preprocess to post process data
gtramonte Jul 15, 2024
7145d8b
fix typo in DataCube
gtramonte Jul 15, 2024
f89ccbc
adding sort into driver
gtramonte Jul 17, 2024
eab48e8
dropping certainty variable from dataset
gtramonte Jul 17, 2024
a528242
removing postprocess call
gtramonte Jul 17, 2024
6e14a4a
doing only sort in post process
gtramonte Jul 17, 2024
15772cd
check if values explode memory
gtramonte Jul 17, 2024
5c4cea8
expanding only lat
gtramonte Jul 17, 2024
25d07d1
expanding longitude dim
gtramonte Jul 17, 2024
0402069
breaking the expand dim operation into two steps
gtramonte Jul 17, 2024
b16b97c
ignoring spotlight files
gtramonte Jul 19, 2024
d503dbb
applying chunking after expand dim
gtramonte Jul 19, 2024
40710f2
resetting indexes to remove duplicate
gtramonte Jul 19, 2024
80c42b9
adding a post process function to remove duplicate coordinate and app…
gtramonte Jul 19, 2024
5a2f15f
removing duplicated indexes by sel
gtramonte Jul 19, 2024
360a37e
moving reshape in post process
gtramonte Jul 19, 2024
7eac0de
changing chunks size
gtramonte Jul 19, 2024
210c3f9
adding post process chunk parameter
gtramonte Jul 19, 2024
56fd4bb
adding crs projection
gtramonte Jul 19, 2024
04cc64b
fix post process function was using wrong dataset dimensions
gtramonte Jul 19, 2024
3fa9f27
setting threads_per_worker param to 1
gtramonte Jul 22, 2024
2e21cf2
upgrade dask version
gtramonte Jul 22, 2024
5fa752e
upgrade dask version
gtramonte Jul 22, 2024
374ebb3
setting dask workers to 4
gtramonte Jul 22, 2024
6ae159b
setting dask workers to 1
gtramonte Jul 22, 2024
158fa47
removed dask cluster from executor
gtramonte Jul 22, 2024
e6aca53
checking with geokube version 0.2.6b2
gtramonte Jul 22, 2024
79463dd
test with new geokube image and dask cluster
gtramonte Jul 22, 2024
9405931
removed async keyword from process function
gtramonte Jul 22, 2024
00bf6a3
adding certainty back in the driver
gtramonte Jul 22, 2024
b5dea5c
adding pattern handling for filters
gtramonte Jul 25, 2024
c300925
resolving pattern before using the open_datacube to get the list of fi…
gtramonte Jul 25, 2024
e4b1307
fix path should be a list of files not a Series
gtramonte Jul 25, 2024
997eb13
using geokube.Dataset apply function to apply postprocess on all Data…
gtramonte Jul 25, 2024
81b8650
removed unused functions
gtramonte Jul 25, 2024
0708a72
setting worker number to 4
gtramonte Jul 25, 2024
9f77ab2
applying resampling to datacubes
gtramonte Jul 26, 2024
9876f1d
removed chunk in postprocess
gtramonte Jul 26, 2024
7cda4fd
applying chunking again
gtramonte Jul 26, 2024
533fdff
setting number of thread per worker to 8
gtramonte Jul 26, 2024
991f9cf
adding options for Dask cluster
gtramonte Jul 29, 2024
15dbb9c
n_workers and thread per worker should be integer
gtramonte Jul 29, 2024
0e129e9
Adding healthchecks for API pods via cron
gtramonte Jul 30, 2024
626ecd5
missing new line at EOF
gtramonte Jul 30, 2024
a55c25a
launching service cron at API start
gtramonte Jul 30, 2024
a8b045e
starting service first
gtramonte Jul 30, 2024
e17dfef
modifying entrypoint
gtramonte Jul 30, 2024
38fe6df
missing curl in api image
gtramonte Jul 30, 2024
67e6c9b
adding healthcheck to executors
gtramonte Jul 31, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -112,3 +112,4 @@ venv.bak/
_catalogs/
_old/

.DS_Store
10 changes: 10 additions & 0 deletions api/Dockerfile
@@ -1,9 +1,19 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geolake
ARG TAG=latest
FROM $REGISTRY/geolake-datastore:$TAG

RUN apt update && apt install -y cron curl

WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY app /app
EXPOSE 80

COPY ./healtcheck.* /opt/

RUN chmod +x /opt/healtcheck.sh
RUN crontab -u root /opt/healtcheck.cron

CMD ["uvicorn", "app.main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"]

1 change: 1 addition & 0 deletions api/healtcheck.cron
@@ -0,0 +1 @@
*/10 * * * * bash -c '/opt/healtcheck.sh'
1 change: 1 addition & 0 deletions api/healtcheck.sh
@@ -0,0 +1 @@
curl https://hc-ping.com/$HEALTCHECKS
6 changes: 3 additions & 3 deletions datastore/datastore/datastore.py
Expand Up @@ -50,7 +50,7 @@ def __init__(self) -> None:

@log_execution_time(_LOG)
def get_cached_product_or_read(
self, dataset_id: str, product_id: str, query: GeoQuery | None = None
self, dataset_id: str, product_id: str,
) -> DataCube | Dataset:
"""Get product from the cache instead of loading files indicated in
the catalog if `metadata_caching` set to `True`.
@@ -81,7 +81,7 @@ def get_cached_product_or_read(
)
return self.catalog(CACHE_DIR=self.cache_dir)[dataset_id][
product_id
].get(geoquery=query, compute=False).read_chunked()
].read_chunked()
return self.cache[dataset_id][product_id]

@log_execution_time(_LOG)
@@ -389,7 +389,7 @@ def estimate(
# NOTE: we always use catalog directly and single product cache
self._LOG.debug("loading product...")
# NOTE: for estimation we use cached products
kube = self.get_cached_product_or_read(dataset_id, product_id, query=query)
kube = self.get_cached_product_or_read(dataset_id, product_id)
self._LOG.debug("original kube len: %s", len(kube))
return Datastore._process_query(kube, geoquery, False).nbytes

4 changes: 2 additions & 2 deletions drivers/Dockerfile
@@ -1,6 +1,6 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geokube
# ARG TAG=v0.2.6b1
ARG TAG=latest
#ARG TAG=v0.2.6b2
ARG TAG=2024.05.03.10.36
FROM $REGISTRY/geokube:$TAG

COPY dist/intake_geokube-0.1a0-py3-none-any.whl /
99 changes: 99 additions & 0 deletions drivers/intake_geokube/afm.py
@@ -0,0 +1,99 @@
"""geokube driver for intake."""

from typing import Mapping, Optional
import geokube
import numpy as np
import xarray as xr
from .base import GeokubeSource
from geokube import open_datacube, open_dataset
from geokube.core.datacube import DataCube

_PROJECTION = {"grid_mapping_name": "latitude_longitude"}

def postprocess_afm(ds: xr.Dataset, **post_process_chunks):
if isinstance(ds, geokube.core.datacube.DataCube):
ds = ds.to_xarray()
latitude = ds['lat'].values
longitude = ds['lon'].values
# ds = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1,0))
ds = ds.drop('lat')
ds = ds.drop('lon')
ds = ds.drop('certainty')
deduplicated = ds.expand_dims(dim={"latitude": latitude, "longitude": longitude}, axis=(1, 0))
# print(deduplicated.dims)
for dim in deduplicated.dims:
indexes = {dim: ~deduplicated.get_index(dim).duplicated(keep='first')}
deduplicated = deduplicated.isel(indexes)
return DataCube.from_xarray(
deduplicated.sortby('time').sortby('latitude').sortby('longitude').chunk(post_process_chunks))

def add_projection(dset: xr.Dataset, **kwargs) -> xr.Dataset:
"""Add projection information to the dataset"""
coords = dset.coords
coords["crs"] = xr.DataArray(data=np.array(1), attrs=_PROJECTION)
for var in dset.data_vars.values():
enc = var.encoding
enc["grid_mapping"] = "crs"
return dset


class CMCCAFMSource(GeokubeSource):
name = "cmcc_afm_geokube"

def __init__(
self,
path: str,
pattern: str = None,
field_id: str = None,
delay_read_cubes: bool = False,
metadata_caching: bool = False,
metadata_cache_path: str = None,
storage_options: dict = None,
xarray_kwargs: dict = None,
metadata=None,
mapping: Optional[Mapping[str, Mapping[str, str]]] = None,
load_files_on_persistance: Optional[bool] = True,
postprocess_chunk: Optional = None
):
self._kube = None
self.path = path
self.pattern = pattern
self.field_id = field_id
self.delay_read_cubes = delay_read_cubes
self.metadata_caching = metadata_caching
self.metadata_cache_path = metadata_cache_path
self.storage_options = storage_options
self.mapping = mapping
self.xarray_kwargs = {} if xarray_kwargs is None else xarray_kwargs
self.load_files_on_persistance = load_files_on_persistance
self.postprocess_chunk = postprocess_chunk
# self.preprocess = preprocess_afm
super(CMCCAFMSource, self).__init__(metadata=metadata)

def _open_dataset(self):
if self.pattern is None:
self._kube =\
postprocess_afm(
open_datacube(
path=self.path,
id_pattern=self.field_id,
metadata_caching=self.metadata_caching,
metadata_cache_path=self.metadata_cache_path,
mapping=self.mapping,
**self.xarray_kwargs,
# preprocess=self.preprocess
),
**self.postprocess_chunk
).resample('maximum', frequency='1H')
else:
self._kube = open_dataset(
path=self.path,
pattern=self.pattern,
id_pattern=self.field_id,
metadata_caching=self.metadata_caching,
metadata_cache_path=self.metadata_cache_path,
mapping=self.mapping,
**self.xarray_kwargs,
# preprocess=self.preprocess
).apply(postprocess_afm,**self.postprocess_chunk).resample('maximum', frequency='1H')
return self._kube
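
The heart of the new driver is postprocess_afm above: it promotes the lat/lon variables to dimensions with expand_dims, drops duplicated index values along every dimension, then sorts and re-chunks before rebuilding the DataCube. Below is a minimal, self-contained sketch of that deduplicate–sort–chunk pattern on a synthetic xarray dataset; the frp variable name, coordinate values and chunk sizes are invented for illustration and are not taken from the AFM data.

import numpy as np
import xarray as xr

# Synthetic dataset with a duplicated latitude value, standing in for the
# repeated coordinates that can appear after the expand_dims step in the driver.
ds = xr.Dataset(
    {"frp": (("latitude", "longitude"), np.arange(12.0).reshape(4, 3))},
    coords={
        "latitude": [10.0, 10.0, 30.0, 20.0],   # note the duplicate 10.0
        "longitude": [120.0, 100.0, 110.0],
    },
)

# Keep only the first occurrence of each index value along every dimension,
# mirroring the loop in postprocess_afm.
for dim in ds.dims:
    keep = ~ds.get_index(dim).duplicated(keep="first")
    ds = ds.isel({dim: keep})

# Sort and chunk, as the driver does before calling DataCube.from_xarray
# (chunking requires dask to be installed).
ds = ds.sortby("latitude").sortby("longitude").chunk({"latitude": 2, "longitude": 2})
print(dict(ds.sizes))  # {'latitude': 3, 'longitude': 3} - the duplicate row is gone
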
1 change: 1 addition & 0 deletions drivers/setup.py
@@ -18,6 +18,7 @@
"intake.drivers": [
"geokube_netcdf = intake_geokube.netcdf:NetCDFSource",
"cmcc_wrf_geokube = intake_geokube.wrf:CMCCWRFSource",
"cmcc_afm_geokube = intake_geokube.afm:CMCCAFMSource",
]
},
classifiers=[
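
The new intake.drivers entry point makes the AFM driver discoverable under the name cmcc_afm_geokube once the package is installed. As a rough usage sketch only: the path and chunk sizes below are placeholders, and it assumes GeokubeSource exposes the standard intake read() entry point, so the exact call may differ.

from intake_geokube.afm import CMCCAFMSource

# Hypothetical source pointing at a directory of AFM NetCDF files.
source = CMCCAFMSource(
    path="/data/afm/*.nc",                                   # placeholder glob, not from this PR
    metadata_caching=False,
    postprocess_chunk={"latitude": 500, "longitude": 500},   # forwarded to .chunk() in postprocess_afm
)
cube = source.read()  # loads, post-processes and resamples as in _open_dataset

With intake's plugin discovery the same driver should also become reachable as intake.open_cmcc_afm_geokube, mirroring the existing cmcc_wrf_geokube entry.
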
10 changes: 10 additions & 0 deletions executor/Dockerfile
@@ -1,8 +1,18 @@
ARG REGISTRY=rg.fr-par.scw.cloud/geolake
ARG TAG=latest
FROM $REGISTRY/geolake-datastore:$TAG

RUN apt update && apt install -y cron curl


WORKDIR /app
COPY requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir -r /code/requirements.txt
COPY app /app

COPY ./healtcheck.* /opt/

RUN chmod +x /opt/healtcheck.sh
RUN crontab -u root /opt/healtcheck.cron

CMD [ "python", "main.py" ]
36 changes: 23 additions & 13 deletions executor/app/main.py
Expand Up @@ -231,26 +231,20 @@ def process(message: Message, compute: bool):
class Executor(metaclass=LoggableMeta):
_LOG = logging.getLogger("geokube.Executor")

def __init__(self, broker, store_path):
def __init__(self, broker, store_path, dask_cluster_opts):
self._store = store_path
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(host=broker, heartbeat=10),
)
self._conn = broker_conn
self._channel = broker_conn.channel()
self._db = DBManager()
self.dask_cluster_opts = dask_cluster_opts

def create_dask_cluster(self, dask_cluster_opts: dict = None):
if dask_cluster_opts is None:
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
dask_cluster_opts["n_workers"] = None
dask_cluster_opts["memory_limit"] = "auto"
dask_cluster_opts = self.dask_cluster_opts

self._worker_id = self._db.create_worker(
status="enabled",
dask_scheduler_port=dask_cluster_opts["scheduler_port"],
@@ -262,10 +256,11 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None):
extra={"track_id": self._worker_id},
)
dask_cluster = LocalCluster(
n_workers=dask_cluster_opts["n_workers"],
n_workers=dask_cluster_opts['n_workers'],
scheduler_port=dask_cluster_opts["scheduler_port"],
dashboard_address=dask_cluster_opts["dashboard_address"],
memory_limit=dask_cluster_opts["memory_limit"],
threads_per_worker=dask_cluster_opts['thread_per_worker'],
)
self._LOG.info(
"creating Dask Client...", extra={"track_id": self._worker_id}
@@ -356,7 +351,7 @@ def retry_until_timeout(
)
status = RequestStatus.FAILED
fail_reason = f"{type(e).__name__}: {str(e)}"
return (location_path, status, fail_reason)
return location_path, status, fail_reason

def handle_message(self, connection, channel, delivery_tag, body):
message: Message = Message(body)
@@ -382,6 +377,9 @@ def handle_message(self, connection, channel, delivery_tag, body):
message=message,
compute=False,
)

#future = asyncio.run(process(message,compute=False))

location_path, status, fail_reason = self.retry_until_timeout(
future,
message=message,
@@ -447,7 +445,19 @@ def get_size(self, location_path):
executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",")
store_path = os.getenv("STORE_PATH", ".")

executor = Executor(broker=broker, store_path=store_path)
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1))
dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto")
dask_cluster_opts['thread_per_worker'] = int(os.getenv("DASK_THREADS_PER_WORKER", 8))


executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)
print("channel subscribe")
for etype in executor_types:
if etype == "query":
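
With this change the executor derives all Dask LocalCluster settings from environment variables at start-up instead of hard-coding them in create_dask_cluster. A stand-alone sketch of the same wiring, handy for checking a deployment's values locally; the variable names match the diff above, while the exported values are examples only.

import os
from dask.distributed import Client, LocalCluster

# Example values only - in the real deployment these come from the pod environment.
os.environ.setdefault("DASK_N_WORKERS", "4")
os.environ.setdefault("DASK_THREADS_PER_WORKER", "8")

cluster = LocalCluster(
    n_workers=int(os.getenv("DASK_N_WORKERS", 1)),
    threads_per_worker=int(os.getenv("DASK_THREADS_PER_WORKER", 8)),
    scheduler_port=int(os.getenv("DASK_SCHEDULER_PORT", 8188)),
    dashboard_address=f":{int(os.getenv('DASK_DASHBOARD_PORT', 8787))}",
    memory_limit=os.getenv("DASK_MEMORY_LIMIT", "auto"),
    processes=True,
)
client = Client(cluster)
print(len(client.scheduler_info()["workers"]))  # should equal DASK_N_WORKERS
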
1 change: 1 addition & 0 deletions executor/healtcheck.cron
@@ -0,0 +1 @@
*/10 * * * * bash -c '/opt/healtcheck.sh'
1 change: 1 addition & 0 deletions executor/healtcheck.sh
@@ -0,0 +1 @@
curl https://hc-ping.com/$HEALTCHECKS