Skip to content

Commit be4e4e2

Browse files
authored
feat(event-handler): expose response streaming in public API (#4743)
1 parent f0677d4 commit be4e4e2

File tree

5 files changed

+192
-110
lines changed

5 files changed

+192
-110
lines changed

packages/event-handler/src/rest/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ export {
2525
isAPIGatewayProxyEventV2,
2626
isExtendedAPIGatewayProxyResult,
2727
isHttpMethod,
28+
streamify,
2829
} from './utils.js';

packages/event-handler/src/rest/utils.ts

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ import {
44
isRegExp,
55
isString,
66
} from '@aws-lambda-powertools/commons/typeutils';
7-
import type { APIGatewayProxyEvent, APIGatewayProxyEventV2 } from 'aws-lambda';
7+
import type {
8+
APIGatewayProxyEvent,
9+
APIGatewayProxyEventV2,
10+
StreamifyHandler,
11+
} from 'aws-lambda';
12+
import type { Router } from '../rest/Router.js';
13+
import type { ResolveOptions } from '../types/index.js';
814
import type {
915
CompiledRoute,
1016
CompressionOptions,
@@ -385,3 +391,76 @@ export const getStatusCode = (
385391
}
386392
return fallback;
387393
};
394+
395+
const streamifyResponse =
396+
globalThis.awslambda?.streamifyResponse ??
397+
(<TEvent = unknown, TResult = void>(
398+
handler: StreamifyHandler<TEvent, TResult>
399+
): StreamifyHandler<TEvent, TResult> => {
400+
return (async (event, responseStream, context) => {
401+
await handler(event, responseStream, context);
402+
403+
if ('chunks' in responseStream && Array.isArray(responseStream.chunks)) {
404+
const output = Buffer.concat(responseStream.chunks as Buffer[]);
405+
const nullBytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0]);
406+
const separatorIndex = output.indexOf(nullBytes);
407+
408+
const preludeBuffer = output.subarray(0, separatorIndex);
409+
const bodyBuffer = output.subarray(separatorIndex + 8);
410+
const prelude = JSON.parse(preludeBuffer.toString());
411+
412+
return {
413+
body: bodyBuffer.toString(),
414+
headers: prelude.headers,
415+
statusCode: prelude.statusCode,
416+
} as TResult;
417+
}
418+
}) as StreamifyHandler<TEvent, TResult>;
419+
});
420+
421+
/**
422+
* Wraps a Router instance to create a Lambda handler that uses response streaming.
423+
*
424+
* In Lambda runtime, uses `awslambda.streamifyResponse` to enable streaming responses.
425+
* In test/local environments, returns an unwrapped handler that works with mock streams.
426+
*
427+
* @param router - The Router instance to wrap
428+
* @param options - Optional configuration including scope for decorator binding
429+
* @returns A Lambda handler that streams responses
430+
*
431+
* @example
432+
* ```typescript
433+
* import { Router, streamify } from '@aws-lambda-powertools/event-handler/experimental-rest';
434+
*
435+
* const app = new Router();
436+
* app.get('/test', () => ({ message: 'Hello' }));
437+
*
438+
* export const handler = streamify(app);
439+
* ```
440+
*
441+
* @example
442+
* ```typescript
443+
* // With scope for decorators
444+
* class Lambda {
445+
* public scope = 'my-scope';
446+
*
447+
* @app.get('/test')
448+
* public getTest() {
449+
* return { message: `${this.scope}: success` };
450+
* }
451+
*
452+
* public handler = streamify(app, { scope: this });
453+
* }
454+
* ```
455+
*/
456+
export const streamify = (
457+
router: Router,
458+
options?: ResolveOptions
459+
): StreamifyHandler => {
460+
return streamifyResponse(async (event, responseStream, context) => {
461+
await router.resolveStream(event, context, {
462+
responseStream,
463+
scope: options?.scope,
464+
});
465+
});
466+
};

packages/event-handler/tests/unit/rest/Router/decorators.test.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,18 @@ import {
66
MethodNotAllowedError,
77
type NotFoundError,
88
Router,
9+
streamify,
910
UnauthorizedError,
1011
} from '../../../../src/rest/index.js';
1112
import type { RequestContext } from '../../../../src/types/rest.js';
1213
import {
1314
createHandler,
1415
createHandlerWithScope,
15-
createStreamHandler,
1616
createTestEvent,
1717
createTestEventV2,
1818
createTestLambdaClass,
1919
createTrackingMiddleware,
2020
MockResponseStream,
21-
parseStreamOutput,
2221
} from '../helpers.js';
2322

2423
describe.each([
@@ -481,7 +480,7 @@ describe.each([
481480
});
482481

483482
describe('streaming with decorators', () => {
484-
it('preserves scope when using resolveStream with decorators', async () => {
483+
it('preserves scope when using streamify with decorators', async () => {
485484
// Prepare
486485
const app = new Router();
487486

@@ -495,20 +494,23 @@ describe.each([
495494
};
496495
}
497496

498-
public handler = createStreamHandler(app, this);
497+
public handler = streamify(app, { scope: this });
499498
}
500499

501500
const lambda = new Lambda();
502501
const responseStream = new MockResponseStream();
503502
const handler = lambda.handler.bind(lambda);
504503

505504
// Act
506-
await handler(createTestEvent('/test', 'GET'), context, responseStream);
505+
const result = await handler(
506+
createTestEvent('/test', 'GET'),
507+
responseStream,
508+
context
509+
);
507510

508511
// Assess
509-
const { prelude, body } = parseStreamOutput(responseStream.chunks);
510-
expect(prelude.statusCode).toBe(200);
511-
expect(JSON.parse(body)).toEqual({
512+
expect(result.statusCode).toBe(200);
513+
expect(JSON.parse(result.body)).toEqual({
512514
message: 'streaming-scope: streaming success',
513515
});
514516
});
@@ -534,20 +536,23 @@ describe.each([
534536
throw new UnauthorizedError('UnauthorizedError!');
535537
}
536538

537-
public handler = createStreamHandler(app, this);
539+
public handler = streamify(app, { scope: this });
538540
}
539541

540542
const lambda = new Lambda();
541543
const responseStream = new MockResponseStream();
542544
const handler = lambda.handler.bind(lambda);
543545

544546
// Act
545-
await handler(createTestEvent('/test', 'GET'), context, responseStream);
547+
const result = await handler(
548+
createTestEvent('/test', 'GET'),
549+
responseStream,
550+
context
551+
);
546552

547553
// Assess
548-
const { prelude, body } = parseStreamOutput(responseStream.chunks);
549-
expect(prelude.statusCode).toBe(401);
550-
expect(JSON.parse(body)).toEqual({
554+
expect(result.statusCode).toBe(401);
555+
expect(JSON.parse(result.body)).toEqual({
551556
statusCode: HttpStatusCodes.UNAUTHORIZED,
552557
error: 'Unauthorized',
553558
message: 'error-scope: UnauthorizedError!',

0 commit comments

Comments
 (0)