Skip to content

Commit

Permalink
Merge pull request #453 from parea-ai/PAI-666-global-swallow-all-errors-on-trace-v2
Browse files Browse the repository at this point in the history

feat(error): global error handling and logging kill switch
  • Loading branch information
jalexanderII committed Feb 13, 2024
2 parents a185421 + 79d6cf4 commit daf927a
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 168 deletions.
99 changes: 37 additions & 62 deletions parea/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Callable, Optional, Union

import asyncio
import logging
import os
import time
from collections.abc import AsyncIterable, Iterable
Expand Down Expand Up @@ -37,6 +38,8 @@

load_dotenv()

logger = logging.getLogger()

COMPLETION_ENDPOINT = "/completion"
DEPLOYED_PROMPT_ENDPOINT = "/deployed-prompt"
RECORD_FEEDBACK_ENDPOINT = "/feedback"
Expand Down Expand Up @@ -81,69 +84,25 @@ def _add_project_uuid_to_data(self, data) -> dict:
return data_dict

def completion(self, data: Completion) -> CompletionResponse:
    """Run a single synchronous completion against the Parea API.

    :param data: the completion request; mutated in place with trace and
        experiment metadata before being sent.
    :return: the structured response from the completion endpoint.
    """
    # All trace/experiment bookkeeping (inference_id, parent/root trace ids,
    # experiment uuid, parent-trace children update) is centralized in
    # _update_data_and_trace so the sync/async/stream variants stay in sync
    # and trace errors are swallowed there rather than failing the request.
    data = self._update_data_and_trace(data)
    r = self._client.request(
        "POST",
        COMPLETION_ENDPOINT,
        data=asdict(data),
    )
    return structure(r.json(), CompletionResponse)

async def acompletion(self, data: Completion) -> CompletionResponse:
    """Run a single asynchronous completion against the Parea API.

    :param data: the completion request; mutated in place with trace and
        experiment metadata before being sent.
    :return: the structured response from the completion endpoint.
    """
    # Trace bookkeeping is delegated to _update_data_and_trace; see the
    # sync `completion` counterpart for rationale.
    data = self._update_data_and_trace(data)
    r = await self._client.request_async(
        "POST",
        COMPLETION_ENDPOINT,
        data=asdict(data),
    )
    return structure(r.json(), CompletionResponse)

def stream(self, data: Completion) -> Iterable[bytes]:
    """Run a completion and yield the raw response bytes as they arrive.

    :param data: the completion request; mutated in place with trace and
        experiment metadata before being sent.
    :return: an iterable of raw response chunks from the streaming endpoint.
    """
    # Trace bookkeeping is delegated to _update_data_and_trace; see the
    # non-streaming `completion` counterpart for rationale.
    data = self._update_data_and_trace(data)
    # NOTE(review): the request-body argument below was collapsed in the
    # diff view; `data=asdict(data)` matches every sibling method — confirm
    # against the repository.
    response = self._client.stream_request(
        "POST",
        f"{COMPLETION_ENDPOINT}/stream",
        data=asdict(data),
    )
    yield from response

async def astream(self, data: Completion) -> AsyncIterable[bytes]:
    """Run a completion and asynchronously yield raw response bytes.

    :param data: the completion request; mutated in place with trace and
        experiment metadata before being sent.
    :return: an async iterable of raw response chunks from the streaming
        endpoint.
    """
    # Trace bookkeeping is delegated to _update_data_and_trace; see the
    # sync `stream` counterpart for rationale.
    data = self._update_data_and_trace(data)
    # NOTE(review): the tail of this method was collapsed in the diff view;
    # the arguments and async-iteration below mirror the sync `stream`
    # sibling — confirm against the repository.
    response = self._client.stream_request_async(
        "POST",
        f"{COMPLETION_ENDPOINT}/stream",
        data=asdict(data),
    )
    async for chunk in response:
        yield chunk
Expand All @@ -192,6 +137,10 @@ async def aget_prompt(self, data: UseDeployedPrompt) -> UseDeployedPromptRespons
return structure(r.json(), UseDeployedPromptResponse)

