Skip to content

Commit

Permalink
SceneQueryRunner: allow supplemental queries and processors to be rer…
Browse files Browse the repository at this point in the history
…un separately

Followup from [this comment](#587 (comment))
- this adds the option for SupplementalRequestProviders to state that they
only wish to have their processor rerun (probably with different state),
rather than the QueryRunner rerunning both the query _and_ the processor.
  • Loading branch information
sd2k committed May 24, 2024
1 parent 374bda5 commit 49363a6
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 56 deletions.
2 changes: 1 addition & 1 deletion packages/scenes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export { SceneTimeRange } from './core/SceneTimeRange';
export { SceneTimeZoneOverride } from './core/SceneTimeZoneOverride';

export { SceneQueryRunner, type QueryRunnerState } from './querying/SceneQueryRunner';
export { type SupplementaryRequest, type SupplementaryRequestProvider, type ProcessorFunc } from './querying/SupplementaryRequestProvider';
export { type SupplementaryRequest, type SupplementaryRequestProvider, type ProcessorFunc, type ShouldRerun } from './querying/SupplementaryRequestProvider';
export { SceneDataLayerSet, SceneDataLayerSetBase } from './querying/SceneDataLayerSet';
export { SceneDataLayerBase } from './querying/layers/SceneDataLayerBase';
export { SceneDataLayerControls } from './querying/layers/SceneDataLayerControls';
Expand Down
94 changes: 72 additions & 22 deletions packages/scenes/src/querying/SceneQueryRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { cloneDeep, isEqual } from 'lodash';
import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs';
import { forkJoin, map, ReplaySubject, Unsubscribable } from 'rxjs';

import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema';

Expand Down Expand Up @@ -35,7 +35,6 @@ import { VariableValueRecorder } from '../variables/VariableValueRecorder';
import { emptyPanelData } from '../core/SceneDataNode';
import { getClosest } from '../core/sceneGraph/utils';
import { isSupplementaryRequestProvider, ProcessorFunc, SupplementaryRequestProvider } from './SupplementaryRequestProvider';
import { passthroughProcessor, extraRequestProcessingOperator } from './extraRequestProcessingOperator';
import { filterAnnotations } from './layers/annotations/filterAnnotations';
import { getEnrichedDataRequest } from './getEnrichedDataRequest';
import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters';
Expand Down Expand Up @@ -75,6 +74,9 @@ export interface DataQueryExtended extends DataQuery {
timeRangeCompare?: boolean;
}

// Passthrough processor for secondary requests which don't define a processor.
const passthroughProcessor: ProcessorFunc = (_, secondary) => secondary;

export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implements SceneDataProvider {
private _querySub?: Unsubscribable;
private _dataLayersSub?: Unsubscribable;
Expand All @@ -88,6 +90,13 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
private _layerAnnotations?: DataFrame[];
private _resultAnnotations?: DataFrame[];

// The results of the latest query before it was processed by the supplementary request providers.
private _unprocessedResults = new ReplaySubject<[PanelData, ...PanelData[]]>(1);
// The subscription to the unprocessed results.
private _unprocessedSub?: Unsubscribable;
// The processors provided by the supplementary request providers.
private _processors?: Map<string, ProcessorFunc>;

private _adhocFiltersVar?: AdHocFiltersVariable;
private _groupByVar?: GroupByVariable;

Expand Down Expand Up @@ -116,8 +125,13 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
for (const provider of providers) {
this._subs.add(
provider.subscribeToState((n, p) => {
if (provider.shouldRerun(p, n)) {
const shouldRerun = provider.shouldRerun(p, n);
if (shouldRerun === true || shouldRerun === 'queries') {
// don't explicitly run processors here, that's done automatically
// as part of `this.runQueries`.
this.runQueries();
} else if (shouldRerun === 'processors') {
this.runProcessors();
}
})
)
Expand Down Expand Up @@ -298,6 +312,10 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._dataLayersSub = undefined;
}

if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}

this._timeSub?.unsubscribe();
this._timeSub = undefined;
this._timeSubRange = undefined;
Expand Down Expand Up @@ -345,13 +363,22 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
this._timeSubRange = timeRange;
this._timeSub = timeRange.subscribeToState(() => {
this.runWithTimeRange(timeRange);
this.runProcessors();
});
}

public runQueries() {
const timeRange = sceneGraph.getTimeRange(this);
this.subscribeToTimeRangeChanges(timeRange);
this.runWithTimeRange(timeRange);
this.runProcessors();
}

private runProcessors() {
if (this._unprocessedSub) {
this._unprocessedSub.unsubscribe();
}
this._unprocessedSub = this._unprocessedResults.subscribe((x) => this.processResults(x));
}

private getMaxDataPoints() {
Expand Down Expand Up @@ -411,32 +438,38 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen

const runRequest = getRunRequest();
const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds);
this._processors = processors;

writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key);

