Skip to content
94 changes: 78 additions & 16 deletions python/grass/temporal/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
:authors: Soeren Gebbert
"""

from __future__ import annotations

import re
import sys
from multiprocessing import Process

import grass.script as gs
from grass.exceptions import CalledModuleError

from .abstract_map_dataset import AbstractMapDataset

from .core import (
SQLDatabaseInterfaceConnection,
get_current_mapset,
Expand All @@ -31,6 +35,73 @@
############################################################################


def compile_new_map_name(
sp, base: str, count: int, map_id: str, time_suffix: str | None, dbif
):
"""Compile new map name with suffix and semantic label.

:param sp: An open SpaceTimeDataSet (STDS)
:param count: Running number of the map to be used as numeric suffix (if not time suffix)
:param map_id: Map ID to compile new map name for
:param time_suffix: Type of time suffix to use (or None)
:param dbif: initialized TGIS database interface
"""
if sp.get_temporal_type() == "absolute" and time_suffix:
old_map = sp.get_new_map_instance(map_id)
old_map.select(dbif)
semantic_label = old_map.metadata.get_semantic_label()
if time_suffix == "gran":
suffix = create_suffix_from_datetime(
old_map.temporal_extent.get_start_time(), sp.get_granularity()
)
elif time_suffix == "time":
suffix = create_time_suffix(old_map)
if semantic_label:
return f"{base}_{semantic_label}_{suffix}"
return f"{base}_{suffix}"
return create_numeric_suffix(base, count, time_suffix)


def replace_stds_names(expression: str, simple_name: str, full_name: str) -> str:
"""Safely replace simple with full STDS names.

When users provide inconsistent input for STDS in the expression
(with and without mapset componenet) or if the STDS name is part
of the name of other raster maps in the expression, the final
mapcalc expression may become invalid when the STDS name later is
replaced with the name of the individual maps in the time series.
The replacement with the fully qualified STDS names avoids that
confusion.

:param expression: The mapcalc expression to replace names in
:param simple_name: STDS name *without* mapset component
:param full_name: STDS name *with* mapset component
"""
separators = r""" ,-+*/^:&|"'`()<>#^"""
name_matches = re.finditer(simple_name, expression)
new_expression = ""
old_idx = 0
for match in name_matches:
# Fill-in expression component between matches
new_expression += expression[old_idx : match.start()]
# Only replace STDS name if pre- and succeeded by a separator
# Match is either at the start or preceeeded by a separator
if match.start() == 0 or expression[match.start() - 1] in separators:
# Match is either at the end or succeeded by a separator
if (
match.end() + 1 > len(expression)
or expression[match.end()] in separators
):
new_expression += full_name
else:
new_expression += simple_name
else:
new_expression += simple_name
old_idx = match.end()
new_expression += expression[match.end() :]
return new_expression


def extract_dataset(
input,
output,
Expand Down Expand Up @@ -102,33 +173,24 @@ def extract_dataset(
proc_count = 0
proc_list = []

# Make sure STRDS is in the expression referenced with fully qualified name
expression = replace_stds_names(
expression, sp.base.get_name(), sp.base.get_map_id()
)
for row in rows:
count += 1

if count % 10 == 0:
msgr.percent(count, num_rows, 1)

if sp.get_temporal_type() == "absolute" and time_suffix == "gran":
old_map = sp.get_new_map_instance(row["id"])
old_map.select(dbif)
suffix = create_suffix_from_datetime(
old_map.temporal_extent.get_start_time(), sp.get_granularity()
)
map_name = "{ba}_{su}".format(ba=base, su=suffix)
elif sp.get_temporal_type() == "absolute" and time_suffix == "time":
old_map = sp.get_new_map_instance(row["id"])
old_map.select(dbif)
suffix = create_time_suffix(old_map)
map_name = "{ba}_{su}".format(ba=base, su=suffix)
else:
map_name = create_numeric_suffix(base, count, time_suffix)
map_name = compile_new_map_name(
sp, base, count, row["id"], time_suffix, dbif
)

# We need to modify the r(3).mapcalc expression
if type != "vector":
expr = expression
expr = expr.replace(sp.base.get_map_id(), row["id"])
expr = expr.replace(sp.base.get_name(), row["id"])

expr = "%s = %s" % (map_name, expr)

# We need to build the id
Expand Down
203 changes: 203 additions & 0 deletions temporal/t.rast.extract/testsuite/test_t_rast_extract_pytest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Test t.rast.extract.

This test checks that t.rast.extract works also when
a) a fully qualified name is used in the expression,
b) it is run with a STRDS from another mapset as input and
c) the STRDS contains maps with identical temporal extent but with
different semantic labels

(C) 2025 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.

