From e12a606922a93c1d31414f7851c88863a9b5219a Mon Sep 17 00:00:00 2001 From: Ben Sully Date: Fri, 9 Feb 2024 15:55:04 +0000 Subject: [PATCH] SceneQueryRunner: decouple time range comparisons Prior to this commit, `SceneQueryRunner` had special handling for `SceneTimeRangeCompare` objects, explicitly searching for them in the scene graph, adding additional queries, and transforming resulting queries. This made it difficult to re-use the general behaviour of 'running additional queries' in other objects. This commit introduces a new interface, `SceneRequestAdder`, which can be implemented to inform the query runner that it should run additional requests (returned by `getExtraRequests`) and transform the results in some fashion. Instead of searching the graph for `SceneTimeRangeCompare` objects, the query runner searches for implementors of `SceneRequestAdder` and uses those instead. The specifics of how it searches for these is a little bit fuzzy and should probably be improved: it walks up the graph until it finds at least one adder at the current level or in any children of the current level; adds any others at that level or in the children; then returns. `SceneTimeRangeCompare` has been refactored to make use of this new interface. I've also got a separate object which also implements it which is working well including when both are enabled. --- .../src/components/SceneTimeRangeCompare.tsx | 70 +++++++++++++-- packages/scenes/src/index.ts | 1 + .../src/querying/SceneQueryRunner.test.ts | 1 + .../scenes/src/querying/SceneQueryRunner.ts | 85 +++++++++---------- .../scenes/src/querying/SceneRequestAdder.ts | 31 +++++++ .../extraRequestProcessingOperator.ts | 33 +++++++ .../timeShiftQueryResponseOperator.ts | 46 ---------- 7 files changed, 169 insertions(+), 98 deletions(-) create mode 100644 packages/scenes/src/querying/SceneRequestAdder.ts create mode 100644 packages/scenes/src/querying/extraRequestProcessingOperator.ts delete mode 100644 packages/scenes/src/querying/timeShiftQueryResponseOperator.ts diff --git a/packages/scenes/src/components/SceneTimeRangeCompare.tsx b/packages/scenes/src/components/SceneTimeRangeCompare.tsx index 1c80f71cd..2dc5231e3 100644 --- a/packages/scenes/src/components/SceneTimeRangeCompare.tsx +++ b/packages/scenes/src/components/SceneTimeRangeCompare.tsx @@ -1,17 +1,16 @@ -import { DateTime, dateTime, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data'; +import { DataQueryRequest, DateTime, dateTime, FieldType, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data'; +import { config } from '@grafana/runtime'; import { ButtonGroup, ButtonSelect, Checkbox, ToolbarButton, useStyles2 } from '@grafana/ui'; import React from 'react'; import { sceneGraph } from '../core/sceneGraph'; import { SceneObjectBase } from '../core/SceneObjectBase'; import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types'; +import { ExtraRequest, SceneRequestAdder, TransformFunc } from '../querying/SceneRequestAdder'; import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig'; +import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId'; import { parseUrlParam } from '../utils/parseUrlParam'; import { css } from '@emotion/css'; -export interface TimeRangeCompareProvider { - getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined; -} - interface SceneTimeRangeCompareState extends SceneObjectState { compareWith?: string; compareOptions: Array<{ label: string; value: string }>; @@ -38,8 +37,7 @@ export const DEFAULT_COMPARE_OPTIONS = [ export class SceneTimeRangeCompare extends SceneObjectBase - implements TimeRangeCompareProvider -{ + implements SceneRequestAdder { static Component = SceneTimeRangeCompareRenderer; protected _urlSync = new SceneObjectUrlSyncConfig(this, { keys: ['compareWith'] }); @@ -94,6 +92,27 @@ export class SceneTimeRangeCompare this.setState({ compareWith: undefined }); }; + // Get a time shifted request to compare with the primary request. + public getExtraRequests(request: DataQueryRequest): ExtraRequest[] { + const extraRequests = []; + const compareRange = this.getCompareTimeRange(request.range); + if (compareRange) { + extraRequests.push({ + req: { + ...request, + range: compareRange, + }, + transform: timeShiftAlignmentTransform, + }); + } + return extraRequests; + } + + // The query runner should rerun the comparison query if the compareWith value has changed. + public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean { + return prev.compareWith !== next.compareWith; + } + public getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined { let compareFrom: DateTime; let compareTo: DateTime; @@ -149,6 +168,43 @@ export class SceneTimeRangeCompare } } +// Transformation function for use with time shifted comparison series. +// This aligns the secondary series with the primary and adds custom +// metadata and config to the secondary series' fields so that it is +// rendered appropriately. +const timeShiftAlignmentTransform: TransformFunc = (primary, secondary) => { + const diff = secondary.timeRange.from.diff(primary.timeRange.from); + secondary.series.forEach((series) => { + series.refId = getCompareSeriesRefId(series.refId || ''); + series.meta = { + ...series.meta, + // @ts-ignore Remove when https://github.com/grafana/grafana/pull/71129 is released + timeCompare: { + diffMs: diff, + isTimeShiftQuery: true, + }, + }; + series.fields.forEach((field) => { + // Align compare series time stamps with reference series + if (field.type === FieldType.time) { + field.values = field.values.map((v) => { + return diff < 0 ? v - diff : v + diff; + }); + } + + field.config = { + ...field.config, + color: { + mode: 'fixed', + fixedColor: config.theme.palette.gray60, + }, + }; + return field; + }); + }); + return secondary; +} + function SceneTimeRangeCompareRenderer({ model }: SceneComponentProps) { const styles = useStyles2(getStyles); const { compareWith, compareOptions } = model.useState(); diff --git a/packages/scenes/src/index.ts b/packages/scenes/src/index.ts index aeb7da6e0..07ae986f5 100644 --- a/packages/scenes/src/index.ts +++ b/packages/scenes/src/index.ts @@ -28,6 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange'; export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride'; export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner'; +export { type ExtraRequest, type SceneRequestAdder, type TransformFunc } from './querying/SceneRequestAdder'; export { SceneDataLayers } from './querying/SceneDataLayers'; export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase'; export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls'; diff --git a/packages/scenes/src/querying/SceneQueryRunner.test.ts b/packages/scenes/src/querying/SceneQueryRunner.test.ts index 07511207d..01783eeec 100644 --- a/packages/scenes/src/querying/SceneQueryRunner.test.ts +++ b/packages/scenes/src/querying/SceneQueryRunner.test.ts @@ -95,6 +95,7 @@ const runRequestMock = jest.fn().mockImplementation((ds: DataSourceApi, request: state: LoadingState.Loading, series: [], annotations: [], + request, timeRange: request.range, }; diff --git a/packages/scenes/src/querying/SceneQueryRunner.ts b/packages/scenes/src/querying/SceneQueryRunner.ts index 5f5780afe..2e0ece41f 100644 --- a/packages/scenes/src/querying/SceneQueryRunner.ts +++ b/packages/scenes/src/querying/SceneQueryRunner.ts @@ -31,9 +31,9 @@ import { VariableDependencyConfig } from '../variables/VariableDependencyConfig' import { writeSceneLog } from '../utils/writeSceneLog'; import { VariableValueRecorder } from '../variables/VariableValueRecorder'; import { emptyPanelData } from '../core/SceneDataNode'; -import { SceneTimeRangeCompare } from '../components/SceneTimeRangeCompare'; import { getClosest } from '../core/sceneGraph/utils'; -import { timeShiftQueryResponseOperator } from './timeShiftQueryResponseOperator'; +import { isRequestAdder, SceneRequestAdder, TransformFunc } from './SceneRequestAdder'; +import { passthroughTransform, extraRequestProcessingOperator } from './extraRequestProcessingOperator'; import { filterAnnotations } from './layers/annotations/filterAnnotations'; import { getEnrichedDataRequest } from './getEnrichedDataRequest'; import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters'; @@ -99,19 +99,18 @@ export class SceneQueryRunner extends SceneObjectBase implemen } private _onActivate() { - const timeRange = sceneGraph.getTimeRange(this); - const comparer = this.getTimeCompare(); - - if (comparer) { + const adders = this.getClosestRequestAdders(); + for (const adder of adders) { this._subs.add( - comparer.subscribeToState((n, p) => { - if (n.compareWith !== p.compareWith) { + adder.subscribeToState((n, p) => { + if (adder.shouldRerun(p, n)) { this.runQueries(); } }) - ); + ) } + const timeRange = sceneGraph.getTimeRange(this); this._subs.add( timeRange.subscribeToState(() => { this.runWithTimeRange(timeRange); @@ -396,21 +395,24 @@ export class SceneQueryRunner extends SceneObjectBase implemen this.findAndSubscribeToAdHocFilters(datasource?.uid); const runRequest = getRunRequest(); - const [request, secondaryRequest] = this.prepareRequests(timeRange, ds); + const { primary, secondaries, transformations } = this.prepareRequests(timeRange, ds); writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key); - let stream = runRequest(ds, request); + let stream = runRequest(ds, primary); - if (secondaryRequest) { + if (secondaries.length > 0) { + const [sReq, ...otherSReqs] = secondaries; + const secondaryStreams = otherSReqs.map((r) => runRequest(ds, r)); // change subscribe callback below to pipe operator - stream = forkJoin([stream, runRequest(ds, secondaryRequest)]).pipe(timeShiftQueryResponseOperator); + const op = extraRequestProcessingOperator(transformations); + stream = forkJoin([stream, runRequest(ds, sReq), ...secondaryStreams]).pipe(op); } stream = stream.pipe( registerQueryWithController({ type: 'data', - request, + request: primary, origin: this, cancel: () => this.cancelQuery(), }) @@ -432,12 +434,9 @@ export class SceneQueryRunner extends SceneObjectBase implemen private prepareRequests = ( timeRange: SceneTimeRangeLike, ds: DataSourceApi - ): [DataQueryRequest, DataQueryRequest | undefined] => { - const comparer = this.getTimeCompare(); + ): { primary: DataQueryRequest, secondaries: DataQueryRequest[], transformations: Map } => { const { minInterval, queries } = this.state; - let secondaryRequest: DataQueryRequest | undefined; - let request: DataQueryRequest = { app: 'scenes', requestId: getNextRequestId(), @@ -493,23 +492,18 @@ export class SceneQueryRunner extends SceneObjectBase implemen request.intervalMs = norm.intervalMs; const primaryTimeRange = timeRange.state.value; - if (comparer) { - const secondaryTimeRange = comparer.getCompareTimeRange(primaryTimeRange); - if (secondaryTimeRange) { - secondaryRequest = { - ...request, - range: secondaryTimeRange, - requestId: getNextRequestId(), - }; - - request = { - ...request, - range: primaryTimeRange, - }; + + let secondaryRequests: DataQueryRequest[] = []; + let secondaryTransformations = new Map(); + for (const adder of this.getClosestRequestAdders() ?? []) { + for (const { req, transform } of adder.getExtraRequests(request)) { + const requestId = getNextRequestId(); + secondaryRequests.push({ ...req, requestId }) + secondaryTransformations.set(requestId, transform ?? passthroughTransform); } } - - return [request, secondaryRequest]; + request.range = primaryTimeRange; + return { primary: request, secondaries: secondaryRequests, transformations: secondaryTransformations }; }; private onDataReceived = (data: PanelData) => { @@ -551,26 +545,27 @@ export class SceneQueryRunner extends SceneObjectBase implemen } /** - * Will walk up the scene graph and find the closest time range compare object - * It performs buttom-up search, including shallow search across object children for supporting controls/header actions + * Walk up the scene graph and find any request adders. + * + * Will stop as soon as at least one adder has been found at any level + * of the graph. This might need to change in future. */ - private getTimeCompare() { + private getClosestRequestAdders(): Array> { if (!this.parent) { - return null; + return []; } return getClosest(this.parent, (s) => { - let found = null; - if (s instanceof SceneTimeRangeCompare) { - return s; + const found: Array> = []; + if (isRequestAdder(s)) { + found.push(s); } s.forEachChild((child) => { - if (child instanceof SceneTimeRangeCompare) { - found = child; + if (isRequestAdder(child)) { + found.push(child); } }); - - return found; - }); + return found.length > 0 ? found : null; + }) ?? []; } /** diff --git a/packages/scenes/src/querying/SceneRequestAdder.ts b/packages/scenes/src/querying/SceneRequestAdder.ts new file mode 100644 index 000000000..80fd8c1e5 --- /dev/null +++ b/packages/scenes/src/querying/SceneRequestAdder.ts @@ -0,0 +1,31 @@ +import { DataQueryRequest, PanelData } from "@grafana/data"; + +import { SceneObjectBase } from "../core/SceneObjectBase"; +import { SceneObjectState } from "../core/types"; + +// A transformation function called by the query runner with responses +// to any extra requests. +// +// See the docs for `extraRequestProcessingOperator` for more information. +export type TransformFunc = (primary: PanelData, secondary: PanelData) => PanelData; + +// An extra request that should be run by a query runner, and an optional +// transform that should be called with the response data. +export interface ExtraRequest { + // The request. + req: DataQueryRequest; + // An optional transformation function. + transform?: TransformFunc; +} + +// Indicates that this type wants to add extra requests to a query runner. +export interface SceneRequestAdder extends SceneObjectBase { + // Get any extra requests and their required transformations. + getExtraRequests(request: DataQueryRequest): ExtraRequest[]; + // Determine whether a query should be rerun. + shouldRerun(prev: T, next: T): boolean; +} + +export function isRequestAdder(obj: any): obj is SceneRequestAdder { + return typeof obj === 'object' && 'getExtraRequests' in obj; +} diff --git a/packages/scenes/src/querying/extraRequestProcessingOperator.ts b/packages/scenes/src/querying/extraRequestProcessingOperator.ts new file mode 100644 index 000000000..1c56042aa --- /dev/null +++ b/packages/scenes/src/querying/extraRequestProcessingOperator.ts @@ -0,0 +1,33 @@ +import { PanelData } from '@grafana/data'; +import { map, Observable } from 'rxjs'; +import { TransformFunc } from './SceneRequestAdder'; + +// Passthrough transformation for use with ExtraRequests. +export const passthroughTransform: TransformFunc = (_, secondary) => secondary; + +// Factory function which takes a map from request ID to transform functions and +// returns an rxjs operator which operates on an array of panel data responses. +// The responses must have length at least 2; the first is treated as the 'primary' +// response and the rest as secondary responses. +// +// Each secondary response is transformed according to the transform function +// identified by it's request ID. The transform function is passed the primary +// response and the secondary response to be processed. +// +// The output is a single frame with the primary series and all transformed +// secondary series combined. +export const extraRequestProcessingOperator = (transforms: Map) => + (data: Observable<[PanelData, PanelData, ...PanelData[]]>) => { + return data.pipe( + map(([primary, ...secondaries]) => { + const frames = secondaries.flatMap((s) => { + const transformed = transforms.get(s.request!.requestId)?.(primary, s) ?? s; + return transformed.series; + }); + return { + ...primary, + series: [...primary.series, ...frames], + }; + }) + ); + } diff --git a/packages/scenes/src/querying/timeShiftQueryResponseOperator.ts b/packages/scenes/src/querying/timeShiftQueryResponseOperator.ts deleted file mode 100644 index fcc17d1f6..000000000 --- a/packages/scenes/src/querying/timeShiftQueryResponseOperator.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { FieldType, PanelData } from '@grafana/data'; -import { config } from '@grafana/runtime'; -import { map, Observable } from 'rxjs'; -import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId'; - -export function timeShiftQueryResponseOperator(data: Observable<[PanelData, PanelData]>) { - return data.pipe( - map(([p, s]) => { - const diff = s.timeRange.from.diff(p.timeRange.from); - s.series.forEach((series) => { - series.refId = getCompareSeriesRefId(series.refId || ''); - series.meta = { - ...series.meta, - // @ts-ignore Remove when https://github.com/grafana/grafana/pull/71129 is released - timeCompare: { - diffMs: diff, - isTimeShiftQuery: true, - }, - }; - series.fields.forEach((field) => { - // Align compare series time stamps with reference series - if (field.type === FieldType.time) { - field.values = field.values.map((v) => { - return diff < 0 ? v - diff : v + diff; - }); - } - - field.config = { - ...field.config, - color: { - mode: 'fixed', - fixedColor: config.theme.palette.gray60, - }, - }; - - return field; - }); - }); - - return { - ...p, - series: [...p.series, ...s.series], - }; - }) - ); -}