From f5be8b74ee85c104a4806664d42e14d47b8a2676 Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Fri, 17 May 2024 21:14:37 -0400 Subject: [PATCH 1/6] WIP --- parea/cookbook/tracing_tool_calling.py | 89 ++++++++++--------- parea/cookbook/tracing_with_images_open_ai.py | 2 +- .../tracing_with_open_ai_endpoint_directly.py | 6 +- parea/parea_logger.py | 33 +++---- parea/schemas/models.py | 3 + parea/utils/trace_utils.py | 13 ++- 6 files changed, 83 insertions(+), 63 deletions(-) diff --git a/parea/cookbook/tracing_tool_calling.py b/parea/cookbook/tracing_tool_calling.py index 45c6b50b..6df88314 100644 --- a/parea/cookbook/tracing_tool_calling.py +++ b/parea/cookbook/tracing_tool_calling.py @@ -13,48 +13,53 @@ p.wrap_openai_client(client) -tools = [ - { - "type": "function", - "function": { - "name": "get_current_weather", - "description": "Get the current weather in a given location", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. San Francisco, CA", +def main(): + tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, }, - "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + "required": ["location"], }, - "required": ["location"], }, - }, - } -] -messages = [{"role": "user", "content": "What's the weather like in Boston today?"}] -completion = client.chat.completions.create( - model="gpt-3.5-turbo", - messages=messages, - tools=tools, - tool_choice="auto", -) -messages.append({k: v for k, v in completion.choices[0].message.model_dump().items() if v is not None}) -# messages.append(completion.choices[0].message) -messages.append({"role": "tool", "content": "5 Celcius", "tool_call_id": completion.choices[0].message.tool_calls[0].id}) -messages.append( - { - "role": "user", - "content": "What's the weather like in Boston today?", - } -) - -final_completion = client.chat.completions.create( - model="gpt-3.5-turbo", - messages=messages, - tools=tools, - tool_choice="auto", -) - -print(final_completion) + } + ] + messages = [{"role": "user", "content": "What's the weather like in Boston today?"}] + completion = client.chat.completions.create( + model="gpt-4o", + messages=messages, + tools=tools, + tool_choice="auto", + ) + messages.append({k: v for k, v in completion.choices[0].message.model_dump().items() if v is not None}) + # messages.append(completion.choices[0].message) + messages.append({"role": "tool", "content": "5 Celcius", "tool_call_id": completion.choices[0].message.tool_calls[0].id}) + messages.append( + { + "role": "user", + "content": "What's the weather like in Boston today?", + } + ) + + final_completion = client.chat.completions.create( + model="gpt-4o", + messages=messages, + tools=tools, + tool_choice="auto", + ) + + print(final_completion) + + +if __name__ == "__main__": + main() diff --git a/parea/cookbook/tracing_with_images_open_ai.py b/parea/cookbook/tracing_with_images_open_ai.py index d60a424f..86e96518 100644 --- a/parea/cookbook/tracing_with_images_open_ai.py +++ b/parea/cookbook/tracing_with_images_open_ai.py @@ -30,7 +30,7 @@ def image_maker(query: str) -> str: @trace def ask_vision(image_url: str) -> Optional[str]: response = client.chat.completions.create( - model="gpt-4-turbo", + model="gpt-4o", messages=[ { "role": "user", diff --git a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py index 9e71fe65..a6cb2842 100644 --- a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py +++ b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py @@ -17,7 +17,7 @@ p.wrap_openai_client(client) -def call_llm(data: List[dict], model: str = "gpt-4-turbo", temperature: float = 0.0) -> str: +def call_llm(data: List[dict], model: str = "gpt-4o", temperature: float = 0.0) -> str: return client.chat.completions.create(model=model, temperature=temperature, messages=data).choices[0].message.content @@ -64,7 +64,7 @@ def refiner(query: str, additional_description: str, argument: str, criticism: s {"role": "user", "content": criticism}, { "role": "system", - "content": f"Please generate a new argument that incorporates the feedback from the user.", + "content": "Please generate a new argument that incorporates the feedback from the user.", }, ], ) @@ -83,7 +83,7 @@ def argument_chain(query: str, additional_description: str = "") -> Tuple[str, s @trace(session_id="cus_1234", end_user_identifier="user_1234") def json_call() -> str: completion = client.chat.completions.create( - model="gpt-4-turbo-2024-04-09", + model="gpt-4o", messages=[{"role": "system", "content": "You are a helpful assistant talking in JSON."}, {"role": "user", "content": "What are you?"}], response_format={"type": "json_object"}, ) diff --git a/parea/parea_logger.py b/parea/parea_logger.py index 80783805..7a1b51f1 100644 --- a/parea/parea_logger.py +++ b/parea/parea_logger.py @@ -29,29 +29,32 @@ def set_project_uuid(self, project_uuid: str) -> None: def update_log(self, data: UpdateLog) -> None: data = serialize_metadata_values(data) - self._client.request( - "PUT", - LOG_ENDPOINT, - data=asdict(data), - ) + print("update_log", data) + # self._client.request( + # "PUT", + # LOG_ENDPOINT, + # data=asdict(data), + # ) def record_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) data.project_uuid = self._project_uuid - self._client.request( - "POST", - LOG_ENDPOINT, - data=asdict(data), - ) + print("record_log", data) + # self._client.request( + # "POST", + # LOG_ENDPOINT, + # data=asdict(data), + # ) async def arecord_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) data.project_uuid = self._project_uuid - await self._client.request_async( - "POST", - LOG_ENDPOINT, - data=asdict(data), - ) + print("arecord_log", data) + # await self._client.request_async( + # "POST", + # LOG_ENDPOINT, + # data=asdict(data), + # ) def write_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) diff --git a/parea/schemas/models.py b/parea/schemas/models.py index 76a54c46..f6cdd42c 100644 --- a/parea/schemas/models.py +++ b/parea/schemas/models.py @@ -141,6 +141,9 @@ class TraceLog(EvaluatedLog): comments: Optional[List[TraceLogCommentSchema]] = None annotations: Optional[Dict[int, Dict[str, TraceLogAnnotationSchema]]] = None + depth: int = 0 + execution_order: int = 0 + @define class TraceLogTree(TraceLog): diff --git a/parea/utils/trace_utils.py b/parea/utils/trace_utils.py index 11487c83..3f829641 100644 --- a/parea/utils/trace_utils.py +++ b/parea/utils/trace_utils.py @@ -55,8 +55,8 @@ def check_multiple_return_values(func) -> bool: def make_output(result, islist) -> Optional[str]: - if not result: - return None + if result is None: + return result try: if islist: json_list = [json_dumps(r) for r in result] @@ -179,6 +179,8 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> Tuple[str, # if we can't serialize the value, just convert it to a string inputs[k] = str(v) + depth = len(new_trace_context) - 1 + trace_data.get()[trace_id] = TraceLog( trace_id=trace_id, parent_trace_id=trace_id, @@ -194,10 +196,17 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> Tuple[str, experiment_uuid=os.environ.get(PAREA_OS_ENV_EXPERIMENT_UUID, None), apply_eval_frac=apply_eval_frac, deployment_id=deployment_id, + depth=depth, ) parent_trace_id = new_trace_context[-2] if len(new_trace_context) > 1 else None if parent_trace_id: fill_trace_data(trace_id, {"parent_trace_id": parent_trace_id}, UpdateTraceScenario.CHAIN) + parent_trace_data = trace_data.get()[parent_trace_id] + execution_order = len(parent_trace_data.children) + else: + execution_order = 0 + + trace_data.get()[trace_id].execution_order = execution_order except Exception as e: logger.debug(f"Error occurred initializing trace for function {func_name}, {e}") From 10393d2fb45d1f5100248f36c9f9b1e35443b046 Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Mon, 20 May 2024 16:05:11 -0400 Subject: [PATCH 2/6] add depth --- parea/parea_logger.py | 33 +++++++++++++++------------------ parea/utils/trace_utils.py | 20 +++++++++++++------- parea/wrapper/utils.py | 11 +++++++++-- parea/wrapper/wrapper.py | 19 ++++++++++++++++--- 4 files changed, 53 insertions(+), 30 deletions(-) diff --git a/parea/parea_logger.py b/parea/parea_logger.py index 7a1b51f1..80783805 100644 --- a/parea/parea_logger.py +++ b/parea/parea_logger.py @@ -29,32 +29,29 @@ def set_project_uuid(self, project_uuid: str) -> None: def update_log(self, data: UpdateLog) -> None: data = serialize_metadata_values(data) - print("update_log", data) - # self._client.request( - # "PUT", - # LOG_ENDPOINT, - # data=asdict(data), - # ) + self._client.request( + "PUT", + LOG_ENDPOINT, + data=asdict(data), + ) def record_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) data.project_uuid = self._project_uuid - print("record_log", data) - # self._client.request( - # "POST", - # LOG_ENDPOINT, - # data=asdict(data), - # ) + self._client.request( + "POST", + LOG_ENDPOINT, + data=asdict(data), + ) async def arecord_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) data.project_uuid = self._project_uuid - print("arecord_log", data) - # await self._client.request_async( - # "POST", - # LOG_ENDPOINT, - # data=asdict(data), - # ) + await self._client.request_async( + "POST", + LOG_ENDPOINT, + data=asdict(data), + ) def write_log(self, data: TraceLog) -> None: data = serialize_metadata_values(data) diff --git a/parea/utils/trace_utils.py b/parea/utils/trace_utils.py index 3f829641..025dae34 100644 --- a/parea/utils/trace_utils.py +++ b/parea/utils/trace_utils.py @@ -30,6 +30,9 @@ # Context variable to maintain running evals in thread thread_ids_running_evals = contextvars.ContextVar("thread_ids_running_evals", default=[]) +# Add a counter variable to maintain the execution order +execution_order_counters = contextvars.ContextVar("execution_order_counters", default={}) + def log_in_thread(target_func: Callable, data: Dict[str, Any]): logging_thread = threading.Thread(target=target_func, kwargs=data) @@ -180,11 +183,19 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> Tuple[str, inputs[k] = str(v) depth = len(new_trace_context) - 1 + root_trace_id = new_trace_context[0] + + # Get the execution order counter for the current root trace + counters = execution_order_counters.get() + if root_trace_id not in counters: + counters[root_trace_id] = 0 + execution_order = counters[root_trace_id] + counters[root_trace_id] += 1 trace_data.get()[trace_id] = TraceLog( trace_id=trace_id, parent_trace_id=trace_id, - root_trace_id=new_trace_context[0], + root_trace_id=root_trace_id, start_timestamp=start_time.isoformat(), trace_name=name or func_name, end_user_identifier=end_user_identifier, @@ -197,16 +208,11 @@ def init_trace(func_name, _parea_target_field, args, kwargs, func) -> Tuple[str, apply_eval_frac=apply_eval_frac, deployment_id=deployment_id, depth=depth, + execution_order=execution_order, ) parent_trace_id = new_trace_context[-2] if len(new_trace_context) > 1 else None if parent_trace_id: fill_trace_data(trace_id, {"parent_trace_id": parent_trace_id}, UpdateTraceScenario.CHAIN) - parent_trace_data = trace_data.get()[parent_trace_id] - execution_order = len(parent_trace_data.children) - else: - execution_order = 0 - - trace_data.get()[trace_id].execution_order = execution_order except Exception as e: logger.debug(f"Error occurred initializing trace for function {func_name}, {e}") diff --git a/parea/wrapper/utils.py b/parea/wrapper/utils.py index cd1899f0..f1410c90 100644 --- a/parea/wrapper/utils.py +++ b/parea/wrapper/utils.py @@ -198,7 +198,11 @@ def clean_json_string(s): try: function_args = json.loads(body.arguments) except json.decoder.JSONDecodeError: - function_args = json.loads(clean_json_string(body.arguments)) + try: + function_args = json.loads(clean_json_string(body.arguments)) + except Exception: + function_args = str(body.arguments) + if is_tool_call: calls.append( { @@ -307,7 +311,10 @@ def _process_stream_response(content: list, tools: dict, data: dict, trace_id: s tool_calls = [t["function"] for t in tools.values()] for tool in tool_calls: - tool["arguments"] = json.loads(tool["arguments"]) + try: + tool["arguments"] = json.loads(tool["arguments"]) + except Exception: + tool["arguments"] = str(tool["arguments"]) completion = final_content or json_dumps(tool_calls, indent=4) diff --git a/parea/wrapper/wrapper.py b/parea/wrapper/wrapper.py index 8c72ff04..f710b269 100644 --- a/parea/wrapper/wrapper.py +++ b/parea/wrapper/wrapper.py @@ -13,7 +13,7 @@ from parea.evals.utils import _make_evaluations from parea.helpers import is_logging_disabled, timezone_aware_now from parea.schemas.models import TraceLog, UpdateTraceScenario -from parea.utils.trace_utils import call_eval_funcs_then_log, fill_trace_data, trace_context, trace_data +from parea.utils.trace_utils import call_eval_funcs_then_log, fill_trace_data, trace_context, trace_data, execution_order_counters from parea.wrapper.utils import safe_format_template_to_prompt, skip_decorator_if_func_in_stack logger = logging.getLogger() @@ -86,11 +86,22 @@ def _init_trace(self, kwargs) -> Tuple[str, datetime, contextvars.Token]: if is_logging_disabled(): return trace_id, start_time, token + + depth = len(new_trace_context) - 1 + root_trace_id = new_trace_context[0] + + # Get the execution order counter for the current root trace + counters = execution_order_counters.get() + if root_trace_id not in counters: + counters[root_trace_id] = 0 + execution_order = counters[root_trace_id] + counters[root_trace_id] += 1 + try: trace_data.get()[trace_id] = TraceLog( trace_id=trace_id, - parent_trace_id=new_trace_context[0], - root_trace_id=new_trace_context[0], + parent_trace_id=root_trace_id, + root_trace_id=root_trace_id, start_timestamp=start_time.isoformat(), trace_name="LLM", end_user_identifier=None, @@ -100,6 +111,8 @@ def _init_trace(self, kwargs) -> Tuple[str, datetime, contextvars.Token]: tags=None, inputs=template_inputs, experiment_uuid=os.getenv(PAREA_OS_ENV_EXPERIMENT_UUID, None), + depth=depth, + execution_order=execution_order, ) parent_trace_id = new_trace_context[-2] if len(new_trace_context) > 1 else None From f102c2301d89545ebad25c15a23f11afeec2d53f Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Mon, 20 May 2024 22:17:18 -0400 Subject: [PATCH 3/6] add depth --- .../tracing_with_open_ai_endpoint_directly.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py index a6cb2842..a053ebdb 100644 --- a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py +++ b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py @@ -95,12 +95,12 @@ def json_call() -> str: "Whether sparkling wine is good for you.", additional_description="Provide a concise, few sentence argument on why sparkling wine is good for you.", ) - print(result) - p.record_feedback( - FeedbackRequest( - trace_id=trace_id, - score=0.7, # 0.0 (bad) to 1.0 (good) - ) - ) - - print(json_call()) + # print(result) + # p.record_feedback( + # FeedbackRequest( + # trace_id=trace_id, + # score=0.7, # 0.0 (bad) to 1.0 (good) + # ) + # ) + + # print(json_call()) From 857a04011f2420e4c8bbe9d7a975110875a9cca2 Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Tue, 21 May 2024 12:27:25 -0400 Subject: [PATCH 4/6] PAI-1061-execution-order-and-depth-py --- parea/wrapper/anthropic/anthropic.py | 1 + parea/wrapper/openai/openai.py | 1 + parea/wrapper/wrapper.py | 14 ++++++++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/parea/wrapper/anthropic/anthropic.py b/parea/wrapper/anthropic/anthropic.py index 4ac0bb67..bb750936 100644 --- a/parea/wrapper/anthropic/anthropic.py +++ b/parea/wrapper/anthropic/anthropic.py @@ -34,6 +34,7 @@ def init(self, log: Callable, cache: Cache, client: Client): convert_kwargs_to_cache_request=self.convert_kwargs_to_cache_request, convert_cache_to_response=self.convert_cache_to_response, aconvert_cache_to_response=self.aconvert_cache_to_response, + name="llm-anthropic", ) @staticmethod diff --git a/parea/wrapper/openai/openai.py b/parea/wrapper/openai/openai.py index 4460c8ac..41e4200e 100644 --- a/parea/wrapper/openai/openai.py +++ b/parea/wrapper/openai/openai.py @@ -73,6 +73,7 @@ def init(self, log: Callable, cache: Cache = None, module_client=openai): convert_kwargs_to_cache_request=self.convert_kwargs_to_cache_request, convert_cache_to_response=self.convert_cache_to_response, aconvert_cache_to_response=self.aconvert_cache_to_response, + name="llm-openai", ) def resolver(self, trace_id: str, _args: Sequence[Any], kwargs: Dict[str, Any], response: Optional[Any]) -> Optional[Any]: diff --git a/parea/wrapper/wrapper.py b/parea/wrapper/wrapper.py index f710b269..3d1c265f 100644 --- a/parea/wrapper/wrapper.py +++ b/parea/wrapper/wrapper.py @@ -9,7 +9,7 @@ from uuid import uuid4 from parea.cache.cache import Cache -from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID +from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID, TURN_OFF_PAREA_LOGGING from parea.evals.utils import _make_evaluations from parea.helpers import is_logging_disabled, timezone_aware_now from parea.schemas.models import TraceLog, UpdateTraceScenario @@ -33,6 +33,7 @@ def __init__( convert_cache_to_response: Callable, aconvert_cache_to_response: Callable, log: Callable, + name: str = "LLM", ) -> None: self.resolver = resolver self.gen_resolver = gen_resolver @@ -44,6 +45,7 @@ def __init__( self.convert_kwargs_to_cache_request = convert_kwargs_to_cache_request self.convert_cache_to_response = convert_cache_to_response self.aconvert_cache_to_response = aconvert_cache_to_response + self.name = name def wrap_functions(self, module: Any, func_names: List[str]): for func_name in func_names: @@ -79,14 +81,14 @@ def _init_trace(self, kwargs) -> Tuple[str, datetime, contextvars.Token]: new_trace_context = trace_context.get() + [trace_id] token = trace_context.set(new_trace_context) + if is_logging_disabled(): + return trace_id, start_time, token + if template_inputs := kwargs.pop("template_inputs", None): - for m in kwargs["messages"] or []: + for m in kwargs.get("messages", []): if isinstance(m, dict) and "content" in m: m["content"] = safe_format_template_to_prompt(m["content"], **template_inputs) - if is_logging_disabled(): - return trace_id, start_time, token - depth = len(new_trace_context) - 1 root_trace_id = new_trace_context[0] @@ -103,7 +105,7 @@ def _init_trace(self, kwargs) -> Tuple[str, datetime, contextvars.Token]: parent_trace_id=root_trace_id, root_trace_id=root_trace_id, start_timestamp=start_time.isoformat(), - trace_name="LLM", + trace_name=self.name, end_user_identifier=None, session_id=None, metadata=None, From d1fe0d3088e43555a5c2abc728793a34f3db1fb1 Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Tue, 21 May 2024 12:29:48 -0400 Subject: [PATCH 5/6] PAI-1061-execution-order-and-depth-py --- .../tracing_with_open_ai_endpoint_directly.py | 18 +++++++++--------- parea/wrapper/wrapper.py | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py index a053ebdb..a6cb2842 100644 --- a/parea/cookbook/tracing_with_open_ai_endpoint_directly.py +++ b/parea/cookbook/tracing_with_open_ai_endpoint_directly.py @@ -95,12 +95,12 @@ def json_call() -> str: "Whether sparkling wine is good for you.", additional_description="Provide a concise, few sentence argument on why sparkling wine is good for you.", ) - # print(result) - # p.record_feedback( - # FeedbackRequest( - # trace_id=trace_id, - # score=0.7, # 0.0 (bad) to 1.0 (good) - # ) - # ) - - # print(json_call()) + print(result) + p.record_feedback( + FeedbackRequest( + trace_id=trace_id, + score=0.7, # 0.0 (bad) to 1.0 (good) + ) + ) + + print(json_call()) diff --git a/parea/wrapper/wrapper.py b/parea/wrapper/wrapper.py index 3d1c265f..1f7ba673 100644 --- a/parea/wrapper/wrapper.py +++ b/parea/wrapper/wrapper.py @@ -9,11 +9,11 @@ from uuid import uuid4 from parea.cache.cache import Cache -from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID, TURN_OFF_PAREA_LOGGING +from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID from parea.evals.utils import _make_evaluations from parea.helpers import is_logging_disabled, timezone_aware_now from parea.schemas.models import TraceLog, UpdateTraceScenario -from parea.utils.trace_utils import call_eval_funcs_then_log, fill_trace_data, trace_context, trace_data, execution_order_counters +from parea.utils.trace_utils import call_eval_funcs_then_log, execution_order_counters, fill_trace_data, trace_context, trace_data from parea.wrapper.utils import safe_format_template_to_prompt, skip_decorator_if_func_in_stack logger = logging.getLogger() From 12e4cf81380822807560828b149705de85ce821b Mon Sep 17 00:00:00 2001 From: Joel Alexander Date: Tue, 21 May 2024 12:30:07 -0400 Subject: [PATCH 6/6] PAI-1061-execution-order-and-depth-py --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e437455e..3b1fd198 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "parea-ai" packages = [{ include = "parea" }] -version = "0.2.157" +version = "0.2.158" description = "Parea python sdk" readme = "README.md" authors = ["joel-parea-ai "]