Merge pull request #79 from parea-ai/PAI-250-change-python-trace-decorator-interplay-with-parea-completion-endpoint

feat: measure latency in trace decorator
joschkabraun committed Sep 8, 2023
2 parents 7e71951 + 2315d9c commit 4610255
Showing 7 changed files with 85 additions and 84 deletions.
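In short: p.completion() now mints a fresh inference_id per call and, when invoked inside a @trace-decorated function, registers that id as a child of the surrounding trace; the decorator itself now records latency; and cookbook helpers that merely wrap p.completion() drop their @trace decorators. A minimal sketch of the resulting usage (the import of Parea from the package root and the Completion fields deployment_id and llm_inputs are assumptions, since this diff elides them; trace and get_current_trace_id do live in parea/utils/trace_utils.py):

# Illustration only, not part of this commit.
import os

from parea import Parea  # assumed export; the client class is defined in parea/client.py
from parea.schemas.models import Completion
from parea.utils.trace_utils import trace

p = Parea(api_key=os.getenv("PAREA_API_KEY"))


@trace  # now records latency alongside the start/end timestamps
def answer(query: str) -> str:
    # No per-call @trace needed anymore: completion() appends its own
    # inference_id to the enclosing trace's children automatically.
    return p.completion(Completion(deployment_id="p-XXX", llm_inputs={"query": query})).content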
15 changes: 12 additions & 3 deletions parea/client.py
@@ -1,12 +1,13 @@
 import asyncio
 import time
+from uuid import uuid4
 
 from attrs import asdict, define, field
 
 from parea.api_client import HTTPClient
 from parea.parea_logger import parea_logger
 from parea.schemas.models import Completion, CompletionResponse, FeedbackRequest, UseDeployedPrompt, UseDeployedPromptResponse
-from parea.utils.trace_utils import get_current_trace_id
+from parea.utils.trace_utils import default_logger, get_current_trace_id, trace_data
 
 COMPLETION_ENDPOINT = "/completion"
 DEPLOYED_PROMPT_ENDPOINT = "/deployed-prompt"
@@ -23,21 +24,29 @@ def __attrs_post_init__(self):
         parea_logger.set_client(self._client)
 
     def completion(self, data: Completion) -> CompletionResponse:
-        data.inference_id = get_current_trace_id() or None
+        inference_id = str(uuid4())
+        data.inference_id = inference_id
         r = self._client.request(
             "POST",
             COMPLETION_ENDPOINT,
             data=asdict(data),
         )
+        if parent_trace_id := get_current_trace_id():
+            trace_data.get()[parent_trace_id].children.append(inference_id)
+            default_logger(parent_trace_id)
         return CompletionResponse(**r.json())
 
     async def acompletion(self, data: Completion) -> CompletionResponse:
-        data.inference_id = get_current_trace_id()
+        inference_id = str(uuid4())
+        data.inference_id = inference_id
         r = await self._client.request_async(
             "POST",
             COMPLETION_ENDPOINT,
             data=asdict(data),
        )
+        if parent_trace_id := get_current_trace_id():
+            trace_data.get()[parent_trace_id].children.append(inference_id)
+            default_logger(parent_trace_id)
         return CompletionResponse(**r.json())
 
     def get_prompt(self, data: UseDeployedPrompt) -> UseDeployedPromptResponse:
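The effect of the new walrus block in completion() and acompletion(), sketched under the same assumptions as above (trace_data and get_current_trace_id are importable from parea.utils.trace_utils, as this file's own import line shows):

# Illustration only: inside an active trace, each completion() call now
# shows up in that span's children.
@trace
def ask(question: str) -> str:
    response = p.completion(Completion(deployment_id="p-XXX", llm_inputs={"query": question}))
    trace_id = get_current_trace_id()
    assert len(trace_data.get()[trace_id].children) == 1  # the freshly minted inference_id
    return response.content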
1 change: 0 additions & 1 deletion parea/cookbook/tracing_with_agent.py
@@ -24,7 +24,6 @@ def dump_task(task):
     return d
 
 
-@trace
 def call_llm(
     data: list[Message],
     model: str = "gpt-3.5-turbo",
19 changes: 4 additions & 15 deletions parea/cookbook/tracing_with_deployed_prompt.py
@@ -12,7 +12,6 @@
 p = Parea(api_key=os.getenv("PAREA_API_KEY"))
 
 
-@trace
 def deployed_argument_generator(query: str, additional_description: str = "") -> str:
     return p.completion(
         Completion(
@@ -26,7 +25,6 @@ def deployed_argument_generator(query: str, additional_description: str = "") -> str:
     ).content
 
 
-@trace
 def deployed_critic(argument: str) -> str:
     return p.completion(
         Completion(
@@ -36,7 +34,6 @@ def deployed_critic(argument: str) -> str:
     ).content
 
 
-@trace
 def deployed_refiner(query: str, additional_description: str, current_arg: str, criticism: str) -> str:
     return p.completion(
         Completion(
@@ -52,7 +49,6 @@ def deployed_refiner(query: str, additional_description: str, current_arg: str, criticism: str) -> str:
     ).content
 
 
-@trace
 def deployed_refiner2(query: str, additional_description: str, current_arg: str, criticism: str) -> CompletionResponse:
     return p.completion(
         Completion(
@@ -75,19 +71,11 @@ def deployed_argument_chain(query: str, additional_description: str = "") -> str:
     return deployed_refiner(query, additional_description, argument, criticism)
 
 
-@trace
-def deployed_argument_chain2(query: str, additional_description: str = "") -> tuple[str, str]:
-    trace_id = get_current_trace_id()
-    argument = deployed_argument_generator(query, additional_description)
-    criticism = deployed_critic(argument)
-    return deployed_refiner(query, additional_description, argument, criticism), trace_id
-
-
 @trace(
     tags=["cookbook-example-deployed", "feedback_tracked-deployed"],
     metadata={"source": "python-sdk", "deployed": True},
 )
-def deployed_argument_chain3(query: str, additional_description: str = "") -> CompletionResponse:
+def deployed_argument_chain_tags_metadata(query: str, additional_description: str = "") -> CompletionResponse:
     argument = deployed_argument_generator(query, additional_description)
     criticism = deployed_critic(argument)
     return deployed_refiner2(query, additional_description, argument, criticism)
@@ -100,10 +88,11 @@ def deployed_argument_chain3(query: str, additional_description: str = "") -> CompletionResponse:
     )
     print(result1)
 
-    result2, trace_id2 = deployed_argument_chain2(
+    result2 = deployed_argument_chain(
         "Whether wine is good for you.",
         additional_description="Provide a concise, few sentence argument on why wine is good for you.",
     )
+    trace_id2 = get_current_trace_id()
     print(result2)
     p.record_feedback(
         FeedbackRequest(
@@ -113,7 +102,7 @@ def deployed_argument_chain3(query: str, additional_description: str = "") -> CompletionResponse:
         )
     )
 
-    result3 = deployed_argument_chain3(
+    result3 = deployed_argument_chain_tags_metadata(
         "Whether coffee is good for you.",
         additional_description="Provide a concise, few sentence argument on why coffee is good for you.",
     )
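With deployed_argument_chain2 removed, the __main__ block above now reads the trace id via get_current_trace_id() right after the traced call returns. A condensed sketch of that feedback flow; the FeedbackRequest fields trace_id and score are assumptions, since the diff elides them:

# Illustration only, mirroring the updated __main__ block.
result = deployed_argument_chain(
    "Whether tea is good for you.",
    additional_description="Provide a concise, few sentence argument on why tea is good for you.",
)
p.record_feedback(
    FeedbackRequest(
        trace_id=get_current_trace_id(),  # assumed field name
        score=0.9,  # assumed field name
    )
)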
109 changes: 56 additions & 53 deletions parea/cookbook/tracing_with_open_ai_endpoint_directly.py
@@ -15,72 +15,75 @@
 p = Parea(api_key=os.getenv("PAREA_API_KEY"))
 
 
-def call_openai(data: list[dict], model: str = "gpt-3.5-turbo-0613", temperature: float = 0.0) -> str:
-    return openai.ChatCompletion.create(model=model, messages=data, temperature=temperature).choices[0].message["content"]
-
-
 @trace
-def argument_generator(query: str, additional_description: str = "") -> str:
-    return call_openai(
-        data=[
-            {
-                "role": "system",
-                "content": f"""You are a debater making an argument on a topic. {additional_description}.
-The current time is {datetime.now()}""",
-            },
-            {"role": "user", "content": f"""The discussion topic is {query}"""},
-        ]
-    )
+def argument_chain(query: str, additional_description: str = "") -> str:
+    argument = (
+        openai.ChatCompletion.create(
+            model="gpt-3.5-turbo-0613",
+            temperature=0.0,
+            messages=[
+                {
+                    "role": "system",
+                    "content": f"""You are a debater making an argument on a topic. {additional_description}.
+The current time is {datetime.now()}""",
+                },
+                {"role": "user", "content": f"The discussion topic is {query}"},
+            ],
+        )
+        .choices[0]
+        .message["content"]
+    )
 
-
-@trace
-def critic(argument: str) -> str:
-    return call_openai(
-        data=[
-            {
-                "role": "system",
-                "content": f"""You are a critic.
-What unresolved questions or criticism do you have after reading the following argument?
-Provide a concise summary of your feedback.""",
-            },
-            {"role": "user", "content": f"""{argument}"""},
-        ]
-    )
+    criticism = (
+        openai.ChatCompletion.create(
+            model="gpt-3.5-turbo-0613",
+            temperature=0.0,
+            messages=[
+                {
+                    "role": "system",
+                    "content": f"""You are a critic.
+What unresolved questions or criticism do you have after reading the following argument?
+Provide a concise summary of your feedback.""",
+                },
+                {"role": "user", "content": argument},
+            ],
+        )
+        .choices[0]
+        .message["content"]
+    )
 
-
-@trace
-def refiner(query: str, additional_description: str, current_arg: str, criticism: str) -> str:
-    return call_openai(
-        data=[
-            {
-                "role": "system",
-                "content": f"""You are a debater making an argument on a topic. {additional_description}.
-The current time is {datetime.now()}""",
-            },
-            {"role": "user", "content": f"""The discussion topic is {query}"""},
-            {"role": "assistant", "content": f"""{current_arg}"""},
-            {"role": "user", "content": f"""{criticism}"""},
-            {
-                "role": "system",
-                "content": f"""Please generate a new argument that incorporates the feedback from the user.""",
-            },
-        ]
-    )
+    refined_argument = (
+        openai.ChatCompletion.create(
+            model="gpt-3.5-turbo-0613",
+            temperature=0.0,
+            messages=[
+                {
+                    "role": "system",
+                    "content": f"""You are a debater making an argument on a topic. {additional_description}.
+The current time is {datetime.now()}""",
+                },
+                {"role": "user", "content": f"""The discussion topic is {query}"""},
+                {"role": "assistant", "content": argument},
+                {"role": "user", "content": criticism},
+                {
+                    "role": "system",
+                    "content": f"Please generate a new argument that incorporates the feedback from the user.",
+                },
+            ],
+        )
+        .choices[0]
+        .message["content"]
+    )
 
-
-@trace
-def argument_chain(query: str, additional_description: str = "") -> tuple[str, str]:
-    trace_id = get_current_trace_id()
-    argument = argument_generator(query, additional_description)
-    criticism = critic(argument)
-    return refiner(query, additional_description, argument, criticism), trace_id
+    return refined_argument
 
 
 if __name__ == "__main__":
-    result, trace_id = argument_chain(
+    result = argument_chain(
         "Whether sparkling water is good for you.",
         additional_description="Provide a concise, few sentence argument on why sparkling water is good for you.",
     )
+    trace_id = get_current_trace_id()
     print(result)
     p.record_feedback(
         FeedbackRequest(
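The three @trace-decorated helpers and the call_openai shim collapse into one traced argument_chain, so feedback recorded against get_current_trace_id() now targets the whole chain rather than a single step. A condensed sketch (FeedbackRequest fields assumed, as noted earlier):

# Illustration only.
result = argument_chain(
    "Whether running is good for you.",
    additional_description="Provide a concise, few sentence argument on why running is good for you.",
)
p.record_feedback(FeedbackRequest(trace_id=get_current_trace_id(), score=0.8))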
21 changes: 11 additions & 10 deletions parea/utils/trace_utils.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Any, Optional, Tuple
 
 import contextvars
 import inspect
@@ -57,7 +57,7 @@ def trace(
     target: Optional[str] = None,
     end_user_identifier: Optional[str] = None,
 ):
-    def init_trace(func_name, args, kwargs, func):
+    def init_trace(func_name, args, kwargs, func) -> Tuple[str, float]:
         start_time = time.time()
         trace_id = str(uuid4())
         trace_context.get().append(trace_id)
@@ -82,19 +82,19 @@ def init_trace(func_name, args, kwargs, func):
         if parent_trace_id:
             trace_data.get()[parent_trace_id].children.append(trace_id)
 
-        return trace_id
+        return trace_id, start_time
 
-    def cleanup_trace(trace_id):
+    def cleanup_trace(trace_id, start_time):
         end_time = time.time()
         trace_data.get()[trace_id].end_timestamp = to_date_and_time_string(end_time)
+        trace_data.get()[trace_id].latency = end_time - start_time
         default_logger(trace_id)
         trace_context.get().pop()
 
     def decorator(func):
         @wraps(func)
         async def async_wrapper(*args, **kwargs):
-            trace_id = init_trace(func.__name__, args, kwargs, func)
-            result = None
+            trace_id, start_time = init_trace(func.__name__, args, kwargs, func)
             try:
                 result = await func(*args, **kwargs)
                 output = asdict(result) if isinstance(result, CompletionResponse) else result
@@ -103,14 +103,14 @@ async def async_wrapper(*args, **kwargs):
                 logger.exception(f"Error occurred in function {func.__name__}, {e}")
                 trace_data.get()[trace_id].error = str(e)
                 trace_data.get()[trace_id].status = "error"
+                raise e
             finally:
-                cleanup_trace(trace_id)
+                cleanup_trace(trace_id, start_time)
             return result
 
         @wraps(func)
         def wrapper(*args, **kwargs):
-            trace_id = init_trace(func.__name__, args, kwargs, func)
-            result = None
+            trace_id, start_time = init_trace(func.__name__, args, kwargs, func)
             try:
                 result = func(*args, **kwargs)
                 output = asdict(result) if isinstance(result, CompletionResponse) else result
@@ -119,8 +119,9 @@ def wrapper(*args, **kwargs):
                 logger.exception(f"Error occurred in function {func.__name__}, {e}")
                 trace_data.get()[trace_id].error = str(e)
                 trace_data.get()[trace_id].status = "error"
+                raise e
             finally:
-                cleanup_trace(trace_id)
+                cleanup_trace(trace_id, start_time)
             return result
 
         if inspect.iscoroutinefunction(func):
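A quick way to see the new latency bookkeeping, using only names defined or imported in this file (trace, get_current_trace_id, trace_data):

# Illustration only: cleanup_trace() stores latency in seconds on the trace entry.
import time

from parea.utils.trace_utils import get_current_trace_id, trace, trace_data


@trace
def slow_step() -> str:
    time.sleep(0.5)
    return get_current_trace_id()  # capture the id while the span is still active


tid = slow_step()
print(trace_data.get()[tid].latency)  # ~0.5, i.e. end_time - start_time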
2 changes: 1 addition & 1 deletion parea/wrapper/wrapper.py
@@ -94,7 +94,7 @@ def wrapper(*args, **kwargs):
             return response
         except Exception as e:
             error = str(e)
-            raise
+            raise e
         finally:
             self._cleanup_trace(trace_id, start_time, error, response, args, kwargs)
 
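A control-flow note on the raise e above: the except block re-raises (matching the raise e lines added in trace_utils.py), and the finally block still runs, so a failed call is both recorded and surfaced to the caller. A standalone sketch of the pattern:

# Illustration only; stands in for the wrapper body above.
def failing_call():
    raise ValueError("boom")

error = None
try:
    failing_call()
except Exception as e:
    error = str(e)
    raise e  # plain `raise` would preserve the original traceback unchanged
finally:
    print("cleanup still runs, error =", error)  # stands in for self._cleanup_trace(...)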
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "parea-ai"
 packages = [{ include = "parea" }]
-version = "0.2.4"
+version = "0.2.5"
 description = "Parea python sdk"
 readme = "README.md"
 authors = ["joel-parea-ai <[email protected]>"]
