11"""
22Comprehensive text cleaning and preprocessing pipeline.
33"""
4+ import asyncio
45import logging
56import os
6- import threading
77from concurrent .futures import ThreadPoolExecutor
88from typing import List , Tuple , Optional
99
1212
1313logger = logging .getLogger (__name__ )
1414
15- # Thread-local storage for passing detected language to pipeline steps
16- # that need it (e.g. fuzzy date replacement). Each thread running
17- # _process_single sets its own value, so there is no cross-thread leaking.
18- _thread_ctx = threading .local ()
19-
20-
2115class TextCleaner :
2216
2317 def __init__ (self , cfg : Optional [TextCleanerConfig ] = None ):
@@ -84,16 +78,25 @@ def __init__(self, cfg: Optional[TextCleanerConfig] = None):
8478 ner_backend = self .cfg .ner_backend ,
8579 gliner_config = gliner_config ,
8680 torch_model_names = torch_model_names ,
81+ ner_batch_size = self .cfg .ner_batch_size ,
82+ ensemble_models = dict (self .cfg .ner_ensemble ) if self .cfg .ner_ensemble is not None else None ,
83+ ensemble_default_keys = tuple (self .cfg .ner_ensemble_default_keys ) if self .cfg .ner_ensemble_default_keys is not None else None ,
8784 )
8885 else :
8986 self .GeneralNER = None
9087
9188 self .batch_size = 8
9289 self ._pipeline = []
90+ self ._post_fuzzy_pipeline = []
9391 self ._init_pipeline ()
9492
9593 def _init_pipeline (self ):
96- """Build the pipeline steps list based on config."""
94+ """Build the pipeline steps lists based on config.
95+
96+ Steps that require language context (fuzzy date replacement) are split
97+ into pre- and post-fuzzy lists. _process_single calls them in order:
98+ self._pipeline → fuzzy (explicit, with ctx) → self._post_fuzzy_pipeline.
99+ """
97100 cfg = self .cfg
98101
99102 if cfg .check_fix_bad_unicode :
@@ -110,22 +113,25 @@ def _init_pipeline(self):
110113 self ._pipeline .append (self ._replace_emails )
111114 if cfg .check_replace_dates :
112115 self ._pipeline .append (self ._replace_dates )
113- if cfg . check_fuzzy_replace_dates :
114- self . _pipeline . append ( self . _fuzzy_replace_dates )
116+ # fuzzy_replace_dates runs here (between replace_dates and replace_years)
117+ # but is called explicitly in _process_single with language context.
115118 if cfg .check_replace_years :
116- self ._pipeline .append (self ._replace_years )
119+ self ._post_fuzzy_pipeline .append (self ._replace_years )
117120 if cfg .check_replace_phone_numbers :
118- self ._pipeline .append (self ._replace_phone_numbers )
121+ self ._post_fuzzy_pipeline .append (self ._replace_phone_numbers )
119122 if cfg .check_replace_numbers :
120- self ._pipeline .append (self ._replace_numbers )
123+ self ._post_fuzzy_pipeline .append (self ._replace_numbers )
121124 if cfg .check_replace_currency_symbols :
122- self ._pipeline .append (self ._replace_currency_symbols )
125+ self ._post_fuzzy_pipeline .append (self ._replace_currency_symbols )
123126 if cfg .check_remove_isolated_letters :
124- self ._pipeline .append (self ._remove_isolated_letters )
127+ self ._post_fuzzy_pipeline .append (self ._remove_isolated_letters )
125128 if cfg .check_remove_isolated_special_symbols :
126- self ._pipeline .append (self ._remove_isolated_special_symbols )
129+ self ._post_fuzzy_pipeline .append (self ._remove_isolated_special_symbols )
127130 if cfg .check_normalize_whitespace :
128- self ._pipeline .append (self ._normalize_whitespace )
131+ self ._post_fuzzy_pipeline .append (self ._normalize_whitespace )
132+ # User-provided custom steps — appended after all built-in steps
133+ for step_fn in cfg .custom_pipeline_steps :
134+ self ._post_fuzzy_pipeline .append (step_fn )
129135
130136 def _detect_language (self , text : str ) -> Optional [str ]:
131137 """Detect language as a pure function (no instance mutation)."""
@@ -151,15 +157,30 @@ def _process_single(self, text: str) -> Tuple[str, Optional[str], Optional[str]]
151157 # Detect language (pure function, thread-safe)
152158 language = self ._detect_language (text )
153159
154- current_text = text
160+ # Pass language explicitly through pipeline context dict
161+ ctx = {"language" : language }
155162
156- # Store language in thread-local so pipeline steps can access it
157- _thread_ctx .language = language
163+ current_text = text
158164
159- # Apply non-NER pipeline steps
165+ # Pre-fuzzy pipeline steps (unicode fix → html → urls → emails → dates)
160166 for step in self ._pipeline :
161167 current_text = step (current_text )
162168
169+ # Fuzzy date replacement — requires language context, called explicitly
170+ # to avoid thread-local; positioned between replace_dates and replace_years.
171+ if self .cfg .check_fuzzy_replace_dates :
172+ lang = ctx .get ("language" )
173+ current_text = self .ProcessDateTime .fuzzy_replace_dates (
174+ current_text ,
175+ replace_with = self .cfg .replace_with_dates ,
176+ score_cutoff = self .cfg .fuzzy_date_score_cutoff ,
177+ language = lang ,
178+ )
179+
180+ # Post-fuzzy pipeline steps (years → phones → numbers → symbols → whitespace)
181+ for step in self ._post_fuzzy_pipeline :
182+ current_text = step (current_text )
183+
163184 # NER processing
164185 if self .cfg .check_ner_process and self .GeneralNER is not None :
165186 current_text = self .GeneralNER .ner_process (
@@ -217,6 +238,30 @@ def process_batch(self, texts: List[str], batch_size: int = None) -> List[Tuple[
217238
218239 return results
219240
241+ async def aprocess_batch (self , texts : List [str ], batch_size : int = None ) -> List [Tuple [str , Optional [str ], Optional [str ]]]:
242+ """Async version of process_batch for use with asyncio-based frameworks (FastAPI, aiohttp).
243+
244+ Runs process_batch in a thread-pool executor so it doesn't block the event loop.
245+ """
246+ loop = asyncio .get_running_loop ()
247+ return await loop .run_in_executor (None , self .process_batch , texts , batch_size )
248+
249+ def warmup (self , languages : Optional [List [str ]] = None ) -> None :
250+ """Pre-load NER models to avoid first-request latency.
251+
252+ Args:
253+ languages: Language names to pre-load (e.g. ``['ENGLISH', 'DUTCH']``).
254+ If ``None``, pre-loads models for all supported languages.
255+ """
256+ if not self .cfg .check_ner_process or self .GeneralNER is None :
257+ return
258+ langs = languages or list (self .cfg .supported_languages )
259+ for lang in langs :
260+ try :
261+ self .GeneralNER .load_language (lang )
262+ except Exception as e :
263+ logger .warning ("warmup: skipping %s: %s" , lang , e )
264+
220265 def process (self , text : str ) -> Tuple [str , Optional [str ], Optional [str ]]:
221266 """Process a single text. Maintains backward compatibility.
222267
@@ -248,12 +293,12 @@ def _replace_emails(self, text):
248293 def _replace_dates (self , text ):
249294 return self .ProcessDateTime .replace_dates (text , replace_with = self .cfg .replace_with_dates )
250295
251- def _fuzzy_replace_dates (self , text ):
296+ def _fuzzy_replace_dates (self , text , language = None ):
252297 return self .ProcessDateTime .fuzzy_replace_dates (
253298 text ,
254299 replace_with = self .cfg .replace_with_dates ,
255300 score_cutoff = self .cfg .fuzzy_date_score_cutoff ,
256- language = getattr ( _thread_ctx , ' language' , 'ENGLISH' ) ,
301+ language = language ,
257302 )
258303
259304 def _replace_years (self , text ):
0 commit comments