@author Stefan Blumentrath
"""

import os
import shutil
from pathlib import Path

import grass.script as gs
import pytest
from grass.tools import Tools


@pytest.fixture(scope="module")
def session(tmp_path_factory):
"""Pytest fixture to create a temporary GRASS project with a STRDS.

This setup initializes the GRASS environment, sets the computational region,
creates a series of raster maps with semantic labels, and rgisters
the maps in a new SpaceTimeRasterDataSet (STRDS).

Yields:
session: active GRASS session with the prepared project and STRDS.

"""
# Create a single temporary project directory once per module
tmp_path = tmp_path_factory.mktemp("raster_time_series")

# Remove if exists (defensive cleanup)
if tmp_path.exists():
shutil.rmtree(tmp_path)

gs.create_project(tmp_path)

with gs.setup.init(
tmp_path.parent,
location=tmp_path.name,
env=os.environ.copy(),
) as session:
tools = Tools(session=session)
tools.g_mapset(mapset="perc", flags="c")
tools.g_gisenv(set="TGIS_USE_CURRENT_MAPSET=1")
tools.g_region(s=0, n=80, w=0, e=120, b=0, t=50, res=10, res3=10)
register_strings = []
for i in range(1, 4):
for label in ["A", "B"]:
mapname = f"prec_{label}_{i}"
semantic_label = f"S2{label}_{i}"
tools.r_mapcalc(expression=f"{mapname} = {i}00", overwrite=True)
tools.r_semantic_label(
map=mapname,
semantic_label=semantic_label,
overwrite=True,
)
register_strings.append(f"{mapname}|2025-08-0{i}|{semantic_label}\n")

tools.t_create(
type="strds",
temporaltype="absolute",
output="prec",
title="A test",
description="A test",
overwrite=True,
)
tmp_file = tools.g_tempfile(pid=os.getpid()).text
Path(tmp_file).write_text("".join(register_strings), encoding="UTF8")
tools.t_register(
type="raster",
input="prec",
file=tmp_file,
overwrite=True,
)
yield session


def test_selection(session):
"""Perform a simple selection by datetime."""
tools = Tools(session=session)
gisenv = gs.gisenv(env=session.env)
tools.t_rast_extract(
input="prec",
output="prec_1",
where="start_time > '2025-08-01'",
verbose=True,
overwrite=True,
)


def test_selection_and_expression(session):
"""Perform a selection and a r.mapcalc expression with full name."""
result = "prec_2"
tools = Tools(session=session)
gisenv = gs.gisenv(env=session.env)
tools.t_rast_extract(
input="prec",
output=result,
where="start_time > '2025-08-01'",
expression=" if(prec@perc>=200,prec@perc,null())",
basename="prec_2",
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = gs.parse_key_val(tools.t_info(input=result, flags="g").text)
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": "200.0",
"min_max": "300.0",
"max_min": "200.0",
"max_max": "300.0",
"aggregation_type": "None",
"number_of_semantic_labels": "4",
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": "4",
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)


def test_inconsistent_selection_and_expression(session):
"""Perform a selection and a r.mapcalc expression with simple and full name."""
result = "prec_2"
tools = Tools(session=session)
gisenv = gs.gisenv(env=session.env)
tools.t_rast_extract(
input="prec",
output=result,
where="start_time > '2025-08-01'",
expression=" if(prec>=200,prec@perc,null())",
basename="prec_2",
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = gs.parse_key_val(tools.t_info(input=result, flags="g").text)
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": "200.0",
"min_max": "300.0",
"max_min": "200.0",
"max_max": "300.0",
"aggregation_type": "None",
"number_of_semantic_labels": "4",
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": "4",
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)


def test_selection_and_expression_simple_name(session):
"""Perform a selection and a r.mapcalc expression with simple name."""
result = "prec_3"
tools = Tools(session=session)
gisenv = gs.gisenv(env=session.env)
tools.t_rast_extract(
input="prec",
output=result,
where="start_time > '2025-08-01'",
expression=" if(prec>= 200, prec, null())",
basename="prec_3",
nprocs=2,
verbose=True,
overwrite=True,
)
strds_info = gs.parse_key_val(tools.t_info(input=result, flags="g").text)
expected_info = {
"start_time": "'2025-08-02 00:00:00'",
"end_time": "'2025-08-03 00:00:00'",
"name": result,
"min_min": "200.0",
"min_max": "300.0",
"max_min": "200.0",
"max_max": "300.0",
"aggregation_type": "None",
"number_of_semantic_labels": "4",
"semantic_labels": "S2A_2,S2A_3,S2B_2,S2B_3",
"number_of_maps": "4",
}
for k, v in expected_info.items():
assert strds_info[k] == v, (
f"Expected value for key '{k}' is {v}. Got: {strds_info[k]}"
)
Loading