def record_feedback(self, data: FeedbackRequest) -> None:
    """Attach feedback to a previously logged trace (synchronous).

    :param data: the feedback payload; must carry a ``trace_id`` so the
        backend can associate the feedback with an existing trace log.
    """
    if not data.trace_id:
        # Without a trace id there is nothing to attach the feedback to;
        # log and return rather than sending a request that cannot match.
        logger.info("No trace_id found in feedback request")
        return

    time.sleep(2)  # give logs time to update
    # NOTE(review): the endpoint argument was collapsed in the diff view;
    # RECORD_FEEDBACK_ENDPOINT per the module constant — confirm against
    # the repository.
    self._client.request(
        "POST",
        RECORD_FEEDBACK_ENDPOINT,
        data=asdict(data),
    )

async def arecord_feedback(self, data: FeedbackRequest) -> None:
    """Attach feedback to a previously logged trace (asynchronous).

    :param data: the feedback payload; must carry a ``trace_id`` so the
        backend can associate the feedback with an existing trace log.
    """
    if not data.trace_id:
        # Without a trace id there is nothing to attach the feedback to;
        # log and return rather than sending a request that cannot match.
        logger.info("No trace_id found in feedback request")
        return

    await asyncio.sleep(2)  # give logs time to update
    # NOTE(review): the tail of this method was collapsed in the diff view;
    # arguments mirror the sync `record_feedback` sibling — confirm against
    # the repository.
    await self._client.request_async(
        "POST",
        RECORD_FEEDBACK_ENDPOINT,
        data=asdict(data),
    )
