Merge pull request #763 from parea-ai/PAI-449-improve-sdk-trace-log-system-architecture-pys

Send trace logs when complete
jalexanderII authored Apr 25, 2024
2 parents 9ed9c19 + 477f3ec commit 32f6370
Showing 10 changed files with 80 additions and 51 deletions.
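The diff below implements the one-line summary above: rather than sending a trace log immediately and then patching evaluation scores onto it with a separate update call, the SDK now attaches the scores to the in-memory trace data and sends a single, complete log. For orientation, here is a minimal, runnable sketch of that flow; every name in it (TraceLogSketch, TRACE_STORE, send_log, finish_trace, non_empty) is a hypothetical stand-in for illustration, not a Parea SDK API.

# Sketch only: evaluate first, attach scores to the in-memory trace data,
# then send one complete log -- instead of sending an initial log and
# patching scores onto it afterwards.
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class TraceLogSketch:
    trace_id: str
    output: str = ""
    scores: Dict[str, float] = field(default_factory=dict)


# In-memory store standing in for the SDK's per-trace data.
TRACE_STORE: Dict[str, TraceLogSketch] = {}


def send_log(log: TraceLogSketch) -> None:
    # Stand-in for the single logging call made once the trace is complete.
    print(f"sending complete log {log.trace_id} with scores {log.scores}")


def finish_trace(trace_id: str, eval_funcs: List[Callable[[TraceLogSketch], float]]) -> None:
    log = TRACE_STORE[trace_id]
    # Run evaluations and attach their scores before anything is sent.
    for func in eval_funcs:
        log.scores[func.__name__] = func(log)
    send_log(log)


def non_empty(log: TraceLogSketch) -> float:
    return float(bool(log.output))


if __name__ == "__main__":
    TRACE_STORE["t-1"] = TraceLogSketch(trace_id="t-1", output="a short haiku")
    finish_trace("t-1", [non_empty])  # prints one complete log with its scores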
1 change: 0 additions & 1 deletion parea/client.py
@@ -347,7 +347,6 @@ def _update_data_and_trace(self, data: Completion) -> Completion:
if parent_trace_id:
trace_data.get()[parent_trace_id].children.append(inference_id)
trace_data.get()[parent_trace_id].experiment_uuid = experiment_uuid
logger_record_log(parent_trace_id)
except Exception as e:
logger.debug(f"Error updating trace ids for completion. Trace log will be absent: {e}")

32 changes: 17 additions & 15 deletions parea/cookbook/simple_experiment_with_openai.py
@@ -22,25 +22,27 @@ def eval_func(log: Log) -> float:


@trace(eval_funcs=[eval_func])
def func(lang: str, framework: str) -> str:
return (
client.chat.completions.create(
model="gpt-4-turbo",
messages=[
{
"role": "user",
"content": f"Write a hello world program in {lang} using {framework}",
}
],
def func(topic: str) -> dict[str, str | None]:
return {
"data": (
client.chat.completions.create(
model="gpt-4-turbo",
messages=[
{
"role": "user",
"content": f"Write a short haiku about {topic}",
}
],
)
.choices[0]
.message.content
)
.choices[0]
.message.content
)
}


if __name__ == "__main__":
p.experiment(
name="hello-world-example",
data=[{"lang": "Python", "framework": "Flask"}],
name="hello-world-example-ch",
data=[{"topic": "Fish"}, {"topic": "Python"}],
func=func,
).run()
2 changes: 1 addition & 1 deletion parea/cookbook/tracing_and_evaluating_openai_endpoint.py
@@ -16,7 +16,7 @@
load_dotenv()

openai.api_key = os.getenv("OPENAI_API_KEY")

openai.api_type = "openai"

use_cache = False # by using the in memory cache, you don't need a Parea API key
cache = InMemoryCache() if use_cache else None
17 changes: 6 additions & 11 deletions parea/cookbook/tracing_without_deployed_prompt.py
@@ -13,18 +13,15 @@
p = Parea(api_key=os.getenv("PAREA_API_KEY"))


@trace # <--- If you want to log the inputs to the LLM call you can optionally add a trace decorator here
def call_llm(
data: List[dict],
model: str = "gpt-3.5-turbo-1106",
provider: str = "openai",
temperature: float = 0.0,
) -> CompletionResponse:
return p.completion(
data=Completion(
llm_configuration=LLMInputs(
model=model,
provider=provider,
model_params=ModelParams(temp=temperature),
messages=[Message(**d) for d in data],
)
@@ -49,7 +46,7 @@ def critic(argument: str) -> str:
[
{
"role": "system",
"content": f"You are a critic."
"content": "You are a critic."
"\nWhat unresolved questions or criticism do you have after reading the following argument?"
"Provide a concise summary of your feedback.",
},
@@ -106,8 +103,7 @@ def refiner2(query: str, additional_description: str, current_arg: str, criticis
"content": "Please generate a new argument that incorporates the feedback from the user.",
},
],
model="claude-2",
provider="anthropic",
model="claude-3-haiku-20240307",
)


@@ -128,7 +124,6 @@ def json_call():
data=Completion(
llm_configuration=LLMInputs(
model="gpt-3.5-turbo-1106",
provider="openai",
model_params=ModelParams(temp=0.0, response_format={"type": "json_object"}),
messages=[Message(**d) for d in json_messages],
)
@@ -147,12 +142,12 @@ def json_call():
"Whether wine is good for you.",
additional_description="Provide a concise, few sentence argument on why wine is good for you.",
)
print(result2)
print(trace_id2, result2)
p.record_feedback(
FeedbackRequest(
trace_id=trace_id2,
score=0.0, # 0.0 (bad) to 1.0 (good)
target="Moonshine is wonderful.",
score=0.7, # 0.0 (bad) to 1.0 (good)
target="Wine is wonderful.",
)
)

@@ -164,7 +159,7 @@ def json_call():
p.record_feedback(
FeedbackRequest(
trace_id=result3.inference_id,
score=0.7, # 0.0 (bad) to 1.0 (good)
score=0.5, # 0.0 (bad) to 1.0 (good)
target="Moonshine is wonderful. End of story.",
)
)
7 changes: 6 additions & 1 deletion parea/evals/utils.py
@@ -13,6 +13,7 @@
from parea.schemas import EvaluationResult, Log
from parea.schemas.log import Log
from parea.schemas.models import UpdateLog
from parea.utils.trace_utils import thread_ids_running_evals, trace_data

seg = pysbd.Segmenter(language="en", clean=False)

@@ -142,7 +143,11 @@ def _make_evaluations(trace_id: str, log: Log, eval_funcs: List[EvalFuncTuple],
elif result is not None:
scores.append(EvaluationResult(name=eval.name, score=result))

parea_logger.update_log(data=UpdateLog(trace_id=trace_id, field_name_to_value_map={"scores": scores, "target": log.target}))
trace_data.get()[trace_id].scores = scores
trace_data.get()[trace_id].target = log.target
data_with_scores = trace_data.get()[trace_id]
thread_ids_running_evals.get().remove(trace_id)
parea_logger.default_log(data=data_with_scores)
if verbose:
print("###Eval Results###")
for score in scores:
24 changes: 22 additions & 2 deletions parea/experiment/experiment.py
@@ -41,7 +41,7 @@ def calculate_avg_std_for_experiment(experiment_stats: ExperimentStatsSchema) ->
score_accumulators[score.name] += score.score
score_counts[score.name] += 1

averages = {attr: "N/A" if counts[attr] == 0 else f"{accumulators[attr] / counts[attr]:.2f}" for attr in accumulators}
averages = {attr: "N/A" if counts[attr] == 0 else f"{accumulators[attr] / counts[attr]:.{5 if attr == 'cost' else 2}f}" for attr in accumulators}

score_averages = {f"{name}": "N/A" if score_counts[name] == 0 else f"{score_accumulators[name] / score_counts[name]:.2f}" for name in score_accumulators}

@@ -157,7 +157,7 @@ def limit_concurrency_sync(sample):
experiment_stats: ExperimentStatsSchema = p.finish_experiment(experiment_uuid, FinishExperimentRequestSchema(dataset_level_stats=dataset_level_eval_results))
stat_name_to_avg_std = calculate_avg_std_for_experiment(experiment_stats)
if dataset_level_eval_results:
stat_name_to_avg_std.update({eval_result.name: eval_result.score for eval_result in dataset_level_eval_results})
stat_name_to_avg_std.update(**{eval_result.name: eval_result.score for eval_result in dataset_level_eval_results})
print(f"Experiment {experiment_name} Run {run_name} stats:\n{json_dumps(stat_name_to_avg_std, indent=2)}\n\n")
print(f"View experiment & traces at: https://app.parea.ai/experiments/{experiment_name}/{experiment_uuid}\n")
save_results_to_dvc_if_init(run_name, stat_name_to_avg_std)
@@ -223,6 +223,26 @@ def run(self, run_name: Optional[str] = None) -> None:
traceback.print_exc()
print(f"Error running experiment: {e}")

async def arun(self, run_name: Optional[str] = None) -> None:
"""Run the experiment and save the results to DVC.
param run_name: The run name of the experiment. This name must be unique across experiment runs.
If no run name is provided a memorable name will be generated automatically.
"""
if TURN_OFF_PAREA_LOGGING:
print("Parea logging is turned off. Experiment can't be run without logging. Set env var TURN_OFF_PAREA_LOGGING to False to enable.")
return

try:
self._gen_run_name_if_none(run_name)
self.experiment_stats = await experiment(
self.experiment_name, self.run_name, self.data, self.func, self.p, self.n_trials, self.metadata, self.dataset_level_evals, self.n_workers
)
except Exception as e:
import traceback

traceback.print_exc()
print(f"Error running experiment: {e}")

@property
def avg_scores(self) -> Dict[str, float]:
"""Returns the average score across all evals."""
2 changes: 1 addition & 1 deletion parea/schemas/models.py
@@ -77,7 +77,6 @@ class UseDeployedPromptResponse:
class FeedbackRequest:
score: float = field(validator=[validators.ge(0), validators.le(1)])
trace_id: Optional[str] = None
inference_id: Optional[str] = None
name: Optional[str] = None
target: Optional[str] = None

@@ -219,6 +218,7 @@ class UpdateTraceScenario(str, Enum):
ERROR: str = "error"
CHAIN: str = "chain"
USAGE: str = "usage"
OPENAICONFIG: str = "openaiconfig"


@define
16 changes: 13 additions & 3 deletions parea/utils/trace_utils.py
@@ -15,7 +15,7 @@
from parea.helpers import gen_trace_id, timezone_aware_now
from parea.parea_logger import parea_logger
from parea.schemas import EvaluationResult
from parea.schemas.models import TraceLog, UpdateLog, UpdateTraceScenario
from parea.schemas.models import TraceLog, UpdateTraceScenario
from parea.utils.universal_encoder import json_dumps

logger = logging.getLogger()
@@ -117,6 +117,13 @@ def fill_trace_data(trace_id: str, data: Dict[str, Any], scenario: UpdateTraceSc
elif scenario == UpdateTraceScenario.CHAIN:
trace_data.get()[trace_id].parent_trace_id = data["parent_trace_id"]
trace_data.get()[data["parent_trace_id"]].children.append(trace_id)
elif scenario == UpdateTraceScenario.OPENAICONFIG:
trace_data.get()[trace_id].configuration = data["configuration"]
trace_data.get()[trace_id].output = data["output"]
trace_data.get()[trace_id].input_tokens = data["input_tokens"]
trace_data.get()[trace_id].output_tokens = data["output_tokens"]
trace_data.get()[trace_id].total_tokens = data["total_tokens"]
trace_data.get()[trace_id].cost = data["cost"]
else:
logger.debug(f"Error occurred filling trace data. Scenario not valid: {scenario}")
except Exception as e:
@@ -265,7 +272,7 @@ def wrapper(*args, **kwargs):

def call_eval_funcs_then_log(trace_id: str, eval_funcs: List[Callable] = None):
data = trace_data.get()[trace_id]
parea_logger.default_log(data=data)
# parea_logger.default_log(data=data)

if eval_funcs and data.status == "success" and random() <= data.apply_eval_frac:
thread_ids_running_evals.get().append(trace_id)
@@ -283,10 +290,13 @@ def call_eval_funcs_then_log(trace_id: str, eval_funcs: List[Callable] = None):
scores.append(EvaluationResult(name=func.__name__, score=score))
except Exception as e:
logger.exception(f"Error occurred calling evaluation function '{func.__name__}', {e}", exc_info=e)
parea_logger.update_log(data=UpdateLog(trace_id=trace_id, field_name_to_value_map={"scores": scores}))
# parea_logger.update_log(data=UpdateLog(trace_id=trace_id, field_name_to_value_map={"scores": scores}))
trace_data.get()[trace_id].scores = scores
thread_ids_running_evals.get().remove(trace_id)

data_with_scores = trace_data.get()[trace_id]
parea_logger.default_log(data=data_with_scores)


def logger_record_log(trace_id: str):
log_in_thread(parea_logger.record_log, {"data": trace_data.get()[trace_id]})
28 changes: 13 additions & 15 deletions parea/wrapper/utils.py
@@ -11,8 +11,8 @@
from parea.constants import ALL_NON_AZURE_MODELS_INFO, AZURE_MODEL_INFO
from parea.parea_logger import parea_logger
from parea.schemas.log import LLMInputs, Message, ModelParams, Role
from parea.schemas.models import UpdateLog
from parea.utils.trace_utils import get_current_trace_id, log_in_thread, trace_insert
from parea.schemas.models import UpdateLog, UpdateTraceScenario
from parea.utils.trace_utils import fill_trace_data, get_current_trace_id, log_in_thread, trace_data, trace_insert
from parea.utils.universal_encoder import json_dumps

is_openai_1 = openai_version.startswith("1.")
@@ -318,19 +318,17 @@ def _process_stream_response(content: list, tools: dict, data: dict, trace_id: s
data.get("model"),
)
completion_tokens = _num_tokens_from_string(final_content if final_content else json_dumps(tool_calls), model)
parea_logger.update_log(
UpdateLog(
trace_id=trace_id,
field_name_to_value_map={
"configuration": _kwargs_to_llm_configuration(data, model),
"output": completion,
"input_tokens": prompt_tokens,
"output_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
"cost": _compute_cost(prompt_tokens, completion_tokens, model),
},
)
)
data = {
"configuration": _kwargs_to_llm_configuration(data, model),
"output": completion,
"input_tokens": prompt_tokens,
"output_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
"cost": _compute_cost(prompt_tokens, completion_tokens, model),
}
fill_trace_data(trace_id, data, UpdateTraceScenario.OPENAICONFIG)
data_with_config = trace_data.get()[trace_id]
parea_logger.default_log(data=data_with_config)


def convert_openai_raw_stream_to_log(content: list, tools: dict, data: dict, trace_id: str):
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "parea-ai"
packages = [{ include = "parea" }]
version = "0.2.139"
version = "0.2.140"
description = "Parea python sdk"
readme = "README.md"
authors = ["joel-parea-ai <[email protected]>"]
