Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,18 @@ await AddUpdateAsync(
}
}

allMessages.AddRange(updates.ToAgentResponse().Messages);
AgentResponse agentResponse = updates.ToAgentResponse();

// Since there is no good way to configure the agent output behaviour due to how we add it to Handoff orchestration
// configurations, treat the emitEvents flag as simply determining whether to stream updates or to emit the whole response
// It would make little sense to avoid emitting any agent responses since this is only used in Orchestration workflows,
// which are Agent-only, and thus would do nothing.
if (message.TurnToken.EmitEvents is not true)
{
await context.YieldOutputAsync(agentResponse, cancellationToken).ConfigureAwait(false);
}

allMessages.AddRange(agentResponse.Messages);

roleChanges.ResetUserToAssistantForChangedRoles();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal void AddMessages(AgentSession session, params IEnumerable<ChatMessage>
=> this._sessionState.GetOrInitializeState(session).Messages.AddRange(messages);

protected override ValueTask<IEnumerable<ChatMessage>> ProvideChatHistoryAsync(InvokingContext context, CancellationToken cancellationToken = default)
=> new(this._sessionState.GetOrInitializeState(context.Session).Messages);
=> new(this._sessionState.GetOrInitializeState(context.Session).Messages.AsReadOnly());

protected override ValueTask StoreChatHistoryAsync(InvokedContext context, CancellationToken cancellationToken = default)
{
Expand All @@ -62,6 +62,12 @@ public IEnumerable<ChatMessage> GetFromBookmark(AgentSession session)
}
}

public IEnumerable<ChatMessage> GetAllMessages(AgentSession session)
{
var state = this._sessionState.GetOrInitializeState(session);
return state.Messages.AsReadOnly();
}

public void UpdateBookmark(AgentSession session)
{
var state = this._sessionState.GetOrInitializeState(session);
Expand Down
17 changes: 14 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,17 @@ Task<AgentResponse> RunCoreAsync(
MessageMerger merger = new();

await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
{
merger.AddUpdate(update);
}

return merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages);
workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession);

return response;
}

protected override async
Expand All @@ -138,11 +142,18 @@ IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
await this.ValidateWorkflowAsync().ConfigureAwait(false);

WorkflowSession workflowSession = await this.UpdateSessionAsync(messages, session, cancellationToken).ConfigureAwait(false);
MessageMerger merger = new();

await foreach (AgentResponseUpdate update in workflowSession.InvokeStageAsync(cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
{
merger.AddUpdate(update);
yield return update;
}

AgentResponse response = merger.ComputeMerged(workflowSession.LastResponseId!, this.Id, this.Name);
workflowSession.ChatHistoryProvider.AddMessages(workflowSession, response.Messages);
workflowSession.ChatHistoryProvider.UpdateBookmark(workflowSession);
}
}
164 changes: 74 additions & 90 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,35 +110,27 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, params AI
{
Throw.IfNullOrEmpty(parts);

AgentResponseUpdate update = new(ChatRole.Assistant, parts)
return new(ChatRole.Assistant, parts)
{
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = responseId,
RawRepresentation = raw
};

this.ChatHistoryProvider.AddMessages(this, update.ToChatMessage());

return update;
}

public AgentResponseUpdate CreateUpdate(string responseId, object raw, ChatMessage message)
{
Throw.IfNull(message);

AgentResponseUpdate update = new(message.Role, message.Contents)
return new(message.Role, message.Contents)
{
CreatedAt = message.CreatedAt ?? DateTimeOffset.UtcNow,
MessageId = message.MessageId ?? Guid.NewGuid().ToString("N"),
ResponseId = responseId,
RawRepresentation = raw
};

this.ChatHistoryProvider.AddMessages(this, update.ToChatMessage());

return update;
}

