Utilities for building and parsing Cumulus workflow messages.
npm install @cumulus/message
- Build
Utility functions for building Cumulus messages
- Executions
Utility functions for generating execution information or parsing execution information from a Cumulus message
- Granules
Utility functions for parsing granule information from a Cumulus message
- Queue
Utility functions for parsing queue information from a Cumulus message
- getMessageAsyncOperationId(message) ⇒ undefined | string ⏏
Get the async operation ID from a workflow message, if any.
- constructCollectionId(name, version) ⇒ string ⏏
Returns the collection ID.
- deconstructCollectionId(collectionId) ⇒ ⏏
Returns the name and version of a collection based on the collectionId used in Elasticsearch indexing
- getCollectionNameAndVersionFromMessage(message) ⇒ CollectionInfo | undefined ⏏
Get collection name and version from execution message.
- getCollectionIdFromMessage(message) ⇒ string | undefined ⏏
Get collection ID from execution message.
- isCumulusMessageLike()
Bare check for CumulusMessage Shape
- isDLQRecordLike()
Bare check for SQS message Shape
- unwrapDeadLetterCumulusMessage()
Unwrap dead letter Cumulus message, which may be wrapped in a States CloudWatch event, which is wrapped in an SQS message.
- extractSQSMetadata(message) ⇒
Peel out metadata from an SQS (or DLQ) record
- hoistCumulusMessageDetails()
Reformat object with key attributes at top level.
- getMessagePdr(message) ⇒ undefined | Object ⏏
Get the PDR object from a workflow message, if any.
- messageHasPdr(message) ⇒ boolean ⏏
Determine if message has a PDR.
- getMessagePdrPANSent(message) ⇒ boolean ⏏
Get the PAN sent status from a workflow message, if any.
- getMessagePdrPANMessage(message) ⇒ string ⏏
Get the PAN message status from a workflow message, if any.
- getMessagePdrName(message) ⇒ string ⏏
Get the PDR name from a workflow message, if any.
- getMessagePdrRunningExecutions(message) ⇒ number ⏏
Get the number of running executions for a PDR, if any.
- getMessagePdrCompletedExecutions(message) ⇒ number ⏏
Get the number of completed executions for a PDR, if any.
- getMessagePdrFailedExecutions(message) ⇒ number ⏏
Get the number of failed executions for a PDR, if any.
- getMessagePdrStats(message) ⇒ PdrStats ⏏
Get the PDR stats from a workflow message, if any.
- getPdrPercentCompletion(stats) ⇒ number ⏏
Get the percent completion of PDR executions
- generatePdrApiRecordFromMessage(message, [updatedAt]) ⇒ ApiPdr ⏏
Generate a PDR record for the API from the message.
- getMessageProvider(message) ⇒ MessageProvider | string ⏏
Get the provider from a workflow message, if any.
- getMessageProviderId(message) ⇒ undefined | string ⏏
Get the provider ID from a workflow message, if any.
- pullStepFunctionEvent(event) ⇒ Promise.<object> ⏏
Given a Step Function event, replace specified key in event with contents of S3 remote message
- parseStepMessage(stepMessage, stepName) ⇒ Promise.<object> ⏏
Parse step message with CMA keys and replace specified key in event with contents of S3 remote message
- getFailedStepName(events, failedStepEvent) ⇒ string
Searches the Execution step History for the TaskStateEntered pertaining to the failed task Id. HistoryEvent ids are numbered sequentially, starting at one.
- lastFailedEventStep(events) ⇒ Array.<HistoryEvent> | undefined
Finds all failed execution events and returns the last one in the list.
- getFailedExecutionMessage(inputCumulusMessage, getExecutionHistoryFunction) ⇒ Object
Get message to use for publishing failed execution notifications. Try to get the input to the last failed step in the execution so we can update the status of any granules/PDRs that don't exist in the initial execution input. Falls back to overall execution input.
- isFileExtensionMatched(granuleFile, extension) ⇒ boolean
Check if the file has the extension
- parseException(exception) ⇒ string
Ensures that the exception is returned as an object
- getMetaStatus(message) ⇒ Message.WorkflowStatus | undefined ⏏
Get the status of a workflow message, if any.
- getMessageWorkflowTasks(message) ⇒ Object | undefined ⏏
Get the workflow tasks in a workflow message, if any.
- getMessageWorkflowStartTime(message) ⇒ number | undefined ⏏
Get the workflow start time, if any.
- getMessageWorkflowStopTime(message) ⇒ number | undefined ⏏
Get the workflow stop time, if any.
- getMessageWorkflowName(message) ⇒ string | undefined ⏏
Get the workflow name, if any.
- getWorkflowDuration(startTime, [stopTime]) ⇒ number ⏏
Get the workflow duration.
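
As a rough illustration of the collection ID helpers listed in the index above, the sketch below joins a name and version into one string and splits it back apart. The `___` separator and both `sketch...` function names are assumptions made for this example; this README does not state the ID format, so check the package source before relying on it.

```javascript
// Hypothetical stand-ins for constructCollectionId / deconstructCollectionId.
// ASSUMPTION: the ID has the form `${name}___${version}`.
const SEPARATOR = '___';

const sketchConstructCollectionId = (name, version) => `${name}${SEPARATOR}${version}`;

const sketchDeconstructCollectionId = (collectionId) => {
  // Split on the last separator so names containing underscores survive.
  const index = collectionId.lastIndexOf(SEPARATOR);
  if (index < 0) {
    throw new Error(`Unrecognized collection ID: ${collectionId}`);
  }
  return {
    name: collectionId.slice(0, index),
    version: collectionId.slice(index + SEPARATOR.length),
  };
};

const id = sketchConstructCollectionId('MOD09GQ', '006');
console.log(id, sketchDeconstructCollectionId(id));
```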
Utility functions for building Cumulus messages
Example
const Build = require('@cumulus/message/Build');
Build an SQS message from a workflow template for queueing executions.
Kind: Exported function
Returns: Message.CumulusMessage - A Cumulus message object
Param | Type | Description |
---|---|---|
params | Object | |
params.provider | Object | A provider object |
params.collection | Object | A collection object |
params.parentExecutionArn | string | ARN for parent execution |
params.messageTemplate | Object | Message template for the workflow |
params.payload | Object | Payload for the workflow |
params.workflow | Object | Workflow name and ARN object |
[params.asyncOperationId] | string | Async operation ID |
[params.customCumulusMeta] | Object | Custom data for message.cumulus_meta |
[params.customMeta] | Object | Custom data for message.meta |
[params.executionNamePrefix] | string | Prefix to apply to the name of the enqueued execution |
Utility functions for generating execution information or parsing execution information from a Cumulus message
Example
const Executions = require('@cumulus/message/Executions');
- Executions
- buildExecutionArn(stateMachineArn, executionName) ⇒ string ⏏
- getExecutionUrlFromArn(executionArn) ⇒ string ⏏
- getStateMachineArnFromExecutionArn(executionArn) ⇒ string ⏏
- getMessageExecutionName(message) ⇒ string ⏏
- getMessageStateMachineArn(message) ⇒ string ⏏
- getMessageExecutionArn(message) ⇒ null | string ⏏
- getMessageExecutionParentArn(message) ⇒ undefined | string ⏏
- getMessageCumulusVersion(message) ⇒ undefined | string ⏏
- getMessageExecutionOriginalPayload(message) ⇒ unknown | undefined ⏏
- getMessageExecutionFinalPayload(message) ⇒ unknown | undefined ⏏
- generateExecutionApiRecordFromMessage(message, [updatedAt]) ⇒ ApiExecution ⏏
- global
- buildExecutionArn(stateMachineArn, executionName) ⇒ string ⏏
Build execution ARN from a state machine ARN and execution name
Kind: Exported function
Returns: string - an execution ARN
Param | Type | Description |
---|---|---|
stateMachineArn | string | state machine ARN |
executionName | string | state machine's execution name |
Returns execution URL from an execution ARN.
Kind: Exported function
Returns: string - returns AWS console URL for the execution
Param | Type | Description |
---|---|---|
executionArn | string | an execution ARN |
Get state machine ARN from an execution ARN
Kind: Exported function
Returns: string - a state machine ARN
Param | Type | Description |
---|---|---|
executionArn | string | an execution ARN |
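
The two ARN conversions described above can be sketched as a round trip. This is a minimal illustration based on the standard AWS Step Functions ARN layout, not the package's own code; the `sketch...` function names and the sample ARN are hypothetical.

```javascript
// Hypothetical re-implementations, not the package's code.
// They rely on the AWS ARN layout:
//   arn:aws:states:<region>:<account>:stateMachine:<machineName>
//   arn:aws:states:<region>:<account>:execution:<machineName>:<executionName>
const sketchBuildExecutionArn = (stateMachineArn, executionName) => {
  const parts = stateMachineArn.split(':');
  parts[5] = 'execution'; // swap the resource type segment
  return [...parts, executionName].join(':');
};

const sketchGetStateMachineArn = (executionArn) => {
  const parts = executionArn.split(':');
  parts[5] = 'stateMachine';
  return parts.slice(0, -1).join(':'); // drop the trailing execution name
};

const machineArn = 'arn:aws:states:us-east-1:123456789012:stateMachine:IngestGranule';
const executionArn = sketchBuildExecutionArn(machineArn, 'run-001');
console.log(executionArn);
console.log(sketchGetStateMachineArn(executionArn) === machineArn);
```

Splitting on `:` and replacing only the resource type segment avoids accidentally rewriting a state machine whose own name contains the word `execution`.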
Get the execution name from a workflow message.
Kind: Exported function
Returns: string - An execution name
Throws: Error if there is no execution name
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
Get the state machine ARN from a workflow message.
Kind: Exported function
Returns: string - A state machine ARN
Throws: Error if there is no state machine ARN
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
Get the execution ARN from a workflow message.
Kind: Exported function
Returns: null | string - A state machine execution ARN
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
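
The three message accessors above differ in how they report missing data: the name and state machine lookups throw, while the execution ARN lookup returns `null`. The sketch below shows one way that contract could fit together. It assumes the values live under `message.cumulus_meta.execution_name` and `message.cumulus_meta.state_machine`; those field names and the `sketch...` helpers are assumptions for illustration, not confirmed by this README.

```javascript
// ASSUMPTION: execution details are stored under message.cumulus_meta as
// `execution_name` and `state_machine`. All helpers here are hypothetical.
const sketchGetMessageExecutionName = (message) => {
  const name = message.cumulus_meta?.execution_name;
  if (typeof name !== 'string') {
    throw new Error('cumulus_meta.execution_name not set in message');
  }
  return name;
};

const sketchGetMessageStateMachineArn = (message) => {
  const arn = message.cumulus_meta?.state_machine;
  if (typeof arn !== 'string') {
    throw new Error('cumulus_meta.state_machine not set in message');
  }
  return arn;
};

const sketchGetMessageExecutionArn = (message) => {
  try {
    const machineArn = sketchGetMessageStateMachineArn(message);
    const name = sketchGetMessageExecutionName(message);
    return `${machineArn.replace(':stateMachine:', ':execution:')}:${name}`;
  } catch (error) {
    return null; // mirrors the documented `null | string` return type
  }
};

const workflowMessage = {
  cumulus_meta: {
    state_machine: 'arn:aws:states:us-east-1:123456789012:stateMachine:IngestGranule',
    execution_name: 'run-001',
  },
};
console.log(sketchGetMessageExecutionArn(workflowMessage));
console.log(sketchGetMessageExecutionArn({}));
```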
Get the parent execution ARN from a workflow message, if any.
Kind: Exported function
Returns: undefined | string - A state machine execution ARN
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
Get the Cumulus version from a workflow message, if any.
Kind: Exported function
Returns: undefined | string - The cumulus version
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
Get the workflow original payload, if any.
Kind: Exported function
Returns: unknown | undefined - The workflow original payload
Param | Type | Description |
---|---|---|
message | MessageWithPayload | A workflow message object |
Get the workflow final payload, if any.
Kind: Exported function
Returns: unknown | undefined - The workflow final payload
Param | Type | Description |
---|---|---|
message | MessageWithPayload | A workflow message object |
Generate an execution record for the API from the message.
Kind: Exported function
Returns: ApiExecution - An execution API record
Param | Type | Description |
---|---|---|
message | MessageWithPayload | A workflow message object |
[updatedAt] | string | Optional updated timestamp to apply to record |
Generate a PDR record for the API from the message.
Kind: global method of Executions
Returns: ApiPdr - A PDR API record
Param | Type | Description |
---|---|---|
message | MessageWithOptionalPayloadPdr | A workflow message object |
[updatedAt] | string | Optional updated timestamp to apply to record |
Utility functions for parsing granule information from a Cumulus message
Example
const Granules = require('@cumulus/message/Granules');
- Granules
- getMessageGranules(message) ⇒ Array.<Object> | undefined ⏏
- messageHasGranules(message) ⇒ boolean ⏏
- getGranuleStatus(workflowStatus, granule) ⇒ string ⏏
- getGranuleQueryFields(message) ⇒ unknown | undefined ⏏
- generateGranuleApiRecord(message) ⇒ Promise.<ApiGranule> ⏏
- getMessageGranules(message) ⇒ Array.<Object> | undefined ⏏
Get granules from payload?.granules of a workflow message.
Kind: Exported function
Returns: Array.<Object> | undefined - An array of granule objects, or undefined if message.payload.granules is not set
Param | Type | Description |
---|---|---|
message | MessageWithGranules | A workflow message |
Determine if message has a granules object.
Kind: Exported function
Returns: boolean - true if message has a granules object
Param | Type | Description |
---|---|---|
message | MessageWithOptionalGranules | A workflow message object |
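
The two granule lookups above amount to an optional read of `payload.granules`. The following is a minimal sketch of that behavior with hypothetical `sketch...` names and a made-up sample message; it is not the package's implementation.

```javascript
// Hypothetical stand-ins for getMessageGranules / messageHasGranules.
const sketchGetMessageGranules = (message) => message.payload?.granules;

const sketchMessageHasGranules = (message) =>
  sketchGetMessageGranules(message) !== undefined;

// Sample messages invented for this example.
const withGranules = { payload: { granules: [{ granuleId: 'G1', files: [] }] } };
const withoutGranules = { payload: {} };

console.log(sketchGetMessageGranules(withGranules).length);
console.log(sketchMessageHasGranules(withoutGranules));
```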
Determine the status of a granule.
Kind: Exported function
Returns: string - The granule status
Param | Type | Description |
---|---|---|
workflowStatus | string | The workflow status |
granule | MessageGranule | A granule record conforming to the 'api' schema |
Get the query fields of a granule, if any
Kind: Exported function
Returns: unknown | undefined - The granule query fields, if any
Param | Type | Description |
---|---|---|
message | MessageWithGranules | A workflow message |
Generate an API granule record
Kind: Exported function
Returns: Promise.<ApiGranule> - The granule API record
Param | Type | Description |
---|---|---|
message | MessageWithGranules | A workflow message |
Utility functions for parsing queue information from a Cumulus message
Example
const Queue = require('@cumulus/message/Queue');
- Queue
- getQueueUrl(message) ⇒ string ⏏
- getMaximumExecutions(message, queueUrl) ⇒ number ⏏
- hasQueueAndExecutionLimit(message) ⇒ boolean ⏏
- getQueueUrl(message) ⇒ string ⏏
Get the queue URL from a workflow message.
Kind: Exported function
Returns: string - A queue URL
Param | Type | Description |
---|---|---|
message | MessageWithQueueInfo | A workflow message object |
Get the maximum executions for a queue.
Kind: Exported function
Returns: number - Count of the maximum executions for the queue
Throws: Error if no maximum executions can be found
Param | Type | Description |
---|---|---|
message | Message.CumulusMessage | A workflow message object |
queueUrl | string | A queue URL |
Determine if there is a queue and queue execution limit in the message.
Kind: Exported function
Returns: boolean - True if there is a queue and execution limit.
Param | Type | Description |
---|---|---|
message | MessageWithQueueInfo | A workflow message object |
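
The three queue helpers above can be pictured as lookups into the message's metadata. The sketch below assumes the queue URL and per-queue limits are stored at `message.cumulus_meta.queueUrl` and `message.cumulus_meta.queueExecutionLimits`; those field names, the `sketch...` helpers, and the choice to throw when the queue URL is missing are assumptions for illustration only.

```javascript
// ASSUMPTION: queue details are stored under message.cumulus_meta as
//   queueUrl: string
//   queueExecutionLimits: { [queueUrl]: number }
const sketchGetQueueUrl = (message) => {
  const queueUrl = message.cumulus_meta?.queueUrl;
  if (!queueUrl) {
    throw new Error('Could not find queue URL in message');
  }
  return queueUrl;
};

const sketchGetMaximumExecutions = (message, queueUrl) => {
  const limit = message.cumulus_meta?.queueExecutionLimits?.[queueUrl];
  if (limit === undefined) {
    throw new Error(`Could not determine maximum executions for queue ${queueUrl}`);
  }
  return limit;
};

const sketchHasQueueAndExecutionLimit = (message) => {
  try {
    sketchGetMaximumExecutions(message, sketchGetQueueUrl(message));
    return true;
  } catch (error) {
    return false;
  }
};

// Sample message invented for this example.
const sampleQueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/ingest-queue';
const queuedMessage = {
  cumulus_meta: {
    queueUrl: sampleQueueUrl,
    queueExecutionLimits: { [sampleQueueUrl]: 5 },
  },
};
console.log(sketchGetMaximumExecutions(queuedMessage, sketchGetQueueUrl(queuedMessage)));
console.log(sketchHasQueueAndExecutionLimit({ cumulus_meta: {} }));
```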
Bare check for CumulusMessage Shape
Bare check for SQS message Shape
Unwrap dead letter Cumulus message, which may be wrapped in a States CloudWatch event, which is wrapped in an SQS message.
Peel out metadata from an SQS (or DLQ) record
Kind: global function
Returns: the given message without its body
Param | Description |
---|---|
message | DLQ or SQS message |
Reformat object with key attributes at top level.
Searches the Execution step History for the TaskStateEntered pertaining to the failed task Id. HistoryEvent ids are numbered sequentially, starting at one.
Kind: global function
Returns: string - name of the current Step Functions task or 'UnknownFailedStepName'.
Param | Type | Description |
---|---|---|
events | Array.<HistoryEvent> | Step Function events array |
failedStepEvent | failedStepEvent | Step Function's failed event. |
failedStepEvent.id | number | Step Functions failed event ID (long) |
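
The search described above can be sketched as follows. Because event IDs are sequential and start at one, the events preceding the failure are the first `id - 1` entries of the array; the most recent `TaskStateEntered` among them names the failed step. The `sketchGetFailedStepName` helper and the sample history are hypothetical; `stateEnteredEventDetails.name` is the field AWS Step Functions uses on `TaskStateEntered` history events.

```javascript
// Hypothetical sketch, not the package's implementation.
const sketchGetFailedStepName = (events, failedStepEvent) => {
  // IDs start at one and are sequential, so everything before the failed
  // event is the first (id - 1) entries of the array.
  const previousEvents = events.slice(0, failedStepEvent.id - 1);
  const entered = previousEvents.filter((event) => event.type === 'TaskStateEntered');
  const lastEntered = entered[entered.length - 1];
  return lastEntered?.stateEnteredEventDetails?.name ?? 'UnknownFailedStepName';
};

// Sample execution history invented for this example.
const history = [
  { id: 1, type: 'ExecutionStarted' },
  { id: 2, type: 'TaskStateEntered', stateEnteredEventDetails: { name: 'SyncGranule' } },
  { id: 3, type: 'LambdaFunctionScheduled' },
  { id: 4, type: 'LambdaFunctionFailed' },
];
console.log(sketchGetFailedStepName(history, history[3]));
```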
Finds all failed execution events and returns the last one in the list.
Kind: global function
Returns: Array.<HistoryEvent> | undefined - the last lambda or activity that failed in the event array, or an empty array.
Param | Type | Description |
---|---|---|
events | Array.<HistoryEvent> | array of AWS Step Functions execution HistoryEvents |
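
A minimal sketch of this lookup, assuming "failed" means the Step Functions event types `LambdaFunctionFailed` and `ActivityFailed`. The helper name is hypothetical, and this version returns `undefined` when nothing failed; the return description above also mentions an empty array, so verify the behavior against the package source.

```javascript
// Hypothetical sketch, not the package's implementation.
// ASSUMPTION: only these two event types count as failures.
const FAILED_TYPES = ['LambdaFunctionFailed', 'ActivityFailed'];

const sketchLastFailedEventStep = (events) => {
  const failures = events.filter((event) => FAILED_TYPES.includes(event.type));
  return failures.length > 0 ? failures[failures.length - 1] : undefined;
};

// Sample execution history invented for this example.
const sampleEvents = [
  { id: 1, type: 'ExecutionStarted' },
  { id: 2, type: 'LambdaFunctionFailed' },
  { id: 3, type: 'TaskStateEntered' },
  { id: 4, type: 'ActivityFailed' },
  { id: 5, type: 'ExecutionFailed' },
];
console.log(sketchLastFailedEventStep(sampleEvents));
```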
Get message to use for publishing failed execution notifications.
Try to get the input to the last failed step in the execution so we can update the status of any granules/PDRs that don't exist in the initial execution input.
Falls back to overall execution input.
Kind: global function
Returns: Object - CumulusMessage Execution step message or execution input message
Param | Type | Description |
---|---|---|
inputCumulusMessage | Object | Workflow execution input message |
getExecutionHistoryFunction | function | Testing override for mock/etc of StepFunctions.getExecutionHistory |
Check if the file has the extension
Kind: global function
Returns: boolean - whether the file has the extension
Param | Type | Description |
---|---|---|
granuleFile | ApiFile | Granule file |
extension | string | File extension to check |
Ensures that the exception is returned as an object
Kind: global function
Returns: string - a stringified exception
Param | Type | Description |
---|---|---|
exception | Object \| undefined | the exception |
Cumulus is a cloud-based data ingest, archive, distribution and management prototype for NASA's future Earth science data streams.
To make a contribution, please see our contributing guidelines.
Generated automatically using npm run build-docs