Ted/streaming addons #573

Open
wants to merge 7 commits into base: main
1 change: 1 addition & 0 deletions requirements.dev.txt
@@ -51,6 +51,7 @@ sphinxcontrib-htmlhelp==2.0.0
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.3
sphinxcontrib-serializinghtml==1.1.5
sseclient-py==1.8.0
toml==0.10.2
tomli==2.0.0
typing-extensions==4.2.0
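Note: the new sseclient-py dev dependency suggests tests will consume server-sent events from a streaming endpoint. For reference only (not code from this PR), a minimal sketch of reading an SSE stream with sseclient-py; the endpoint URL and headers are illustrative assumptions.

import requests
import sseclient  # installed by the sseclient-py package added above

# Hypothetical streaming endpoint; substitute the real URL under test.
url = "https://api.example.com/stream"

# sseclient-py wraps a byte stream, e.g. a requests response opened with stream=True.
response = requests.get(url, stream=True, headers={"Accept": "text/event-stream"})
client = sseclient.SSEClient(response)

for event in client.events():
    # each event exposes its SSE event name and data payload as strings
    print(event.event, event.data)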
104 changes: 83 additions & 21 deletions src/steamship/agents/functional/functions_based.py
@@ -1,9 +1,11 @@
import json
from typing import List

from steamship import Block
from steamship import Block, MimeTypes, Tag
from steamship.agents.functional.output_parser import FunctionsBasedOutputParser
from steamship.agents.schema import Action, AgentContext, ChatAgent, ChatLLM, Tool
from steamship.data.tags.tag_constants import RoleTag
from steamship.agents.schema import Action, AgentContext, ChatAgent, ChatLLM, FinishAction, Tool
from steamship.data.tags.tag_constants import RoleTag, TagKind, TagValueKey
from steamship.data.tags.tag_utils import get_tag


class FunctionsBasedAgent(ChatAgent):
@@ -25,13 +27,17 @@ def __init__(self, tools: List[Tool], llm: ChatLLM, **kwargs):
output_parser=FunctionsBasedOutputParser(tools=tools), llm=llm, tools=tools, **kwargs
)

def _get_or_create_system_message(self, context: AgentContext) -> Block:
sys_msg = context.chat_history.last_system_message
if sys_msg:
return sys_msg

return context.chat_history.append_system_message(text=self.PROMPT, mime_type=MimeTypes.TXT)

def build_chat_history_for_tool(self, context: AgentContext) -> List[Block]:
messages: List[Block] = []
messages: List[Block] = [self._get_or_create_system_message(context)]

# get system message
system_message = Block(text=self.PROMPT)
system_message.set_chat_role(RoleTag.SYSTEM)
messages.append(system_message)

messages_from_memory = []
# get prior conversations
@@ -41,21 +47,15 @@ def build_chat_history_for_tool(self, context: AgentContext) -> List[Block]:
.wait()
.to_ranked_blocks()
)

# TODO(dougreid): we need a way to threshold message inclusion, especially for small contexts

# remove the actual prompt from the semantic search (it will be an exact match)
messages_from_memory = [
msg
for msg in messages_from_memory
if msg.id != context.chat_history.last_user_message.id
]

# get most recent context
messages_from_memory.extend(context.chat_history.select_messages(self.message_selector))

# de-dupe the messages from memory
ids = [context.chat_history.last_user_message.id]
ids = [
context.chat_history.last_user_message.id
] # filter out the last user message; it is appended afterwards
for msg in messages_from_memory:
if msg.id not in ids:
messages.append(msg)
@@ -67,10 +67,8 @@ def build_chat_history_for_tool(self, context: AgentContext) -> List[Block]:
# this should happen BEFORE any agent/assistant messages related to tool selection
messages.append(context.chat_history.last_user_message)

# get completed steps
actions = context.completed_steps
for action in actions:
messages.extend(action.to_chat_messages())
# get working history (completed actions)
messages.extend(self._function_calls_since_last_user_message(context))

return messages

@@ -81,4 +79,68 @@ def next_action(self, context: AgentContext) -> Action:
# Run the default LLM on those messages
output_blocks = self.llm.chat(messages=messages, tools=self.tools)

return self.output_parser.parse(output_blocks[0].text, context)
future_action = self.output_parser.parse(output_blocks[0].text, context)
if not isinstance(future_action, FinishAction):
# record the LLM's function response in history
self._record_action_selection(future_action, context)
return future_action

def _function_calls_since_last_user_message(self, context: AgentContext) -> List[Block]:
function_calls = []
for block in context.chat_history.messages[::-1]: # is this too inefficient at scale?
if block.chat_role == RoleTag.USER:
return reversed(function_calls)
if get_tag(block.tags, kind=TagKind.ROLE, name=RoleTag.FUNCTION):
function_calls.append(block)
elif get_tag(block.tags, kind=TagKind.FUNCTION_SELECTION):
function_calls.append(block)
return reversed(function_calls)

def _to_openai_function_selection(self, action: Action) -> str:
fc = {"name": action.tool}
args = {}
for block in action.input:
for t in block.tags:
if t.kind == TagKind.FUNCTION_ARG:
args[t.name] = block.as_llm_input(exclude_block_wrapper=True)

fc["arguments"] = json.dumps(args) # the arguments must be a string value NOT a dict
return json.dumps(fc)

def _record_action_selection(self, action: Action, context: AgentContext):
tags = [
Tag(kind=TagKind.ROLE, name=RoleTag.ASSISTANT),
Tag(kind=TagKind.FUNCTION_SELECTION, name=action.tool),
Tag(
kind="request-id",
name=context.request_id,
value={TagValueKey.STRING_VALUE: context.request_id},
),
]
context.chat_history.file.append_block(
text=self._to_openai_function_selection(action), tags=tags, mime_type=MimeTypes.TXT
)

def record_action_run(self, action: Action, context: AgentContext):
super().record_action_run(action, context)

tags = [
Tag(
kind=TagKind.ROLE,
name=RoleTag.FUNCTION,
value={TagValueKey.STRING_VALUE: action.tool},
),
Tag(
kind="request-id",
name=context.request_id,
value={TagValueKey.STRING_VALUE: context.request_id},
),
]
# TODO(dougreid): I'm not convinced this is correct for tools that return multiple values.
# It _feels_ like these should be named and inlined as a single message in history, etc.
for block in action.output:
context.chat_history.file.append_block(
text=block.as_llm_input(exclude_block_wrapper=True),
tags=tags,
mime_type=block.mime_type,
)
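Aside (not part of the diff): to make the helper above concrete, a minimal standalone sketch of the string `_to_openai_function_selection` produces, using plain json only; the tool name and arguments are illustrative. OpenAI's function-calling format requires `arguments` to be a JSON-encoded string rather than a nested object, which is why the value is double-encoded.

import json

def openai_function_selection(tool_name: str, args: dict) -> str:
    # mirrors the shape built above: "arguments" is itself a JSON-encoded string
    return json.dumps({"name": tool_name, "arguments": json.dumps(args)})

# Hypothetical tool call, for illustration only:
print(openai_function_selection("GenerateImage", {"text": "a lighthouse at dusk"}))
# {"name": "GenerateImage", "arguments": "{\"text\": \"a lighthouse at dusk\"}"}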
44 changes: 37 additions & 7 deletions src/steamship/agents/functional/output_parser.py
@@ -4,9 +4,9 @@
from json import JSONDecodeError
from typing import Dict, List, Optional

from steamship import Block, MimeTypes, Steamship
from steamship import Block, MimeTypes, Steamship, Tag
from steamship.agents.schema import Action, AgentContext, FinishAction, OutputParser, Tool
from steamship.data.tags.tag_constants import RoleTag
from steamship.data.tags.tag_constants import RoleTag, TagKind
from steamship.utils.utils import is_valid_uuid4


@@ -43,16 +43,45 @@ def _extract_action_from_function_call(self, text: str, context: AgentContext) -
try:
args = json.loads(arguments)
if text := args.get("text"):
input_blocks.append(Block(text=text, mime_type=MimeTypes.TXT))
input_blocks.append(
Block(
text=text,
tags=[Tag(kind=TagKind.FUNCTION_ARG, name="text")],
mime_type=MimeTypes.TXT,
)
)
elif uuid_arg := args.get("uuid"):
input_blocks.append(Block.get(context.client, _id=uuid_arg))
existing_block = Block.get(context.client, _id=uuid_arg)
tag = Tag.create(
existing_block.client,
file_id=existing_block.file_id,
block_id=existing_block.id,
kind=TagKind.FUNCTION_ARG,
name="uuid",
)
existing_block.tags.append(tag)
input_blocks.append(existing_block)
except json.decoder.JSONDecodeError:
if isinstance(arguments, str):
if is_valid_uuid4(arguments):
input_blocks.append(Block.get(context.client, _id=uuid_arg))
existing_block = Block.get(context.client, _id=arguments)
tag = Tag.create(
existing_block.client,
file_id=existing_block.file_id,
block_id=existing_block.id,
kind=TagKind.FUNCTION_ARG,
name="uuid",
)
existing_block.tags.append(tag)
input_blocks.append(existing_block)
else:
input_blocks.append(Block(text=arguments, mime_type=MimeTypes.TXT))

input_blocks.append(
Block(
text=arguments,
tags=[Tag(kind=TagKind.FUNCTION_ARG, name="text")],
mime_type=MimeTypes.TXT,
)
)
return Action(tool=tool.name, input=input_blocks, context=context)

@staticmethod
@@ -112,4 +141,5 @@ def parse(self, text: str, context: AgentContext) -> Action:
finish_blocks = FunctionsBasedOutputParser._blocks_from_text(context.client, text)
for finish_block in finish_blocks:
finish_block.set_chat_role(RoleTag.ASSISTANT)
finish_block.set_request_id(context.request_id)
return FinishAction(output=finish_blocks, context=context)
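Aside (not part of the diff): the FUNCTION_ARG tags added above let the agent later rebuild the argument dict the LLM passed when the tool call is re-serialized into chat history (see `_to_openai_function_selection` in functions_based.py). A simplified standalone sketch of that lookup, with plain dicts standing in for Block/Tag and an assumed string value for TagKind.FUNCTION_ARG:

import json

FUNCTION_ARG = "function-arg"  # assumed serialized value of TagKind.FUNCTION_ARG

def rebuild_arguments(blocks: list) -> str:
    # collect every block tagged as a function argument, keyed by the tag name
    args = {}
    for block in blocks:
        for tag in block["tags"]:
            if tag["kind"] == FUNCTION_ARG:
                args[tag["name"]] = block["text"]
    return json.dumps(args)

tagged_inputs = [{"text": "a lighthouse at dusk", "tags": [{"kind": FUNCTION_ARG, "name": "text"}]}]
print(rebuild_arguments(tagged_inputs))  # {"text": "a lighthouse at dusk"}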
41 changes: 31 additions & 10 deletions src/steamship/agents/llms/openai.py
@@ -58,6 +58,7 @@ def complete(self, prompt: str, stop: Optional[str] = None, **kwargs) -> List[Bl
if "max_tokens" in kwargs:
options["max_tokens"] = kwargs["max_tokens"]

# TODO(dougreid): do we care about streaming here? should we take a kwarg that is file_id ?
action_task = self.generator.generate(text=prompt, options=options)
action_task.wait()
return action_task.output.blocks
@@ -84,12 +85,8 @@ def chat(self, messages: List[Block], tools: Optional[List[Tool]], **kwargs) ->
Supported kwargs include:
- `max_tokens` (controls the size of LLM responses)
"""

temp_file = File.create(
client=self.client,
blocks=messages,
tags=[Tag(kind=TagKind.GENERATION, name=GenerationTag.PROMPT_COMPLETION)],
)
if len(messages) <= 0:
return []

options = {}
if len(tools) > 0:
@@ -119,7 +116,31 @@ def chat(self, messages: List[Block], tools: Optional[List[Tool]], **kwargs) ->

logging.info(f"OpenAI ChatComplete ({messages[-1].as_llm_input()})", extra=extra)

tool_selection_task = self.generator.generate(input_file_id=temp_file.id, options=options)
tool_selection_task.wait()

return tool_selection_task.output.blocks
# for streaming use cases, we always want to reuse the existing chat-history file;
# we detect that case by checking whether all messages came from the same file
if self._from_same_file(blocks=messages):
file_id = messages[0].file_id
block_indices = [b.index_in_file for b in messages]
generate_task = self.generator.generate(
input_file_id=file_id,
input_file_block_index_list=block_indices,
options=options,
append_output_to_file=True,
)
else:
tags = [Tag(kind=TagKind.GENERATION, name=GenerationTag.PROMPT_COMPLETION)]
temp_file = File.create(client=self.client, blocks=messages, tags=tags)
generate_task = self.generator.generate(input_file_id=temp_file.id, options=options)

generate_task.wait()

return generate_task.output.blocks

def _from_same_file(self, blocks: List[Block]) -> bool:
if len(blocks) <= 1:
return True
file_id = blocks[0].file_id
for b in blocks[1:]:
if b.file_id != file_id:
return False
return True
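Aside (not part of the diff): a standalone sketch of the same-file check that selects the streaming path above; FakeBlock is a stand-in for steamship.Block.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class FakeBlock:
    file_id: Optional[str]

def from_same_file(blocks: List[FakeBlock]) -> bool:
    # zero or one message trivially qualifies; otherwise every block must share a file_id
    if len(blocks) <= 1:
        return True
    return all(b.file_id == blocks[0].file_id for b in blocks[1:])

# every block comes from the chat-history file: reuse it and append the generation to it
print(from_same_file([FakeBlock("file-1"), FakeBlock("file-1")]))  # True
# mixed sources: fall back to creating a temporary prompt file
print(from_same_file([FakeBlock("file-1"), FakeBlock("file-2")]))  # False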
27 changes: 3 additions & 24 deletions src/steamship/agents/schema/action.py
@@ -1,10 +1,9 @@
from typing import List, Optional

from pydantic import BaseModel
from pydantic.fields import Field

from steamship import Block, Tag
from steamship.data import TagKind
from steamship.data.tags.tag_constants import RoleTag
from steamship import Block


class Action(BaseModel):
@@ -22,32 +21,12 @@ class Action(BaseModel):
output: Optional[List[Block]]
"""Any direct output produced by the Tool."""

is_final: bool = False
is_final: bool = Field(default=False)
"""Whether this Action should be the final action performed in a reasoning loop.

Setting this to True means that the executing Agent should halt any reasoning.
"""

def to_chat_messages(self) -> List[Block]:
tags = [
Tag(kind=TagKind.ROLE, name=RoleTag.FUNCTION),
Tag(kind="name", name=self.tool),
]
blocks = []
for block in self.output:
# TODO(dougreid): should we revisit as_llm_input? we might need only the UUID...
blocks.append(
Block(
text=block.as_llm_input(exclude_block_wrapper=True),
tags=tags,
mime_type=block.mime_type,
)
)

# TODO(dougreid): revisit when have multiple output functions.
# Current thinking: LLM will be OK with multiple function blocks in a row. NEEDS validation.
return blocks


class FinishAction(Action):
"""Represents a final selected action in an Agent Execution."""
19 changes: 12 additions & 7 deletions src/steamship/agents/schema/agent.py
@@ -11,7 +11,8 @@
from steamship.agents.schema.message_selectors import MessageSelector, NoMessages
from steamship.agents.schema.output_parser import OutputParser
from steamship.agents.schema.tool import Tool
from steamship.data.tags.tag_constants import RoleTag
from steamship.data.tags.tag_constants import RoleTag, TagKind
from steamship.data.tags.tag_utils import get_tag


class Agent(BaseModel, ABC):
@@ -31,6 +32,11 @@ class Agent(BaseModel, ABC):
def next_action(self, context: AgentContext) -> Action:
pass

@abstractmethod
def record_action_run(self, action: Action, context: AgentContext):
# TODO(dougreid): should this method (or just bit) actually be on AgentContext?
context.completed_steps.append(action)


class LLMAgent(Agent):
"""LLMAgents choose next actions for an AgentService based on interactions with an LLM."""
@@ -53,17 +59,16 @@ def messages_to_prompt_history(messages: List[Block]) -> str:
# Internal Status Messages are not considered part of **prompt** history.
# Their inclusion could lead to problematic LLM behavior, etc.
# As such are explicitly skipped here:
# - DON'T RETURN AGENT MESSAGES
# - DON'T RETURN TOOL MESSAGES
# - DON'T RETURN LLM MESSAGES
# - DON'T RETURN STATUS MESSAGES
# - DON'T RETURN FUNCTION or FUNCTION_SELECTION MESSAGES
if role == RoleTag.USER:
as_strings.append(f"User: {block.text}")
elif role == RoleTag.ASSISTANT:
elif role == RoleTag.ASSISTANT and (
get_tag(block.tags, TagKind.FUNCTION_SELECTION) is None
):
as_strings.append(f"Assistant: {block.text}")
elif role == RoleTag.SYSTEM:
as_strings.append(f"System: {block.text}")
elif role == RoleTag.FUNCTION:
as_strings.append(f"Function: {block.text}")
return "\n".join(as_strings)
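Aside (not part of the diff): a simplified stand-in (not the steamship classes; the tag-kind string is assumed) showing the effect of the assistant filter above: assistant blocks carrying a FUNCTION_SELECTION tag are tool-call bookkeeping and stay out of the rendered prompt history.

from dataclasses import dataclass, field
from typing import List

@dataclass
class FakeBlock:
    role: str
    text: str
    tag_kinds: List[str] = field(default_factory=list)

def to_prompt_history(blocks: List[FakeBlock]) -> str:
    lines = []
    for b in blocks:
        if b.role == "user":
            lines.append(f"User: {b.text}")
        elif b.role == "assistant" and "function-selection" not in b.tag_kinds:
            # recorded tool selections are skipped; plain assistant replies survive
            lines.append(f"Assistant: {b.text}")
        elif b.role == "system":
            lines.append(f"System: {b.text}")
    return "\n".join(lines)

history = [
    FakeBlock("user", "What's the weather in Boston?"),
    FakeBlock("assistant", '{"name": "WeatherTool", "arguments": "{}"}', ["function-selection"]),
    FakeBlock("assistant", "It looks sunny in Boston today."),
]
print(to_prompt_history(history))
# User: What's the weather in Boston?
# Assistant: It looks sunny in Boston today.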

