# Setup langfuse #716

**Open**: GuittenyMartin wants to merge 8 commits into `main` from `setup-langfuse`.

## Commits
- `244ed6e` ➕(summary) add dependency and env variables for langfuse
- `0f73bcc` 📈(summary) add observability with langfuse
- `0d51cdc` 📈(summary) add langfuse trace on stt call
- `4ad6b79` fixup! 📈(summary) add observability with langfuse
- `983c7fa` fixup! 📈(summary) add observability with langfuse
- `398afe1` fixup! 📈(summary) add observability with langfuse
- `a9e9cff` 🎨(summary) create observability class
- `7f2308e` 📈(summary) delete observability span for minio
## Changed file: Celery tasks module

```diff
@@ -9,18 +9,19 @@
 from pathlib import Path
 from typing import Optional

-import openai
 import sentry_sdk
 from celery import Celery, signals
 from celery.utils.log import get_task_logger
 from minio import Minio
 from mutagen import File
+from openai import OpenAI
 from requests import Session, exceptions
 from requests.adapters import HTTPAdapter
 from urllib3.util import Retry

 from summary.core.analytics import MetadataManager, get_analytics
 from summary.core.config import get_settings
+from summary.core.observability import Observability
 from summary.core.prompt import (
     PROMPT_SYSTEM_CLEANING,
     PROMPT_SYSTEM_NEXT_STEP,
```
```diff
@@ -46,6 +47,14 @@
 celery.config_from_object("summary.core.celery_config")

+obs = Observability(
+    is_enabled=settings.langfuse_is_enabled,
+    langfuse_host=settings.langfuse_host,
+    langfuse_public_key=settings.langfuse_public_key,
+    langfuse_secret_key=settings.langfuse_secret_key,
+)
+logger.info("Observability enabled: %s", obs.is_enabled)
+
 if settings.sentry_dsn and settings.sentry_is_enabled:

 @signals.celeryd_init.connect
```
```diff
@@ -111,9 +120,10 @@ class LLMService:

     def __init__(self):
         """Init the LLMService once."""
-        self._client = openai.OpenAI(
+        self._client = OpenAI(
             base_url=settings.llm_base_url, api_key=settings.llm_api_key
         )
+        self.gen_ctx = obs.generation

     def call(self, system_prompt: str, user_prompt: str):
         """Call the LLM service.
```
```diff
@@ -134,6 +144,14 @@ def call(self, system_prompt: str, user_prompt: str):
             logger.error("LLM call failed: %s", e)
             raise LLMException("LLM call failed.") from e

+    def call_llm_gen(self, name, system, user):
+        """Call the LLM service within a generation context."""
+        with self.gen_ctx(
+            name=name,
+            model=settings.llm_model,
+        ):
+            return self.call(system, user)
+

 def format_segments(transcription_data):
     """Format transcription segments from WhisperX into a readable conversation format.
```
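The `call_llm_gen` helper wraps each LLM call in a generation context manager; when observability is disabled that context degrades to a no-op, so the underlying call path is unchanged. A minimal sketch of the pattern, using only the stdlib (`FakeService` and its canned reply are hypothetical stand-ins, not code from this PR):

```python
from contextlib import nullcontext


class FakeService:
    """Hypothetical stand-in for LLMService, to illustrate the pattern."""

    def __init__(self, gen_ctx=None):
        # Mirrors `self.gen_ctx = obs.generation` in the PR: when no
        # tracing backend is wired up, fall back to a no-op context.
        self.gen_ctx = gen_ctx or (lambda **kwargs: nullcontext())

    def call(self, system, user):
        # Stand-in for the real LLM call.
        return f"reply to {user!r}"

    def call_llm_gen(self, name, system, user):
        # The generation context would record name/model/latency around
        # the real call; with nullcontext it adds nothing.
        with self.gen_ctx(name=name):
            return self.call(system, user)


svc = FakeService()
print(svc.call_llm_gen("tldr", "system prompt", "hello"))  # → reply to 'hello'
```

The point of the indirection is that call sites never branch on whether tracing is enabled; only the construction of `gen_ctx` does.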
```diff
@@ -201,7 +219,8 @@ def task_failure_handler(task_id, exception=None, **kwargs):
     autoretry_for=[exceptions.HTTPError],
     max_retries=settings.celery_max_retries,
 )
-def process_audio_transcribe_summarize_v2(
+@obs.observe(name="process-audio", capture_input=True, capture_output=False)
+def process_audio_transcribe_summarize_v2(  # noqa: PLR0915
     self,
     filename: str,
     email: str,
```
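`obs.observe(...)` returns either the real Langfuse `observe` decorator or a pass-through, which is what lets the Celery task be decorated unconditionally. The fallback shape in isolation, with illustrative names (this is a sketch of the pattern, not the PR's `Observability` class):

```python
def observe(enabled: bool, **decorator_kwargs):
    """Return a tracing decorator when enabled, else a pass-through."""
    if enabled:
        def tracing(fn):
            def wrapper(*args, **kwargs):
                # A real implementation would open and close a trace here.
                return fn(*args, **kwargs)
            return wrapper
        return tracing

    def _noop(fn):
        # Mirrors the _noop fallback in Observability.observe: the
        # function object is returned untouched.
        return fn
    return _noop


@observe(enabled=False, name="process-audio")
def task(x):
    return x * 2


print(task(21))  # → 42
```

Because the disabled branch returns the function unchanged, there is no wrapper overhead and no behavior change when tracing is off.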
```diff
@@ -222,6 +241,21 @@ def process_audio_transcribe_summarize_v2(
     logger.info("Notification received")
     logger.debug("filename: %s", filename)

+    try:
+        obs.update_current_trace(
+            user_id=sub or email,
+            tags=["celery", "transcription", "whisperx"],
+            metadata={
+                "filename": filename,
+                "room": room,
+                "recording_date": recording_date,
+                "recording_time": recording_time,
+            },
+        )
+    except Exception as e:
+        logger.warning("Langfuse update trace failed: %s", e)
+        pass
+
     task_id = self.request.id

     minio_client = Minio(
```

> **Collaborator** (on the `user_id=sub or email,` line): I guess `sub` is always set. It applies for next calls. Question: Should we use …
```diff
@@ -236,7 +270,6 @@ def process_audio_transcribe_summarize_v2(
     audio_file_stream = minio_client.get_object(
         settings.aws_storage_bucket_name, object_name=filename
     )
-
     temp_file_path = save_audio_stream(audio_file_stream)

     logger.info("Recording successfully downloaded")
```
```diff
@@ -257,7 +290,7 @@ def process_audio_transcribe_summarize_v2(
         raise AudioValidationError(error_msg)

     logger.info("Initiating WhisperX client")
-    whisperx_client = openai.OpenAI(
+    whisperx_client = OpenAI(
         api_key=settings.whisperx_api_key,
         base_url=settings.whisperx_base_url,
         max_retries=settings.whisperx_max_retries,
```
```diff
@@ -266,20 +299,25 @@ def process_audio_transcribe_summarize_v2(
     try:
         logger.info("Querying transcription …")
         transcription_start_time = time.time()
-        with open(temp_file_path, "rb") as audio_file:
-            transcription = whisperx_client.audio.transcriptions.create(
-                model=settings.whisperx_asr_model, file=audio_file
-            )
-        metadata_manager.track(
-            task_id,
-            {
-                "transcription_time": round(
-                    time.time() - transcription_start_time, 2
-                )
-            },
-        )
-        logger.info("Transcription received.")
-        logger.debug("Transcription: \n %s", transcription)
+        with obs.span(
+            name="whisperx.transcribe",
+            input={
+                "model": settings.whisperx_asr_model,
+                "audio_seconds": round(audio_file.info.length, 2),
+                "endpoint": settings.whisperx_base_url,
+            },
+        ):
+            with open(temp_file_path, "rb") as audio_file_rb:
+                transcription = whisperx_client.audio.transcriptions.create(
+                    model=settings.whisperx_asr_model,
+                    file=audio_file_rb,
+                )
+            metadata_manager.track(
+                task_id,
+                {"transcription_time": round(time.time() - transcription_start_time, 2)},
+            )
+            logger.info("Transcription received.")
+            logger.debug("Transcription: \n %s", transcription)
     finally:
         if os.path.exists(temp_file_path):
             os.remove(temp_file_path)
```
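Because `obs.span(...)` returns `nullcontext()` when Langfuse is unavailable, wrapping the transcription call in `with obs.span(...)` changes nothing in the disabled case. A stdlib-only sketch of that dispatch (the `span` function here is an illustrative reduction of `Observability.span`, not the PR code itself):

```python
from contextlib import nullcontext


def span(client, name, **kwargs):
    # If the client exposes a span context manager, use it; otherwise
    # fall back to a do-nothing context so the `with` body still runs.
    if client is not None:
        start = getattr(client, "start_as_current_span", None)
        if callable(start):
            return start(name=name, **kwargs)
    return nullcontext()


# Disabled case: no client, the body executes normally.
with span(None, name="whisperx.transcribe"):
    result = "transcribed"

print(result)  # → transcribed
```

The `getattr`/`callable` probe also guards against SDK versions that lack the method, failing open rather than raising inside the task.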
```diff
@@ -337,6 +375,7 @@ def process_audio_transcribe_summarize_v2(
     max_retries=settings.celery_max_retries,
     queue=settings.summarize_queue,
 )
+@obs.observe(name="summarize-transcription", capture_input=False, capture_output=False)
 def summarize_transcription(self, transcript: str, email: str, sub: str, title: str):
     """Generate a summary from the provided transcription text.
```
```diff
@@ -351,11 +390,21 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
     llm_service = LLMService()

-    tldr = llm_service.call(PROMPT_SYSTEM_TLDR, transcript)
+    try:
+        obs.update_current_trace(
+            user_id=sub or email,
+            tags=["celery", "summarization"],
+            metadata={"title": title},
+        )
+    except Exception as e:
+        logger.warning("Langfuse update trace failed: %s", e)
+        pass
+
+    tldr = llm_service.call_llm_gen("tldr", PROMPT_SYSTEM_TLDR, transcript)

     logger.info("TLDR generated")

-    parts = llm_service.call(PROMPT_SYSTEM_PLAN, transcript)
+    parts = llm_service.call_llm_gen("plan", PROMPT_SYSTEM_PLAN, transcript)
     logger.info("Plan generated")

     parts = parts.split("\n")
```
```diff
@@ -366,16 +415,22 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:
     for part in parts:
         prompt_user_part = PROMPT_USER_PART.format(part=part, transcript=transcript)
         logger.info("Summarizing part: %s", part)
-        parts_summarized.append(llm_service.call(PROMPT_SYSTEM_PART, prompt_user_part))
+        parts_summarized.append(
+            llm_service.call_llm_gen("part", PROMPT_SYSTEM_PART, prompt_user_part)
+        )

     logger.info("Parts summarized")

     raw_summary = "\n\n".join(parts_summarized)

-    next_steps = llm_service.call(PROMPT_SYSTEM_NEXT_STEP, transcript)
+    next_steps = llm_service.call_llm_gen(
+        "next_steps", PROMPT_SYSTEM_NEXT_STEP, transcript
+    )
     logger.info("Next steps generated")

-    cleaned_summary = llm_service.call(PROMPT_SYSTEM_CLEANING, raw_summary)
+    cleaned_summary = llm_service.call_llm_gen(
+        "cleaning", PROMPT_SYSTEM_CLEANING, raw_summary
+    )
     logger.info("Summary cleaned")

     summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps
```
```diff
@@ -395,3 +450,8 @@ def summarize_transcription(self, transcript: str, email: str, sub: str, title:

     logger.info("Webhook submitted successfully. Status: %s", response.status_code)
     logger.debug("Response body: %s", response.text)
+    try:
+        obs.flush()
+    except Exception as e:
+        logger.warning("Langfuse flush failed: %s", e)
+        pass
```
## New file: `summary/core/observability.py`
| """Wrapper around Langfuse observability.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from contextlib import nullcontext | ||
| from typing import Any, Callable, ContextManager | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| try: | ||
| from langfuse import Langfuse as _Langfuse | ||
| from langfuse import observe as _lf_observe | ||
| except Exception as e: | ||
| logger.debug("Langfuse import failed: %s", e) | ||
| _Langfuse = None | ||
| _lf_observe = None | ||
|
|
||
|
|
||
| class Observability: | ||
| """Wrapper around Langfuse observability.""" | ||
|
|
||
| def __init__( | ||
| self, is_enabled, langfuse_host, langfuse_public_key, langfuse_secret_key | ||
| ) -> None: | ||
| """Initialize the Observability instance.""" | ||
| self._client = None | ||
| if hasattr(langfuse_secret_key, "get_secret_value"): | ||
| langfuse_secret_key = langfuse_secret_key.get_secret_value() | ||
|
|
||
| self._enabled = bool( | ||
| is_enabled and langfuse_host and langfuse_public_key and langfuse_secret_key | ||
| ) | ||
|
|
||
| if not self._enabled or _Langfuse is None: | ||
| self._enabled = False | ||
| return | ||
|
|
||
| try: | ||
| self._client = _Langfuse( | ||
| public_key=langfuse_public_key, | ||
| secret_key=langfuse_secret_key, | ||
| host=langfuse_host, | ||
| ) | ||
| except Exception as e: | ||
| logger.warning("Langfuse init failed: %s", e) | ||
| self._enabled = False | ||
| self._client = None | ||
|
|
||
| def observe( | ||
| self, **decorator_kwargs | ||
| ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: | ||
| """Decorator to observe a function with Langfuse. If disabled, returns a no-op decorator.""" # noqa: E501 | ||
| if self._enabled and self._client and _lf_observe is not None: | ||
| return _lf_observe(**decorator_kwargs) | ||
|
|
||
| def _noop(fn): | ||
| return fn | ||
|
|
||
| return _noop | ||
|
|
||
| def span(self, name: str, **kwargs) -> ContextManager[Any]: | ||
| """Context manager to create a span with Langfuse.""" | ||
| if self._enabled and self._client: | ||
| start_span = getattr(self._client, "start_as_current_span", None) | ||
| if callable(start_span): | ||
| return start_span(name=name, **kwargs) | ||
| return nullcontext() | ||
|
|
||
| def generation(self, **kwargs) -> ContextManager[Any]: | ||
| """Context manager to create a generation with Langfuse.""" | ||
| if self._enabled and self._client: | ||
| start_gen = getattr(self._client, "start_as_current_generation", None) | ||
| if callable(start_gen): | ||
| return start_gen(**kwargs) | ||
| return nullcontext() | ||
|
|
||
| def update_current_trace(self, **kwargs) -> None: | ||
| """Update the current trace with additional metadata.""" | ||
| if not (self._enabled and self._client): | ||
| return | ||
| try: | ||
| self._client.update_current_trace(**kwargs) | ||
| except Exception as e: | ||
| logger.warning("Langfuse update_current_trace failed: %s", e) | ||
| pass | ||
|
|
||
| def flush(self) -> None: | ||
| """Flush any buffered data to Langfuse.""" | ||
| if not (self._enabled and self._client): | ||
| return | ||
| try: | ||
| self._client.flush() | ||
| except Exception as e: | ||
| logger.warning("Langfuse flush failed: %s", e) | ||
| pass | ||
|
|
||
| @property | ||
| def is_enabled(self) -> bool: | ||
| """Check if observability is enabled.""" | ||
| return bool(self._enabled and self._client) |
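The `hasattr(..., "get_secret_value")` check in `__init__` lets the class accept either a plain string or a pydantic-style `SecretStr` for the secret key. That unwrapping can be demonstrated in isolation (`FakeSecret` is a hypothetical stand-in so the sketch needs no pydantic dependency):

```python
class FakeSecret:
    """Hypothetical stand-in for pydantic's SecretStr wrapper."""

    def __init__(self, value):
        self._value = value

    def get_secret_value(self):
        return self._value


def unwrap(secret):
    # Mirrors the hasattr check in Observability.__init__: accept either
    # a plain string or a SecretStr-like wrapper and return the raw value.
    if hasattr(secret, "get_secret_value"):
        return secret.get_secret_value()
    return secret


print(unwrap(FakeSecret("sk-lf-123")))  # → sk-lf-123
print(unwrap("sk-lf-456"))              # → sk-lf-456
```

Duck-typing on the method name, rather than importing `SecretStr` and using `isinstance`, keeps the observability module free of a hard pydantic dependency.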
> **Review comment** (🧹 Nitpick | 🔵 Trivial): Verify the Langfuse version is current and secure. The pinned version `langfuse==3.4.0` should be validated to ensure it is up to date and free from known vulnerabilities. At review time the latest release was 3.5.2, with no known vulnerabilities: in `src/summary/pyproject.toml` (line 18), change `langfuse==3.4.0` to `langfuse==3.5.2`.