diff --git a/packages/appkit-ui/src/react/charts/types.ts b/packages/appkit-ui/src/react/charts/types.ts index 65804a74..fdcc55f1 100644 --- a/packages/appkit-ui/src/react/charts/types.ts +++ b/packages/appkit-ui/src/react/charts/types.ts @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow"; // ============================================================================ /** Supported data formats for analytics queries */ -export type DataFormat = "json" | "arrow" | "auto"; +export type DataFormat = "json" | "arrow" | "arrow_stream" | "auto"; /** Chart orientation */ export type Orientation = "vertical" | "horizontal"; diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts index 3d5e96f1..32ce52cb 100644 --- a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts @@ -205,7 +205,7 @@ describe("useChartData", () => { ); }); - test("auto-selects JSON by default when no heuristics match", () => { + test("auto-selects ARROW_STREAM by default when no heuristics match", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -223,11 +223,11 @@ describe("useChartData", () => { expect(mockUseAnalyticsQuery).toHaveBeenCalledWith( "test", { limit: 100 }, - expect.objectContaining({ format: "JSON" }), + expect.objectContaining({ format: "ARROW_STREAM" }), ); }); - test("defaults to auto format (JSON) when format is not specified", () => { + test("defaults to auto format (ARROW_STREAM) when format is not specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -243,7 +243,7 @@ describe("useChartData", () => { expect(mockUseAnalyticsQuery).toHaveBeenCalledWith( "test", undefined, - expect.objectContaining({ format: "JSON" }), + expect.objectContaining({ format: "ARROW_STREAM" }), ); }); }); diff --git 
a/packages/appkit-ui/src/react/hooks/types.ts b/packages/appkit-ui/src/react/hooks/types.ts index 5db725fc..f25f17dd 100644 --- a/packages/appkit-ui/src/react/hooks/types.ts +++ b/packages/appkit-ui/src/react/hooks/types.ts @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow"; // ============================================================================ /** Supported response formats for analytics queries */ -export type AnalyticsFormat = "JSON" | "ARROW"; +export type AnalyticsFormat = "JSON" | "ARROW" | "ARROW_STREAM"; /** * Typed Arrow Table - preserves row type information for type inference. @@ -32,8 +32,10 @@ export interface TypedArrowTable< // ============================================================================ /** Options for configuring an analytics SSE query */ -export interface UseAnalyticsQueryOptions { - /** Response format - "JSON" returns typed arrays, "ARROW" returns TypedArrowTable */ +export interface UseAnalyticsQueryOptions< + F extends AnalyticsFormat = "ARROW_STREAM", +> { + /** Response format - "ARROW_STREAM" (default) uses inline Arrow, "JSON" returns typed arrays, "ARROW" uses external links */ format?: F; /** Maximum size of serialized parameters in bytes */ diff --git a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts index 24e03ea3..7d13648f 100644 --- a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts @@ -54,13 +54,13 @@ function getArrowStreamUrl(id: string) { export function useAnalyticsQuery< T = unknown, K extends QueryKey = QueryKey, - F extends AnalyticsFormat = "JSON", + F extends AnalyticsFormat = "ARROW_STREAM", >( queryKey: K, parameters?: InferParams | null, options: UseAnalyticsQueryOptions = {} as UseAnalyticsQueryOptions, ): UseAnalyticsQueryResult> { - const format = options?.format ?? "JSON"; + const format = options?.format ?? 
"ARROW_STREAM"; const maxParametersSize = options?.maxParametersSize ?? 100 * 1024; const autoStart = options?.autoStart ?? true; diff --git a/packages/appkit-ui/src/react/hooks/use-chart-data.ts b/packages/appkit-ui/src/react/hooks/use-chart-data.ts index d8d0bd38..1d1da2dd 100644 --- a/packages/appkit-ui/src/react/hooks/use-chart-data.ts +++ b/packages/appkit-ui/src/react/hooks/use-chart-data.ts @@ -50,10 +50,11 @@ export interface UseChartDataResult { function resolveFormat( format: DataFormat, parameters?: Record, -): "JSON" | "ARROW" { +): "JSON" | "ARROW" | "ARROW_STREAM" { // Explicit format selection if (format === "json") return "JSON"; if (format === "arrow") return "ARROW"; + if (format === "arrow_stream") return "ARROW_STREAM"; // Auto-selection heuristics if (format === "auto") { @@ -72,10 +73,10 @@ function resolveFormat( return "ARROW"; } - return "JSON"; + return "ARROW_STREAM"; } - return "JSON"; + return "ARROW_STREAM"; } // ============================================================================ diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 9e810b97..a782fc26 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -68,6 +68,7 @@ "@opentelemetry/sdk-trace-base": "2.6.0", "@opentelemetry/semantic-conventions": "1.38.0", "@types/semver": "7.7.1", + "apache-arrow": "21.1.0", "dotenv": "16.6.1", "express": "4.22.0", "obug": "2.1.1", diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index 4ab9344e..f844693f 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -3,6 +3,7 @@ import { type sql, type WorkspaceClient, } from "@databricks/sdk-experimental"; +import { tableFromIPC } from "apache-arrow"; import type { TelemetryOptions } from "shared"; import { AppKitError, @@ -393,7 +394,20 @@ export class SQLWarehouseConnector { private 
_transformDataArray(response: sql.StatementResponse) { if (response.manifest?.format === "ARROW_STREAM") { - return this.updateWithArrowStatus(response); + const result = response.result as any; + + // Inline Arrow: some warehouses return base64 Arrow IPC in `attachment`. + if (result?.attachment) { + return this._transformArrowAttachment(response, result.attachment); + } + + // Inline data_array: fall through to the row transform below. + if (result?.data_array) { + // Fall through. + } else { + // External links: data fetched separately via statement_id. + return this.updateWithArrowStatus(response); + } } if (!response.result?.data_array || !response.manifest?.schema?.columns) { @@ -439,6 +453,28 @@ export class SQLWarehouseConnector { }; } + /** + * Decode a base64 Arrow IPC attachment into row objects. + * Some serverless warehouses return inline results as Arrow IPC in + * `result.attachment` rather than `result.data_array`. + */ + private _transformArrowAttachment( + response: sql.StatementResponse, + attachment: string, + ) { + const buf = Buffer.from(attachment, "base64"); + const table = tableFromIPC(buf); + const data = table.toArray().map((row) => row.toJSON()); + const { attachment: _att, ...restResult } = response.result as any; + return { + ...response, + result: { + ...restResult, + data, + }, + }; + } + private updateWithArrowStatus(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts new file mode 100644 index 00000000..73bc8cda --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts @@ -0,0 +1,286 @@ +import type { sql } from "@databricks/sdk-experimental"; +import { describe, expect, test, vi } from "vitest"; + +vi.mock("../../../telemetry", () => { + const mockMeter = { + createCounter: () => ({ add: vi.fn() }), + 
createHistogram: () => ({ record: vi.fn() }), + }; + return { + TelemetryManager: { + getProvider: () => ({ + startActiveSpan: vi.fn(), + getMeter: () => mockMeter, + }), + }, + SpanKind: { CLIENT: 1 }, + SpanStatusCode: { ERROR: 2 }, + }; +}); +vi.mock("../../../logging/logger", () => ({ + createLogger: () => ({ + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + event: () => null, + }), +})); +vi.mock("../../../stream/arrow-stream-processor", () => ({ + ArrowStreamProcessor: vi.fn(), +})); + +import { SQLWarehouseConnector } from "../client"; + +function createConnector() { + return new SQLWarehouseConnector({ timeout: 30000 }); +} + +// Real base64 Arrow IPC from a serverless warehouse returning +// `SELECT 1 AS test_col, 2 AS test_col2` with INLINE + ARROW_STREAM. +// Contains schema (two INT columns) + one record batch with values [1, 2]. +const REAL_ARROW_ATTACHMENT = + "/////7gAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAABMAAAABAAAAMz///8QAAAAGAAAAAAAAQIUAAAAvP///yAAAAAAAAABAAAAAAkAAAB0ZXN0X2NvbDIAAAAQABQAEAAOAA8ABAAAAAgAEAAAABgAAAAgAAAAAAABAhwAAAAIAAwABAALAAgAAAAgAAAAAAAAAQAAAAAIAAAAdGVzdF9jb2wAAAAA/////7gAAAAQAAAADAAaABgAFwAEAAgADAAAACAAAAAAAQAAAAAAAAAAAAAAAAADBAAKABgADAAIAAQACgAAADwAAAAQAAAAAQAAAAAAAAAAAAAAAgAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAEAAAAAAAAAQAAAAAAAAAAEAAAAAAAAAIAAAAAAAAAAAQAAAAAAAADAAAAAAAAAAAQAAAAAAAAA/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAP////8AAAAA"; + +describe("SQLWarehouseConnector._transformDataArray", () => { + describe("classic warehouse (JSON_ARRAY + INLINE)", () => { + test("transforms data_array rows into named objects", () => { + const connector = 
createConnector(); + // Real response shape from classic warehouse: INLINE + JSON_ARRAY + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + column_count: 2, + columns: [ + { + name: "test_col", + type_text: "INT", + type_name: "INT", + position: 0, + }, + { + name: "test_col2", + type_text: "INT", + type_name: "INT", + position: 1, + }, + ], + }, + total_row_count: 1, + truncated: false, + }, + result: { + data_array: [["1", "2"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([{ test_col: "1", test_col2: "2" }]); + expect(result.result.data_array).toBeUndefined(); + }); + + test("parses JSON strings in STRING columns", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "metadata", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [["1", '{"key":"value"}']], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data[0].metadata).toEqual({ key: "value" }); + }); + }); + + describe("classic warehouse (EXTERNAL_LINKS + ARROW_STREAM)", () => { + test("returns statement_id for external links fetch", () => { + const connector = createConnector(); + // Real response shape from classic warehouse: EXTERNAL_LINKS + ARROW_STREAM + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + external_links: [ + { + external_link: "https://storage.example.com/chunk0", + expiration: "2026-04-15T00:00:00Z", + }, + ], + }, + 
} as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.statement_id).toBe("stmt-1"); + expect(result.result.data).toBeUndefined(); + }); + }); + + describe("serverless warehouse (INLINE + ARROW_STREAM with attachment)", () => { + test("decodes base64 Arrow IPC attachment into row objects", () => { + const connector = createConnector(); + // Real response shape from serverless warehouse: INLINE + ARROW_STREAM + // Data arrives in result.attachment as base64-encoded Arrow IPC, not data_array. + const response = { + statement_id: "00000001-test-stmt", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + column_count: 2, + columns: [ + { + name: "test_col", + type_text: "INT", + type_name: "INT", + position: 0, + }, + { + name: "test_col2", + type_text: "INT", + type_name: "INT", + position: 1, + }, + ], + total_chunk_count: 1, + chunks: [{ chunk_index: 0, row_offset: 0, row_count: 1 }], + total_row_count: 1, + }, + truncated: false, + }, + result: { + chunk_index: 0, + row_offset: 0, + row_count: 1, + attachment: REAL_ARROW_ATTACHMENT, + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([{ test_col: 1, test_col2: 2 }]); + expect(result.result.attachment).toBeUndefined(); + // Preserves other result fields + expect(result.result.row_count).toBe(1); + }); + + test("preserves manifest and status alongside decoded data", () => { + const connector = createConnector(); + const response = { + statement_id: "00000001-test-stmt", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + chunk_index: 0, + row_count: 1, + attachment: REAL_ARROW_ATTACHMENT, + }, + } as unknown as sql.StatementResponse; + + const result = 
(connector as any)._transformDataArray(response); + // Manifest and statement_id are preserved + expect(result.manifest.format).toBe("ARROW_STREAM"); + expect(result.statement_id).toBe("00000001-test-stmt"); + }); + }); + + describe("ARROW_STREAM with data_array (hypothetical inline variant)", () => { + test("transforms data_array like JSON_ARRAY path", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "value", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [ + ["1", "hello"], + ["2", "world"], + ], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([ + { id: "1", value: "hello" }, + { id: "2", value: "world" }, + ]); + }); + }); + + describe("edge cases", () => { + test("returns response unchanged when no data_array, attachment, or schema", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { format: "JSON_ARRAY" }, + result: {}, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result).toBe(response); + }); + + test("attachment takes priority over data_array when both present", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + attachment: REAL_ARROW_ATTACHMENT, + data_array: [["999", "999"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + // Should use attachment (Arrow IPC), not 
data_array + expect(result.result.data).toEqual([{ test_col: 1, test_col2: 2 }]); + }); + }); +}); diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index 86b60986..c6566330 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -19,6 +19,7 @@ import { queryDefaults } from "./defaults"; import manifest from "./manifest.json"; import { QueryProcessor } from "./query"; import type { + AnalyticsFormat, AnalyticsQueryResponse, IAnalyticsConfig, IAnalyticsQueryRequest, @@ -119,7 +120,8 @@ export class AnalyticsPlugin extends Plugin { res: express.Response, ): Promise { const { query_key } = req.params; - const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest; + const { parameters, format = "ARROW_STREAM" } = + req.body as IAnalyticsQueryRequest; // Request-scoped logging with WideEvent tracking logger.debug(req, "Executing query: %s (format=%s)", query_key, format); @@ -155,19 +157,6 @@ export class AnalyticsPlugin extends Plugin { const userKey = getCurrentUserId(); const executorKey = isAsUser ? userKey : "global"; - const queryParameters = - format === "ARROW" - ? { - formatParameters: { - disposition: "EXTERNAL_LINKS", - format: "ARROW_STREAM", - }, - type: "arrow", - } - : { - type: "result", - }; - const hashedQuery = this.queryProcessor.hashQuery(query); const defaultConfig: PluginExecuteConfig = { @@ -197,20 +186,115 @@ export class AnalyticsPlugin extends Plugin { parameters, ); - const result = await executor.query( + return this._executeWithFormatFallback( + executor, query, processedParams, - queryParameters.formatParameters, + format, signal, ); - - return { type: queryParameters.type, ...result }; }, streamExecutionSettings, executorKey, ); } + /** Format configurations in fallback order. 
*/ + private static readonly FORMAT_CONFIGS = { + ARROW_STREAM: { + formatParameters: { disposition: "INLINE", format: "ARROW_STREAM" }, + type: "result" as const, + }, + JSON: { + formatParameters: { disposition: "INLINE", format: "JSON_ARRAY" }, + type: "result" as const, + }, + ARROW: { + formatParameters: { + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }, + type: "arrow" as const, + }, + }; + + /** + * Execute a query with automatic format fallback. + * + * For the default ARROW_STREAM format, tries formats in order until one + * succeeds: ARROW_STREAM → JSON → ARROW. This handles warehouses that + * only support a subset of format/disposition combinations. + * + * Explicit format requests (JSON, ARROW) are not retried. + */ + private async _executeWithFormatFallback( + executor: AnalyticsPlugin, + query: string, + processedParams: + | Record + | undefined, + requestedFormat: AnalyticsFormat, + signal?: AbortSignal, + ): Promise<{ type: string; [key: string]: any }> { + // Explicit format — no fallback. + if (requestedFormat === "JSON" || requestedFormat === "ARROW") { + const config = AnalyticsPlugin.FORMAT_CONFIGS[requestedFormat]; + const result = await executor.query( + query, + processedParams, + config.formatParameters, + signal, + ); + return { type: config.type, ...result }; + } + + // Default (ARROW_STREAM) — try each format in order. + const fallbackOrder: AnalyticsFormat[] = ["ARROW_STREAM", "JSON", "ARROW"]; + + for (let i = 0; i < fallbackOrder.length; i++) { + const fmt = fallbackOrder[i]; + const config = AnalyticsPlugin.FORMAT_CONFIGS[fmt]; + try { + const result = await executor.query( + query, + processedParams, + config.formatParameters, + signal, + ); + if (i > 0) { + logger.info( + "Query succeeded with fallback format %s (preferred %s was rejected)", + fmt, + fallbackOrder[0], + ); + } + return { type: config.type, ...result }; + } catch (err: unknown) { + const msg = err instanceof Error ? 
err.message : String(err); + const isFormatError = + msg.includes("ARROW_STREAM") || + msg.includes("JSON_ARRAY") || + msg.includes("EXTERNAL_LINKS") || + msg.includes("INVALID_PARAMETER_VALUE") || + msg.includes("NOT_IMPLEMENTED"); + + if (!isFormatError || i === fallbackOrder.length - 1) { + throw err; + } + + logger.warn( + "Format %s rejected by warehouse, falling back to %s: %s", + fmt, + fallbackOrder[i + 1], + msg, + ); + } + } + + // Unreachable — last format in fallbackOrder throws on failure. + throw new Error("All format fallbacks exhausted"); + } + /** * Execute a SQL query using the current execution context. * diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index 71a8fd98..99d12402 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -525,6 +525,302 @@ describe("Analytics Plugin", () => { ); }); + test("/query/:query_key should pass INLINE + ARROW_STREAM format parameters when format is ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "INLINE", + 
format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should pass EXTERNAL_LINKS + ARROW_STREAM format parameters when format is ARROW", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should use INLINE + ARROW_STREAM by default when no format specified", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + 
disposition: "INLINE", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should pass INLINE + JSON_ARRAY format parameters when format is explicitly JSON", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + }); + + test("/query/:query_key should fall back from ARROW_STREAM to JSON when warehouse rejects ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error( + "INVALID_PARAMETER_VALUE: Inline disposition only supports JSON_ARRAY format", + ), + ) + .mockResolvedValueOnce({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // First call: ARROW_STREAM (rejected) + 
expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // Second call: JSON (explicit JSON_ARRAY + INLINE) + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + }); + + test("/query/:query_key should fall back through all formats when each is rejected", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error("INVALID_PARAMETER_VALUE: only supports JSON_ARRAY"), + ) + .mockRejectedValueOnce( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ) + .mockResolvedValueOnce({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledTimes(3); + // Third call: ARROW (EXTERNAL_LINKS) + expect(executeMock.mock.calls[2][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key should not fall back for non-format errors", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue(new Error("PERMISSION_DENIED: no access")); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", 
"/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls use same format (ARROW_STREAM) — no format fallback occurred. + // (executeStream's retry interceptor may retry, but always with the same format.) + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key should not fall back when format is explicitly JSON", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls use JSON_ARRAY + INLINE — explicit JSON, no fallback. 
+ for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + } + }); + test("should return 404 when query file is not found", async () => { const plugin = new AnalyticsPlugin(config); const { router, getHandler } = createMockRouter(); diff --git a/packages/appkit/src/plugins/analytics/types.ts b/packages/appkit/src/plugins/analytics/types.ts index c58b6ecf..bc7568f9 100644 --- a/packages/appkit/src/plugins/analytics/types.ts +++ b/packages/appkit/src/plugins/analytics/types.ts @@ -4,7 +4,7 @@ export interface IAnalyticsConfig extends BasePluginConfig { timeout?: number; } -export type AnalyticsFormat = "JSON" | "ARROW"; +export type AnalyticsFormat = "JSON" | "ARROW" | "ARROW_STREAM"; export interface IAnalyticsQueryRequest { parameters?: Record; format?: AnalyticsFormat; diff --git a/packages/appkit/src/type-generator/query-registry.ts b/packages/appkit/src/type-generator/query-registry.ts index 0ed785cb..7d31ca3e 100644 --- a/packages/appkit/src/type-generator/query-registry.ts +++ b/packages/appkit/src/type-generator/query-registry.ts @@ -266,10 +266,32 @@ export async function generateQueriesFromDescribe( sqlHash, cleanedSql, }: (typeof uncachedQueries)[number]): Promise => { - const result = (await client.statementExecution.executeStatement({ - statement: `DESCRIBE QUERY ${cleanedSql}`, - warehouse_id: warehouseId, - })) as DatabricksStatementExecutionResponse; + let result: DatabricksStatementExecutionResponse; + try { + // Prefer JSON_ARRAY for predictable data_array parsing. + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + format: "JSON_ARRAY", + disposition: "INLINE", + })) as DatabricksStatementExecutionResponse; + } catch (err: unknown) { + const msg = err instanceof Error ? 
err.message : String(err); + if (msg.includes("ARROW_STREAM") || msg.includes("JSON_ARRAY")) { + // Warehouse doesn't support JSON_ARRAY inline — retry with no format + // to let it use its default (typically ARROW_STREAM inline). + logger.debug( + "Warehouse rejected JSON_ARRAY for %s, retrying with default format", + queryName, + ); + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + })) as DatabricksStatementExecutionResponse; + } else { + throw err; + } + } completed++; spinner.update( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 199fcfb8..c3bd3729 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -296,6 +296,9 @@ importers: '@types/semver': specifier: 7.7.1 version: 7.7.1 + apache-arrow: + specifier: 21.1.0 + version: 21.1.0 dotenv: specifier: 16.6.1 version: 16.6.1 @@ -5536,7 +5539,7 @@ packages: basic-ftp@5.0.5: resolution: {integrity: sha512-4Bcg1P8xhUuqcii/S0Z9wiHIrQVPMermM1any+MX5GeGD7faD3/msQUDGLol9wOcz4/jbg/WJnGqoJF6LiBdtg==} engines: {node: '>=10.0.0'} - deprecated: Security vulnerability fixed in 5.2.0, please upgrade + deprecated: Security vulnerability fixed in 5.2.1, please upgrade batch@0.6.1: resolution: {integrity: sha512-x+VAiMRL6UPkx+kudNvxTl6hB2XNNCG2r+7wixVfIYwu/2HKRXimwQyaumLjMveWvT2Hkd/cAJw+QBMfJ/EKVw==} @@ -6650,6 +6653,7 @@ packages: dottie@2.0.6: resolution: {integrity: sha512-iGCHkfUc5kFekGiqhe8B/mdaurD+lakO9txNnTvKtA6PISrw86LgqHvRzWYPyoE2Ph5aMIrCw9/uko6XHTKCwA==} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. drizzle-orm@0.45.1: resolution: {integrity: sha512-Te0FOdKIistGNPMq2jscdqngBRfBpC8uMFVwqjf6gtTVJHIQ/dosgV/CLBU2N4ZJBsXL5savCba9b0YJskKdcA==}