Skip to content

Commit

Permalink
Initial Refactor of POC
Browse files Browse the repository at this point in the history
Jira ticket: CAMS-461

Co-authored-by: Fritz Madden <[email protected]>
Co-authored-by: James Brooks <[email protected]>
Co-authored-by: Brian Posey <[email protected]>,
  • Loading branch information
3 people committed Nov 15, 2024
1 parent e0412df commit 4b1bea7
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 108 deletions.
15 changes: 15 additions & 0 deletions backend/functions/poc/activity/getConsolidations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { InvocationContext } from '@azure/functions';
import { PredicateAndPage } from '../model';

async function getConsolidations(input: PredicateAndPage, context: InvocationContext) {
// Do some stuff
context.log('GetConsolidations', JSON.stringify(input));
return [
{ orderId: '53rs2', caseId: '071-23-012345' },
{ orderId: '426gh', caseId: '071-23-43215' },
];
}

export default {
handler: getConsolidations,
};
12 changes: 12 additions & 0 deletions backend/functions/poc/activity/getPageCount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { InvocationContext } from '@azure/functions';
import { PredicateAndPage } from '../model';

async function getPageCount(input: PredicateAndPage, context: InvocationContext) {
// Do some stuff
context.log('#################GetPageCount', JSON.stringify(input));
return 4;
}

export default {
handler: getPageCount,
};
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import * as df from 'durable-functions';
import { InvocationContext } from '@azure/functions';
import { AcmsConsolidation } from './model';
import { AcmsConsolidation } from '../model';
import { randomUUID } from 'crypto';

