Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 69 additions & 21 deletions src/standalone/api/kernels/backgroundExecution.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { CancellationToken } from 'vscode';
import { CancellationToken, Disposable } from 'vscode';
import { IKernel } from '../../../kernels/types';
import { JVSC_EXTENSION_ID } from '../../../platform/common/constants';
import { createKernelApiForExtension } from './kernel';
import { DisposableStore } from '../../../platform/common/utils/lifecycle';
import { raceCancellation } from '../../../platform/common/cancellation';
import { getNotebookCellOutputMetadata } from '../../../kernels/execution/helpers';
import { raceCancellation, wrapCancellationTokens } from '../../../platform/common/cancellation';
import { CellOutputMimeTypes, getNotebookCellOutputMetadata } from '../../../kernels/execution/helpers';
import { unTrackDisplayDataForExtension } from '../../../kernels/execution/extensionDisplayDataTracker';
import { logger } from '../../../platform/logging';
import { Delayer } from '../../../platform/common/utils/async';

export const executionCounters = new WeakMap<IKernel, number>();
export async function execCodeInBackgroundThread<T>(
Expand All @@ -28,14 +29,15 @@ export async function execCodeInBackgroundThread<T>(
const codeToSend = `
def __jupyter_exec_background__():
from IPython.display import display
from ipykernel import __version__ as ipykernel_version
from threading import Thread
from traceback import format_exc

# First send a dummy response to get the display id.
# Later we'll send the real response with the actual data.
# And that can happen much later even after the execution completes,
# as that response will be sent from a bg thread.
output = display({"${mime}": ""}, raw=True, display_id=True)
output = display({"${mime}": ipykernel_version}, raw=True, display_id=True)

def do_implementation():
${codeWithReturnStatement.map((l, i) => (i === 0 ? l : ` ${l}`)).join('\n')}
Expand All @@ -53,14 +55,22 @@ def __jupyter_exec_background__():
__jupyter_exec_background__()
del __jupyter_exec_background__
`.trim();
let lastStdError = '';
const disposables = new DisposableStore();
disposables.add(token.onCancellationRequested(() => disposables.dispose()));
disposables.add(
new Disposable(() => {
// We no longer need to track any more outputs from the kernel that are related to this output.
kernel.session && unTrackDisplayDataForExtension(kernel.session, displayId);
})
);
const wrappedCancellation = disposables.add(wrapCancellationTokens(token));
disposables.add(wrappedCancellation.token.onCancellationRequested(() => disposables.dispose()));
const promise = raceCancellation(
token,
wrappedCancellation.token,
new Promise<T | undefined>((resolve, reject) => {
disposables.add(
api.onDidReceiveDisplayUpdate(async (output) => {
if (token.isCancellationRequested) {
if (wrappedCancellation.token.isCancellationRequested) {
return resolve(undefined);
}
const metadata = getNotebookCellOutputMetadata(output);
Expand All @@ -80,23 +90,47 @@ del __jupyter_exec_background__
})
);
})
// We no longer need to track any more outputs from the kernel that are related to this output.
).finally(() => {
kernel.session && unTrackDisplayDataForExtension(kernel.session, displayId);
disposables.dispose();
});
).finally(() => disposables.dispose());

let ipyKernelVersion = '';
const exitIfFailuresFound = new Delayer(5_000);

for await (const output of api.executeCode(codeToSend, token)) {
for await (const output of api.executeCode(codeToSend, wrappedCancellation.token)) {
if (token.isCancellationRequested) {
return;
}
const metadata = getNotebookCellOutputMetadata(output);
if (!metadata?.transient?.display_id) {
if (
output.metadata?.outputType === 'stream' &&
output.items.length &&
output.items[0].mime === CellOutputMimeTypes.stderr
) {
lastStdError += new TextDecoder().decode(output.items[0].data);
if (lastStdError && ipyKernelVersion.startsWith('7.0.1')) {
// ipykernel 7.0.1 has a bug where background thread errors are printed to stderr
// https://github.com/ipython/ipykernel/issues/1450
wrappedCancellation.cancel();
} else {
logger.trace('Background execution stderr:', lastStdError);
}
}
continue;
}
const dummyMessage = output.items.find((item) => item.mime === mime);
if (dummyMessage) {
displayId = metadata.transient.display_id;
exitIfFailuresFound.cancel();

try {
ipyKernelVersion = new TextDecoder().decode(dummyMessage.data).trim();
// Check if ipykernel version matches the pattern d.d.d<anything>
if (!ipyKernelVersion.match(/^\d+\.\d+\.\d+/)) {
ipyKernelVersion = '';
}
} catch {
// Ignore errors in decoding
}
continue;
}

Expand All @@ -110,13 +144,27 @@ del __jupyter_exec_background__
}
}
}
if (token.isCancellationRequested) {
return;
}
if (!displayId) {
logger.warn('Failed to get display id for completions');
return;
try {
if (wrappedCancellation.token.isCancellationRequested) {
if (!token.isCancellationRequested && lastStdError && ipyKernelVersion.startsWith('7.0.1')) {
throw new Error(lastStdError);
}
return;
}
if (!displayId) {
logger.warn('Failed to get display id for completions');
return;
}
const result = await raceCancellation(wrappedCancellation.token, promise);
if (result) {
return result;
}
if (wrappedCancellation.token.isCancellationRequested && !token.isCancellationRequested && lastStdError) {
throw new Error(lastStdError);
}
} finally {
if (lastStdError) {
logger.error('Error in background execution:\n', lastStdError);
}
}

return promise;
}
Loading