Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ private DurableConfig(Builder builder) {
this.pollingStrategy =
builder.pollingStrategy != null ? builder.pollingStrategy : PollingStrategies.Presets.DEFAULT;
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);

validateConfiguration();
}

/**
Expand Down Expand Up @@ -166,6 +168,18 @@ public Duration getCheckpointDelay() {
return checkpointDelay;
}

public void validateConfiguration() {
if (getDurableExecutionClient() == null) {
throw new IllegalStateException("DurableExecutionClient configuration failed");
}
if (getSerDes() == null) {
throw new IllegalStateException("SerDes configuration failed");
}
if (getExecutorService() == null) {
throw new IllegalStateException("ExecutorService configuration failed");
}
}

/**
* Creates a default DurableExecutionClient with production LambdaClient. Uses
* EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to
Expand Down Expand Up @@ -317,7 +331,7 @@ public Builder withSerDes(SerDes serDes) {
* will be created.
*
* <p>This executor is used exclusively for running user-defined operations. Internal SDK coordination (polling,
* checkpointing) uses the common ForkJoinPool and is not affected by this setting.
* checkpointing) uses the SDK InternalExecutor thread pool and is not affected by this setting.
*
* @param executorService Custom ExecutorService instance
* @return This builder
Expand Down Expand Up @@ -351,8 +365,8 @@ public Builder withPollingStrategy(PollingStrategy pollingStrategy) {
}

/**
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
* batching.
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, SDK will checkpoint the
* updates as soon as possible.
*
* @param duration the checkpoint delay in Duration
* @return This builder
Expand Down
110 changes: 12 additions & 98 deletions sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,14 @@

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.serde.AwsSdkV2Module;
import software.amazon.lambda.durable.serde.DurableInputOutputSerDes;

/**
* Abstract base class for Lambda handlers that use durable execution.
Expand All @@ -46,7 +27,7 @@ public abstract class DurableHandler<I, O> implements RequestStreamHandler {

private final TypeToken<I> inputType;
private final DurableConfig config;
private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
private final DurableInputOutputSerDes serDes = new DurableInputOutputSerDes(); // Internal ObjectMapper
private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class);

protected DurableHandler() {
Expand All @@ -58,7 +39,6 @@ protected DurableHandler() {
throw new IllegalArgumentException("Cannot determine input type parameter");
}
this.config = createConfiguration();
validateConfiguration();
}

/**
Expand Down Expand Up @@ -142,26 +122,22 @@ protected DurableConfig createConfiguration() {
return DurableConfig.defaultConfig();
}

private void validateConfiguration() {
if (config.getDurableExecutionClient() == null) {
throw new IllegalStateException("DurableExecutionClient configuration failed");
}
if (config.getSerDes() == null) {
throw new IllegalStateException("SerDes configuration failed");
}
if (config.getExecutorService() == null) {
throw new IllegalStateException("ExecutorService configuration failed");
}
}

/**
* Reads the request, executes the durable function handler and writes the response
*
* @param inputStream the input stream
* @param outputStream the output stream
* @param context the Lambda context
* @throws IOException thrown when serialize/deserialize fails
*/
@Override
public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
throws IOException {
var inputString = new String(inputStream.readAllBytes());
logger.debug("Raw input from durable handler: {}", inputString);
var input = this.objectMapper.readValue(inputString, DurableExecutionInput.class);
var input = serDes.deserialize(inputString, TypeToken.get(DurableExecutionInput.class));
var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config);
outputStream.write(objectMapper.writeValueAsBytes(output));
outputStream.write(serDes.serialize(output).getBytes());
}

/**
Expand All @@ -172,66 +148,4 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre
* @return Result
*/
public abstract O handleRequest(I input, DurableContext context);

/**
* Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
* Lambda Durable Functions backend protocol.
*
* <p>Customer-facing serialization uses SerDes from DurableConfig.
*
* @return Configured ObjectMapper for durable backend communication
*/
public static ObjectMapper createObjectMapper() {
var dateModule = new SimpleModule();
dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
@Override
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
// Timestamp is a double value represent seconds since epoch.
var timestamp = jsonParser.getDoubleValue();
// Date expects milliseconds since epoch, so multiply by 1000.
return new Date((long) (timestamp * 1000));
}
});
dateModule.addSerializer(Date.class, new JsonSerializer<>() {
@Override
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException {
// Timestamp should be a double value representing seconds since epoch, so
// convert from milliseconds.
double timestamp = date.getTime() / 1000.0;
jsonGenerator.writeNumber(timestamp);
}
});

