Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SceneQueryRunner: decouple time range comparisons #587

Merged
merged 15 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 71 additions & 7 deletions packages/scenes/src/components/SceneTimeRangeCompare.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { DateTime, dateTime, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data';
import { DataQueryRequest, DateTime, dateTime, FieldType, GrafanaTheme2, rangeUtil, TimeRange } from '@grafana/data';
import { config } from '@grafana/runtime';
import { ButtonGroup, ButtonSelect, Checkbox, ToolbarButton, useStyles2 } from '@grafana/ui';
import React from 'react';
import { sceneGraph } from '../core/sceneGraph';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types';
import { DataQueryExtended } from '../querying/SceneQueryRunner';
import { ExtraRequest, ProcessorFunc, SceneRequestSupplementer } from '../querying/SceneRequestAdder';
import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig';
import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId';
import { parseUrlParam } from '../utils/parseUrlParam';
import { css } from '@emotion/css';

export interface TimeRangeCompareProvider {
getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined;
}

interface SceneTimeRangeCompareState extends SceneObjectState {
compareWith?: string;
compareOptions: Array<{ label: string; value: string }>;
Expand All @@ -38,8 +38,8 @@ export const DEFAULT_COMPARE_OPTIONS = [

export class SceneTimeRangeCompare
extends SceneObjectBase<SceneTimeRangeCompareState>
implements TimeRangeCompareProvider
{
implements SceneRequestSupplementer<SceneTimeRangeCompareState> {

static Component = SceneTimeRangeCompareRenderer;
protected _urlSync = new SceneObjectUrlSyncConfig(this, { keys: ['compareWith'] });

Expand Down Expand Up @@ -94,6 +94,33 @@ export class SceneTimeRangeCompare
this.setState({ compareWith: undefined });
};

// Get a time shifted request to compare with the primary request.
public getSupplementalRequests(request: DataQueryRequest): ExtraRequest[] {
const extraRequests: ExtraRequest[] = [];
const compareRange = this.getCompareTimeRange(request.range);
if (!compareRange) {
return extraRequests;
}

const targets = request.targets.filter((query: DataQueryExtended) => query.timeRangeCompare !== false);
if (targets.length) {
extraRequests.push({
req: {
...request,
targets,
range: compareRange,
},
processor: timeShiftAlignmentProcessor,
});
}
return extraRequests;
}

// The query runner should rerun the comparison query if the compareWith value has changed.
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean {
return prev.compareWith !== next.compareWith;
}

public getCompareTimeRange(timeRange: TimeRange): TimeRange | undefined {
let compareFrom: DateTime;
let compareTo: DateTime;
Expand Down Expand Up @@ -149,6 +176,43 @@ export class SceneTimeRangeCompare
}
}

// Processor function for use with time shifted comparison series.
// This aligns the secondary series with the primary and adds custom
// metadata and config to the secondary series' fields so that it is
// rendered appropriately.
const timeShiftAlignmentProcessor: ProcessorFunc = (primary, secondary) => {
const diff = secondary.timeRange.from.diff(primary.timeRange.from);
secondary.series.forEach((series) => {
series.refId = getCompareSeriesRefId(series.refId || '');
series.meta = {
...series.meta,
// @ts-ignore Remove when https://github.com/grafana/grafana/pull/71129 is released
timeCompare: {
diffMs: diff,
isTimeShiftQuery: true,
},
};
series.fields.forEach((field) => {
// Align compare series time stamps with reference series
if (field.type === FieldType.time) {
field.values = field.values.map((v) => {
return diff < 0 ? v - diff : v + diff;
});
}

field.config = {
...field.config,
color: {
mode: 'fixed',
fixedColor: config.theme.palette.gray60,
},
};
return field;
});
});
return secondary;
}

function SceneTimeRangeCompareRenderer({ model }: SceneComponentProps<SceneTimeRangeCompare>) {
const styles = useStyles2(getStyles);
const { compareWith, compareOptions } = model.useState();
Expand Down
1 change: 1 addition & 0 deletions packages/scenes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange';
export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride';

export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner';
export { type ExtraRequest, type SceneRequestSupplementer, type ProcessorFunc } from './querying/SceneRequestAdder';
export { SceneDataLayerSet, SceneDataLayerSetBase } from './querying/SceneDataLayerSet';
export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase';
export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls';
Expand Down
1 change: 1 addition & 0 deletions packages/scenes/src/querying/SceneQueryRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const runRequestMock = jest.fn().mockImplementation((ds: DataSourceApi, request:
state: LoadingState.Loading,
series: [],
annotations: [],
request,
timeRange: request.range,
};

Expand Down
89 changes: 40 additions & 49 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import { VariableDependencyConfig } from '../variables/VariableDependencyConfig'
import { writeSceneLog } from '../utils/writeSceneLog';
import { VariableValueRecorder } from '../variables/VariableValueRecorder';
import { emptyPanelData } from '../core/SceneDataNode';
import { SceneTimeRangeCompare } from '../components/SceneTimeRangeCompare';
import { getClosest } from '../core/sceneGraph/utils';
import { timeShiftQueryResponseOperator } from './timeShiftQueryResponseOperator';
import { isRequestAdder, ProcessorFunc, SceneRequestSupplementer } from './SceneRequestAdder';
import { passthroughProcessor, extraRequestProcessingOperator } from './extraRequestProcessingOperator';
import { filterAnnotations } from './layers/annotations/filterAnnotations';
import { getEnrichedDataRequest } from './getEnrichedDataRequest';
import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters';
Expand Down Expand Up @@ -109,16 +109,15 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen

private _onActivate() {
const timeRange = sceneGraph.getTimeRange(this);
const comparer = this.getTimeCompare();

if (comparer) {
const adders = this.getClosestRequestAdders();
for (const adder of adders.values()) {
this._subs.add(
comparer.subscribeToState((n, p) => {
if (n.compareWith !== p.compareWith) {
adder.subscribeToState((n, p) => {
if (adder.shouldRerun(p, n)) {
this.runQueries();
}
})
);
)
}

this.subscribeToTimeRangeChanges(timeRange);
Expand Down Expand Up @@ -408,21 +407,24 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this.findAndSubscribeToAdHocFilters(datasource?.uid);

const runRequest = getRunRequest();
const [request, secondaryRequest] = this.prepareRequests(timeRange, ds);
const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds);

writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key);

let stream = runRequest(ds, request);
let stream = runRequest(ds, primary);

if (secondaryRequest) {
if (secondaries.length > 0) {
const [sReq, ...otherSReqs] = secondaries;
const secondaryStreams = otherSReqs.map((r) => runRequest(ds, r));
// change subscribe callback below to pipe operator
stream = forkJoin([stream, runRequest(ds, secondaryRequest)]).pipe(timeShiftQueryResponseOperator);
const op = extraRequestProcessingOperator(processors);
stream = forkJoin([stream, runRequest(ds, sReq), ...secondaryStreams]).pipe(op);
}

stream = stream.pipe(
registerQueryWithController({
type: 'data',
request,
request: primary,
origin: this,
cancel: () => this.cancelQuery(),
})
Expand Down Expand Up @@ -462,12 +464,9 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
private prepareRequests = (
timeRange: SceneTimeRangeLike,
ds: DataSourceApi
): [DataQueryRequest, DataQueryRequest | undefined] => {
const comparer = this.getTimeCompare();
): { primary: DataQueryRequest, secondaries: DataQueryRequest[], processors: Map<string, ProcessorFunc> } => {
sd2k marked this conversation as resolved.
Show resolved Hide resolved
const { minInterval, queries } = this.state;

let secondaryRequest: DataQueryRequest | undefined;

let request: DataQueryRequest<DataQueryExtended> = {
app: 'scenes',
requestId: getNextRequestId(),
Expand Down Expand Up @@ -530,28 +529,17 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
request.intervalMs = norm.intervalMs;

const primaryTimeRange = timeRange.state.value;
if (comparer) {
const secondaryTimeRange = comparer.getCompareTimeRange(primaryTimeRange);
if (secondaryTimeRange) {
const secondaryTargets = request.targets.filter((query: DataQueryExtended) => query.timeRangeCompare !== false);

if (secondaryTargets.length) {
secondaryRequest = {
...request,
targets: secondaryTargets,
range: secondaryTimeRange,
requestId: getNextRequestId(),
};
}

request = {
...request,
range: primaryTimeRange,
};
let secondaryRequests: DataQueryRequest[] = [];
let secondaryProcessors = new Map();
for (const adder of this.getClosestRequestAdders().values() ?? []) {
for (const { req, processor } of adder.getSupplementalRequests(request)) {
const requestId = getNextRequestId();
secondaryRequests.push({ ...req, requestId })
secondaryProcessors.set(requestId, processor ?? passthroughProcessor);
}
}

return [request, secondaryRequest];
request.range = primaryTimeRange;
return { primary: request, secondaries: secondaryRequests, processors: secondaryProcessors };
};

private onDataReceived = (data: PanelData) => {
Expand Down Expand Up @@ -593,26 +581,29 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}

/**
* Will walk up the scene graph and find the closest time range compare object
* It performs buttom-up search, including shallow search across object children for supporting controls/header actions
* Walk up the scene graph and find any request adders.
*
* This will return a map from id to the closest adder for each id.
*/
private getTimeCompare() {
private getClosestRequestAdders(): Map<string, SceneRequestSupplementer<any>> {
sd2k marked this conversation as resolved.
Show resolved Hide resolved
const found = new Map();
if (!this.parent) {
return null;
return new Map();
}
return getClosest(this.parent, (s) => {
let found = null;
if (s instanceof SceneTimeRangeCompare) {
return s;
getClosest(this.parent, (s) => {
if (isRequestAdder(s) && !found.has(s.constructor.name)) {
found.set(s.constructor.name, s);
}
s.forEachChild((child) => {
if (child instanceof SceneTimeRangeCompare) {
found = child;
if (isRequestAdder(child) && !found.has(child.constructor.name)) {
found.set(child.constructor.name, child);
}
sd2k marked this conversation as resolved.
Show resolved Hide resolved
});

return found;
// Always return null so that the search continues to the top of
// the scene graph.
return null;
});
return found;
}

/**
Expand Down
32 changes: 32 additions & 0 deletions packages/scenes/src/querying/SceneRequestAdder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { DataQueryRequest, PanelData } from "@grafana/data";

import { SceneObjectBase } from "../core/SceneObjectBase";
import { SceneObjectState } from "../core/types";

// A processor function called by the query runner with responses
// to any extra requests.
//
// See the docs for `extraRequestProcessingOperator` for more information.
export type ProcessorFunc = (primary: PanelData, secondary: PanelData) => PanelData;

// An extra request that should be run by a query runner, and an optional
// processor that should be called with the response data.
export interface ExtraRequest {
// The request.
req: DataQueryRequest;
// An optional function used to process the data before passing it
// to any transformations or visualizations.
processor?: ProcessorFunc;
}

// Indicates that this type wants to add extra requests to a query runner.
export interface SceneRequestSupplementer<T extends SceneObjectState> extends SceneObjectBase<T> {
sd2k marked this conversation as resolved.
Show resolved Hide resolved
// Get any supplemental requests and their required processors.
getSupplementalRequests(request: DataQueryRequest): ExtraRequest[];
// Determine whether a query should be rerun.
shouldRerun(prev: T, next: T): boolean;
}

export function isRequestAdder(obj: any): obj is SceneRequestSupplementer<any> {
return typeof obj === 'object' && 'getSupplementalRequests' in obj;
}
33 changes: 33 additions & 0 deletions packages/scenes/src/querying/extraRequestProcessingOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { PanelData } from '@grafana/data';
import { map, Observable } from 'rxjs';
import { ProcessorFunc } from './SceneRequestAdder';

// Passthrough processor for use with ExtraRequests.
export const passthroughProcessor: ProcessorFunc = (_, secondary) => secondary;

// Factory function which takes a map from request ID to processor functions and
// returns an rxjs operator which operates on an array of panel data responses.
// The responses must have length at least 2; the first is treated as the 'primary'
// response and the rest as secondary responses.
//
// Each secondary response is transformed according to the processor function
// identified by it's request ID. The processor function is passed the primary
// response and the secondary response to be processed.
//
// The output is a single frame with the primary series and all processed
// secondary series combined.
export const extraRequestProcessingOperator = (processors: Map<string, ProcessorFunc>) =>
(data: Observable<[PanelData, PanelData, ...PanelData[]]>) => {
return data.pipe(
map(([primary, ...secondaries]) => {
const frames = secondaries.flatMap((s) => {
const processed = processors.get(s.request!.requestId)?.(primary, s) ?? s;
return processed.series;
});
return {
...primary,
series: [...primary.series, ...frames],
};
})
);
}
46 changes: 0 additions & 46 deletions packages/scenes/src/querying/timeShiftQueryResponseOperator.ts

This file was deleted.

Loading