diff --git a/api/app/endpoint_handlers/dataset.py b/api/app/endpoint_handlers/dataset.py index c03a54b..e239b02 100644 --- a/api/app/endpoint_handlers/dataset.py +++ b/api/app/endpoint_handlers/dataset.py @@ -29,9 +29,10 @@ MESSAGE_SEPARATOR = os.environ["MESSAGE_SEPARATOR"] def _is_etimate_enabled(dataset_id, product_id): - if dataset_id in ("sentinel-2",): - return False - return True + return False + # if dataset_id in ("sentinel-2",): + # return False + # return True @log_execution_time(log) diff --git a/drivers/intake_geokube/base.py b/drivers/intake_geokube/base.py index e070427..3190ca4 100644 --- a/drivers/intake_geokube/base.py +++ b/drivers/intake_geokube/base.py @@ -87,6 +87,16 @@ def process(self, query: GeoQuery) -> Any: data_ = self.read() return self._process_geokube_dataset(data_, query=query, compute=True) + @staticmethod + def _maybe_compute_delayed(delayed_item): + if isinstance(delayed_item, Delayed): + return delayed_item.compute() + elif isinstance(delayed_item, Dataset): + return delayed_item.apply( + lambda dc: dc.compute() if isinstance(dc, Delayed) else dc + ) + return delayed_item + def _process_geokube_dataset( self, dataset: Dataset | DataCube, @@ -98,12 +108,14 @@ def _process_geokube_dataset( ) if not query: self.log.info("query is empty!") - return dataset.compute() if compute else dataset + if compute: + return AbstractBaseDriver._maybe_compute_delayed(dataset) + return dataset if isinstance(dataset, Dataset): self.log.info("filtering with: %s", query.filters) dataset = dataset.filter(**query.filters) - if isinstance(dataset, Delayed) and compute: - dataset = dataset.compute() + if compute: + dataset = AbstractBaseDriver._maybe_compute_delayed(dataset) if query.variable: self.log.info("selecting variable: %s", query.variable) dataset = dataset[query.variable] @@ -120,13 +132,4 @@ def _process_geokube_dataset( self.log.info("subsetting by vertical: %s", query.vertical) method = None if isinstance(query.vertical, slice) else "nearest" dataset = dataset.sel(vertical=query.vertical, method=method) - if isinstance(dataset, Dataset) and compute: - self.log.info( - "computing delayed datacubes in the dataset with %d" - " records...", - len(dataset), - ) - dataset = dataset.apply( - lambda dc: dc.compute() if isinstance(dc, Delayed) else dc - ) return dataset diff --git a/drivers/intake_geokube/iot/driver.py b/drivers/intake_geokube/iot/driver.py index 93c52cd..115be70 100644 --- a/drivers/intake_geokube/iot/driver.py +++ b/drivers/intake_geokube/iot/driver.py @@ -100,7 +100,7 @@ def _get_schema(self): self.stream.start() return {"stream": str(self.stream)} - def read(self) -> streamz.dataframe.core.DataFrame: + def read(self): """Read IoT data.""" self.log.info("reading stream...") self._get_schema() @@ -113,7 +113,7 @@ def load(self) -> NoReturn: "loading entire product is not supported for IoT data" ) - def process(self, query: GeoQuery) -> streamz.dataframe.core.DataFrame: + def process(self, query: GeoQuery): """Process IoT data with the passed query. Parameters @@ -123,7 +123,7 @@ def process(self, query: GeoQuery) -> streamz.dataframe.core.DataFrame: Returns ------- - stream : streamz.dataframe.core.DataFrame + stream : streamz.dataframe.DataFrame A DataFrame object with streamed content """ df = d[0]