diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs index d1367b83ad..9883358e9e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs @@ -250,7 +250,18 @@ await AddUpdateAsync( } } - allMessages.AddRange(updates.ToAgentResponse().Messages); + AgentResponse agentResponse = updates.ToAgentResponse(); + + // Since there is no good way to configure the agent output behaviour due to how we add it to Handoff orchestration + // configurations, treat the emitEvents flag as simply determining whether to stream updates or to emit the whole response + // It would make little sense to avoid emitting any agent responses since this is only used in Orchestration workflows, + // which are Agent-only, and thus would do nothing. + if (message.TurnToken.EmitEvents is not true) + { + await context.YieldOutputAsync(agentResponse, cancellationToken).ConfigureAwait(false); + } + + allMessages.AddRange(agentResponse.Messages); roleChanges.ResetUserToAssistantForChangedRoles(); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowChatHistoryProvider.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowChatHistoryProvider.cs index 2815ed99f0..69c09abb27 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowChatHistoryProvider.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowChatHistoryProvider.cs @@ -43,7 +43,7 @@ internal void AddMessages(AgentSession session, params IEnumerable => this._sessionState.GetOrInitializeState(session).Messages.AddRange(messages); protected override ValueTask> ProvideChatHistoryAsync(InvokingContext context, CancellationToken cancellationToken = default) - => new(this._sessionState.GetOrInitializeState(context.Session).Messages); + => new(this._sessionState.GetOrInitializeState(context.Session).Messages.AsReadOnly()); protected override ValueTask StoreChatHistoryAsync(InvokedContext context, CancellationToken cancellationToken = default) { @@ -62,6 +62,12 @@ public IEnumerable GetFromBookmark(AgentSession session) } } + public IEnumerable GetAllMessages(AgentSession session) + { + var state = this._sessionState.GetOrInitializeState(session); + return state.Messages.AsReadOnly(); + } + public void UpdateBookmark(AgentSession session) { var state = this._sessionState.GetOrInitializeState(session); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs index 7679123970..295c08ceeb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs @@ -119,13 +119,17 @@ Task RunCoreAsync( MessageMerger merger = new(); await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken) - .ConfigureAwait(false) - .WithCancellation(cancellationToken)) + .ConfigureAwait(false) + .WithCancellation(cancellationToken)) { merger.AddUpdate(update); } - return merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); + AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); + workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages); + workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession); + + return response; } protected override async @@ -138,11 +142,18 @@ IAsyncEnumerable RunCoreStreamingAsync( await this.ValidateWorkflowAsync().ConfigureAwait(false); WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false); + MessageMerger merger = new(); + await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken) .ConfigureAwait(false) .WithCancellation(cancellationToken)) { + merger.AddUpdate(update); yield return update; } + + AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name); + workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages); + workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession); } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 40a18dbadb..a01bd68e4b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -110,7 +110,7 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, params AI { Throw.IfNullOrEmpty(parts); - AgentResponseUpdate update = new(ChatRole.Assistant, parts) + return new(ChatRole.Assistant, parts) { CreatedAt = DateTimeOffset.UtcNow, MessageId = Guid.NewGuid().ToString("N"), @@ -118,27 +118,19 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, params AI ResponseId = responseId, RawRepresentation = raw }; - - this.ChatHistoryProvider.AddMessages(this, update.ToChatMessage()); - - return update; } public AgentResponseUpdate CreateUpdate(string responseId, object raw, ChatMessage message) { Throw.IfNull(message); - AgentResponseUpdate update = new(message.Role, message.Contents) + return new(message.Role, message.Contents) { CreatedAt = message.CreatedAt ?? DateTimeOffset.UtcNow, MessageId = message.MessageId ?? Guid.NewGuid().ToString("N"), ResponseId = responseId, RawRepresentation = raw }; - - this.ChatHistoryProvider.AddMessages(this, update.ToChatMessage()); - - return update; } private async ValueTask CreateOrResumeRunAsync(List messages, CancellationToken cancellationToken = default) @@ -170,94 +162,86 @@ internal async IAsyncEnumerable InvokeStageAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { - try - { - this.LastResponseId = Guid.NewGuid().ToString("N"); - List messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList(); + this.LastResponseId = Guid.NewGuid().ToString("N"); + List messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList(); #pragma warning disable CA2007 // Analyzer misfiring and not seeing .ConfigureAwait(false) below. - await using StreamingRun run = - await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false); + await using StreamingRun run = + await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false); #pragma warning restore CA2007 - await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); - await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken) - .ConfigureAwait(false) - .WithCancellation(cancellationToken)) + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false); + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken) + .ConfigureAwait(false) + .WithCancellation(cancellationToken)) + { + switch (evt) { - switch (evt) - { - case AgentResponseUpdateEvent agentUpdate: - yield return agentUpdate.Update; - break; - - case RequestInfoEvent requestInfo: - FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall(); - AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent); - yield return update; - break; - - case WorkflowErrorEvent workflowError: - Exception? exception = workflowError.Exception; - if (exception is TargetInvocationException tie && tie.InnerException != null) - { - exception = tie.InnerException; - } - - if (exception != null) - { - string message = this._includeExceptionDetails - ? exception.Message - : "An error occurred while executing the workflow."; - - ErrorContent errorContent = new(message); - yield return this.CreateUpdate(this.LastResponseId, evt, errorContent); - } - - break; - - case SuperStepCompletedEvent stepCompleted: - this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; + case AgentResponseUpdateEvent agentUpdate: + yield return agentUpdate.Update; + break; + + case RequestInfoEvent requestInfo: + FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall(); + AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent); + yield return update; + break; + + case WorkflowErrorEvent workflowError: + Exception? exception = workflowError.Exception; + if (exception is TargetInvocationException tie && tie.InnerException != null) + { + exception = tie.InnerException; + } + + if (exception != null) + { + string message = this._includeExceptionDetails + ? exception.Message + : "An error occurred while executing the workflow."; + + ErrorContent errorContent = new(message); + yield return this.CreateUpdate(this.LastResponseId, evt, errorContent); + } + + break; + + case SuperStepCompletedEvent stepCompleted: + this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; + goto default; + + case WorkflowOutputEvent output: + IEnumerable? updateMessages = output.Data switch + { + IEnumerable chatMessages => chatMessages, + ChatMessage chatMessage => [chatMessage], + _ => null + }; + + if (!this._includeWorkflowOutputsInResponse || updateMessages == null) + { goto default; - - case WorkflowOutputEvent output: - IEnumerable? updateMessages = output.Data switch - { - IEnumerable chatMessages => chatMessages, - ChatMessage chatMessage => [chatMessage], - _ => null - }; - - if (!this._includeWorkflowOutputsInResponse || updateMessages == null) - { - goto default; - } - - foreach (ChatMessage message in updateMessages) - { - yield return this.CreateUpdate(this.LastResponseId, evt, message); - } - break; - - default: - // Emit all other workflow events for observability (DevUI, logging, etc.) - yield return new AgentResponseUpdate(ChatRole.Assistant, []) - { - CreatedAt = DateTimeOffset.UtcNow, - MessageId = Guid.NewGuid().ToString("N"), - Role = ChatRole.Assistant, - ResponseId = this.LastResponseId, - RawRepresentation = evt - }; - break; - } + } + + foreach (ChatMessage message in updateMessages) + { + yield return this.CreateUpdate(this.LastResponseId, evt, message); + } + break; + + default: + // Emit all other workflow events for observability (DevUI, logging, etc.) + yield return new AgentResponseUpdate(ChatRole.Assistant, []) + { + CreatedAt = DateTimeOffset.UtcNow, + MessageId = Guid.NewGuid().ToString("N"), + Role = ChatRole.Assistant, + ResponseId = this.LastResponseId, + RawRepresentation = evt + }; + break; } } - finally - { - // Do we want to try to undo the step, and not update the bookmark? - this.ChatHistoryProvider.UpdateBookmark(this); - } } public string? LastResponseId { get; set; } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs index 2ea117856f..222317d09e 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIAgentHostExecutorTests.cs @@ -10,10 +10,10 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; -public class AIAgentHostExecutorTests +public abstract class AIAgentHostingExecutorTestsBase { - private const string TestAgentId = nameof(TestAgentId); - private const string TestAgentName = nameof(TestAgentName); + protected const string TestAgentId = nameof(TestAgentId); + protected const string TestAgentName = nameof(TestAgentName); private static readonly string[] s_messageStrings = [ "", @@ -22,34 +22,10 @@ public class AIAgentHostExecutorTests "Quisque dignissim ante odio, at facilisis orci porta a. Duis mi augue, fringilla eu egestas a, pellentesque sed lacus." ]; - private static List TestMessages => TestReplayAgent.ToChatMessages(s_messageStrings); + protected static List TestMessages => TestReplayAgent.ToChatMessages(s_messageStrings); - [Theory] - [InlineData(null, null)] - [InlineData(null, true)] - [InlineData(null, false)] - [InlineData(true, null)] - [InlineData(true, true)] - [InlineData(true, false)] - [InlineData(false, null)] - [InlineData(false, true)] - [InlineData(false, false)] - public async Task Test_AgentHostExecutor_EmitsStreamingUpdatesIFFConfiguredAsync(bool? executorSetting, bool? turnSetting) + protected static void CheckResponseUpdateEventsAgainstTestMessages(AgentResponseUpdateEvent[] updates, bool expectingEvents, string expectedExecutorId) { - // Arrange - TestRunContext testContext = new(); - TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); - AIAgentHostExecutor executor = new(agent, new() { EmitAgentUpdateEvents = executorSetting }); - testContext.ConfigureExecutor(executor); - - // Act - await executor.TakeTurnAsync(new(turnSetting), testContext.BindWorkflowContext(executor.Id)); - - // Assert - // The rules are: TurnToken overrides Agent, if set. Default to false, if both unset. - bool expectingEvents = turnSetting ?? executorSetting ?? false; - - AgentResponseUpdateEvent[] updates = testContext.Events.OfType().ToArray(); if (expectingEvents) { // The way TestReplayAgent is set up, it will emit one update per non-empty AIContent @@ -61,7 +37,7 @@ public async Task Test_AgentHostExecutor_EmitsStreamingUpdatesIFFConfiguredAsync AgentResponseUpdateEvent updateEvent = updates[i]; AIContent expectedUpdateContent = expectedUpdateContents[i]; - updateEvent.ExecutorId.Should().Be(agent.GetDescriptiveId()); + updateEvent.ExecutorId.Should().Be(expectedExecutorId); AgentResponseUpdate update = updateEvent.Update; update.AuthorName.Should().Be(TestAgentName); @@ -76,28 +52,14 @@ public async Task Test_AgentHostExecutor_EmitsStreamingUpdatesIFFConfiguredAsync } } - [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task Test_AgentHostExecutor_EmitsResponseIFFConfiguredAsync(bool executorSetting) + protected static void CheckResponseEventsAgainstTestMessages(AgentResponseEvent[] updates, bool expectingResponse, string expectedExecutorId) { - // Arrange - TestRunContext testContext = new(); - TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); - AIAgentHostExecutor executor = new(agent, new() { EmitAgentResponseEvents = executorSetting }); - testContext.ConfigureExecutor(executor); - - // Act - await executor.TakeTurnAsync(new(), testContext.BindWorkflowContext(executor.Id)); - - // Assert - AgentResponseEvent[] updates = testContext.Events.OfType().ToArray(); - if (executorSetting) + if (expectingResponse) { updates.Should().HaveCount(1); AgentResponseEvent responseEvent = updates[0]; - responseEvent.ExecutorId.Should().Be(agent.GetDescriptiveId()); + responseEvent.ExecutorId.Should().Be(expectedExecutorId); AgentResponse response = responseEvent.Response; response.AgentId.Should().Be(TestAgentId); @@ -117,6 +79,86 @@ public async Task Test_AgentHostExecutor_EmitsResponseIFFConfiguredAsync(bool ex updates.Should().BeEmpty(); } } +} + +public class HandoffAgentExecutorTests : AIAgentHostingExecutorTestsBase +{ + [Theory] + [InlineData(null)] + [InlineData(true)] + [InlineData(false)] + public async Task Test_HandoffAgentExecutor_EmitsCorrectOutputTypeAsync(bool? turnSetting) + { + // Arrange + TestRunContext testContext = new(); + TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); + HandoffAgentExecutor executor = new(agent, new("", HandoffToolCallFilteringBehavior.None)); + testContext.ConfigureExecutor(executor); + + // Act + HandoffState message = new(new(turnSetting), null, []); + await executor.HandleAsync(message, testContext.BindWorkflowContext(executor.Id)); + + // Assert + bool expectingStreamingUpdates = turnSetting is true; + + AgentResponseEvent[] responses = testContext.Events.OfType().ToArray(); + CheckResponseEventsAgainstTestMessages(responses, !expectingStreamingUpdates, agent.GetDescriptiveId()); + + AgentResponseUpdateEvent[] updates = testContext.Events.OfType().ToArray(); + CheckResponseUpdateEventsAgainstTestMessages(updates, expectingStreamingUpdates, agent.GetDescriptiveId()); + } +} + +public class AIAgentHostExecutorTests : AIAgentHostingExecutorTestsBase +{ + [Theory] + [InlineData(null, null)] + [InlineData(null, true)] + [InlineData(null, false)] + [InlineData(true, null)] + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, null)] + [InlineData(false, true)] + [InlineData(false, false)] + public async Task Test_AgentHostExecutor_EmitsStreamingUpdatesIFFConfiguredAsync(bool? executorSetting, bool? turnSetting) + { + // Arrange + TestRunContext testContext = new(); + TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); + AIAgentHostExecutor executor = new(agent, new() { EmitAgentUpdateEvents = executorSetting }); + testContext.ConfigureExecutor(executor); + + // Act + await executor.TakeTurnAsync(new(turnSetting), testContext.BindWorkflowContext(executor.Id)); + + // Assert + // The rules are: TurnToken overrides Agent, if set. Default to false, if both unset. + bool expectingEvents = turnSetting ?? executorSetting ?? false; + + AgentResponseUpdateEvent[] updates = testContext.Events.OfType().ToArray(); + CheckResponseUpdateEventsAgainstTestMessages(updates, expectingEvents, agent.GetDescriptiveId()); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task Test_AgentHostExecutor_EmitsResponseIFFConfiguredAsync(bool executorSetting) + { + // Arrange + TestRunContext testContext = new(); + TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); + AIAgentHostExecutor executor = new(agent, new() { EmitAgentResponseEvents = executorSetting }); + testContext.ConfigureExecutor(executor); + + // Act + await executor.TakeTurnAsync(new(), testContext.BindWorkflowContext(executor.Id)); + + // Assert + AgentResponseEvent[] updates = testContext.Events.OfType().ToArray(); + CheckResponseEventsAgainstTestMessages(updates, expectingResponse: executorSetting, agent.GetDescriptiveId()); + } private static ChatMessage UserMessage => new(ChatRole.User, "Hello from User!") { AuthorName = "User" }; private static ChatMessage AssistantMessage => new(ChatRole.Assistant, "Hello from Assistant!") { AuthorName = "User" }; diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 40e4f2098f..be66bc4472 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -28,7 +28,7 @@ public ExpectedException(string? message, Exception? innerException) : base(mess } } -public class WorkflowHostSmokeTests +public class WorkflowHostSmokeTests : AIAgentHostingExecutorTestsBase { private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent { @@ -112,4 +112,50 @@ public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptio hadErrorContent.Should().BeTrue(); } + + [Fact] + public async Task Test_SingleAgent_AsAgent_OutgoingMessagesInHistoryAsync() + { + // Arrange + TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); + Workflow singleAgentWorkflow = new WorkflowBuilder(agent).Build(); + AIAgent workflowAgent = singleAgentWorkflow.AsAIAgent(); + + // Act + AgentSession session = await workflowAgent.CreateSessionAsync(); + AgentResponse response = await workflowAgent.RunAsync(session); + + // Assert + WorkflowSession workflowSession = session.Should().BeOfType().Subject; + + ChatMessage[] responseMessages = response.Messages.ToArray(); + ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession).ToArray(); + + // Since we never sent an incoming message, the expectation is that there should be nothing in the session + // except the response + responseMessages.Should().BeEquivalentTo(sessionMessages, options => options.WithStrictOrdering()); + } + + [Fact] + public async Task Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync() + { + // Arrange + TestReplayAgent agent = new(TestMessages, TestAgentId, TestAgentName); + Workflow handoffWorkflow = new HandoffsWorkflowBuilder(agent).Build(); + AIAgent workflowAgent = handoffWorkflow.AsAIAgent(); + + // Act + AgentSession session = await workflowAgent.CreateSessionAsync(); + AgentResponse response = await workflowAgent.RunAsync(session); + + // Assert + WorkflowSession workflowSession = session.Should().BeOfType().Subject; + + ChatMessage[] responseMessages = response.Messages.ToArray(); + ChatMessage[] sessionMessages = workflowSession.ChatHistoryProvider.GetAllMessages(workflowSession).ToArray(); + + // Since we never sent an incoming message, the expectation is that there should be nothing in the session + // except the response + responseMessages.Should().BeEquivalentTo(sessionMessages, options => options.WithStrictOrdering()); + } }