-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Python: Add the Python process framework (#9363)
### Motivation and Context An initial PR to add the foundational pieces of the Python Process framework, which holds it design to be similar to dotnet in that step types are added to a process builder, and later on, when the step is run, it is first instantiated and the proper state is provided. <!-- Thank you for your contribution to the semantic-kernel repo! Please help reviewers and future users, providing the following information: 1. Why is this change required? 2. What problem does it solve? 3. What scenario does it contribute to? 4. If it fixes an open issue, please link to the issue here. --> ### Description Adding the initial process framework components: - Closes #9354 **TODO** - more unit tests will be added to increase the code coverage. Currently there are several files with no (or low) code coverage. - more samples will either be added to this PR or a subsequent PR <!-- Describe your changes, the overall approach, the underlying design. These notes will help understanding how your code works. Thanks! --> ### Contribution Checklist <!-- Before submitting this PR, please make sure: --> - [X] The code builds clean without any errors or warnings - [X] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [X] All unit tests pass, and I have added new tests where possible - [X] I didn't break anyone 😄
- Loading branch information
Showing
56 changed files
with
3,924 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
138 changes: 138 additions & 0 deletions
138
python/samples/concepts/processes/cycles_with_fan_in.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
# Copyright (c) Microsoft. All rights reserved. | ||
|
||
import asyncio | ||
import logging | ||
from enum import Enum | ||
from typing import ClassVar | ||
|
||
from pydantic import Field | ||
|
||
from semantic_kernel import Kernel | ||
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion | ||
from semantic_kernel.functions import kernel_function | ||
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep | ||
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext | ||
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState | ||
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent | ||
from semantic_kernel.processes.local_runtime.local_kernel_process import start | ||
from semantic_kernel.processes.process_builder import ProcessBuilder | ||
|
||
logging.basicConfig(level=logging.WARNING) | ||
|
||
|
||
class CommonEvents(Enum): | ||
"""Common events for the sample process.""" | ||
|
||
UserInputReceived = "UserInputReceived" | ||
CompletionResponseGenerated = "CompletionResponseGenerated" | ||
WelcomeDone = "WelcomeDone" | ||
AStepDone = "AStepDone" | ||
BStepDone = "BStepDone" | ||
CStepDone = "CStepDone" | ||
StartARequested = "StartARequested" | ||
StartBRequested = "StartBRequested" | ||
ExitRequested = "ExitRequested" | ||
StartProcess = "StartProcess" | ||
|
||
|
||
# Define a sample step that once the `on_input_event` is received, | ||
# it will emit two events to start the A and B steps. | ||
class KickOffStep(KernelProcessStep): | ||
KICK_OFF_FUNCTION: ClassVar[str] = "kick_off" | ||
|
||
@kernel_function(name=KICK_OFF_FUNCTION) | ||
async def print_welcome_message(self, context: KernelProcessStepContext): | ||
context.emit_event(KernelProcessEvent(id=CommonEvents.StartARequested.value, data="Get Going A")) | ||
context.emit_event(KernelProcessEvent(id=CommonEvents.StartBRequested.value, data="Get Going B")) | ||
|
||
|
||
# Define a sample `AStep` step that will emit an event after 1 second. | ||
# The event will be sent to the `CStep` step with the data `I did A`. | ||
class AStep(KernelProcessStep): | ||
@kernel_function() | ||
async def do_it(self, context: KernelProcessStepContext): | ||
await asyncio.sleep(1) | ||
context.emit_event(KernelProcessEvent(id=CommonEvents.AStepDone.value, data="I did A")) | ||
|
||
|
||
# Define a sample `BStep` step that will emit an event after 2 seconds. | ||
# The event will be sent to the `CStep` step with the data `I did B`. | ||
class BStep(KernelProcessStep): | ||
@kernel_function() | ||
async def do_it(self, context: KernelProcessStepContext): | ||
await asyncio.sleep(2) | ||
context.emit_event(KernelProcessEvent(id=CommonEvents.BStepDone.value, data="I did B")) | ||
|
||
|
||
# Define a sample `CStepState` that will keep track of the current cycle. | ||
class CStepState: | ||
current_cycle: int = 0 | ||
|
||
|
||
# Define a sample `CStep` step that will emit an `ExitRequested` event after 3 cycles. | ||
class CStep(KernelProcessStep[CStepState]): | ||
state: CStepState = Field(default_factory=CStepState) | ||
|
||
# The activate method overrides the base class method to set the state in the step. | ||
async def activate(self, state: KernelProcessStepState[CStepState]): | ||
"""Activates the step and sets the state.""" | ||
self.state = state.state | ||
|
||
@kernel_function() | ||
async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepdata: str): | ||
self.state.current_cycle += 1 | ||
print(f"CStep Current Cycle: {self.state.current_cycle}") | ||
if self.state.current_cycle == 3: | ||
print("CStep Exit Requested") | ||
context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.ExitRequested.value)) | ||
return | ||
context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.CStepDone.value)) | ||
|
||
|
||
kernel = Kernel() | ||
|
||
|
||
async def cycles_with_fan_in(): | ||
kernel.add_service(OpenAIChatCompletion(service_id="default")) | ||
|
||
# Define the process builder | ||
process = ProcessBuilder(name="Test Process") | ||
|
||
# Add the step types to the builder | ||
kickoff_step = process.add_step(step_type=KickOffStep) | ||
myAStep = process.add_step(step_type=AStep) | ||
myBStep = process.add_step(step_type=BStep) | ||
myCStep = process.add_step(step_type=CStep) | ||
|
||
# Define the input event and where to send it to | ||
process.on_input_event(event_id=CommonEvents.StartProcess.value).send_event_to(target=kickoff_step) | ||
|
||
# Define the process flow | ||
kickoff_step.on_event(event_id=CommonEvents.StartARequested.value).send_event_to(target=myAStep) | ||
kickoff_step.on_event(event_id=CommonEvents.StartBRequested.value).send_event_to(target=myBStep) | ||
myAStep.on_event(event_id=CommonEvents.AStepDone.value).send_event_to(target=myCStep, parameter_name="astepdata") | ||
|
||
# Define the fan in behavior once both AStep and BStep are done | ||
myBStep.on_event(event_id=CommonEvents.BStepDone.value).send_event_to(target=myCStep, parameter_name="bstepdata") | ||
myCStep.on_event(event_id=CommonEvents.CStepDone.value).send_event_to(target=kickoff_step) | ||
myCStep.on_event(event_id=CommonEvents.ExitRequested.value).stop_process() | ||
|
||
# Build the process | ||
kernel_process = process.build() | ||
|
||
async with await start( | ||
process=kernel_process, | ||
kernel=kernel, | ||
initial_event=KernelProcessEvent(id=CommonEvents.StartProcess.value, data="foo"), | ||
) as process_context: | ||
process_state = await process_context.get_state() | ||
c_step_state: KernelProcessStepState[CStepState] = next( | ||
(s.state for s in process_state.steps if s.state.name == "CStep"), None | ||
) | ||
assert c_step_state.state # nosec | ||
assert c_step_state.state.current_cycle == 3 # nosec | ||
print(f"Final State Check: CStepState current cycle: {c_step_state.state.current_cycle}") | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(cycles_with_fan_in()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
# Copyright (c) Microsoft. All rights reserved. | ||
|
||
import asyncio | ||
import logging | ||
from enum import Enum | ||
from typing import ClassVar | ||
|
||
from pydantic import Field | ||
|
||
from semantic_kernel import Kernel | ||
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion | ||
from semantic_kernel.functions import kernel_function | ||
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess | ||
from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEventVisibility | ||
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep | ||
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext | ||
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState | ||
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent | ||
from semantic_kernel.processes.local_runtime.local_kernel_process import start | ||
from semantic_kernel.processes.process_builder import ProcessBuilder | ||
from semantic_kernel.processes.process_types import TState | ||
|
||
logging.basicConfig(level=logging.WARNING) | ||
|
||
|
||
class ProcessEvents(Enum): | ||
StartProcess = "StartProcess" | ||
StartInnerProcess = "StartInnerProcess" | ||
OutputReadyPublic = "OutputReadyPublic" | ||
OutputReadyInternal = "OutputReadyInternal" | ||
|
||
|
||
class StepState: | ||
last_message: str = None | ||
|
||
|
||
class EchoStep(KernelProcessStep): | ||
ECHO: ClassVar[str] = "echo" | ||
|
||
@kernel_function(name=ECHO) | ||
async def echo(self, message: str): | ||
print(f"[ECHO] {message}") | ||
return message | ||
|
||
|
||
class RepeatStep(KernelProcessStep[StepState]): | ||
REPEAT: ClassVar[str] = "repeat" | ||
|
||
state: StepState = Field(default_factory=StepState) | ||
|
||
async def activate(self, state: KernelProcessStepState[TState]): | ||
"""Activates the step and sets the state.""" | ||
self.state = state.state | ||
|
||
@kernel_function(name=REPEAT) | ||
async def repeat(self, message: str, context: KernelProcessStepContext, count: int = 2): | ||
output = " ".join([message] * count) | ||
self.state.last_message = output | ||
print(f"[REPEAT] {output}") | ||
|
||
context.emit_event( | ||
process_event=KernelProcessEvent( | ||
id=ProcessEvents.OutputReadyPublic.value, data=output, visibility=KernelProcessEventVisibility.Public | ||
) | ||
) | ||
context.emit_event( | ||
process_event=KernelProcessEvent( | ||
id=ProcessEvents.OutputReadyInternal.value, | ||
data=output, | ||
visibility=KernelProcessEventVisibility.Internal, | ||
) | ||
) | ||
|
||
|
||
def create_linear_process(name: str): | ||
process_builder = ProcessBuilder(name=name) | ||
echo_step = process_builder.add_step(step_type=EchoStep) | ||
repeat_step = process_builder.add_step(step_type=RepeatStep) | ||
|
||
process_builder.on_input_event(event_id=ProcessEvents.StartProcess.value).send_event_to(target=echo_step) | ||
|
||
echo_step.on_function_result(function_name=EchoStep.ECHO).send_event_to( | ||
target=repeat_step, parameter_name="message" | ||
) | ||
|
||
return process_builder | ||
|
||
|
||
kernel = Kernel() | ||
|
||
|
||
async def nested_process(): | ||
kernel.add_service(OpenAIChatCompletion(service_id="default")) | ||
|
||
process_builder = create_linear_process("Outer") | ||
|
||
nested_process_step = process_builder.add_step_from_process(create_linear_process("Inner")) | ||
|
||
process_builder.steps[1].on_event(ProcessEvents.OutputReadyInternal.value).send_event_to( | ||
nested_process_step.where_input_event_is(ProcessEvents.StartProcess.value) | ||
) | ||
|
||
process = process_builder.build() | ||
|
||
test_input = "Test" | ||
|
||
process_handle = await start( | ||
process=process, kernel=kernel, initial_event=ProcessEvents.StartProcess.value, data=test_input | ||
) | ||
process_info = await process_handle.get_state() | ||
|
||
inner_process: KernelProcess = next((s for s in process_info.steps if s.state.name == "Inner"), None) | ||
|
||
repeat_step_state: KernelProcessStepState[StepState] = next( | ||
(s.state for s in inner_process.steps if s.state.name == "RepeatStep"), None | ||
) | ||
assert repeat_step_state.state # nosec | ||
assert repeat_step_state.state.last_message == "Test Test Test Test" # nosec | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(nested_process()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Semantic Kernel Processes - Getting Started | ||
|
||
This project contains a step by step guide to get started with _Semantic Kernel Processes_. | ||
|
||
|
||
#### PyPI: | ||
- The initial release of the Python Process Framework was in the Semantic Kernel pypi version 1.12.0. | ||
|
||
#### Sources | ||
|
||
- [Semantic Kernel Process Framework](../../semantic_kernel/processes/) | ||
- [Semantic Kernel Processes - Kernel Process](../../semantic_kernel/processes/kernel_process/) | ||
- [Semantic Kernel Processes - Local Runtime](../../semantic_kernel/processes/local_runtime/) | ||
|
||
The examples can be run as scripts and the code can also be copied to stand-alone projects, using the proper package imports. | ||
|
||
## Examples | ||
|
||
The getting started with agents examples include: | ||
|
||
Example|Description | ||
---|--- | ||
[step01_processes](../getting_started_with_processes/step01_processes.py)|How to create a simple process with a loop and a conditional exit | ||
|
||
### step01_processes | ||
|
||
```mermaid | ||
flowchart LR | ||
Intro(Intro)--> UserInput(User Input) | ||
UserInput-->|User message == 'exit'| Exit(Exit) | ||
UserInput-->|User message| AssistantResponse(Assistant Response) | ||
AssistantResponse--> UserInput | ||
``` | ||
|
||
## Configuring the Kernel | ||
|
||
Similar to the Semantic Kernel Python concept samples, it is necessary to configure the secrets | ||
and keys used by the kernel. See the follow "Configuring the Kernel" [guide](../concepts/README.md#configuring-the-kernel) for | ||
more information. | ||
|
||
## Running Concept Samples | ||
|
||
Concept samples can be run in an IDE or via the command line. After setting up the required api key | ||
for your AI connector, the samples run without any extra command line arguments. |
Oops, something went wrong.