diff --git a/backend/functions/poc/activity/getConsolidations.ts b/backend/functions/poc/activity/getConsolidations.ts new file mode 100644 index 0000000000..80404d37a3 --- /dev/null +++ b/backend/functions/poc/activity/getConsolidations.ts @@ -0,0 +1,15 @@ +import { InvocationContext } from '@azure/functions'; +import { PredicateAndPage } from '../model'; + +async function getConsolidations(input: PredicateAndPage, context: InvocationContext) { + // Do some stuff + context.log('GetConsolidations', JSON.stringify(input)); + return [ + { orderId: '53rs2', caseId: '071-23-012345' }, + { orderId: '426gh', caseId: '071-23-43215' }, + ]; +} + +export default { + handler: getConsolidations, +}; diff --git a/backend/functions/poc/activity/getPageCount.ts b/backend/functions/poc/activity/getPageCount.ts new file mode 100644 index 0000000000..0a2f45a02c --- /dev/null +++ b/backend/functions/poc/activity/getPageCount.ts @@ -0,0 +1,12 @@ +import { InvocationContext } from '@azure/functions'; +import { PredicateAndPage } from '../model'; + +async function getPageCount(input: PredicateAndPage, context: InvocationContext) { + // Do some stuff + context.log('#################GetPageCount', JSON.stringify(input)); + return 4; +} + +export default { + handler: getPageCount, +}; diff --git a/backend/functions/poc/transformAndLoad.ts b/backend/functions/poc/activity/transformAndLoad.ts similarity index 60% rename from backend/functions/poc/transformAndLoad.ts rename to backend/functions/poc/activity/transformAndLoad.ts index 0fbd028221..fedaf52b70 100644 --- a/backend/functions/poc/transformAndLoad.ts +++ b/backend/functions/poc/activity/transformAndLoad.ts @@ -1,9 +1,8 @@ -import * as df from 'durable-functions'; import { InvocationContext } from '@azure/functions'; -import { AcmsConsolidation } from './model'; +import { AcmsConsolidation } from '../model'; import { randomUUID } from 'crypto'; -export default async function handler(input: AcmsConsolidation, context: InvocationContext) { +async function transformAndLoad(input: AcmsConsolidation, context: InvocationContext) { // Do some stuff context.log('#################Transform and load', JSON.stringify(input)); const newOrder = { @@ -13,6 +12,6 @@ export default async function handler(input: AcmsConsolidation, context: Invocat context.log(`Persisting ACMS consolidation ${newOrder.orderId} to CAMS ${newOrder.camsId}.`); } -df.app.activity('transformAndLoad', { - handler, -}); +export default { + handler: transformAndLoad, +}; diff --git a/backend/functions/poc/dfClient.function.ts b/backend/functions/poc/client/dfClient.function.ts similarity index 58% rename from backend/functions/poc/dfClient.function.ts rename to backend/functions/poc/client/dfClient.function.ts index fa610d7182..4bd2e8d0dc 100644 --- a/backend/functions/poc/dfClient.function.ts +++ b/backend/functions/poc/client/dfClient.function.ts @@ -1,5 +1,6 @@ import * as df from 'durable-functions'; -import { app, HttpRequest, HttpResponse, InvocationContext } from '@azure/functions'; +import { HttpRequest, HttpResponse, InvocationContext } from '@azure/functions'; +import { MAIN_ORCHESTRATOR } from '../loadConsolidations'; export default async function httpStart( request: HttpRequest, @@ -7,7 +8,7 @@ export default async function httpStart( ): Promise { const client = df.getClient(context); const body: unknown = await request.json(); - const instanceId: string = await client.startNew('orchestrator', { + const instanceId: string = await client.startNew(MAIN_ORCHESTRATOR, { input: body, }); @@ -15,9 +16,3 @@ export default async function httpStart( return client.createCheckStatusResponse(request, instanceId); } - -app.http('dfClient', { - route: 'orchestrators/orchestrator', - extraInputs: [df.input.durableClient()], - handler: httpStart, -}); diff --git a/backend/functions/poc/getConsolidations.ts b/backend/functions/poc/getConsolidations.ts deleted file mode 100644 index dacdf52995..0000000000 --- a/backend/functions/poc/getConsolidations.ts +++ /dev/null @@ -1,17 +0,0 @@ -import * as df from 'durable-functions'; -import { InvocationContext } from '@azure/functions'; -import { PredicateAndPage } from './model'; - -export default async function handler(input: PredicateAndPage, context: InvocationContext) { - // Do some stuff - context.log('GetConsolidations', JSON.stringify(input)); - return [ - { orderId: '53rs2', caseId: '071-23-012345' }, - { orderId: '426gh', caseId: '071-23-43215' }, - ]; -} - -df.app.activity('getConsolidationsFromACMS', { - // extraOutputs: [blobOutput], - handler, -}); diff --git a/backend/functions/poc/getPageCount.ts b/backend/functions/poc/getPageCount.ts deleted file mode 100644 index 81290c8044..0000000000 --- a/backend/functions/poc/getPageCount.ts +++ /dev/null @@ -1,13 +0,0 @@ -import * as df from 'durable-functions'; -import { InvocationContext } from '@azure/functions'; -import { PredicateAndPage } from './model'; - -export default async function handler(input: PredicateAndPage, context: InvocationContext) { - // Do some stuff - context.log('#################GetPageCount', JSON.stringify(input)); - return 4; -} - -df.app.activity('getPageCountFromACMS', { - handler, -}); diff --git a/backend/functions/poc/loadConsolidations.ts b/backend/functions/poc/loadConsolidations.ts new file mode 100644 index 0000000000..4ad5ec7da0 --- /dev/null +++ b/backend/functions/poc/loadConsolidations.ts @@ -0,0 +1,30 @@ +import * as df from 'durable-functions'; +import { app } from '@azure/functions'; + +import httpStart from './client/dfClient.function'; +import { main } from './orchestration/orchestrator'; +import { subOrchestratorETL } from './orchestration/sub-orchestrator-etl'; +import { subOrchestratorPaging } from './orchestration/sub-orchestrator-paging'; +import getConsolidations from './activity/getConsolidations'; +import getPageCount from './activity/getPageCount'; +import transformAndLoad from './activity/transformAndLoad'; + +export const SUB_ORCHESTRATOR_ETL = 'SubOrchestratorETL'; +export const SUB_ORCHESTRATOR_PAGING = 'SubOrchestratorPaging'; +export const MAIN_ORCHESTRATOR = 'orchestrator'; +export const PAGE_COUNT_ACTIVITY = 'getPageCountFromACMS'; +export const CONSOLIDATIONS_FROM_ACMS = 'getConsolidationsFromACMS'; +export const TRANSFORM_AND_LOAD = 'transformAndLoad'; + +df.app.orchestration(MAIN_ORCHESTRATOR, main); +df.app.orchestration(SUB_ORCHESTRATOR_ETL, subOrchestratorETL); +df.app.orchestration(SUB_ORCHESTRATOR_PAGING, subOrchestratorPaging); +df.app.activity(CONSOLIDATIONS_FROM_ACMS, getConsolidations); +df.app.activity(PAGE_COUNT_ACTIVITY, getPageCount); +df.app.activity(TRANSFORM_AND_LOAD, transformAndLoad); + +app.http('dfClient', { + route: 'orchestrators/orchestrator', + extraInputs: [df.input.durableClient()], + handler: httpStart, +}); diff --git a/backend/functions/poc/model.ts b/backend/functions/poc/model.ts index f82079f144..39446224b2 100644 --- a/backend/functions/poc/model.ts +++ b/backend/functions/poc/model.ts @@ -14,6 +14,7 @@ export type PredicateAndPage = Predicate & { pageNumber: number; }; +// properties here are temporary. Need to figure out what this type should look like. export type AcmsConsolidation = { orderId: string; caseId: string; diff --git a/backend/functions/poc/orchestrator.ts b/backend/functions/poc/orchestration/orchestrator.ts similarity index 61% rename from backend/functions/poc/orchestrator.ts rename to backend/functions/poc/orchestration/orchestrator.ts index 2dec064184..060a1e698c 100644 --- a/backend/functions/poc/orchestrator.ts +++ b/backend/functions/poc/orchestration/orchestrator.ts @@ -1,15 +1,14 @@ -import { Bounds } from './model'; +import { Bounds } from '../model'; +import { SUB_ORCHESTRATOR_PAGING } from '../loadConsolidations'; +import { OrchestrationContext } from 'durable-functions'; -import * as df from 'durable-functions'; - -df.app.orchestration('orchestrator', function* (context) { +export function* main(context: OrchestrationContext) { const bounds: Bounds = context.df.getInput(); - context.log('orchestrator', JSON.stringify(bounds)); - const provisioningTasks = []; const { divisionCodes, chapters, dateRange } = bounds; + // TODO: Add an activity to flatten the arrays for (const divisionCode of divisionCodes) { for (const chapter of chapters) { const predicate = { @@ -19,9 +18,9 @@ df.app.orchestration('orchestrator', function* (context) { }; const child_id = context.df.instanceId + `:${divisionCode}:${chapter}:`; provisioningTasks.push( - context.df.callSubOrchestrator('subOrchestratorPaging', predicate, child_id), + context.df.callSubOrchestrator(SUB_ORCHESTRATOR_PAGING, predicate, child_id), ); } } yield context.df.Task.all(provisioningTasks); -}); +} diff --git a/backend/functions/poc/orchestration/sub-orchestrator-etl.ts b/backend/functions/poc/orchestration/sub-orchestrator-etl.ts new file mode 100644 index 0000000000..3dee221dc4 --- /dev/null +++ b/backend/functions/poc/orchestration/sub-orchestrator-etl.ts @@ -0,0 +1,23 @@ +import { CONSOLIDATIONS_FROM_ACMS, TRANSFORM_AND_LOAD } from '../loadConsolidations'; +import { PredicateAndPage } from '../model'; +import { OrchestrationContext } from 'durable-functions'; + +export function* subOrchestratorETL(context: OrchestrationContext) { + const predicateAndPage: PredicateAndPage = context.df.getInput(); + + const consolidatedOrdersPage = yield context.df.callActivity( + CONSOLIDATIONS_FROM_ACMS, + predicateAndPage, + ); + + const parallelTasks = []; + for (let i = 0; i < consolidatedOrdersPage.length; i++) { + parallelTasks.push(context.df.callActivity(TRANSFORM_AND_LOAD, consolidatedOrdersPage[i])); + } + + yield context.df.Task.all(parallelTasks); + + // DO we need to fan in?? + // const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0); + // yield context.df.callActivity('finalResults??', sum); +} diff --git a/backend/functions/poc/orchestration/sub-orchestrator-paging.ts b/backend/functions/poc/orchestration/sub-orchestrator-paging.ts new file mode 100644 index 0000000000..0d69584cc2 --- /dev/null +++ b/backend/functions/poc/orchestration/sub-orchestrator-paging.ts @@ -0,0 +1,22 @@ +import { Predicate, PredicateAndPage } from '../model'; +import { OrchestrationContext } from 'durable-functions'; +import { PAGE_COUNT_ACTIVITY, SUB_ORCHESTRATOR_ETL } from '../loadConsolidations'; + +export function* subOrchestratorPaging(context: OrchestrationContext) { + const predicate: Predicate = context.df.getInput(); + + const pageCount = yield context.df.callActivity(PAGE_COUNT_ACTIVITY, predicate); + const provisioningTasks = []; + for (let pageNumber = 0; pageNumber < pageCount; pageNumber++) { + const predicateAndPage: PredicateAndPage = { + ...predicate, + pageNumber, + }; + const child_id = context.df.instanceId + `:${pageNumber}`; + provisioningTasks.push( + context.df.callSubOrchestrator(SUB_ORCHESTRATOR_ETL, predicateAndPage, child_id), + ); + } + + yield context.df.Task.all(provisioningTasks); +} diff --git a/backend/functions/poc/sub-orchestrator-etl.ts b/backend/functions/poc/sub-orchestrator-etl.ts deleted file mode 100644 index 66f539df7d..0000000000 --- a/backend/functions/poc/sub-orchestrator-etl.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { PredicateAndPage } from './model'; - -import * as df from 'durable-functions'; - -df.app.orchestration('subOrchestratorETL', function* (context) { - const predicateAndPage: PredicateAndPage = context.df.getInput(); - - context.log( - '#################subOrchestratorETL:', - context.df.instanceId, - JSON.stringify(predicateAndPage), - ); - - const consolidatedOrdersPage = yield context.df.callActivity( - 'getConsolidationsFromACMS', - predicateAndPage, - ); - - context.log('#################length', consolidatedOrdersPage.length); - const parallelTasks = []; - for (let i = 0; i < consolidatedOrdersPage.length; i++) { - parallelTasks.push(context.df.callActivity('transformAndLoad', consolidatedOrdersPage[i])); - } - - yield context.df.Task.all(parallelTasks); - - // DO we need to fan in?? - // const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0); - // yield context.df.callActivity('finalResults??', sum); -}); diff --git a/backend/functions/poc/sub-orchestrator-paging.ts b/backend/functions/poc/sub-orchestrator-paging.ts deleted file mode 100644 index 3ed9f27739..0000000000 --- a/backend/functions/poc/sub-orchestrator-paging.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Predicate, PredicateAndPage } from './model'; - -import * as df from 'durable-functions'; - -df.app.orchestration('subOrchestratorPaging', function* (context) { - const predicate: Predicate = context.df.getInput(); - - context.log('#################subOrchestratorPaging', JSON.stringify(predicate)); - - const pageCount = yield context.df.callActivity('getPageCountFromACMS', predicate); - context.log('#################pageCount', pageCount); - const provisioningTasks = []; - for (let pageNumber = 0; pageNumber < pageCount; pageNumber++) { - const predicateAndPage: PredicateAndPage = { - ...predicate, - pageNumber, - }; - const child_id = context.df.instanceId + `:${pageNumber}`; - provisioningTasks.push( - context.df.callSubOrchestrator('subOrchestratorETL', predicateAndPage, child_id), - ); - } - - context.log('#################results', JSON.stringify(provisioningTasks)); - yield context.df.Task.all(provisioningTasks); -});