diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index 31de34fde367..91e9287bb14c 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -140,7 +140,13 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat.Tests", "test\Microsoft.AutoGen.AgentChat.Tests\Microsoft.AutoGen.AgentChat.Tests.csproj", "{217A4F86-8ADD-4998-90BA-880092A019F5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgent.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgent.AppHost\HelloAgent.AppHost.csproj", "{0C371D65-7EF9-44EA-8128-A105DA82A80E}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Microsoft.AutoGen.Integration.Tests.AppHosts", "Microsoft.AutoGen.Integration.Tests.AppHosts", "{D1C2B0BB-1276-4146-A699-D1983AE8ED04}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentTests", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgentTests\HelloAgentTests.csproj", "{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InMemoryTests.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\InMemoryTests.AppHost\InMemoryTests.AppHost.csproj", "{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "XlangTests.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\XLangTests.AppHost\XlangTests.AppHost.csproj", "{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -372,6 +378,18 @@ Global {0C371D65-7EF9-44EA-8128-A105DA82A80E}.Debug|Any CPU.Build.0 = Debug|Any CPU {0C371D65-7EF9-44EA-8128-A105DA82A80E}.Release|Any CPU.ActiveCfg = Release|Any CPU {0C371D65-7EF9-44EA-8128-A105DA82A80E}.Release|Any CPU.Build.0 = Release|Any CPU + {CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Release|Any CPU.Build.0 = Release|Any CPU + {1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Release|Any CPU.Build.0 = Release|Any CPU + {62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -436,7 +454,10 @@ Global {EF954ED3-87D5-40F1-8557-E7179F43EA0E} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {7F828599-56E8-4597-8F68-EE26FD631417} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} {217A4F86-8ADD-4998-90BA-880092A019F5} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} - {0C371D65-7EF9-44EA-8128-A105DA82A80E} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1} + {D1C2B0BB-1276-4146-A699-D1983AE8ED04} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64} + {CD10E29A-725E-4BEF-9CFF-6C0E0A652926} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04} + {1E4E1ED4-7701-4A05-A861-64461C3B1EE3} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04} + {62CDFB27-3B02-4D4B-B789-8AAD5E20688A} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj index 85480fcaed21..a6f9bc4d4cca 100644 --- a/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj +++ b/dotnet/samples/Hello/HelloAgent/HelloAgent.csproj @@ -23,6 +23,7 @@ + diff --git a/dotnet/samples/Hello/HelloAgent/Program.cs b/dotnet/samples/Hello/HelloAgent/Program.cs index 6235bf61bdbb..ef15c2059868 100644 --- a/dotnet/samples/Hello/HelloAgent/Program.cs +++ b/dotnet/samples/Hello/HelloAgent/Program.cs @@ -4,14 +4,22 @@ using Microsoft.AutoGen.Agents; using Microsoft.AutoGen.Contracts; using Microsoft.AutoGen.Core; +using Microsoft.AutoGen.Core.Grpc; using Samples; - -// Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent -AgentsAppBuilder appBuilder = new AgentsAppBuilder() - .UseInProcessRuntime(deliverToSelf: true) - .AddAgent("HelloAgent"); +var appBuilder = new AgentsAppBuilder(); // Create app builder +// if we are using distributed, we need the AGENT_HOST var defined and then we will use the grpc runtime +if (Environment.GetEnvironmentVariable("AGENT_HOST") is string agentHost) +{ + appBuilder.AddGrpcAgentWorker(agentHost) + .AddAgent("HelloAgent"); +} +else +{ + // Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent + appBuilder.UseInProcessRuntime(deliverToSelf: true).AddAgent("HelloAgent"); +} var app = await appBuilder.BuildAsync(); // Build the app // Create a custom message type from proto and define message -NewMessageReceived message = new NewMessageReceived { Message = "Hello World!" }; -await app.PublishMessageAsync(message, new TopicId("HelloTopic")); // Publish custom message (handler has been set in HelloAgent) -await app.WaitForShutdownAsync(); // Wait for shutdown from agent +var message = new NewMessageReceived { Message = "Hello World!" }; +await app.PublishMessageAsync(message, new TopicId("HelloTopic")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent) +await app.WaitForShutdownAsync().ConfigureAwait(false); // Wait for shutdown from agent diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs index 651be87314e0..ce81078d14b7 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/ConsoleAgent/IHandleConsole.cs @@ -1,6 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IHandleConsole.cs -using Google.Protobuf; using Microsoft.AutoGen.Contracts; namespace Microsoft.AutoGen.Agents; @@ -14,13 +13,12 @@ public interface IHandleConsole : IHandle, IHandle, IProcessIO /// /// Prototype for Publish Message Async method /// - /// /// /// /// - /// + /// /// ValueTask - ValueTask PublishMessageAsync(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage; + ValueTask PublishMessageAsync(object message, TopicId topic, string? messageId = null, CancellationToken cancellationToken = default); /// /// Receives events of type Output and writes them to the console @@ -39,7 +37,7 @@ async ValueTask IHandle.HandleAsync(Output item, MessageContext messageC { Route = "console" }; - await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false); + await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false); } /// @@ -60,6 +58,6 @@ async ValueTask IHandle.HandleAsync(Input item, MessageContext messageCon { Route = "console" }; - await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false); + await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs index b3d670004e19..648a3ffdf527 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/IOAgent/FileAgent/IHandleFileIO.cs @@ -1,7 +1,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // IHandleFileIO.cs - -using Google.Protobuf; using Microsoft.AutoGen.Contracts; using Microsoft.Extensions.Logging; @@ -25,13 +23,12 @@ public interface IHandleFileIO : IHandle, IHandle, IProcessIO /// /// Prototype for Publish Message Async method /// - /// /// /// /// - /// + /// /// ValueTask - ValueTask PublishMessageAsync(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage; + ValueTask PublishMessageAsync(object message, TopicId topic, string? messageId = null, CancellationToken cancellationToken = default); async ValueTask IHandle.HandleAsync(Input item, MessageContext messageContext) { @@ -45,7 +42,7 @@ async ValueTask IHandle.HandleAsync(Input item, MessageContext messageCon { Message = errorMessage }; - await PublishMessageAsync(err, new TopicId("IOError"), null, token: CancellationToken.None).ConfigureAwait(false); + await PublishMessageAsync(err, new TopicId("IOError"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false); return; } string content; @@ -58,7 +55,7 @@ async ValueTask IHandle.HandleAsync(Input item, MessageContext messageCon { Route = Route }; - await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false); + await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false); } async ValueTask IHandle.HandleAsync(Output item, MessageContext messageContext) { @@ -70,6 +67,6 @@ async ValueTask IHandle.HandleAsync(Output item, MessageContext messageC { Route = Route }; - await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false); + await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.AutoGen/Core.Grpc/ProtobufSerializationRegistry.cs b/dotnet/src/Microsoft.AutoGen/Core.Grpc/ProtobufSerializationRegistry.cs index 1bc0449d5688..3ce6b658ecf6 100644 --- a/dotnet/src/Microsoft.AutoGen/Core.Grpc/ProtobufSerializationRegistry.cs +++ b/dotnet/src/Microsoft.AutoGen/Core.Grpc/ProtobufSerializationRegistry.cs @@ -28,10 +28,7 @@ public bool Exists(Type type) public void RegisterSerializer(Type type, IProtobufMessageSerializer serializer) { - if (_serializers.ContainsKey(TypeNameResolver.ResolveTypeName(type))) - { - throw new InvalidOperationException($"Serializer already registered for {type.FullName}"); - } + _serializers.TryAdd(TypeNameResolver.ResolveTypeName(type), serializer); _serializers[TypeNameResolver.ResolveTypeName(type)] = serializer; } } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs index 2baa70b33ef8..5a1e0ad8da20 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/AgentsRegistryState.cs @@ -4,12 +4,40 @@ using Microsoft.AutoGen.Protobuf; namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; + +/// +/// Stores agent subscription information such as topic and prefix mappings, +/// and maintains an ETag for concurrency checks. +/// public class AgentsRegistryState { + /// + /// Maps each agent ID to the set of topics they subscribe to. + /// public ConcurrentDictionary> AgentsToTopicsMap { get; set; } = []; + + /// + /// Maps each agent ID to the set of topic prefixes they subscribe to. + /// public ConcurrentDictionary> AgentsToTopicsPrefixMap { get; set; } = []; + + /// + /// Maps each topic name to the set of agent types subscribed to it. + /// public ConcurrentDictionary> TopicToAgentTypesMap { get; set; } = []; + + /// + /// Maps each topic prefix to the set of agent types subscribed to it. + /// public ConcurrentDictionary> TopicPrefixToAgentTypesMap { get; set; } = []; + + /// + /// Stores subscriptions by GUID + /// public ConcurrentDictionary> GuidSubscriptionsMap { get; set; } = []; + + /// + /// The concurrency ETag for identifying the registry's version or state. + /// public string Etag { get; set; } = Guid.NewGuid().ToString(); } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs new file mode 100644 index 000000000000..5f88e37a972d --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/IMessageRegistryGrain.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// IMessageRegistryGrain.cs + +using Microsoft.AutoGen.Contracts; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; + +public interface IMessageRegistryGrain : IGrainWithIntegerKey +{ + /// + /// Writes a message to the dead-letter queue for the given topic. + /// + Task WriteMessageAsync(string topic, CloudEvent message); + + /// + /// Removes all messages for the given topic from the dead-letter queue. + /// + /// The topic to remove messages for. + /// A task representing the asynchronous operation, with the list of removed messages as the result. + Task> RemoveMessagesAsync(string topic); +} + diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs new file mode 100644 index 000000000000..e753c79ee399 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Abstractions/MessageRegistryState.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// MessageRegistryState.cs + +using System.Collections.Concurrent; +using Microsoft.AutoGen.Contracts; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; + +/// +/// Holds a dead-letter queue by topic type. +/// +public class MessageRegistryState +{ + /// + /// Dictionary mapping topic types to a list of CloudEvents that failed delivery. + /// + public ConcurrentDictionary> DeadLetterQueue { get; set; } = new(); +} diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs index 353300f6a3ac..e285e5f65391 100644 --- a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs @@ -18,8 +18,18 @@ public sealed class GrpcGateway : BackgroundService, IGateway { private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); private readonly ILogger _logger; + /// + /// The Orleans cluster client. + /// private readonly IClusterClient _clusterClient; + /// + /// The Orleans Grain that manages the AgentRegistration, Subscription, and Gateways + /// private readonly IRegistryGrain _gatewayRegistry; + /// + /// The Orleans Grain that manages the DeadLetterQueue and MessageBuffer + /// + private readonly IMessageRegistryGrain _messageRegistry; private readonly IGateway _reference; private readonly ConcurrentDictionary> _supportedAgentTypes = []; public readonly ConcurrentDictionary _workers = new(); @@ -37,6 +47,8 @@ public GrpcGateway(IClusterClient clusterClient, ILogger logger) _clusterClient = clusterClient; _reference = clusterClient.CreateObjectReference(this); _gatewayRegistry = clusterClient.GetGrain(0); + _messageRegistry = clusterClient.GetGrain(0); + } /// @@ -148,6 +160,29 @@ public async ValueTask SubscribeAsync(AddSubscriptionRe // We do not actually need to defer these, since we do not listen to ClientId on this for some reason... // TODO: Fix this await _gatewayRegistry.SubscribeAsync(request).ConfigureAwait(true); + + var topic = request.Subscription.SubscriptionCase switch + { + Subscription.SubscriptionOneofCase.TypeSubscription + => request.Subscription.TypeSubscription.TopicType, + Subscription.SubscriptionOneofCase.TypePrefixSubscription + => request.Subscription.TypePrefixSubscription.TopicTypePrefix, + _ => null + }; + + if (!string.IsNullOrEmpty(topic)) + { + var removedMessages = await _messageRegistry.RemoveMessagesAsync(topic); + if (removedMessages.Any()) + { + _logger.LogInformation("Removed {Count} dead-letter messages for topic '{Topic}'.", removedMessages.Count, topic); + // now that someone is subscribed, dispatch the messages + foreach (var message in removedMessages) + { + await DispatchEventAsync(message).ConfigureAwait(true); + } + } + } return new AddSubscriptionResponse { }; } catch (Exception ex) @@ -311,7 +346,7 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can { var registry = _clusterClient.GetGrain(0); //intentionally blocking - var targetAgentTypes = await registry.GetSubscribedAndHandlingAgentsAsync(evt.Source, evt.Type).ConfigureAwait(true); + var targetAgentTypes = await registry.GetSubscribedAndHandlingAgentsAsync(evt.Type, evt.Source).ConfigureAwait(true); if (targetAgentTypes is not null && targetAgentTypes.Count > 0) { targetAgentTypes = targetAgentTypes.Distinct().ToList(); @@ -324,15 +359,26 @@ private async ValueTask DispatchEventAsync(CloudEvent evt, CancellationToken can var activeConnections = connections.Where(c => c.Completion?.IsCompleted == false).ToList(); foreach (var connection in activeConnections) { + _logger.LogDebug("Dispatching event {Event} to connection {Connection}, for AgentType {AgentType}.", evt, connection, agentType); tasks.Add(this.WriteResponseAsync(connection, evt, cancellationToken)); } } + else + { + // we have target agent types that aren't in the supported agent types + // could be a race condition or a bug + _logger.LogWarning($"Agent type {agentType} is not supported, but registry returned it as subscribed to {evt.Type}/{evt.Source}. Buffering an event to the dead-letter queue."); + await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true); + } } + await Task.WhenAll(tasks).ConfigureAwait(false); } else { // log that no agent types were found - _logger.LogWarning("No agent types found for event type {EventType}.", evt.Type); + _logger.LogWarning("No agent types found for event type {EventType}. Adding to Dead Letter Queue", evt.Type); + // buffer the event to the dead-letter queue + await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true); } } @@ -423,28 +469,6 @@ private static async ValueTask RespondBadRequestAsync(GrpcWorkerConnection conne throw new RpcException(new Status(StatusCode.InvalidArgument, error)); } - /// - /// Dispatches an event to the specified agent types. - /// - /// The agent types. - /// The cloud event. - /// A task that represents the asynchronous operation. - private async ValueTask DispatchEventToAgentsAsync(IEnumerable agentTypes, CloudEvent evt) - { - var tasks = new List(agentTypes.Count()); - foreach (var agentType in agentTypes) - { - if (_supportedAgentTypes.TryGetValue(agentType, out var connections)) - { - foreach (var connection in connections) - { - tasks.Add(this.WriteResponseAsync(connection, evt)); - } - } - } - await Task.WhenAll(tasks).ConfigureAwait(false); - } - /// /// Writes a response to a worker connection. /// @@ -456,15 +480,4 @@ private async Task WriteResponseAsync(GrpcWorkerConnection connection, CloudEven { await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false); } - - /// - /// Writes a response to a worker connection. - /// - /// The worker connection. - /// The cloud event. - /// A task that represents the asynchronous operation. - public async Task WriteResponseAsync(IConnection connection, CloudEvent cloudEvent) - { - await WriteResponseAsync((GrpcWorkerConnection)connection, cloudEvent, default).ConfigureAwait(false); - } } diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs new file mode 100644 index 000000000000..83a2e527d01a --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/MessageRegistryGrain.cs @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// MessageRegistryGrain.cs + +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc; + +internal sealed class MessageRegistryGrain( + [PersistentState("state", "PubSubStore")] IPersistentState state, + ILogger logger +) : Grain, IMessageRegistryGrain +{ + private const int _retries = 5; + private readonly ILogger _logger = logger; + + public async Task WriteMessageAsync(string topic, CloudEvent message) + { + var retries = _retries; + while (!await WriteMessageAsync(topic, message, state.Etag).ConfigureAwait(false)) + { + if (retries-- <= 0) + { + throw new InvalidOperationException($"Failed to write MessageRegistryState after {_retries} retries."); + } + _logger.LogWarning("Failed to write MessageRegistryState. Retrying..."); + retries--; + } + } + private async ValueTask WriteMessageAsync(string topic, CloudEvent message, string etag) + { + if (state.Etag != null && state.Etag != etag) + { + return false; + } + var queue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new()); + queue.Add(message); + state.State.DeadLetterQueue.AddOrUpdate(topic, queue, (_, _) => queue); + await state.WriteStateAsync().ConfigureAwait(true); + return true; + } + + public async Task> RemoveMessagesAsync(string topic) + { + if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List? letters)) + { + await state.WriteStateAsync().ConfigureAwait(true); + if (letters != null) + { + return letters; + } + } + return []; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AnySurrogate.cs b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AnySurrogate.cs new file mode 100644 index 000000000000..cefa9b83b2cb --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Orleans/Surrogates/AnySurrogate.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// AnySurrogate.cs + +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates; + +[GenerateSerializer] +[Alias("Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates.AnySurrogate")] +public struct AnySurrogate +{ + [Id(0)] + public string TypeUrl; + [Id(1)] + public byte[] Value; +} + +[RegisterConverter] +public sealed class AnySurrogateConverter : IConverter +{ + public Any ConvertFromSurrogate(in AnySurrogate surrogate) => + new() + { + TypeUrl = surrogate.TypeUrl, + Value = ByteString.CopyFrom(surrogate.Value) + }; + + public AnySurrogate ConvertToSurrogate(in Any value) => + new() + { + TypeUrl = value.TypeUrl, + Value = value.Value.ToByteArray() + }; +} diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgent.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgent.cs new file mode 100644 index 000000000000..a789f068a0ee --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgent.cs @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// HelloAgent.cs + +using Microsoft.AutoGen.Agents; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Samples; + +[TypeSubscription("HelloTopic")] +public class HelloAgent( + IHostApplicationLifetime hostApplicationLifetime, + AgentId id, + IAgentRuntime runtime, + Logger? logger = null) : BaseAgent(id, runtime, "Hello Agent", logger), + IHandle, + IHandle, + IHandle, IHandleConsole +{ + // This will capture the message sent in Program.cs + public async ValueTask HandleAsync(NewMessageReceived item, MessageContext messageContext) + { + Console.Out.WriteLine(item.Message); // Print message to console + ConversationClosed goodbye = new ConversationClosed + { + UserId = this.Id.Type, + UserMessage = "Goodbye" + }; + // This will publish the new message type which will be handled by the ConversationClosed handler + await this.PublishMessageAsync(goodbye, new TopicId("HelloTopic")); + } + public async ValueTask HandleAsync(ConversationClosed item, MessageContext messageContext) + { + var goodbye = $"{item.UserId} said {item.UserMessage}"; // Print goodbye message to console + Console.Out.WriteLine(goodbye); + if (Environment.GetEnvironmentVariable("STAY_ALIVE_ON_GOODBYE") != "true") + { + // Publish message that will be handled by shutdown handler + await this.PublishMessageAsync(new Shutdown(), new TopicId("HelloTopic")); + } + } + public async ValueTask HandleAsync(Shutdown item, MessageContext messageContext) + { + Console.WriteLine("Shutting down..."); + hostApplicationLifetime.StopApplication(); // Shuts down application + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgentTests.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgentTests.csproj new file mode 100644 index 000000000000..58373272f7cc --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgentTests.csproj @@ -0,0 +1,32 @@ + + + Exe + net8.0 + enable + enable + + + + PreserveNewest + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Program.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Program.cs new file mode 100644 index 000000000000..7d3985bcec56 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Program.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Program.cs +using Microsoft.AutoGen.Agents; +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.Core; +using Microsoft.AutoGen.Core.Grpc; + +using Samples; + +var appBuilder = new AgentsAppBuilder(); // Create app builder +// if we are using distributed, we need the AGENT_HOST var defined and then we will use the grpc runtime +if (Environment.GetEnvironmentVariable("AGENT_HOST") != null) +{ + appBuilder.AddGrpcAgentWorker( + Environment.GetEnvironmentVariable("AGENT_HOST")) + .AddAgent("HelloAgent"); +} +else +{ + // Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent + appBuilder.UseInProcessRuntime(deliverToSelf: true).AddAgent("HelloAgent"); +} +var app = await appBuilder.BuildAsync(); // Build the app +// Create a custom message type from proto and define message +var message = new NewMessageReceived { Message = "Hello World!" }; +await app.PublishMessageAsync(message, new TopicId("HelloTopic", "HelloAgents/dotnet")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent) +//await app.PublishMessageAsync(message, new TopicId("HelloTopic")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent) +await app.WaitForShutdownAsync().ConfigureAwait(false); // Wait for shutdown from agent diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Properties/launchSettings.json b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Properties/launchSettings.json new file mode 100644 index 000000000000..1bddd184e32e --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "HelloAgent": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:53113;http://localhost:53114" + } + } +} diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/README.md b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/README.md new file mode 100644 index 000000000000..968f454905c3 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/README.md @@ -0,0 +1,120 @@ +# AutoGen 0.4 .NET Hello World Sample + +This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response. + +## Prerequisites + +To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later. +Also recommended is the [GitHub CLI](https://cli.github.com/). + +## Instructions to run the sample + +```bash +# Clone the repository +gh repo clone microsoft/autogen +cd dotnet/samples/Hello +dotnet run +``` + +## Key Concepts + +This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages. + +Flow Diagram: + +```mermaid +%%{init: {'theme':'forest'}}%% +graph LR; + A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item, CancellationToken cancellationToken = default)"} + B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent] + C --> D{"WriteConsole()"} + B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item, CancellationToken cancellationToken = default)"} + B --> |"PublishEventAsync(Output('***Goodbye***'))"| C + E --> F{"Shutdown()"} + +``` + +### Writing Event Handlers + +The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data. + +Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Contracts;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface. + +```csharp +TopicSubscription("HelloAgents")] +public class HelloAgent( + iAgentWorker worker, + [FromKeyedServices("AgentsMetadata")] AgentsMetadata typeRegistry) : ConsoleAgent( + worker, + typeRegistry), + ISayHello, + IHandle, + IHandle +{ + public async Task Handle(NewMessageReceived item, CancellationToken cancellationToken = default) + { + var response = await SayHello(item.Message).ConfigureAwait(false); + var evt = new Output + { + Message = response + }.ToCloudEvent(this.AgentId.Key); + await PublishEventAsync(evt).ConfigureAwait(false); + var goodbye = new ConversationClosed + { + UserId = this.AgentId.Key, + UserMessage = "Goodbye" + }.ToCloudEvent(this.AgentId.Key); + await PublishEventAsync(goodbye).ConfigureAwait(false); + } +``` + +### Inheritance and Composition + +This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method. + +### Starting the Application Runtime + +AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method. + +```csharp +// send a message to the agent +var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived +{ + Message = "World" +}, local: true); + +await App.RuntimeApp!.WaitForShutdownAsync(); +await app.WaitForShutdownAsync(); +``` + +### Sending Messages + +The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file: + +```proto +syntax = "proto3"; +package devteam; +option csharp_namespace = "DevTeam.Shared"; +message NewAsk { + string org = 1; + string repo = 2; + string ask = 3; + int64 issue_number = 4; +} +message ReadmeRequested { + string org = 1; + string repo = 2; + int64 issue_number = 3; + string ask = 4; +} +``` + +```xml + + + + + +``` + +You can send messages using the [```Microsoft.AutoGen.Agents.AgentWorker``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus. diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/appsettings.json b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/appsettings.json new file mode 100644 index 000000000000..df63bbd62951 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/appsettings.json @@ -0,0 +1,19 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning", + "Microsoft.Hosting.Lifetime": "Information", + "Microsoft.AspNetCore": "Information", + "Microsoft": "Information", + "Microsoft.Orleans": "Warning", + "Orleans.Runtime": "Error", + "Grpc": "Information" + } + }, + "AllowedHosts": "*", + "Kestrel": { + "EndpointDefaults": { + "Protocols": "Http2" + } + } +} \ No newline at end of file diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/InMemoryTests.AppHost.csproj similarity index 85% rename from dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj rename to dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/InMemoryTests.AppHost.csproj index 441d48d18cb5..7caf48ad9eda 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/HelloAgent.AppHost.csproj +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/InMemoryTests.AppHost.csproj @@ -16,6 +16,6 @@ - + diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/Program.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/Program.cs similarity index 75% rename from dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/Program.cs rename to dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/Program.cs index 3234a7abab9f..4bdcb146b0e9 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/Program.cs +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/Program.cs @@ -4,7 +4,7 @@ using Microsoft.Extensions.Hosting; var appHost = DistributedApplication.CreateBuilder(); -appHost.AddProject("HelloAgentsDotNetInMemoryRuntime"); +appHost.AddProject("HelloAgentsDotNetInMemoryRuntime"); var app = appHost.Build(); await app.StartAsync(); await app.WaitForShutdownAsync(); diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/Properties/launchSettings.json b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/Properties/launchSettings.json similarity index 100% rename from dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgent.AppHost/Properties/launchSettings.json rename to dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/Properties/launchSettings.json diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Program.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Program.cs new file mode 100644 index 000000000000..2592f59baf07 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Program.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Program.cs +using Aspire.Hosting.Python; +using Microsoft.Extensions.Hosting; +const string pythonHelloAgentPath = "../core_xlang_hello_python_agent"; +const string pythonHelloAgentPy = "hello_python_agent.py"; +const string pythonVEnv = "../../../../python/.venv"; +//Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true"); +//Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true"); +var builder = DistributedApplication.CreateBuilder(args); +var backend = builder.AddProject("AgentHost").WithExternalHttpEndpoints(); +IResourceBuilder? dotnet = null; +#pragma warning disable ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +IResourceBuilder? python = null; +if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("XLANG_TEST_NO_DOTNET"))) +{ + dotnet = builder.AddProject("HelloAgentTestsDotNET") + .WithReference(backend) + .WithEnvironment("AGENT_HOST", backend.GetEndpoint("https")) + .WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true") + .WaitFor(backend); +} +if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("XLANG_TEST_NO_PYTHON"))) +{ + // xlang is over http for now - in prod use TLS between containers + python = builder.AddPythonApp("HelloAgentTestsPython", pythonHelloAgentPath, pythonHelloAgentPy, pythonVEnv) + .WithReference(backend) + .WithEnvironment("AGENT_HOST", backend.GetEndpoint("http")) + .WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true") + .WithEnvironment("GRPC_DNS_RESOLVER", "native") + .WithOtlpExporter() + .WaitFor(backend); + if (dotnet != null) { python.WaitFor(dotnet); } +} +#pragma warning restore ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +using var app = builder.Build(); +await app.StartAsync(); +var url = backend.GetEndpoint("http").Url; +Console.WriteLine("Backend URL: " + url); +if (dotnet != null) { Console.WriteLine("Dotnet Resource Projects.HelloAgentTests invoked as HelloAgentTestsDotNET"); } +if (python != null) { Console.WriteLine("Python Resource hello_python_agent.py invoked as HelloAgentTestsPython"); } +await app.WaitForShutdownAsync(); diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Properties/launchSettings.json b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Properties/launchSettings.json new file mode 100644 index 000000000000..ea78f2933fdb --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/Properties/launchSettings.json @@ -0,0 +1,43 @@ +{ + "profiles": { + "https": { + "commandName": "Project", + "launchBrowser": true, + "dotnetRunMessages": true, + "applicationUrl": "https://localhost:15887;http://localhost:15888", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + //"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:16037", + "DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "https://localhost:16038", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:17037", + "DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true" + } + }, + "http": { + "commandName": "Project", + "launchBrowser": true, + "dotnetRunMessages": true, + "applicationUrl": "http://localhost:15888", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + //"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16031", + "DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "http://localhost:16032", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:17031", + "DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true", + "ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true" + } + }, + "generate-manifest": { + "commandName": "Project", + "dotnetRunMessages": true, + "commandLineArgs": "--publisher manifest --output-path aspire-manifest.json", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development" + } + } + }, + "$schema": "https://json.schemastore.org/launchsettings.json" +} diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/XlangTests.AppHost.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/XlangTests.AppHost.csproj new file mode 100644 index 000000000000..93570be648ff --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/XLangTests.AppHost/XlangTests.AppHost.csproj @@ -0,0 +1,24 @@ + + + + + + Exe + net8.0 + enable + enable + true + ecb5cbe4-15d8-4120-8f18-d3ba4902915b + + + + + + + + + + + + + diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/README.md b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/README.md new file mode 100644 index 000000000000..bb94d34f305e --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/README.md @@ -0,0 +1,14 @@ +# Python and dotnet agents interoperability sample + +This sample demonstrates how to create a Python agent that interacts with a .NET agent. +To run the sample, check out the autogen repository. +Then do the following: + +1. Navigate to autogen/dotnet/samples/Hello/Hello.AppHost +2. Run `dotnet run` to start the .NET Aspire app host, which runs three projects: + - Backend (the .NET Agent Runtime) + - HelloAgent (the .NET Agent) + - this Python agent - hello_python_agent.py +3. The AppHost will start the Aspire dashboard on [https://localhost:15887](https://localhost:15887). + +The Python agent will interact with the .NET agent by sending a message to the .NET runtime, which will relay the message to the .NET agent. diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/hello_python_agent.py b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/hello_python_agent.py new file mode 100644 index 000000000000..3ea0cb85df02 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/hello_python_agent.py @@ -0,0 +1,75 @@ +import asyncio +import logging +import os +import sys + +# from protos.agents_events_pb2 import NewMessageReceived +from autogen_core import ( + PROTOBUF_DATA_CONTENT_TYPE, + AgentId, + DefaultSubscription, + DefaultTopicId, + TypeSubscription, + try_get_known_serializers_for_type, +) +from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime + +# Add the local package directory to sys.path +thisdir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(thisdir, "..", "..")) +from dotenv import load_dotenv # type: ignore # noqa: E402 +from protos.agent_events_pb2 import NewMessageReceived, Output # type: ignore # noqa: E402 +from user_input import UserProxy # type: ignore # noqa: E402 + +agnext_logger = logging.getLogger("autogen_core") + + +async def main() -> None: + load_dotenv() + agentHost = os.getenv("AGENT_HOST") or "http://localhost:50673" + # grpc python bug - can only use the hostname, not prefix - if hostname has a prefix we have to remove it: + if agentHost.startswith("http://"): + agentHost = agentHost[7:] + if agentHost.startswith("https://"): + agentHost = agentHost[8:] + agnext_logger.info("0") + agnext_logger.info(agentHost) + runtime = GrpcWorkerAgentRuntime(host_address=agentHost, payload_serialization_format=PROTOBUF_DATA_CONTENT_TYPE) + + agnext_logger.info("1") + await runtime.start() + runtime.add_message_serializer(try_get_known_serializers_for_type(NewMessageReceived)) + + agnext_logger.info("2") + + await UserProxy.register(runtime, "HelloAgent", lambda: UserProxy()) + await runtime.add_subscription(DefaultSubscription(agent_type="HelloAgent")) + await runtime.add_subscription(TypeSubscription(topic_type="HelloTopic", agent_type="HelloAgent")) + await runtime.add_subscription(TypeSubscription(topic_type="agents.NewMessageReceived", agent_type="HelloAgent")) + await runtime.add_subscription(TypeSubscription(topic_type="agents.ConversationClosed", agent_type="HelloAgent")) + await runtime.add_subscription(TypeSubscription(topic_type="agents.Output", agent_type="HelloAgent")) + agnext_logger.info("3") + + new_message = NewMessageReceived(message="Hello from Python!") + output_message = Output(message="^v^v^v---Wild Hello from Python!---^v^v^v") + + await runtime.publish_message( + message=new_message, + topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"), + sender=AgentId("HelloAgents", "python"), + ) + runtime.add_message_serializer(try_get_known_serializers_for_type(Output)) + await runtime.publish_message( + message=output_message, + topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"), + sender=AgentId("HelloAgents", "python"), + ) + await runtime.stop_when_signal() + # await runtime.stop_when_idle() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + agnext_logger.setLevel(logging.DEBUG) + agnext_logger.log(logging.DEBUG, "Starting worker") + asyncio.run(main()) diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/__init__.py b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/__init__.py new file mode 100644 index 000000000000..b3ea671c3b9b --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/__init__.py @@ -0,0 +1,8 @@ +""" +The :mod:`autogen_core.worker.protos` module provides Google Protobuf classes for agent-worker communication +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.py b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.py new file mode 100644 index 000000000000..4d65bcefd3cc --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: agent_events.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'agent_events.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_events.proto\x12\x06\x61gents\"2\n\x0bTextMessage\x12\x13\n\x0btextMessage\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"\x18\n\x05Input\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1f\n\x0eInputProcessed\x12\r\n\x05route\x18\x01 \x01(\t\"\x19\n\x06Output\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1e\n\rOutputWritten\x12\r\n\x05route\x18\x01 \x01(\t\"\x1a\n\x07IOError\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x12NewMessageReceived\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x11ResponseGenerated\x12\x10\n\x08response\x18\x01 \x01(\t\"\x1a\n\x07GoodBye\x12\x0f\n\x07message\x18\x01 \x01(\t\" \n\rMessageStored\x12\x0f\n\x07message\x18\x01 \x01(\t\";\n\x12\x43onversationClosed\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x14\n\x0cuser_message\x18\x02 \x01(\t\"\x1b\n\x08Shutdown\x12\x0f\n\x07message\x18\x01 \x01(\tB\x1b\xaa\x02\x18Microsoft.AutoGen.Agentsb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_events_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'\252\002\030Microsoft.AutoGen.Agents' + _globals['_TEXTMESSAGE']._serialized_start=30 + _globals['_TEXTMESSAGE']._serialized_end=80 + _globals['_INPUT']._serialized_start=82 + _globals['_INPUT']._serialized_end=106 + _globals['_INPUTPROCESSED']._serialized_start=108 + _globals['_INPUTPROCESSED']._serialized_end=139 + _globals['_OUTPUT']._serialized_start=141 + _globals['_OUTPUT']._serialized_end=166 + _globals['_OUTPUTWRITTEN']._serialized_start=168 + _globals['_OUTPUTWRITTEN']._serialized_end=198 + _globals['_IOERROR']._serialized_start=200 + _globals['_IOERROR']._serialized_end=226 + _globals['_NEWMESSAGERECEIVED']._serialized_start=228 + _globals['_NEWMESSAGERECEIVED']._serialized_end=265 + _globals['_RESPONSEGENERATED']._serialized_start=267 + _globals['_RESPONSEGENERATED']._serialized_end=304 + _globals['_GOODBYE']._serialized_start=306 + _globals['_GOODBYE']._serialized_end=332 + _globals['_MESSAGESTORED']._serialized_start=334 + _globals['_MESSAGESTORED']._serialized_end=366 + _globals['_CONVERSATIONCLOSED']._serialized_start=368 + _globals['_CONVERSATIONCLOSED']._serialized_end=427 + _globals['_SHUTDOWN']._serialized_start=429 + _globals['_SHUTDOWN']._serialized_end=456 +# @@protoc_insertion_point(module_scope) diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.pyi b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.pyi new file mode 100644 index 000000000000..01cfbafee51e --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2.pyi @@ -0,0 +1,197 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import builtins +import google.protobuf.descriptor +import google.protobuf.message +import typing + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing.final +class TextMessage(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TEXTMESSAGE_FIELD_NUMBER: builtins.int + SOURCE_FIELD_NUMBER: builtins.int + textMessage: builtins.str + source: builtins.str + def __init__( + self, + *, + textMessage: builtins.str = ..., + source: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["source", b"source", "textMessage", b"textMessage"]) -> None: ... + +global___TextMessage = TextMessage + +@typing.final +class Input(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Input = Input + +@typing.final +class InputProcessed(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROUTE_FIELD_NUMBER: builtins.int + route: builtins.str + def __init__( + self, + *, + route: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ... + +global___InputProcessed = InputProcessed + +@typing.final +class Output(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Output = Output + +@typing.final +class OutputWritten(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ROUTE_FIELD_NUMBER: builtins.int + route: builtins.str + def __init__( + self, + *, + route: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ... + +global___OutputWritten = OutputWritten + +@typing.final +class IOError(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___IOError = IOError + +@typing.final +class NewMessageReceived(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___NewMessageReceived = NewMessageReceived + +@typing.final +class ResponseGenerated(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + RESPONSE_FIELD_NUMBER: builtins.int + response: builtins.str + def __init__( + self, + *, + response: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["response", b"response"]) -> None: ... + +global___ResponseGenerated = ResponseGenerated + +@typing.final +class GoodBye(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___GoodBye = GoodBye + +@typing.final +class MessageStored(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___MessageStored = MessageStored + +@typing.final +class ConversationClosed(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + USER_ID_FIELD_NUMBER: builtins.int + USER_MESSAGE_FIELD_NUMBER: builtins.int + user_id: builtins.str + user_message: builtins.str + def __init__( + self, + *, + user_id: builtins.str = ..., + user_message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["user_id", b"user_id", "user_message", b"user_message"]) -> None: ... + +global___ConversationClosed = ConversationClosed + +@typing.final +class Shutdown(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___Shutdown = Shutdown diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.py b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.py new file mode 100644 index 000000000000..d0eda77e9e8c --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.py @@ -0,0 +1,24 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + + +GRPC_GENERATED_VERSION = '1.70.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in agent_events_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.pyi b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.pyi new file mode 100644 index 000000000000..a6a9cff9dfd4 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/protos/agent_events_pb2_grpc.pyi @@ -0,0 +1,17 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import abc +import collections.abc +import grpc +import grpc.aio +import typing + +_T = typing.TypeVar("_T") + +class _MaybeAsyncIterator(collections.abc.AsyncIterator[_T], collections.abc.Iterator[_T], metaclass=abc.ABCMeta): ... + +class _ServicerContext(grpc.ServicerContext, grpc.aio.ServicerContext): # type: ignore[misc, type-arg] + ... diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/user_input.py b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/user_input.py new file mode 100644 index 000000000000..71a0c0929a24 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/core_xlang_hello_python_agent/user_input.py @@ -0,0 +1,38 @@ +import asyncio +import logging +from typing import Union + +from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, message_handler +from protos.agent_events_pb2 import ConversationClosed, Input, NewMessageReceived, Output # type: ignore + +input_types = Union[ConversationClosed, Input, Output] + + +class UserProxy(RoutedAgent): + """An agent that allows the user to play the role of an agent in the conversation via input.""" + + DEFAULT_DESCRIPTION = "A human user." + + def __init__( + self, + description: str = DEFAULT_DESCRIPTION, + ) -> None: + super().__init__(description) + + @message_handler + async def handle_user_chat_input(self, message: input_types, ctx: MessageContext) -> None: + logger = logging.getLogger("autogen_core") + + if isinstance(message, Input): + response = await self.ainput("User input ('exit' to quit): ") + response = response.strip() + logger.info(response) + + await self.publish_message(NewMessageReceived(message=response), topic_id=DefaultTopicId()) + elif isinstance(message, Output): + logger.info(message.message) + else: + pass + + async def ainput(self, prompt: str) -> str: + return await asyncio.to_thread(input, f"{prompt} ") diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/protos/agent_events.proto b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/protos/agent_events.proto new file mode 100644 index 000000000000..a97df6e5855f --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests.AppHosts/protos/agent_events.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package agents; + +option csharp_namespace = "Microsoft.AutoGen.Contracts"; +message TextMessage { + string textMessage = 1; + string source = 2; +} +message Input { + string message = 1; +} +message InputProcessed { + string route = 1; +} +message Output { + string message = 1; +} +message OutputWritten { + string route = 1; +} +message IOError { + string message = 1; +} +message NewMessageReceived { + string message = 1; +} +message ResponseGenerated { + string response = 1; +} +message GoodBye { + string message = 1; +} +message MessageStored { + string message = 1; +} +message ConversationClosed { + string user_id = 1; + string user_message = 2; +} +message Shutdown { + string message = 1; +} diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/HelloAppHostIntegrationTests.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests/HelloAppHostIntegrationTests.cs index e612eb5b87ea..bf4598748985 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests/HelloAppHostIntegrationTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/HelloAppHostIntegrationTests.cs @@ -8,10 +8,15 @@ namespace Microsoft.AutoGen.Integration.Tests; public class HelloAppHostIntegrationTests(ITestOutputHelper testOutput) { - [Theory, Trait("Category", "Integration")] - [MemberData(nameof(AppHostAssemblies))] - public async Task AppHostRunsCleanly(string appHostPath) + private const string AppHostAssemblyName = "XlangTests.AppHost"; + private const string DotNetResourceName = "HelloAgentTestsDotNET"; + private const string PythonResourceName = "HelloAgentTestsPython"; + private const string BackendResourceName = "AgentHost"; + + [Fact] + public async Task AppHostRunsCleanly() { + var appHostPath = GetAssemblyPath(AppHostAssemblyName); var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); @@ -22,53 +27,157 @@ public async Task AppHostRunsCleanly(string appHostPath) await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15)); } - [Theory, Trait("Category", "Integration")] - [MemberData(nameof(TestEndpoints))] - public async Task AppHostLogsHelloAgentE2E(TestEndpoints testEndpoints) + [Fact] + public async Task Test_Dotnet_Sends_AgentHost_Delivers_and_Python_Receives() { - var appHostName = testEndpoints.AppHost!; - var appHostPath = $"{appHostName}.dll"; + //Prepare + var appHostPath = GetAssemblyPath(AppHostAssemblyName); var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + //Act + var expectedMessage = "INFO:autogen_core:Received a message from host: cloudEvent {"; + var containsExpectedMessage = false; + app.EnsureNoErrorsLogged(); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + expectedMessage = "Hello World!"; + containsExpectedMessage = false; + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + await app.StopAsync(); + + //Assert + Assert.True(containsExpectedMessage); + } + + [Fact] + public async Task Test_Python_Sends_AgentHost_Delivers_and_DotNet_Receives() + { + //Prepare + var appHostPath = GetAssemblyPath(AppHostAssemblyName); + var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); + await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); - await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120)); - if (testEndpoints.WaitForResources?.Count > 0) - { - // Wait until each resource transitions to the required state - var timeout = TimeSpan.FromMinutes(5); - foreach (var (ResourceName, TargetState) in testEndpoints.WaitForResources) - { - await app.WaitForResource(ResourceName, TargetState).WaitAsync(timeout); - } - } - //sleep to make sure the app is running - await Task.Delay(20000); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + + //Act + var expectedMessage = "from Python!"; + var containsExpectedMessage = false; app.EnsureNoErrorsLogged(); - app.EnsureLogContains("HelloAgent said Goodbye"); - app.EnsureLogContains("Wild Hello from Python!"); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(60)); + await app.StopAsync(); - await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15)); + //Assert + Assert.True(containsExpectedMessage); + } + + [Fact] + public async Task Test_Python_Agent_Sends_And_AgentHost_Receives() + { + //Prepare + Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true"); + var appHostPath = GetAssemblyPath(AppHostAssemblyName); + var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); + await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + + //Act + var expectedMessage = "\"source\": \"HelloAgents/python\""; + var containsExpectedMessage = false; + app.EnsureNoErrorsLogged(); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(BackendResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + await app.StopAsync(); + + //Assert + Assert.True(containsExpectedMessage); + } + + [Fact] + public async Task Test_Dotnet_Agent_Sends_And_AgentHost_Receives() + { + //Prepare + Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true"); + var appHostPath = GetAssemblyPath(AppHostAssemblyName); + var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); + await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + + //Act + var expectedMessage = "\"source\": \"HelloAgents/dotnet\""; + var containsExpectedMessage = false; + app.EnsureNoErrorsLogged(); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(BackendResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + await app.StopAsync(); + + //Assert + Assert.True(containsExpectedMessage); } - public static TheoryData AppHostAssemblies() + + [Fact] + public async Task Test_Dotnet_Agent_Sends_And_AgentHost_Delivers_Back_To_It() { - var appHostAssemblies = GetSamplesAppHostAssemblyPaths(); - var theoryData = new TheoryData(); - return new(appHostAssemblies.Select(p => Path.GetRelativePath(AppContext.BaseDirectory, p))); + //Prepare + Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true"); + var appHostPath = GetAssemblyPath(AppHostAssemblyName); + var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); + await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + + //Act + var expectedMessage = "Hello World!"; + var containsExpectedMessage = false; + app.EnsureNoErrorsLogged(); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + expectedMessage = "HelloAgent said Goodbye"; + containsExpectedMessage = false; + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + await app.StopAsync(); + + //Assert + Assert.True(containsExpectedMessage); } - public static TheoryData TestEndpoints() => - new([ - new TestEndpoints("Hello.AppHost", new() { - { "backend", ["/"] } - }), - ]); + [Fact] + public async Task Test_Python_Agent_Sends_And_AgentHost_Delivers_Back_To_It() + { + //Prepare + Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true"); + var appHostPath = GetAssemblyPath(AppHostAssemblyName); + var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput); + await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); + await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120)); + + //Act + var expectedMessage = "INFO:autogen_core:Received a message from host: cloudEvent {"; + var containsExpectedMessage = false; + app.EnsureNoErrorsLogged(); + containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120)); + await app.StopAsync(); + //Assert + Assert.True(containsExpectedMessage); + } - private static IEnumerable GetSamplesAppHostAssemblyPaths() + private static string GetAssemblyPath(string assemblyName) { - // All the AppHost projects are referenced by this project so we can find them by looking for all their assemblies in the base directory - return Directory.GetFiles(AppContext.BaseDirectory, "*.AppHost.dll") - .Where(fileName => !fileName.EndsWith("Aspire.Hosting.AppHost.dll", StringComparison.OrdinalIgnoreCase)); + var parentDir = Directory.GetParent(AppContext.BaseDirectory)?.FullName; + var grandParentDir = parentDir is not null ? Directory.GetParent(parentDir)?.FullName : null; + var greatGrandParentDir = grandParentDir is not null ? Directory.GetParent(grandParentDir)?.FullName : null + ?? AppContext.BaseDirectory; + var options = new EnumerationOptions { RecurseSubdirectories = true, MatchCasing = MatchCasing.CaseInsensitive }; + if (greatGrandParentDir is not null) + { + var foundFile = Directory.GetFiles(greatGrandParentDir, $"{assemblyName}.dll", options).FirstOrDefault(); + if (foundFile is not null) + { + return foundFile; + } + } + throw new FileNotFoundException($"Could not find {assemblyName}.dll in {grandParentDir ?? "unknown"} directory"); } } diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/InMemoryRuntimeIntegrationTests.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests/InMemoryRuntimeIntegrationTests.cs index ee972a67ef47..099ae9a440bf 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests/InMemoryRuntimeIntegrationTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/InMemoryRuntimeIntegrationTests.cs @@ -6,38 +6,26 @@ namespace Microsoft.AutoGen.Integration.Tests; public class InMemoryRuntimeIntegrationTests(ITestOutputHelper testOutput) { - - [Theory, Trait("Category", "Integration")] - [MemberData(nameof(AppHostAssemblies))] - public async Task HelloAgentsE2EInMemory(string appHostAssemblyPath) + [Fact] + public async Task HelloAgentsE2EInMemory() { + // Locate InMemoryTests.AppHost.dll in the test output folder + var appHostAssemblyPath = Directory.GetFiles(AppContext.BaseDirectory, "InMemoryTests.AppHost.dll", SearchOption.AllDirectories) + .FirstOrDefault() + ?? throw new FileNotFoundException("Could not find InMemoryTests.AppHost.dll in the test output folder"); var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostAssemblyPath, testOutput); await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15)); + // Start the application and wait for resources await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120)); - await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120)); - await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120)); - - //sleep 5 seconds to make sure the app is running - await Task.Delay(15000); + // Sleep 5 seconds to ensure the app is up + await Task.Delay(5000); app.EnsureNoErrorsLogged(); app.EnsureLogContains("Hello World"); app.EnsureLogContains("HelloAgent said Goodbye"); await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15)); } - public static TheoryData AppHostAssemblies() - { - var appHostAssemblies = GetSamplesAppHostAssemblyPaths(); - var theoryData = new TheoryData(); - return new(appHostAssemblies.Select(p => Path.GetRelativePath(AppContext.BaseDirectory, p))); - } - private static IEnumerable GetSamplesAppHostAssemblyPaths() - { - // All the AppHost projects are referenced by this project so we can find them by looking for all their assemblies in the base directory - return Directory.GetFiles(AppContext.BaseDirectory, "HelloAgent.AppHost.dll") - .Where(fileName => !fileName.EndsWith("Aspire.Hosting.AppHost.dll", StringComparison.OrdinalIgnoreCase)); - } } diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Infrastructure/DistributedApplicationExtension.cs b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Infrastructure/DistributedApplicationExtension.cs index e3ad24f9f962..b4439068c484 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Infrastructure/DistributedApplicationExtension.cs +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Infrastructure/DistributedApplicationExtension.cs @@ -1,8 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // DistributedApplicationExtension.cs +using System.Diagnostics; using System.Security.Cryptography; -using Aspire.Hosting; using Aspire.Hosting.Python; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -185,6 +185,19 @@ public static (IReadOnlyList AppHostLogs, IReadOnlyList + /// Gets the logs for the specified resource. + /// + /// The DistributedApplication + /// The name of the resource + /// List + public static IReadOnlyList GetResourceLogs(this DistributedApplication app, string resourceName) + { + var environment = app.Services.GetRequiredService(); + var logCollector = app.Services.GetFakeLogCollector(); + return logCollector.GetSnapshot().Where(l => l.Category == $"{environment.ApplicationName}.Resources.{resourceName}").ToList(); + } + /// /// Get all logs from the whole test run. /// @@ -240,6 +253,58 @@ public static void EnsureLogContains(this DistributedApplication app, string mes Assert.Contains(resourceLogs, log => log.Message.Contains(message)); } + /// + /// WaitForExpectedMessageInLogs + /// + /// DistributedApplication + /// string + /// TimeSpan + public static async Task WaitForExpectedMessageInResourceLogs(this DistributedApplication app, string resourceName, string expectedMessage, TimeSpan timeout) + { + var containsExpectedMessage = false; + var logWatchCancellation = new CancellationTokenSource(); + var logWatchTask = Task.Run(async () => + { + while (!containsExpectedMessage) + { + var logs = app.GetResourceLogs(resourceName); + if (logs != null && logs.Any(log => log.Message.Contains(expectedMessage))) + { + containsExpectedMessage = true; + logWatchCancellation.Cancel(); + } + } + }, logWatchCancellation.Token).WaitAsync(timeout); + try + { + await logWatchTask.ConfigureAwait(true); + } + catch (OperationCanceledException) + { + // Task was cancelled, which means the expected message was found + } + catch (Exception ex) + { + if (Debugger.IsAttached) + { + var logs = app.GetResourceLogs(resourceName); + foreach (var log in logs) + { + Console.WriteLine(log.Message); + } + var environment = app.Services.GetRequiredService(); + var logCollector = app.Services.GetFakeLogCollector(); + var allLogs = logCollector.GetSnapshot(); + } + throw new Exception($"Failed to find expected message '{expectedMessage}' in logs for resource '{resourceName}' within the timeout period.", ex); + } + finally + { + logWatchCancellation.Cancel(); + } + return containsExpectedMessage; + } + /// /// Creates an configured to communicate with the specified resource. /// diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj index a1e0d395aa28..661c49d3ba47 100644 --- a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Microsoft.AutoGen.Integration.Tests.csproj @@ -26,11 +26,16 @@ + + + + + - - - + + + diff --git a/dotnet/test/Microsoft.AutoGen.Integration.Tests/Properties/launchSettings.json b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Properties/launchSettings.json new file mode 100644 index 000000000000..ea78f2933fdb --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Integration.Tests/Properties/launchSettings.json @@ -0,0 +1,43 @@ +{ + "profiles": { + "https": { + "commandName": "Project", + "launchBrowser": true, + "dotnetRunMessages": true, + "applicationUrl": "https://localhost:15887;http://localhost:15888", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + //"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:16037", + "DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "https://localhost:16038", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:17037", + "DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true" + } + }, + "http": { + "commandName": "Project", + "launchBrowser": true, + "dotnetRunMessages": true, + "applicationUrl": "http://localhost:15888", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + //"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16031", + "DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "http://localhost:16032", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:17031", + "DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true", + "ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true" + } + }, + "generate-manifest": { + "commandName": "Project", + "dotnetRunMessages": true, + "commandLineArgs": "--publisher manifest --output-path aspire-manifest.json", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development" + } + } + }, + "$schema": "https://json.schemastore.org/launchsettings.json" +} diff --git a/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs new file mode 100644 index 000000000000..56876cee3170 --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.RuntimeGateway.Grpc.Tests/MessageRegistryTests.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// MessageRegistryTests.cs + +using Microsoft.AutoGen.Contracts; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions; +using Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans; +using Orleans.TestingHost; + +namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests; +public class MessageRegistryTests : IClassFixture +{ + private readonly TestCluster _cluster; + + public MessageRegistryTests(ClusterFixture fixture) + { + _cluster = fixture.Cluster; + } + + [Fact] + public async Task Write_and_Remove_Messages() + { + // Arrange + var grain = _cluster.GrainFactory.GetGrain(0); + var topic = Guid.NewGuid().ToString(); // Random topic + var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" }; + + // Act + await grain.WriteMessageAsync(topic, message); + + // Assert + // attempt to remove the topic from the queue + var removedMessages = await grain.RemoveMessagesAsync(topic); + // attempt to compare the message with the removed message + Assert.Single(removedMessages); + Assert.Equal(message.Id, removedMessages[0].Id); + // ensure the queue is empty + removedMessages = await grain.RemoveMessagesAsync(topic); + Assert.Empty(removedMessages); + } +} diff --git a/python/samples/core_xlang_hello_python_agent/hello_python_agent.py b/python/samples/core_xlang_hello_python_agent/hello_python_agent.py index f1288ff39f32..3ea0cb85df02 100644 --- a/python/samples/core_xlang_hello_python_agent/hello_python_agent.py +++ b/python/samples/core_xlang_hello_python_agent/hello_python_agent.py @@ -44,23 +44,24 @@ async def main() -> None: await UserProxy.register(runtime, "HelloAgent", lambda: UserProxy()) await runtime.add_subscription(DefaultSubscription(agent_type="HelloAgent")) + await runtime.add_subscription(TypeSubscription(topic_type="HelloTopic", agent_type="HelloAgent")) await runtime.add_subscription(TypeSubscription(topic_type="agents.NewMessageReceived", agent_type="HelloAgent")) await runtime.add_subscription(TypeSubscription(topic_type="agents.ConversationClosed", agent_type="HelloAgent")) await runtime.add_subscription(TypeSubscription(topic_type="agents.Output", agent_type="HelloAgent")) agnext_logger.info("3") - new_message = NewMessageReceived(message="from Python!") + new_message = NewMessageReceived(message="Hello from Python!") output_message = Output(message="^v^v^v---Wild Hello from Python!---^v^v^v") await runtime.publish_message( message=new_message, - topic_id=DefaultTopicId("agents.NewMessageReceived", "HelloAgents/python"), + topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"), sender=AgentId("HelloAgents", "python"), ) - + runtime.add_message_serializer(try_get_known_serializers_for_type(Output)) await runtime.publish_message( message=output_message, - topic_id=DefaultTopicId("agents.Output", "HelloAgents"), + topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"), sender=AgentId("HelloAgents", "python"), ) await runtime.stop_when_signal()