export default async function handler(input: AcmsConsolidation, context: InvocationContext) {
async function transformAndLoad(input: AcmsConsolidation, context: InvocationContext) {
// Do some stuff
context.log('#################Transform and load', JSON.stringify(input));
const newOrder = {
Expand All @@ -13,6 +12,6 @@ export default async function handler(input: AcmsConsolidation, context: Invocat
context.log(`Persisting ACMS consolidation ${newOrder.orderId} to CAMS ${newOrder.camsId}.`);
}

df.app.activity('transformAndLoad', {
handler,
});
export default {
handler: transformAndLoad,
};
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
import * as df from 'durable-functions';
import { app, HttpRequest, HttpResponse, InvocationContext } from '@azure/functions';
import { HttpRequest, HttpResponse, InvocationContext } from '@azure/functions';
import { MAIN_ORCHESTRATOR } from '../loadConsolidations';

export default async function httpStart(
request: HttpRequest,
context: InvocationContext,
): Promise<HttpResponse> {
const client = df.getClient(context);
const body: unknown = await request.json();
const instanceId: string = await client.startNew('orchestrator', {
const instanceId: string = await client.startNew(MAIN_ORCHESTRATOR, {
input: body,
});

context.log(`Started orchestration with ID = '${instanceId}'.`);

return client.createCheckStatusResponse(request, instanceId);
}

app.http('dfClient', {
route: 'orchestrators/orchestrator',
extraInputs: [df.input.durableClient()],
handler: httpStart,
});
17 changes: 0 additions & 17 deletions backend/functions/poc/getConsolidations.ts

This file was deleted.

13 changes: 0 additions & 13 deletions backend/functions/poc/getPageCount.ts

This file was deleted.

30 changes: 30 additions & 0 deletions backend/functions/poc/loadConsolidations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import * as df from 'durable-functions';
import { app } from '@azure/functions';

import httpStart from './client/dfClient.function';
import { main } from './orchestration/orchestrator';
import { subOrchestratorETL } from './orchestration/sub-orchestrator-etl';
import { subOrchestratorPaging } from './orchestration/sub-orchestrator-paging';
import getConsolidations from './activity/getConsolidations';
import getPageCount from './activity/getPageCount';
import transformAndLoad from './activity/transformAndLoad';

export const SUB_ORCHESTRATOR_ETL = 'SubOrchestratorETL';
export const SUB_ORCHESTRATOR_PAGING = 'SubOrchestratorPaging';
export const MAIN_ORCHESTRATOR = 'orchestrator';
export const PAGE_COUNT_ACTIVITY = 'getPageCountFromACMS';
export const CONSOLIDATIONS_FROM_ACMS = 'getConsolidationsFromACMS';
export const TRANSFORM_AND_LOAD = 'transformAndLoad';

df.app.orchestration(MAIN_ORCHESTRATOR, main);
df.app.orchestration(SUB_ORCHESTRATOR_ETL, subOrchestratorETL);
df.app.orchestration(SUB_ORCHESTRATOR_PAGING, subOrchestratorPaging);
df.app.activity(CONSOLIDATIONS_FROM_ACMS, getConsolidations);
df.app.activity(PAGE_COUNT_ACTIVITY, getPageCount);
df.app.activity(TRANSFORM_AND_LOAD, transformAndLoad);

app.http('dfClient', {
route: 'orchestrators/orchestrator',
extraInputs: [df.input.durableClient()],
handler: httpStart,
});
1 change: 1 addition & 0 deletions backend/functions/poc/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type PredicateAndPage = Predicate & {
pageNumber: number;
};

// properties here are temporary. Need to figure out what this type should look like.
export type AcmsConsolidation = {
orderId: string;
caseId: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { Bounds } from './model';
import { Bounds } from '../model';
import { SUB_ORCHESTRATOR_PAGING } from '../loadConsolidations';
import { OrchestrationContext } from 'durable-functions';

import * as df from 'durable-functions';

df.app.orchestration('orchestrator', function* (context) {
export function* main(context: OrchestrationContext) {
const bounds: Bounds = context.df.getInput();

context.log('orchestrator', JSON.stringify(bounds));

const provisioningTasks = [];

const { divisionCodes, chapters, dateRange } = bounds;
// TODO: Add an activity to flatten the arrays
for (const divisionCode of divisionCodes) {
for (const chapter of chapters) {
const predicate = {
Expand All @@ -19,9 +18,9 @@ df.app.orchestration('orchestrator', function* (context) {
};
const child_id = context.df.instanceId + `:${divisionCode}:${chapter}:`;
provisioningTasks.push(
context.df.callSubOrchestrator('subOrchestratorPaging', predicate, child_id),
context.df.callSubOrchestrator(SUB_ORCHESTRATOR_PAGING, predicate, child_id),
);
}
}
yield context.df.Task.all(provisioningTasks);
});
}
23 changes: 23 additions & 0 deletions backend/functions/poc/orchestration/sub-orchestrator-etl.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { CONSOLIDATIONS_FROM_ACMS, TRANSFORM_AND_LOAD } from '../loadConsolidations';
import { PredicateAndPage } from '../model';
import { OrchestrationContext } from 'durable-functions';

export function* subOrchestratorETL(context: OrchestrationContext) {
const predicateAndPage: PredicateAndPage = context.df.getInput();

const consolidatedOrdersPage = yield context.df.callActivity(
CONSOLIDATIONS_FROM_ACMS,
predicateAndPage,
);

const parallelTasks = [];
for (let i = 0; i < consolidatedOrdersPage.length; i++) {
parallelTasks.push(context.df.callActivity(TRANSFORM_AND_LOAD, consolidatedOrdersPage[i]));
}

yield context.df.Task.all(parallelTasks);

// DO we need to fan in??
// const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
// yield context.df.callActivity('finalResults??', sum);
}
22 changes: 22 additions & 0 deletions backend/functions/poc/orchestration/sub-orchestrator-paging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Predicate, PredicateAndPage } from '../model';
import { OrchestrationContext } from 'durable-functions';
import { PAGE_COUNT_ACTIVITY, SUB_ORCHESTRATOR_ETL } from '../loadConsolidations';

export function* subOrchestratorPaging(context: OrchestrationContext) {
const predicate: Predicate = context.df.getInput();

const pageCount = yield context.df.callActivity(PAGE_COUNT_ACTIVITY, predicate);
const provisioningTasks = [];
for (let pageNumber = 0; pageNumber < pageCount; pageNumber++) {
const predicateAndPage: PredicateAndPage = {
...predicate,
pageNumber,
};
const child_id = context.df.instanceId + `:${pageNumber}`;
provisioningTasks.push(
context.df.callSubOrchestrator(SUB_ORCHESTRATOR_ETL, predicateAndPage, child_id),
);
}

yield context.df.Task.all(provisioningTasks);
}
30 changes: 0 additions & 30 deletions backend/functions/poc/sub-orchestrator-etl.ts

This file was deleted.

26 changes: 0 additions & 26 deletions backend/functions/poc/sub-orchestrator-paging.ts

This file was deleted.

0 comments on commit 4b1bea7

Please sign in to comment.