Commit a9e9cff

Martin Guitteny committed
🎨(summary) create observability class
Create an Observability class to make the code more maintainable. Change the LLM calls from the langfuse.openai wrapper to the plain openai client so that prompts are no longer captured in Langfuse traces.
1 parent 398afe1 commit a9e9cff
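
The prompt-hiding half of this change comes from the import swap below: langfuse.openai is a drop-in replacement for the OpenAI SDK that auto-instruments every request and records prompts and completions as Langfuse generations, while the plain openai client sends nothing to Langfuse. A minimal sketch of the difference (base URL, API key, and model name are placeholders):

# Before: the Langfuse drop-in client traces each request and captures
# the full prompt/completion text in the Langfuse trace.
#   from langfuse.openai import openai
#   client = openai.OpenAI(base_url=..., api_key=...)

# After: the vanilla OpenAI client makes the same HTTP call but reports
# nothing to Langfuse, so prompt contents stay out of the traces.
from openai import OpenAI

client = OpenAI(base_url="http://llm.example/v1", api_key="placeholder-key")
response = client.chat.completions.create(
    model="placeholder-model",
    messages=[
        {"role": "system", "content": "You summarize transcripts."},
        {"role": "user", "content": "…"},
    ],
)
print(response.choices[0].message.content)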

File tree

env.d/development/summary.dist
src/summary/summary/core/celery_worker.py
src/summary/summary/core/config.py
src/summary/summary/core/observability.py

4 files changed: +146 −34


env.d/development/summary.dist

Lines changed: 1 addition & 0 deletions
@@ -25,3 +25,4 @@ POSTHOG_ENABLED="False"
 LANGFUSE_SECRET_KEY="your-secret-key"
 LANGFUSE_PUBLIC_KEY="your-public-key"
 LANGFUSE_HOST="https://cloud.langfuse.com"
+LANGFUSE_IS_ENABLED="False"
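
Note on the flag name: pydantic-settings fills a field from the environment variable matching the field name case-insensitively, so the langfuse_is_enabled field added in config.py below is read from LANGFUSE_IS_ENABLED (assuming no env_prefix or field alias is configured in SettingsConfigDict). A minimal sketch of that mapping:

# Minimal sketch of the pydantic-settings name mapping; assumes the
# Settings class sets no env_prefix or field alias.
import os

from pydantic_settings import BaseSettings


class DemoSettings(BaseSettings):
    langfuse_is_enabled: bool = True  # filled from LANGFUSE_IS_ENABLED


os.environ["LANGFUSE_IS_ENABLED"] = "False"
print(DemoSettings().langfuse_is_enabled)  # False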

src/summary/summary/core/celery_worker.py

Lines changed: 37 additions & 34 deletions
@@ -6,23 +6,22 @@
 import os
 import tempfile
 import time
-from contextlib import nullcontext
 from pathlib import Path
 from typing import Optional
 
 import sentry_sdk
 from celery import Celery, signals
 from celery.utils.log import get_task_logger
-from langfuse import get_client, observe
-from langfuse.openai import openai
 from minio import Minio
 from mutagen import File
+from openai import OpenAI
 from requests import Session, exceptions
 from requests.adapters import HTTPAdapter
 from urllib3.util import Retry
 
 from summary.core.analytics import MetadataManager, get_analytics
 from summary.core.config import get_settings
+from summary.core.observability import Observability
 from summary.core.prompt import (
     PROMPT_SYSTEM_CLEANING,
     PROMPT_SYSTEM_NEXT_STEP,
@@ -48,7 +47,13 @@
 
 celery.config_from_object("summary.core.celery_config")
 
-langfuse = get_client()
+obs = Observability(
+    is_enabled=settings.langfuse_is_enabled,
+    langfuse_host=settings.langfuse_host,
+    langfuse_public_key=settings.langfuse_public_key,
+    langfuse_secret_key=settings.langfuse_secret_key,
+)
+logger.info("Observability enabled: %s", obs.is_enabled)
 
 if settings.sentry_dsn and settings.sentry_is_enabled:
@@ -115,9 +120,10 @@ class LLMService:
 
     def __init__(self):
         """Init the LLMService once."""
-        self._client = openai.OpenAI(
+        self._client = OpenAI(
             base_url=settings.llm_base_url, api_key=settings.llm_api_key
         )
+        self.gen_ctx = obs.generation
 
     def call(self, system_prompt: str, user_prompt: str):
         """Call the LLM service.
@@ -138,6 +144,14 @@ def call(self, system_prompt: str, user_prompt: str):
             logger.error("LLM call failed: %s", e)
             raise LLMException("LLM call failed.") from e
 
+    def call_llm_gen(self, name, system, user):
+        """Call the LLM service within a generation context."""
+        with self.gen_ctx(
+            name=name,
+            model=settings.llm_model,
+        ):
+            return self.call(system, user)
+
 
 def format_segments(transcription_data):
     """Format transcription segments from WhisperX into a readable conversation format.
@@ -205,7 +219,7 @@ def task_failure_handler(task_id, exception=None, **kwargs):
     autoretry_for=[exceptions.HTTPError],
     max_retries=settings.celery_max_retries,
 )
-@observe(name="process-audio", capture_input=True, capture_output=False)
+@obs.observe(name="process-audio", capture_input=True, capture_output=False)
 def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
     self,
     filename: str,
@@ -228,7 +242,7 @@ def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
     logger.debug("filename: %s", filename)
 
     try:
-        langfuse.update_current_trace(
+        obs.update_current_trace(
             user_id=sub or email,
             tags=["celery", "transcription", "whisperx"],
             metadata={
@@ -253,10 +267,7 @@ def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
 
         logger.debug("Connection to the Minio bucket successful")
 
-        span_ctx = getattr(langfuse, "start_as_current_span", None) or (
-            lambda **_: nullcontext()
-        )
-        with span_ctx(
+        with obs.span(
             name="minio.get_object",
             input={
                 "bucket": settings.aws_storage_bucket_name,
@@ -288,7 +299,7 @@ def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
             raise AudioValidationError(error_msg)
 
         logger.info("Initiating WhisperX client")
-        whisperx_client = openai.OpenAI(
+        whisperx_client = OpenAI(
             api_key=settings.whisperx_api_key,
             base_url=settings.whisperx_base_url,
             max_retries=settings.whisperx_max_retries,
@@ -297,7 +308,7 @@ def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
         try:
             logger.info("Querying transcription …")
             transcription_start_time = time.time()
-            with span_ctx(
+            with obs.span(
                 name="whisperx.transcribe",
                 input={
                     "model": settings.whisperx_asr_model,
@@ -373,7 +384,7 @@ def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
     max_retries=settings.celery_max_retries,
     queue=settings.summarize_queue,
 )
-@observe(name="summarize-transcription", capture_input=False, capture_output=False)
+@obs.observe(name="summarize-transcription", capture_input=False, capture_output=False)
 def summarize_transcription(self, transcript: str, email: str, sub: str, title: str):
     """Generate a summary from the provided transcription text.
@@ -387,8 +398,9 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
     logger.info("Starting summarization task")
 
     llm_service = LLMService()
+
     try:
-        langfuse.update_current_trace(
+        obs.update_current_trace(
             user_id=sub or email,
             tags=["celery", "summarization"],
             metadata={"title": title},
@@ -397,24 +409,11 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
         logger.warning("Langfuse update trace failed: %s", e)
         pass
 
-    gen_ctx = getattr(
-        langfuse,
-        "start_as_current_generation",
-        None,
-    ) or (lambda **_: nullcontext())
-
-    def call_llm_gen(name, system, user):
-        with gen_ctx(
-            name=name,
-            model=settings.llm_model,
-        ):
-            return llm_service.call(system, user)
-
-    tldr = call_llm_gen("tldr", PROMPT_SYSTEM_TLDR, transcript)
+    tldr = llm_service.call_llm_gen("tldr", PROMPT_SYSTEM_TLDR, transcript)
 
     logger.info("TLDR generated")
 
-    parts = call_llm_gen("plan", PROMPT_SYSTEM_PLAN, transcript)
+    parts = llm_service.call_llm_gen("plan", PROMPT_SYSTEM_PLAN, transcript)
     logger.info("Plan generated")
 
     parts = parts.split("\n")
@@ -426,17 +425,21 @@ def call_llm_gen(name, system, user):
         prompt_user_part = PROMPT_USER_PART.format(part=part, transcript=transcript)
         logger.info("Summarizing part: %s", part)
         parts_summarized.append(
-            call_llm_gen("part", PROMPT_SYSTEM_PART, prompt_user_part)
+            llm_service.call_llm_gen("part", PROMPT_SYSTEM_PART, prompt_user_part)
        )
 
     logger.info("Parts summarized")
 
     raw_summary = "\n\n".join(parts_summarized)
 
-    next_steps = call_llm_gen("next_steps", PROMPT_SYSTEM_NEXT_STEP, transcript)
+    next_steps = llm_service.call_llm_gen(
+        "next_steps", PROMPT_SYSTEM_NEXT_STEP, transcript
+    )
     logger.info("Next steps generated")
 
-    cleaned_summary = call_llm_gen("cleaning", PROMPT_SYSTEM_CLEANING, raw_summary)
+    cleaned_summary = llm_service.call_llm_gen(
+        "cleaning", PROMPT_SYSTEM_CLEANING, raw_summary
+    )
     logger.info("Summary cleaned")
 
     summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps
@@ -457,7 +460,7 @@ def call_llm_gen(name, system, user):
     logger.info("Webhook submitted successfully. Status: %s", response.status_code)
     logger.debug("Response body: %s", response.text)
     try:
-        langfuse.flush()
+        obs.flush()
     except Exception as e:
         logger.warning("Langfuse flush failed: %s", e)
         pass
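
The nested call_llm_gen helper above moves onto LLMService, so every summarization call goes through one generation context whether or not Langfuse is configured. A standalone sketch of the pattern (the names here are stand-ins for the real service, not the project's API):

# Standalone sketch: the call site is identical whether observability
# is on (a real generation span) or off (nullcontext), because both
# are context managers.
from contextlib import nullcontext


def generation(**kwargs):
    # Stand-in for Observability.generation with Langfuse disabled.
    return nullcontext()


def call(system: str, user: str) -> str:
    # Stand-in for the real LLM request.
    return f"summary of {user!r}"


def call_llm_gen(name: str, system: str, user: str) -> str:
    # Returning from inside `with` still runs __exit__, so the span is
    # closed (and its duration recorded) even on the early return.
    with generation(name=name, model="placeholder-model"):
        return call(system, user)


print(call_llm_gen("tldr", "You summarize.", "hello world"))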

src/summary/summary/core/config.py

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,7 @@
 from typing import Annotated, List, Optional
 
 from fastapi import Depends
+from pydantic import SecretStr
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
 
@@ -75,6 +76,12 @@ class Settings(BaseSettings):
     task_tracker_redis_url: str = "redis://redis/0"
     task_tracker_prefix: str = "task_metadata:"
 
+    # Langfuse
+    langfuse_is_enabled: bool = True
+    langfuse_host: Optional[str] = None
+    langfuse_public_key: Optional[str] = None
+    langfuse_secret_key: Optional[SecretStr] = None
+
 
 @lru_cache
 def get_settings():
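
Typing the secret key as SecretStr keeps it masked in reprs and log output; the raw value only comes back via get_secret_value(), which is why the Observability constructor unwraps it before handing it to the Langfuse client. A quick illustration:

from pydantic import SecretStr

key = SecretStr("sk-very-secret")
print(key)                     # ********** (masked in str/repr)
print(key.get_secret_value())  # sk-very-secret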
src/summary/summary/core/observability.py

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+"""Wrapper around Langfuse observability."""
+
+from __future__ import annotations
+
+import logging
+from contextlib import nullcontext
+from typing import Any, Callable, ContextManager
+
+logger = logging.getLogger(__name__)
+
+try:
+    from langfuse import Langfuse as _Langfuse
+    from langfuse import observe as _lf_observe
+except Exception as e:
+    logger.debug("Langfuse import failed: %s", e)
+    _Langfuse = None
+    _lf_observe = None
+
+
+class Observability:
+    """Wrapper around Langfuse observability."""
+
+    def __init__(
+        self, is_enabled, langfuse_host, langfuse_public_key, langfuse_secret_key
+    ) -> None:
+        """Initialize the Observability instance."""
+        self._client = None
+        if hasattr(langfuse_secret_key, "get_secret_value"):
+            langfuse_secret_key = langfuse_secret_key.get_secret_value()
+
+        self._enabled = bool(
+            is_enabled and langfuse_host and langfuse_public_key and langfuse_secret_key
+        )
+
+        if not self._enabled or _Langfuse is None:
+            self._enabled = False
+            return
+
+        try:
+            self._client = _Langfuse(
+                public_key=langfuse_public_key,
+                secret_key=langfuse_secret_key,
+                host=langfuse_host,
+            )
+        except Exception as e:
+            logger.warning("Langfuse init failed: %s", e)
+            self._enabled = False
+            self._client = None
+
+    def observe(
+        self, **decorator_kwargs
+    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
+        """Decorator to observe a function with Langfuse. If disabled, returns a no-op decorator."""  # noqa: E501
+        if self._enabled and self._client and _lf_observe is not None:
+            return _lf_observe(**decorator_kwargs)
+
+        def _noop(fn):
+            return fn
+
+        return _noop
+
+    def span(self, name: str, **kwargs) -> ContextManager[Any]:
+        """Context manager to create a span with Langfuse."""
+        if self._enabled and self._client:
+            start_span = getattr(self._client, "start_as_current_span", None)
+            if callable(start_span):
+                return start_span(name=name, **kwargs)
+        return nullcontext()
+
+    def generation(self, **kwargs) -> ContextManager[Any]:
+        """Context manager to create a generation with Langfuse."""
+        if self._enabled and self._client:
+            start_gen = getattr(self._client, "start_as_current_generation", None)
+            if callable(start_gen):
+                return start_gen(**kwargs)
+        return nullcontext()
+
+    def update_current_trace(self, **kwargs) -> None:
+        """Update the current trace with additional metadata."""
+        if not (self._enabled and self._client):
+            return
+        try:
+            self._client.update_current_trace(**kwargs)
+        except Exception as e:
+            logger.warning("Langfuse update_current_trace failed: %s", e)
+            pass
+
+    def flush(self) -> None:
+        """Flush any buffered data to Langfuse."""
+        if not (self._enabled and self._client):
+            return
+        try:
+            self._client.flush()
+        except Exception as e:
+            logger.warning("Langfuse flush failed: %s", e)
+            pass
+
+    @property
+    def is_enabled(self) -> bool:
+        """Check if observability is enabled."""
+        return bool(self._enabled and self._client)
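
Since every method degrades to a no-op (an identity decorator, a nullcontext span) when the client is absent, callers never have to branch on configuration. A quick usage sketch with Langfuse deliberately left unconfigured:

# Usage sketch: with no credentials, Observability disables itself and
# each call below becomes a harmless no-op.
from summary.core.observability import Observability

obs = Observability(
    is_enabled=True,
    langfuse_host=None,  # missing host => wrapper disables itself
    langfuse_public_key=None,
    langfuse_secret_key=None,
)
assert obs.is_enabled is False


@obs.observe(name="demo")  # identity decorator when disabled
def work() -> str:
    with obs.span(name="step"):  # nullcontext when disabled
        return "done"


print(work())  # "done"
obs.flush()  # safe no-op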
