diff --git a/packages/scenes/src/querying/SceneQueryRunner.ts b/packages/scenes/src/querying/SceneQueryRunner.ts index 81e62b3fc..0c23784f2 100644 --- a/packages/scenes/src/querying/SceneQueryRunner.ts +++ b/packages/scenes/src/querying/SceneQueryRunner.ts @@ -1,5 +1,5 @@ import { cloneDeep, isEqual } from 'lodash'; -import { forkJoin, ReplaySubject, Unsubscribable } from 'rxjs'; +import { forkJoin, map, ReplaySubject, Unsubscribable } from 'rxjs'; import { DataQuery, DataSourceRef, LoadingState } from '@grafana/schema'; @@ -35,7 +35,6 @@ import { VariableValueRecorder } from '../variables/VariableValueRecorder'; import { emptyPanelData } from '../core/SceneDataNode'; import { getClosest } from '../core/sceneGraph/utils'; import { isSupplementaryRequestProvider, ProcessorFunc, SupplementaryRequestProvider } from './SupplementaryRequestProvider'; -import { passthroughProcessor, extraRequestProcessingOperator } from './extraRequestProcessingOperator'; import { filterAnnotations } from './layers/annotations/filterAnnotations'; import { getEnrichedDataRequest } from './getEnrichedDataRequest'; import { findActiveAdHocFilterVariableByUid } from '../variables/adhoc/patchGetAdhocFilters'; @@ -75,6 +74,9 @@ export interface DataQueryExtended extends DataQuery { timeRangeCompare?: boolean; } +// Passthrough processor for secondary requests which don't define a processor. +const passthroughProcessor: ProcessorFunc = (_, secondary) => secondary; + export class SceneQueryRunner extends SceneObjectBase implements SceneDataProvider { private _querySub?: Unsubscribable; private _dataLayersSub?: Unsubscribable; @@ -88,6 +90,13 @@ export class SceneQueryRunner extends SceneObjectBase implemen private _layerAnnotations?: DataFrame[]; private _resultAnnotations?: DataFrame[]; + // The results of the latest query before it was processed by the supplementary request providers. + private _unprocessedResults = new ReplaySubject<[PanelData, ...PanelData[]]>(1); + // The subscription to the unprocessed results. + private _unprocessedSub?: Unsubscribable; + // The processors provided by the supplementary request providers. + private _processors?: Map; + private _adhocFiltersVar?: AdHocFiltersVariable; private _groupByVar?: GroupByVariable; @@ -116,8 +125,15 @@ export class SceneQueryRunner extends SceneObjectBase implemen for (const provider of providers) { this._subs.add( provider.subscribeToState((n, p) => { - if (provider.shouldRerun(p, n)) { + const shouldRerun = provider.shouldRerun(p, n); + if (typeof shouldRerun === 'boolean') { + if (shouldRerun) { + this.runQueries(); + } + } else if (shouldRerun.query) { this.runQueries(); + } else if (shouldRerun.processor) { + this.runProcessors(); } }) ) @@ -298,6 +314,10 @@ export class SceneQueryRunner extends SceneObjectBase implemen this._dataLayersSub = undefined; } + if (this._unprocessedSub) { + this._unprocessedSub.unsubscribe(); + } + this._timeSub?.unsubscribe(); this._timeSub = undefined; this._timeSubRange = undefined; @@ -345,6 +365,7 @@ export class SceneQueryRunner extends SceneObjectBase implemen this._timeSubRange = timeRange; this._timeSub = timeRange.subscribeToState(() => { this.runWithTimeRange(timeRange); + this.runProcessors(); }); } @@ -352,6 +373,14 @@ export class SceneQueryRunner extends SceneObjectBase implemen const timeRange = sceneGraph.getTimeRange(this); this.subscribeToTimeRangeChanges(timeRange); this.runWithTimeRange(timeRange); + this.runProcessors(); + } + + private runProcessors() { + if (this._unprocessedSub) { + this._unprocessedSub.unsubscribe(); + } + this._unprocessedSub = this._unprocessedResults.subscribe((x) => this.processResults(x)); } private getMaxDataPoints() { @@ -411,32 +440,38 @@ export class SceneQueryRunner extends SceneObjectBase implemen const runRequest = getRunRequest(); const { primary, secondaries, processors } = this.prepareRequests(timeRange, ds); + this._processors = processors; writeSceneLog('SceneQueryRunner', 'Starting runRequest', this.state.key); - let stream = runRequest(ds, primary); - - if (secondaries.length > 0) { - // Submit all secondary requests in parallel. - const secondaryStreams = secondaries.map((r) => runRequest(ds, r)); - // Create the rxjs operator which will combine the primary and secondary responses - // by calling the correct processor functions provided by the - // supplementary request providers. - const op = extraRequestProcessingOperator(processors); - // Combine the primary and secondary streams into a single stream, and apply the operator. - stream = forkJoin([stream, ...secondaryStreams]).pipe(op); - } - - stream = stream.pipe( - registerQueryWithController({ + let primaryStream = runRequest(ds, primary) + .pipe(registerQueryWithController({ type: 'data', request: primary, origin: this, cancel: () => this.cancelQuery(), - }) - ); - - this._querySub = stream.subscribe(this.onDataReceived); + })); + + if (secondaries.length === 0) { + this._querySub = primaryStream + .pipe(map((data) => [data] as [PanelData, ...PanelData[]])) + .subscribe((data) => { + this._unprocessedResults.next(data); + }); + } else { + const secondaryStreams = secondaries.map((r) => runRequest(ds, r) + .pipe(registerQueryWithController({ + type: 'data', + request: r, + origin: this, + cancel: () => this.cancelQuery(), + }))) + ; + const stream = forkJoin([primaryStream, ...secondaryStreams]); + this._querySub = stream.subscribe((data) => { + this._unprocessedResults.next(data); + }); + } } catch (err) { console.error('PanelQueryRunner Error', err); @@ -449,6 +484,23 @@ export class SceneQueryRunner extends SceneObjectBase implemen } } + private processResults(data: [PanelData, ...PanelData[]]) { + const [primary, ...secondaries] = data; + if (this._processors === undefined || secondaries.length === 0) { + return this.onDataReceived(primary); + } + const processedSecondaries = secondaries.map((s) => this._processors!.get(s.request!.requestId)?.(primary, s) ?? s); + const processed = { + ...primary, + series: [...primary.series, ...processedSecondaries.flatMap((s) => s.series)], + annotations: [ + ...(primary.annotations ?? []), + ...processedSecondaries.flatMap((s) => s.annotations ?? []), + ], + }; + this.onDataReceived(processed); + } + public clone(withState?: Partial) { const clone = super.clone(withState); diff --git a/packages/scenes/src/querying/SupplementaryRequestProvider.ts b/packages/scenes/src/querying/SupplementaryRequestProvider.ts index 9e6213180..11ea59b0d 100644 --- a/packages/scenes/src/querying/SupplementaryRequestProvider.ts +++ b/packages/scenes/src/querying/SupplementaryRequestProvider.ts @@ -28,6 +28,8 @@ export interface SupplementaryRequest { processor?: ProcessorFunc; } +export type ShouldRerun = boolean | { query: boolean; processor: boolean; } + // Indicates that this type wants to add supplementary requests, along with // optional processing functions, to a query runner. export interface SupplementaryRequestProvider extends SceneObjectBase { @@ -38,7 +40,7 @@ export interface SupplementaryRequestProvider extend // When the provider's state changes this function will be passed both the previous and the // next state. The implementation can use this to determine whether the change should trigger // a rerun of the query or not. - shouldRerun(prev: T, next: T): boolean; + shouldRerun(prev: T, next: T): ShouldRerun; } export function isSupplementaryRequestProvider(obj: any): obj is SupplementaryRequestProvider { diff --git a/packages/scenes/src/querying/extraRequestProcessingOperator.ts b/packages/scenes/src/querying/extraRequestProcessingOperator.ts deleted file mode 100644 index b015f10b0..000000000 --- a/packages/scenes/src/querying/extraRequestProcessingOperator.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { PanelData } from '@grafana/data'; -import { map, Observable } from 'rxjs'; -import { ProcessorFunc } from './SupplementaryRequestProvider'; - -// Passthrough processor for use with ExtraRequests. -export const passthroughProcessor: ProcessorFunc = (_, secondary) => secondary; - -// Factory function which takes a map from request ID to processor functions and -// returns an rxjs operator which operates on an array of panel data responses. -// -// Each secondary response is transformed according to the processor function -// identified by it's request ID. The processor function is passed the primary -// response and the secondary response to be processed. -// -// The output is a single frame with the primary series and all processed -// secondary series combined. -export const extraRequestProcessingOperator = (processors: Map) => - (data: Observable<[PanelData, ...PanelData[]]>) => { - return data.pipe( - map(([primary, ...secondaries]) => { - const frames = secondaries.flatMap((s) => { - const processed = processors.get(s.request!.requestId)?.(primary, s) ?? s; - return processed.series; - }); - return { - ...primary, - series: [...primary.series, ...frames], - }; - }) - ); - }