Skip to content

Commit

Permalink
SceneQueryRunner: decouple time range comparisons
Browse files Browse the repository at this point in the history
Prior to this commit, `SceneQueryRunner` had special handling for
`SceneTimeRangeCompare` objects, explicitly searching for them in the
scene graph, adding additional queries, and transforming resulting
queries. This made it difficult to re-use the general behaviour of
'running additional queries' in other objects.

This commit introduces a new interface, `SceneRequestAdder`, which
can be implemented to inform the query runner that it should run
additional requests (returned by `getExtraRequests`) and transform the
results in some fashion.

Instead of searching the graph for `SceneTimeRangeCompare` objects,
the query runner searches for implementors of `SceneRequestAdder`
and uses those instead. The specifics of how it searches for these
is a little bit fuzzy and should probably be improved: it walks up the
graph until it finds at least one adder at the current level or in
any children of the current level; adds any others at that level
or in the children; then returns.

`SceneTimeRangeCompare` has been refactored to make use of this new
interface. I've also got a separate object which also implements
it which is working well including when both are enabled.
  • Loading branch information
sd2k committed Feb 9, 2024
1 parent c7d1f1b commit 7f75cef
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 98 deletions.
70 changes: 63 additions & 7 deletions packages/scenes/src/components/SceneTimeRangeCompare.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { DateTime, dateTime, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data';
import { DataQueryRequest, DateTime, dateTime, FieldType, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data';
import { config } from '@grafana/runtime';
import { ButtonGroup, ButtonSelect, Checkbox, ToolbarButton, useStyles2 } from '@grafana/ui';
import React from 'react';
import { sceneGraph } from '../core/sceneGraph';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types';
import { ExtraRequest, SceneRequestAdder, TransformFunc } from '../querying/SceneRequestAdder';
import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig';
import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId';
import { parseUrlParam } from '../utils/parseUrlParam';
import { css } from '@emotion/css';

export interface TimeRangeCompareProvider {
getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined;
}

interface SceneTimeRangeCompareState extends SceneObjectState {
compareWith?: string;
compareOptions: Array<{ label: string; value: string }>;
Expand All @@ -38,8 +37,7 @@ export const DEFAULT_COMPARE_OPTIONS = [

export class SceneTimeRangeCompare
extends SceneObjectBase<SceneTimeRangeCompareState>
implements TimeRangeCompareProvider
{
implements SceneRequestAdder<SceneTimeRangeCompareState> {
static Component = SceneTimeRangeCompareRenderer;
protected _urlSync = new SceneObjectUrlSyncConfig(this, { keys: ['compareWith'] });

Expand Down Expand Up @@ -94,6 +92,27 @@ export class SceneTimeRangeCompare
this.setState({ compareWith: undefined });
};

// Get a time shifted request to compare with the primary request.
public getExtraRequests(request: DataQueryRequest): ExtraRequest[] {
const extraRequests = [];
const compareRange = this.getCompareTimeRange(request.range);
if (compareRange) {
extraRequests.push({
req: {
...request,
range: compareRange,
},
transform: timeShiftAlignmentTransform,
});
}
return extraRequests;
}

// The query runner should rerun the comparison query if the compareWith value has changed.
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean {
return prev.compareWith !== next.compareWith;
}

public getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined {
let compareFrom: DateTime;
let compareTo: DateTime;
Expand Down Expand Up @@ -149,6 +168,43 @@ export class SceneTimeRangeCompare
}
}

// Transformation function for use with time shifted comparison series.
// This aligns the secondary series with the primary and adds custom
// metadata and config to the secondary series' fields so that it is
// rendered appropriately.
const timeShiftAlignmentTransform: TransformFunc = (primary, secondary) => {
const diff = secondary.timeRange.from.diff(primary.timeRange.from);
secondary.series.forEach((series) => {
series.refId = getCompareSeriesRefId(series.refId || '');
series.meta = {
...series.meta,
// @ts-ignore Remove when https://github.com/grafana/grafana/pull/71129 is released
timeCompare: {
diffMs: diff,
isTimeShiftQuery: true,
},
};
series.fields.forEach((field) => {
// Align compare series time stamps with reference series
if (field.type === FieldType.time) {
field.values = field.values.map((v) => {
return diff < 0 ? v - diff : v + diff;
});
}

field.config = {
...field.config,
color: {
mode: 'fixed',
fixedColor: config.theme.palette.gray60,
},
};
return field;
});
});
return secondary;
}

function SceneTimeRangeCompareRenderer({ model }: SceneComponentProps<SceneTimeRangeCompare>) {
const styles = useStyles2(getStyles);
const { compareWith, compareOptions } = model.useState();
Expand Down
1 change: 1 addition & 0 deletions packages/scenes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange';
export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride';

export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner';
export { type ExtraRequest, type SceneRequestAdder, type TransformFunc } from './querying/SceneRequestAdder';
export { SceneDataLayers } from './querying/SceneDataLayers';
export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase';
export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls';
Expand Down
1 change: 1 addition & 0 deletions packages/scenes/src/querying/SceneQueryRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const runRequestMock = jest.fn().mockImplementation((ds: DataSourceApi, request:
state: LoadingState.Loading,
series: [],
annotations: [],
request,
timeRange: request.range,
};

Expand Down
85 changes: 40 additions & 45 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import { VariableDependencyConfig } from '../variables/VariableDependencyConfig'
import { writeSceneLog } from '../utils/writeSceneLog';
import { VariableValueRecorder } from '../variables/VariableValueRecorder';
import { emptyPanelData } from '../core/SceneDataNode';
import { SceneTimeRangeCompare } from '../components/SceneTimeRangeCompare';
import { getClosest } from '../core/sceneGraph/utils';
import { timeShiftQueryResponseOperator } from './timeShiftQueryResponseOperator';
import { isRequestAdder, SceneRequestAdder, TransformFunc } from './SceneRequestAdder';
import { passthroughTransform, extraRequestProcessingOperator } from './extraRequestProcessingOperator';
import { filterAnnotations } from './layers/annotations/filterAnnotations';
import { getEnrichedDataRequest } from './getEnrichedDataRequest';
import { AdHocFilterSet } from '../variables/adhoc/AdHocFiltersSet';
Expand Down Expand Up @@ -100,19 +100,18 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}

private _onActivate() {
const timeRange = sceneGraph.getTimeRange(this);
const comparer = this.getTimeCompare();

if (comparer) {
const adders = this.getClosestRequestAdders();
for (const adder of adders) {
this._subs.add(
comparer.subscribeToState((n, p) => {
if (n.compareWith !== p.compareWith) {
adder.subscribeToState((n, p) => {
if (adder.shouldRerun(p, n)) {
this.runQueries();
}
})
);
)
}

const timeRange = sceneGraph.getTimeRange(this);
this._subs.add(
timeRange.subscribeToState(() => {
this.runWithTimeRange(timeRange);
Expand Down Expand Up @@ -382,21 +381,24 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}

const runRequest = getRunRequest();
const [request, secondaryRequest] = this.prepareRequests(timeRange, ds);
const { primary, secondaries, transformations } = this.prepareRequests(timeRange, ds);

writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key);

let stream = runRequest(ds, request);
let stream = runRequest(ds, primary);

if (secondaryRequest) {
if (secondaries.length > 0) {
const [sReq, ...otherSReqs] = secondaries;
const secondaryStreams = otherSReqs.map((r) => runRequest(ds, r));
// change subscribe callback below to pipe operator
stream = forkJoin([stream, runRequest(ds, secondaryRequest)]).pipe(timeShiftQueryResponseOperator);
const op = extraRequestProcessingOperator(transformations);
stream = forkJoin([stream, runRequest(ds, sReq), ...secondaryStreams]).pipe(op);
}

stream = stream.pipe(
registerQueryWithController({
type: 'data',
request,
request: primary,
origin: this,
cancel: () => this.cancelQuery(),
})
Expand All @@ -418,12 +420,9 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
private prepareRequests = (
timeRange: SceneTimeRangeLike,
ds: DataSourceApi
): [DataQueryRequest, DataQueryRequest | undefined] => {
const comparer = this.getTimeCompare();
): { primary: DataQueryRequest, secondaries: DataQueryRequest[], transformations: Map<string, TransformFunc> } => {
const { minInterval, queries } = this.state;

let secondaryRequest: DataQueryRequest | undefined;

let request: DataQueryRequest = {
app: 'scenes',
requestId: getNextRequestId(),
Expand Down Expand Up @@ -477,23 +476,18 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
request.intervalMs = norm.intervalMs;

const primaryTimeRange = timeRange.state.value;
if (comparer) {
const secondaryTimeRange = comparer.getCompareTimeRange(primaryTimeRange);
if (secondaryTimeRange) {
secondaryRequest = {
...request,
range: secondaryTimeRange,
requestId: getNextRequestId(),
};

request = {
...request,
range: primaryTimeRange,
};

let secondaryRequests: DataQueryRequest[] = [];
let secondaryTransformations = new Map();
for (const adder of this.getClosestRequestAdders() ?? []) {
for (const { req, transform } of adder.getExtraRequests(request)) {
const requestId = getNextRequestId();
secondaryRequests.push({ ...req, requestId })
secondaryTransformations.set(requestId, transform ?? passthroughTransform);
}
}

return [request, secondaryRequest];
request.range = primaryTimeRange;
return { primary: request, secondaries: secondaryRequests, transformations: secondaryTransformations };
};

private onDataReceived = (data: PanelData) => {
Expand Down Expand Up @@ -535,26 +529,27 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}

/**
* Will walk up the scene graph and find the closest time range compare object
* It performs buttom-up search, including shallow search across object children for supporting controls/header actions
* Walk up the scene graph and find any request adders.
*
* Will stop as soon as at least one adder has been found at any level
* of the graph. This might need to change in future.
*/
private getTimeCompare() {
private getClosestRequestAdders(): Array<SceneRequestAdder<any>> {
if (!this.parent) {
return null;
return [];
}
return getClosest(this.parent, (s) => {
let found = null;
if (s instanceof SceneTimeRangeCompare) {
return s;
const found: Array<SceneRequestAdder<any>> = [];
if (isRequestAdder(s)) {
found.push(s);
}
s.forEachChild((child) => {
if (child instanceof SceneTimeRangeCompare) {
found = child;
if (isRequestAdder(child)) {
found.push(child);
}
});

return found;
});
return found.length > 0 ? found : null;
}) ?? [];
}

/**
Expand Down
31 changes: 31 additions & 0 deletions packages/scenes/src/querying/SceneRequestAdder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { DataQueryRequest, PanelData } from "@grafana/data";

import { SceneObjectBase } from "../core/SceneObjectBase";
import { SceneObjectState } from "../core/types";

// A transformation function called by the query runner with responses
// to any extra requests.
//
// See the docs for `extraRequestProcessingOperator` for more information.
export type TransformFunc = (primary: PanelData, secondary: PanelData) => PanelData;

// An extra request that should be run by a query runner, and an optional
// transform that should be called with the response data.
export interface ExtraRequest {
// The request.
req: DataQueryRequest;
// An optional transformation function.
transform?: TransformFunc;
}

// Indicates that this type wants to add extra requests to a query runner.
export interface SceneRequestAdder<T extends SceneObjectState> extends SceneObjectBase<T> {
// Get any extra requests and their required transformations.
getExtraRequests(request: DataQueryRequest): ExtraRequest[];
// Determine whether a query should be rerun.
shouldRerun(prev: T, next: T): boolean;
}

export function isRequestAdder(obj: any): obj is SceneRequestAdder<any> {
return typeof obj === 'object' && 'getExtraRequests' in obj;
}
33 changes: 33 additions & 0 deletions packages/scenes/src/querying/extraRequestProcessingOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { PanelData } from '@grafana/data';
import { map, Observable } from 'rxjs';
import { TransformFunc } from './SceneRequestAdder';

// Passthrough transformation for use with ExtraRequests.
export const passthroughTransform: TransformFunc = (_, secondary) => secondary;

// Factory function which takes a map from request ID to transform functions and
// returns an rxjs operator which operates on an array of panel data responses.
// The responses must have length at least 2; the first is treated as the 'primary'
// response and the rest as secondary responses.
//
// Each secondary response is transformed according to the transform function
// identified by it's request ID. The transform function is passed the primary
// response and the secondary response to be processed.
//
// The output is a single frame with the primary series and all transformed
// secondary series combined.
export const extraRequestProcessingOperator = (transforms: Map<string, TransformFunc>) =>
(data: Observable<[PanelData, PanelData, ...PanelData[]]>) => {
return data.pipe(
map(([primary, ...secondaries]) => {
const frames = secondaries.flatMap((s) => {
const transformed = transforms.get(s.request!.requestId)?.(primary, s) ?? s;
return transformed.series;
});
return {
...primary,
series: [...primary.series, ...frames],
};
})
);
}
46 changes: 0 additions & 46 deletions packages/scenes/src/querying/timeShiftQueryResponseOperator.ts

This file was deleted.

0 comments on commit 7f75cef

Please sign in to comment.