// Needed for deserialization of timestamps for some SDK v2 objects
dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
.toFormatter();

@Override
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
return Instant.ofEpochMilli(jsonParser.getLongValue());
}
var timestampStr = jsonParser.getValueAsString();
return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
}
});

return JsonMapper.builder()
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Looks pretty, and probably needed for tests to be deterministic.
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
// Data passed over the wire from the backend is UpperCamelCase
.propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
.addModule(new JavaTimeModule())
.addModule(dateModule)
.addModule(new AwsSdkV2Module())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.serde;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.lang.reflect.Type;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.util.ExceptionHelper;

/**
* Serializer/Deserializer for Durable Execution Input and Output objects. This is for INTERNAL use only - handles
* Lambda Durable Functions backend protocol.
*
* <p>Customer-facing serialization uses SerDes from DurableConfig.
*/
public class DurableInputOutputSerDes implements SerDes {
private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
private final TypeFactory typeFactory = objectMapper.getTypeFactory();
private final Map<Type, JavaType> typeCache = new ConcurrentHashMap<>();

/**
* Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
* Lambda Durable Functions backend protocol.
*
* <p>Customer-facing serialization uses SerDes from DurableConfig.
*
* @return Configured ObjectMapper for durable backend communication
*/
static ObjectMapper createObjectMapper() {
var dateModule = new SimpleModule();
dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
@Override
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
// Timestamp is a double value represent seconds since epoch.
var timestamp = jsonParser.getDoubleValue();
// Date expects milliseconds since epoch, so multiply by 1000.
return new Date((long) (timestamp * 1000));
}
});
dateModule.addSerializer(Date.class, new JsonSerializer<>() {
@Override
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException {
// Timestamp should be a double value representing seconds since epoch, so
// convert from milliseconds.
double timestamp = date.getTime() / 1000.0;
jsonGenerator.writeNumber(timestamp);
}
});

// Needed for deserialization of timestamps for some SDK v2 objects
dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
.toFormatter();

@Override
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
return Instant.ofEpochMilli(jsonParser.getLongValue());
}
var timestampStr = jsonParser.getValueAsString();
return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
}
});

return JsonMapper.builder()
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Looks pretty, and probably needed for tests to be deterministic.
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
// Data passed over the wire from the backend is UpperCamelCase
.propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
.addModule(new JavaTimeModule())
.addModule(dateModule)
.addModule(new AwsSdkV2Module())
.build();
}

/**
* Serializes an object to a JSON string.
*
* @param value the object to serialize
* @return the JSON string representation, or null if value is null
*/
@Override
public String serialize(Object value) {
if (value == null) {
return null;
}
try {
return objectMapper.writeValueAsString(value);
} catch (IOException e) {
ExceptionHelper.sneakyThrow(e);
return null;
}
}

/**
* Deserializes a JSON string to DurableExecutionInput object
*
* @param data the JSON string to deserialize
* @param typeToken the type token of DurableExecutionInput
* @return the deserialized object, or null if data is null
*/
@Override
public <T> T deserialize(String data, TypeToken<T> typeToken) {
if (data == null) {
return null;
}
try {
JavaType javaType = typeCache.computeIfAbsent(typeToken.getType(), typeFactory::constructType);
return objectMapper.readValue(data, javaType);
} catch (IOException e) {
ExceptionHelper.sneakyThrow(e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ public class JacksonSerDes implements SerDes {

/** Creates a new JacksonSerDes with default ObjectMapper configuration. */
public JacksonSerDes() {
this.mapper = new ObjectMapper()
this(new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES));
}

/** Creates a new JacksonSerDes with a custom ObjectMapper configuration. */
public JacksonSerDes(ObjectMapper objectMapper) {
this.mapper = objectMapper;
this.typeFactory = mapper.getTypeFactory();
this.typeCache = new ConcurrentHashMap<>();
}
Expand Down
Loading
Loading