Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend region timeline overview #388

Draft
wants to merge 14 commits into
base: develop
Choose a base branch
from
3 changes: 2 additions & 1 deletion cubedash/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ def get_time_summary(
year: Optional[int] = None,
month: Optional[int] = None,
day: Optional[int] = None,
region: Optional[str] = None,
) -> Optional[TimePeriodOverview]:
return STORE.get(product_name, year, month, day)
return STORE.get(product_name, year, month, day, region)


@cache.memoize(timeout=60)
Expand Down
10 changes: 5 additions & 5 deletions cubedash/_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def region_page(
selected_summary,
year_selector_summary,
time_selector_summary,
) = _load_product(product_name, year, month, day)
) = _load_product(product_name, year, month, day, region_code)

region_info = _model.STORE.get_product_region_info(product_name)
if not region_info:
Expand Down Expand Up @@ -394,7 +394,7 @@ def timeline_page(product_name: str):


def _load_product(
product_name, year, month, day
product_name, year, month, day, region: str = None
) -> Tuple[
DatasetType,
ProductSummary,
Expand All @@ -410,9 +410,9 @@ def _load_product(
abort(404, f"Unknown product {product_name!r}")

product_summary = _model.get_product_summary(product_name)
time_summary = _model.get_time_summary(product_name, year, month, day)
year_selector_summary = _model.get_time_summary(product_name, None, None, None)
time_selector_summary = _model.get_time_summary(product_name, year, None, None)
time_summary = _model.get_time_summary(product_name, year, month, day, region)
year_selector_summary = _model.get_time_summary(product_name, None, None, None, region)
time_selector_summary = _model.get_time_summary(product_name, year, None, None, region)
return (
product,
product_summary,
Expand Down
1 change: 1 addition & 0 deletions cubedash/summary/_extents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import uuid
from dataclasses import dataclass

from datetime import date, datetime
from pathlib import Path
from typing import Dict, Generator, Iterable, List, Optional
Expand Down
1 change: 1 addition & 0 deletions cubedash/summary/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def add_periods(
for p in periods:
timeline_counter.update(p.timeline_dataset_counts)
period = p.timeline_period

timeline_counter, period = cls._group_counter_if_needed(
timeline_counter, period
)
Expand Down
5 changes: 4 additions & 1 deletion cubedash/summary/_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@
Column("crses", postgres.ARRAY(String)),
# Size of this dataset in bytes, if the product includes it.
Column("size_bytes", BigInteger),
PrimaryKeyConstraint("product_ref", "start_day", "period_type"),
Column("regions_hash", String),
PrimaryKeyConstraint(
"product_ref", "start_day", "period_type", "regions_hash",
),
CheckConstraint(
r"array_length(timeline_dataset_start_days, 1) = "
r"array_length(timeline_dataset_counts, 1)",
Expand Down
147 changes: 136 additions & 11 deletions cubedash/summary/_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ def get(
year: Optional[int] = None,
month: Optional[int] = None,
day: Optional[int] = None,
region_code: Optional[str] = None,
) -> Optional[TimePeriodOverview]:
period, start_day = TimePeriodOverview.flat_period_representation(
year, month, day
Expand All @@ -771,15 +772,56 @@ def get(
if not product:
return None

res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
)
if region_code and year:
return self._summariser.calculate_summary(
product_name,
year_month_day=(year, month, day),
product_refresh_time=datetime.now(),
region_code=region_code,
)
).fetchone()

if region_code:
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
TIME_OVERVIEW.c.regions.contains([region_code]),
func.cardinality(TIME_OVERVIEW.c.regions) == 1,
)
)
).fetchone()
else:
if self.get_product_all_regions(product.name, period, start_day):
"""
if the product contains region
"""
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
func.cardinality(TIME_OVERVIEW.c.regions) == len(
self.get_product_all_regions(product.name, period, start_day)
),
)
)
).fetchone()
else:
"""
if the product doesnt contain region
"""
res = self._engine.execute(
select([TIME_OVERVIEW]).where(
and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
)
).order_by(TIME_OVERVIEW.c.generation_time.desc())
).fetchone()

if not res:
return None
Expand Down Expand Up @@ -834,6 +876,13 @@ def get_dataset_type(self, name) -> DatasetType:
return d
raise KeyError(f"Unknown dataset type {name!r}")

