Skip to content

Commit f028f73

Browse files
committed
[SEA-NodeJS] configurable sync/async execute via runAsync (default sync)
Make the SEA execution path user-configurable between sync and async, toggled by the EXISTING `runAsync` option — no new public field, exactly mirroring the Thrift backend's `runAsync` distinction. Default is SYNC (`runAsync: false`): faster, and with the kernel sync canceller fully cancellable mid-compute. SeaSessionBackend.executeStatement now branches on `options.runAsync`: - false/undefined (DEFAULT) -> Connection.executeStatementCancellable: the kernel blocks on execute() (poll-to-terminal server-side), driven lazily in the operation backend's result(). `queryTimeoutSecs` IS forwarded (the kernel execute() honours it). - true -> Connection.submitStatement (submit + poll), unchanged. `queryTimeoutSecs` stays client-side (kernel ignores it on submit). SeaOperationBackend gains a third dual-mode handle kind, `cancellableExecution`, alongside `asyncStatement` and `statement`: - waitUntilReadyCancellable drives result() to the terminal Statement (memoised as the fetch handle + close target); - the lifecycle handle is a composite: cancel() routes to the detached canceller (lock-free, interrupts a running result() mid-COMPUTE and is a no-op once terminal); close() routes to the resolved statement; - a cancel-induced result() rejection maps to OperationStateError( Canceled) so the DBSQLOperation facade mirrors its cancelled flag, matching the Thrift path. Public API, result shape, schema (TTableSchema), and error classes are identical across both modes and to Thrift — the only observable difference is lifecycle timing (when executeStatement resolves). Built against databricks-sql-kernel napi 639e19ef97decc1c5aa2365c0b3a229c1ccd5b58 (executeStatementCancellable / CancellableExecution); KERNEL_REV bumped to match. Refreshed the committed binding surface (native/sea/index.{js,d.ts}). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 6b3d496 commit f028f73

7 files changed

Lines changed: 455 additions & 36 deletions

File tree

KERNEL_REV

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
639e19ef97decc1c5aa2365c0b3a229c1ccd5b58

lib/sea/SeaNativeLoader.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import type {
3838
NamedTypedValueInput as NativeNamedTypedValueInput,
3939
AsyncStatement as NativeAsyncStatement,
4040
AsyncResultHandle as NativeAsyncResultHandle,
41+
CancellableExecution as NativeCancellableExecution,
4142
} from '../../native/sea';
4243

4344
// SEA-prefixed re-exports. The kernel-generated `.d.ts` keeps the
@@ -69,6 +70,14 @@ export type SeaNativeNamedTypedValueInput = NativeNamedTypedValueInput;
6970
export type SeaNativeAsyncStatement = NativeAsyncStatement;
7071
export type SeaNativeAsyncResultHandle = NativeAsyncResultHandle;
7172