Expand Down Expand Up @@ -319,6 +272,28 @@ def experiment(self, data: Union[str, Iterable[dict]], func: Callable, n_trials:

return Experiment(data=data, func=func, p=self, n_trials=n_trials)

def _update_data_and_trace(self, data: Completion) -> Completion:
    """Stamp trace/experiment metadata onto a completion request.

    Always assigns a fresh inference id and the client's project uuid.
    The remaining trace bookkeeping (parent/root trace ids, experiment
    uuid, registering this call as a child of the parent trace) is
    best-effort: any failure is logged at debug level and swallowed so a
    tracing problem never breaks the completion call itself.

    :param data: the completion request to annotate (mutated in place).
    :return: the same ``data`` object, for call-site chaining.
    """
    new_trace_id = gen_trace_id()
    data.inference_id = new_trace_id
    data.project_uuid = self._project.uuid

    try:
        parent_id = get_current_trace_id()
        data.parent_trace_id = parent_id if parent_id else new_trace_id
        data.root_trace_id = get_root_trace_id()

        experiment_uuid = os.getenv(PAREA_OS_ENV_EXPERIMENT_UUID, None)
        if experiment_uuid:
            data.experiment_uuid = experiment_uuid

        if parent_id:
            parent_log = trace_data.get()[parent_id]
            parent_log.children.append(new_trace_id)
            parent_log.experiment_uuid = experiment_uuid
            logger_record_log(parent_id)
    except Exception as e:
        # Deliberate swallow-all: trace logging must never fail the request.
        logger.debug(f"Error updating trace ids for completion. Trace log will be absent: {e}")

    return data


_initialized_parea_wrapper = False

Expand Down
15 changes: 15 additions & 0 deletions parea/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

import os

from dotenv import load_dotenv

load_dotenv()


def str2bool(v):
    """Interpret a value (typically an env-var string) as a boolean.

    Booleans pass through unchanged. Non-empty strings are truthy when
    they spell yes/true/t/1 (case-insensitive). Everything else —
    ``None``, empty strings, non-string objects — is ``False``.
    """
    if isinstance(v, bool):
        return v
    if isinstance(v, str) and v:
        return v.lower() in {"yes", "true", "t", "1"}
    return False


# Global kill switch: when the TURN_OFF_PAREA_LOGGING env var is truthy,
# Parea logging/tracing is disabled (read once at import time).
TURN_OFF_PAREA_LOGGING = str2bool(os.getenv("TURN_OFF_PAREA_LOGGING", False))

# Env-var name used to propagate the current experiment uuid to traced calls.
PAREA_OS_ENV_EXPERIMENT_UUID = "_PAREA_EXPERIMENT_UUID"
# Local directory and metrics file used by the DVC integration.
PAREA_DVC_DIR = ".parea"
PAREA_DVC_METRICS_FILE = str(os.path.join(PAREA_DVC_DIR, "metrics.json"))
Expand Down
4 changes: 2 additions & 2 deletions parea/cookbook/run_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ def generate_random_number(n: str) -> str:
# p.experiment(
# data=[{"n": "11"}], # Data to run the experiment on (list of dicts)
# func=generate_random_number, # Function to run (callable)
# n_trials=1, # Number of times to run the experiment on the same data
# n_trials=1, # Number of times to run the experiment on the same data
# )

# You can optionally run the experiment manually by calling `.run()`
if __name__ == "__main__":
p.experiment(
data=[{"n": "12"}],
data=[{"n": "10"}],
func=generate_random_number,
n_trials=3,
).run()
38 changes: 38 additions & 0 deletions parea/cookbook/tracing_with_parea_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import os

from dotenv import load_dotenv

from parea import Parea, trace
from parea.schemas import Completion, LLMInputs, Message, ModelParams, Role

load_dotenv()

# Client configured from the PAREA_API_KEY env var (loaded via dotenv above).
p = Parea(api_key=os.getenv("PAREA_API_KEY"))

# A single reusable completion request shared by both streaming examples below.
completion = Completion(
    llm_configuration=LLMInputs(
        model="gpt-3.5-turbo-1106",
        model_params=ModelParams(temp=0.1),
        messages=[Message(role=Role.user, content="Write a short haiku about the moon.")],
    )
)


@trace
def call_llm_stream():
    """Stream the shared completion synchronously, printing each chunk."""
    for chunk in p.stream(completion):
        print(chunk)


@trace
async def acall_llm_stream():
    """Stream the shared completion asynchronously, printing each chunk."""
    async for chunk in p.astream(completion):
        print(chunk)


if __name__ == "__main__":
    # Sync variant kept for manual experimentation; swap the comments to run it.
    # call_llm_stream()
    asyncio.run(acall_llm_stream())
13 changes: 10 additions & 3 deletions parea/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tqdm.asyncio import tqdm_asyncio

from parea import Parea
from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID
from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID, TURN_OFF_PAREA_LOGGING
from parea.experiment.dvc import save_results_to_dvc_if_init
from parea.helpers import duplicate_dicts, gen_random_name
from parea.schemas.models import CreateExperimentRequest, ExperimentSchema, ExperimentStatsSchema
Expand Down Expand Up @@ -146,5 +146,12 @@ def run(self, name: Optional[str] = None) -> None:
param name: The name of the experiment. This name must be unique across experiment runs.
If no name is provided a memorable name will be generated automatically.
"""
self._gen_name_if_none(name)
self.experiment_stats = asyncio.run(experiment(self.name, self.data, self.func, self.p, self.n_trials))
if TURN_OFF_PAREA_LOGGING:
print("Parea logging is turned off. Experiment can't be run without logging. Set env var TURN_OFF_PAREA_LOGGING to False to enable.")
return

try:
self._gen_name_if_none(name)
self.experiment_stats = asyncio.run(experiment(self.name, self.data, self.func, self.p, self.n_trials))
except Exception as e:
print(f"Error running experiment: {e}")
3 changes: 3 additions & 0 deletions parea/utils/trace_integrations/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from langchain_core.tracers import BaseTracer
from langchain_core.tracers.schemas import ChainRun, LLMRun, Run, ToolRun

from parea.client import TURN_OFF_PAREA_LOGGING
from parea.parea_logger import parea_logger
from parea.schemas.log import TraceIntegrations

Expand All @@ -13,6 +14,8 @@ class PareaAILangchainTracer(BaseTracer):
parent_trace_id: UUID

def _persist_run(self, run: Union[Run, LLMRun, ChainRun, ToolRun]) -> None:
if TURN_OFF_PAREA_LOGGING:
return
self.parent_trace_id = run.id
# using .dict() since langchain Run class currently set to Pydantic v1
data = run.dict()
Expand Down
Loading

0 comments on commit daf927a

Please sign in to comment.