Skip to content

Commit 3511846

Browse files
authored
Remove rxjs (#14617)
* Remove rxjs * Add event emitter to jupyterServer.node.ts * Remove rxjs * Dispose everything * Fix tests
1 parent a727344 commit 3511846

18 files changed

+292
-240
lines changed

.eslintrc.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ module.exports = {
127127
'@typescript-eslint/no-restricted-imports': [
128128
'error',
129129
{
130-
paths: ['lodash', 'rxjs', 'lodash/noop', 'rxjs/util/noop'],
130+
paths: ['lodash', 'lodash/noop'],
131131
patterns: [
132132
{
133133
group: ['@jupyterlab/*'],

package-lock.json

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2168,8 +2168,6 @@
21682168
"redux": "^4.0.4",
21692169
"redux-logger": "^3.0.6",
21702170
"reflect-metadata": "^0.1.12",
2171-
"rxjs": "^6.5.4",
2172-
"rxjs-compat": "^6.5.4",
21732171
"safe-buffer": "^5.2.1",
21742172
"sanitize-filename": "^1.6.3",
21752173
"semver": "^5.7.2",

src/kernels/jupyter/interpreter/jupyterInterpreterSubCommandExecutionService.unit.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { assert, expect, use } from 'chai';
55
import chaiPromise from 'chai-as-promised';
66
import * as path from '../../../platform/vscode-path/path';
77
import * as sinon from 'sinon';
8-
import { Subject } from 'rxjs/Subject';
98
import { anything, capture, deepEqual, instance, mock, when } from 'ts-mockito';
109
import { Uri } from 'vscode';
1110
import { ObservableExecutionResult, Output } from '../../../platform/common/process/types.node';
@@ -26,6 +25,9 @@ import { JupyterInterpreterDependencyService } from './jupyterInterpreterDepende
2625
import { JupyterInterpreterService } from './jupyterInterpreterService.node';
2726
import { JupyterInterpreterSubCommandExecutionService } from './jupyterInterpreterSubCommandExecutionService.node';
2827
import { noop } from '../../../test/core';
28+
import { createObservable } from '../../../platform/common/process/proc.node';
29+
import { IDisposable } from '../../../platform/common/types';
30+
import { dispose } from '../../../platform/common/utils/lifecycle';
2931
use(chaiPromise);
3032

3133
/* eslint-disable */
@@ -39,6 +41,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
3941
const selectedJupyterInterpreter = createPythonInterpreter({ displayName: 'JupyterInterpreter' });
4042
const activePythonInterpreter = createPythonInterpreter({ displayName: 'activePythonInterpreter' });
4143
let notebookStartResult: ObservableExecutionResult<string>;
44+
const disposables: IDisposable[] = [];
4245
setup(() => {
4346
interpreterService = mock<IInterpreterService>();
4447
jupyterInterpreter = mock(JupyterInterpreterService);
@@ -49,10 +52,12 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
4952
// eslint-disable-next-line @typescript-eslint/no-explicit-any
5053
(instance(execService) as any).then = undefined;
5154
const output = new MockOutputChannel('');
55+
const out = createObservable<Output<string>>();
56+
disposables.push(out);
5257
notebookStartResult = {
5358
dispose: noop,
5459
proc: undefined,
55-
out: new Subject<Output<string>>().asObservable()
60+
out
5661
};
5762
const jupyterPaths = mock<JupyterPaths>();
5863
when(jupyterPaths.getKernelSpecTempRegistrationFolder()).thenResolve(
@@ -78,6 +83,7 @@ suite('Jupyter InterpreterSubCommandExecutionService', () => {
7883
when(interpreterService.getActiveInterpreter(undefined)).thenResolve(activePythonInterpreter);
7984
});
8085
teardown(() => {
86+
dispose(disposables);
8187
sinon.restore();
8288
});
8389
// eslint-disable-next-line

src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.ts

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
import { Subscription } from 'rxjs/Subscription';
54
import { Disposable, Uri } from 'vscode';
65
import { IConfigurationService, IDisposable } from '../../../platform/common/types';
76
import { traceError, traceWarning, traceVerbose } from '../../../platform/logging';
@@ -20,6 +19,7 @@ import { JupyterNotebookNotInstalled } from '../../../platform/errors/jupyterNot
2019
import { PythonEnvironment } from '../../../platform/pythonEnvironments/info';
2120
import { JupyterCannotBeLaunchedWithRootError } from '../../../platform/errors/jupyterCannotBeLaunchedWithRootError';
2221
import { createJupyterConnectionInfo } from '../jupyterUtils';
22+
import { dispose } from '../../../platform/common/utils/lifecycle';
2323

2424
const urlMatcher = new RegExp(RegExpValues.UrlPatternRegEx);
2525

@@ -30,7 +30,7 @@ export class JupyterConnectionWaiter implements IDisposable {
3030
private startPromise = createDeferred<IJupyterConnection>();
3131
private launchTimeout: NodeJS.Timer | number;
3232
private output = '';
33-
private subscriptions: Subscription[] = [];
33+
private subscriptions: IDisposable[] = [];
3434
public readonly ready = this.startPromise.promise;
3535

3636
constructor(
@@ -58,30 +58,29 @@ export class JupyterConnectionWaiter implements IDisposable {
5858
}
5959
// Listen on stderr for its connection information
6060
this.subscriptions.push(
61-
launchResult.out.subscribe(
62-
(output: Output<string>) => {
63-
traceVerbose(output.out);
64-
this.output += output.out;
65-
if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) {
66-
// .then so that we can keep from pushing aync up to the subscribed observable function
67-
this.getServerInfo()
68-
.then((serverInfos) => this.getJupyterURL(serverInfos, this.output))
69-
.catch((ex) => traceWarning('Failed to get server info', ex));
70-
}
61+
launchResult.out.onDidChange((output: Output<string>) => {
62+
traceVerbose(output.out);
63+
this.output += output.out;
64+
if (RegExpValues.HttpPattern.exec(this.output) && !this.startPromise.completed) {
65+
// .then so that we can keep from pushing aync up to the subscribed observable function
66+
this.getServerInfo()
67+
.then((serverInfos) => this.getJupyterURL(serverInfos, this.output))
68+
.catch((ex) => traceWarning('Failed to get server info', ex));
69+
}
7170

72-
// Sometimes jupyter will return a 403 error. Not sure why. We used
73-
// to fail on this, but it looks like jupyter works with this error in place.
74-
},
75-
(e) => this.rejectStartPromise(e),
76-
// If the process dies, we can't extract connection information.
77-
() => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode))
78-
)
71+
// Sometimes jupyter will return a 403 error. Not sure why. We used
72+
// to fail on this, but it looks like jupyter works with this error in place.
73+
})
7974
);
75+
// If the process dies, we can't extract connection information.
76+
launchResult.out.done
77+
.then(() => this.rejectStartPromise(DataScience.jupyterServerCrashed(exitCode)))
78+
.catch((e) => this.rejectStartPromise(e));
8079
}
8180
public dispose() {
8281
// eslint-disable-next-line @typescript-eslint/no-explicit-any
8382
clearTimeout(this.launchTimeout as any);
84-
this.subscriptions.forEach((d) => d.unsubscribe());
83+
dispose(this.subscriptions);
8584
}
8685

8786
// From a list of jupyter server infos try to find the matching jupyter that we launched

src/kernels/jupyter/launcher/jupyterConnectionWaiter.node.unit.test.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,23 @@ import { CancellationToken, Uri } from 'vscode';
1010
import { IJupyterRequestAgentCreator, IJupyterRequestCreator, JupyterServerInfo } from '../types';
1111
import chaiAsPromised from 'chai-as-promised';
1212
import events from 'events';
13-
import { Subject } from 'rxjs/Subject';
1413
import sinon from 'sinon';
1514
import { JupyterSettings } from '../../../platform/common/configSettings';
1615
import { ConfigurationService } from '../../../platform/common/configuration/service.node';
1716
import { IFileSystemNode } from '../../../platform/common/platform/types.node';
1817
import { Output, ObservableExecutionResult } from '../../../platform/common/process/types.node';
19-
import { IConfigurationService, IJupyterSettings } from '../../../platform/common/types';
18+
import { IConfigurationService, IDisposable, IJupyterSettings } from '../../../platform/common/types';
2019
import { DataScience } from '../../../platform/common/utils/localize';
2120
import { EXTENSION_ROOT_DIR } from '../../../platform/constants.node';
2221
import { ServiceContainer } from '../../../platform/ioc/container';
2322
import { IServiceContainer } from '../../../platform/ioc/types';
2423
import { JupyterConnectionWaiter } from './jupyterConnectionWaiter.node';
2524
import { noop } from '../../../test/core';
25+
import { createObservable } from '../../../platform/common/process/proc.node';
26+
import { dispose } from '../../../platform/common/utils/lifecycle';
2627
use(chaiAsPromised);
2728
suite('Jupyter Connection Waiter', async () => {
28-
let observableOutput: Subject<Output<string>>;
29+
let observableOutput: ReturnType<typeof createObservable<Output<string>>>;
2930
let launchResult: ObservableExecutionResult<string>;
3031
let getServerInfoStub: sinon.SinonStub<[CancellationToken | undefined], JupyterServerInfo[] | undefined>;
3132
let configService: IConfigurationService;
@@ -35,6 +36,7 @@ suite('Jupyter Connection Waiter', async () => {
3536
const dsSettings: IJupyterSettings = { jupyterLaunchTimeout: 10_000 } as any;
3637
const childProc = new events.EventEmitter();
3738
const notebookDir = Uri.file('someDir');
39+
const disposables: IDisposable[] = [];
3840
const dummyServerInfos: JupyterServerInfo[] = [
3941
{
4042
base_url: 'http://localhost1:1',
@@ -73,7 +75,8 @@ suite('Jupyter Connection Waiter', async () => {
7375
const expectedServerInfo = dummyServerInfos[1];
7476

7577
setup(() => {
76-
observableOutput = new Subject<Output<string>>();
78+
observableOutput = createObservable<Output<string>>();
79+
disposables.push(observableOutput);
7780
launchResult = {
7881
dispose: noop,
7982
out: observableOutput,
@@ -96,6 +99,7 @@ suite('Jupyter Connection Waiter', async () => {
9699
instance(mock<IJupyterRequestAgentCreator>())
97100
);
98101
});
102+
teardown(() => dispose(disposables));
99103

100104
function createConnectionWaiter() {
101105
return new JupyterConnectionWaiter(
@@ -111,7 +115,7 @@ suite('Jupyter Connection Waiter', async () => {
111115
test('Successfully gets connection info', async () => {
112116
(<any>dsSettings).jupyterLaunchTimeout = 10_000;
113117
const waiter = createConnectionWaiter();
114-
observableOutput.next({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' });
118+
observableOutput.fire({ source: 'stderr', out: 'Jupyter listening on http://localhost2:2' });
115119

116120
const connection = await waiter.ready;
117121

@@ -133,7 +137,7 @@ suite('Jupyter Connection Waiter', async () => {
133137

134138
const promise = waiter.ready;
135139
childProc.emit('exit', exitCode);
136-
observableOutput.complete();
140+
observableOutput.resolve();
137141

138142
await assert.isRejected(promise, DataScience.jupyterServerCrashed(exitCode));
139143
});

src/kernels/jupyter/launcher/jupyterServerStarter.node.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ export class JupyterServerStarter implements IJupyterServerStarter {
101101
// Watch for premature exits
102102
if (launchResult.proc) {
103103
launchResult.proc.on('exit', (c: number | null) => (exitCode = c));
104-
launchResult.out.subscribe((out) => this.jupyterOutputChannel.append(out.out));
104+
launchResult.out.onDidChange((out) => this.jupyterOutputChannel.append(out.out));
105105
}
106106

107107
// Make sure this process gets cleaned up. We might be canceled before the connection finishes.

src/kernels/raw/finder/pythonKernelInterruptDaemon.node.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export class PythonKernelInterruptDaemon {
139139

140140
await new Promise<void>((resolve, reject) => {
141141
let started = false;
142-
const subscription = proc.out.subscribe((out) => {
142+
const subscription = proc.out.onDidChange((out) => {
143143
traceInfoIfCI(
144144
`Output from interrupt daemon started = ${started}, output (${out.source}) = ${out.out} ('END)`
145145
);
@@ -208,7 +208,7 @@ export class PythonKernelInterruptDaemon {
208208
}
209209
}
210210
});
211-
this.disposableRegistry.push(new Disposable(() => subscription.unsubscribe()));
211+
this.disposableRegistry.push(subscription);
212212
});
213213
this.disposableRegistry.push(new Disposable(() => swallowExceptions(() => proc.proc?.kill())));
214214
// Added for logging to see if this process dies.

src/kernels/raw/launcher/kernelProcess.node.ts

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -207,50 +207,48 @@ export class KernelProcess implements IKernelProcess {
207207
}
208208

209209
let sawKernelConnectionFile = false;
210-
exeObs.out.subscribe(
211-
(output) => {
212-
if (output.source === 'stderr') {
213-
output.out = stripUnwantedMessages(output.out);
214-
// Capture stderr, incase kernel doesn't start.
215-
stderr += output.out;
216-
217-
if (output.out.trim().length) {
218-
traceWarning(`StdErr from Kernel Process ${output.out.trim()}`);
219-
}
220-
} else {
221-
stdout += output.out;
222-
// Strip unwanted stuff from the output, else it just chews up unnecessary space.
223-
if (!sawKernelConnectionFile) {
224-
stdout = stdout.replace(kernelOutputToNotLog, '');
225-
stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), '');
226-
// Strip the leading space, as we've removed some leading text.
227-
stdout = stdout.trimStart();
228-
const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true });
229-
if (
230-
lines.length === 2 &&
231-
lines[0] === kernelOutputWithConnectionFile &&
232-
lines[1].startsWith('--existing') &&
233-
lines[1].endsWith('.json')
234-
) {
235-
stdout = `${lines.join(' ')}${os.EOL}`;
236-
}
237-
}
238-
if (stdout.includes(kernelOutputWithConnectionFile)) {
239-
sawKernelConnectionFile = true;
210+
exeObs.out.onDidChange((output) => {
211+
if (output.source === 'stderr') {
212+
output.out = stripUnwantedMessages(output.out);
213+
// Capture stderr, incase kernel doesn't start.
214+
stderr += output.out;
215+
216+
if (output.out.trim().length) {
217+
traceWarning(`StdErr from Kernel Process ${output.out.trim()}`);
218+
}
219+
} else {
220+
stdout += output.out;
221+
// Strip unwanted stuff from the output, else it just chews up unnecessary space.
222+
if (!sawKernelConnectionFile) {
223+
stdout = stdout.replace(kernelOutputToNotLog, '');
224+
stdout = stdout.replace(kernelOutputToNotLog.split(/\r?\n/).join(os.EOL), '');
225+
// Strip the leading space, as we've removed some leading text.
226+
stdout = stdout.trimStart();
227+
const lines = splitLines(stdout, { trim: true, removeEmptyEntries: true });
228+
if (
229+
lines.length === 2 &&
230+
lines[0] === kernelOutputWithConnectionFile &&
231+
lines[1].startsWith('--existing') &&
232+
lines[1].endsWith('.json')
233+
) {
234+
stdout = `${lines.join(' ')}${os.EOL}`;
240235
}
241-
traceVerbose(`Kernel Output: ${stdout}`);
242236
}
243-
this.sendToOutput(output.out);
244-
},
245-
(error) => {
246-
if (this.disposed) {
247-
traceWarning('Kernel died', error, stderr);
248-
return;
237+
if (stdout.includes(kernelOutputWithConnectionFile)) {
238+
sawKernelConnectionFile = true;
249239
}
250-
traceError('Kernel died', error, stderr);
251-
deferred.reject(error);
240+
traceVerbose(`Kernel Output: ${stdout}`);
252241
}
253-
);
242+
this.sendToOutput(output.out);
243+
});
244+
exeObs.out.done.catch((error) => {
245+
if (this.disposed) {
246+
traceWarning('Kernel died', error, stderr);
247+
return;
248+
}
249+
traceError('Kernel died', error, stderr);
250+
deferred.reject(error);
251+
});
254252

255253
// Don't return until our heartbeat channel is open for connections or the kernel died or we timed out
256254
try {

0 commit comments

Comments
 (0)