Commit

Merge pull request #264 from parea-ai/PAI-508-create-experiments-with-sdk-visualized-in-ui

feat: add experiments
joschkabraun committed Jan 7, 2024
2 parents 227231d + 2ad29d8 commit a53e040
Showing 13 changed files with 296 additions and 122 deletions.
11 changes: 8 additions & 3 deletions parea/__init__.py
@@ -11,9 +11,10 @@
import sys
from importlib import metadata as importlib_metadata

from parea.benchmark import run_benchmark
from parea.cache import InMemoryCache, RedisCache
from parea.client import Parea, init
from parea.experiment.cli import experiment as experiment_cli
from parea.experiment.experiment import Experiment


def get_version() -> str:
@@ -28,7 +29,11 @@ def get_version() -> str:

def main():
args = sys.argv[1:]
if args[0] == "benchmark":
run_benchmark(args[1:])
if args[0] == "experiment":
experiment_cli(args[1:])
else:
print(f"Unknown command: '{args[0]}'")


if __name__ == "__main__":
main()
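
With this change, `main()` routes an `experiment` subcommand to `experiment_cli` in place of the removed `benchmark` command. A minimal sketch of what that dispatch amounts to, with a hypothetical script path:

# Equivalent of running main() with ["experiment", "path/to/eval_script.py"]:
# the remaining argv is handed straight to experiment_cli, which loads the
# script and runs every registered Experiment. The path is illustrative.
from parea.experiment.cli import experiment as experiment_cli

experiment_cli(["path/to/eval_script.py"])
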
83 changes: 0 additions & 83 deletions parea/benchmark.py

This file was deleted.

52 changes: 47 additions & 5 deletions parea/client.py
@@ -5,19 +5,31 @@
import time

from attrs import asdict, define, field
from cattrs import structure

from parea.api_client import HTTPClient
from parea.cache import InMemoryCache, RedisCache
from parea.cache.cache import Cache
from parea.helpers import gen_trace_id
from parea.parea_logger import parea_logger
from parea.schemas.models import Completion, CompletionResponse, FeedbackRequest, UseDeployedPrompt, UseDeployedPromptResponse
from parea.schemas.models import (
Completion,
CompletionResponse,
CreateExperimentRequest,
ExperimentSchema,
ExperimentStatsSchema,
FeedbackRequest,
UseDeployedPrompt,
UseDeployedPromptResponse,
)
from parea.utils.trace_utils import get_current_trace_id, logger_all_possible, logger_record_log, trace_data
from parea.wrapper import OpenAIWrapper

COMPLETION_ENDPOINT = "/completion"
DEPLOYED_PROMPT_ENDPOINT = "/deployed-prompt"
RECORD_FEEDBACK_ENDPOINT = "/feedback"
EXPERIMENT_ENDPOINT = "/experiment"
EXPERIMENT_STATS_ENDPOINT = "/experiment/{experiment_uuid}/stats"


@define
@@ -48,7 +60,7 @@ def completion(self, data: Completion) -> CompletionResponse:
if parent_trace_id:
trace_data.get()[parent_trace_id].children.append(inference_id)
logger_record_log(parent_trace_id)
return CompletionResponse(**r.json())
return structure(r.json(), CompletionResponse)

async def acompletion(self, data: Completion) -> CompletionResponse:
parent_trace_id = get_current_trace_id()
@@ -64,23 +76,23 @@ async def acompletion(self, data: Completion) -> CompletionResponse:
if parent_trace_id:
trace_data.get()[parent_trace_id].children.append(inference_id)
logger_record_log(parent_trace_id)
return CompletionResponse(**r.json())
return structure(r.json(), CompletionResponse)

def get_prompt(self, data: UseDeployedPrompt) -> UseDeployedPromptResponse:
r = self._client.request(
"POST",
DEPLOYED_PROMPT_ENDPOINT,
data=asdict(data),
)
return UseDeployedPromptResponse(**r.json())
return structure(r.json(), UseDeployedPromptResponse)

async def aget_prompt(self, data: UseDeployedPrompt) -> UseDeployedPromptResponse:
r = await self._client.request_async(
"POST",
DEPLOYED_PROMPT_ENDPOINT,
data=asdict(data),
)
return UseDeployedPromptResponse(**r.json())
return structure(r.json(), UseDeployedPromptResponse)

def record_feedback(self, data: FeedbackRequest) -> None:
time.sleep(2) # give logs time to update
@@ -98,6 +110,36 @@ async def arecord_feedback(self, data: FeedbackRequest) -> None:
data=asdict(data),
)

def create_experiment(self, data: CreateExperimentRequest) -> ExperimentSchema:
r = self._client.request(
"POST",
EXPERIMENT_ENDPOINT,
data=asdict(data),
)
return structure(r.json(), ExperimentSchema)

async def acreate_experiment(self, data: CreateExperimentRequest) -> ExperimentSchema:
r = await self._client.request_async(
"POST",
EXPERIMENT_ENDPOINT,
data=asdict(data),
)
return structure(r.json(), ExperimentSchema)

def get_experiment_stats(self, experiment_uuid: str) -> ExperimentStatsSchema:
r = self._client.request(
"GET",
EXPERIMENT_STATS_ENDPOINT.format(experiment_uuid=experiment_uuid),
)
return structure(r.json(), ExperimentStatsSchema)

async def aget_experiment_stats(self, experiment_uuid: str) -> ExperimentStatsSchema:
r = await self._client.request_async(
"GET",
EXPERIMENT_STATS_ENDPOINT.format(experiment_uuid=experiment_uuid),
)
return structure(r.json(), ExperimentStatsSchema)


_initialized_parea_wrapper = False

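A minimal sketch of exercising the new experiment endpoints on the client, assuming a valid API key (the placeholder key and experiment name below are illustrative):

from parea import Parea
from parea.schemas.models import CreateExperimentRequest

p = Parea(api_key="YOUR_PAREA_API_KEY")  # placeholder key
# POST /experiment, deserialized into ExperimentSchema via cattrs.structure
exp = p.create_experiment(CreateExperimentRequest(name="demo-experiment"))
# GET /experiment/{uuid}/stats, deserialized into ExperimentStatsSchema
stats = p.get_experiment_stats(exp.uuid)
print(stats.parent_trace_stats)
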
1 change: 1 addition & 0 deletions parea/constants.py
@@ -0,0 +1 @@
PAREA_OS_ENV_EXPERIMENT_UUID = "_PAREA_EXPERIMENT_UUID"
Empty file added parea/experiment/__init__.py
Empty file.
48 changes: 48 additions & 0 deletions parea/experiment/cli.py
@@ -0,0 +1,48 @@
import argparse
import csv
import importlib
import os
import sys
import traceback
from importlib import util

from .experiment import _experiments


def load_from_path(module_path):
# Ensure the directory of user-provided script is in the system path
dir_name = os.path.dirname(module_path)
if dir_name not in sys.path:
sys.path.insert(0, dir_name)

module_name = os.path.basename(module_path)
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

if spec.name not in sys.modules:
sys.modules[spec.name] = module


def read_input_file(file_path) -> list[dict]:
with open(file_path) as file:
reader = csv.DictReader(file)
inputs = list(reader)
return inputs


def experiment(args):
parser = argparse.ArgumentParser()
parser.add_argument("file", help="Path to the experiment", type=str)

parsed_args = parser.parse_args(args)

try:
load_from_path(parsed_args.file)
except Exception as e:
print(f"Error loading function: {e}\n", file=sys.stderr)
traceback.print_exc()
sys.exit(1)

for _experiment in _experiments:
_experiment.run()
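
For context, a hypothetical evaluation script that this CLI would load and run: instantiating `Experiment` registers it in `_experiments`, and `experiment_cli` then calls `.run()` on each one. The file names, CSV columns, and the `parea experiment` console invocation are assumptions not shown in this diff.

# eval_script.py -- run with something like: parea experiment eval_script.py
from parea import Experiment
from parea.experiment.cli import read_input_file

def rate_greeting(name: str) -> str:
    # placeholder for the traced function under evaluation
    return f"Hello, {name}!"

# Each CSV row becomes keyword arguments for the function (here: a "name" column).
Experiment(name="greeting-eval", data=read_input_file("inputs.csv"), func=rate_greeting)
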
93 changes: 93 additions & 0 deletions parea/experiment/experiment.py
@@ -0,0 +1,93 @@
from typing import Callable, Dict, Iterator, List

import asyncio
import inspect
import json
import os
import time

from attrs import define, field
from dotenv import load_dotenv
from tqdm import tqdm

from parea.client import Parea
from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID
from parea.schemas.models import CreateExperimentRequest, ExperimentSchema, ExperimentStatsSchema, TraceStatsSchema


def calculate_avg_as_string(values: List[float]) -> str:
    # Drop missing values before the emptiness check so an all-None list
    # returns "N/A" instead of dividing by zero.
    values = [x for x in values if x is not None]
    if not values:
        return "N/A"
    avg = sum(values) / len(values)
    return f"{avg:.2f}"


def calculate_avg_std_for_experiment(experiment_stats: ExperimentStatsSchema) -> Dict[str, str]:
trace_stats: List[TraceStatsSchema] = experiment_stats.parent_trace_stats
latency_values = [trace_stat.latency for trace_stat in trace_stats]
input_tokens_values = [trace_stat.input_tokens for trace_stat in trace_stats]
output_tokens_values = [trace_stat.output_tokens for trace_stat in trace_stats]
total_tokens_values = [trace_stat.total_tokens for trace_stat in trace_stats]
cost_values = [trace_stat.cost for trace_stat in trace_stats]
score_name_to_values: Dict[str, List[float]] = {}
for trace_stat in trace_stats:
if trace_stat.scores:
for score in trace_stat.scores:
score_name_to_values[score.name] = score_name_to_values.get(score.name, []) + [score.score]

return {
"latency": calculate_avg_as_string(latency_values),
"input_tokens": calculate_avg_as_string(input_tokens_values),
"output_tokens": calculate_avg_as_string(output_tokens_values),
"total_tokens": calculate_avg_as_string(total_tokens_values),
"cost": calculate_avg_as_string(cost_values),
**{score_name: calculate_avg_as_string(score_values) for score_name, score_values in score_name_to_values.items()},
}


def async_wrapper(fn, **kwargs):
return asyncio.run(fn(**kwargs))


def experiment(name: str, data: Iterator, func: Callable) -> ExperimentStatsSchema:
"""Creates an experiment and runs the function on the data iterator."""
load_dotenv()

if not (parea_api_key := os.getenv("PAREA_API_KEY")):
raise ValueError("Please set the PAREA_API_KEY environment variable")
p = Parea(api_key=parea_api_key)

experiment_schema: ExperimentSchema = p.create_experiment(CreateExperimentRequest(name=name))
experiment_uuid = experiment_schema.uuid
os.environ[PAREA_OS_ENV_EXPERIMENT_UUID] = experiment_uuid

for data_input in tqdm(data):
if inspect.iscoroutinefunction(func):
asyncio.run(func(**data_input))
else:
func(**data_input)
time.sleep(5) # wait for all trace logs to be written to DB
experiment_stats: ExperimentStatsSchema = p.get_experiment_stats(experiment_uuid)
stat_name_to_avg_std = calculate_avg_std_for_experiment(experiment_stats)
print(f"Experiment stats:\n{json.dumps(stat_name_to_avg_std, indent=2)}\n\n")
print(f"View experiment & its traces at: https://app.parea.ai/experiments/{experiment_uuid}\n")
return experiment_stats


_experiments = []


@define
class Experiment:
name: str = field(init=True)
data: Iterator[Dict] = field(init=True)
func: Callable = field(init=True)
experiment_stats: ExperimentStatsSchema = field(init=False, default=None)

def __attrs_post_init__(self):
global _experiments
_experiments.append(self)

def run(self):
self.experiment_stats = experiment(self.name, self.data, self.func)
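
Async functions are also handled: `experiment()` checks `inspect.iscoroutinefunction(func)` and runs each input row with `asyncio.run`. A sketch with an illustrative coroutine (the sleep stands in for an awaited model call):

import asyncio

from parea import Experiment

async def answer(question: str) -> str:
    await asyncio.sleep(0)  # stand-in for an async LLM call
    return f"Stubbed answer to: {question}"

# Registered like any other experiment; run() detects the coroutine and
# executes each input row with asyncio.run(answer(**row)).
Experiment(name="async-qa-eval", data=[{"question": "What does this PR add?"}], func=answer)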