Skip to content

Commit

Permalink
Merge pull request #499 from parea-ai/PAI-721-timestamps-are-wrong-fo…
Browse files Browse the repository at this point in the history
…r-other-timezones

Pai 721 timestamps are wrong for other timezones
  • Loading branch information
jalexanderII authored Feb 22, 2024
2 parents 14e58d8 + e1dd8a3 commit c40e7eb
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 39 deletions.
2 changes: 1 addition & 1 deletion parea/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from parea.experiment.cli import experiment as _experiment_cli
from parea.experiment.dvc import parea_dvc_initialized
from parea.experiment.experiment import Experiment
from parea.helpers import date_and_time_string_to_timestamp, gen_trace_id, to_date_and_time_string, write_trace_logs_to_csv
from parea.helpers import gen_trace_id, write_trace_logs_to_csv
from parea.parea_logger import parea_logger
from parea.utils.trace_utils import get_current_trace_id, get_root_trace_id, trace, trace_insert
from parea.wrapper.openai_raw_api_tracer import aprocess_stream_and_yield, process_stream_and_yield
Expand Down
9 changes: 9 additions & 0 deletions parea/cookbook/langchain/trace_langchain_simple.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os

from dotenv import load_dotenv
Expand Down Expand Up @@ -25,5 +26,13 @@ def main():
)


def amain():
return chain.ainvoke(
{"input": "Write a Hello World program in Python using FastAPI."},
config={"callbacks": [handler]},
)


if __name__ == "__main__":
print(main())
print(asyncio.run(amain()))
7 changes: 4 additions & 3 deletions parea/cookbook/tracing_with_agent.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import random
import time
from datetime import datetime

import pytz
from dotenv import load_dotenv

from parea import Parea, get_current_trace_id, to_date_and_time_string, trace
from parea import Parea, get_current_trace_id, trace
from parea.schemas import Completion, CompletionResponse, FeedbackRequest, LLMInputs, Message, ModelParams, Role

