|
1 | 1 | """Module with the logic to create and manage traces and steps."""
|
2 | 2 |
|
3 |
| -import time |
4 | 3 | import asyncio
|
| 4 | +import contextvars |
5 | 5 | import inspect
|
6 | 6 | import logging
|
7 |
| -import contextvars |
8 |
| -from typing import Any, Dict, List, Tuple, Optional, Awaitable, Generator |
9 |
| -from functools import wraps |
| 7 | +import time |
| 8 | +import traceback |
10 | 9 | from contextlib import contextmanager
|
| 10 | +from functools import wraps |
| 11 | +from typing import Any, Awaitable, Dict, Generator, List, Optional, Tuple |
11 | 12 |
|
12 |
| -from . import enums, steps, traces |
13 |
| -from .. import utils |
14 |
| -from ..._client import Openlayer |
15 | 13 | from ..._base_client import DefaultHttpxClient
|
| 14 | +from ..._client import Openlayer |
16 | 15 | from ...types.inference_pipelines.data_stream_params import ConfigLlmData
|
| 16 | +from .. import utils |
| 17 | +from . import enums, steps, traces |
17 | 18 |
|
18 | 19 | logger = logging.getLogger(__name__)
|
19 | 20 |
|
@@ -251,12 +252,14 @@ async def __anext__(self):
|
251 | 252 | # Initialize tracing on first iteration only
|
252 | 253 | if not self._trace_initialized:
|
253 | 254 | self._original_gen = func(*func_args, **func_kwargs)
|
254 |
| - self._step, self._is_root_step, self._token = _create_and_initialize_step( |
255 |
| - step_name=step_name, |
256 |
| - step_type=enums.StepType.USER_CALL, |
257 |
| - inputs=None, |
258 |
| - output=None, |
259 |
| - metadata=None, |
| 255 | + self._step, self._is_root_step, self._token = ( |
| 256 | + _create_and_initialize_step( |
| 257 | + step_name=step_name, |
| 258 | + step_type=enums.StepType.USER_CALL, |
| 259 | + inputs=None, |
| 260 | + output=None, |
| 261 | + metadata=None, |
| 262 | + ) |
260 | 263 | )
|
261 | 264 | self._inputs = _extract_function_inputs(
|
262 | 265 | func_signature=func_signature,
|
@@ -466,16 +469,25 @@ def _handle_trace_completion(
|
466 | 469 | )
|
467 | 470 | if _publish:
|
468 | 471 | try:
|
| 472 | + inference_pipeline_id = inference_pipeline_id or utils.get_env_variable( |
| 473 | + "OPENLAYER_INFERENCE_PIPELINE_ID" |
| 474 | + ) |
469 | 475 | client = _get_client()
|
470 | 476 | if client:
|
471 | 477 | client.inference_pipelines.data.stream(
|
472 |
| - inference_pipeline_id=inference_pipeline_id |
473 |
| - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), |
| 478 | + inference_pipeline_id=inference_pipeline_id, |
474 | 479 | rows=[trace_data],
|
475 | 480 | config=config,
|
476 | 481 | )
|
477 | 482 | except Exception as err: # pylint: disable=broad-except
|
478 |
| - logger.error("Could not stream data to Openlayer %s", err) |
| 483 | + logger.error(traceback.format_exc()) |
| 484 | + logger.error( |
| 485 | + "Could not stream data to Openlayer (pipeline_id: %s, base_url: %s)" |
| 486 | + " Error: %s", |
| 487 | + inference_pipeline_id, |
| 488 | + client.base_url, |
| 489 | + err, |
| 490 | + ) |
479 | 491 | else:
|
480 | 492 | logger.debug("Ending step %s", step_name)
|
481 | 493 |
|
@@ -557,7 +569,6 @@ def _finalize_step_logging(
|
557 | 569 | # ----------------------------- Async generator specific functions ----------------------------- #
|
558 | 570 |
|
559 | 571 |
|
560 |
| - |
561 | 572 | def _finalize_async_generator_step(
|
562 | 573 | step: steps.Step,
|
563 | 574 | token: Any,
|
|
0 commit comments