Skip to content

Commit

Permalink
various fixes throughout search/download, including pipelines/luigi
Browse files Browse the repository at this point in the history
  • Loading branch information
chbrandt committed Aug 4, 2021
1 parent 1f952cd commit 22085ce
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 17 deletions.
9 changes: 6 additions & 3 deletions gpt/helpers/bbox.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging
from collections import OrderedDict

WORLD = {
'minlat': -90,
'maxlat': 90,
Expand Down Expand Up @@ -44,8 +47,8 @@ def __init__(self, bbox):
See https://wiki.openstreetmap.org/wiki/Bounding_Box
"""
if isinstance(bbox, dict):
lons = parse_dict(bbox)
if isinstance(bbox, (dict, OrderedDict)):
vals = parse_dict(bbox)
elif isinstance(bbox, (list,tuple)):
vals = parse_array(bbox)
elif isinstance(bbox, str):
Expand Down Expand Up @@ -83,7 +86,7 @@ def parse_array(arr):


def parse_dict(dct):
clr_keys = ('westlon','minlat','eastlong','maxlat')
clr_keys = ('westlon','minlat','eastlon','maxlat')
amb_keys = ('xmin','ymin','xmax','ymax')
if all([k in dct for k in clr_keys]):
return parse_array([ dct[k] for k in clr_keys ])
Expand Down
Empty file added gpt/pipelines/__init__.py
Empty file.
223 changes: 223 additions & 0 deletions gpt/pipelines/luigi_tasks_ode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
"""
Luigi runs a serial pipeline defined by _tasks_ based on files as I/O and as triggers.
"""
import luigi
import json
import logging

logger = logging.getLogger('luigi-interface')


class SearchProductTask(luigi.Task):
    """
    Search the 'ode' API for a single data product.

    Parameters:
    - productid: str
        Identifier of the data product to search for
    - dataset: str
        Example: 'mars/mro/ctx/edr' or 'mars/mex/hrsc/refdr3'
    - output_geojson: str
        Filename for search results, each data product as a feature
    """
    productid = luigi.Parameter()
    dataset = luigi.Parameter()
    output_geojson = luigi.Parameter()

    def output(self):
        """Return the local file target located at 'output_geojson'."""
        return luigi.LocalTarget(self.output_geojson)

    def run(self):
        """Query 'ode' for 'productid' within 'dataset'."""
        from gpt import search

        # The former 'intersect=self.intersect' argument was dropped: this
        # task declares no 'intersect' parameter, so reading the attribute
        # raised AttributeError before the search could start.
        # NOTE(review): 'search.product' is presumably responsible for
        # writing the results to 'output' -- confirm against gpt.search.
        search.product('ode',
                       productid=self.productid,
                       dataset=self.dataset,
                       output=self.output_geojson)

class SearchBboxTask(luigi.Task):
    """
    Search the 'ode' API for data products matching a bounding-box and dump
    the results as JSON into 'output_geojson'.

    Parameters:
    - bounding_box: dict (luigi.DictParameter)
        Converted to a plain dict before being handed to the search.
        Presumably keyed by 'westlon', 'minlat', 'eastlon', 'maxlat'
        -- TODO confirm against gpt.helpers.Bbox
    - dataset: str
        Example: 'mars/mro/ctx/edr' or 'mars/mex/hrsc/refdr3'
    - output_geojson: str
        Filename for search results, each data product as a feature
    - intersect: bool
        If True, any footprint intersecting the bounding-box is considered,
        otherwise (False), consider only data product fully inside bounding-box.
    """
    bounding_box = luigi.DictParameter()
    dataset = luigi.Parameter()
    output_geojson = luigi.Parameter()
    intersect = luigi.BoolParameter(default=True)

    def output(self):
        """Return the local file target located at 'output_geojson'."""
        target_path = self.output_geojson
        return luigi.LocalTarget(target_path)

    def run(self):
        """Run the bounding-box search and write its result to the output."""
        from gpt import search

        # Luigi hands the parameter over as a read-only mapping; the search
        # is given a regular dict instead.
        bbox_dict = dict(self.bounding_box)
        logger.debug(f"{type(bbox_dict)} : {self.bounding_box}")

        query = {
            'api': 'ode',
            'bbox': bbox_dict,
            'dataset': self.dataset,
            'intersect': self.intersect,
        }
        results = search.bbox(**query)

        with self.output().open('w') as ofile:
            json.dump(results, ofile)



class DownloadProductTask(luigi.Task):
    """Download a data product and resp. metadata

    Input:
    - productid: str
    - dataset: str
    - base_dir: str
        Directory handed to the downloader; the JSON files of this task and
        of the required search are also placed there
    - url_property: str
        Feature property holding the product's URL, where to download from
    - filename_property: str
        Property keyword to be added with the downloaded file's path
    Output:
    - (LocalTarget): '<base_dir>/<productid>.download.json'
    """
    productid = luigi.Parameter()
    dataset = luigi.Parameter()
    base_dir = luigi.Parameter()
    url_property = luigi.Parameter(default='image_url')
    filename_property = luigi.Parameter(default='image_path')

    def _path(self, suffix):
        """Return '<base_dir>/<productid><suffix>'."""
        return f"{self.base_dir}/{self.productid}{suffix}"

    def requires(self):
        """Require a search through an API/DB. Currently, USGS/ODE"""
        # 'output_geojson' is a required parameter of SearchProductTask;
        # leaving it out made Luigi fail while building the dependency.
        return SearchProductTask(productid=self.productid,
                                 dataset=self.dataset,
                                 output_geojson=self._path('.search.geojson'))

    def output(self):
        """Return the target for the updated properties JSON file."""
        # Same '<id>.download.json' naming as DownloadFeatureTask.
        return luigi.LocalTarget(self._path('.download.json'))

    def run(self):
        """
        Download the product (image) found by the search on ODE.

        Raises:
        - ValueError: if the search result holds no feature
        """
        from gpt import download

        with self.input().open('r') as ifile:
            geojson = json.load(ifile)

        # NOTE(review): assumes the search result is a GeoJSON object with a
        # 'features' list and that the first feature is the product asked
        # for -- confirm against gpt.search.product.
        features = geojson['features']
        if not features:
            raise ValueError(
                f"No feature found for product '{self.productid}' "
                f"in dataset '{self.dataset}'"
            )
        feature = features[0]

        # Same download call and output layout as DownloadFeatureTask: the
        # feature's properties plus the path of the downloaded file.
        url = feature['properties'][self.url_property]
        filename = download.url(url, self.base_dir)
        new_feature = feature['properties'].copy()
        new_feature[self.filename_property] = filename

        with self.output().open('w') as ofile:
            json.dump(new_feature, ofile)


class DownloadFeatureTask(luigi.Task):
    """
    Download the product referenced by one GeoJSON feature.

    Input:
    - geofeature (from geojson): string
        JSON-encoded feature; its 'properties' must provide 'id' and the
        key named by 'url_property'
    - base_dir (base dir): string
    - url_property: str
        Product's URL, where to download from
    - filepath_property: str
        Product's path keyword to be added to the new feature
    Output:
    - (LocalTarget): '<base_dir>/<id>.download.json', holding the feature's
      properties plus the downloaded file's path
    """
    geofeature = luigi.Parameter()
    base_dir = luigi.Parameter()
    url_property = luigi.Parameter(default='image_url')
    filepath_property = luigi.Parameter(default='image_path')

    def output(self):
        """Print a debug report and return the '.download.json' target."""
        # This code about 'prodid' should be better, remove at some point
        product_id = json.loads(self.geofeature)['properties']['id']

        print("# --- DOWNLOAD output")
        report = (
            ("# - Task family:", self.get_task_family()),
            ("# - Task namespace:", self.get_task_namespace()),
            ("# - Task ID:", self.task_id),
            ("# Feature: ", self.geofeature),
            ("# - ProdID:", product_id),
        )
        for label, value in report:
            print(label, value)

        target_path = '/'.join([self.base_dir, product_id + '.download.json'])
        return luigi.LocalTarget(target_path)

    def run(self):
        """
        Download the product (image) from search's GeoJSON 'feature:properties:image_url'
        """
        from gpt import download

        properties = json.loads(self.geofeature)['properties']
        local_path = download.url(properties[self.url_property], self.base_dir)

        # Original properties are left untouched; the path goes in a copy.
        updated = {**properties, self.filepath_property: local_path}

        with self.output().open('w') as sink:
            json.dump(updated, sink)

class DownloadGeojsonTask(luigi.Task):
    """
    Download files in GeoJSON features (from ODE, for instance)

    Parameters:
    - input_geojson: str
        Filename of the GeoJSON whose 'features' are to be downloaded
    - output_geojson: str
        Filename for the GeoJSON with the updated features
    - base_dir: str
        Directory handed over to each DownloadFeatureTask
    - url_property: str
        Feature property holding the product's URL
    """
    input_geojson = luigi.Parameter()
    output_geojson = luigi.Parameter()
    base_dir = luigi.Parameter()
    url_property = luigi.Parameter(default='image_url')

    def _read_input(self):
        """Load and return the content of 'input_geojson'."""
        with open(self.input_geojson, 'r') as ifile:
            return json.load(ifile)

    def requires(self):
        """Yield one DownloadFeatureTask per feature of the input GeoJSON."""
        for feature in self._read_input()['features']:
            # 'base_dir' is a required parameter of DownloadFeatureTask.
            yield DownloadFeatureTask(geofeature=json.dumps(feature),
                                      base_dir=self.base_dir,
                                      url_property=self.url_property)

    def output(self):
        """Return the local file target located at 'output_geojson'."""
        return luigi.LocalTarget(self.output_geojson)

    def run(self):
        """Update input geojson features with 'image_path'"""
        geojson = self._read_input()

        # self.input() lists the download targets in the order requires()
        # yielded them, i.e. the order of the input features.
        for feature, target in zip(geojson['features'], self.input()):
            with target.open('r') as ifile:
                # Each download output holds the feature's properties plus
                # the path of the downloaded file.
                feature['properties'].update(json.load(ifile))

        # The declared output must be written for the task to be complete.
        with self.output().open('w') as ofile:
            json.dump(geojson, ofile)

class ReducedTask(luigi.Task):
    """
    Process the download of a feature and write the resulting feature.

    Input:
    - feature (from geojson): string
        JSON-encoded feature; its 'properties' must provide 'id'
    - base_dir (base dir): string
        Used for the required download and as the processing output path
    - temp_dir: string
        Handed over to the processing as 'tmpdir'
    - datasetId: string
        Handed over to the processing as 'datasetId'
    Output:
    - (LocalTarget): '<base_dir>/<id>.reduced.json'
    """
    feature = luigi.Parameter()
    base_dir = luigi.Parameter()
    temp_dir = luigi.Parameter()
    datasetId = luigi.Parameter()

    def requires(self):
        """Print a debug report and require the download of 'feature'."""
        print("# --- REDUCED requires")
        print("Task family:",self.get_task_family())
        print("Task namespace:",self.get_task_namespace())
        print("Task ID:",self.task_id)
        print(self.feature)
        return DownloadFeatureTask(self.feature, base_dir=self.base_dir)

    def output(self):
        """Print a debug report and return the '.reduced.json' target."""
        print("# --- REDUCED output")
        print("# - Task family:",self.get_task_family())
        print("# - Task namespace:",self.get_task_namespace())
        print("# - Task ID:",self.task_id)
        print("#",self.feature)
        # prodid = '.'.join(json.load(self.input().open('r'))['properties']['image_url'].split('/')[-1].split('.')[:-1])
        prodid = json.loads(self.feature)['properties']['id']
        print("# - ProdID:",prodid)
        return luigi.LocalTarget(self.base_dir +'/'+ prodid + '.reduced.json')

    def run(self):
        """Run the processing on the download's output and save the result."""
        #from npt import isis
        #isis.sh.set_docker('isis3', {'reduced_data': [base_data_path.as_posix(), '/data']})

        # NOTE(review): imported from 'npt' while the other tasks of this
        # module import from 'gpt' -- confirm the package name.
        from npt.pipelines import processing

        # Read through a context manager so the input file gets closed.
        with self.input().open('r') as ifile:
            downloaded = json.load(ifile)

        new_feature = processing.run(downloaded,
                                     output_path=self.base_dir,
                                     tmpdir=self.temp_dir,
                                     datasetId=self.datasetId)

        with self.output().open('w') as ofile:
            json.dump(new_feature, ofile)

18 changes: 5 additions & 13 deletions gpt/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Functions and Classes to handle geo-spatial data search on DBs or REST/APIs
"""
import logging

from gpt.helpers import Bbox

Expand Down Expand Up @@ -51,8 +52,7 @@ def get_api(name):
return api


def footprints(bbox, api='ode', db=None, match="intersect",
target_body=None, mission=None, instrument=None, product_type=None):
def bbox(bbox, dataset, intersect=True, api='ode'):
"""
Search for product/geometries in 'api' or 'db' matching 'bbox'
Expand All @@ -70,16 +70,8 @@ def footprints(bbox, api='ode', db=None, match="intersect",
Output:
Return ~gpt.helpers.Collection with the results
"""
assert api or db, "Either 'api' or 'db' should be given. Check ~available_apis()"
from . import ode
assert api in available_apis(), "Expected a value from ~available_apis()"

from gpt.helpers import Bbox
_bb = bbox if isinstance(bbox, Bbox) else Bbox(bbox)

if api == 'ode':
from . import ode
resdf = ode.search(bbox=_bb, match=match,
target=target_body, ihid=mission,
iid=instrument, pt=product_type)

return None
match = 'intersect' if intersect else 'contain'
return ode.search(bbox=bbox, match=match, dataset=dataset)
3 changes: 2 additions & 1 deletion gpt/search/ode.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ def available_datasets(target='Mars', ihid=None, iid=None, minimal=False):
return df


def search(bbox, target, ihid, iid, pt, match='contain'):
def search(bbox, dataset, match='contain'):
target, ihid, iid, pt = dataset.split('/')
resjs = _search(bbox, target, ihid, iid, pt, match)
try:
out = _get_ps(resjs)
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ install_requires =
pyproj>=3.1
pytest
rasterio>=1.2
requests
sh
tqdm
zip_safe = False

0 comments on commit 22085ce

Please sign in to comment.