Skip to content

Commit

Permalink
WIP - POC for durable functions
Browse files Browse the repository at this point in the history
Jira ticket: CAMS-461

Co-authored-by: Fritz Madden <[email protected]>
Co-authored-by: James Brooks <[email protected]>
Co-authored-by: Brian Posey <[email protected]>,
  • Loading branch information
3 people committed Nov 14, 2024
1 parent 7eb3735 commit bb8a3e5
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 0 deletions.
90 changes: 90 additions & 0 deletions backend/functions/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/functions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@okta/okta-sdk-nodejs": "^7.1.1",
"applicationinsights": "^3.4.0",
"dotenv": "^16.4.5",
"durable-functions": "^3.1.0",
"jsonwebtoken": "^9.0.2",
"mongodb": "^6.10.0",
"mssql": "^10.0.4"
Expand Down
23 changes: 23 additions & 0 deletions backend/functions/poc/dfClient.function.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import * as df from 'durable-functions';
import { app, HttpRequest, HttpResponse, InvocationContext } from '@azure/functions';

export default async function httpStart(
request: HttpRequest,
context: InvocationContext,
): Promise<HttpResponse> {
const client = df.getClient(context);
const body: unknown = await request.json();
const instanceId: string = await client.startNew('orchestrator', {
input: body,
});

context.log(`Started orchestration with ID = '${instanceId}'.`);

return client.createCheckStatusResponse(request, instanceId);
}

app.http('dfClient', {
route: 'orchestrators/orchestrator',
extraInputs: [df.input.durableClient()],
handler: httpStart,
});
13 changes: 13 additions & 0 deletions backend/functions/poc/getConsolidations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import * as df from 'durable-functions';
import { InvocationContext } from '@azure/functions';
import { PredicateAndPage } from './model';

export default async function handler(input: PredicateAndPage, context: InvocationContext) {
// Do some stuff
context.log('GetConsolidations', JSON.stringify(input));
}

df.app.activity('getConsolidationsFromACMS', {
// extraOutputs: [blobOutput],
handler,
});
13 changes: 13 additions & 0 deletions backend/functions/poc/getPageCount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import * as df from 'durable-functions';
import { InvocationContext } from '@azure/functions';
import { PredicateAndPage } from './model';

export default async function handler(input: PredicateAndPage, context: InvocationContext) {
// Do some stuff
context.log('GetPageCount', JSON.stringify(input));
return 4;
}

df.app.activity('getPageCountFromACMS', {
handler,
});
15 changes: 15 additions & 0 deletions backend/functions/poc/model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export type Bounds = {
divisionCodes: string[];
chapters: string[];
dateRange: [string, string];
};

export type Predicate = {
divisionCode: string;
chapter: string;
dateRange: [string, string];
};

export type PredicateAndPage = Predicate & {
pageNumber: number;
};
27 changes: 27 additions & 0 deletions backend/functions/poc/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Bounds } from './model';

import * as df from 'durable-functions';

df.app.orchestration('orchestrator', function* (context) {
const bounds: Bounds = context.df.getInput();

context.log('orchestrator', JSON.stringify(bounds));

const provisioningTasks = [];

const { divisionCodes, chapters, dateRange } = bounds;
for (const divisionCode of divisionCodes) {
for (const chapter of chapters) {
const predicate = {
divisionCode,
chapter,
dateRange,
};
const child_id = context.df.instanceId + `:${divisionCode}:${chapter}:`;
provisioningTasks.push(
context.df.callSubOrchestrator('subOrchestrator', predicate, child_id),
);
}
}
yield context.df.Task.all(provisioningTasks);
});
25 changes: 25 additions & 0 deletions backend/functions/poc/sub-orchestrator-etl.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PredicateAndPage } from './model';

import * as df from 'durable-functions';

df.app.orchestration('subOrchestratorETL', function* (context) {
const predicateAndPage: PredicateAndPage = context.df.getInput();

context.log('subOrchestratorETL', JSON.stringify(predicateAndPage));

const consolidatedOrdersPage = yield context.df.callActivity(
'getConsolidationsFromACMS',
predicateAndPage,
);

const parallelTasks = [];
for (let i = 0; i < consolidatedOrdersPage.length; i++) {
parallelTasks.push(context.df.callActivity('transformAndLoad', consolidatedOrdersPage[i]));
}

yield context.df.Task.all(parallelTasks);

// DO we need to fan in??
// const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
// yield context.df.callActivity('finalResults??', sum);
});
25 changes: 25 additions & 0 deletions backend/functions/poc/sub-orchestrator-paging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Predicate, PredicateAndPage } from './model';

import * as df from 'durable-functions';

df.app.orchestration('subOrchestratorPaging', function* (context) {
const predicate: Predicate = context.df.getInput();

context.log('subOrchestratorPaging', JSON.stringify(predicate));

const pageCount = yield context.df.callActivity('getPageCountFromACMS', predicate);

const provisioningTasks = [];
for (let pageNumber = 0; pageNumber < pageCount; pageNumber++) {
const predicateAndPage: PredicateAndPage = {
...predicate,
pageNumber,
};
const child_id = context.df.instanceId + `:${pageNumber}`;
provisioningTasks.push(
context.df.callSubOrchestrator('subOrchestrator', predicateAndPage, child_id),
);
}

yield context.df.Task.all(provisioningTasks);
});

0 comments on commit bb8a3e5

Please sign in to comment.