load_dotenv()
Expand Down Expand Up @@ -84,7 +85,7 @@ def generate_tasks(main_objective: str, expounded_initial_task: list[dict[str, s
return new_tasks_list


@trace(name=f"run_agent-{to_date_and_time_string(time.time())}") # You can provide a custom name other than the function name
@trace(name=f"run_agent-{datetime.now(pytz.utc)}") # You can provide a custom name other than the function name
def run_agent(main_objective: str, initial_task: str = "") -> tuple[list[dict[str, str]], str]:
trace_id = get_current_trace_id()
generated_tasks = []
Expand Down
15 changes: 6 additions & 9 deletions parea/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import csv
import random
import time
import uuid
from collections.abc import Iterable
from copy import deepcopy
from datetime import datetime

import pytz
from attr import asdict, fields_dict

from parea.constants import ADJECTIVES, NOUNS
Expand All @@ -19,14 +20,6 @@ def gen_trace_id() -> str:
return str(uuid.uuid4())


def to_date_and_time_string(timestamp: float) -> str:
return time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(timestamp))


def date_and_time_string_to_timestamp(date_and_time_string: str) -> float:
return time.mktime(time.strptime(date_and_time_string, "%Y-%m-%d %H:%M:%S %Z"))


def write_trace_logs_to_csv(path_csv: str, trace_logs: list[TraceLog]):
with open(path_csv, "w", newline="") as file:
# write header
Expand Down Expand Up @@ -70,3 +63,7 @@ def serialize_values(metadata: dict[str, Any]) -> dict[str, str]:
log_data.metadata = serialized_values

return log_data


def timezone_aware_now() -> datetime:
return datetime.now(pytz.utc)
1 change: 1 addition & 0 deletions parea/utils/trace_integrations/langchain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Union

from uuid import UUID

from langchain_core.tracers import BaseTracer
Expand Down
18 changes: 9 additions & 9 deletions parea/utils/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import logging
import os
import threading
import time
from collections import ChainMap
from collections.abc import AsyncGenerator, AsyncIterator, Generator, Iterator
from datetime import datetime
from functools import wraps
from random import random

from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID, TURN_OFF_PAREA_LOGGING
from parea.helpers import gen_trace_id, to_date_and_time_string
from parea.helpers import gen_trace_id, timezone_aware_now
from parea.parea_logger import parea_logger
from parea.schemas.models import NamedEvaluationScore, TraceLog, UpdateLog, UpdateTraceScenario
from parea.utils.universal_encoder import json_dumps
Expand Down Expand Up @@ -124,8 +124,8 @@ def trace(
access_output_of_func: Optional[Callable] = None,
apply_eval_frac: float = 1.0,
):
def init_trace(func_name, _parea_target_field, args, kwargs, func) -> tuple[str, float]:
start_time = time.time()
def init_trace(func_name, _parea_target_field, args, kwargs, func) -> tuple[str, datetime]:
start_time = timezone_aware_now()
trace_id = gen_trace_id()

if TURN_OFF_PAREA_LOGGING:
Expand Down Expand Up @@ -155,7 +155,7 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> tuple[str,
trace_id=trace_id,
parent_trace_id=trace_id,
root_trace_id=trace_context.get()[0],
start_timestamp=to_date_and_time_string(start_time),
start_timestamp=start_time.isoformat(),
trace_name=name or func_name,
end_user_identifier=end_user_identifier,
metadata=metadata,
Expand All @@ -173,10 +173,10 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> tuple[str,

return trace_id, start_time

def cleanup_trace(trace_id, start_time):
end_time = time.time()
trace_data.get()[trace_id].end_timestamp = to_date_and_time_string(end_time)
trace_data.get()[trace_id].latency = end_time - start_time
def cleanup_trace(trace_id: str, start_time: datetime):
end_time = timezone_aware_now()
trace_data.get()[trace_id].end_timestamp = end_time.isoformat()
trace_data.get()[trace_id].latency = (end_time - start_time).total_seconds()

output = trace_data.get()[trace_id].output
if trace_data.get()[trace_id].status == "success" and output:
Expand Down
32 changes: 16 additions & 16 deletions parea/wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import inspect
import logging
import os
import time
from datetime import datetime
from uuid import uuid4

from parea.cache.cache import Cache
from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID, TURN_OFF_PAREA_LOGGING
from parea.evals.utils import _make_evaluations
from parea.helpers import date_and_time_string_to_timestamp
from parea.helpers import timezone_aware_now
from parea.schemas.models import TraceLog
from parea.utils.trace_utils import call_eval_funcs_then_log, to_date_and_time_string, trace_context, trace_data
from parea.utils.trace_utils import call_eval_funcs_then_log, trace_context, trace_data
from parea.wrapper.utils import skip_decorator_if_func_in_stack

logger = logging.getLogger()
Expand Down Expand Up @@ -62,8 +62,8 @@ def _get_decorator(self, unwrapped_func: Callable, original_func: Callable):
else:
return self.sync_decorator(original_func)

def _init_trace(self) -> tuple[str, float]:
start_time = time.time()
def _init_trace(self) -> tuple[str, datetime]:
start_time = timezone_aware_now()
trace_id = str(uuid4())
if TURN_OFF_PAREA_LOGGING:
return trace_id, start_time
Expand All @@ -74,7 +74,7 @@ def _init_trace(self) -> tuple[str, float]:
trace_id=trace_id,
parent_trace_id=trace_id,
root_trace_id=trace_id,
start_timestamp=to_date_and_time_string(start_time),
start_timestamp=start_time.isoformat(),
trace_name="LLM",
end_user_identifier=None,
metadata=None,
Expand All @@ -93,7 +93,7 @@ def _init_trace(self) -> tuple[str, float]:
trace_id=parent_trace_id,
parent_trace_id=parent_trace_id,
root_trace_id=parent_trace_id,
start_timestamp=to_date_and_time_string(start_time),
start_timestamp=start_time.isoformat(),
end_user_identifier=None,
metadata=None,
target=None,
Expand Down Expand Up @@ -172,7 +172,7 @@ def wrapper(*args, **kwargs):

return wrapper

def _cleanup_trace_core(self, trace_id: str, start_time: float, error: str, cache_hit, args, kwargs):
def _cleanup_trace_core(self, trace_id: str, start_time: datetime, error: str, cache_hit, args, kwargs):
trace_data.get()[trace_id].cache_hit = cache_hit

if error:
Expand All @@ -182,14 +182,14 @@ def _cleanup_trace_core(self, trace_id: str, start_time: float, error: str, cach
trace_data.get()[trace_id].status = "success"

def final_log():
end_time = time.time()
trace_data.get()[trace_id].end_timestamp = to_date_and_time_string(end_time)
trace_data.get()[trace_id].latency = end_time - start_time
end_time = timezone_aware_now()
trace_data.get()[trace_id].end_timestamp = end_time.isoformat()
trace_data.get()[trace_id].latency = (end_time - start_time).total_seconds()

parent_id = trace_context.get()[-2]
trace_data.get()[parent_id].end_timestamp = to_date_and_time_string(end_time)
start_time_parent = date_and_time_string_to_timestamp(trace_data.get()[parent_id].start_timestamp)
trace_data.get()[parent_id].latency = end_time - start_time_parent
trace_data.get()[parent_id].end_timestamp = end_time.isoformat()
start_time_parent = datetime.fromisoformat(trace_data.get()[parent_id].start_timestamp)
trace_data.get()[parent_id].latency = (end_time - start_time_parent).total_seconds()

if not error and self.cache:
self.cache.set(self.convert_kwargs_to_cache_request(args, kwargs), trace_data.get()[trace_id])
Expand All @@ -200,7 +200,7 @@ def final_log():

return final_log

def _cleanup_trace(self, trace_id: str, start_time: float, error: str, cache_hit, args, kwargs, response):
def _cleanup_trace(self, trace_id: str, start_time: datetime, error: str, cache_hit, args, kwargs, response):
try:
final_log = self._cleanup_trace_core(trace_id, start_time, error, cache_hit, args, kwargs)

Expand All @@ -214,7 +214,7 @@ def _cleanup_trace(self, trace_id: str, start_time: float, error: str, cache_hit
logger.debug(f"Error occurred cleaning up openai trace, {e}")
return response

def _acleanup_trace(self, trace_id: str, start_time: float, error: str, cache_hit, args, kwargs, response):
def _acleanup_trace(self, trace_id: str, start_time: datetime, error: str, cache_hit, args, kwargs, response):
try:
final_log = self._cleanup_trace_core(trace_id, start_time, error, cache_hit, args, kwargs)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "parea-ai"
packages = [{ include = "parea" }]
version = "0.2.80"
version = "0.2.81"
description = "Parea python sdk"
readme = "README.md"
authors = ["joel-parea-ai <[email protected]>"]
Expand Down

0 comments on commit c40e7eb

Please sign in to comment.