@ttl_cache(ttl=DEFAULT_TTL)
def get_dataset_type_return_none(self, name) -> Optional[DatasetType]:
    """
    Look up a dataset type (product) by name.

    Unlike get_dataset_type(), this returns None for an unknown name
    instead of raising KeyError, so callers can probe for existence.

    :param name: product/dataset-type name to look up.
    :return: the matching DatasetType, or None if no type has that name.
    """
    # next() with a default expresses "first match or None" without
    # an explicit loop; the annotation now reflects the None case.
    return next(
        (d for d in self.all_dataset_types() if d.name == name), None
    )

@ttl_cache(ttl=DEFAULT_TTL)
def _dataset_type_by_id(self, id_) -> DatasetType:
for d in self.all_dataset_types():
Expand Down Expand Up @@ -1034,24 +1083,34 @@ def _put(
log.info("product.put")
product = self._product(summary.product_name)
period, start_day = summary.as_flat_period()
region_values, _ = _counter_key_vals(summary.region_dataset_counts)

row = _summary_to_row(summary)

import hashlib
import json
ret = self._engine.execute(
postgres.insert(TIME_OVERVIEW)
.returning(TIME_OVERVIEW.c.generation_time)
.on_conflict_do_update(
index_elements=["product_ref", "start_day", "period_type"],
index_elements=[
"product_ref", "start_day", "period_type",
"regions_hash"
],
set_=row,
where=and_(
TIME_OVERVIEW.c.product_ref == product.id_,
TIME_OVERVIEW.c.start_day == start_day,
TIME_OVERVIEW.c.period_type == period,
TIME_OVERVIEW.c.regions == region_values,
),
)
.values(
product_ref=product.id_, start_day=start_day, period_type=period, **row
product_ref=product.id_, start_day=start_day, period_type=period,
regions_hash=hashlib.sha224(json.dumps(region_values).encode("utf-8")).hexdigest(), **row
)
)

[gen_time] = ret.fetchone()
summary.summary_gen_time = gen_time

Expand Down Expand Up @@ -1313,13 +1372,15 @@ def _recalculate_period(
year: Optional[int] = None,
month: Optional[int] = None,
product_refresh_time: datetime = None,
region_code: str = None,
) -> TimePeriodOverview:
"""Recalculate the given period and store it in the DB"""
if year and month:
summary = self._summariser.calculate_summary(
product.name,
year_month_day=(year, month, None),
product_refresh_time=product_refresh_time,
region_code=region_code,
)
elif year:
summary = TimePeriodOverview.add_periods(
Expand All @@ -1329,19 +1390,34 @@ def _recalculate_period(
# Product. Does it have data?
elif product.dataset_count > 0:
summary = TimePeriodOverview.add_periods(
self.get(product.name, year_, None, None)
self.get(product.name, year_, None, None, region_code=None)
for year_ in range(
product.time_earliest.astimezone(timezone).year,
product.time_latest.astimezone(timezone).year + 1
)
)

if self.get_product_all_regions(product_name=product.name):
for region in self.get_product_all_regions(product_name=product.name):
region_summary = TimePeriodOverview.add_periods(
self.get(product.name, year_, None, None, region_code=region)
for year_ in range(
product.time_earliest.astimezone(timezone).year,
product.time_latest.astimezone(timezone).year + 1
)
)
region_summary.product_refresh_time = product_refresh_time
region_summary.period_tuple = (product.name, year, month, None)
self._put(region_summary)

else:
summary = TimePeriodOverview.empty(product.name)

summary.product_refresh_time = product_refresh_time
summary.period_tuple = (product.name, year, month, None)

self._put(summary)

for listener in self._update_listeners:
listener(
product_name=product.name,
Expand Down Expand Up @@ -1642,6 +1718,55 @@ def _region_summaries(self, product_name: str) -> Dict[str, RegionSummary]:
if geom is not None
}

def get_product_all_regions(
    self, product_name: str, period_type: Optional[str] = None, start_day=None
) -> Optional[List[str]]:
    """
    Return the region codes recorded for a product, optionally narrowed
    to the datasets whose center_time falls within a flat period
    (period_type + start_day, as produced by
    TimePeriodOverview.flat_period_representation).

    :param product_name: name of the product/dataset type.
    :param period_type: flat period type ('all', 'year', 'month', 'day');
        'all' (or None) means no time filtering.
    :param start_day: start day of the flat period; only used together
        with a non-'all' period_type.
    :return: sorted list of region codes, or None if the product is unknown.
    """
    dt = self.get_dataset_type_return_none(product_name)
    if not dt:
        # Unknown product: signal "no regions" with None rather than [].
        return None
    # All regions ever recorded for this product (no time filter).
    rows = self._engine.execute(
        select(
            [
                REGION.c.region_code,
            ]
        )
        .where(REGION.c.dataset_type_ref == dt.id)
        .order_by(REGION.c.region_code)
    )

    if period_type != 'all' and start_day:
        # A specific period was requested: replace the unfiltered result
        # with only the regions that have datasets inside that time range.
        # NOTE(review): the first query above is executed and then
        # discarded in this branch — confirm whether it can be skipped.
        year, month, day = TimePeriodOverview.from_flat_period_representation(
            period_type, start_day
        )
        time = _utils.as_time_range(year, month, day)

        # NOTE(review): timezone is hardcoded to Australia/Darwin here —
        # presumably to match the summariser's grouping timezone; confirm.
        begin_time = time.begin.replace(tzinfo=tz.gettz("Australia/Darwin"))
        end_time = time.end.replace(tzinfo=tz.gettz("Australia/Darwin"))
        rows = self._engine.execute(
            select(
                [
                    DATASET_SPATIAL.c.region_code
                ]
            )
            .where(
                and_(
                    # Inclusive range check on dataset center time.
                    func.tstzrange(
                        begin_time, end_time, "[]", type_=TSTZRANGE
                    ).contains(
                        DATASET_SPATIAL.c.center_time
                    ),
                    DATASET_SPATIAL.c.dataset_type_ref == dt.id

                )
            )
            .distinct()
        )
    # NOTE(review): a SQLAlchemy result object is always truthy, so this
    # branch likely never fires even for an empty result — verify intent.
    if not rows:
        return None

    return [region["region_code"] for region in rows]

def get_product_region_info(self, product_name: str) -> RegionInfo:
return RegionInfo.for_product(
dataset_type=self.get_dataset_type(product_name),
Expand Down
53 changes: 36 additions & 17 deletions cubedash/summary/_summarise.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def calculate_summary(
product_name: str,
year_month_day: Tuple[Optional[int], Optional[int], Optional[int]],
product_refresh_time: datetime,
region_code: str = None,
) -> TimePeriodOverview:
"""
Create a summary of the given product/time range.
Expand All @@ -64,7 +65,7 @@ def calculate_summary(
log = self.log.bind(product_name=product_name, time=time)
log.debug("summary.query")

begin_time, end_time, where_clause = self._where(product_name, time)
begin_time, end_time, where_clause = self._where(product_name, time, region_code)
select_by_srid = (
select(
(
Expand Down Expand Up @@ -208,25 +209,43 @@ def _with_default_tz(self, d: datetime) -> datetime:
return d

def _where(
    self, product_name: str, time: Range, region: Optional[str] = None,
) -> Tuple[datetime, datetime, ColumnElement]:
    """
    Build the shared WHERE clause for dataset-spatial summary queries.

    :param product_name: product whose datasets are selected.
    :param time: time range; begin/end are normalised to the default
        timezone before being compared against dataset center_time.
    :param region: optional region code; when given, results are limited
        to datasets with that region_code.
    :return: (begin_time, end_time, where_clause) — the tz-normalised
        bounds plus the combined SQLAlchemy clause.
    """
    begin_time = self._with_default_tz(time.begin)
    end_time = self._with_default_tz(time.end)
    # Build the base conditions once and append the optional region
    # filter, instead of duplicating the whole clause per branch.
    clauses = [
        # Inclusive time-range match on the dataset's center time.
        func.tstzrange(begin_time, end_time, "[]", type_=TSTZRANGE).contains(
            DATASET_SPATIAL.c.center_time
        ),
        # Restrict to the requested product (by name -> id subquery).
        DATASET_SPATIAL.c.dataset_type_ref
        == _scalar_subquery(
            select([ODC_DATASET_TYPE.c.id]).where(
                ODC_DATASET_TYPE.c.name == product_name
            )
        ),
        # Accept valid footprints, and datasets with no footprint at all
        # (st_isvalid(NULL) is NULL).
        or_(
            func.st_isvalid(DATASET_SPATIAL.c.footprint).is_(True),
            func.st_isvalid(DATASET_SPATIAL.c.footprint).is_(None),
        ),
    ]
    if region:
        clauses.append(DATASET_SPATIAL.c.region_code == region)
    where_clause = and_(*clauses)
    return begin_time, end_time, where_clause

@lru_cache() # noqa: B019
Expand Down
Loading