diff --git a/.vscode/settings.json b/.vscode/settings.json index 0f2399922cfc..7b04615099a9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -27,6 +27,6 @@ "deno.enablePaths": ["packages/deno/test"], "editor.defaultFormatter": "biomejs.biome", "[typescript]": { - "editor.defaultFormatter": "biomejs.biome" + "editor.defaultFormatter": "vscode.typescript-language-features" } } diff --git a/packages/core/src/types-hoist/index.ts b/packages/core/src/types-hoist/index.ts index e74eca7ae927..274e1c0fa800 100644 --- a/packages/core/src/types-hoist/index.ts +++ b/packages/core/src/types-hoist/index.ts @@ -124,10 +124,7 @@ export type { StackFrame } from './stackframe'; export type { Stacktrace, StackParser, StackLineParser, StackLineParserFn } from './stacktrace'; export type { PropagationContext, TracePropagationTargets, SerializedTraceData } from './tracing'; export type { StartSpanOptions } from './startSpanOptions'; -export type { - TraceparentData, - TransactionSource, -} from './transaction'; +export type { TraceparentData, TransactionSource } from './transaction'; export type { CustomSamplingContext, SamplingContext } from './samplingcontext'; export type { DurationUnit, @@ -147,7 +144,14 @@ export type { TransportRequestExecutor, } from './transport'; export type { User } from './user'; -export type { WebFetchHeaders, WebFetchRequest } from './webfetchapi'; +export type { + WebFetchHeaders, + WebFetchRequest, + WebFetchResponse, + WebReadableStream, + WebReadableStreamDefaultReader, + WebReadableStreamReadResult, +} from './whatwg'; export type { WrappedFunction } from './wrappedfunction'; export type { HandlerDataFetch, diff --git a/packages/core/src/types-hoist/instrument.ts b/packages/core/src/types-hoist/instrument.ts index 420482579dd9..ed75a2df5aec 100644 --- a/packages/core/src/types-hoist/instrument.ts +++ b/packages/core/src/types-hoist/instrument.ts @@ -1,7 +1,7 @@ // This should be: null | Blob | BufferSource | FormData | URLSearchParams | string // But since not all of those are available in node, we just export `unknown` here for now -import type { WebFetchHeaders } from './webfetchapi'; +import type { WebFetchResponse } from './whatwg'; // Make sure to cast it where needed! type XHRSendInput = unknown; @@ -51,13 +51,7 @@ export interface HandlerDataFetch { fetchData: SentryFetchData; // This data is among other things dumped directly onto the fetch breadcrumb data startTimestamp: number; endTimestamp?: number; - // This is actually `Response` - Note: this type is not complete. Add to it if necessary. - response?: { - readonly ok: boolean; - readonly status: number; - readonly url: string; - headers: WebFetchHeaders; - }; + response?: WebFetchResponse; error?: unknown; // This is to be consumed by the HttpClient integration virtualError?: unknown; diff --git a/packages/core/src/types-hoist/webfetchapi.ts b/packages/core/src/types-hoist/webfetchapi.ts deleted file mode 100644 index 78b7d464ea71..000000000000 --- a/packages/core/src/types-hoist/webfetchapi.ts +++ /dev/null @@ -1,17 +0,0 @@ -// These are vendored types for the standard web fetch API types because typescript needs the DOM types to be able to understand the `Request`, `Headers`, ... types and not everybody has those. - -export interface WebFetchHeaders { - append(name: string, value: string): void; - delete(name: string): void; - get(name: string): string | null; - has(name: string): boolean; - set(name: string, value: string): void; - forEach(callbackfn: (value: string, key: string, parent: WebFetchHeaders) => void): void; -} - -export interface WebFetchRequest { - readonly headers: WebFetchHeaders; - readonly method: string; - readonly url: string; - clone(): WebFetchRequest; -} diff --git a/packages/core/src/types-hoist/whatwg/fetch.ts b/packages/core/src/types-hoist/whatwg/fetch.ts new file mode 100644 index 000000000000..437bc8936f29 --- /dev/null +++ b/packages/core/src/types-hoist/whatwg/fetch.ts @@ -0,0 +1,38 @@ +// These are vendored types for the standard web fetch API types because typescript needs the DOM types to be able to understand the `Request`, `Headers`, ... types and not everybody has those. + +import type { WebReadableStream } from './stream'; + +export interface WebFetchHeaders { + append(name: string, value: string): void; + delete(name: string): void; + get(name: string): string | null; + has(name: string): boolean; + set(name: string, value: string): void; + forEach(callbackfn: (value: string, key: string, parent: WebFetchHeaders) => void): void; +} + +export interface WebFetchRequest { + readonly headers: WebFetchHeaders; + readonly method: string; + readonly url: string; + clone(): WebFetchRequest; +} + +export interface WebFetchResponse { + readonly ok: boolean; + readonly status: number; + readonly statusText: string; + readonly headers: WebFetchHeaders; + readonly url: string; + readonly redirected: boolean; + readonly body: WebReadableStream | null; + + clone(): WebFetchResponse; + + // Methods to consume the response body + json(): Promise; // Parses response as JSON + text(): Promise; // Reads response body as text + arrayBuffer(): Promise; // Reads response body as ArrayBuffer + blob(): Promise; // Reads response body as Blob + formData(): Promise; // Reads response body as FormData +} diff --git a/packages/core/src/types-hoist/whatwg/index.ts b/packages/core/src/types-hoist/whatwg/index.ts new file mode 100644 index 000000000000..6b7be9732ccc --- /dev/null +++ b/packages/core/src/types-hoist/whatwg/index.ts @@ -0,0 +1,3 @@ +export type { WebReadableStream, WebReadableStreamDefaultReader, WebReadableStreamReadResult } from './stream'; + +export type { WebFetchHeaders, WebFetchRequest, WebFetchResponse } from './fetch'; diff --git a/packages/core/src/types-hoist/whatwg/stream.ts b/packages/core/src/types-hoist/whatwg/stream.ts new file mode 100644 index 000000000000..d8814f52e0ea --- /dev/null +++ b/packages/core/src/types-hoist/whatwg/stream.ts @@ -0,0 +1,23 @@ +export interface WebReadableStream { + locked: boolean; // Indicates if the stream is currently locked + + cancel(reason?: any): Promise; // Cancels the stream with an optional reason + getReader(): WebReadableStreamDefaultReader; // Returns a reader for the stream +} + +export interface WebReadableStreamDefaultReader { + closed: boolean; + // Closes the stream and resolves the reader's lock + cancel(reason?: any): Promise; + + // Returns a promise with the next chunk in the stream + read(): Promise>; + + // Releases the reader's lock on the stream + releaseLock(): void; +} + +export interface WebReadableStreamReadResult { + done: boolean; // True if the reader is done with the stream + value?: R; // The data chunk read from the stream (if not done) +} diff --git a/packages/core/src/utils-hoist/instrument/fetch.ts b/packages/core/src/utils-hoist/instrument/fetch.ts index 954ab50a7536..85390e6fb8bf 100644 --- a/packages/core/src/utils-hoist/instrument/fetch.ts +++ b/packages/core/src/utils-hoist/instrument/fetch.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import type { HandlerDataFetch } from '../../types-hoist'; +import type { HandlerDataFetch, WebFetchResponse, WebReadableStreamDefaultReader } from '../../types-hoist'; import { isError } from '../is'; import { addNonEnumerableProperty, fill } from '../object'; @@ -117,55 +117,102 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat }); } -async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise { - if (res && res.body) { - const body = res.body; - const responseReader = body.getReader(); +async function resolveReader(reader: WebReadableStreamDefaultReader, onFinishedResolving: () => void): Promise { + let running = true; + while (running) { + try { + // This .read() call will reject/throw when `reader.cancel()` + const { done } = await reader.read(); - // Define a maximum duration after which we just cancel - const maxFetchDurationTimeout = setTimeout( - () => { - body.cancel().then(null, () => { - // noop - }); - }, - 90 * 1000, // 90s - ); - - let readingActive = true; - while (readingActive) { - let chunkTimeout; - try { - // abort reading if read op takes more than 5s - chunkTimeout = setTimeout(() => { - body.cancel().then(null, () => { - // noop on error - }); - }, 5000); - - // This .read() call will reject/throw when we abort due to timeouts through `body.cancel()` - const { done } = await responseReader.read(); - - clearTimeout(chunkTimeout); + running = !done; - if (done) { - onFinishedResolving(); - readingActive = false; - } - } catch (error) { - readingActive = false; - } finally { - clearTimeout(chunkTimeout); + if (done) { + onFinishedResolving(); } + } catch (_) { + running = false; + } + } +} + +/** + * Resolves the body stream of a `Response` object and links its cancellation to a parent `Response` body. + * + * This function attaches a custom `cancel` behavior to both the parent `Response` body and its `getReader()` method. + * When the parent stream or its reader is canceled, it triggers the cancellation of the child stream as well. + * The function also monitors the resolution of the child's body stream using `resolveReader` and performs cleanup. + * + * @param {Response} res - The `Response` object whose body stream will be resolved. + * @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`. + * @param {() => void} onFinishedResolving - A callback function to be invoked when the body stream of `res` is fully resolved. + * + * Export For Test Only + */ +export function resolveResponse( + res: WebFetchResponse, + parentRes: WebFetchResponse, + onFinishedResolving: () => void, +): void { + if (!res.body || !parentRes.body) { + if (res.body) { + res.body.cancel().catch(_ => { + // noop on error + }); } - clearTimeout(maxFetchDurationTimeout); + return; + } + + const body = res.body; + const parentBody = parentRes.body; + // According to the WHATWG Streams API specification, when a stream is locked by calling `getReader()`, + // invoking `stream.cancel()` will result in a TypeError. + // To cancel while the stream is locked, must use `reader.cancel()` + // @seealso: https://streams.spec.whatwg.org + const responseReader = body.getReader(); + + const originalCancel = parentBody.cancel.bind(parentBody) as (reason?: any) => Promise; - responseReader.releaseLock(); - body.cancel().then(null, () => { + // Override cancel method on parent response's body + parentBody.cancel = async (reason?: any) => { + responseReader.cancel('Cancelled by parent stream').catch(_ => { // noop on error }); - } + + await originalCancel(reason); + }; + + const originalGetReader = parentRes.body.getReader.bind(parentBody) as ( + options: ReadableStreamGetReaderOptions, + ) => ReadableStreamDefaultReader; + + // Override getReader on parent response's body + parentBody.getReader = ((opts?: any) => { + const reader = originalGetReader(opts) as ReadableStreamDefaultReader; + + const originalReaderCancel = reader.cancel.bind(reader) as (reason?: any) => Promise; + + reader.cancel = async (reason?: any) => { + responseReader.cancel('Cancelled by parent reader').catch(_ => { + // noop on error + }); + + await originalReaderCancel(reason); + }; + + return reader; + }) as any; + + resolveReader(responseReader, onFinishedResolving).finally(() => { + try { + responseReader.releaseLock(); + body.cancel().catch(() => { + // noop on error + }); + } catch (_) { + // noop on error + } + }); } function streamHandler(response: Response): void { @@ -177,8 +224,7 @@ function streamHandler(response: Response): void { return; } - // eslint-disable-next-line @typescript-eslint/no-floating-promises - resolveResponse(clonedResponseForResolving, () => { + resolveResponse(clonedResponseForResolving as WebFetchResponse, response as WebFetchResponse, () => { triggerHandlers('fetch-body-resolved', { endTimestamp: timestampInSeconds() * 1000, response, diff --git a/packages/core/test/utils-hoist/instrument/fetch.test.ts b/packages/core/test/utils-hoist/instrument/fetch.test.ts index fc6102d6b617..64c916875a8a 100644 --- a/packages/core/test/utils-hoist/instrument/fetch.test.ts +++ b/packages/core/test/utils-hoist/instrument/fetch.test.ts @@ -1,4 +1,14 @@ -import { parseFetchArgs } from '../../../src/utils-hoist/instrument/fetch'; +import type { WebFetchResponse, WebReadableStream } from '../../../src/types-hoist'; + +import { parseFetchArgs, resolveResponse } from '../../../src/utils-hoist/instrument/fetch'; + +async function delay(ms: number) { + await new Promise(res => { + setTimeout(() => { + res(true); + }, ms); + }); +} describe('instrument > parseFetchArgs', () => { it.each([ @@ -27,3 +37,115 @@ describe('instrument > parseFetchArgs', () => { expect(actual).toEqual(expected); }); }); + +describe('instrument > fetch > resolveResponse', () => { + let mockReader: jest.Mocked>; + let mockResponse: jest.Mocked; + let mockParentResponse: jest.Mocked; + let mockParentReader: jest.Mocked>; + let onFinishedResolving: jest.Mock; + + beforeEach(() => { + mockReader = { + read: jest.fn(), + cancel: jest.fn(async (reason?: any) => { + // Set read to reject on next call after cancel + mockReader.read.mockRejectedValueOnce(new Error(reason)); + }), + releaseLock: jest.fn(), + } as unknown as jest.Mocked>; + + mockResponse = { + body: { + getReader: jest.fn(() => mockReader), + cancel: jest.fn(), + } as unknown as WebReadableStream, + } as jest.Mocked; + + mockParentReader = { + read: jest.fn(), + cancel: jest.fn().mockResolvedValue(undefined), + releaseLock: jest.fn(), + } as unknown as jest.Mocked>; + + mockParentResponse = { + body: { + cancel: jest.fn().mockResolvedValue(undefined), + getReader: jest.fn(() => mockParentReader), + } as unknown as WebReadableStream, + } as jest.Mocked; + + onFinishedResolving = jest.fn(); + }); + + test('should call onFinishedResolving when the stream is fully read', async () => { + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk' }) + .mockResolvedValueOnce({ done: true, value: null }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + // wait 100ms so all promise can be resolved/rejected + await delay(100); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.read).toHaveBeenCalledTimes(2); + expect(onFinishedResolving).toHaveBeenCalled(); + }); + + test('should handle read errors gracefully', async () => { + mockReader.read.mockRejectedValue(new Error('Read error')); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); + + test('should cancel reader and gracefully exit when parent response is cancelled', async () => { + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk1' }) + .mockResolvedValueOnce({ done: false, value: 'chunk2' }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + await Promise.resolve(); + await mockParentResponse.body?.cancel(); + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.cancel).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); + + test('should cancel reader and gracefully exit when parent reader is cancelled', async () => { + mockReader.read + .mockResolvedValueOnce({ done: false, value: 'chunk1' }) + .mockResolvedValueOnce({ done: false, value: 'chunk2' }); + + resolveResponse(mockResponse, mockParentResponse, onFinishedResolving); + + const parentReader = mockParentResponse.body!.getReader(); + await Promise.resolve(); + + await parentReader.cancel(); + await delay(100); + + expect(onFinishedResolving).not.toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.releaseLock).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockReader.cancel).toHaveBeenCalled(); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(mockResponse.body?.cancel).toHaveBeenCalled(); + }); +}); diff --git a/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts b/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts index d52af5c8526c..be8fc2b7c6b2 100644 --- a/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts +++ b/packages/replay-internal/src/coreHandlers/util/fetchUtils.ts @@ -293,21 +293,40 @@ function _tryCloneResponse(response: Response): Response | void { * Fetch can return a streaming body, that may not resolve (or not for a long time). * If that happens, we rather abort after a short time than keep waiting for this. */ -function _tryGetResponseText(response: Response): Promise { - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => reject(new Error('Timeout while trying to read response body')), 500); - - _getResponseText(response) - .then( - txt => resolve(txt), - reason => reject(reason), - ) - .finally(() => clearTimeout(timeout)); - }); -} +async function _tryGetResponseText(response: Response): Promise { + if (!response.body) { + throw new Error('Response has no body'); + } + + const reader = response.body.getReader(); + + const decoder = new TextDecoder(); + let result = ''; + let running = true; + + const timeout = setTimeout(() => { + if (running) { + reader.cancel('Timeout while trying to read response body').catch(_ => { + // This block is only triggered when stream has already been released, + // so it's safe to ignore. + }); + } + }, 500); -async function _getResponseText(response: Response): Promise { - // Force this to be a promise, just to be safe - // eslint-disable-next-line no-return-await - return await response.text(); + try { + while (running) { + const { value, done } = await reader.read(); + + running = !done; + + if (value) { + result += decoder.decode(value, { stream: !done }); + } + } + } finally { + clearTimeout(timeout); + reader.releaseLock(); + } + + return result; } diff --git a/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts b/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts index 512c2b0f33eb..1b5b8257ed36 100644 --- a/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts +++ b/packages/replay-internal/test/unit/coreHandlers/handleNetworkBreadcrumbs.test.ts @@ -40,6 +40,8 @@ function getMockResponse(contentLength?: string, body?: string, headers?: Record ...headers, }; + const encoder = new TextEncoder(); + const response = { headers: { has: (prop: string) => { @@ -49,6 +51,24 @@ function getMockResponse(contentLength?: string, body?: string, headers?: Record return internalHeaders[prop.toLowerCase() ?? '']; }, }, + body: { + getReader: () => { + return { + read: () => { + return Promise.resolve({ + done: true, + value: encoder.encode(body), + }); + }, + cancel: async () => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.resolve(body), } as unknown as Response; @@ -741,6 +761,7 @@ other-header: test`; options.networkCaptureBodies = true; const largeBody = JSON.stringify({ a: LARGE_BODY }); + const encoder = new TextEncoder(); const breadcrumb: Breadcrumb = { category: 'fetch', @@ -756,6 +777,24 @@ other-header: test`; get: () => '', }, clone: () => mockResponse, + body: { + getReader: () => { + return { + read: () => { + return Promise.resolve({ + done: true, + value: encoder.encode(largeBody), + }); + }, + cancel: async () => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, text: () => Promise.resolve(largeBody), } as unknown as Response; diff --git a/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts b/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts index 4da9ecab639e..7689cf3fd6b0 100644 --- a/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts +++ b/packages/replay-internal/test/unit/coreHandlers/util/fetchUtils.test.ts @@ -43,6 +43,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with text body', async () => { + const encoder = new TextEncoder(); + + const mockRead = vi + .fn() + .mockResolvedValueOnce({ + value: encoder.encode('text body'), + done: false, + }) + .mockResolvedValueOnce({ + value: null, + done: true, + }); + const response = { headers: { has: () => { @@ -52,6 +65,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: mockRead, + cancel: async (reason?: any) => { + mockRead.mockRejectedValue(new Error(reason)); + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.resolve('text body'), } as unknown as Response; @@ -74,6 +100,8 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with body that fails', async () => { + const mockRead = vi.fn().mockRejectedValueOnce(new Error('cannot read')); + const response = { headers: { has: () => { @@ -83,6 +111,19 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: mockRead, + cancel: async (_?: any) => { + // noop + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => Promise.reject('cannot read'), } as unknown as Response; @@ -105,6 +146,12 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { }); it('works with body that times out', async () => { + const encoder = new TextEncoder(); + const mockRead = vi.fn(); + + let cancelled = false; + let cancellReason = ''; + const response = { headers: { has: () => { @@ -114,6 +161,38 @@ describe('Unit | coreHandlers | util | fetchUtils', () => { return undefined; }, }, + body: { + getReader: () => { + return { + read: async () => { + if (cancelled) { + mockRead.mockRejectedValue(new Error(cancellReason)); + } else { + mockRead.mockResolvedValueOnce({ + value: encoder.encode('chunk'), + done: false, + }); + } + + await new Promise(res => { + setTimeout(() => { + res(1); + }, 200); + }); + + // eslint-disable-next-line no-return-await + return await mockRead(); + }, + cancel: async (reason?: any) => { + cancelled = true; + cancellReason = reason; + }, + releaseLock: async () => { + // noop + }, + }; + }, + }, clone: () => response, text: () => new Promise(resolve => setTimeout(() => resolve('text body'), 1000)), } as unknown as Response;