Commit 9def3cb: Merge remote-tracking branch 'origin/main'
asofter committed May 6, 2024 · 2 parents 74af5a6 + 6d49d3a
Showing 103 changed files with 1,274 additions and 814 deletions.
45 changes: 24 additions & 21 deletions llm_guard/input_scanners/anonymize_helpers/analyzer.py
@@ -1,5 +1,4 @@
import copy
from typing import Dict, List, Sequence

import spacy
from presidio_analyzer import (
@@ -11,17 +10,20 @@
)
from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer
from presidio_analyzer.nlp_engine import NlpEngine, NlpEngineProvider
from spacy.cli import download # type: ignore

from .ner_mapping import NERConfig
from .predefined_recognizers import _get_predefined_recognizers
from .predefined_recognizers.zh import CustomPatternRecognizer
from .regex_patterns import RegexPattern
from .transformers_recognizer import TransformersRecognizer


def _add_recognizers(
registry: RecognizerRegistry,
regex_groups,
custom_names,
supported_languages: List[str] = ["en"],
regex_groups: list[RegexPattern],
custom_names: list[str],
supported_languages: list[str] = ["en"],
) -> RecognizerRegistry:
"""
Create a RecognizerRegistry and populate it with regex patterns and custom names.
@@ -37,13 +39,13 @@ def _add_recognizers(
for language in supported_languages:
# custom recognizer per language
if len(custom_names) > 0:
custom_recognier = PatternRecognizer
custom_recognizer = PatternRecognizer

if language == "zh":
custom_recognier = CustomPatternRecognizer
custom_recognizer = CustomPatternRecognizer

registry.add_recognizer(
custom_recognier(
custom_recognizer(
supported_entity="CUSTOM",
supported_language=language,
deny_list=custom_names,
@@ -56,20 +58,21 @@

for pattern_data in regex_groups:
languages = pattern_data["languages"] or ["en"]

label = pattern_data["name"]
reuse = pattern_data.get("reuse", False)

patterns = map(
lambda exp: Pattern(name=label, regex=exp, score=pattern_data["score"]),
pattern_data.get("expressions", []) or [],
patterns: list[Pattern] = list(
map(
lambda exp: Pattern(name=label, regex=exp, score=pattern_data["score"]),
pattern_data.get("expressions", []) or [],
)
)

for language in languages:
if language not in supported_languages:
continue

if reuse:
if isinstance(reuse, dict):
new_recognizer = copy.deepcopy(
registry.get_recognizers(language=reuse["language"], entities=[reuse["name"]])[
0
@@ -90,13 +93,13 @@
return registry


def _get_nlp_engine(languages: List[str] = ["en"]) -> NlpEngine:
def _get_nlp_engine(languages: list[str] = ["en"]) -> NlpEngine:
models = []

for language in languages:
if not spacy.util.is_package(f"{language}_core_web_sm"):
# Use small spacy model, for faster inference.
spacy.cli.download(f"{language}_core_web_sm")
download(f"{language}_core_web_sm")
models.append({"lang_code": language, "model_name": f"{language}_core_web_sm"})

configuration = {"nlp_engine_name": "spacy", "models": models}
@@ -106,17 +109,17 @@ def _get_nlp_engine(languages: list[str] = ["en"]) -> NlpEngine:

def get_transformers_recognizer(
*,
recognizer_conf: Dict,
recognizer_conf: NERConfig,
use_onnx: bool = False,
supported_language: str = "en",
) -> EntityRecognizer:
"""
This function loads a transformers recognizer given a recognizer configuration.
Args:
recognizer_conf (Dict): Configuration to recognize PII data.
use_onnx (bool): Whether to use the ONNX version of the model. Default is False.
supported_language (str): The language to use for the recognizer. Default is "en".
recognizer_conf: Configuration to recognize PII data.
use_onnx: Whether to use the ONNX version of the model. Default is False.
supported_language: The language to use for the recognizer. Default is "en".
"""
model = recognizer_conf.get("DEFAULT_MODEL")
supported_entities = recognizer_conf.get("PRESIDIO_SUPPORTED_ENTITIES")
@@ -134,9 +137,9 @@ def get_transformers_recognizer(

def get_analyzer(
recognizer: EntityRecognizer,
regex_groups,
custom_names: Sequence[str],
supported_languages: List[str] = ["en"],
regex_groups: list[RegexPattern],
custom_names: list[str],
supported_languages: list[str] = ["en"],
) -> AnalyzerEngine:
nlp_engine = _get_nlp_engine(languages=supported_languages)

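The hunks above tighten the typing in analyzer.py (builtin generics, NERConfig, RegexPattern), materialize the lazy map() into a concrete list[Pattern], fix the custom_recognier typo, and treat reuse as a dict rather than a truthy flag. For orientation, a minimal sketch of the registry-building pattern these helpers wrap, using the public presidio_analyzer API; the pattern data and deny list below are illustrative values, not taken from the repository:

from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerRegistry
from presidio_analyzer.nlp_engine import NlpEngineProvider

# Build a spaCy NLP engine for English, mirroring _get_nlp_engine
# (assumes en_core_web_sm is installed; the module downloads it if missing).
nlp_engine = NlpEngineProvider(
    nlp_configuration={
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}],
    }
).create_engine()

# Register a deny-list recognizer and a regex recognizer, as _add_recognizers does.
registry = RecognizerRegistry()
registry.add_recognizer(
    PatternRecognizer(
        supported_entity="CUSTOM",
        supported_language="en",
        deny_list=["John Doe"],  # illustrative custom name
    )
)
registry.add_recognizer(
    PatternRecognizer(
        supported_entity="IP_ADDRESS",
        supported_language="en",
        patterns=[Pattern(name="IP_ADDRESS", regex=r"(?:\d{1,3}\.){3}\d{1,3}", score=0.75)],
    )
)

analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry, supported_languages=["en"])
print(analyzer.analyze(text="Server at 10.0.0.1 belongs to John Doe", language="en"))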
60 changes: 38 additions & 22 deletions llm_guard/input_scanners/anonymize_helpers/faker.py
@@ -1,12 +1,15 @@
from __future__ import annotations

import string
from typing import Optional
from typing import Callable, cast

from faker import Faker

fake = Faker()
fake.seed_instance(100)

_entity_faker_map = {

_entity_faker_map: dict[str, Callable[[], str]] = {
# Global entities
"CREDIT_CARD": fake.credit_card_number,
"EMAIL_ADDRESS": fake.email,
@@ -19,37 +22,50 @@
"UUID": fake.uuid4,
"LOCATION": fake.city,
"DATE_TIME": fake.date,
"CRYPTO": lambda _: "bc1"
+ "".join(fake.random_choices(string.ascii_lowercase + string.digits, length=26)),
"NRP": lambda _: str(fake.random_number(digits=8, fix_len=True)),
"MEDICAL_LICENSE": lambda _: fake.bothify(text="??######").upper(),
"CRYPTO": cast(
Callable[[], str],
lambda _: "bc1"
+ "".join(fake.random_choices(string.ascii_lowercase + string.digits, length=26)),
),
"NRP": cast(Callable[[], str], lambda _: str(fake.random_number(digits=8, fix_len=True))),
"MEDICAL_LICENSE": cast(Callable[[], str], lambda _: fake.bothify(text="??######").upper()),
# US-specific entities
"US_BANK_NUMBER": fake.bban,
"US_SSN": fake.ssn,
"US_DRIVER_LICENSE": lambda _: str(fake.random_number(digits=9, fix_len=True)),
"US_ITIN": lambda _: fake.bothify(text="9##-7#-####"),
"US_PASSPORT": lambda _: fake.bothify(text="#####??").upper(),
"US_DRIVER_LICENSE": cast(
Callable[[], str], lambda _: str(fake.random_number(digits=9, fix_len=True))
),
"US_ITIN": cast(Callable[[], str], lambda _: fake.bothify(text="9##-7#-####")),
"US_PASSPORT": cast(Callable[[], str], lambda _: fake.bothify(text="#####??").upper()),
# UK-specific entities
"UK_NHS": lambda _: str(fake.random_number(digits=10, fix_len=True)),
"UK_NHS": cast(Callable[[], str], lambda _: str(fake.random_number(digits=10, fix_len=True))),
# Spain-specific entities
"ES_NIF": lambda _: fake.bothify(text="########?").upper(),
"ES_NIF": cast(Callable[[], str], lambda _: fake.bothify(text="########?").upper()),
# Italy-specific entities
"IT_FISCAL_CODE": lambda _: fake.bothify(text="??????##?##?###?").upper(),
"IT_DRIVER_LICENSE": lambda _: fake.bothify(text="?A#######?").upper(),
"IT_VAT_CODE": lambda _: fake.bothify(text="IT???????????"),
"IT_PASSPORT": lambda _: str(fake.random_number(digits=9, fix_len=True)),
"IT_IDENTITY_CARD": lambda _: lambda _: str(fake.random_number(digits=7, fix_len=True)),
"IT_FISCAL_CODE": cast(
Callable[[], str], lambda _: fake.bothify(text="??????##?##?###?").upper()
),
"IT_DRIVER_LICENSE": cast(Callable[[], str], lambda _: fake.bothify(text="?A#######?").upper()),
"IT_VAT_CODE": cast(Callable[[], str], lambda _: fake.bothify(text="IT???????????")),
"IT_PASSPORT": cast(
Callable[[], str], lambda _: str(fake.random_number(digits=9, fix_len=True))
),
"IT_IDENTITY_CARD": cast(
Callable[[], str], lambda _: lambda _: str(fake.random_number(digits=7, fix_len=True))
),
# Singapore-specific entities
"SG_NRIC_FIN": lambda _: fake.bothify(text="????####?").upper(),
"SG_NRIC_FIN": cast(Callable[[], str], lambda _: fake.bothify(text="????####?").upper()),
# Australia-specific entities
"AU_ABN": lambda _: str(fake.random_number(digits=11, fix_len=True)),
"AU_ACN": lambda _: str(fake.random_number(digits=9, fix_len=True)),
"AU_TFN": lambda _: str(fake.random_number(digits=9, fix_len=True)),
"AU_MEDICARE": lambda _: str(fake.random_number(digits=10, fix_len=True)),
"AU_ABN": cast(Callable[[], str], lambda _: str(fake.random_number(digits=11, fix_len=True))),
"AU_ACN": cast(Callable[[], str], lambda _: str(fake.random_number(digits=9, fix_len=True))),
"AU_TFN": cast(Callable[[], str], lambda _: str(fake.random_number(digits=9, fix_len=True))),
"AU_MEDICARE": cast(
Callable[[], str], lambda _: str(fake.random_number(digits=10, fix_len=True))
),
}


def get_fake_value(entity_type: str) -> Optional[str]:
def get_fake_value(entity_type: str) -> str | None:
if entity_type not in _entity_faker_map:
return None

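The faker.py changes pin the map's value type to Callable[[], str]. Note that the cast entries wrap lambdas that still take one ignored argument, and the IT_IDENTITY_CARD entry nests two lambdas (lambda _: lambda _: ...), so invoking it yields a function rather than a string; the cast silences the type checker rather than proving the shape. A short sketch of the underlying Faker calls mirrored by the map; get_fake_value's body is not shown in this hunk, so the direct invocations below are illustrative:

from faker import Faker

fake = Faker()
fake.seed_instance(100)  # same seed as the module, so output is reproducible

print(fake.credit_card_number())                         # CREDIT_CARD
print(fake.bothify(text="9##-7#-####"))                  # US_ITIN-style placeholder
print(str(fake.random_number(digits=10, fix_len=True)))  # UK_NHS-style placeholder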
30 changes: 23 additions & 7 deletions llm_guard/input_scanners/anonymize_helpers/ner_mapping.py
@@ -1,6 +1,22 @@
from typing import TypedDict

from llm_guard.model import Model

BERT_BASE_NER_CONF = {

class NERConfig(TypedDict):
PRESIDIO_SUPPORTED_ENTITIES: list[str]
DEFAULT_MODEL: Model
LABELS_TO_IGNORE: list[str]
DEFAULT_EXPLANATION: str
DATASET_TO_PRESIDIO_MAPPING: dict[str, str]
MODEL_TO_PRESIDIO_MAPPING: dict[str, str]
CHUNK_OVERLAP_SIZE: int
CHUNK_SIZE: int
ID_SCORE_MULTIPLIER: float
ID_ENTITY_NAME: str


BERT_BASE_NER_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -37,7 +53,7 @@
"ID_ENTITY_NAME": "ID",
}

BERT_LARGE_NER_CONF = {
BERT_LARGE_NER_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -74,7 +90,7 @@
"ID_ENTITY_NAME": "ID",
}

BERT_ZH_NER_CONF = {
BERT_ZH_NER_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -110,7 +126,7 @@
"ID_ENTITY_NAME": "ID",
}

DISTILBERT_AI4PRIVACY_v2_CONF = {
DISTILBERT_AI4PRIVACY_v2_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -203,7 +219,7 @@
"ID_ENTITY_NAME": "ID",
}

DEBERTA_AI4PRIVACY_v2_CONF = {
DEBERTA_AI4PRIVACY_v2_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -296,7 +312,7 @@
"ID_ENTITY_NAME": "ID",
}

MDEBERTA_AI4PRIVACY_v2_CONF = {
MDEBERTA_AI4PRIVACY_v2_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
@@ -389,7 +405,7 @@
"ID_ENTITY_NAME": "ID",
}

DEBERTA_LAKSHYAKH93_CONF = {
DEBERTA_LAKSHYAKH93_CONF: NERConfig = {
"PRESIDIO_SUPPORTED_ENTITIES": [
"LOCATION",
"PERSON",
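With NERConfig as a TypedDict, every *_CONF literal above becomes checkable: mypy flags a missing or misspelled key at the definition site instead of at runtime. A minimal sketch of a custom config under the same shape; the Model constructor usage and model name are assumptions, since llm_guard.model is not shown in this diff:

from llm_guard.input_scanners.anonymize_helpers.ner_mapping import NERConfig
from llm_guard.model import Model

MY_NER_CONF: NERConfig = {
    "PRESIDIO_SUPPORTED_ENTITIES": ["LOCATION", "PERSON"],
    "DEFAULT_MODEL": Model("dslim/bert-base-NER"),  # assumed constructor signature
    "LABELS_TO_IGNORE": ["O"],
    "DEFAULT_EXPLANATION": "Identified as {} by a custom NER model",
    "DATASET_TO_PRESIDIO_MAPPING": {"LOC": "LOCATION", "PER": "PERSON"},
    "MODEL_TO_PRESIDIO_MAPPING": {"B-LOC": "LOCATION", "B-PER": "PERSON"},
    "CHUNK_OVERLAP_SIZE": 40,
    "CHUNK_SIZE": 600,
    "ID_SCORE_MULTIPLIER": 0.4,
    "ID_ENTITY_NAME": "ID",
}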
@@ -1,9 +1,9 @@
from typing import List
from typing import Callable

from presidio_analyzer import EntityRecognizer


def _get_predefined_recognizers(language: str) -> List[EntityRecognizer]:
def _get_predefined_recognizers(language: str) -> list[Callable[..., EntityRecognizer]]:
if language == "zh":
from .zh import CryptoRecognizer, EmailRecognizer, IpRecognizer, PhoneRecognizer

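This hunk changes _get_predefined_recognizers to return recognizer classes (list[Callable[..., EntityRecognizer]]) rather than instances, leaving instantiation to the caller. A sketch of the consuming side, given a RecognizerRegistry named registry as above, under the assumption that each class accepts supported_language the way Presidio's predefined recognizers do:

for recognizer_cls in _get_predefined_recognizers("zh"):
    # Instantiate per language before registering; the keyword is an assumption.
    registry.add_recognizer(recognizer_cls(supported_language="zh"))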
@@ -1,11 +1,10 @@
import re
from typing import List

from presidio_analyzer import Pattern, PatternRecognizer


class CustomPatternRecognizer(PatternRecognizer):
def _deny_list_to_regex(self, deny_list: List[str]) -> Pattern:
def _deny_list_to_regex(self, deny_list: list[str]) -> Pattern:
"""
Convert a list of characters to a matching regex.
39 changes: 34 additions & 5 deletions llm_guard/input_scanners/anonymize_helpers/regex_patterns.py
@@ -1,10 +1,37 @@
from typing import Dict, List, Optional
from __future__ import annotations

from typing import TypedDict

from llm_guard.util import get_logger

LOGGER = get_logger()

DEFAULT_REGEX_PATTERNS = [

class DefaultRegexPatterns(TypedDict):
name: str
expressions: list[str]
examples: list[str]
context: list[str]
score: float
languages: list[str]


class RegexPatternsReuse(TypedDict):
name: str
languages: list[str]
reuse: dict[str, str]


class RegexPattern(TypedDict):
name: str
expressions: list[str]
context: list[str]
score: float
languages: list[str]
reuse: dict[str, str] | None


DEFAULT_REGEX_PATTERNS: list[DefaultRegexPatterns | RegexPatternsReuse] = [
{
"expressions": [
r"(?:(4\d{3}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4})|(3[47]\d{2}[-\s]?\d{6}[-\s]?\d{5})|(3(?:0[0-5]|[68]\d)\d{11}))"
@@ -166,11 +193,13 @@
]


def get_regex_patterns(regex_patterns: Optional[List[Dict]] = None) -> List[Dict]:
def get_regex_patterns(
regex_patterns: list[DefaultRegexPatterns | RegexPatternsReuse] | None = None,
) -> list[RegexPattern]:
if not regex_patterns:
regex_patterns = DEFAULT_REGEX_PATTERNS

result = []
result: list[RegexPattern] = []
for group in regex_patterns:
result.append(
{
@@ -179,7 +208,7 @@ def get_regex_patterns(regex_patterns: Optional[List[Dict]] = None) -> List[Dict
"context": group.get("context", []),
"score": group.get("score", 0.75),
"languages": group.get("languages", ["en"]),
"reuse": group.get("reuse", False),
"reuse": group.get("reuse", None),
}
)
LOGGER.debug("Loaded regex pattern", group_name=group["name"])
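get_regex_patterns now normalizes each incoming group into a fully populated RegexPattern, defaulting score to 0.75, languages to ["en"], and reuse to None (previously False). A sketch with a hypothetical custom group; EMPLOYEE_ID and its regex are illustrative, not from the repository:

from llm_guard.input_scanners.anonymize_helpers.regex_patterns import get_regex_patterns

patterns = get_regex_patterns(
    [
        {
            "name": "EMPLOYEE_ID",  # hypothetical entity name
            "expressions": [r"EMP-\d{6}"],
            "examples": ["EMP-123456"],
            "context": ["employee"],
            "score": 0.9,
            "languages": ["en"],
        }
    ]
)
# Each result carries the normalized keys, e.g.:
# {"name": "EMPLOYEE_ID", "expressions": ["EMP-\\d{6}"], "context": ["employee"],
#  "score": 0.9, "languages": ["en"], "reuse": None}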