73+
// Cancellable sync-execute surface: `Connection.executeStatementCancellable`
74+
// returns a `CancellableExecution` that captures a detached StatementCanceller
75+
// BEFORE dispatching the blocking `execute()`, so a concurrent `cancel()`
76+
// interrupts a still-running query mid-compute. `result()` drives the blocking
77+
// execute and resolves to the same terminal `Statement` `executeStatement`
78+
// returns.
79+
export type SeaNativeCancellableExecution = NativeCancellableExecution;
80+
7281
/**
7382
* The full native binding surface, derived from the generated module
7483
* so it can never drift from the `.d.ts` contract: when the kernel

lib/sea/SeaOperationBackend.ts

Lines changed: 128 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ import ResultSlicer from '../result/ResultSlicer';
5151
import SeaResultsProvider from './SeaResultsProvider';
5252
import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc';
5353
import { decodeNapiKernelError } from './SeaErrorMapping';
54-
import { SeaStatement, SeaNativeAsyncStatement, SeaNativeAsyncResultHandle } from './SeaNativeLoader';
54+
import {
55+
SeaStatement,
56+
SeaNativeAsyncStatement,
57+
SeaNativeAsyncResultHandle,
58+
SeaNativeCancellableExecution,
59+
} from './SeaNativeLoader';
5560
import {
5661
SeaStatementHandle,
5762
SeaOperationLifecycleState,
@@ -116,6 +121,15 @@ export interface SeaOperationBackendOptions {
116121
asyncStatement?: SeaNativeAsyncStatement;
117122
/** The terminal napi `Statement` from a metadata call. */
118123
statement?: SeaOperationStatement;
124+
/**
125+
* The pending napi `CancellableExecution` from
126+
* `Connection.executeStatementCancellable(...)` — the sync (`runAsync: false`)
127+
* query path. `result()` drives the blocking `execute()` to a terminal
128+
* `Statement` (the fetch handle); `cancel()` fires a detached canceller that
129+
* interrupts a still-running `result()` mid-COMPUTE. Exactly one of
130+
* `asyncStatement`, `statement`, or `cancellableExecution` must be set.
131+
*/
132+
cancellableExecution?: SeaNativeCancellableExecution;
119133
context: IClientContext;
120134
/**
121135
* Optional override for `id`. Defaults to the napi statement-id when the
@@ -134,15 +148,23 @@ export interface SeaOperationBackendOptions {
134148
}
135149

136150
export default class SeaOperationBackend implements IOperationBackend {
137-
// Query path: pending async statement we poll to terminal. Undefined on the
138-
// metadata path.
151+
// Async query path: pending async statement we poll to terminal. Undefined on
152+
// the metadata / sync-execute paths.
139153
private readonly asyncStatement?: SeaNativeAsyncStatement;
140154

141-
// Metadata path: terminal statement. Undefined on the query path.
142-
private readonly blockingStatement?: SeaOperationStatement;
155+
// Sync query path (`runAsync: false`): pending cancellable execution whose
156+
// `result()` drives the blocking `execute()` to a terminal `Statement`.
157+
// Undefined on the async / metadata paths.
158+
private readonly cancellableExecution?: SeaNativeCancellableExecution;
159+
160+
// Metadata path: terminal statement. Also the resolved fetch handle on the
161+
// sync-execute path once `cancellableExecution.result()` settles.
162+
private blockingStatement?: SeaOperationStatement;
143163

144164
// The cancel/close surface — whichever handle backs this operation. Both
145-
// `AsyncStatement` and `Statement` expose `cancel()` / `close()`.
165+
// `AsyncStatement` and `Statement` expose `cancel()` / `close()`; the
166+
// sync-execute path uses a composite that routes `cancel()` to the
167+
// cancellable execution (mid-compute) and `close()` to the resolved statement.
146168
private readonly lifecycleHandle: SeaStatementHandle;
147169

148170
private readonly context: IClientContext;
@@ -168,15 +190,43 @@ export default class SeaOperationBackend implements IOperationBackend {
168190
// undefined when unset. Enforced in the async poll loop.
169191
private readonly queryTimeoutMs?: number;
170192

171-
constructor({ asyncStatement, statement, context, id, queryTimeoutSecs }: SeaOperationBackendOptions) {
172-
if ((asyncStatement === undefined) === (statement === undefined)) {
173-
throw new HiveDriverError('SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided');
193+
constructor({
194+
asyncStatement,
195+
statement,
196+
cancellableExecution,
197+
context,
198+
id,
199+
queryTimeoutSecs,
200+
}: SeaOperationBackendOptions) {
201+
// Exactly one of the three handle kinds must be supplied.
202+
const providedCount =
203+
(asyncStatement !== undefined ? 1 : 0) +
204+
(statement !== undefined ? 1 : 0) +
205+
(cancellableExecution !== undefined ? 1 : 0);
206+
if (providedCount !== 1) {
207+
throw new HiveDriverError(
208+
'SeaOperationBackend: exactly one of `asyncStatement`, `statement`, or `cancellableExecution` must be provided',
209+
);
174210
}
175211
this.asyncStatement = asyncStatement;
212+
this.cancellableExecution = cancellableExecution;
176213
this.blockingStatement = statement;
177-
this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle;
214+
// Lifecycle surface. The async/metadata handles expose both cancel/close.
215+
// The sync-execute path uses a composite: `cancel()` always routes to the
216+
// cancellable execution (lock-free, interrupts a running `result()`
217+
// mid-compute and is a no-op once terminal); `close()` routes to the
218+
// resolved terminal statement once `result()` has produced it (before that
219+
// there is nothing server-side to close, and the kernel's per-execute drop
220+
// guard handles an abandoned in-flight execution).
221+
this.lifecycleHandle = cancellableExecution
222+
? {
223+
cancel: () => cancellableExecution.cancel(),
224+
close: () => (this.blockingStatement ? this.blockingStatement.close() : Promise.resolve()),
225+
}
226+
: ((asyncStatement ?? statement) as SeaStatementHandle);
178227
this.context = context;
179-
this._id = id ?? asyncStatement?.statementId ?? statement?.statementId ?? uuidv4();
228+
this._id =
229+
id ?? asyncStatement?.statementId ?? statement?.statementId ?? cancellableExecution?.statementId ?? uuidv4();
180230
this.queryTimeoutMs = queryTimeoutSecs !== undefined && queryTimeoutSecs > 0 ? queryTimeoutSecs * 1000 : undefined;
181231
}
182232

@@ -312,11 +362,20 @@ export default class SeaOperationBackend implements IOperationBackend {
312362
return { state: OperationState.Closed, hasResultSet: true };
313363
}
314364
if (this.asyncStatement) {
315-
// Query path: report the real kernel state (single GetStatementStatus
316-
// RPC — no polling here; `waitUntilReady` owns the poll loop).
365+
// Async query path: report the real kernel state (single
366+
// GetStatementStatus RPC — no polling here; `waitUntilReady` owns the
367+
// poll loop).
317368
const state = statusStringToOperationState(await this.asyncStatement.status());
318369
return { state, hasResultSet: true };
319370
}
371+
if (this.cancellableExecution) {
372+
// Sync (`runAsync: false`) path: the kernel `execute()` blocks and polls
373+
// server-side; there is no per-status RPC to query while it runs. Report
374+
// Running until `result()` has materialised the terminal statement, then
375+
// Succeeded — mirroring the kernel's blocking-then-terminal lifecycle.
376+
const state = this.fetchHandlePromise ? OperationState.Succeeded : OperationState.Running;
377+
return { state, hasResultSet: true };
378+
}
320379
// Metadata path: the kernel statement is already terminal.
321380
return { state: OperationState.Succeeded, hasResultSet: true };
322381
}
@@ -325,6 +384,9 @@ export default class SeaOperationBackend implements IOperationBackend {
325384
if (this.asyncStatement) {
326385
return this.waitUntilReadyAsync(options);
327386
}
387+
if (this.cancellableExecution) {
388+
return this.waitUntilReadyCancellable(options);
389+
}
328390
// Metadata path: the kernel statement has already resolved, so there is
329391
// nothing to poll. seaFinished fires the progress callback once with a
330392
// synthesised completion tick, matching the Thrift path's final tick.
@@ -420,6 +482,36 @@ export default class SeaOperationBackend implements IOperationBackend {
420482
}
421483
}
422484

485+
/**
486+
* Sync (`runAsync: false`) execute path. Drives the blocking
487+
* `CancellableExecution.result()` to a terminal `Statement` (the kernel polls
488+
* to completion server-side, honouring `queryTimeoutSecs` on this path). The
489+
* await is interruptible: a JS-initiated `cancel()` fires the detached
490+
* canceller, the server flips the statement terminal, and the parked
491+
* `result()` rejects with `Cancelled` — which we map to the typed
492+
* `OperationStateError(Canceled)`.
493+
*
494+
* Unlike the async path there is no status poll loop (the kernel owns
495+
* polling), so the progress callback fires once on completion, matching the
496+
* metadata path's single completion tick.
497+
*/
498+
private async waitUntilReadyCancellable(options?: IOperationBackendWaitOptions): Promise<void> {
499+
// Already materialised → terminal-and-ready, nothing to wait for.
500+
if (this.fetchHandlePromise) {
501+
return;
502+
}
503+
// A JS-initiated cancel/close before we start short-circuits to the typed
504+
// state error rather than dispatching the blocking execute.
505+
failIfNotActive(this.lifecycle);
506+
// `getFetchHandle()` drives `result()` and memoises the resolved Statement
507+
// (also stored on `blockingStatement` so `close()` can reach it).
508+
await this.getFetchHandle();
509+
// Single completion tick, matching the metadata path.
510+
if (options?.callback) {
511+
await Promise.resolve(options.callback({ state: OperationState.Succeeded, hasResultSet: true }));
512+
}
513+
}
514+
423515
/**
424516
* Drive `awaitResult()` on a Failed statement to surface the kernel's typed
425517
* SQL-error envelope. Falls back to a generic error if `awaitResult()`
@@ -444,6 +536,29 @@ export default class SeaOperationBackend implements IOperationBackend {
444536
this.fetchHandlePromise = this.asyncStatement.awaitResult().catch((err) => {
445537
throw decodeNapiKernelError(err);
446538
}) as Promise<SeaNativeAsyncResultHandle>;
539+
} else if (this.cancellableExecution) {
540+
// Sync (`runAsync: false`) path: drive the blocking `result()` to the
541+
// terminal `Statement`. Store it on `blockingStatement` so `close()` can
542+
// reach it post-execute, and so a subsequent fetch uses it directly.
543+
this.fetchHandlePromise = this.cancellableExecution
544+
.result()
545+
.then((stmt) => {
546+
this.blockingStatement = stmt as unknown as SeaOperationStatement;
547+
return stmt as unknown as SeaFetchHandle;
548+
})
549+
.catch((err) => {
550+
const mapped = decodeNapiKernelError(err);
551+
// A cancel-induced rejection surfaces as the kernel's Cancelled
552+
// error; map it to the typed `OperationStateError(Canceled)` so the
553+
// `DBSQLOperation` facade mirrors its cancelled flag (it only does so
554+
// for `OperationStateError`), matching the Thrift path. If the
555+
// operation was cancelled client-side, prefer the typed code
556+
// regardless of the kernel error text.
557+
if (this.lifecycle.isCancelled) {
558+
throw new OperationStateError(OperationStateErrorCode.Canceled);
559+
}
560+
throw mapped;
561+
});
447562
} else {
448563
const stmt = this.blockingStatement!;
449564
if (!stmt.fetchNextBatch) {

lib/sea/SeaSessionBackend.ts

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,56 @@ export default class SeaSessionBackend implements ISessionBackend {
155155
);
156156
}
157157

158-
const execOptions = this.buildExecuteOptions(options);
158+
// `runAsync` selects the kernel execution path, exactly mirroring the
159+
// Thrift backend's `runAsync` distinction (the only observable difference is
160+
// WHEN `executeStatement` resolves — the public API, result shape, schema,
161+
// and error classes are identical on both paths):
162+
//
163+
// - DEFAULT (`runAsync` false/undefined) — SYNC. Route through
164+
// `executeStatementCancellable`: the kernel blocks on `execute()`
165+
// (server-side direct-results / poll-to-terminal), which is faster and,
166+
// with the napi sync canceller, fully cancellable mid-COMPUTE. The
167+
// blocking drive runs in the operation backend's `result()` (inside
168+
// `waitUntilReady`, which the facade invokes lazily at first fetch).
169+
// `queryTimeoutSecs` IS honoured on this path (forwarded to the napi
170+
// options below) since the kernel `execute()` consults it.
171+
//
172+
// - `runAsync: true` — ASYNC. Submit (`wait_timeout=0s`): the server
173+
// returns a pending `AsyncStatement` immediately while the query runs;
174+
// the backend polls `status()` to terminal in `waitUntilReady()` and
175+
// materialises results via `awaitResult()`. `queryTimeoutSecs` is
176+
// ignored by the kernel on submit, so it is enforced client-side by the
177+
// operation backend's poll-loop deadline instead.
178+
const runAsync = options.runAsync ?? false;
179+
const queryTimeoutSecs =
180+
options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined;
181+
182+
if (!runAsync) {
183+
// Sync path: forward `queryTimeoutSecs` to the napi options — the kernel
184+
// `execute()` honours it (server statement timeout).
185+
const execOptions = this.buildExecuteOptions(options, queryTimeoutSecs);
186+
let cancellableExecution;
187+
try {
188+
cancellableExecution =
189+
execOptions === undefined
190+
? await this.connection.executeStatementCancellable(statement)
191+
: await this.connection.executeStatementCancellable(statement, execOptions);
192+
} catch (err) {
193+
throw this.logAndMapError('executeStatement', err);
194+
}
195+
return new SeaOperationBackend({
196+
cancellableExecution: cancellableExecution!,
197+
context: this.context,
198+
// The kernel honours `queryTimeoutSecs` on the sync `execute` path, so
199+
// it is forwarded via the napi options (see `buildExecuteOptions`); the
200+
// backend also keeps it as a deadline guard for parity with async.
201+
queryTimeoutSecs,
202+
});
203+
}
159204

160-
// Submit asynchronously (kernel `wait_timeout=0s`): the server returns a
161-
// pending `AsyncStatement` immediately while the query runs, matching the
162-
// Thrift backend's always-async (`runAsync: true`) path. The operation
163-
// backend polls `status()` to terminal in `waitUntilReady()` and
164-
// materialises results via `awaitResult()`, so a long-running query stays
165-
// cancellable mid-flight and `status()` reports real Pending/Running states.
205+
// Async path: do NOT forward `queryTimeoutSecs` (the kernel ignores it on
206+
// submit — `wait_timeout=0s`); it is enforced client-side by the poll loop.
207+
const execOptions = this.buildExecuteOptions(options);
166208
let asyncStatement;
167209
try {
168210
asyncStatement =
@@ -181,7 +223,7 @@ export default class SeaSessionBackend implements ISessionBackend {
181223
context: this.context,
182224
// `queryTimeout` is typed `number | bigint | Int64`; `numberToInt64(...).toNumber()`
183225
// coerces all three (a bare `Number(int64)` yields NaN — node-int64 has no valueOf).
184-
queryTimeoutSecs: options.queryTimeout !== undefined ? numberToInt64(options.queryTimeout).toNumber() : undefined,
226+
queryTimeoutSecs,
185227
});
186228
}
187229

@@ -190,7 +232,10 @@ export default class SeaSessionBackend implements ISessionBackend {
190232
* `ExecuteOptions`, returning `undefined` when nothing is set so the
191233
* no-options call shape (`executeStatement(sql)`) is preserved.
192234
*/
193-
private buildExecuteOptions(options: ExecuteStatementOptions): SeaNativeExecuteOptions | undefined {
235+
private buildExecuteOptions(
236+
options: ExecuteStatementOptions,
237+
queryTimeoutSecs?: number,
238+
): SeaNativeExecuteOptions | undefined {
194239
// Positional (`?`) and named (`:name`) parameters are mutually exclusive —
195240
// the kernel binds one placeholder style per statement. Use the SAME error
196241
// type and message as the Thrift backend (`ThriftSessionBackend`) so a
@@ -208,9 +253,14 @@ export default class SeaSessionBackend implements ISessionBackend {
208253
if (namedParams !== undefined) {
209254
execOptions.namedParams = namedParams;
210255
}
211-
// `queryTimeout` is intentionally NOT forwarded here — the kernel ignores
212-
// `queryTimeoutSecs` on `submitStatement`, so it is enforced client-side by
213-
// the operation backend's poll-loop deadline instead (see executeStatement).
256+
// `queryTimeoutSecs` is forwarded only on the SYNC path (the caller passes
257+
// it in): the kernel `execute()` consults it as the server statement
258+
// timeout. On the async submit path the caller omits it (the kernel ignores
259+
// it under `wait_timeout=0s`), so it is enforced client-side by the
260+
// operation backend's poll-loop deadline instead (see executeStatement).
261+
if (queryTimeoutSecs !== undefined) {
262+
execOptions.queryTimeoutSecs = queryTimeoutSecs;
263+
}
214264
if (options.rowLimit !== undefined) {
215265
execOptions.rowLimit = Number(options.rowLimit);
216266
}

0 commit comments

Comments
 (0)