Commit
Merge pull request #451 from parea-ai/revert-446-PAI-666-global-swallow-all-errors-on-trace
Revert "fix(errors): swallow trace errors"
jalexanderII authored Feb 13, 2024
2 parents 12fc36c + 9fa68de commit 2bdad25
Showing 6 changed files with 91 additions and 119 deletions.
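
Taken together, the revert removes the blanket try/except blocks that the reverted change had wrapped around the trace decorator, Experiment.run(), and the OpenAI wrapper, so errors raised while tracing (or by the traced function itself) are logged and re-raised again instead of being swallowed. A minimal sketch of how this surfaces to SDK users, assuming the documented `from parea import trace` import; `flaky_step` is a hypothetical user function:

# Hedged sketch, not part of the diff below.
from parea import trace  # public decorator (assumed import path)

@trace  # bare usage, as in the Parea cookbooks
def flaky_step(n: int) -> int:
    if n < 0:
        raise ValueError("n must be non-negative")
    return n * 2

try:
    flaky_step(-1)
except ValueError as exc:
    # Before the revert, the decorator caught this, logged it at debug level, and
    # re-invoked the function untraced (so flaky or stateful functions could run twice);
    # after the revert the error is recorded on the trace and propagates to the caller.
    print(f"caller sees: {exc}")
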
2 changes: 1 addition & 1 deletion parea/cookbook/run_experiment.py
@@ -43,6 +43,6 @@ def generate_random_number(n: str) -> str:
# You can optionally run the experiment manually by calling `.run()`
if __name__ == "__main__":
    p.experiment(
-        data=[{"n": "10"}],
+        data=[{"n": "10"}, {"n": "11"}, {"n": "12"}],
        func=generate_random_number,
    ).run()
@@ -38,4 +38,4 @@ def func(lang: str, framework: str) -> str:
p.experiment(
    data="Hello World Example", # this is the name of my Test Collection in Parea (TestHub page)
    func=func,
-).run()
+).run(name="hello-world-example")
7 changes: 2 additions & 5 deletions parea/experiment/experiment.py
@@ -132,8 +132,5 @@ def run(self, name: Optional[str] = None) -> None:
        param name: The name of the experiment. This name must be unique across experiment runs.
        If no name is provided a memorable name will be generated automatically.
        """
-        try:
-            self._gen_name_if_none(name)
-            self.experiment_stats = asyncio.run(experiment(self.name, self.data, self.func, self.p))
-        except Exception as e:
-            print(f"Error running experiment: {e}")
+        self._gen_name_if_none(name)
+        self.experiment_stats = asyncio.run(experiment(self.name, self.data, self.func, self.p))
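
With the try/except removed from run(), a failing experiment now raises instead of only printing "Error running experiment: ...". A hedged usage sketch, reusing `p` and `generate_random_number` from the run_experiment.py cookbook above:

# Hedged sketch: callers that want the old print-and-continue behavior can opt in explicitly.
try:
    p.experiment(
        data=[{"n": "10"}, {"n": "11"}, {"n": "12"}],
        func=generate_random_number,
    ).run()
except Exception as exc:
    # run() re-raises after this revert, so scripts and CI can fail loudly.
    print(f"Error running experiment: {exc}")
    raise
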
94 changes: 40 additions & 54 deletions parea/utils/trace_utils.py
@@ -85,33 +85,27 @@ def trace_insert(data: dict[str, Any], trace_id: Optional[str] = None):
        data: Keys can be one of: trace_name, end_user_identifier, metadata, tags
        trace_id: The trace id to insert the data into. If not provided, the current trace id will be used.
    """
-    try:
-        current_trace_id = trace_id or get_current_trace_id()
-        current_trace_data: TraceLog = trace_data.get()[current_trace_id]
-        if not current_trace_data:
-            return
-        for key, new_value in data.items():
-            existing_value = current_trace_data.__getattribute__(key)
-            current_trace_data.__setattr__(key, merge(existing_value, new_value) if existing_value else new_value)
-    except Exception as e:
-        logger.debug(f"Error occurred inserting data into trace log: {e}", exc_info=e)
+    current_trace_id = trace_id or get_current_trace_id()
+    current_trace_data: TraceLog = trace_data.get()[current_trace_id]
+    if not current_trace_data:
+        return
+    for key, new_value in data.items():
+        existing_value = current_trace_data.__getattribute__(key)
+        current_trace_data.__setattr__(key, merge(existing_value, new_value) if existing_value else new_value)


def fill_trace_data(trace_id: str, data: dict[str, Any], scenario: UpdateTraceScenario):
-    try:
-        if scenario == UpdateTraceScenario.RESULT:
-            if not isinstance(data["result"], (Generator, AsyncGenerator, AsyncIterator, Iterator)):
-                trace_data.get()[trace_id].output = make_output(data["result"], data.get("output_as_list", False))
-            trace_data.get()[trace_id].status = "success"
-            trace_data.get()[trace_id].evaluation_metric_names = data.get("eval_funcs_names")
-        elif scenario == UpdateTraceScenario.ERROR:
-            trace_data.get()[trace_id].error = data["error"]
-            trace_data.get()[trace_id].status = "error"
-        elif scenario == UpdateTraceScenario.CHAIN:
-            trace_data.get()[trace_id].parent_trace_id = data["parent_trace_id"]
-            trace_data.get()[data["parent_trace_id"]].children.append(trace_id)
-    except Exception:
-        return
+    if scenario == UpdateTraceScenario.RESULT:
+        if not isinstance(data["result"], (Generator, AsyncGenerator, AsyncIterator, Iterator)):
+            trace_data.get()[trace_id].output = make_output(data["result"], data.get("output_as_list", False))
+        trace_data.get()[trace_id].status = "success"
+        trace_data.get()[trace_id].evaluation_metric_names = data.get("eval_funcs_names")
+    elif scenario == UpdateTraceScenario.ERROR:
+        trace_data.get()[trace_id].error = data["error"]
+        trace_data.get()[trace_id].status = "error"
+    elif scenario == UpdateTraceScenario.CHAIN:
+        trace_data.get()[trace_id].parent_trace_id = data["parent_trace_id"]
+        trace_data.get()[data["parent_trace_id"]].children.append(trace_id)


def trace(
@@ -189,43 +183,35 @@ def cleanup_trace(trace_id, start_time):
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
+            _parea_target_field = kwargs.pop("_parea_target_field", None)
+            trace_id, start_time = init_trace(func.__name__, _parea_target_field, args, kwargs, func)
+            output_as_list = check_multiple_return_values(func)
            try:
-                _parea_target_field = kwargs.pop("_parea_target_field", None)
-                trace_id, start_time = init_trace(func.__name__, _parea_target_field, args, kwargs, func)
-                output_as_list = check_multiple_return_values(func)
-                try:
-                    result = await func(*args, **kwargs)
-                    fill_trace_data(trace_id, {"result": result, "output_as_list": output_as_list, "eval_funcs_names": eval_funcs_names}, UpdateTraceScenario.RESULT)
-                except Exception as e:
-                    logger.exception(f"Error occurred in function {func.__name__}, {e}")
-                    fill_trace_data(trace_id, {"error": str(e)}, UpdateTraceScenario.ERROR)
-                    raise e
-                finally:
-                    cleanup_trace(func.__name__, trace_id, start_time)
-                return result
+                result = await func(*args, **kwargs)
+                fill_trace_data(trace_id, {"result": result, "output_as_list": output_as_list, "eval_funcs_names": eval_funcs_names}, UpdateTraceScenario.RESULT)
            except Exception as e:
-                logger.debug(f"Trace decorator on {func.__name__} failed silently with error: {e}")
-                return await func(*args, **kwargs)
+                logger.exception(f"Error occurred in function {func.__name__}, {e}")
+                fill_trace_data(trace_id, {"error": str(e)}, UpdateTraceScenario.ERROR)
+                raise e
+            finally:
+                cleanup_trace(trace_id, start_time)
+            return result

        @wraps(func)
        def wrapper(*args, **kwargs):
+            _parea_target_field = kwargs.pop("_parea_target_field", None)
+            trace_id, start_time = init_trace(func.__name__, _parea_target_field, args, kwargs, func)
+            output_as_list = check_multiple_return_values(func)
            try:
-                _parea_target_field = kwargs.pop("_parea_target_field", None)
-                trace_id, start_time = init_trace(func.__name__, _parea_target_field, args, kwargs, func)
-                output_as_list = check_multiple_return_values(func)
-                try:
-                    result = func(*args, **kwargs)
-                    fill_trace_data(trace_id, {"result": result, "output_as_list": output_as_list, "eval_funcs_names": eval_funcs_names}, UpdateTraceScenario.RESULT)
-                except Exception as e:
-                    logger.exception(f"Error occurred in function {func.__name__}, {e}")
-                    fill_trace_data(trace_id, {"error": str(e)}, UpdateTraceScenario.ERROR)
-                    raise e
-                finally:
-                    cleanup_trace(func.__name__, trace_id, start_time)
-                return result
+                result = func(*args, **kwargs)
+                fill_trace_data(trace_id, {"result": result, "output_as_list": output_as_list, "eval_funcs_names": eval_funcs_names}, UpdateTraceScenario.RESULT)
            except Exception as e:
-                logger.debug(f"Trace decorator on {func.__name__} failed silently with error: {e}")
-                return func(*args, **kwargs)
+                logger.exception(f"Error occurred in function {func.__name__}, {e}")
+                fill_trace_data(trace_id, {"error": str(e)}, UpdateTraceScenario.ERROR)
+                raise e
+            finally:
+                cleanup_trace(trace_id, start_time)
+            return result

        if inspect.iscoroutinefunction(func):
            return async_wrapper
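
The behavioral difference in these two wrappers is easiest to see side by side: the removed pattern falls back to a second, untraced call of the wrapped function whenever anything in the traced path raises, while the restored pattern runs the function once, records the error on the trace, and re-raises. A minimal, self-contained sketch with generic names (not the SDK's actual helpers):

import logging

logger = logging.getLogger(__name__)
trace_log: dict = {}  # stand-in for the SDK's per-trace record

def call_swallowing(func, *args, **kwargs):
    # Removed pattern: any exception, including one raised by func itself,
    # triggers a second, untraced invocation of func.
    try:
        result = func(*args, **kwargs)
        trace_log["status"] = "success"
        return result
    except Exception as e:
        logger.debug("trace failed silently: %s", e)
        return func(*args, **kwargs)  # side effects may run twice

def call_reraising(func, *args, **kwargs):
    # Restored pattern: run once, record the error, propagate it to the caller.
    try:
        result = func(*args, **kwargs)
        trace_log["status"] = "success"
        return result
    except Exception as e:
        trace_log.update(status="error", error=str(e))
        raise
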
103 changes: 46 additions & 57 deletions parea/wrapper/wrapper.py
@@ -2,7 +2,6 @@

import functools
import inspect
-import logging
import os
import time
from uuid import uuid4
@@ -15,8 +14,6 @@
from parea.utils.trace_utils import call_eval_funcs_then_log, to_date_and_time_string, trace_context, trace_data
from parea.wrapper.utils import skip_decorator_if_func_in_stack

-logger = logging.getLogger()


class Wrapper:
def __init__(
@@ -108,70 +105,62 @@ def _init_trace(self) -> tuple[str, float]:
    @skip_decorator_if_func_in_stack(call_eval_funcs_then_log, _make_evaluations)
    def async_decorator(self, orig_func: Callable) -> Callable:
        async def wrapper(*args, **kwargs):
+            trace_id, start_time = self._init_trace()
+            response = None
+            exception = None
+            error = None
+            cache_hit = False
+            cache_key = self.convert_kwargs_to_cache_request(args, kwargs)
            try:
-                trace_id, start_time = self._init_trace()
-                response = None
-                exception = None
-                error = None
-                cache_hit = False
-                cache_key = self.convert_kwargs_to_cache_request(args, kwargs)
-                try:
-                    if self.cache:
-                        cache_result = await self.cache.aget(cache_key)
-                        if cache_result is not None:
-                            response = self.aconvert_cache_to_response(args, kwargs, cache_result)
-                            cache_hit = True
-                    if response is None:
-                        response = await orig_func(*args, **kwargs)
-                except Exception as e:
-                    exception = e
-                    error = str(e)
-                    if self.cache:
-                        await self.cache.ainvalidate(cache_key)
-                finally:
-                    if exception is not None:
-                        self._acleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
-                        raise exception
-                    else:
-                        return self._acleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
+                if self.cache:
+                    cache_result = await self.cache.aget(cache_key)
+                    if cache_result is not None:
+                        response = self.aconvert_cache_to_response(args, kwargs, cache_result)
+                        cache_hit = True
+                if response is None:
+                    response = await orig_func(*args, **kwargs)
            except Exception as e:
-                logger.debug(f"Error in openai async_decorator: {e}")
-                return await orig_func(*args, **kwargs)
+                exception = e
+                error = str(e)
+                if self.cache:
+                    await self.cache.ainvalidate(cache_key)
+            finally:
+                if exception is not None:
+                    self._acleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
+                    raise exception
+                else:
+                    return self._acleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)

        return wrapper

    @skip_decorator_if_func_in_stack(call_eval_funcs_then_log, _make_evaluations)
    def sync_decorator(self, orig_func: Callable) -> Callable:
        def wrapper(*args, **kwargs):
+            trace_id, start_time = self._init_trace()
+            response = None
+            error = None
+            cache_hit = False
+            cache_key = self.convert_kwargs_to_cache_request(args, kwargs)
+            exception = None
            try:
-                trace_id, start_time = self._init_trace()
-                response = None
-                error = None
-                cache_hit = False
-                cache_key = self.convert_kwargs_to_cache_request(args, kwargs)
-                exception = None
-                try:
-                    if self.cache:
-                        cache_result = self.cache.get(cache_key)
-                        if cache_result is not None:
-                            response = self.convert_cache_to_response(args, kwargs, cache_result)
-                            cache_hit = True
-                    if response is None:
-                        response = orig_func(*args, **kwargs)
-                except Exception as e:
-                    exception = e
-                    error = str(e)
-                    if self.cache:
-                        self.cache.invalidate(cache_key)
-                finally:
-                    if exception is not None:
-                        self._cleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
-                        raise exception
-                    else:
-                        return self._cleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
+                if self.cache:
+                    cache_result = self.cache.get(cache_key)
+                    if cache_result is not None:
+                        response = self.convert_cache_to_response(args, kwargs, cache_result)
+                        cache_hit = True
+                if response is None:
+                    response = orig_func(*args, **kwargs)
            except Exception as e:
-                logger.debug(f"Error in openai sync_decorator: {e}")
-                return orig_func(*args, **kwargs)
+                exception = e
+                error = str(e)
+                if self.cache:
+                    self.cache.invalidate(cache_key)
+            finally:
+                if exception is not None:
+                    self._cleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)
+                    raise exception
+                else:
+                    return self._cleanup_trace(trace_id, start_time, error, cache_hit, args, kwargs, response)

        return wrapper

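The restored wrapper also reinstates the finally-based contract around the OpenAI call: trace cleanup always runs, the original exception is re-raised after the trace is finalized, and on success the caller receives the finalizer's return value (the response). A rough, self-contained sketch of that contract with placeholder names (not the Wrapper class's real methods):

import time

def finalize_trace(start_time: float, error, response):
    # Placeholder: the SDK records duration/status/error here and returns the response.
    _ = time.time() - start_time
    return response

def call_with_trace(orig_func, *args, **kwargs):
    start_time = time.time()
    response = None
    error = None
    exception = None
    try:
        response = orig_func(*args, **kwargs)
    except Exception as e:
        exception = e
        error = str(e)
    finally:
        # Finalize the trace in every case; re-raise the original error afterwards.
        if exception is not None:
            finalize_trace(start_time, error, response)
            raise exception
        return finalize_trace(start_time, error, response)
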
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "parea-ai"
packages = [{ include = "parea" }]
version = "0.2.68a0"
version = "0.2.68a1"
description = "Parea python sdk"
readme = "README.md"
authors = ["joel-parea-ai <[email protected]>"]
