Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SceneQueryRunner: allow extra queries and processors to be rerun separately #748

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/scenes/src/components/SceneTimeRangeCompare.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { sceneGraph } from '../core/sceneGraph';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { SceneComponentProps, SceneObjectState, SceneObjectUrlValues } from '../core/types';
import { DataQueryExtended } from '../querying/SceneQueryRunner';
import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider } from '../querying/ExtraQueryProvider';
import { ExtraQueryDescriptor, ExtraQueryDataProcessor, ExtraQueryProvider, ExtraQueryShouldRerun } from '../querying/ExtraQueryProvider';
import { SceneObjectUrlSyncConfig } from '../services/SceneObjectUrlSyncConfig';
import { getCompareSeriesRefId } from '../utils/getCompareSeriesRefId';
import { parseUrlParam } from '../utils/parseUrlParam';
Expand Down Expand Up @@ -118,7 +118,7 @@ export class SceneTimeRangeCompare
}

// The query runner should rerun the comparison query if the compareWith value has changed.
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): boolean {
public shouldRerun(prev: SceneTimeRangeCompareState, next: SceneTimeRangeCompareState): ExtraQueryShouldRerun {
return prev.compareWith !== next.compareWith;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/scenes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange';
export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride';

export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner';
export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor } from './querying/ExtraQueryProvider';
export { type ExtraQueryDescriptor, type ExtraQueryProvider, type ExtraQueryDataProcessor, type ExtraQueryShouldRerun } from './querying/ExtraQueryProvider';
export { SceneDataLayerSet, SceneDataLayerSetBase } from './querying/SceneDataLayerSet';
export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase';
export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls';
Expand Down
16 changes: 14 additions & 2 deletions packages/scenes/src/querying/ExtraQueryProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ export interface ExtraQueryDescriptor {
processor?: ExtraQueryDataProcessor;
}

// Whether extra queries, providers, or neither should be rerun as the result
// of a state change.
//
// Returning `true` or 'queries' will cause the query runner to completely rerun all queries
// _and_ processors.
// Returning 'processors' will avoid rerunning queries, and pass the most
// recent (unprocessed) query results to the processors again for reprocessing. This allows
// the processors to process differently depending on their most recent state, without incurring
// the cost of a query.
// Returning `false` will not rerun queries or processors.
export type ExtraQueryShouldRerun = boolean | 'queries' | 'processors';

// Indicates that this type wants to add extra requests, along with
// optional processing functions, to a query runner.
export interface ExtraQueryProvider<T extends SceneObjectState> extends SceneObjectBase<T> {
Expand All @@ -38,8 +50,8 @@ export interface ExtraQueryProvider<T extends SceneObjectState> extends SceneObj
//
// When the provider's state changes this function will be passed both the previous and the
// next state. The implementation can use this to determine whether the change should trigger
// a rerun of the query or not.
shouldRerun(prev: T, next: T): boolean;
// a rerun of the queries, processors or neither.
shouldRerun(prev: T, next: T): ExtraQueryShouldRerun;
}

export function isExtraQueryProvider(obj: any): obj is ExtraQueryProvider<any> {
Expand Down
62 changes: 57 additions & 5 deletions packages/scenes/src/querying/SceneQueryRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { activateFullSceneTree } from '../utils/test/activateFullSceneTree';
import { SceneDeactivationHandler, SceneObjectState } from '../core/types';
import { LocalValueVariable } from '../variables/variants/LocalValueVariable';
import { SceneObjectBase } from '../core/SceneObjectBase';
import { ExtraQueryDescriptor, ExtraQueryProvider } from './ExtraQueryProvider';
import { ExtraQueryDescriptor, ExtraQueryProvider, ExtraQueryShouldRerun } from './ExtraQueryProvider';

const getDataSourceMock = jest.fn().mockReturnValue({
uid: 'test-uid',
Expand Down Expand Up @@ -1191,6 +1191,48 @@ describe('SceneQueryRunner', () => {

expect(runRequestMock.mock.calls.length).toEqual(2);
});

test('should run extra processors, but not queries, when providers declare it', async () => {
const timeRange = new SceneTimeRange({
from: '2023-08-24T05:00:00.000Z',
to: '2023-08-24T07:00:00.000Z',
});

const queryRunner = new SceneQueryRunner({
queries: [{ refId: 'A' }],
});
const provider = new TestExtraQueryProvider({ foo: 1 }, 'processors');
const scene = new EmbeddedScene({
$timeRange: timeRange,
$data: queryRunner,
controls: [provider],
body: new SceneCanvasText({ text: 'hello' }),
});

// activate the scene, which will also activate the provider
// and the provider will run the extra request
scene.activate();
await new Promise((r) => setTimeout(r, 1));

expect(runRequestMock.mock.calls.length).toEqual(2);
let runRequestCall = runRequestMock.mock.calls[0];
let extraRunRequestCall = runRequestMock.mock.calls[1];
expect(runRequestCall[1].targets[0].refId).toEqual('A');
expect(extraRunRequestCall[1].targets[0].refId).toEqual('Extra');
expect(extraRunRequestCall[1].targets[0].foo).toEqual(1);
expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(1);

// change the state of the provider, which will trigger the activation
// handler to run the processor again. The provider will
// return 'processors' from shouldRun, so we should not see any more queries.
provider.setState({ foo: 2 });
await new Promise((r) => setTimeout(r, 1));

expect(runRequestMock.mock.calls.length).toEqual(2);

// we _should_ see that the processor has rerun and updated the data, however.
expect(queryRunner.state.data?.series[3].fields[0].values[0]).toEqual(2);
});
});

describe('time frame comparison', () => {
Expand Down Expand Up @@ -2308,9 +2350,9 @@ interface TestExtraQueryProviderState extends SceneObjectState {
}

class TestExtraQueryProvider extends SceneObjectBase<TestExtraQueryProviderState> implements ExtraQueryProvider<{}> {
private _shouldRerun: boolean;
private _shouldRerun: ExtraQueryShouldRerun;

public constructor(state: { foo: number }, shouldRerun: boolean) {
public constructor(state: { foo: number }, shouldRerun: ExtraQueryShouldRerun) {
super(state);
this._shouldRerun = shouldRerun;
}
Expand All @@ -2324,11 +2366,21 @@ class TestExtraQueryProvider extends SceneObjectBase<TestExtraQueryProviderState
{ refId: 'Extra', foo: this.state.foo },
],
},
processor: (primary, secondary) => of({ ...primary, ...secondary }),
processor: (primary, secondary) => {
return of({
...primary,
...secondary,
series: [...primary.series, ...secondary.series, {
fields: [{ name: "foo", values: [this.state.foo], config: {}, type: FieldType.number }],
length: 1,
}],
});
},
},
];
}
public shouldRerun(prev: {}, next: {}): boolean {

public shouldRerun(prev: {}, next: {}): ExtraQueryShouldRerun {
return this._shouldRerun;
}
}
94 changes: 72 additions & 22 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { cloneDeep, isEqual } from 'lodash';
import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs';
import { forkJoin, map, mergeMap, Observable, of, ReplaySubject, Unsubscribable } from 'rxjs';

import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema';

Expand Down Expand Up @@ -36,7 +36,6 @@ import { VariableValueRecorder } from '../variables/VariableValueRecorder';
import { emptyPanelData } from '../core/SceneDataNode';
import { getClosest } from '../core/sceneGraph/utils';
import { isExtraQueryProvider, ExtraQueryDataProcessor, ExtraQueryProvider } from './ExtraQueryProvider';
import { passthroughProcessor, extraQueryProcessingOperator } from './extraQueryProcessingOperator';
import { filterAnnotations } from './layers/annotations/filterAnnotations';
import { getEnrichedDataRequest } from './getEnrichedDataRequest';
import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters';
Expand Down Expand Up @@ -98,6 +97,9 @@ interface PreparedRequests {
processors: Map<string, ExtraQueryDataProcessor>;
}

// Passthrough processor for secondary requests which don't define a processor.
const passthroughProcessor: ExtraQueryDataProcessor = (_, secondary) => of(secondary);

export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implements SceneDataProvider {
private _querySub?: Unsubscribable;
private _dataLayersSub?: Unsubscribable;
Expand All @@ -111,6 +113,14 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
private _layerAnnotations?: DataFrame[];
private _resultAnnotations?: DataFrame[];

// The results of the latest query before it was processed by any extra query providers.
private _unprocessedResults = new ReplaySubject<[PanelData, ...PanelData[]]>(1);
// The subscription to the unprocessed results.
private _unprocessedSub?: Unsubscribable;
// The processors provided by any extra query providers.
// The key is the request ID of the secondary request.
private _processors?: Map<string, ExtraQueryDataProcessor>;

private _adhocFiltersVar?: AdHocFiltersVariable;
private _groupByVar?: GroupByVariable;

Expand Down Expand Up @@ -139,8 +149,13 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
for (const provider of providers) {
this._subs.add(
provider.subscribeToState((n, p) => {
if (provider.shouldRerun(p, n)) {
const shouldRerun = provider.shouldRerun(p, n);
if (shouldRerun === true || shouldRerun === 'queries') {
// don't explicitly run processors here, that's done automatically
// as part of `this.runQueries`.
this.runQueries();
} else if (shouldRerun === 'processors') {
this.runProcessors();
}
})
);
Expand Down Expand Up @@ -321,6 +336,10 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._dataLayersSub = undefined;
}

if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}

this._timeSub?.unsubscribe();
this._timeSub = undefined;
this._timeSubRange = undefined;
Expand Down Expand Up @@ -368,13 +387,24 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._timeSubRange = timeRange;
this._timeSub = timeRange.subscribeToState(() => {
this.runWithTimeRange(timeRange);
this.runProcessors();
});
}

public runQueries() {
const timeRange = sceneGraph.getTimeRange(this);
this.subscribeToTimeRangeChanges(timeRange);
this.runWithTimeRange(timeRange);
this.runProcessors();
}

private runProcessors() {
if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}
this._unprocessedSub = this._unprocessedResults
.pipe((data) => this.processResults(data))
.subscribe((data) => this.onDataReceived(data));
}

