diff --git a/src/anthropic/_base_client.py b/src/anthropic/_base_client.py index 98d154c0..c8d2b051 100644 --- a/src/anthropic/_base_client.py +++ b/src/anthropic/_base_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import sys +import os import json import time import uuid @@ -35,6 +36,7 @@ overload, ) from typing_extensions import Literal, override, get_origin +from ._heuristics import HeuristicGuard import anyio import httpx @@ -1530,10 +1532,24 @@ def post( DeprecationWarning, stacklevel=2, ) + + if os.environ.get("ANTHROPIC_ENABLE_HEURISTIC_GUARD"): + if isinstance(body, dict): + body_dict = cast(Dict[str, Any], body) + messages: List[Dict[str, Any]] = body_dict.get("messages", []) + last_msg: Any = next( + (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"), + "", + ) + if isinstance(last_msg, str) and last_msg: + analysis: Dict[str, Any] = HeuristicGuard.analyze(last_msg) + if not analysis.get("valid", True): + raise ValueError(f"BLOCKED: Score {analysis.get('S', 0)}") + opts = FinalRequestOptions.construct( method="post", url=path, json_data=body, content=content, files=to_httpx_files(files), **options ) - return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)) + return self.request(cast_to, opts, stream=stream, stream_cls=stream_cls) def patch( self, @@ -2270,6 +2286,20 @@ async def post( DeprecationWarning, stacklevel=2, ) + + if os.environ.get("ANTHROPIC_ENABLE_HEURISTIC_GUARD"): + if isinstance(body, dict): + body_dict = cast(Dict[str, Any], body) + messages: List[Dict[str, Any]] = body_dict.get("messages", []) + last_msg: Any = next( + (m.get("content", "") for m in reversed(messages) if m.get("role") == "user"), + "", + ) + if isinstance(last_msg, str) and last_msg: + analysis: Dict[str, Any] = HeuristicGuard.analyze(last_msg) + if not analysis.get("valid", True): + raise ValueError(f"BLOCKED: Score {analysis.get('S', 0)}") + opts = FinalRequestOptions.construct( method="post", 
import re
import typing
import unicodedata

# Runs of Unicode word characters. Used to extract keywords from tokens that
# carry punctuation, hyphens or elisions ("JSON.", "donne-moi", "l'analyse").
_WORD_PATTERN = re.compile(r"\w+")


class HeuristicGuard:
    """Heuristic filter for French-language LLM prompts.

    A prompt is scored from three signals: its length in whitespace-separated
    tokens, the share of long tokens, and whether it contains an instruction
    keyword (``OPERANTS``) and/or an output-format keyword (``FORMATS``).
    The score is then compared against two thresholds to produce a verdict.
    """

    # Instruction words whose presence raises the score.
    OPERANTS: typing.Set[str] = {
        'donne', 'fais', 'analyse', 'génère', 'genere', 'calcule',
        'audit', 'verdict', 'système', 'crée', 'cree', 'optimise',
        'explique', 'compare', 'résume', 'resume', 'évalue', 'evalue',
        'teste', 'montre', 'prouve', 'liste', 'décris', 'decris', 'generate'
    }

    # Output-format / domain words whose presence raises the score.
    # NOTE: 'liste' appears in both sets, so it earns both bonuses.
    FORMATS: typing.Set[str] = {
        'json', 'tableau', 'liste', 'markdown', 'csv', 'expert',
        'physique', 'code', 'python', 'sql'
    }

    # Scores at or above this value get the "OPTIMAL" verdict.
    THRESH_OPTIMAL: float = 2.3
    # Scores at or above this value (and below THRESH_OPTIMAL) are
    # "ADMISSIBLE"; anything lower is invalid. Change this value to adjust
    # the acceptance threshold.
    THRESH_ADMISSIBLE: float = 0.3

    @classmethod
    def analyze(cls, prompt: str) -> typing.Dict[str, typing.Any]:
        """Score *prompt* and report whether it passes the filter.

        Args:
            prompt: The user prompt text. Leading and trailing whitespace
                is ignored.

        Returns:
            A dict with three keys:
            ``"S"`` – the score rounded to two decimals,
            ``"verdict"`` – ``"OPTIMAL"``, ``"ADMISSIBLE"`` or
            ``"INCOHERENCE"``,
            ``"valid"`` – ``True`` when the unrounded score is at least
            ``THRESH_ADMISSIBLE``.
        """
        prompt = prompt.strip()
        if len(prompt) < 3:
            # Too short to carry any instruction.
            return {"S": 0.0, "verdict": "INCOHERENCE", "valid": False}

        # Compose accents (NFC) so decomposed input such as "ge\u0301ne\u0300re"
        # compares equal to the precomposed keywords in the sets above.
        normalized: str = unicodedata.normalize("NFC", prompt).lower()

        # Length metrics are computed on whitespace-separated tokens.
        tokens: typing.List[str] = normalized.split()
        t_len: int = len(tokens)

        # Keyword lookup uses bare words, so punctuation, hyphens and
        # apostrophes attached to a token do not hide a keyword.
        words: typing.Set[str] = set(_WORD_PATTERN.findall(normalized))

        # beta: length/complexity multiplier.
        beta: float = 1.0
        if t_len < 4:
            beta *= 0.6
        if t_len > 100:
            beta *= 0.85

        complex_terms = sum(1 for t in tokens if len(t) > 7)
        if t_len and (complex_terms / t_len) > 0.4:
            beta *= 1.15

        # score_delta: base value plus a bonus per keyword family present.
        score_delta: float = 0.1
        if not cls.OPERANTS.isdisjoint(words):
            score_delta += 0.45
        if not cls.FORMATS.isdisjoint(words):
            score_delta += 0.35

        delta_c: float = min(1.0, score_delta + 0.1)
        # The divisor shrinks as score_delta grows; floored at 0.08 so it
        # can never reach zero.
        lambda_val: float = max(0.08, 1.1 - (score_delta * 0.85))

        s_score: float = (beta * delta_c) / lambda_val

        if s_score >= cls.THRESH_OPTIMAL:
            verdict = "OPTIMAL"
        elif s_score >= cls.THRESH_ADMISSIBLE:
            verdict = "ADMISSIBLE"
        else:
            verdict = "INCOHERENCE"

        return {
            "S": round(s_score, 2),
            "verdict": verdict,
            "valid": s_score >= cls.THRESH_ADMISSIBLE,
        }