diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index c7a051bf83..0bb1bddf05 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -127,6 +127,8 @@ + + diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 5e08a766f9..ea38de3953 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -221,6 +221,7 @@ + @@ -392,6 +393,7 @@ + @@ -432,6 +434,7 @@ + diff --git a/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/CheckpointWithRedis.csproj b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/CheckpointWithRedis.csproj new file mode 100644 index 0000000000..9d9359515d --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/CheckpointWithRedis.csproj @@ -0,0 +1,16 @@ + + + + Exe + net10.0 + enable + enable + $(NoWarn);MEAI001 + + + + + + + + diff --git a/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/Program.cs b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/Program.cs new file mode 100644 index 0000000000..4b88084177 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointWithRedis/Program.cs @@ -0,0 +1,150 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.Workflows; + +namespace CheckpointWithRedis; + +/// +/// This sample demonstrates how to use Redis-backed checkpoint storage for workflows. +/// Key concepts: +/// - RedisCheckpointStore: A distributed, durable checkpoint store using Redis +/// - TTL (Time-To-Live): Automatic expiration of checkpoints +/// - Parent-child relationships: Linking checkpoints to track workflow history +/// +/// +/// Pre-requisites: +/// - Redis must be running. Start it with: docker run --name redis -p 6379:6379 -d redis:7-alpine +/// - Or set REDIS_CONNECTION_STRING environment variable to your Redis instance. +/// +public static class Program +{ + public static async Task Main() + { + // Configuration + var redisConnectionString = Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING") ?? "localhost:6379"; + var ttl = TimeSpan.FromHours(24); + + Console.WriteLine("=== Redis Checkpoint Storage Demo ===\n"); + Console.WriteLine($"Connecting to Redis: {redisConnectionString}"); + + try + { + // Create checkpoint store with TTL + using var checkpointStore = RedisWorkflowExtensions.CreateRedisCheckpointStoreWithTtl( + redisConnectionString, + ttl, + keyPrefix: "workflow_checkpoints"); + + Console.WriteLine($"Key prefix: {checkpointStore.KeyPrefix}"); + Console.WriteLine($"TTL: {checkpointStore.TimeToLive}\n"); + + // Sample workflow data + var runId = $"run_{Guid.NewGuid():N}"; + var workflowState = new WorkflowState + { + CurrentStep = "initialize", + Variables = new Dictionary + { + ["user_input"] = "Hello, Agent!", + ["timestamp"] = DateTimeOffset.UtcNow.ToString("o") + } + }; + + // Create initial checkpoint + Console.WriteLine("--- Creating Initial Checkpoint ---"); + var initialCheckpoint = await checkpointStore.CreateCheckpointAsync( + runId, + JsonSerializer.SerializeToElement(workflowState)); + + Console.WriteLine($"Run ID: {runId}"); + Console.WriteLine($"Checkpoint ID: {initialCheckpoint.CheckpointId}"); + Console.WriteLine($"State: {workflowState.CurrentStep}\n"); + + // Simulate workflow progress + workflowState.CurrentStep = "processing"; + workflowState.Variables["processed"] = true; + workflowState.Variables["processing_time"] = DateTimeOffset.UtcNow.ToString("o"); + + // Create child checkpoint (linked to parent) + Console.WriteLine("--- Creating Child Checkpoint ---"); + var processingCheckpoint = await checkpointStore.CreateCheckpointAsync( + runId, + JsonSerializer.SerializeToElement(workflowState), + parent: initialCheckpoint); + + Console.WriteLine($"Checkpoint ID: {processingCheckpoint.CheckpointId}"); + Console.WriteLine($"Parent ID: {initialCheckpoint.CheckpointId}"); + Console.WriteLine($"State: {workflowState.CurrentStep}\n"); + + // Simulate more progress + workflowState.CurrentStep = "completed"; + workflowState.Variables["result"] = "Success!"; + workflowState.Variables["completion_time"] = DateTimeOffset.UtcNow.ToString("o"); + + // Create final checkpoint + Console.WriteLine("--- Creating Final Checkpoint ---"); + var finalCheckpoint = await checkpointStore.CreateCheckpointAsync( + runId, + JsonSerializer.SerializeToElement(workflowState), + parent: processingCheckpoint); + + Console.WriteLine($"Checkpoint ID: {finalCheckpoint.CheckpointId}"); + Console.WriteLine($"State: {workflowState.CurrentStep}\n"); + + // List all checkpoints for the run + Console.WriteLine("--- All Checkpoints for Run ---"); + var allCheckpoints = await checkpointStore.RetrieveIndexAsync(runId); + var checkpointList = allCheckpoints.ToList(); + Console.WriteLine($"Total checkpoints: {checkpointList.Count}"); + foreach (var cp in checkpointList) + { + Console.WriteLine($" - {cp.CheckpointId}"); + } + + Console.WriteLine(); + + // List children of the initial checkpoint + Console.WriteLine("--- Children of Initial Checkpoint ---"); + var children = await checkpointStore.RetrieveIndexAsync(runId, initialCheckpoint); + var childList = children.ToList(); + Console.WriteLine($"Child checkpoints: {childList.Count}"); + foreach (var child in childList) + { + Console.WriteLine($" - {child.CheckpointId}"); + } + + Console.WriteLine(); + + // Retrieve and display checkpoint data + Console.WriteLine("--- Retrieving Final Checkpoint Data ---"); + var retrievedData = await checkpointStore.RetrieveCheckpointAsync(runId, finalCheckpoint); + Console.WriteLine($"Current step: {retrievedData.GetProperty("CurrentStep").GetString()}"); + + var variables = retrievedData.GetProperty("Variables"); + Console.WriteLine($"User input: {variables.GetProperty("user_input").GetString()}"); + Console.WriteLine($"Result: {variables.GetProperty("result").GetString()}"); + Console.WriteLine($"Processed: {variables.GetProperty("processed").GetBoolean()}"); + + Console.WriteLine("\n=== Demo Complete ==="); + } + catch (Exception ex) + { + Console.WriteLine($"\nError: {ex.Message}"); + Console.WriteLine("\nMake sure Redis is running. You can start it with:"); + Console.WriteLine(" docker run --name redis -p 6379:6379 -d redis:7-alpine"); + return 1; + } + + return 0; + } +} + +/// +/// Sample workflow state class. +/// +public class WorkflowState +{ + public string CurrentStep { get; set; } = string.Empty; + public Dictionary Variables { get; set; } = new(); +} diff --git a/dotnet/src/Microsoft.Agents.AI.Redis/Microsoft.Agents.AI.Redis.csproj b/dotnet/src/Microsoft.Agents.AI.Redis/Microsoft.Agents.AI.Redis.csproj new file mode 100644 index 0000000000..f04214bc32 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Redis/Microsoft.Agents.AI.Redis.csproj @@ -0,0 +1,40 @@ + + + + $(TargetFrameworksCore) + Microsoft.Agents.AI + $(NoWarn);MEAI001 + preview + + + + true + true + true + true + true + true + + + + + + + Microsoft Agent Framework Redis Integration + Provides Redis implementations for Microsoft Agent Framework storage abstractions including CheckpointStore. + + + + + + + + + + + + + + + + diff --git a/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStore.cs b/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStore.cs new file mode 100644 index 0000000000..2f65a0c5c5 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStore.cs @@ -0,0 +1,329 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Microsoft.Shared.Diagnostics; +using StackExchange.Redis; + +namespace Microsoft.Agents.AI.Workflows.Checkpointing; + +/// +/// Provides a Redis implementation of the abstract class. +/// +/// The type of objects to store as checkpoint values. +[RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] +[RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] +public class RedisCheckpointStore : JsonCheckpointStore, IDisposable +{ + private readonly IConnectionMultiplexer _connectionMultiplexer; + private readonly IDatabase _database; + private readonly RedisCheckpointStoreOptions _options; + private readonly bool _ownsConnection; + private bool _disposed; + + /// + /// Initializes a new instance of the class using a Redis connection string. + /// + /// The Redis connection string (e.g., "localhost:6379"). + /// Optional configuration for the checkpoint store. + /// Thrown when is null. + /// Thrown when is empty or whitespace. + public RedisCheckpointStore(string connectionString, RedisCheckpointStoreOptions? options = null) + { + this._connectionMultiplexer = ConnectionMultiplexer.Connect(Throw.IfNullOrWhitespace(connectionString)); + this._options = options ?? new RedisCheckpointStoreOptions(); + this._database = this._connectionMultiplexer.GetDatabase(this._options.Database); + this._ownsConnection = true; + } + + /// + /// Initializes a new instance of the class using an existing . + /// + /// An existing Redis connection multiplexer. + /// Optional configuration for the checkpoint store. + /// Thrown when is null. + public RedisCheckpointStore(IConnectionMultiplexer connectionMultiplexer, RedisCheckpointStoreOptions? options = null) + { + this._connectionMultiplexer = Throw.IfNull(connectionMultiplexer); + this._options = options ?? new RedisCheckpointStoreOptions(); + this._database = this._connectionMultiplexer.GetDatabase(this._options.Database); + this._ownsConnection = false; + } + + /// + /// Initializes a new instance of the class using . + /// + /// Redis configuration options. + /// Optional configuration for the checkpoint store. + /// Thrown when is null. + public RedisCheckpointStore(ConfigurationOptions configuration, RedisCheckpointStoreOptions? options = null) + { + Throw.IfNull(configuration); + this._connectionMultiplexer = ConnectionMultiplexer.Connect(configuration); + this._options = options ?? new RedisCheckpointStoreOptions(); + this._database = this._connectionMultiplexer.GetDatabase(this._options.Database); + this._ownsConnection = true; + } + + /// + /// Gets the key prefix used for Redis keys. + /// + public string KeyPrefix => this._options.KeyPrefix; + + /// + /// Gets the Time-To-Live (TTL) configuration for checkpoints. + /// + public TimeSpan? TimeToLive => this._options.TimeToLive; + + /// + public override async ValueTask CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var checkpointId = Guid.NewGuid().ToString("N"); + var checkpointInfo = new CheckpointInfo(runId, checkpointId); + var checkpointKey = this.GetCheckpointKey(runId, checkpointId); + var indexKey = this.GetIndexKey(runId); + + var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + + var document = new RedisCheckpointDocument + { + RunId = runId, + CheckpointId = checkpointId, + Value = value.GetRawText(), + ParentCheckpointId = parent?.CheckpointId, + Timestamp = timestamp + }; + + var serializedDocument = JsonSerializer.Serialize(document, RedisJsonContext.Default.RedisCheckpointDocument); + + var transaction = this._database.CreateTransaction(); + _ = transaction.StringSetAsync(checkpointKey, serializedDocument); + _ = transaction.SortedSetAddAsync(indexKey, checkpointId, timestamp); + + if (this._options.TimeToLive.HasValue) + { + _ = transaction.KeyExpireAsync(checkpointKey, this._options.TimeToLive.Value); + _ = transaction.KeyExpireAsync(indexKey, this._options.TimeToLive.Value); + } + + var committed = await transaction.ExecuteAsync().ConfigureAwait(false); + if (!committed) + { + throw new InvalidOperationException($"Failed to create checkpoint '{checkpointId}' for run '{runId}'."); + } + + return checkpointInfo; + } + + /// + public override async ValueTask RetrieveCheckpointAsync(string runId, CheckpointInfo key) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + + if (key is null) + { + throw new ArgumentNullException(nameof(key)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var checkpointKey = this.GetCheckpointKey(runId, key.CheckpointId); + var redisValue = await this._database.StringGetAsync(checkpointKey).ConfigureAwait(false); + + if (redisValue.IsNullOrEmpty) + { + throw new InvalidOperationException($"Checkpoint with ID '{key.CheckpointId}' for run '{runId}' not found."); + } + + var document = JsonSerializer.Deserialize(redisValue.ToString(), RedisJsonContext.Default.RedisCheckpointDocument); + if (document is null) + { + throw new InvalidOperationException($"Failed to deserialize checkpoint '{key.CheckpointId}' for run '{runId}'."); + } + + using var jsonDocument = JsonDocument.Parse(document.Value); + return jsonDocument.RootElement.Clone(); + } + + /// + public override async ValueTask> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null) + { + if (string.IsNullOrWhiteSpace(runId)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(runId)); + } + +#pragma warning disable CA1513 // Use ObjectDisposedException.ThrowIf - not available on all target frameworks + if (this._disposed) + { + throw new ObjectDisposedException(this.GetType().FullName); + } +#pragma warning restore CA1513 + + var indexKey = this.GetIndexKey(runId); + + var checkpointIds = await this._database.SortedSetRangeByScoreAsync(indexKey).ConfigureAwait(false); + + if (checkpointIds.Length == 0) + { + return Enumerable.Empty(); + } + + if (withParent != null) + { + var results = new List(); + foreach (var checkpointId in checkpointIds) + { + var checkpointKey = this.GetCheckpointKey(runId, checkpointId.ToString()); + var redisValue = await this._database.StringGetAsync(checkpointKey).ConfigureAwait(false); + + if (!redisValue.IsNullOrEmpty) + { + var document = JsonSerializer.Deserialize(redisValue.ToString(), RedisJsonContext.Default.RedisCheckpointDocument); + if (document?.ParentCheckpointId == withParent.CheckpointId) + { + results.Add(new CheckpointInfo(runId, checkpointId.ToString())); + } + } + } + + return results; + } + + return checkpointIds.Select(id => new CheckpointInfo(runId, id.ToString())); + } + + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases the unmanaged resources used by the and optionally releases the managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!this._disposed) + { + if (disposing && this._ownsConnection) + { + this._connectionMultiplexer?.Dispose(); + } + + this._disposed = true; + } + } + + /// + /// Gets the Redis key for a checkpoint. + /// + /// The run identifier. + /// The checkpoint identifier. + /// The Redis key for the checkpoint. + private string GetCheckpointKey(string runId, string checkpointId) + => $"{this._options.KeyPrefix}:{runId}:{checkpointId}"; + + /// + /// Gets the Redis key for the run index. + /// + /// The run identifier. + /// The Redis key for the index. + private string GetIndexKey(string runId) + => $"{this._options.KeyPrefix}:{runId}:_index"; +} + +/// +/// Provides a non-generic Redis implementation of the abstract class. +/// +[RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] +[RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] +public sealed class RedisCheckpointStore : RedisCheckpointStore +{ + /// + public RedisCheckpointStore(string connectionString, RedisCheckpointStoreOptions? options = null) + : base(connectionString, options) + { + } + + /// + public RedisCheckpointStore(IConnectionMultiplexer connectionMultiplexer, RedisCheckpointStoreOptions? options = null) + : base(connectionMultiplexer, options) + { + } + + /// + public RedisCheckpointStore(ConfigurationOptions configuration, RedisCheckpointStoreOptions? options = null) + : base(configuration, options) + { + } +} + +/// +/// Represents a checkpoint document stored in Redis. +/// +internal sealed class RedisCheckpointDocument +{ + /// + /// Gets or sets the run identifier. + /// + public string RunId { get; set; } = string.Empty; + + /// + /// Gets or sets the checkpoint identifier. + /// + public string CheckpointId { get; set; } = string.Empty; + + /// + /// Gets or sets the JSON value of the checkpoint. + /// + public string Value { get; set; } = string.Empty; + + /// + /// Gets or sets the parent checkpoint identifier. + /// + public string? ParentCheckpointId { get; set; } + + /// + /// Gets or sets the Unix timestamp when the checkpoint was created. + /// + public long Timestamp { get; set; } +} + +/// +/// JSON serialization context for Redis checkpoint documents. +/// +[JsonSourceGenerationOptions( + JsonSerializerDefaults.Web, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] +[JsonSerializable(typeof(RedisCheckpointDocument))] +internal sealed partial class RedisJsonContext : JsonSerializerContext +{ +} diff --git a/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStoreOptions.cs b/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStoreOptions.cs new file mode 100644 index 0000000000..0d8d1a8bc0 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Redis/RedisCheckpointStoreOptions.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.Agents.AI.Workflows.Checkpointing; + +/// +/// Configuration options for . +/// +public sealed class RedisCheckpointStoreOptions +{ + /// + /// Gets or sets the key prefix for Redis keys. + /// Default: "checkpoint". + /// + /// + /// The full key format is: {KeyPrefix}:{runId}:{checkpointId}. + /// The index key format is: {KeyPrefix}:{runId}:_index. + /// + public string KeyPrefix { get; set; } = "checkpoint"; + + /// + /// Gets or sets the Time-To-Live (TTL) for checkpoints. + /// + /// + /// When set, checkpoints will automatically expire after this duration. + /// When null, checkpoints do not expire. + /// + public TimeSpan? TimeToLive { get; set; } + + /// + /// Gets or sets the Redis database index. + /// Default: -1 (use the default database). + /// + public int Database { get; set; } = -1; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Redis/RedisWorkflowExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Redis/RedisWorkflowExtensions.cs new file mode 100644 index 0000000000..9319384324 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Redis/RedisWorkflowExtensions.cs @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using Microsoft.Agents.AI.Workflows.Checkpointing; +using StackExchange.Redis; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides extension methods for integrating Redis checkpoint storage with the Agent Framework. +/// +public static class RedisWorkflowExtensions +{ + /// + /// Creates a Redis checkpoint store using a connection string. + /// + /// The Redis connection string (e.g., "localhost:6379"). + /// Optional configuration for the checkpoint store. + /// A new instance. + /// Thrown when is null. + /// Thrown when is empty or whitespace. + [RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static RedisCheckpointStore CreateRedisCheckpointStore( + string connectionString, + RedisCheckpointStoreOptions? options = null) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(connectionString)); + } + + return new RedisCheckpointStore(connectionString, options); + } + + /// + /// Creates a Redis checkpoint store using an existing connection multiplexer. + /// + /// An existing Redis connection multiplexer. + /// Optional configuration for the checkpoint store. + /// A new instance. + /// Thrown when is null. + [RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static RedisCheckpointStore CreateRedisCheckpointStore( + IConnectionMultiplexer connectionMultiplexer, + RedisCheckpointStoreOptions? options = null) + { + if (connectionMultiplexer is null) + { + throw new ArgumentNullException(nameof(connectionMultiplexer)); + } + + return new RedisCheckpointStore(connectionMultiplexer, options); + } + + /// + /// Creates a Redis checkpoint store with TTL configuration. + /// + /// The Redis connection string (e.g., "localhost:6379"). + /// The Time-To-Live duration for checkpoints. + /// Optional key prefix for Redis keys. Defaults to "checkpoint". + /// A new instance with TTL configured. + /// Thrown when is null. + /// Thrown when is empty or whitespace. + [RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static RedisCheckpointStore CreateRedisCheckpointStoreWithTtl( + string connectionString, + TimeSpan timeToLive, + string? keyPrefix = null) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(connectionString)); + } + + var options = new RedisCheckpointStoreOptions + { + TimeToLive = timeToLive, + KeyPrefix = keyPrefix ?? "checkpoint" + }; + + return new RedisCheckpointStore(connectionString, options); + } + + /// + /// Creates a generic Redis checkpoint store using a connection string. + /// + /// The type of objects to store as checkpoint values. + /// The Redis connection string (e.g., "localhost:6379"). + /// Optional configuration for the checkpoint store. + /// A new instance. + /// Thrown when is null. + /// Thrown when is empty or whitespace. + [RequiresUnreferencedCode("The RedisCheckpointStore uses JSON serialization which is incompatible with trimming.")] + [RequiresDynamicCode("The RedisCheckpointStore uses JSON serialization which is incompatible with NativeAOT.")] + public static RedisCheckpointStore CreateRedisCheckpointStore( + string connectionString, + RedisCheckpointStoreOptions? options = null) + { + if (string.IsNullOrWhiteSpace(connectionString)) + { + throw new ArgumentException("Cannot be null or whitespace", nameof(connectionString)); + } + + return new RedisCheckpointStore(connectionString, options); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/Microsoft.Agents.AI.Redis.UnitTests.csproj b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/Microsoft.Agents.AI.Redis.UnitTests.csproj new file mode 100644 index 0000000000..07757abbca --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/Microsoft.Agents.AI.Redis.UnitTests.csproj @@ -0,0 +1,23 @@ + + + + net10.0;net9.0 + $(NoWarn);MEAI001 + + + + false + + + + + + + + + + + + + + diff --git a/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCheckpointStoreTests.cs b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCheckpointStoreTests.cs new file mode 100644 index 0000000000..15ffadec2b --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCheckpointStoreTests.cs @@ -0,0 +1,588 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Agents.AI.Workflows.Checkpointing; +using StackExchange.Redis; + +namespace Microsoft.Agents.AI.Redis.UnitTests; + +/// +/// Contains tests for . +/// +/// To run tests with Redis: +/// - Set REDIS_CONNECTION_STRING environment variable (e.g., "localhost:6379") +/// - Set REDIS_AVAILABLE=true to enable Redis-dependent tests +/// +/// Example (PowerShell): +/// $env:REDIS_CONNECTION_STRING="localhost:6379"; $env:REDIS_AVAILABLE="true"; dotnet test +/// +/// Example (Bash): +/// REDIS_CONNECTION_STRING="localhost:6379" REDIS_AVAILABLE="true" dotnet test +/// +[Collection("Redis")] +[SuppressMessage("Performance", "CA1859:Use concrete types when possible for improved performance", Justification = "Interface needed for test flexibility")] +public class RedisCheckpointStoreTests : IAsyncLifetime, IDisposable +{ + private ConnectionMultiplexer? _connectionMultiplexer; + private string _connectionString = string.Empty; + private bool _redisAvailable; + + private static readonly JsonSerializerOptions s_jsonOptions = CreateJsonOptions(); + + private static JsonSerializerOptions CreateJsonOptions() + { + var options = new JsonSerializerOptions(); +#if NET9_0_OR_GREATER + options.TypeInfoResolver = new System.Text.Json.Serialization.Metadata.DefaultJsonTypeInfoResolver(); +#endif + return options; + } + + public async Task InitializeAsync() + { + var envConnectionString = Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING"); + + if (string.IsNullOrEmpty(envConnectionString)) + { + // No Redis connection string provided, tests will be skipped + this._redisAvailable = false; + return; + } + + try + { + this._connectionString = envConnectionString; + this._connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(this._connectionString).ConfigureAwait(false); + this._redisAvailable = this._connectionMultiplexer.IsConnected; + } + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException and not AccessViolationException) + { + // Redis not available, tests will be skipped + this._redisAvailable = false; + this._connectionMultiplexer?.Dispose(); + this._connectionMultiplexer = null; + } + } + + public Task DisposeAsync() + { + this._connectionMultiplexer?.Dispose(); + return Task.CompletedTask; + } + + private void SkipIfRedisNotAvailable() + { + var ciRedisAvailable = string.Equals( + Environment.GetEnvironmentVariable("REDIS_AVAILABLE"), + "true", + StringComparison.OrdinalIgnoreCase); + + Xunit.Skip.If(!ciRedisAvailable && !this._redisAvailable, "Redis is not available. Set REDIS_CONNECTION_STRING and REDIS_AVAILABLE=true to run these tests."); + } + + #region Constructor Tests + + [SkippableFact] + public void Constructor_WithConnectionString_SetsProperties() + { + this.SkipIfRedisNotAvailable(); + + // Act + using var store = new RedisCheckpointStore(this._connectionString); + + // Assert + Assert.Equal("checkpoint", store.KeyPrefix); + Assert.Null(store.TimeToLive); + } + + [SkippableFact] + public void Constructor_WithOptions_SetsProperties() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + var options = new RedisCheckpointStoreOptions + { + KeyPrefix = "custom", + TimeToLive = TimeSpan.FromHours(1) + }; + + // Act + using var store = new RedisCheckpointStore(this._connectionString, options); + + // Assert + Assert.Equal("custom", store.KeyPrefix); + Assert.Equal(TimeSpan.FromHours(1), store.TimeToLive); + } + + [SkippableFact] + public void Constructor_WithConnectionMultiplexer_SetsProperties() + { + this.SkipIfRedisNotAvailable(); + + // Act + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + + // Assert + Assert.Equal("checkpoint", store.KeyPrefix); + } + + [Fact] + public void Constructor_WithNullConnectionString_ThrowsArgumentNullException() + { + // Act & Assert + Assert.Throws(() => + new RedisCheckpointStore((string)null!)); + } + + [Fact] + public void Constructor_WithEmptyConnectionString_ThrowsArgumentException() + { + // Act & Assert + Assert.Throws(() => + new RedisCheckpointStore(string.Empty)); + } + + [Fact] + public void Constructor_WithNullConnectionMultiplexer_ThrowsArgumentNullException() + { + // Act & Assert + Assert.Throws(() => + new RedisCheckpointStore((IConnectionMultiplexer)null!)); + } + + #endregion + + #region Checkpoint Operations Tests + + [SkippableFact] + public async Task CreateCheckpointAsync_NewCheckpoint_CreatesSuccessfullyAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement( + new { data = "test checkpoint" }, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Assert + Assert.NotNull(checkpointInfo); + Assert.Equal(runId, checkpointInfo.RunId); + Assert.NotNull(checkpointInfo.CheckpointId); + Assert.NotEmpty(checkpointInfo.CheckpointId); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_ExistingCheckpoint_ReturnsCorrectValueAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var originalData = new { message = "Hello, World!", timestamp = DateTimeOffset.UtcNow }; + var checkpointValue = JsonSerializer.SerializeToElement(originalData, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + var retrievedValue = await store.RetrieveCheckpointAsync(runId, checkpointInfo); + + // Assert + Assert.Equal(JsonValueKind.Object, retrievedValue.ValueKind); + Assert.True(retrievedValue.TryGetProperty("message", out var messageProp)); + Assert.Equal("Hello, World!", messageProp.GetString()); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_NonExistentCheckpoint_ThrowsInvalidOperationExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var fakeCheckpointInfo = new CheckpointInfo(runId, "nonexistent-checkpoint"); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveCheckpointAsync(runId, fakeCheckpointInfo).AsTask()); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_EmptyStore_ReturnsEmptyCollectionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + + // Act + var index = await store.RetrieveIndexAsync(runId); + + // Assert + Assert.NotNull(index); + Assert.Empty(index); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_WithCheckpoints_ReturnsAllCheckpointsAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Create multiple checkpoints + var checkpoint1 = await store.CreateCheckpointAsync(runId, checkpointValue); + var checkpoint2 = await store.CreateCheckpointAsync(runId, checkpointValue); + var checkpoint3 = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Act + var index = (await store.RetrieveIndexAsync(runId)).ToList(); + + // Assert + Assert.Equal(3, index.Count); + Assert.Contains(index, c => c.CheckpointId == checkpoint1.CheckpointId); + Assert.Contains(index, c => c.CheckpointId == checkpoint2.CheckpointId); + Assert.Contains(index, c => c.CheckpointId == checkpoint3.CheckpointId); + } + + [SkippableFact] + public async Task CreateCheckpointAsync_WithParent_CreatesHierarchyAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var parentCheckpoint = await store.CreateCheckpointAsync(runId, checkpointValue); + var childCheckpoint = await store.CreateCheckpointAsync(runId, checkpointValue, parentCheckpoint); + + // Assert + Assert.NotEqual(parentCheckpoint.CheckpointId, childCheckpoint.CheckpointId); + Assert.Equal(runId, parentCheckpoint.RunId); + Assert.Equal(runId, childCheckpoint.RunId); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_WithParentFilter_ReturnsFilteredResultsAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Create parent and child checkpoints + var parent = await store.CreateCheckpointAsync(runId, checkpointValue); + var child1 = await store.CreateCheckpointAsync(runId, checkpointValue, parent); + var child2 = await store.CreateCheckpointAsync(runId, checkpointValue, parent); + + // Create an orphan checkpoint + var orphan = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Act + var allCheckpoints = (await store.RetrieveIndexAsync(runId)).ToList(); + var childrenOfParent = (await store.RetrieveIndexAsync(runId, parent)).ToList(); + + // Assert + Assert.Equal(4, allCheckpoints.Count); // parent + 2 children + orphan + Assert.Equal(2, childrenOfParent.Count); // only children + + Assert.Contains(childrenOfParent, c => c.CheckpointId == child1.CheckpointId); + Assert.Contains(childrenOfParent, c => c.CheckpointId == child2.CheckpointId); + Assert.DoesNotContain(childrenOfParent, c => c.CheckpointId == parent.CheckpointId); + Assert.DoesNotContain(childrenOfParent, c => c.CheckpointId == orphan.CheckpointId); + } + + #endregion + + #region TTL Tests + + [SkippableFact] + public async Task CreateCheckpointAsync_WithTtl_SetsExpirationAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + var options = new RedisCheckpointStoreOptions + { + TimeToLive = TimeSpan.FromSeconds(30), + KeyPrefix = $"ttl_test_{Guid.NewGuid():N}" + }; + + using var store = new RedisCheckpointStore(this._connectionMultiplexer!, options); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Assert - Verify TTL was set on the key + var db = this._connectionMultiplexer!.GetDatabase(); + var key = $"{options.KeyPrefix}:{runId}:{checkpointInfo.CheckpointId}"; + var ttl = await db.KeyTimeToLiveAsync(key); + + Assert.NotNull(ttl); + Assert.True(ttl.Value.TotalSeconds > 0 && ttl.Value.TotalSeconds <= 30); + } + + [SkippableFact] + public async Task CreateCheckpointAsync_WithoutTtl_NoExpirationAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + var options = new RedisCheckpointStoreOptions + { + TimeToLive = null, + KeyPrefix = $"no_ttl_test_{Guid.NewGuid():N}" + }; + + using var store = new RedisCheckpointStore(this._connectionMultiplexer!, options); + var runId = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var checkpointInfo = await store.CreateCheckpointAsync(runId, checkpointValue); + + // Assert - Verify no TTL was set on the key + var db = this._connectionMultiplexer!.GetDatabase(); + var key = $"{options.KeyPrefix}:{runId}:{checkpointInfo.CheckpointId}"; + var ttl = await db.KeyTimeToLiveAsync(key); + + Assert.Null(ttl); + } + + #endregion + + #region Run Isolation Tests + + [SkippableFact] + public async Task CheckpointOperations_DifferentRuns_IsolatesDataAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId1 = Guid.NewGuid().ToString(); + var runId2 = Guid.NewGuid().ToString(); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + var checkpoint1 = await store.CreateCheckpointAsync(runId1, checkpointValue); + var checkpoint2 = await store.CreateCheckpointAsync(runId2, checkpointValue); + + var index1 = (await store.RetrieveIndexAsync(runId1)).ToList(); + var index2 = (await store.RetrieveIndexAsync(runId2)).ToList(); + + // Assert + Assert.Single(index1); + Assert.Single(index2); + Assert.Equal(checkpoint1.CheckpointId, index1[0].CheckpointId); + Assert.Equal(checkpoint2.CheckpointId, index2[0].CheckpointId); + Assert.NotEqual(checkpoint1.CheckpointId, checkpoint2.CheckpointId); + } + + #endregion + + #region Error Handling Tests + + [SkippableFact] + public async Task CreateCheckpointAsync_WithNullRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync(null!, checkpointValue).AsTask()); + } + + [SkippableFact] + public async Task CreateCheckpointAsync_WithEmptyRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync(string.Empty, checkpointValue).AsTask()); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_WithNullRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var fakeCheckpointInfo = new CheckpointInfo("run", "checkpoint"); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveCheckpointAsync(null!, fakeCheckpointInfo).AsTask()); + } + + [SkippableFact] + public async Task RetrieveCheckpointAsync_WithNullCheckpointInfo_ThrowsArgumentNullExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var runId = Guid.NewGuid().ToString(); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveCheckpointAsync(runId, null!).AsTask()); + } + + [SkippableFact] + public async Task RetrieveIndexAsync_WithNullRunId_ThrowsArgumentExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + using var store = new RedisCheckpointStore(this._connectionMultiplexer!); + + // Act & Assert + await Assert.ThrowsAsync(() => + store.RetrieveIndexAsync(null!).AsTask()); + } + + #endregion + + #region Disposal Tests + + [SkippableFact] + public async Task Dispose_AfterDisposal_ThrowsObjectDisposedExceptionAsync() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + var store = new RedisCheckpointStore(this._connectionMultiplexer!); + var checkpointValue = JsonSerializer.SerializeToElement(new { data = "test" }, s_jsonOptions); + + // Act + store.Dispose(); + + // Assert + await Assert.ThrowsAsync(() => + store.CreateCheckpointAsync("test-run", checkpointValue).AsTask()); + } + + [SkippableFact] + public void Dispose_MultipleCalls_DoesNotThrow() + { + this.SkipIfRedisNotAvailable(); + + // Arrange + var store = new RedisCheckpointStore(this._connectionMultiplexer!); + + // Act & Assert (should not throw) + store.Dispose(); + store.Dispose(); + store.Dispose(); + } + + [SkippableFact] + public void Dispose_WithOwnedConnection_DisposesConnection() + { + this.SkipIfRedisNotAvailable(); + + // Arrange - Create store with connection string (owns connection) + var store = new RedisCheckpointStore(this._connectionString); + + // Act + store.Dispose(); + + // Assert - Store should be disposed without throwing + // The owned connection should be disposed + Assert.True(true); // If we get here, disposal worked + } + + [SkippableFact] + public void Dispose_WithSharedConnection_DoesNotDisposeConnection() + { + this.SkipIfRedisNotAvailable(); + + // Arrange - Create store with existing multiplexer (does not own connection) + var store = new RedisCheckpointStore(this._connectionMultiplexer!); + + // Act + store.Dispose(); + + // Assert - The shared connection should still be connected + Assert.True(this._connectionMultiplexer!.IsConnected); + } + + #endregion + + #region Extension Methods Tests + + [SkippableFact] + public void CreateRedisCheckpointStore_WithConnectionString_CreatesStore() + { + this.SkipIfRedisNotAvailable(); + + // Act + using var store = RedisWorkflowExtensions.CreateRedisCheckpointStore(this._connectionString); + + // Assert + Assert.NotNull(store); + Assert.Equal("checkpoint", store.KeyPrefix); + } + + [SkippableFact] + public void CreateRedisCheckpointStoreWithTtl_CreatesStoreWithTtl() + { + this.SkipIfRedisNotAvailable(); + + // Act + using var store = RedisWorkflowExtensions.CreateRedisCheckpointStoreWithTtl( + this._connectionString, + TimeSpan.FromHours(2), + "custom_prefix"); + + // Assert + Assert.NotNull(store); + Assert.Equal("custom_prefix", store.KeyPrefix); + Assert.Equal(TimeSpan.FromHours(2), store.TimeToLive); + } + + #endregion + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + // Connection multiplexer disposed in DisposeAsync + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCollectionFixture.cs b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCollectionFixture.cs new file mode 100644 index 0000000000..fb9128ff65 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Redis.UnitTests/RedisCollectionFixture.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Redis.UnitTests; + +/// +/// Defines a collection fixture for Redis tests to ensure they run sequentially. +/// This prevents race conditions and resource conflicts when tests create and delete +/// data in Redis. +/// +[CollectionDefinition("Redis", DisableParallelization = true)] +public sealed class RedisCollectionFixture +{ +}