diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index 2fdca38..c7c53e5 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -62,6 +62,7 @@ Maps the silver-layer input tables. | `container_tags_table` | `str` | No | Full Unity Catalog path. Container EAV tags. | | `channel_tags_table` | `str` | No | Full Unity Catalog path. Channel EAV tags. | | `channel_mapping_table` | `str` | No | Full Unity Catalog path. Logical-to-physical channel alias table. Required when using `QueryBuilder.channel_with_alias()` (currently supported by `KeyValueStoreSolver`). | +| `unit_conversion_table` | `str` | No | Full Unity Catalog path. Per-unit-family conversion factors. When configured together with a `channel_mapping_table` whose rows carry `source_unit` / `target_unit` columns, aliased selectors auto-convert values from source to target unit during `solve()` (currently supported by `KeyValueStoreSolver`). | Tag tables are required for solvers that consume tag-based filters (`DeltaSolver` with tag filters, `KeyValueStoreSolver`). 
@@ -172,8 +173,9 @@ Per-table sections (each a `TableConfig`): | `container_metrics`| All solvers | Custom container_id column, custom timestamp columns | | `channel_tags` | DeltaSolver | Tag key/value column renames | | `channel_metrics` | All solvers | Custom channel_id column, custom value/timestamp columns | -| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column | +| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column; optional `join_keys` for non-default alias-resolution composite keys | | `channels` | All solvers | RLE column renames (`tstart`/`tend`/`value`) | +| `unit_conversion` | KeyValueStoreSolver | Unit-conversion table column renames (`unit`, `group_id`, `conversion_factor`) | Internal column names that mappings can target: @@ -187,6 +189,14 @@ Internal column names that mappings can target: | `priority` | Tie-breaker column on the `channel_mapping` table | | `project_id` | Project scoping column | | `parent_id` | Parent/scope identifier | +| `source_channel`| Source-channel identifier on the `channel_mapping` table | +| `data_key` | Data-key identifier (default present on both `channel_mapping` and `channel_metrics`) | +| `channel_alias` | Alias identifier on the `channel_mapping` table | +| `channel_name` | Channel-name identifier on the `channel_metrics` table | +| `source_unit`, `target_unit` | Source/target unit columns on the `channel_mapping` table | +| `unit` | Unit name column on the `unit_conversion` table | +| `group_id` | Unit-family identifier on the `unit_conversion` table | +| `conversion_factor` | Per-unit factor on `unit_conversion`; also the per-channel factor name carried into the solve UDF | :::note Per-solver feature support @@ -235,6 +245,92 @@ However, only the parts each solver supports are actually consumed: Sections you don't customize can be omitted; defaults are an empty mapping and no filters. 
+### Unit conversion (optional) + +Set `source.unit_conversion_table` and extend `channel_mapping` with `source_unit` / `target_unit` columns +to have aliased selectors auto-convert values from source to target unit during `solve()`. Direct selectors +via `query.channel(...)` always return raw values, even on a channel that an aliased sibling converts — +conversion is a property of the alias, not of the channel. See +[`unit_conversion`](../data_model/silver_layer_schema.md#unit_conversion-optional) for the table schema. + +```python +"source": { + "container_metrics_table": "my_catalog.silver.container_metrics", + "channel_metrics_table": "my_catalog.silver.channel_metrics", + "channels_uri": "my_catalog.silver.channels", + "channel_mapping_table": "my_catalog.silver.channel_mapping", + "unit_conversion_table": "my_catalog.silver.unit_conversion" +}, +"query_engine": { + "solver": "KeyValueStoreSolver", + "solver_config": { + "unit_conversion": { + "column_name_mapping": {} + } + } +} +``` + +### Alias-resolution join keys (optional) + +`KeyValueStoreSolver.filter_aliased_channel_metrics` joins `channel_mapping` +to `channel_metrics` to resolve aliased selectors. The default composite key +is `(source_channel, channel_name) + (data_key, data_key)`. Override +`channel_mapping.join_keys` to change the arity or column choice — for +example, a single-column join when `data_key` is not part of the channel +identity in your silver layout: + +```python +"solver_config": { + "channel_mapping": { + "join_keys": [ + {"mapping_col": "source_channel", "metrics_col": "channel_name"} + ] + } +} +``` + +Each `mapping_col` / `metrics_col` is an **internal** name (the name as the +solver sees the column **after** `column_name_mapping` has been applied on +the respective table). The two sides of a pair are independent, so the same +column can carry different names on the two tables. 
For instance, a layout +where the data-key column has different physical names on the two tables +has two equivalent paths: + +```python +# Path 1 — rename both physical columns to the same internal name; the +# default join_keys then works unchanged. +"solver_config": { + "channel_mapping": { + "column_name_mapping": {"mapping_data_key": "data_key"} + }, + "channel_metrics": { + "column_name_mapping": {"metrics_data_key": "data_key"} + } +} + +# Path 2 — leave the physical names as-is and reference them directly. +"solver_config": { + "channel_mapping": { + "join_keys": [ + {"mapping_col": "source_channel", "metrics_col": "channel_name"}, + {"mapping_col": "mapping_data_key", "metrics_col": "metrics_data_key"} + ] + } +} +``` + +`query.channel(...)` and `query.channel_with_alias(...)` kwargs are column +references against the **post-`column_name_mapping`** schema. If you +override `join_keys` (or skip renames) so that the solver sees a column +under a non-default name, the same name must be used as the kwarg. Example: +if `join_keys` references `metrics_col: "my_chan_name"` and the column is +not renamed via `column_name_mapping`, call +`query.channel(my_chan_name=...)`. The internal-name properties on +`SolverConfig` exist primarily to remove magic strings from the solver +code; the user-facing contract is "kwarg name == column name as the solver +sees it". + ### When to use what - **`solver_config..column_name_mapping`** — your silver-layer column is named differently from diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index 32d859d..47add63 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -183,6 +183,14 @@ pre-filtering before scanning the much larger `channels` table. | `pz90` | `float` | Yes | 90th percentile. | | `pz99` | `float` | Yes | 99th percentile. | +An optional `unit: string` column may also be present. 
When the report +config sets a `unit_conversion_table` and the solver resolves an aliased +selector, this column is treated as the authoritative source unit of the +physical channel and takes precedence over `channel_mapping.source_unit` +via `COALESCE(channel_metrics.unit, channel_mapping.source_unit)`. The +column is not part of the canonical schema — omit it for layouts that +don't need per-channel physical units. + --- ## channel_tags @@ -260,7 +268,44 @@ channel name to one or more physical channels keyed by `project_id` / | `channel_name` | `string` | No | Logical channel name to match against `channel_with_alias` selectors. | | `data_key` | `string` | No | Physical lookup key joined to `channel_metrics`. | | `priority` | `int` | Yes | Tie-breaker when multiple physical channels match a logical name. | +| `source_unit` | `string` | Yes | **Fallback** source unit for aliased reads of this mapping. The solver resolves the effective source unit as `COALESCE(channel_metrics.unit, channel_mapping.source_unit)`, so `channel_mapping.source_unit` only takes effect when `channel_metrics.unit` is null or absent. When configured together with `target_unit` and a `unit_conversion_table`, the solver converts values from source to target unit on aliased reads. | +| `target_unit` | `string` | Yes | Target unit for aliased reads of this mapping. Always taken from the mapping (there is no analogous column on `channel_metrics`). | Configured via `source.channel_mapping_table` (see [Configuration](../config/configuration.md)). Joins to `channel_metrics` on `(project_id, data_key, channel_name)`. + +**Per-channel unit conversion is single-target per query.** Storing two +distinct aliases that resolve to the same physical channel (same +`(source_channel, data_key)` → same `channel_metrics.channel_id`) with +different `target_unit` (or different `source_unit`) values is allowed at +the table level. 
The constraint only applies at query time: if a single +query selects **both** such aliases via `channel_with_alias()`, the solver +raises `ValueError`. The current per-channel factor model attaches one +conversion factor per physical channel and cannot apply two distinct +conversions to the same channel in the same query. Workarounds: select +the conflicting aliases in **separate queries**, or align the mapping rows +so they agree on the unit pair per physical channel. + +--- + +## unit_conversion (optional) + +Per-unit-family conversion factors. Read by `KeyValueStoreSolver` at +solve time when `source.unit_conversion_table` is configured and the +`channel_mapping` table carries `source_unit` / `target_unit` columns. + +| Column | Type | Nullable | Description | +|---------------------|----------|----------|------------------------------------------------------------------------------------------------------------| +| `group_id` | `string` | No | Unit family identifier (e.g. `speed`, `rotation`). Only units within the same family can convert into each other. | +| `unit` | `string` | No | Unit name. Matches the `source_unit` / `target_unit` values on `channel_mapping`. | +| `conversion_factor` | `double` | No | Multiplier that converts a value in this unit to the family's base unit. The base unit has factor `1.0`. **Required to be a positive non-null number** — a row with `conversion_factor` null, zero, or negative is rejected at query time with `ValueError` (validation runs once per query that uses unit conversion). | + +For each aliased channel the solver looks up `source_factor` (the row +whose `unit` matches `source_unit`) and `target_factor` (the row whose +`unit` matches `target_unit`, constrained to the same `group_id`) and +multiplies values by `source_factor / target_factor`. Missing rows or a +`group_id` mismatch yield a null factor and no conversion. + +Configured via `source.unit_conversion_table` (see +[Configuration](../config/configuration.md)). 
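The factor rule described for `unit_conversion` (multiply by `source_factor / target_factor`, with a null factor when a row is missing or the `group_id` differs) can be illustrated with a minimal plain-Python sketch. This is not the solver's Spark implementation. The sample rows, the `combined_factor` helper, and the assumption that unit names are unique across families are all hypothetical and exist only for illustration.

```python
import math
from typing import Optional

# Hypothetical unit_conversion rows: (group_id, unit, conversion_factor).
# Each factor converts a value in that unit to the family's base unit,
# so the base unit itself carries factor 1.0.
UNIT_CONVERSION_ROWS = [
    ("speed", "m/s", 1.0),
    ("speed", "km/h", 1.0 / 3.6),
    ("rotation", "rad/s", 1.0),
    ("rotation", "rpm", 2.0 * math.pi / 60.0),
]


def combined_factor(
    source_unit: str,
    target_unit: str,
    rows=UNIT_CONVERSION_ROWS,
) -> Optional[float]:
    """Return source_factor / target_factor, or None when no conversion applies.

    Illustrative helper only; assumes each unit name appears in one family.
    """
    by_unit = {unit: (group_id, factor) for group_id, unit, factor in rows}
    source = by_unit.get(source_unit)
    target = by_unit.get(target_unit)
    if source is None or target is None:
        # Missing row: null factor, values stay unconverted.
        return None
    if source[0] != target[0]:
        # group_id mismatch: units from different families cannot convert.
        return None
    return source[1] / target[1]


# Usage: an alias with source_unit "km/h" and target_unit "m/s".
factor = combined_factor("km/h", "m/s")
converted = 36.0 * factor if factor is not None else 36.0
```

With these sample rows, a raw value of 36 km/h comes out at roughly 10 m/s, while a `km/h` to `rpm` pair yields `None` and the value is left as-is, matching the "no conversion" behavior stated above.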
diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md index be9e711..3a7b9cd 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md @@ -150,6 +150,16 @@ columns, then applies the top-level ``project_id`` filter and any per-table ``channel_mapping.filters``, and finally joins with channel_metrics to resolve aliases. +When the database is configured with a ``unit_conversion_table`` and +the ``channel_mapping`` table carries ``source_unit`` / ``target_unit`` +columns, this method also propagates the effective unit pair on each +resolved row. The effective ``source_unit`` is computed as +``COALESCE(channel_metrics.unit, channel_mapping.source_unit)`` so +that the authoritative per-channel physical unit on +``channel_metrics`` takes precedence over the mapping-level default +when present. ``target_unit`` is always taken from the mapping — +there is no analogous column on ``channel_metrics``. + **Arguments**: - `spark` (`SparkSession`): Spark session used for query execution. @@ -160,7 +170,9 @@ channel_metrics to resolve aliases. **Returns**: `pyspark.sql.DataFrame`: DataFrame with ``(container_id, channel_id, selector_ids)`` -where ``selector_ids`` is an array column. +where ``selector_ids`` is an array column. When unit conversion +is active (see above), also carries ``source_unit`` and +``target_unit`` columns. #### resolve\_channel\_selections @@ -171,15 +183,37 @@ def resolve_channel_selections(spark, channel_metrics_df, Union direct and aliased channel metrics, combining selector_ids. 
+When the aliased side carries ``source_unit`` / ``target_unit`` +columns (added by :meth:`filter_aliased_channel_metrics` when a +unit conversion table is configured), those columns are preserved +through the union and aggregation. Direct selectors produce null +unit columns, which causes the downstream conversion-factor join +in :meth:`solve` to leave their values unchanged. + +Validates that each ``(container_id, channel_id)`` carries at most +one distinct ``source_unit`` and one distinct ``target_unit``. Per +physical channel the unit-conversion model can attach only one +factor; conflicting aliases would otherwise pick an arbitrary +target and silently mis-convert one of them. + **Arguments**: - `spark` (`SparkSession`): Spark session used for query execution. - `channel_metrics_df` (`pyspark.sql.DataFrame`): Direct channel metrics with ``selector_ids`` array column. - `aliased_channel_metrics_df` (`pyspark.sql.DataFrame`): Aliased channel metrics with ``selector_ids`` array column. +**Raises**: + +- `ValueError`: If two or more aliased selectors resolve to the same physical +channel with conflicting ``source_unit`` or ``target_unit`` +values. Up to three offending channels are listed in the +message. + **Returns**: -`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)``. +`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)`` +(plus ``source_unit`` / ``target_unit`` when present on the +aliased side). #### solve @@ -189,6 +223,13 @@ def solve(query, channels_df, selections, dtypes) -> DataFrame Solve the query by grouping channels and applying selections. 
+When a ``unit_conversion_table`` is configured on the database and +*channels_df* carries ``source_unit`` / ``target_unit`` columns +(added upstream by :meth:`filter_aliased_channel_metrics`), +per-channel conversion factors are computed and propagated into +the grouped-map UDF so that time-series values are converted from +the source to the target unit on the fly. + **Arguments**: - `query` (`QueryBuilder`): Query object containing database and filter information. diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md index 7b0a8fa..f1389a0 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md @@ -36,6 +36,47 @@ names used by the solver. An empty dict means no renaming Keys are internal column names; values are the literal values to match. +## JoinKey + +```python +class JoinKey(BaseModel) +``` + +A single column pair in the ``channel_mapping`` → ``channel_metrics`` join. + +Used by :class:`ChannelMappingConfig.join_keys` to override the default +alias-resolution composite key. + +Both fields reference column names **after** ``column_name_mapping`` has +been applied on the respective table; the two sides are independent, so +a column may appear under different names on the two tables. + +**Arguments**: + +- `mapping_col` (`str`): Column name on ``channel_mapping`` after its ``column_name_mapping`` +has been applied. +- `metrics_col` (`str`): Column name on ``channel_metrics`` after its ``column_name_mapping`` +has been applied. + +## ChannelMappingConfig + +```python +class ChannelMappingConfig(TableConfig) +``` + +``TableConfig`` plus an optional alias-resolution join-key spec. 
+ +**Arguments**: + +- `join_keys` (`list[JoinKey] or None`): Custom composite key for the ``channel_mapping`` → ``channel_metrics`` +join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``. +When ``None`` (the default), the solver uses the backward-compatible +pair ``[(source_channel, channel_name), (data_key, data_key)]`` +sourced from :class:`SolverConfig` internal-name properties. +Provide a custom list to change the join arity or column choice +(e.g. a single-column join when ``data_key`` is not part of the +channel identity in your silver layout). + ## SolverConfig ```python @@ -58,8 +99,10 @@ so that solver code can always reference the same constants. - `container_metrics` (`TableConfig`): Column mappings and filters for the container metrics table. - `channel_tags` (`TableConfig`): Column mappings and filters for the channel tags table. - `channel_metrics` (`TableConfig`): Column mappings and filters for the channel metrics table. -- `channel_mapping` (`TableConfig`): Column mappings and filters for the channel mapping (alias) table. +- `channel_mapping` (`ChannelMappingConfig`): Column mappings, filters, and the alias-resolution ``join_keys`` +override for the channel mapping (alias) table. - `channels` (`TableConfig`): Column mappings and filters for the channel data table. +- `unit_conversion` (`TableConfig`): Column mappings and filters for the unit conversion table. #### from\_json @@ -176,6 +219,50 @@ def alias_priority_col() -> str Internal column name for the alias priority on the channel_mapping table. +#### source\_channel\_col + +```python +def source_channel_col() -> str +``` + +Internal column name for the source-channel identifier on the channel_mapping table. + + +#### data\_key\_col + +```python +def data_key_col() -> str +``` + +Internal column name for the data-key identifier. + +Default present on both ``channel_mapping`` and ``channel_metrics``; +used by the default :meth:`effective_alias_join_keys` for both sides. 
+Layouts where the two tables carry the data-key column under different +physical names can either rename both to ``"data_key"`` via per-table +``column_name_mapping`` or override + + +#### channel\_alias\_col + +```python +def channel_alias_col() -> str +``` + +Internal column name for the alias identifier on the channel_mapping table. + +Referenced by the dedup window in + + +#### channel\_name\_col + +```python +def channel_name_col() -> str +``` + +Internal column name for the channel-name identifier on the channel_metrics table. + + #### project\_id\_col ```python @@ -194,6 +281,87 @@ def parent_id_col() -> str Internal column name for the parent/scope identifier. +#### conversion\_factor\_col + +```python +def conversion_factor_col() -> str +``` + +Internal column name for the conversion factor on the unit_conversion table. + +Also used as the column that carries the per-channel combined factor +downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors` +into the grouped-map UDF. + + +#### source\_unit\_col + +```python +def source_unit_col() -> str +``` + +Internal column name for the source unit on the channel_mapping table. + + +#### target\_unit\_col + +```python +def target_unit_col() -> str +``` + +Internal column name for the target unit on the channel_mapping table. + + +#### unit\_col + +```python +def unit_col() -> str +``` + +Internal column name for the unit identifier. + +Used in two places that happen to share the same default name: + +- On the ``unit_conversion`` table, as the key joined against + ``channel_mapping.source_unit`` / ``target_unit`` to look up a + conversion factor. +- On the ``channel_metrics`` table (optional), as the authoritative + physical unit of a channel. When present, takes precedence over + ``channel_mapping.source_unit`` for aliased reads via the + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` + coalesce. 
+ +Users with different internal names per table can rename physical +columns to ``unit`` on each table independently via the per-table +``column_name_mapping``. + + +#### group\_id\_col + +```python +def group_id_col() -> str +``` + +Internal column name for the unit group id on the unit_conversion table. + + +#### effective\_alias\_join\_keys + +```python +def effective_alias_join_keys() -> list[tuple[str, str]] +``` + +Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples. + +Falls back to the default composite key +``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]`` +when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise +returns the configured list. + +Both members of each tuple are column names **after** +``column_name_mapping`` has been applied on the respective table. + + #### col\_map ```python diff --git a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md index 2eccf27..c12be71 100644 --- a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md +++ b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md @@ -146,6 +146,10 @@ configured) regardless of whether ``container_tags_table`` is set. - `channels_uri` (`str`): Full Unity Catalog path to the channels data table. - `channel_mapping_table` (`str`): Full Unity Catalog path to the channel mapping table. Required when using ``channel_with_alias()`` for logical alias resolution. +- `unit_conversion_table` (`str`): Full Unity Catalog path to the unit conversion table. When set together +with a ``channel_mapping_table`` whose rows carry ``source_unit`` and +``target_unit`` columns, the query engine converts time-series values +from the source to the target unit during ``solve()``. 
## UnitySink diff --git a/docs/impulse/docs/references/query_engine.md b/docs/impulse/docs/references/query_engine.md index 150e928..fa07dfb 100644 --- a/docs/impulse/docs/references/query_engine.md +++ b/docs/impulse/docs/references/query_engine.md @@ -47,6 +47,7 @@ attributes are already wide on `container_metrics` itself. | `container_tags` | required (narrow EAV) | optional (narrow EAV) | | `channel_tags` | required (narrow EAV) | not used | | `channel_mapping` | not used | optional (channel aliases) | +| `unit_conversion` | not used | optional (per-alias unit conversion) | See the [Silver Layer Schema](../data_model/silver_layer_schema.md) for the columns each table is expected to carry. diff --git a/docs/impulse/docs/references/tsal.md b/docs/impulse/docs/references/tsal.md index d94d654..9542df7 100644 --- a/docs/impulse/docs/references/tsal.md +++ b/docs/impulse/docs/references/tsal.md @@ -51,6 +51,12 @@ Each keyword argument becomes a tag filter on the `channel_mapping` table; the s to the physical channels at read time. Use this when the consuming code should not need to know which physical signal backs a given logical name. +When the `channel_mapping` table carries `source_unit` and `target_unit` columns and the report config sets +`source.unit_conversion_table`, values returned from `channel_with_alias()` are automatically converted from source +to target unit before any expression is evaluated. Constants and parameters in expressions over an aliased selector +must therefore be expressed in the target unit. Direct selectors via `channel(...)` on the same physical channel are +unaffected — conversion is a property of the alias, not of the channel. 
+ --- ## Operators diff --git a/src/impulse_query_engine/analyze/metadata/time_series_expression.py b/src/impulse_query_engine/analyze/metadata/time_series_expression.py index fbdacd2..7df22c1 100644 --- a/src/impulse_query_engine/analyze/metadata/time_series_expression.py +++ b/src/impulse_query_engine/analyze/metadata/time_series_expression.py @@ -598,7 +598,7 @@ def build(self, cache: SeriesCache) -> SampleSeries: # TODO: select candidate mid = candidates.container_id.iloc[0] cid = candidates.channel_id.iloc[0] - return cache.load_blob(mid, cid) + return cache.load_blob(mid, cid, uses_alias=self.uses_alias) def get_required_tag_exprs(self) -> set[TagExpression]: """ diff --git a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py index 089190d..6e5bced 100644 --- a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py @@ -49,7 +49,7 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.df) return self.df[idx] - def load_blob(self, container_id, channel_id): + def load_blob(self, container_id, channel_id, uses_alias: bool = False): """ Load a time series blob from disk. @@ -59,6 +59,9 @@ def load_blob(self, container_id, channel_id): Container ID. channel_id : Any Channel ID. + uses_alias : bool, optional + Unused by this cache (no unit conversion); accepted for + interface compatibility with :class:`SeriesCache`. 
Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py index cddf9df..30b545a 100644 --- a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py @@ -61,7 +61,7 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.mdf) return self.mdf[idx] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Load a time series blob from the DataFrame. @@ -71,6 +71,9 @@ def load_blob(self, mid, cid): Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + Unused by this cache (no unit conversion); accepted for + interface compatibility with :class:`SeriesCache`. Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py index 84254bf..32ca0d1 100644 --- a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py +++ b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py @@ -25,7 +25,7 @@ def resolve(self, selection): """ return [] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Return an empty SampleSeries for any container and channel ID. @@ -35,6 +35,9 @@ def load_blob(self, mid, cid): Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + Unused by this cache; accepted for interface compatibility + with :class:`SeriesCache`. 
Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 2a1e518..d5e5e5c 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -30,16 +30,22 @@ def __init__(self, pdf, col_map: dict[str, str]): Parameters ---------- pdf : pd.DataFrame - DataFrame containing time series data. + DataFrame containing time series data. When the column named by + ``col_map["conv"]`` is present, :meth:`load_blob` multiplies the + loaded values by that per-channel factor. All rows of a given + ``(cid, ch)`` slice are expected to share the same factor. col_map : dict[str, str] Mapping with keys ``"cid"``, ``"ch"``, ``"ts"``, ``"te"``, - ``"val"`` to the actual column names in *pdf*. + ``"val"``, ``"conv"`` to the actual column names in *pdf*. The + ``"conv"`` column is optional in *pdf*. """ self._cid_col = col_map["cid"] self._ch_col = col_map["ch"] self._ts_col = col_map["ts"] self._te_col = col_map["te"] self._val_col = col_map["val"] + self._conv_col = col_map.get("conv") + self._has_conversion = self._conv_col is not None and self._conv_col in pdf.columns meta = pdf.drop(columns=[self._ts_col, self._te_col, self._val_col]) self.mdf = meta.drop_duplicates(subset=[self._cid_col, self._ch_col]).reset_index() @@ -67,16 +73,26 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.mdf) return self.mdf[idx] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Load a time series blob from the DataFrame. + When the underlying *pdf* carries a conversion-factor column (the + column named by ``col_map["conv"]``) **and** the caller is an + aliased selector (``uses_alias=True``), the returned values are + multiplied by that factor. 
Direct selectors on the same physical + channel always receive raw values — unit conversion is a property + of the alias, not of the channel. + Parameters ---------- mid : Any Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + ``True`` when the calling selector resolved via channel_mapping. + Gates the per-channel conversion factor; defaults to ``False``. Returns ------- @@ -84,7 +100,12 @@ def load_blob(self, mid, cid): The loaded sample series object. """ s = self.pdf[(self.pdf[self._cid_col] == mid) & (self.pdf[self._ch_col] == cid)] - return SampleSeries(s[self._ts_col], s[self._te_col], s[self._val_col]) + values = s[self._val_col] + if self._has_conversion and len(s) > 0 and uses_alias: + factor = s[self._conv_col].iloc[0] + if pd.notna(factor): + values = values * factor + return SampleSeries(s[self._ts_col], s[self._te_col], values) class KeyValueStoreSolver(QuerySolver): @@ -344,6 +365,16 @@ def filter_aliased_channel_metrics( per-table ``channel_mapping.filters``, and finally joins with channel_metrics to resolve aliases. + When the database is configured with a ``unit_conversion_table`` and + the ``channel_mapping`` table carries ``source_unit`` / ``target_unit`` + columns, this method also propagates the effective unit pair on each + resolved row. The effective ``source_unit`` is computed as + ``COALESCE(channel_metrics.unit, channel_mapping.source_unit)`` so + that the authoritative per-channel physical unit on + ``channel_metrics`` takes precedence over the mapping-level default + when present. ``target_unit`` is always taken from the mapping — + there is no analogous column on ``channel_metrics``. + Parameters ---------- spark : SparkSession @@ -359,7 +390,9 @@ def filter_aliased_channel_metrics( ------- pyspark.sql.DataFrame DataFrame with ``(container_id, channel_id, selector_ids)`` - where ``selector_ids`` is an array column. + where ``selector_ids`` is an array column. 
When unit conversion + is active (see above), also carries ``source_unit`` and + ``target_unit`` columns. """ container_id_col = self.config.container_id_col channel_id_col = self.config.channel_id_col @@ -382,28 +415,71 @@ def filter_aliased_channel_metrics( resolved_mapping = channel_mapping.where(self._build_expr(selectors)) - channel_metrics = db.channel_metrics(spark).join( + channel_metrics = db.channel_metrics(spark) + channel_metrics = self._apply_column_mapping( + channel_metrics, self.config.channel_metrics.column_name_mapping + ) + channel_metrics = channel_metrics.join( F.broadcast(container_df.select(container_id_col)), on=[container_id_col], how="inner", ) alias_priority_col = self.config.alias_priority_col + channel_alias_col = self.config.channel_alias_col + join_keys = self.config.effective_alias_join_keys + + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + unit_col = self.config.unit_col + has_unit_cols = ( + db.config.unit_conversion_table is not None + and source_unit_col in resolved_mapping.columns + and target_unit_col in resolved_mapping.columns + ) + metrics_has_unit = unit_col in channel_metrics.columns + + # Mapping-side projection: one aliased copy per mapping_col plus the + # alias / priority columns (and the optional unit columns, aliased + # with the ``_map_`` prefix so we can coalesce the source unit with + # ``channel_metrics.unit`` after the join). 
+ mapping_select_cols = [ + F.col(mapping_col).alias(f"_map_{mapping_col}") for mapping_col, _ in join_keys + ] + mapping_select_cols.extend([F.col(channel_alias_col), F.col(alias_priority_col)]) + if has_unit_cols: + mapping_select_cols.extend( + [ + F.col(source_unit_col).alias("_map_source_unit"), + F.col(target_unit_col).alias("_map_target_unit"), + ] + ) resolved = channel_metrics.join( - resolved_mapping.select( - F.col("source_channel").alias("_map_source_channel"), - F.col("data_key").alias("_map_data_key"), - F.col("channel_alias"), - F.col(alias_priority_col), - ), + resolved_mapping.select(*mapping_select_cols), on=[ - channel_metrics["channel_name"] == F.col("_map_source_channel"), - channel_metrics["data_key"] == F.col("_map_data_key"), + channel_metrics[metrics_col] == F.col(f"_map_{mapping_col}") + for mapping_col, metrics_col in join_keys ], how="inner", ) - dedup_window = Window.partitionBy(container_id_col, "channel_alias").orderBy( + # Materialize the effective source_unit / target_unit. The source unit + # comes from ``channel_metrics.unit`` when present (authoritative + # physical unit of the channel) and falls back to the mapping + # ``source_unit`` otherwise. The target unit is always taken from + # the mapping — there is no per-channel "target" on + # ``channel_metrics``; the target is a user choice on the alias. 
+ if has_unit_cols: + if metrics_has_unit: + resolved = resolved.withColumn( + source_unit_col, + F.coalesce(channel_metrics[unit_col], F.col("_map_source_unit")), + ) + else: + resolved = resolved.withColumn(source_unit_col, F.col("_map_source_unit")) + resolved = resolved.withColumn(target_unit_col, F.col("_map_target_unit")) + + dedup_window = Window.partitionBy(container_id_col, channel_alias_col).orderBy( F.col(alias_priority_col).asc_nulls_last() ) resolved = resolved.withColumn("_rank", F.row_number().over(dedup_window)) @@ -412,7 +488,10 @@ def filter_aliased_channel_metrics( resolved = resolved.withColumn( "selector_ids", F.array(self._build_selector_id_expr(selectors)) ) - return resolved.select(container_id_col, channel_id_col, "selector_ids") + out_cols = [container_id_col, channel_id_col, "selector_ids"] + if has_unit_cols: + out_cols.extend([source_unit_col, target_unit_col]) + return resolved.select(*out_cols) def resolve_channel_selections( self, spark, channel_metrics_df, aliased_channel_metrics_df @@ -420,6 +499,19 @@ def resolve_channel_selections( """ Union direct and aliased channel metrics, combining selector_ids. + When the aliased side carries ``source_unit`` / ``target_unit`` + columns (added by :meth:`filter_aliased_channel_metrics` when a + unit conversion table is configured), those columns are preserved + through the union and aggregation. Direct selectors produce null + unit columns, which causes the downstream conversion-factor join + in :meth:`solve` to leave their values unchanged. + + Validates that each ``(container_id, channel_id)`` carries at most + one distinct ``source_unit`` and one distinct ``target_unit``. Per + physical channel the unit-conversion model can attach only one + factor; conflicting aliases would otherwise pick an arbitrary + target and silently mis-convert one of them. 
+ Parameters ---------- spark : SparkSession @@ -432,13 +524,220 @@ def resolve_channel_selections( Returns ------- pyspark.sql.DataFrame - Merged DataFrame with ``(container_id, channel_id, selector_ids)``. + Merged DataFrame with ``(container_id, channel_id, selector_ids)`` + (plus ``source_unit`` / ``target_unit`` when present on the + aliased side). + + Raises + ------ + ValueError + If two or more aliased selectors resolve to the same physical + channel with conflicting ``source_unit`` or ``target_unit`` + values. Up to three offending channels are listed in the + message. """ - merged = channel_metrics_df.unionByName(aliased_channel_metrics_df) - return merged.groupBy( + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + has_unit_cols = ( + source_unit_col in aliased_channel_metrics_df.columns + and target_unit_col in aliased_channel_metrics_df.columns + ) + + merged = channel_metrics_df.unionByName( + aliased_channel_metrics_df, allowMissingColumns=has_unit_cols + ) + + agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")] + if has_unit_cols: + # collect_set serves a dual purpose: (a) it deduplicates so we + # can detect a conflict by size > 1, and (b) the single + # remaining element materializes the scalar unit value the + # downstream code expects. + agg_exprs.append(F.collect_set(source_unit_col).alias("_source_units")) + agg_exprs.append(F.collect_set(target_unit_col).alias("_target_units")) + + grouped = merged.groupBy( self.config.container_id_col, self.config.channel_id_col, - ).agg(F.flatten(F.collect_list("selector_ids")).alias("selector_ids")) + ).agg(*agg_exprs) + + if has_unit_cols: + # TODO(unit-conversion): lift this limitation by attaching the + # conversion factor to the selector instead of the channel row + # (see PR #30 review). 
+ conflicts = ( + grouped.where((F.size("_source_units") > 1) | (F.size("_target_units") > 1)) + .select( + self.config.container_id_col, + self.config.channel_id_col, + "_source_units", + "_target_units", + ) + .limit(3) + .collect() + ) + if conflicts: + details = [ + f"(container_id={row[self.config.container_id_col]}, " + f"channel_id={row[self.config.channel_id_col]}): " + f"source_units={sorted(row['_source_units'])}, " + f"target_units={sorted(row['_target_units'])}" + for row in conflicts + ] + raise ValueError( + "Conflicting unit conversions on the same physical channel " + "(first 3 shown):\n" + "\n".join(details) + ) + # Empty sets (direct-only channels) yield null via + # try_element_at, matching the prior F.first(ignorenulls=True) + # behavior. Plain element_at raises on empty arrays in Spark 4. + grouped = ( + grouped.withColumn(source_unit_col, F.try_element_at("_source_units", F.lit(1))) + .withColumn(target_unit_col, F.try_element_at("_target_units", F.lit(1))) + .drop("_source_units", "_target_units") + ) + + return grouped + + # ------------------------------------------------------------------ + # Unit conversion + # ------------------------------------------------------------------ + + def _validate_unit_conversion_table(self, uc_table: DataFrame) -> None: + """Raise ``ValueError`` if the unit_conversion table contains rows + whose ``conversion_factor`` is null, zero, or negative. + + ``conversion_factor`` is conceptually a strictly-positive number. + A zero on the source side silently corrupts values to all-zero; + a zero on the target side raises a cryptic Spark + ``ArithmeticException`` deep in the conversion path under Spark 4 + ANSI mode; a negative value flips signs; a null silently skips + conversion (contract violation, not corruption). Catching all + four cases here turns each into a clear, actionable error + naming the offending row. 
+ + Parameters + ---------- + uc_table : pyspark.sql.DataFrame + The ``unit_conversion`` table **after** + ``_apply_column_mapping`` has been applied. + + Raises + ------ + ValueError + If any row has ``conversion_factor IS NULL`` or + ``conversion_factor <= 0``. Up to three offending rows are + listed in the message. + """ + unit_col = self.config.unit_col + group_id_col = self.config.group_id_col + factor_col = self.config.conversion_factor_col + + bad_rows = ( + uc_table.where(F.col(factor_col).isNull() | (F.col(factor_col) <= 0)) + .select(group_id_col, unit_col, factor_col) + .limit(3) + .collect() + ) + if bad_rows: + details = [ + f"(group_id={row[group_id_col]}, unit={row[unit_col]}, " + f"conversion_factor={row[factor_col]})" + for row in bad_rows + ] + raise ValueError( + "Invalid conversion_factor in unit_conversion table " + "(must be a positive non-null number; first 3 shown):\n" + "\n".join(details) + ) + + def _compute_conversion_factors(self, spark, query, channels_df: DataFrame) -> DataFrame: + """ + Join *channels_df* with the unit conversion table to compute a + per-channel combined conversion factor. + + The unit conversion table associates each unit with a base-unit + scaling factor inside a unit family (``group_id``). For a row with + ``source_unit = S``, ``target_unit = T`` belonging to family ``G``: + + - ``_src_factor`` converts a value in ``S`` to the base unit of ``G``. + - ``_tgt_factor`` converts a value in ``T`` to the base unit of ``G``. + - The combined factor that converts ``S`` to ``T`` is + ``_src_factor / _tgt_factor``. + + Rows whose source or target unit is missing on the table — or whose + source/target units belong to different families — receive a null + factor. Null factors are treated as "no conversion" by the cache. + + Parameters + ---------- + spark : SparkSession + Active Spark session. + query : QueryBuilder + Query object carrying the configured ``db``. 
+ channels_df : pyspark.sql.DataFrame + DataFrame that already carries ``source_unit`` / ``target_unit`` + columns (added by :meth:`filter_aliased_channel_metrics`). + + Returns + ------- + pyspark.sql.DataFrame + *channels_df* augmented with a ``conversion_factor`` column. + + Raises + ------ + ValueError + If the ``unit_conversion`` table contains a row with a null, + zero, or negative ``conversion_factor``. See + :meth:`_validate_unit_conversion_table` for the underlying + check. + """ + uc_table = query.db.unit_conversion(spark) + uc_table = self._apply_column_mapping( + uc_table, self.config.unit_conversion.column_name_mapping + ) + self._validate_unit_conversion_table(uc_table) + + unit_col = self.config.unit_col + group_id_col = self.config.group_id_col + factor_col = self.config.conversion_factor_col + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + + # Source-side join: fetch _src_factor and _src_group_id. + channels_df = channels_df.join( + F.broadcast( + uc_table.select( + F.col(unit_col).alias("_src_unit"), + F.col(factor_col).alias("_src_factor"), + F.col(group_id_col).alias("_src_group_id"), + ) + ), + on=[channels_df[source_unit_col] == F.col("_src_unit")], + how="left", + ).drop("_src_unit") + + # Target-side join: must belong to the same unit family. 
+ channels_df = channels_df.join( + F.broadcast( + uc_table.select( + F.col(unit_col).alias("_tgt_unit"), + F.col(factor_col).alias("_tgt_factor"), + F.col(group_id_col).alias("_tgt_group_id"), + ) + ), + on=[ + channels_df[target_unit_col] == F.col("_tgt_unit"), + F.col("_src_group_id") == F.col("_tgt_group_id"), + ], + how="left", + ).drop("_tgt_unit", "_tgt_group_id") + + channels_df = channels_df.withColumn( + factor_col, + F.col("_src_factor") / F.col("_tgt_factor"), + ).drop("_src_factor", "_src_group_id", "_tgt_factor") + + return channels_df # ------------------------------------------------------------------ # Solve @@ -478,6 +777,13 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame: """ Solve the query by grouping channels and applying selections. + When a ``unit_conversion_table`` is configured on the database and + *channels_df* carries ``source_unit`` / ``target_unit`` columns + (added upstream by :meth:`filter_aliased_channel_metrics`), + per-channel conversion factors are computed and propagated into + the grouped-map UDF so that time-series values are converted from + the source to the target unit on the fly. + Parameters ---------- query : QueryBuilder @@ -495,6 +801,20 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame: DataFrame containing results for each container. 
""" col_map = self.config.col_map + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + + has_conversion_table = getattr(query.db.config, "unit_conversion_table", None) is not None + has_unit_cols = ( + source_unit_col in channels_df.columns and target_unit_col in channels_df.columns + ) + + if has_conversion_table and has_unit_cols: + channels_df = self._compute_conversion_factors(self.spark, query, channels_df) + + for col_name in (source_unit_col, target_unit_col): + if col_name in channels_df.columns: + channels_df = channels_df.drop(col_name) q = query.db.channels(self.spark) q = self._apply_column_mapping(q, self.config.channels.column_name_mapping) diff --git a/src/impulse_query_engine/analyze/query/solvers/series_cache.py b/src/impulse_query_engine/analyze/query/solvers/series_cache.py index b2f7e2f..7ad955f 100644 --- a/src/impulse_query_engine/analyze/query/solvers/series_cache.py +++ b/src/impulse_query_engine/analyze/query/solvers/series_cache.py @@ -24,7 +24,7 @@ def resolve(self, selection) -> pd.DataFrame: pass @abstractmethod - def load_blob(self, mid, cid) -> SampleSeries: + def load_blob(self, mid, cid, uses_alias: bool = False) -> SampleSeries: """ Resolve given mid and cid to a series. @@ -34,6 +34,13 @@ def load_blob(self, mid, cid) -> SampleSeries: Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + ``True`` when the calling selector resolves the channel via a + ``channel_mapping`` alias. Caches that perform unit conversion + (e.g. :class:`KVSTimeSeriesCache`) only apply the per-channel + conversion factor when this is ``True``, so a direct selector + on the same physical channel always returns raw values. + Defaults to ``False`` (direct / no-conversion semantics). 
Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index 5536b3f..48fc0bf 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -38,6 +38,49 @@ class TableConfig(BaseModel): filters: dict[str, str] = {} +class JoinKey(BaseModel): + """A single column pair in the ``channel_mapping`` → ``channel_metrics`` join. + + Used by :class:`ChannelMappingConfig.join_keys` to override the default + alias-resolution composite key. + + Both fields reference column names **after** ``column_name_mapping`` has + been applied on the respective table; the two sides are independent, so + a column may appear under different names on the two tables. + + Attributes + ---------- + mapping_col : str + Column name on ``channel_mapping`` after its ``column_name_mapping`` + has been applied. + metrics_col : str + Column name on ``channel_metrics`` after its ``column_name_mapping`` + has been applied. + """ + + mapping_col: str + metrics_col: str + + +class ChannelMappingConfig(TableConfig): + """``TableConfig`` plus an optional alias-resolution join-key spec. + + Attributes + ---------- + join_keys : list[JoinKey] or None + Custom composite key for the ``channel_mapping`` → ``channel_metrics`` + join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``. + When ``None`` (the default), the solver uses the backward-compatible + pair ``[(source_channel, channel_name), (data_key, data_key)]`` + sourced from :class:`SolverConfig` internal-name properties. + Provide a custom list to change the join arity or column choice + (e.g. a single-column join when ``data_key`` is not part of the + channel identity in your silver layout). + """ + + join_keys: list[JoinKey] | None = None + + class SolverConfig(BaseModel): """Per-table configuration for solver column name mappings and filters. 
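Review note: the `join_keys` fallback described in the `ChannelMappingConfig` docstring above can be sketched without Spark or pydantic. This is only an illustration of the documented behavior, not the PR's implementation. `JoinKeySketch`, `resolve_join_keys`, and `join_condition_sql` are hypothetical names; a stdlib dataclass stands in for the pydantic `JoinKey` model, and the predicate is rendered as text instead of Spark column expressions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JoinKeySketch:
    """Hypothetical stdlib stand-in for the PR's pydantic ``JoinKey``."""

    mapping_col: str  # column on channel_mapping, after column_name_mapping
    metrics_col: str  # column on channel_metrics, after column_name_mapping


# Default composite key named in the ChannelMappingConfig docstring.
DEFAULT_JOIN_KEYS = [("source_channel", "channel_name"), ("data_key", "data_key")]


def resolve_join_keys(join_keys):
    """Return (mapping_col, metrics_col) tuples, using the default when None."""
    if join_keys is None:
        return list(DEFAULT_JOIN_KEYS)
    return [(jk.mapping_col, jk.metrics_col) for jk in join_keys]


def join_condition_sql(join_keys):
    """Render the join predicate as text, mirroring the solver's ``_map_`` prefix."""
    pairs = resolve_join_keys(join_keys)
    return " AND ".join(
        f"channel_metrics.{metrics_col} = _map_{mapping_col}"
        for mapping_col, metrics_col in pairs
    )


default_sql = join_condition_sql(None)
single_sql = join_condition_sql([JoinKeySketch("source_channel", "channel_name")])
print(default_sql)
print(single_sql)
```

In this sketch an empty list is not `None`, so it produces no predicates at all. Whether the real `ChannelMappingConfig` should reject `join_keys=[]` may be worth confirming in review.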
@@ -60,10 +103,13 @@ class SolverConfig(BaseModel): Column mappings and filters for the channel tags table. channel_metrics : TableConfig Column mappings and filters for the channel metrics table. - channel_mapping : TableConfig - Column mappings and filters for the channel mapping (alias) table. + channel_mapping : ChannelMappingConfig + Column mappings, filters, and the alias-resolution ``join_keys`` + override for the channel mapping (alias) table. channels : TableConfig Column mappings and filters for the channel data table. + unit_conversion : TableConfig + Column mappings and filters for the unit conversion table. """ project_id: str | None = None @@ -72,8 +118,9 @@ class SolverConfig(BaseModel): container_metrics: TableConfig = TableConfig() channel_tags: TableConfig = TableConfig() channel_metrics: TableConfig = TableConfig() - channel_mapping: TableConfig = TableConfig() + channel_mapping: ChannelMappingConfig = ChannelMappingConfig() channels: TableConfig = TableConfig() + unit_conversion: TableConfig = TableConfig() # ------------------------------------------------------------------ # Class methods @@ -166,6 +213,44 @@ def alias_priority_col(self) -> str: """Internal column name for the alias priority on the channel_mapping table.""" return "priority" + @property + def source_channel_col(self) -> str: + """Internal column name for the source-channel identifier on the channel_mapping table.""" + return "source_channel" + + @property + def data_key_col(self) -> str: + """Internal column name for the data-key identifier. + + Default present on both ``channel_mapping`` and ``channel_metrics``; + used by the default :meth:`effective_alias_join_keys` for both sides. + Layouts where the two tables carry the data-key column under different + physical names can either rename both to ``"data_key"`` via per-table + ``column_name_mapping`` or override + :attr:`ChannelMappingConfig.join_keys` with explicit + ``mapping_col`` / ``metrics_col`` values. 
+ """ + return "data_key" + + @property + def channel_alias_col(self) -> str: + """Internal column name for the alias identifier on the channel_mapping table. + + Referenced by the dedup window in + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` and is the + conventional kwarg name passed to + :meth:`QueryBuilder.channel_with_alias` (e.g. + ``channel_with_alias(channel_alias="vehicle_speed")``). The kwarg name + must match the column name as seen by the solver after + ``column_name_mapping`` is applied. + """ + return "channel_alias" + + @property + def channel_name_col(self) -> str: + """Internal column name for the channel-name identifier on the channel_metrics table.""" + return "channel_name" + @property def project_id_col(self) -> str: """Internal column name for the project identifier.""" @@ -176,6 +261,71 @@ def parent_id_col(self) -> str: """Internal column name for the parent/scope identifier.""" return "parent_id" + @property + def conversion_factor_col(self) -> str: + """Internal column name for the conversion factor on the unit_conversion table. + + Also used as the column that carries the per-channel combined factor + downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors` + into the grouped-map UDF. + """ + return "conversion_factor" + + @property + def source_unit_col(self) -> str: + """Internal column name for the source unit on the channel_mapping table.""" + return "source_unit" + + @property + def target_unit_col(self) -> str: + """Internal column name for the target unit on the channel_mapping table.""" + return "target_unit" + + @property + def unit_col(self) -> str: + """Internal column name for the unit identifier. + + Used in two places that happen to share the same default name: + + - On the ``unit_conversion`` table, as the key joined against + ``channel_mapping.source_unit`` / ``target_unit`` to look up a + conversion factor. 
+ - On the ``channel_metrics`` table (optional), as the authoritative + physical unit of a channel. When present, takes precedence over + ``channel_mapping.source_unit`` for aliased reads via the + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` + coalesce. + + Users whose physical column names differ per table can rename them + to ``unit`` on each table independently via the per-table + ``column_name_mapping``. + """ + return "unit" + + @property + def group_id_col(self) -> str: + """Internal column name for the unit group id on the unit_conversion table.""" + return "group_id" + + @property + def effective_alias_join_keys(self) -> list[tuple[str, str]]: + """Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples. + + Falls back to the default composite key + ``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]`` + when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise + returns the configured list. + + Both members of each tuple are column names **after** + ``column_name_mapping`` has been applied on the respective table. 
+ """ + if self.channel_mapping.join_keys is None: + return [ + (self.source_channel_col, self.channel_name_col), + (self.data_key_col, self.data_key_col), + ] + return [(jk.mapping_col, jk.metrics_col) for jk in self.channel_mapping.join_keys] + @property def col_map(self) -> dict[str, str]: """Short-key → internal-column-name mapping for UDFs and caches.""" @@ -185,4 +335,5 @@ def col_map(self) -> dict[str, str]: "ts": self.tstart_col, "te": self.tend_col, "val": self.value_col, + "conv": self.conversion_factor_col, } diff --git a/src/impulse_query_engine/measurement_db.py b/src/impulse_query_engine/measurement_db.py index 36a1c26..c0ba27c 100644 --- a/src/impulse_query_engine/measurement_db.py +++ b/src/impulse_query_engine/measurement_db.py @@ -15,6 +15,7 @@ def __init__( channel_metrics_table=None, channels_uri=None, channel_mapping_table=None, + unit_conversion_table=None, table_locations: str = "external_locations", ): self.container_tags_table = container_tags_table @@ -23,6 +24,7 @@ def __init__( self.channel_metrics_table = channel_metrics_table self.channels_uri = channels_uri self.channel_mapping_table = channel_mapping_table + self.unit_conversion_table = unit_conversion_table self.table_locations = table_locations self.debug_tables = None @@ -31,6 +33,7 @@ def for_unity_catalog( catalog_name: str, core_schema_name: str = "core", channel_mapping_table: str | None = None, + unit_conversion_table: str | None = None, ): return MeasurementDBConfig( container_tags_table=f"{catalog_name}.{core_schema_name}.container_tags", @@ -39,6 +42,7 @@ def for_unity_catalog( channel_metrics_table=f"{catalog_name}.{core_schema_name}.channel_metrics", channels_uri=f"{catalog_name}.{core_schema_name}.channels", channel_mapping_table=channel_mapping_table, + unit_conversion_table=unit_conversion_table, table_locations="unity_catalog", ) @@ -57,6 +61,9 @@ def for_debug(debug_tables): channel_mapping_table=( "channel_mapping" if "channel_mapping" in debug_tables else None ), 
+ unit_conversion_table=( + "unit_conversion" if "unit_conversion" in debug_tables else None + ), table_locations="debug", ) cfg.debug_tables = debug_tables @@ -101,6 +108,11 @@ def channel_mapping(self, spark) -> DataFrame: raise ValueError("channel_mapping_table is not configured") return self._read_table(spark, self.config.channel_mapping_table) + def unit_conversion(self, spark) -> DataFrame: + if self.config.unit_conversion_table is None: + raise ValueError("unit_conversion_table is not configured") + return self._read_table(spark, self.config.unit_conversion_table) + def channel_uri(self): return self.config.channels_uri diff --git a/src/impulse_reporting/config/config_parser.py b/src/impulse_reporting/config/config_parser.py index 198b445..4953448 100644 --- a/src/impulse_reporting/config/config_parser.py +++ b/src/impulse_reporting/config/config_parser.py @@ -204,6 +204,11 @@ class Source(BaseModel): channel_mapping_table : str, optional Full Unity Catalog path to the channel mapping table. Required when using ``channel_with_alias()`` for logical alias resolution. + unit_conversion_table : str, optional + Full Unity Catalog path to the unit conversion table. When set together + with a ``channel_mapping_table`` whose rows carry ``source_unit`` and + ``target_unit`` columns, the query engine converts time-series values + from the source to the target unit during ``solve()``. 
Notes ----- @@ -217,6 +222,7 @@ class Source(BaseModel): channel_metrics_table: Annotated[str, AfterValidator(is_valid_table_name)] channels_uri: Annotated[str, AfterValidator(is_valid_table_name)] channel_mapping_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None + unit_conversion_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None class UnitySink(BaseModel): diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py index ab09ee4..7b73278 100644 --- a/src/impulse_reporting/core/report.py +++ b/src/impulse_reporting/core/report.py @@ -859,6 +859,13 @@ def determine_report(self, is_incremental: bool = None): # Validate that every aggregation references a registered event self._validate_aggregation_events() + # TODO: port unit-consistency sanity check from MDA Framework + # (`mda_reporting/util/unit_sanity_check.py`). When a + # `unit_conversion_table` is configured, walk all aggregation / + # event expressions and emit a UserWarning for each aliased + # selector whose source_unit differs from target_unit so the + # caller knows to express formula constants in target units. + # Clean up temp tables from previous runs self._cleanup_temp_tables() diff --git a/tests/conftest.py b/tests/conftest.py index d29fa74..431fb42 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -334,3 +334,69 @@ def key_value_store_alias_db( cfg = MeasurementDBConfig.for_debug(tables) cfg.channel_mapping_table = "channel_mapping" return MeasurementDB(cfg, ws=mock_workspace_client) + + +@pytest.fixture(scope="session") +def unit_conversion_dataframes(spark): + """Load unit-conversion test CSVs into cached in-memory DataFrames. + + Hands DataFrames directly to MeasurementDB (via ``for_debug``) instead of + persisting them through Delta — the alias-style write-then-read fixture + occasionally hit Delta ``ProtocolChangedException`` during macOS test + runs. Caching the DataFrames once per session keeps the data stable. 
+ """ + base_path = os.path.dirname(os.path.abspath(__file__)) + base_path = base_path[: base_path.find("tests")] + + container_tags_path = f"{base_path}/tests/unit/data/key_value_store_csv/container_metrics.csv" + container_metric_path = f"{base_path}/tests/unit/data/basic_narrow_csv/container_metrics.csv" + channel_metric_path = ( + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv" + ) + channels_path = f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv" + channel_mapping_path = ( + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv" + ) + unit_conversion_path = ( + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv" + ) + + options = {"header": "True", "delimiter": ",", "inferSchema": "True"} + + def _load(path): + df = spark.read.options(**options).csv(path).cache() + df.count() + return df + + return { + "container_tags": _load(container_tags_path), + "container_metrics": _load(container_metric_path), + "channel_metrics": _load(channel_metric_path), + "channels": _load(channels_path), + "channel_mapping": _load(channel_mapping_path), + "unit_conversion": _load(unit_conversion_path), + } + + +@pytest.fixture +def key_value_store_unit_conversion_db( + unit_conversion_dataframes, mock_workspace_client +) -> MeasurementDB: + """Return a key-value-store MeasurementDB with unit conversion configured.""" + cfg = MeasurementDBConfig.for_debug(unit_conversion_dataframes) + cfg.channel_mapping_table = "channel_mapping" + cfg.unit_conversion_table = "unit_conversion" + return MeasurementDB(cfg, ws=mock_workspace_client) + + +@pytest.fixture +def key_value_store_unit_conversion_db_no_table( + unit_conversion_dataframes, mock_workspace_client +) -> MeasurementDB: + """Same data as ``key_value_store_unit_conversion_db`` but with + ``unit_conversion_table=None`` to test the opt-out path.""" + tables = {k: v for k, v in unit_conversion_dataframes.items() if 
k != "unit_conversion"} + cfg = MeasurementDBConfig.for_debug(tables) + cfg.channel_mapping_table = "channel_mapping" + # Explicitly leave unit_conversion_table = None + return MeasurementDB(cfg, ws=mock_workspace_client) diff --git a/tests/impulse_query_engine/integration/kvs_solver_test.py b/tests/impulse_query_engine/integration/kvs_solver_test.py index 6fca0d2..c79825e 100644 --- a/tests/impulse_query_engine/integration/kvs_solver_test.py +++ b/tests/impulse_query_engine/integration/kvs_solver_test.py @@ -19,6 +19,7 @@ KeyValueStoreSolver, ) from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, SolverConfig, TableConfig, ) @@ -29,7 +30,7 @@ def _kvs_cfg( project_id: str = "SAMPLE_PROJECT", container_tags: TableConfig | None = None, container_metrics: TableConfig | None = None, - channel_mapping: TableConfig | None = None, + channel_mapping: ChannelMappingConfig | None = None, ) -> SolverConfig: """Build a SolverConfig wired up for the KVS test data. 
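Review note: when reading the unit-conversion fixtures above, it may help to have the conversion rule in plain Python. The sketch below follows the docstrings of `_compute_conversion_factors` (combined factor is `_src_factor / _tgt_factor`, null when a unit is missing or the families differ) and `load_blob` (the factor applies only when `uses_alias` is true). The unit names, factors, and the helper names `combined_factor` and `convert` are assumptions made for illustration; they are not taken from the test CSVs or from the PR.

```python
import math

# Hypothetical unit_conversion rows: unit -> (group_id, factor to the family's base unit).
UNIT_TABLE = {
    "km/h": ("speed", 1000.0 / 3600.0),  # base unit assumed to be m/s
    "m/s": ("speed", 1.0),
    "bar": ("pressure", 100000.0),  # base unit assumed to be Pa
}


def combined_factor(source_unit, target_unit, table=UNIT_TABLE):
    """Return src_factor / tgt_factor, or None when no conversion applies."""
    src = table.get(source_unit)
    tgt = table.get(target_unit)
    # A missing unit or mismatched families yields "no conversion".
    if src is None or tgt is None or src[0] != tgt[0]:
        return None
    return src[1] / tgt[1]


def convert(values, factor, uses_alias):
    """Scale values only for aliased reads that carry a usable factor."""
    if not uses_alias or factor is None or math.isnan(factor):
        return list(values)
    return [v * factor for v in values]


factor = combined_factor("km/h", "m/s")
aliased = convert([36.0, 72.0], factor, uses_alias=True)   # converted to m/s
direct = convert([36.0, 72.0], factor, uses_alias=False)   # raw values preserved
cross_family = combined_factor("km/h", "bar")               # families differ
print(aliased, direct, cross_family)
```

A direct selector and an aliased selector on the same physical channel therefore see different series, which is the behavior the `uses_alias` flag is documented to guarantee.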
@@ -42,7 +43,7 @@ def _kvs_cfg( container_tags=container_tags or TableConfig(column_name_mapping={"element_id": "key"}), container_metrics=container_metrics or TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=channel_mapping or TableConfig(), + channel_mapping=channel_mapping or ChannelMappingConfig(), ) @@ -222,7 +223,7 @@ def test_solve_with_aliased_channel( solver = KeyValueStoreSolver( spark, config=_kvs_cfg( - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py index 02c3798..e229b4f 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py @@ -11,6 +11,8 @@ KeyValueStoreSolver, ) from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + JoinKey, SolverConfig, TableConfig, ) @@ -31,7 +33,7 @@ def test_no_aliased_selections_returns_empty( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -52,7 +54,7 @@ def test_alias_resolves_to_correct_channels( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = 
key_value_store_alias_db.query @@ -80,7 +82,7 @@ def test_alias_scoped_by_project_id( config=SolverConfig( project_id="NON_EXISTENT_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -100,7 +102,9 @@ def test_alias_scoped_by_toolbox_id( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "non_existent_toolbox"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "non_existent_toolbox"} + ), ), ) query = key_value_store_alias_db.query @@ -120,7 +124,7 @@ def test_selector_id_consistent_for_same_expression( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -140,7 +144,7 @@ def test_multiple_aliases(self, spark: SparkSession, key_value_store_alias_db: M config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -165,7 +169,7 @@ def test_solve_with_alias_only( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": 
"container_concept"}), ), ) query = key_value_store_alias_db.query @@ -185,7 +189,7 @@ def test_solve_with_mixed_direct_and_alias( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -209,7 +213,7 @@ def test_solve_deduplication( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -241,7 +245,7 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -304,3 +308,130 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm( def test_channel_with_alias_without_mapping_raises(self, key_value_store_db: MeasurementDB): with pytest.raises(ValueError, match="channel_mapping_table is not configured"): key_value_store_db.query.channel_with_alias(channel_alias="engine_speed") + + +class TestConfigurableJoinKeys: + """Behavior of the configurable ``channel_mapping.join_keys`` override.""" + + def test_single_column_join_key( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Single-column join on source_channel == channel_name only; data_key + # is intentionally dropped from the join. 
The alias resolution still + # works and the (container_id, channel_alias) dedup keeps results + # unique. + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_different_data_key_names_per_side_via_rename( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Path 1: rename both physical `data_key` columns to a common + # internal name (here we use a non-default name `dk`). Two + # JoinKey entries cover the composite key. 
+ solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"data_key": "dk"}), + channel_mapping=ChannelMappingConfig( + column_name_mapping={"data_key": "dk"}, + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="dk", metrics_col="dk"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_different_data_key_names_per_side_via_join_keys( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Path 2: rename the two physical `data_key` columns to *different* + # internal names per table and reference them directly in + # join_keys. No common-name rename — the JoinKey's two sides are + # independent. 
+ solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"data_key": "metrics_dk"}), + channel_mapping=ChannelMappingConfig( + column_name_mapping={"data_key": "map_dk"}, + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_tag_kwarg_must_match_post_rename_name( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # When channel_metrics.channel_name is renamed via column_name_mapping + # to a non-default internal name, the direct selector's kwarg must use + # the renamed name AND the override `join_keys` must reference it. + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"channel_name": "chan"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="chan"), + JoinKey(mapping_col="data_key", metrics_col="data_key"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + # Direct selector — kwarg `chan` must match the renamed column name. 
+ engine_rpm = query.channel(chan="Engine RPM", data_key="TM").alias("engine_rpm") + + pdf = query.select(engine_rpm).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + # Direct selector — containers with no matching channel drop out. + # Containers 1 and 2 have "Engine RPM"/data_key="TM"; container 3 + # only carries it under "EngSpd"/"ProjSpecREC_10Hz" (no match). + assert sorted(pdf["container_id"].tolist()) == [1, 2] + assert all(length > 0 for length in pdf["engine_rpm"].map(len)) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py index ad0590a..f4d98b4 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py @@ -270,6 +270,7 @@ def test_col_map_always_returns_internal_names(self, spark: SparkSession): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def test_config_properties_return_internal_names(self, spark: SparkSession): diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py new file mode 100644 index 0000000..0f71e8b --- /dev/null +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -0,0 +1,702 @@ +# pylint: disable=missing-function-docstring + +import os + +import numpy as np +import pandas as pd +import pytest +from pyspark.sql import SparkSession + +from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( + KeyValueStoreSolver, +) +from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + SolverConfig, + TableConfig, +) 
+from impulse_query_engine.measurement_db import MeasurementDB + + +def _solver(spark: SparkSession) -> KeyValueStoreSolver: + return KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), + ), + ) + + +def _expected_raw_values(channels_csv_path: str, container_id: int, channel_id: int) -> np.ndarray: + raw = pd.read_csv(channels_csv_path) + rows = raw[(raw["container_id"] == container_id) & (raw["channel_id"] == channel_id)] + return rows.sort_values("tstart")["value"].values.astype(np.float64) + + +@pytest.fixture +def channels_csv_path() -> str: + base_path = os.path.dirname(os.path.abspath(__file__)) + base_path = base_path[: base_path.find("tests")] + return f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv" + + +class TestUnitConversionSolve: + def test_solve_with_unit_conversion( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + assert pdf["container_id"].tolist() == [1, 2, 3] + + factor = 0.277778 + # Containers 1 and 2 resolve vehicle_speed -> "Vehicle Speed Sensor" (channel 7); + # channel_metrics.unit == "km/h" matches channel_mapping.source_unit, so the + # coalesce yields "km/h" and values scale by ~0.277778 to reach m/s. 
+ for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * factor + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + + # Container 3 resolves to channel 7 via Spd_Vhcl / ProjSpecREC_10Hz. Its + # channel_metrics.unit is "m/s" (overrides channel_mapping.source_unit="km/h" + # via COALESCE), and target_unit is also "m/s", so the conversion factor is + # 1.0 and values are unchanged from raw. + expected3 = _expected_raw_values(channels_csv_path, 3, 7) + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-12) + + def test_solve_no_conversion_when_same_unit( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 5) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.engine_speed.values, expected, rtol=1e-12) + + def test_solve_no_conversion_when_table_not_configured( + self, + spark: SparkSession, + key_value_store_unit_conversion_db_no_table: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db_no_table.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + # No conversion: values are returned exactly as-is from the raw channel data. 
+ for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-12) + + def test_solve_no_conversion_for_direct_selectors( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + # Direct selector — no alias, so no unit metadata, so no conversion. + vehicle_speed_direct = query.channel( + channel_name="Vehicle Speed Sensor", data_key="TM" + ).alias("vehicle_speed_direct") + + pdf = query.select(vehicle_speed_direct).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed_direct.values, expected, rtol=1e-12) + + def test_solve_same_channel_direct_stays_raw_aliased_converts( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # When a direct selector and an aliased selector resolve to the same + # (container_id, channel_id) (both land on channel 7), conversion is a + # property of the alias — the direct selector returns raw values, + # the aliased selector returns raw * factor (km/h -> m/s). 
+ solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + direct = query.channel(channel_name="Vehicle Speed Sensor", data_key="TM").alias( + "vehicle_speed_raw" + ) + aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed_converted" + ) + + pdf = query.select(direct, aliased).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + factor = 0.277778 + for cid in (1, 2): + raw = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed_raw.values, raw, rtol=1e-12) + np.testing.assert_allclose(row.vehicle_speed_converted.values, raw * factor, rtol=1e-6) + + def test_solve_mixed_direct_and_aliased_disjoint_channels( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Direct selector targets a *different* channel than the aliased one. + # Direct: Ambient Air Temperature (channel 6, no conversion). + # Aliased: vehicle_speed (channel 7, km/h -> m/s). + # + # Note: the overlapping case, where a direct selector and an aliased + # selector resolve to the same (container_id, channel_id), is covered + # by test_solve_same_channel_direct_stays_raw_aliased_converts above: + # the direct selector stays raw and only the aliased one converts. + # This test covers the disjoint case. 
+ solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + direct = query.channel(channel_name="Ambient Air Temperature", data_key="TM").alias( + "ambient_temp" + ) + aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed_converted" + ) + + pdf = query.select(direct, aliased).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + factor = 0.277778 + for cid in (1, 2): + ambient_raw = _expected_raw_values(channels_csv_path, cid, 6) + speed_raw = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.ambient_temp.values, ambient_raw, rtol=1e-12) + np.testing.assert_allclose( + row.vehicle_speed_converted.values, speed_raw * factor, rtol=1e-6 + ) + + def test_solve_cross_family_units_leave_values_unchanged( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # cross_family_alias maps Engine RPM (rotation family) -> m/s + # (speed family). The group_id mismatch makes the target-side join + # miss, leaving conversion_factor null and values unchanged. + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + cross = query.channel_with_alias(channel_alias="cross_family_alias").alias("cross") + + pdf = query.select(cross).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + # The mapping only references Engine RPM/TM, which exists for containers 1 and 2. 
+ for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 5) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.cross.values, expected, rtol=1e-12) + + +class TestSourceUnitResolution: + """Effective source_unit = COALESCE(channel_metrics.unit, channel_mapping.source_unit).""" + + def test_source_unit_from_channel_metrics_overrides_mapping( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Container 3's Spd_Vhcl row has channel_metrics.unit = "m/s" while the + # mapping's source_unit is "km/h". Coalesce yields "m/s"; mapping's + # target_unit is also "m/s"; effective factor = 1.0 (no scaling). + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + expected = _expected_raw_values(channels_csv_path, 3, 7) + np.testing.assert_allclose(row3.vehicle_speed.values, expected, rtol=1e-12) + + def test_source_unit_falls_back_to_mapping_when_metrics_unit_null( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Containers 1 and 2 have channel_metrics.unit = "km/h" (which equals + # the mapping's source_unit, so they coalesce identically). To + # exercise the null-fallback specifically, construct a custom + # channel_metrics where the unit cell is null for the row of interest + # — the coalesce must then return the mapping's source_unit. + from pyspark.sql import functions as F # noqa: PLR0402 local import + + # Replace the unit cell on (cid=1, ch=7) with null. 
+ cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_null = cm.withColumn( + "unit", + F.when( + (F.col("container_id") == 1) & (F.col("channel_id") == 7), + F.lit(None).cast("string"), + ).otherwise(F.col("unit")), + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_null + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # Container 1: unit null → fall back to mapping source_unit="km/h" + # → factor 0.277778. + expected = _expected_raw_values(channels_csv_path, 1, 7) * 0.277778 + row1 = pdf.loc[pdf["container_id"] == 1].iloc[0] + np.testing.assert_allclose(row1.vehicle_speed.values, expected, rtol=1e-6) + finally: + # Restore the fixture so subsequent tests in this session see + # the original DataFrame. + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + def test_source_unit_falls_back_when_channel_metrics_lacks_unit_column( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Drop the `unit` column from channel_metrics entirely. The solver + # detects its absence (metrics_has_unit = False) and falls back to + # the mapping's source_unit directly. + cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_no_unit = cm.drop("unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_no_unit + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # All three containers: no unit column → mapping source_unit + # "km/h" wins → factor 0.277778. 
+ for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + def test_channel_metrics_unit_col_is_configurable( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Rename the physical `unit` column to `phys_unit` on channel_metrics, + # then point the solver at it via channel_metrics.column_name_mapping. + # The configurable unit_col property (default "unit") is what the + # solver references; rename brings the physical name to the internal + # name. + cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_renamed = cm.withColumnRenamed("unit", "phys_unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_renamed + + try: + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"phys_unit": "unit"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"} + ), + ), + ) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # Renamed column carries through: container 3 still resolves to + # m/s (no scaling); containers 1/2 still scale by 0.277778. 
+ expected3 = _expected_raw_values(channels_csv_path, 3, 7) + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-12) + + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + +class TestAliasUnitConflictDetection: + """Per-channel unit conversion supports only one (source_unit, target_unit) pair. + + When two aliases on the same physical channel disagree, the solver + must raise rather than silently mis-converting one of them. + """ + + @staticmethod + def _mapping_with(spark: SparkSession, rows): + """Build a channel_mapping DataFrame from rows matching the + unit-conversion fixture schema.""" + from pyspark.sql.types import IntegerType, StringType, StructField, StructType + + schema = StructType( + [ + StructField("project_id", StringType(), nullable=False), + StructField("toolbox_id", StringType(), nullable=False), + StructField("channel_alias", StringType(), nullable=False), + StructField("source_channel", StringType(), nullable=False), + StructField("data_key", StringType(), nullable=False), + StructField("priority", IntegerType(), nullable=True), + StructField("source_unit", StringType(), nullable=True), + StructField("target_unit", StringType(), nullable=True), + ] + ) + return spark.createDataFrame(rows, schema=schema) + + def test_conflict_on_target_unit_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + # Two aliases both resolve to (container_id, channel_id) = (1, 7) and + # (2, 7) via Vehicle Speed Sensor / TM, but request different + # target_units. The solver must raise. 
+ original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + conflicting = self._mapping_with( + spark, + [ + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_mph", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "mph", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_ms", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ], + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = conflicting + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + mph = query.channel_with_alias(channel_alias="vehicle_speed_mph").alias("mph") + ms = query.channel_with_alias(channel_alias="vehicle_speed_ms").alias("ms") + + with pytest.raises(ValueError, match="Conflicting unit conversions") as excinfo: + query.select(mph, ms).toPandas(spark, solver=solver) + + msg = str(excinfo.value) + assert "channel_id=7" in msg + assert "mph" in msg + assert "m/s" in msg + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + + def test_conflict_on_source_unit_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + # Same physical channel, agreeing target_unit, but disagreeing + # source_unit. (The coalesce in filter_aliased_channel_metrics + # prefers channel_metrics.unit, but if it's null/absent the + # mapping's source_unit wins — and these two mappings disagree.) 
+ original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + conflicting = self._mapping_with( + spark, + [ + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_a", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_b", + "Vehicle Speed Sensor", + "TM", + None, + "mph", + "m/s", + ), + ], + ) + # Also drop channel_metrics.unit so neither alias has a value to + # coalesce against — both rely on mapping.source_unit. + original_cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_no_unit = original_cm.drop("unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = conflicting + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_no_unit + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + a = query.channel_with_alias(channel_alias="vehicle_speed_a").alias("a") + b = query.channel_with_alias(channel_alias="vehicle_speed_b").alias("b") + + with pytest.raises(ValueError, match="Conflicting unit conversions") as excinfo: + query.select(a, b).toPandas(spark, solver=solver) + + msg = str(excinfo.value) + assert "channel_id=7" in msg + assert "km/h" in msg + assert "mph" in msg + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = original_cm + + def test_no_conflict_when_aliases_agree( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Two aliases on the same physical channel agree on (source_unit, + # target_unit). Both selectors should resolve and produce the same + # converted values. 
+ original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + agreeing = self._mapping_with( + spark, + [ + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_a", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_b", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ], + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = agreeing + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + a = query.channel_with_alias(channel_alias="vehicle_speed_a").alias("a") + b = query.channel_with_alias(channel_alias="vehicle_speed_b").alias("b") + + pdf = query.select(a, b).toPandas(spark, solver=solver) + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.a.values, expected, rtol=1e-6) + np.testing.assert_allclose(row.b.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + + +class TestConversionFactorValidation: + """`unit_conversion.conversion_factor` must be a positive non-null number. + + Catches malformed reference rows early so the user sees a clear error + instead of silent data corruption (zero/negative) or silent contract + violation (null). 
+ """ + + @staticmethod + def _uc_with(spark: SparkSession, rows): + """Build a unit_conversion DataFrame with an explicit schema so the + nullable factor case doesn't confuse Spark's type inference.""" + from pyspark.sql.types import ( + BooleanType, + DoubleType, + StringType, + StructField, + StructType, + ) + + schema = StructType( + [ + StructField("group_id", StringType(), nullable=False), + StructField("unit", StringType(), nullable=False), + StructField("conversion_factor", DoubleType(), nullable=True), + StructField("is_base", BooleanType(), nullable=True), + ] + ) + return spark.createDataFrame(rows, schema=schema) + + def _run_with_uc_table(self, spark, db, uc_rows): + """Replace the unit_conversion debug table, run a vehicle_speed + aliased query, restore the original. Returns nothing — used inside + a ``pytest.raises`` block. + """ + original = db.config.debug_tables["unit_conversion"] + db.config.debug_tables["unit_conversion"] = self._uc_with(spark, uc_rows) + try: + solver = _solver(spark) + query = db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + query.select(vehicle_speed).toPandas(spark, solver=solver) + finally: + db.config.debug_tables["unit_conversion"] = original + + def test_zero_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", 0.0, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=0" in msg + + def test_negative_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", -1.0, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as 
excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=-1" in msg + + def test_null_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", None, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=None" in msg + + +class TestComputeConversionFactors: + def test_factor_one_for_identical_units( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "RPM", "RPM"), (2, 5, "RPM", "RPM")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + result = solver._compute_conversion_factors(spark, query, channels_df).collect() + factors = {row.container_id: row.conversion_factor for row in result} + assert pytest.approx(factors[1], rel=1e-12) == 1.0 + assert pytest.approx(factors[2], rel=1e-12) == 1.0 + + def test_factor_for_known_speed_conversion( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 7, "km/h", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor == pytest.approx(0.277778, rel=1e-6) + + def test_null_factor_for_cross_family( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = 
key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "RPM", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor is None + + def test_null_factor_for_unknown_unit( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "furlongs/fortnight", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor is None diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py index 8606f7b..a08c635 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py @@ -610,6 +610,7 @@ def test_col_map_always_returns_internal_names(self, spark): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def test_mapping_entries_stored_correctly(self, spark): diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py index 509f06a..6bc29ba 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py @@ -15,6 +15,8 @@ import pytest from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + JoinKey, SolverConfig, TableConfig, ) @@ -35,6 +37,7 @@ "ts": "tstart", "te": "tend", "val": 
"value", + "conv": "conversion_factor", } @@ -137,7 +140,7 @@ class TestColMap: def test_col_map_keys(self, cfg: SolverConfig): """col_map should contain exactly the expected short keys.""" - assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val"} + assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val", "conv"} def test_col_map_default_config(self): """Default SolverConfig col_map should match hardcoded defaults.""" @@ -148,6 +151,7 @@ def test_col_map_default_config(self): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def test_col_map_consistent_with_properties(self, cfg: SolverConfig): @@ -157,6 +161,96 @@ def test_col_map_consistent_with_properties(self, cfg: SolverConfig): assert cfg.col_map["ts"] == cfg.tstart_col assert cfg.col_map["te"] == cfg.tend_col assert cfg.col_map["val"] == cfg.value_col + assert cfg.col_map["conv"] == cfg.conversion_factor_col def test_col_map_values(self, cfg: SolverConfig): assert cfg.col_map == _EXPECTED_COL_MAP + + +# --------------------------------------------------------------------------- +# TestAliasInternalNameProperties – channel mapping / metrics internal names +# --------------------------------------------------------------------------- + + +class TestAliasInternalNameProperties: + """Internal-name properties for the alias-resolution columns.""" + + def test_source_channel_col(self): + assert SolverConfig().source_channel_col == "source_channel" + + def test_data_key_col(self): + assert SolverConfig().data_key_col == "data_key" + + def test_channel_alias_col(self): + assert SolverConfig().channel_alias_col == "channel_alias" + + def test_channel_name_col(self): + assert SolverConfig().channel_name_col == "channel_name" + + +# --------------------------------------------------------------------------- +# TestEffectiveAliasJoinKeys – default + override behavior +# --------------------------------------------------------------------------- + + +class 
TestEffectiveAliasJoinKeys: + def test_default_when_join_keys_none(self): + cfg = SolverConfig() + assert cfg.channel_mapping.join_keys is None + assert cfg.effective_alias_join_keys == [ + ("source_channel", "channel_name"), + ("data_key", "data_key"), + ] + + def test_single_column_override(self): + cfg = SolverConfig( + channel_mapping=ChannelMappingConfig( + join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")] + ) + ) + assert cfg.effective_alias_join_keys == [("source_channel", "channel_name")] + + def test_different_names_per_side(self): + cfg = SolverConfig( + channel_mapping=ChannelMappingConfig( + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"), + ] + ) + ) + assert cfg.effective_alias_join_keys == [ + ("source_channel", "channel_name"), + ("map_dk", "metrics_dk"), + ] + + +# --------------------------------------------------------------------------- +# TestChannelMappingConfig – type acceptance + JSON round-trip +# --------------------------------------------------------------------------- + + +class TestChannelMappingConfig: + def test_accepts_channel_mapping_config_instance(self): + cm = ChannelMappingConfig( + filters={"toolbox_id": "tb"}, + join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")], + ) + cfg = SolverConfig(channel_mapping=cm) + assert cfg.channel_mapping is cm + + def test_json_round_trip_with_join_keys(self): + raw = { + "channel_mapping": { + "column_name_mapping": {"alias": "channel_alias"}, + "filters": {"toolbox_id": "tb"}, + "join_keys": [{"mapping_col": "source_channel", "metrics_col": "channel_name"}], + } + } + cfg = SolverConfig.from_dict(raw) + assert isinstance(cfg.channel_mapping, ChannelMappingConfig) + assert cfg.channel_mapping.column_name_mapping == {"alias": "channel_alias"} + assert cfg.channel_mapping.filters == {"toolbox_id": "tb"} + assert cfg.channel_mapping.join_keys == [ + 
JoinKey(mapping_col="source_channel", metrics_col="channel_name") + ] diff --git a/tests/impulse_reporting/unit/meta/container_dimensions_test.py b/tests/impulse_reporting/unit/meta/container_dimensions_test.py index e40bf0b..acad8f8 100644 --- a/tests/impulse_reporting/unit/meta/container_dimensions_test.py +++ b/tests/impulse_reporting/unit/meta/container_dimensions_test.py @@ -68,7 +68,7 @@ def test_config_hashing(spark): schema = T.StructType([T.StructField(col, T.StringType(), True) for col in silver_columns]) df = spark.createDataFrame([("test_vehicle",)], schema) result = df.transform(ContainerDimension._add_config_hash(impulse_config)) - expected_result = [Row(uut_id="test_vehicle", config_hash=1983688711)] + expected_result = [Row(uut_id="test_vehicle", config_hash=1267386821)] assert expected_result == result.collect() diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv new file mode 100644 index 0000000..eab3979 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv @@ -0,0 +1,6 @@ +project_id,toolbox_id,channel_alias,source_channel,data_key,priority,source_unit,target_unit +SAMPLE_PROJECT,container_concept,engine_speed,Engine RPM,TM,,RPM,RPM +SAMPLE_PROJECT,container_concept,engine_speed,EngSpd,ProjSpecREC_10Hz,,RPM,RPM +SAMPLE_PROJECT,container_concept,vehicle_speed,Vehicle Speed Sensor,TM,,km/h,m/s +SAMPLE_PROJECT,container_concept,vehicle_speed,Spd_Vhcl,ProjSpecREC_10Hz,,km/h,m/s +SAMPLE_PROJECT,container_concept,cross_family_alias,Engine RPM,TM,,RPM,m/s diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv b/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv new file mode 100644 index 0000000..0e1d966 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv @@ -0,0 +1,13 @@ 
+container_id,channel_id,channel_name,data_key,group_idx,channel_idx,unit,sample_count,min,max,mean,begin_ms,end_ms,duration_ms,sample_rate,value_type +1,6,Ambient Air Temperature,TM,2,2,C,59625,13,23,15.904821802935011,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +1,9,Intake Air Temperature,TM,4,1,C,59625,-8,150,29.0571572327044,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +3,8,Intake Air Temperature,TM,4,1,C,46340,-7,146,29.205761760897712,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,7,Vehicle Speed Sensor,TM,3,1,km/h,57240,0,182,75.85866526904263,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +2,5,Engine RPM,TM,2,1,RPM,57240,0,3072,1595.6337875611462,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +3,7,Spd_Vhcl,ProjSpecREC_10Hz,3,1,m/s,46340,0,126,49.57703927492447,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +3,2,Ambient Air Temperature,TM,0,2,C,46340,16,23,19.51277514026759,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,9,Intake Air Temperature,TM,4,1,C,57240,-8,141,36.545946890286515,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +3,5,EngSpd,ProjSpecREC_10Hz,2,1,RPM,46340,0,2689,1378.3660552438498,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,6,Ambient Air Temperature,TM,2,2,C,57240,21,33,28.793081761006288,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +1,7,Vehicle Speed Sensor,TM,3,1,km/h,59625,0,217,68.22906498951782,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +1,5,Engine RPM,TM,2,1,RPM,59625,0,3658,1490.707790356394,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv new file mode 100644 index 
0000000..2aa4d36 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv @@ -0,0 +1,6 @@ +group_id,unit,conversion_factor,is_base +speed,m/s,1.0,true +speed,km/h,0.277778,false +speed,mph,0.44704,false +rotation,RPM,1.0,true +rotation,rad/s,0.10472,false
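Reviewer note: the fixtures and tests above imply a simple rule for the per-channel factor. The sketch below restates that rule in plain Python so the expected values are easy to check by hand. It is an illustration only, not the solver's Spark implementation of `_compute_conversion_factors`, which this diff does not show. The names `validate_rows` and `conversion_factor` are hypothetical. It assumes that each `conversion_factor` converts a unit into its family's base unit, and that unit names are unique across families.

```python
from typing import List, Optional, Tuple

# Rows mirror unit_conversion.csv: (group_id, unit, conversion_factor, is_base).
Row = Tuple[str, str, Optional[float], bool]

UNIT_CONVERSION_ROWS: List[Row] = [
    ("speed", "m/s", 1.0, True),
    ("speed", "km/h", 0.277778, False),
    ("speed", "mph", 0.44704, False),
    ("rotation", "RPM", 1.0, True),
    ("rotation", "rad/s", 0.10472, False),
]


def validate_rows(rows: List[Row]) -> None:
    """Reject null, zero, or negative factors, as the *_factor_raises tests expect.

    The message wording is an assumption modelled on the test assertions.
    """
    for group_id, unit, factor, _is_base in rows:
        if factor is None or factor <= 0:
            raise ValueError(
                f"Invalid conversion_factor for unit {unit!r} in group "
                f"{group_id!r}: conversion_factor={factor}"
            )


def conversion_factor(
    rows: List[Row], source_unit: str, target_unit: str
) -> Optional[float]:
    """Return the multiplier that turns a source-unit value into the target unit.

    Returns None when either unit is unknown or the units belong to
    different families (hypothetical helper, not the solver's API).
    """
    validate_rows(rows)
    lookup = {unit: (group_id, factor) for group_id, unit, factor, _ in rows}
    src = lookup.get(source_unit)
    tgt = lookup.get(target_unit)
    if src is None or tgt is None:
        return None  # unknown unit
    if src[0] != tgt[0]:
        return None  # cross-family, e.g. RPM -> m/s
    # source -> base, then base -> target
    return src[1] / tgt[1]
```

With the fixture rows, `conversion_factor(UNIT_CONVERSION_ROWS, "km/h", "m/s")` divides 0.277778 by 1.0, which matches the value asserted in `test_factor_for_known_speed_conversion`. The cross-family and unknown-unit cases return `None`, matching the two `test_null_factor_*` tests.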