let stream = runRequest(ds, primary);

if (secondaries.length > 0) {
// Submit all secondary requests in parallel.
const secondaryStreams = secondaries.map((r) => runRequest(ds, r));
// Create the rxjs operator which will combine the primary and secondary responses
// by calling the correct processor functions provided by the
// supplementary request providers.
const op = extraRequestProcessingOperator(processors);
// Combine the primary and secondary streams into a single stream, and apply the operator.
stream = forkJoin([stream, ...secondaryStreams]).pipe(op);
}

stream = stream.pipe(
registerQueryWithController({
let primaryStream = runRequest(ds, primary)
.pipe(registerQueryWithController({
type: 'data',
request: primary,
origin: this,
cancel: () => this.cancelQuery(),
})
);

this._querySub = stream.subscribe(this.onDataReceived);
}));

if (secondaries.length === 0) {
this._querySub = primaryStream
.pipe(map((data) => [data] as [PanelData, ...PanelData[]]))
.subscribe((data) => {
this._unprocessedResults.next(data);
});
} else {
const secondaryStreams = secondaries.map((r) => runRequest(ds, r)
.pipe(registerQueryWithController({
type: 'data',
request: r,
origin: this,
cancel: () => this.cancelQuery(),
})))
;
const stream = forkJoin([primaryStream, ...secondaryStreams]);
this._querySub = stream.subscribe((data) => {
this._unprocessedResults.next(data);
});
}
} catch (err) {
console.error('PanelQueryRunner Error', err);

Expand All @@ -449,6 +482,23 @@ export class SceneQueryRunner extends SceneObjectBase<QueryRunnerState> implemen
}
}

private processResults(data: [PanelData, ...PanelData[]]) {
const [primary, ...secondaries] = data;
if (this._processors === undefined || secondaries.length === 0) {
return this.onDataReceived(primary);
}
const processedSecondaries = secondaries.map((s) => this._processors!.get(s.request!.requestId)?.(primary, s) ?? s);
const processed = {
...primary,
series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)],
annotations: [
...(primary.annotations ?? []),
...processedSecondaries.flatMap((s) => s.annotations ?? []),
],
};
this.onDataReceived(processed);
}

public clone(withState?: Partial<QueryRunnerState>) {
const clone = super.clone(withState);

Expand Down
16 changes: 14 additions & 2 deletions packages/scenes/src/querying/SupplementaryRequestProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ export interface SupplementaryRequest {
processor?: ProcessorFunc;
}

// Whether supplemental queries, providers, or neither should be rerun as the result
// of a state change.
//
// Returning `true` or 'queries' will cause the query runner to completely rerun all queries
// _and_ processors.
// Returning 'processors' will avoid rerunning queries, and pass the most
// recent (unprocessed) query results to the processors again for reprocessing. This allows
// the processors to process differently depending on their most recent state, without incurring
// the cost of a query.
// Returning `false` will not rerun queries or processors.
export type ShouldRerun = boolean | 'queries' | 'processors';

// Indicates that this type wants to add supplementary requests, along with
// optional processing functions, to a query runner.
export interface SupplementaryRequestProvider<T extends SceneObjectState> extends SceneObjectBase<T> {
Expand All @@ -37,8 +49,8 @@ export interface SupplementaryRequestProvider<T extends SceneObjectState> extend
//
// When the provider's state changes this function will be passed both the previous and the
// next state. The implementation can use this to determine whether the change should trigger
// a rerun of the query or not.
shouldRerun(prev: T, next: T): boolean;
// a rerun of the queries, processors or neither.
shouldRerun(prev: T, next: T): ShouldRerun;
}

export function isSupplementaryRequestProvider(obj: any): obj is SupplementaryRequestProvider<any> {
Expand Down
31 changes: 0 additions & 31 deletions packages/scenes/src/querying/extraRequestProcessingOperator.ts

This file was deleted.

0 comments on commit 49363a6

Please sign in to comment.