Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Option worker definition optionally as string reference. #417

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions docs/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,26 @@ constructs:
handler: src/worker.handler
```

_Note: the Lambda "worker" function is configured in the `queue` construct, instead of being defined in the `functions` section._
or

The only required setting is the `handler`: this should point to the code that handles SQS messages. The handler [should be written to handle SQS events](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), for example in JavaScript:
```yaml
function:
my-worker:
handler: src/worker.handler

constructs:
my-queue:
type: queue
# References the my-worker function defined in the function section
worker: my-worker
```

It is both possible to reference an existing function as `worker` or specify the entire worker as an object.

By defining the Lambda "worker" as an object, a new function is configured in the `queue` construct.
For the function object, the only required setting is the `handler`: this should point to the code that handles SQS messages.

Whether the handler is referenced, or created, it [should be written to handle SQS events](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), for example in JavaScript:

```js
exports.handler = async function (event, context) {
Expand All @@ -226,7 +243,7 @@ exports.handler = async function (event, context) {
}
```

[All settings allowed for functions](https://www.serverless.com/framework/docs/providers/aws/guide/functions/) can be used under the `worker` key. For example:
When defined as an object, [all settings allowed for functions](https://www.serverless.com/framework/docs/providers/aws/guide/functions/) can be used under the `worker` key. For example:

```yaml
constructs:
Expand Down
50 changes: 38 additions & 12 deletions src/constructs/aws/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ const QUEUE_DEFINITION = {
properties: {
type: { const: "queue" },
worker: {
type: "object",
properties: {
timeout: { type: "number" },
},
additionalProperties: true,
oneOf: [
{ type: "string" },
{
type: "object",
properties: {
timeout: { type: "number" },
},
additionalProperties: true,
},
] as const,
},
maxRetries: { type: "number" },
alarm: { type: "string" },
Expand Down Expand Up @@ -127,6 +132,7 @@ export class Queue extends AwsConstruct {
private readonly queueArnOutput: CfnOutput;
private readonly queueUrlOutput: CfnOutput;
private readonly dlqUrlOutput: CfnOutput;
private readonly workerName: string;

constructor(
scope: CdkConstruct,
Expand All @@ -146,8 +152,24 @@ export class Queue extends AwsConstruct {
);
}

let functionConfig: number | undefined;
if (typeof configuration.worker === "string") {
this.workerName = configuration.worker;
const slsFunction = provider.getFunction(this.workerName);
if (!slsFunction) {
throw new ServerlessError(
`Invalid configuration in 'constructs.${this.id}': 'workerRef' needs to point to an existing function.`,
"LIFT_INVALID_CONSTRUCT_CONFIGURATION"
);
}
functionConfig = slsFunction.timeout;
} else {
this.workerName = `${this.id}Worker`;
functionConfig = configuration.worker.timeout;
}

// The default function timeout is 6 seconds in the Serverless Framework
const functionTimeout = configuration.worker.timeout ?? 6;
const functionTimeout = functionConfig ?? 6;

// This should be 6 times the lambda function's timeout + MaximumBatchingWindowInSeconds
// See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
Expand Down Expand Up @@ -326,10 +348,15 @@ export class Queue extends AwsConstruct {
const maximumBatchingWindow = this.getMaximumBatchingWindow();
const maximumConcurrency = this.configuration.maxConcurrency;

// Override events for the worker
this.configuration.worker.events = [
// Subscribe the worker to the SQS queue
{
if (typeof this.configuration.worker !== "string") {
// Add the worker, if it is not a reference.
this.provider.addFunction(this.workerName, this.configuration.worker);
}

// Subscribe the worker to the SQS queue
this.provider.addFunctionEvent({
functionName: this.workerName,
event: {
sqs: {
arn: this.queue.queueArn,
batchSize: batchSize,
Expand All @@ -338,8 +365,7 @@ export class Queue extends AwsConstruct {
functionResponseType: "ReportBatchItemFailures",
},
},
];
this.provider.addFunction(`${this.id}Worker`, this.configuration.worker);
});
}

private async getQueueUrl(): Promise<string | undefined> {
Expand Down
33 changes: 33 additions & 0 deletions src/providers/AwsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ const AWS_DEFINITION = {
additionalProperties: false,
} as const;

type ValueType<T extends { [key: string | number | symbol]: unknown }> = T extends {
[key: string | number | symbol]: infer V;
}
? V
: never;

type ArrayType<T extends Array<unknown>> = T extends Array<infer V> ? V : never;

export class AwsProvider implements ProviderInterface {
public static type = "aws";
public static schema = AWS_DEFINITION;
Expand Down Expand Up @@ -112,6 +120,31 @@ export class AwsProvider implements ProviderInterface {
this.serverless.service.setFunctionNames(this.serverless.processedInput.options);
}

getFunction(functionName: string) {
if (!this.serverless.service.functions) {
return null;
}

return this.serverless.service.functions[functionName];
}

addFunctionEvent({
functionName,
event,
}: {
functionName: string;
event: ArrayType<Required<ValueType<Required<Serverless["service"]>["functions"]>>["events"]>;
}) {
const slsFunction = this.getFunction(functionName);
if (!slsFunction) {
throw new Error(`Serverless function ${functionName} doesn't exit, can not add an event.`);
}
if (!slsFunction.events) {
slsFunction.events = [];
}
slsFunction.events.push(event);
}

/**
* @internal
*/
Expand Down
16 changes: 16 additions & 0 deletions test/fixtures/queuesWorkerRef/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
service: app
configValidationMode: error

provider:
name: aws
# To avoid versions with random names (easier diffs)
versionFunctions: false

functions:
foo:
handler: worker.handler

constructs:
emails:
type: queue
worker: foo
Empty file.
28 changes: 28 additions & 0 deletions test/unit/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,4 +750,32 @@ describe("queues", () => {
);
}
});
it("should use a function if the function is defined", async () => {
const awsMock = mockAws();
sinon.stub(CloudFormationHelpers, "getStackOutput").resolves("queue-url");
const sendSpy = awsMock.mockService("SQS", "sendMessage").resolves();

await runServerless({
fixture: "queuesWorkerRef",
configExt: merge({}, pluginConfigExt, {
constructs: {
emails: {
fifo: true,
},
},
}),
command: "emails:send",
options: {
body: "Message body",
"group-id": "123",
},
});

expect(sendSpy.callCount).toBe(1);
expect(sendSpy.firstCall.firstArg).toStrictEqual({
QueueUrl: "queue-url",
MessageGroupId: "123",
MessageBody: "Message body",
});
});
});
Loading