From 31fe81b62739f4515d76c13d7fdda0a8ffe4bc4c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 13 Dec 2024 10:26:56 -0500 Subject: [PATCH] Add support for CDM v8 Principally this changes the names of the various document "id" fields from "id" to "-uuid" (e.g., "run-uuid"). This code determines whether the Crucible instance is using v7 or v8, and generates the proper ID field names. There are some other minor cleanups here, in handling broken documents. --- backend/app/api/v1/endpoints/ilab/ilab.py | 10 +- backend/app/main.py | 2 - backend/app/services/crucible_svc.py | 299 +++++++++++++++------- 3 files changed, 219 insertions(+), 92 deletions(-) diff --git a/backend/app/api/v1/endpoints/ilab/ilab.py b/backend/app/api/v1/endpoints/ilab/ilab.py index b30f0e6..bd0d60f 100644 --- a/backend/app/api/v1/endpoints/ilab/ilab.py +++ b/backend/app/api/v1/endpoints/ilab/ilab.py @@ -8,9 +8,8 @@ from datetime import datetime, timedelta, timezone from typing import Annotated, Any, Optional -from fastapi import APIRouter, Depends, Query - from app.services.crucible_svc import CrucibleService, GraphList, Metric +from fastapi import APIRouter, Depends, Query router = APIRouter() @@ -30,6 +29,7 @@ async def crucible_svc(): crucible = None try: crucible = CrucibleService(CONFIGPATH) + await crucible.detect_cdm() yield crucible finally: if crucible: @@ -143,6 +143,7 @@ async def runs( Optional[str], Query(description="End time for search", examples=["2020-11-10"]), ] = None, + undated: Annotated[bool, Query(description="Don't filter on dates")] = False, filter: Annotated[ Optional[list[str]], Query( @@ -161,7 +162,10 @@ async def runs( Query(description="Page offset to start", examples=[10]), ] = 0, ): - if start_date is None and end_date is None: + if undated: + start = None + end = None + elif start_date is None and end_date is None: now = datetime.now(timezone.utc) start = now - timedelta(days=30) end = now diff --git a/backend/app/main.py b/backend/app/main.py index 
109c741..eb64a49 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -49,8 +49,6 @@ def render(self, content: typing.Any) -> bytes: @app.middleware('http') async def some_middleware(request: Request, call_next): - print(f"origin: {origins}, request: {request.headers}") - print(f"{request.app.user_middleware}") if request.url.path in routes_to_reroute: request.scope['path'] = '/docs' headers = dict(request.scope['headers']) diff --git a/backend/app/services/crucible_svc.py b/backend/app/services/crucible_svc.py index aad8b29..ca5e95f 100644 --- a/backend/app/services/crucible_svc.py +++ b/backend/app/services/crucible_svc.py @@ -10,18 +10,18 @@ aggregate, or Plotly graph format for UI display. """ +import re import time from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, Iterator, Optional, Tuple, Union +from app import config from elasticsearch import AsyncElasticsearch from fastapi import HTTPException, status from pydantic import BaseModel -from app import config - class Metric(BaseModel): """Describe a single metric to be graphed or summarized @@ -286,7 +286,7 @@ class CrucibleService: "source", ) - def __init__(self, configpath: str = "crucible"): + def __init__(self, configpath: str = "crucible", version: int = 8): """Initialize a Crucible CDM (OpenSearch) connection. Generally the `configpath` should be scoped, like `ilab.crucible` so @@ -304,11 +304,50 @@ def __init__(self, configpath: str = "crucible"): self.password = self.cfg.get(configpath + ".password") self.auth = (self.user, self.password) if self.user or self.password else None self.url = self.cfg.get(configpath + ".url") + self.set_cdm_version(version) self.elastic = AsyncElasticsearch(self.url, basic_auth=self.auth) - @staticmethod - def _get_index(root: str) -> str: - return "cdmv7dev-" + root + def set_cdm_version(self, version): + """Set up for a specific version of the CDM. 
+ + We currently support v7 and v8. To dynamically select the latest CDM + version active on an Opensearch instance, call detect_cdm() on a + CrucibleService instance. + """ + if version < 7 or version > 8: + raise HTTPException( + status_code=400, + detail=f"The Crucible service supports CDM versions 7 and 8: {version} was specified", + ) + print(f"Selecting CDM version {version}") + self.cdm_version = version + self.id_names = { + "run": "id" if version == 7 else "run-uuid", + "iteration": "id" if version == 7 else "iteration-uuid", + "period": "id" if version == 7 else "period-uuid", + "sample": "id" if version == 7 else "sample-uuid", + "metric_desc": "id" if version == 7 else "metric_desc-uuid", + } + + async def detect_cdm(self): + indices = await self.elastic.indices.get("cdmv*") + versions = set() + vpat = re.compile(r"cdmv(?P<version>\d+)dev-") + for i in indices.keys(): + match = vpat.match(i) + if match: + try: + versions.add(int(match.group("version"))) + except Exception as e: + print(f"Skipping index {i}: {str(e)!r}") + latest = max(versions) + self.set_cdm_version(latest) + + def _get_index(self, root: str) -> str: + return f"cdmv{self.cdm_version:d}dev-{root}" + + def _get_id_field(self, root: str) -> str: + return self.id_names[root] @staticmethod def _split_list(alist: Optional[list[str]] = None) -> list[str]: @@ -463,8 +502,26 @@ def _format_data(cls, data: dict[str, Any]) -> dict[str, Any]: "value": float(data["value"]), } - @classmethod - def _format_period(cls, period: dict[str, Any]) -> dict[str, Any]: + def _format_iteration(self, iteration: dict[str, Any]) -> dict[str, Any]: + """Helper to format an "iteration" object + + Args: + iteration: an "iteration" object + + Returns: + A neatly formatted "iteration" object + """ + iidn = self._get_id_field("iteration") + return { + "id": iteration[iidn], + "num": iteration["num"], + "path": iteration["path"], + "primary_metric": iteration["primary-metric"], + "primary_period": iteration["primary-period"], 
"status": iteration["status"], + } + + def _format_period(self, period: dict[str, Any]) -> dict[str, Any]: """Helper to format a "period" object Crucible stores the date values as stringified integers, so this @@ -476,13 +533,31 @@ def _format_period(cls, period: dict[str, Any]) -> dict[str, Any]: Returns: A neatly formatted "period" object """ + pidn = self._get_id_field("period") return { - "begin": cls._format_timestamp(timestamp=period["begin"]), - "end": cls._format_timestamp(period["end"]), - "id": period["id"], + "begin": self._format_timestamp(timestamp=period["begin"]), + "end": self._format_timestamp(period["end"]), + "id": period[pidn], "name": period["name"], } + def _format_sample(self, sample: dict[str, Any]) -> dict[str, Any]: + """Helper to format a "sample" object + + Args: + sample: a "sample" object + + Returns: + A neatly formatted "sample" object + """ + sidn = self._get_id_field("sample") + return { + "num": sample["num"], + "path": sample["path"], + "id": sample[sidn], + "status": sample["status"], + } + @classmethod def _build_filter_options(cls, filter: Optional[list[str]] = None) -> Tuple[ Optional[list[dict[str, Any]]], @@ -600,9 +675,8 @@ def _build_name_filters( filters.append({"term": {f"metric_desc.names.{n}": v}}) return filters - @classmethod def _build_period_filters( - cls, periodlist: Optional[list[str]] = None + self, periodlist: Optional[list[str]] = None ) -> list[dict[str, Any]]: """Build period filters @@ -621,14 +695,15 @@ def _build_period_filters( A filter term that requires a period.id match only for metric_desc documents with a period. 
""" - pl: list[str] = cls._split_list(periodlist) + pl: list[str] = self._split_list(periodlist) + pidn = self._get_id_field("period") if pl: return [ { "dis_max": { "queries": [ {"bool": {"must_not": {"exists": {"field": "period"}}}}, - {"terms": {"period.id": pl}}, + {"terms": {f"period.{pidn}": pl}}, ] } } @@ -636,9 +711,8 @@ def _build_period_filters( else: return [] - @classmethod def _build_metric_filters( - cls, + self, run: str, metric: str, names: Optional[list[str]] = None, @@ -659,18 +733,19 @@ def _build_metric_filters( A list of OpenSearch filter expressions """ msource, mtype = metric.split("::") + ridn = self._get_id_field("run") return ( [ - {"term": {"run.id": run}}, + {"term": {f"run.{ridn}": run}}, {"term": {"metric_desc.source": msource}}, {"term": {"metric_desc.type": mtype}}, ] - + cls._build_name_filters(names) - + cls._build_period_filters(periods) + + self._build_name_filters(names) + + self._build_period_filters(periods) ) @classmethod - def _build_sort_terms(cls, sorters: Optional[list[str]]) -> list[dict[str, str]]: + def _build_sort_terms(cls, sorters: Optional[list[str]]) -> list[dict[str, Any]]: """Build sort term list Sorters may reference any native `run` index field and must specify @@ -690,16 +765,16 @@ def _build_sort_terms(cls, sorters: Optional[list[str]]) -> list[dict[str, str]] if dir not in cls.DIRECTIONS: raise HTTPException( status.HTTP_400_BAD_REQUEST, - f"Sort direction {dir!r} must be one of {','.join(DIRECTIONS)}", + f"Sort direction {dir!r} must be one of {','.join(cls.DIRECTIONS)}", ) if key not in cls.FIELDS: raise HTTPException( status.HTTP_400_BAD_REQUEST, - f"Sort key {key!r} must be one of {','.join(FIELDS)}", + f"Sort key {key!r} must be one of {','.join(cls.FIELDS)}", ) - sort_terms.append({f"run.{key}": dir}) + sort_terms.append({f"run.{key}": {"order": dir}}) else: - sort_terms = [{"run.begin": "asc"}] + sort_terms = [{"run.begin": {"order": "asc"}}] return sort_terms async def _search( @@ -810,7 +885,8 @@ 
async def _get_metric_ids( if len(metrics["hits"]["hits"]) < 1: print(f"No metric descs: filters={filters}") return [] - ids = [h["metric_desc"]["id"] for h in self._hits(metrics)] + mdidn = self._get_id_field("metric_desc") + ids = [h["metric_desc"][mdidn] for h in self._hits(metrics)] if len(ids) < 2 or aggregate: return ids @@ -823,9 +899,10 @@ async def _get_metric_ids( "message": f"More than one metric ({len(ids)}) means " "you should add breakout filters or aggregate." } + pidn = self._get_id_field("period") for m in self._hits(metrics): if "period" in m: - periods.add(m["period"]["id"]) + periods.add(m["period"][pidn]) for n, v in m["metric_desc"]["names"].items(): names[n].add(v) @@ -856,8 +933,9 @@ async def _build_timestamp_range_filters( if periods: ps = self._split_list(periods) + pidn = self._get_id_field("period") matches = await self.search( - "period", filters=[{"terms": {"period.id": ps}}] + "period", filters=[{"terms": {f"period.{pidn}": ps}}] ) try: start = min([int(h) for h in self._hits(matches, ["period", "begin"])]) @@ -881,7 +959,7 @@ async def _get_run_ids( ) -> set[str]: """Return a set of run IDs matching a filter - Documents in the specified index must have "run.id" fields. Returns + Documents in the specified index must have ID fields. This returns a set of unique run IDs matched by the filter in the specified index. 
Args: @@ -891,11 +969,12 @@ async def _get_run_ids( Returns: a set of unique run ID values """ + ridn = self._get_id_field("run") filtered = await self.search( - index, source="run.id", filters=filters, ignore_unavailable=True + index, source=f"run.{ridn}", filters=filters, ignore_unavailable=True ) print(f"HITS: {filtered['hits']['hits']}") - return set([x for x in self._hits(filtered, ["run", "id"])]) + return set([x for x in self._hits(filtered, ["run", ridn])]) async def _make_title( self, @@ -925,6 +1004,9 @@ async def _make_title( A string title """ names = metric_item.names + ridn = self._get_id_field("run") + iidn: str = self._get_id_field("iteration") + pidn = self._get_id_field("period") metric = metric_item.metric if metric_item.periods and len(metric_item.periods) == 1: period = metric_item.periods[0] @@ -934,22 +1016,22 @@ async def _make_title( # Gather iteration parameters outside the loop for help in # generating useful labels. all_params = await self.search( - "param", filters=[{"term": {"run.id": run_id}}] + "param", filters=[{"term": {f"run.{ridn}": run_id}}] ) collector = defaultdict(defaultdict) for h in self._hits(all_params): - collector[h["iteration"]["id"]][h["param"]["arg"]] = h["param"]["val"] + collector[h["iteration"][iidn]][h["param"]["arg"]] = h["param"]["val"] params_by_run[run_id] = collector else: collector = params_by_run[run_id] if run_id not in periods_by_run: periods = await self.search( - "period", filters=[{"term": {"run.id": run_id}}] + "period", filters=[{"term": {f"run.{ridn}": run_id}}] ) iteration_periods = defaultdict(list[dict[str, Any]]) for p in self._hits(periods): - iteration_periods[p["iteration"]["id"]].append(p["period"]) + iteration_periods[p["iteration"][iidn]].append(p["period"]) periods_by_run[run_id] = iteration_periods else: iteration_periods = periods_by_run[run_id] @@ -963,11 +1045,11 @@ async def _make_title( if metric_item.periods: iteration = None for i, plist in iteration_periods.items(): - if 
set(metric_item.periods) <= set([p["id"] for p in plist]): + if set(metric_item.periods) <= set([p[pidn] for p in plist]): iteration = i if period: for p in plist: - if p["id"] == period: + if p[pidn] == period: name_suffix += f" {p['name']}" # If the period(s) we're graphing resolve to a single @@ -1160,6 +1242,7 @@ async def get_runs( if run_filters: filters.extend(run_filters) if start or end: + print(f"Filtering runs from {start} to {end}") s = None e = None if start: @@ -1213,28 +1296,49 @@ async def get_runs( rawiterations = await self.search("iteration", ignore_unavailable=True) rawtags = await self.search("tag", ignore_unavailable=True) rawparams = await self.search("param", ignore_unavailable=True) + rawperiods = await self.search(index="period", ignore_unavailable=True) iterations = defaultdict(list) + periods: dict[str, tuple[int, int]] = {} tags = defaultdict(defaultdict) params = defaultdict(defaultdict) run_params = defaultdict(list) + ridn = self._get_id_field("run") + iidn = self._get_id_field("iteration") for i in self._hits(rawiterations): - iterations[i["run"]["id"]].append(i["iteration"]) + iterations[i["run"][ridn]].append(i["iteration"]) # Organize tags by run ID for t in self._hits(rawtags): - tags[t["run"]["id"]][t["tag"]["name"]] = t["tag"]["val"] + tags[t["run"][ridn]][t["tag"]["name"]] = t["tag"]["val"] + + # Organize period timestamps by run ID + for p in self._hits(rawperiods): + period = p["period"] + rid = p["run"][ridn] + range = periods.get(rid) + try: + b = int(period["begin"]) + e = int(period["end"]) + except Exception as e: + print(f"bad timestamp in period {period}: {str(e)!r}") + continue + if range: + range = (min(range[0], b), max(range[1], e)) + else: + range = (b, e) + periods[rid] = range # Organize params by iteration ID for p in self._hits(rawparams): - run_params[p["run"]["id"]].append(p) - params[p["iteration"]["id"]][p["param"]["arg"]] = p["param"]["val"] + run_params[p["run"][ridn]].append(p) + 
params[p["iteration"][iidn]][p["param"]["arg"]] = p["param"]["val"] runs = {} for h in self._hits(hits): run = h["run"] - rid = run["id"] + rid = run[ridn] # Filter the runs by our tag and param queries if param_filters and rid not in paramids: @@ -1243,17 +1347,24 @@ async def get_runs( if tag_filters and rid not in tagids: continue - runs[rid] = run + # Convert string timestamps (milliseconds from epoch) to int. + # If the run document has no begin or end, look for the periods + b = None + e = None + if "begin" in run and "end" in run: + try: + b = int(run["begin"]) + e = int(run["end"]) + except Exception as exc: + print(f"bad timestamps in run {run}: {str(exc)!r}") + if (b is None or e is None) and rid in periods: + b, e = periods[rid] + if b is None or e is None: + print(f"can't find begin/end timestamps for run {rid}: ignoring") + continue + run["begin"] = b + run["end"] = e - # Convert string timestamps (milliseconds from epoch) to int - try: - run["begin"] = int(run["begin"]) - run["end"] = int(run["end"]) - except Exception as e: - print( - f"Unexpected error converting timestamp {run['begin']!r} " - f"or {run['end']!r} to int: {str(e)!r}" - ) run["tags"] = tags.get(rid, {}) run["iterations"] = [] run["primary_metrics"] = set() @@ -1262,7 +1373,7 @@ async def get_runs( # Collect unique iterations: the status is "fail" if any iteration # for that run ID failed. 
for i in iterations.get(rid, []): - iparams = params.get(i["id"], {}) + iparams = params.get(i[iidn], {}) if "status" not in run: run["status"] = i["status"] else: @@ -1289,6 +1400,8 @@ async def get_runs( run["begin_date"] = self._format_timestamp("0") run["end_date"] = self._format_timestamp("0") + runs[rid] = run + count = len(runs) total = hits["hits"]["total"]["value"] results.update( @@ -1311,9 +1424,10 @@ async def get_tags(self, run: str, **kwargs) -> dict[str, str]: Returns: JSON dict with "tag" keys showing each value """ + ridn = self._get_id_field("run") tags = await self.search( index="tag", - filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], **kwargs, ignore_unavailable=True, ) @@ -1342,7 +1456,11 @@ async def get_params( status.HTTP_400_BAD_REQUEST, "A params query requires either a run or iteration ID", ) - match = {"run.id" if run else "iteration.id": run if run else iteration} + ridn = self._get_id_field("run") + iidn = self._get_id_field("iteration") + match = { + f"run.{ridn}" if run else f"iteration.{iidn}": run if run else iteration + } params = await self.search( index="param", filters=[{"term": match}], @@ -1351,7 +1469,7 @@ async def get_params( ) response = defaultdict(defaultdict) for param in self._hits(params): - iter = param["iteration"]["id"] + iter = param["iteration"][iidn] arg = param["param"]["arg"] val = param["param"]["val"] if response.get(iter) and response.get(iter).get(arg): @@ -1377,9 +1495,11 @@ async def get_iterations(self, run: str, **kwargs) -> list[dict[str, Any]]: Returns: A list of iteration documents """ + ridn = self._get_id_field("run") + iidn = self._get_id_field("iteration") hits = await self.search( index="iteration", - filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], sort=[{"iteration.num": "asc"}], **kwargs, ignore_unavailable=True, @@ -1387,16 +1507,7 @@ async def get_iterations(self, run: str, **kwargs) -> list[dict[str, Any]]: iterations = [] 
for i in self._hits(hits, ["iteration"]): - iterations.append( - { - "id": i["id"], - "num": i["num"], - "path": i["path"], - "primary_metric": i["primary-metric"], - "primary_period": i["primary-period"], - "status": i["status"], - } - ) + iterations.append(self._format_iteration(i)) return iterations async def get_samples( @@ -1417,7 +1528,11 @@ async def get_samples( status.HTTP_400_BAD_REQUEST, "A sample query requires either a run or iteration ID", ) - match = {"run.id" if run else "iteration.id": run if run else iteration} + ridn = self._get_id_field("run") + iidn = self._get_id_field("iteration") + match = { + f"run.{ridn}" if run else f"iteration.{iidn}": run if run else iteration + } hits = await self.search( index="sample", filters=[{"term": match}], @@ -1426,12 +1541,10 @@ async def get_samples( ) samples = [] for s in self._hits(hits): - print(f"SAMPLE's ITERATION {s['iteration']}") - sample = s["sample"] + sample = self._format_sample(s["sample"]) sample["iteration"] = s["iteration"]["num"] sample["primary_metric"] = s["iteration"]["primary-metric"] sample["primary_period"] = s["iteration"]["primary-period"] - sample["status"] = s["iteration"]["status"] samples.append(sample) return samples @@ -1463,12 +1576,15 @@ async def get_periods( "A period query requires a run, iteration, or sample ID", ) match = None + ridn = self._get_id_field("run") + iidn = self._get_id_field("iteration") + sidn = self._get_id_field("sample") if sample: - match = {"sample.id": sample} + match = {f"sample.{sidn}": sample} elif iteration: - match = {"iteration.id": iteration} + match = {f"iteration.{iidn}": iteration} else: - match = {"run.id": run} + match = {f"run.{ridn}": run} periods = await self.search( index="period", filters=[{"term": match}], @@ -1499,31 +1615,34 @@ async def get_timeline(self, run: str, **kwargs) -> dict[str, Any]: run: run ID kwargs: additional OpenSearch parameters """ + ridn = self._get_id_field("run") itr = await self.search( index="iteration", - 
filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], **kwargs, ignore_unavailable=True, ) sam = await self.search( index="sample", - filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], **kwargs, ignore_unavailable=True, ) per = await self.search( index="period", - filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], **kwargs, ignore_unavailable=True, ) samples = defaultdict(list) periods = defaultdict(list) + iidn = self._get_id_field("iteration") + sidn = self._get_id_field("sample") for s in self._hits(sam): - samples[s["iteration"]["id"]].append(s) + samples[s["iteration"][iidn]].append(s) for p in self._hits(per): - periods[p["sample"]["id"]].append(p) + periods[p["sample"][sidn]].append(p) iterations = [] robj = {"id": run, "iterations": iterations} @@ -1535,10 +1654,10 @@ async def get_timeline(self, run: str, **kwargs) -> dict[str, Any]: iteration = i["iteration"] iterations.append(iteration) iteration["samples"] = [] - for s in samples.get(iteration["id"], []): + for s in samples.get(iteration[iidn], []): sample = s["sample"] sample["periods"] = [] - for pr in periods.get(sample["id"], []): + for pr in periods.get(sample[sidn], []): period = self._format_period(pr["period"]) sample["periods"].append(period) iteration["samples"].append(sample) @@ -1570,13 +1689,15 @@ async def get_metrics_list(self, run: str, **kwargs) -> dict[str, Any]: Returns: List of metrics available for the run """ + ridn = self._get_id_field("run") hits = await self.search( index="metric_desc", - filters=[{"term": {"run.id": run}}], + filters=[{"term": {f"run.{ridn}": run}}], ignore_unavailable=True, **kwargs, ) met = {} + pidn = self._get_id_field("period") for h in self._hits(hits): desc = h["metric_desc"] name = desc["source"] + "::" + desc["type"] @@ -1586,7 +1707,7 @@ async def get_metrics_list(self, run: str, **kwargs) -> dict[str, Any]: record = {"periods": [], "breakouts": defaultdict(set)} 
met[name] = record if "period" in h: - record["periods"].append(h["period"]["id"]) + record["periods"].append(h["period"][pidn]) for n, v in desc["names"].items(): record["breakouts"][n].add(v) return met @@ -1647,6 +1768,7 @@ async def get_metric_breakouts( response = {"label": metric, "class": classes} breakouts = defaultdict(set) pl = set() + pidn = self._get_id_field("period") for m in self._hits(metrics): desc = m["metric_desc"] response["type"] = desc["type"] @@ -1654,7 +1776,7 @@ async def get_metric_breakouts( if desc.get("class"): classes.add(desc["class"]) if "period" in m: - pl.add(m["period"]["id"]) + pl.add(m["period"][pidn]) for n, v in desc["names"].items(): breakouts[n].add(v) # We want to help filter a consistent summary, so only show those @@ -1720,7 +1842,8 @@ async def get_metrics_data( # If we're searching by periods, filter metric data by the period # timestamp range rather than just relying on the metric desc IDs as # we also want to filter non-periodic tool data. 
- filters = [{"terms": {"metric_desc.id": ids}}] + mdidn = self._get_id_field("metric_desc") + filters = [{"terms": {f"metric_desc.{mdidn}": ids}}] filters.extend(await self._build_timestamp_range_filters(periods)) response = [] @@ -1784,6 +1907,7 @@ async def get_metrics_summary( params_by_run = {} periods_by_run = {} run_id_list = [] + mdidn = self._get_id_field("metric_desc") for s in summaries: if not s.run: raise HTTPException( @@ -1800,7 +1924,7 @@ async def get_metrics_summary( periodlist=summary.periods, aggregate=summary.aggregate, ) - filters = [{"terms": {"metric_desc.id": ids}}] + filters = [{"terms": {f"metric_desc.{mdidn}": ids}}] filters.extend(await self._build_timestamp_range_filters(summary.periods)) data = await self.search( "metric_data", @@ -1956,7 +2080,8 @@ async def get_metrics_graph(self, graphdata: GraphList) -> dict[str, Any]: periodlist=g.periods, aggregate=g.aggregate, ) - filters = [{"terms": {"metric_desc.id": ids}}] + mdidn = self._get_id_field("metric_desc") + filters = [{"terms": {f"metric_desc.{mdidn}": ids}}] filters.extend(await self._build_timestamp_range_filters(g.periods)) y_max = 0.0 points: list[Point] = []