Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/spec/waitForCondition.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface WaitForConditionWaitStrategy<T> {
```

- `state`: the current state returned by the check function
- `attempt`: 1-based attempt number (first check is attempt 1)
- `attempt`: 0-based attempt number (first check is attempt 0)
- Returns a `WaitForConditionDecision` indicating whether to continue or stop

### WaitForConditionDecision
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public ErrorObject getError() {
return details != null ? details.error() : null;
}

/** Returns the current retry attempt number (1-based), defaulting to 1 if not available. */
/** Returns the current retry attempt number (0-based), defaulting to 0 if not available. */
public int getAttempt() {
var details = operation.stepDetails();
return details != null && details.attempt() != null ? details.attempt() : 1;
return details != null && details.attempt() != null ? details.attempt() : 0;
}
}
119 changes: 17 additions & 102 deletions sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,134 +3,49 @@
package software.amazon.lambda.durable;

import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.durable.execution.ExecutionManager;
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.logging.DurableLogger;

public abstract class BaseContext implements AutoCloseable {
private final ExecutionManager executionManager;
private final DurableConfig durableConfig;
private final Context lambdaContext;
private final ExecutionContext executionContext;
private final String contextId;
private final String contextName;
private final ThreadType threadType;

private boolean isReplaying;

/**
* Creates a new BaseContext instance.
*
* @param executionManager the execution manager for thread coordination and state management
* @param durableConfig the durable execution configuration
* @param lambdaContext the AWS Lambda runtime context
* @param contextId the context ID, null for root context, set for child contexts
* @param contextName the human-readable name for this context
* @param threadType the type of thread this context runs on
*/
protected BaseContext(
ExecutionManager executionManager,
DurableConfig durableConfig,
Context lambdaContext,
String contextId,
String contextName,
ThreadType threadType) {
this.executionManager = executionManager;
this.durableConfig = durableConfig;
this.lambdaContext = lambdaContext;
this.contextId = contextId;
this.contextName = contextName;
this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn());
this.isReplaying = executionManager.hasOperationsForContext(contextId);
this.threadType = threadType;

// write the thread id and type to thread local
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
}

// =============== accessors ================
public interface BaseContext extends AutoCloseable {
/**
* Gets a logger with additional information of the current execution context.
*
* @return a DurableLogger instance
*/
public abstract DurableLogger getLogger();
DurableLogger getLogger();

/**
* Returns the AWS Lambda runtime context.
*
* @return the Lambda context
*/
public Context getLambdaContext() {
return lambdaContext;
}
Context getLambdaContext();

/**
* Returns metadata about the current durable execution.
*
* <p>The execution context provides information that remains constant throughout the execution lifecycle, such as
* the durable execution ARN. This is useful for tracking execution progress, correlating logs, and referencing this
* execution in external systems.
* Returns the current durable execution arn
*
* @return the execution context
* @return the execution arn
*/
public ExecutionContext getExecutionContext() {
return executionContext;
}
String getExecutionArn();

/**
* Returns the configuration for durable execution behavior.
*
* @return the durable configuration
*/
public DurableConfig getDurableConfig() {
return durableConfig;
}
DurableConfig getDurableConfig();

// ============= internal utilities ===============

/** Gets the context ID for this context. Null for root context, set for child contexts. */
public String getContextId() {
return contextId;
}

public String getContextName() {
return contextName;
}
/**
* Gets the context ID for this context. Null for root context, operationId of the context operation for child
* contexts.
*/
String getContextId();

public ExecutionManager getExecutionManager() {
return executionManager;
}
/** Gets the context name for this context. Null for root context. */
String getContextName();

/** Returns whether this context is currently in replay mode. */
boolean isReplaying() {
return isReplaying;
}

/**
* Transitions this context from replay to execution mode. Called when the first un-cached operation is encountered.
*/
void setExecutionMode() {
this.isReplaying = false;
}
boolean isReplaying();

public void close() {
// this is called in the user thread, after the context's user code has completed
if (getContextId() != null) {
// if this is a child context or a step context, we need to
// deregister the context's thread from the execution manager
try {
executionManager.deregisterActiveThread(getContextId());
} catch (SuspendExecutionException e) {
// Expected when this is the last active thread. Must catch here because:
// 1/ This runs in a worker thread detached from handlerFuture
// 2/ Uncaught exception would prevent stepAsync().get() from resume
// Suspension/Termination is already signaled via
// suspendExecutionFuture/terminateExecutionFuture
// before the throw.
}
}
}
/** Closes this context. */
void close();
}
Loading
Loading