private getMaxDataPoints() {
Expand Down Expand Up @@ -435,32 +465,33 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen

const runRequest = getRunRequest();
const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds);
this._processors = processors;

writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key);

let stream = runRequest(ds, primary);

if (secondaries.length > 0) {
// Submit all secondary requests in parallel.
const secondaryStreams = secondaries.map((r) => runRequest(ds, r));
// Create the rxjs operator which will combine the primary and secondary responses
// by calling the correct processor functions provided by the
// extra request providers.
const op = extraQueryProcessingOperator(processors);
// Combine the primary and secondary streams into a single stream, and apply the operator.
stream = forkJoin([stream, ...secondaryStreams]).pipe(op);
}

stream = stream.pipe(
registerQueryWithController({
let primaryStream = runRequest(ds, primary)
.pipe(registerQueryWithController({
type: 'data',
request: primary,
origin: this,
cancel: () => this.cancelQuery(),
})
);

this._querySub = stream.subscribe(this.onDataReceived);
}));

if (secondaries.length === 0) {
this._querySub = primaryStream
.pipe(map((data) => [data] as [PanelData, ...PanelData[]]))
.subscribe((data) => this._unprocessedResults.next(data));
} else {
const secondaryStreams = secondaries.map((r) => runRequest(ds, r)
.pipe(registerQueryWithController({
type: 'data',
request: r,
origin: this,
cancel: () => this.cancelQuery(),
})));
const stream = forkJoin([primaryStream, ...secondaryStreams]);
this._querySub = stream.subscribe((data) => this._unprocessedResults.next(data));
}
} catch (err) {
console.error('PanelQueryRunner Error', err);

Expand All @@ -473,6 +504,25 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}
}

private processResults(data: Observable<[PanelData, ...PanelData[]]>): Observable<PanelData> {
return data.pipe(
mergeMap(([primary, ...secondaries]: [PanelData, ...PanelData[]]) => {
if (this._processors === undefined || secondaries.length === 0) {
return of([primary]);
}
const processedSecondaries = secondaries.flatMap((s) => {
return this._processors!.get(s.request!.requestId)?.(primary, s) ?? of(s);
});
return forkJoin([of(primary), ...processedSecondaries]);
}),
map(([primary, ...processedSecondaries]) => ({
...primary,
series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)],
annotations: [...(primary.annotations ?? []), ...processedSecondaries.flatMap((s) => s.annotations ?? [])],
}))
)
}

public clone(withState?: Partial<QueryRunnerState>) {
const clone = super.clone(withState);

Expand Down
32 changes: 0 additions & 32 deletions packages/scenes/src/querying/extraQueryProcessingOperator.ts

This file was deleted.

Loading