diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java index 664011cc..d30b6c47 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java @@ -92,6 +92,8 @@ private DurableConfig(Builder builder) { this.pollingStrategy = builder.pollingStrategy != null ? builder.pollingStrategy : PollingStrategies.Presets.DEFAULT; this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0); + + validateConfiguration(); } /** @@ -166,6 +168,18 @@ public Duration getCheckpointDelay() { return checkpointDelay; } + public void validateConfiguration() { + if (getDurableExecutionClient() == null) { + throw new IllegalStateException("DurableExecutionClient configuration failed"); + } + if (getSerDes() == null) { + throw new IllegalStateException("SerDes configuration failed"); + } + if (getExecutorService() == null) { + throw new IllegalStateException("ExecutorService configuration failed"); + } + } + /** * Creates a default DurableExecutionClient with production LambdaClient. Uses * EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to @@ -317,7 +331,7 @@ public Builder withSerDes(SerDes serDes) { * will be created. * *
This executor is used exclusively for running user-defined operations. Internal SDK coordination (polling, - * checkpointing) uses the common ForkJoinPool and is not affected by this setting. + * checkpointing) uses the SDK InternalExecutor thread pool and is not affected by this setting. * * @param executorService Custom ExecutorService instance * @return This builder @@ -351,8 +365,8 @@ public Builder withPollingStrategy(PollingStrategy pollingStrategy) { } /** - * Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint - * batching. + * Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, SDK will checkpoint the + * updates as soon as possible. * * @param duration the checkpoint delay in Duration * @return This builder diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java index 4f55a8ac..a9acc3b8 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java @@ -4,33 +4,14 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.MapperFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.PropertyNamingStrategies; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.ParameterizedType; -import java.time.Instant; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.lambda.durable.model.DurableExecutionInput; -import software.amazon.lambda.durable.serde.AwsSdkV2Module; +import software.amazon.lambda.durable.serde.DurableInputOutputSerDes; /** * Abstract base class for Lambda handlers that use durable execution. @@ -46,7 +27,7 @@ public abstract class DurableHandler implements RequestStreamHandler { private final TypeToken inputType; private final DurableConfig config; - private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper + private final DurableInputOutputSerDes serDes = new DurableInputOutputSerDes(); // Internal ObjectMapper private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class); protected DurableHandler() { @@ -58,7 +39,6 @@ protected DurableHandler() { throw new IllegalArgumentException("Cannot determine input type parameter"); } this.config = createConfiguration(); - validateConfiguration(); } /** @@ -142,26 +122,22 @@ protected DurableConfig createConfiguration() { return DurableConfig.defaultConfig(); } - private void validateConfiguration() { - if (config.getDurableExecutionClient() == null) { - throw new IllegalStateException("DurableExecutionClient configuration failed"); - } - if (config.getSerDes() == null) { - throw new IllegalStateException("SerDes configuration failed"); - } - if (config.getExecutorService() == null) { - throw new IllegalStateException("ExecutorService configuration failed"); - } - } - + /** + * Reads the request, executes the durable function handler and writes the response + * + * @param inputStream the input stream + * @param outputStream the output stream + * @param context the Lambda context + * @throws IOException thrown when serialize/deserialize fails + */ @Override public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { var inputString = new String(inputStream.readAllBytes()); logger.debug("Raw input from durable handler: {}", inputString); - var input = this.objectMapper.readValue(inputString, DurableExecutionInput.class); + var input = serDes.deserialize(inputString, TypeToken.get(DurableExecutionInput.class)); var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config); - outputStream.write(objectMapper.writeValueAsBytes(output)); + outputStream.write(serDes.serialize(output).getBytes()); } /** @@ -172,66 +148,4 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre * @return Result */ public abstract O handleRequest(I input, DurableContext context); - - /** - * Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles - * Lambda Durable Functions backend protocol. - * - *
Customer-facing serialization uses SerDes from DurableConfig. - * - * @return Configured ObjectMapper for durable backend communication - */ - public static ObjectMapper createObjectMapper() { - var dateModule = new SimpleModule(); - dateModule.addDeserializer(Date.class, new JsonDeserializer<>() { - @Override - public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException { - // Timestamp is a double value represent seconds since epoch. - var timestamp = jsonParser.getDoubleValue(); - // Date expects milliseconds since epoch, so multiply by 1000. - return new Date((long) (timestamp * 1000)); - } - }); - dateModule.addSerializer(Date.class, new JsonSerializer<>() { - @Override - public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - // Timestamp should be a double value representing seconds since epoch, so - // convert from milliseconds. - double timestamp = date.getTime() / 1000.0; - jsonGenerator.writeNumber(timestamp); - } - }); - - // Needed for deserialization of timestamps for some SDK v2 objects - dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() { - private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX") - .toFormatter(); - - @Override - public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException { - if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) { - return Instant.ofEpochMilli(jsonParser.getLongValue()); - } - var timestampStr = jsonParser.getValueAsString(); - return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr)); - } - }); - - return JsonMapper.builder() - .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - // Looks pretty, and probably needed for tests to be deterministic. - .enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY) - .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS) - // Data passed over the wire from the backend is UpperCamelCase - .propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE) - .addModule(new JavaTimeModule()) - .addModule(dateModule) - .addModule(new AwsSdkV2Module()) - .build(); - } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/serde/DurableInputOutputSerDes.java b/sdk/src/main/java/software/amazon/lambda/durable/serde/DurableInputOutputSerDes.java new file mode 100644 index 00000000..db89d693 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/serde/DurableInputOutputSerDes.java @@ -0,0 +1,145 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.serde; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.io.IOException; +import java.lang.reflect.Type; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.util.ExceptionHelper; + +/** + * Serializer/Deserializer for Durable Execution Input and Output objects. This is for INTERNAL use only - handles + * Lambda Durable Functions backend protocol. + * + *
Customer-facing serialization uses SerDes from DurableConfig.
+ */
+public class DurableInputOutputSerDes implements SerDes {
+ private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
+ private final TypeFactory typeFactory = objectMapper.getTypeFactory();
+ private final Map Customer-facing serialization uses SerDes from DurableConfig.
+ *
+ * @return Configured ObjectMapper for durable backend communication
+ */
+ static ObjectMapper createObjectMapper() {
+ var dateModule = new SimpleModule();
+ dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
+ @Override
+ public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+ throws IOException {
+ // Timestamp is a double value represent seconds since epoch.
+ var timestamp = jsonParser.getDoubleValue();
+ // Date expects milliseconds since epoch, so multiply by 1000.
+ return new Date((long) (timestamp * 1000));
+ }
+ });
+ dateModule.addSerializer(Date.class, new JsonSerializer<>() {
+ @Override
+ public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+ throws IOException {
+ // Timestamp should be a double value representing seconds since epoch, so
+ // convert from milliseconds.
+ double timestamp = date.getTime() / 1000.0;
+ jsonGenerator.writeNumber(timestamp);
+ }
+ });
+
+ // Needed for deserialization of timestamps for some SDK v2 objects
+ dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
+ .toFormatter();
+
+ @Override
+ public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+ throws IOException {
+ if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
+ return Instant.ofEpochMilli(jsonParser.getLongValue());
+ }
+ var timestampStr = jsonParser.getValueAsString();
+ return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
+ }
+ });
+
+ return JsonMapper.builder()
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ // Looks pretty, and probably needed for tests to be deterministic.
+ .enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
+ .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+ // Data passed over the wire from the backend is UpperCamelCase
+ .propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
+ .addModule(new JavaTimeModule())
+ .addModule(dateModule)
+ .addModule(new AwsSdkV2Module())
+ .build();
+ }
+
+ /**
+ * Serializes an object to a JSON string.
+ *
+ * @param value the object to serialize
+ * @return the JSON string representation, or null if value is null
+ */
+ @Override
+ public String serialize(Object value) {
+ if (value == null) {
+ return null;
+ }
+ try {
+ return objectMapper.writeValueAsString(value);
+ } catch (IOException e) {
+ ExceptionHelper.sneakyThrow(e);
+ return null;
+ }
+ }
+
+ /**
+ * Deserializes a JSON string to DurableExecutionInput object
+ *
+ * @param data the JSON string to deserialize
+ * @param typeToken the type token of DurableExecutionInput
+ * @return the deserialized object, or null if data is null
+ */
+ @Override
+ public