private async ValueTask<StreamingRun> CreateOrResumeRunAsync(List<ChatMessage> messages, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -170,94 +162,86 @@ internal async
IAsyncEnumerable<AgentResponseUpdate> InvokeStageAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
try
{
this.LastResponseId = Guid.NewGuid().ToString("N");
List<ChatMessage> messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList();
this.LastResponseId = Guid.NewGuid().ToString("N");
List<ChatMessage> messages = this.ChatHistoryProvider.GetFromBookmark(this).ToList();

#pragma warning disable CA2007 // Analyzer misfiring and not seeing .ConfigureAwait(false) below.
await using StreamingRun run =
await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false);
await using StreamingRun run =
await this.CreateOrResumeRunAsync(messages, cancellationToken).ConfigureAwait(false);
#pragma warning restore CA2007

await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
await run.TrySendMessageAsync(new TurnToken(emitEvents: true)).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false, cancellationToken)
.ConfigureAwait(false)
.WithCancellation(cancellationToken))
{
switch (evt)
{
switch (evt)
{
case AgentResponseUpdateEvent agentUpdate:
yield return agentUpdate.Update;
break;

case RequestInfoEvent requestInfo:
FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall();
AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent);
yield return update;
break;

case WorkflowErrorEvent workflowError:
Exception? exception = workflowError.Exception;
if (exception is TargetInvocationException tie && tie.InnerException != null)
{
exception = tie.InnerException;
}

if (exception != null)
{
string message = this._includeExceptionDetails
? exception.Message
: "An error occurred while executing the workflow.";

ErrorContent errorContent = new(message);
yield return this.CreateUpdate(this.LastResponseId, evt, errorContent);
}

break;

case SuperStepCompletedEvent stepCompleted:
this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint;
case AgentResponseUpdateEvent agentUpdate:
yield return agentUpdate.Update;
break;

case RequestInfoEvent requestInfo:
FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall();
AgentResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent);
yield return update;
break;

case WorkflowErrorEvent workflowError:
Exception? exception = workflowError.Exception;
if (exception is TargetInvocationException tie && tie.InnerException != null)
{
exception = tie.InnerException;
}

if (exception != null)
{
string message = this._includeExceptionDetails
? exception.Message
: "An error occurred while executing the workflow.";

ErrorContent errorContent = new(message);
yield return this.CreateUpdate(this.LastResponseId, evt, errorContent);
}

break;

case SuperStepCompletedEvent stepCompleted:
this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint;
goto default;

case WorkflowOutputEvent output:
IEnumerable<ChatMessage>? updateMessages = output.Data switch
{
IEnumerable<ChatMessage> chatMessages => chatMessages,
ChatMessage chatMessage => [chatMessage],
_ => null
};

if (!this._includeWorkflowOutputsInResponse || updateMessages == null)
{
goto default;

case WorkflowOutputEvent output:
IEnumerable<ChatMessage>? updateMessages = output.Data switch
{
IEnumerable<ChatMessage> chatMessages => chatMessages,
ChatMessage chatMessage => [chatMessage],
_ => null
};

if (!this._includeWorkflowOutputsInResponse || updateMessages == null)
{
goto default;
}

foreach (ChatMessage message in updateMessages)
{
yield return this.CreateUpdate(this.LastResponseId, evt, message);
}
break;

default:
// Emit all other workflow events for observability (DevUI, logging, etc.)
yield return new AgentResponseUpdate(ChatRole.Assistant, [])
{
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = this.LastResponseId,
RawRepresentation = evt
};
break;
}
}

foreach (ChatMessage message in updateMessages)
{
yield return this.CreateUpdate(this.LastResponseId, evt, message);
}
break;

default:
// Emit all other workflow events for observability (DevUI, logging, etc.)
yield return new AgentResponseUpdate(ChatRole.Assistant, [])
{
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = this.LastResponseId,
RawRepresentation = evt
};
break;
}
}
finally
{
// Do we want to try to undo the step, and not update the bookmark?
this.ChatHistoryProvider.UpdateBookmark(this);
}
}

public string? LastResponseId { get; set; }
Expand Down
Loading
Loading