From 9fcb7f145059c1ac7124308a3fc3f3fab3d3912a Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Fri, 15 Aug 2025 20:04:39 +0000 Subject: [PATCH 01/13] fix: support vector features in PyOD anomaly detection batching logic --- .../apache_beam/ml/anomaly/detectors/pyod_adapter.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 10bd25514761..ce9c1b4a84f6 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -77,7 +77,13 @@ def run_inference( ) -> Iterable[PredictionResult]: np_batch = [] for row in batch: - np_batch.append(np.fromiter(row, dtype=np.float64)) + features = [] + for value in row: + if isinstance(value, (list, tuple, np.ndarray)): + features.extend(value) + else: + features.append(value) + np_batch.append(np.array(features, dtype=np.float64)) # stack a batch of samples into a 2-D array for better performance vectorized_batch = np.stack(np_batch, axis=0) From e1e3364746c7d7e3f9b0794cf87b99ec107c0d2c Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Sat, 16 Aug 2025 18:23:19 +0000 Subject: [PATCH 02/13] refactor: use generator and np.fromiter for efficient batching in PyOD anomaly detection --- .../ml/anomaly/detectors/pyod_adapter.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index ce9c1b4a84f6..90eaccd1ea31 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -75,15 +75,14 @@ def run_inference( model: PyODBaseDetector, inference_args: Optional[dict[str, Any]] = None ) -> Iterable[PredictionResult]: - np_batch = [] - for row in batch: - features = [] - for value in row: + def _flatten_row(row_values): + for value in row_values: if isinstance(value, (list, tuple, np.ndarray)): - features.extend(value) + yield from value else: - features.append(value) - np_batch.append(np.array(features, dtype=np.float64)) + yield value + + np_batch = [np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch] # stack a batch of samples into a 2-D array for better performance vectorized_batch = np.stack(np_batch, axis=0) From 8742cd1ee5f7872d21e767316d1d23804683ab19 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Mon, 18 Aug 2025 18:35:00 +0000 Subject: [PATCH 03/13] Fixing Linting issues --- .../ml/anomaly/detectors/pyod_adapter.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 90eaccd1ea31..75a1fb76746f 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -70,10 +70,10 @@ def load_model(self) -> PyODBaseDetector: return pickle.load(file) def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, Any]] = None ) -> Iterable[PredictionResult]: def _flatten_row(row_values): for value in row_values: @@ -82,13 +82,20 @@ def _flatten_row(row_values): else: yield value - np_batch = [np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch] + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) + for row in batch + ] # stack a batch of samples into a 2-D array for better performance vectorized_batch = np.stack(np_batch, axis=0) predictions = model.decision_function(vectorized_batch) - return _convert_to_result(batch, predictions, model_id=self._model_uri) + return _convert_to_result( + batch, + predictions, + model_id=self._model_uri + ) class PyODFactory(): @@ -110,5 +117,8 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: assert (isinstance(m, PyODBaseDetector)) threshold = float(m.threshold_) detector = OfflineDetector( - model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs) # type: ignore[arg-type] + model_handler, + threshold_criterion=FixedThreshold(threshold), + **kwargs + ) # type: ignore[arg-type] return detector From fff2cef765c8b0c45565b2190e4c6a99a28a37a2 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Mon, 18 Aug 2025 20:28:55 +0000 Subject: [PATCH 04/13] style: fix indentation and line-length in pyod_adapter.py --- .../ml/anomaly/detectors/pyod_adapter.py | 153 ++++++++++-------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 75a1fb76746f..1a706867703e 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -39,86 +39,97 @@ KeyedModelHandler = specifiable( # type: ignore[misc] KeyedModelHandler, on_demand_init=False, - just_in_time_init=False) + just_in_time_init=False, +) + _PostProcessingModelHandler = specifiable( # type: ignore[misc] _PostProcessingModelHandler, on_demand_init=False, - just_in_time_init=False) + just_in_time_init=False, +) @specifiable -class PyODModelHandler(ModelHandler[beam.Row, - PredictionResult, - PyODBaseDetector]): - """Implementation of the ModelHandler interface for PyOD [#]_ Models. - - The ModelHandler processes input data as `beam.Row` objects. - - **NOTE:** This API and its implementation are currently under active - development and may not be backward compatible. - - Args: - model_uri: The URI specifying the location of the pickled PyOD model. - - .. [#] https://github.com/yzhao062/pyod - """ - def __init__(self, model_uri: str): - self._model_uri = model_uri - - def load_model(self) -> PyODBaseDetector: - file = FileSystems.open(self._model_uri, 'rb') - return pickle.load(file) - - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None - ) -> Iterable[PredictionResult]: - def _flatten_row(row_values): - for value in row_values: - if isinstance(value, (list, tuple, np.ndarray)): - yield from value - else: - yield value - - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) - for row in batch - ] - - # stack a batch of samples into a 2-D array for better performance - vectorized_batch = np.stack(np_batch, axis=0) - predictions = model.decision_function(vectorized_batch) - - return _convert_to_result( - batch, - predictions, - model_id=self._model_uri - ) - - -class PyODFactory(): - @staticmethod - def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """A utility function to create OfflineDetector for a PyOD model. +class PyODModelHandler( + ModelHandler[beam.Row, PredictionResult, PyODBaseDetector] +): + """ModelHandler implementation for PyOD models. + + The ModelHandler processes input data as `beam.Row` objects. **NOTE:** This API and its implementation are currently under active development and may not be backward compatible. Args: - model_uri: The URI specifying the location of the pickled PyOD model. - **kwargs: Additional keyword arguments. + model_uri: The URI specifying the location of the pickled PyOD model. + + .. [#] https://github.com/yzhao062/pyod """ - model_handler = KeyedModelHandler( - PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( - OfflineDetector.score_prediction_adapter) - m = model_handler.load_model() - assert (isinstance(m, PyODBaseDetector)) - threshold = float(m.threshold_) - detector = OfflineDetector( - model_handler, - threshold_criterion=FixedThreshold(threshold), - **kwargs - ) # type: ignore[arg-type] - return detector + + def __init__(self, model_uri: str): + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + file = FileSystems.open(self._model_uri, "rb") + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, Any]] = None, + ) -> Iterable[PredictionResult]: + def _flatten_row(row_values): + for value in row_values: + if isinstance(value, (list, tuple, np.ndarray)): + yield from value + else: + yield value + + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch + ] + + # stack a batch of samples into a 2-D array for better performance + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) + + return _convert_to_result( + batch, + predictions, + model_id=self._model_uri, + ) + + +class PyODFactory: + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """A utility function to create OfflineDetector for a PyOD model. + + **NOTE:** This API and its implementation are currently under active + development and may not be backward compatible. + + Args: + model_uri: The URI specifying the location of the pickled + PyOD model. + **kwargs: Additional keyword arguments. + """ + model_handler = ( + KeyedModelHandler( + PyODModelHandler(model_uri=model_uri) + ).with_postprocess_fn( + OfflineDetector.score_prediction_adapter + ) + ) + + m = model_handler.load_model() + assert isinstance(m, PyODBaseDetector) + threshold = float(m.threshold_) + + detector = OfflineDetector( + model_handler, + threshold_criterion=FixedThreshold(threshold), + **kwargs, + ) # type: ignore[arg-type] + + return detector From c33d1bba9e803ad6d9c0783ac95935b8b3468140 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Tue, 19 Aug 2025 07:16:28 +0000 Subject: [PATCH 05/13] chore: apply yapf formatting to pyod_adapter.py --- .../ml/anomaly/detectors/pyod_adapter.py | 116 +++++++++--------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 1a706867703e..c9193da385a3 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -50,10 +50,10 @@ @specifiable -class PyODModelHandler( - ModelHandler[beam.Row, PredictionResult, PyODBaseDetector] -): - """ModelHandler implementation for PyOD models. +class PyODModelHandler(ModelHandler[beam.Row, + PredictionResult, + PyODBaseDetector]): + """ModelHandler implementation for PyOD models. The ModelHandler processes input data as `beam.Row` objects. @@ -65,46 +65,45 @@ class PyODModelHandler( .. [#] https://github.com/yzhao062/pyod """ - - def __init__(self, model_uri: str): - self._model_uri = model_uri - - def load_model(self) -> PyODBaseDetector: - file = FileSystems.open(self._model_uri, "rb") - return pickle.load(file) - - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None, - ) -> Iterable[PredictionResult]: - def _flatten_row(row_values): - for value in row_values: - if isinstance(value, (list, tuple, np.ndarray)): - yield from value - else: - yield value - - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch - ] - - # stack a batch of samples into a 2-D array for better performance - vectorized_batch = np.stack(np_batch, axis=0) - predictions = model.decision_function(vectorized_batch) - - return _convert_to_result( - batch, - predictions, - model_id=self._model_uri, - ) + def __init__(self, model_uri: str): + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + file = FileSystems.open(self._model_uri, "rb") + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, Any]] = None, + ) -> Iterable[PredictionResult]: + def _flatten_row(row_values): + for value in row_values: + if isinstance(value, (list, tuple, np.ndarray)): + yield from value + else: + yield value + + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch + ] + + # stack a batch of samples into a 2-D array for better performance + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) + + return _convert_to_result( + batch, + predictions, + model_id=self._model_uri, + ) class PyODFactory: - @staticmethod - def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """A utility function to create OfflineDetector for a PyOD model. + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """A utility function to create OfflineDetector for a PyOD model. **NOTE:** This API and its implementation are currently under active development and may not be backward compatible. @@ -114,22 +113,19 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: PyOD model. **kwargs: Additional keyword arguments. """ - model_handler = ( - KeyedModelHandler( - PyODModelHandler(model_uri=model_uri) - ).with_postprocess_fn( - OfflineDetector.score_prediction_adapter - ) - ) - - m = model_handler.load_model() - assert isinstance(m, PyODBaseDetector) - threshold = float(m.threshold_) - - detector = OfflineDetector( - model_handler, - threshold_criterion=FixedThreshold(threshold), - **kwargs, - ) # type: ignore[arg-type] - - return detector + model_handler = ( + KeyedModelHandler( + PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( + OfflineDetector.score_prediction_adapter)) + + m = model_handler.load_model() + assert isinstance(m, PyODBaseDetector) + threshold = float(m.threshold_) + + detector = OfflineDetector( + model_handler, + threshold_criterion=FixedThreshold(threshold), + **kwargs, + ) # type: ignore[arg-type] + + return detector From e204f024fdb17236fcb327c6c35eeb69e44dfa78 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Tue, 19 Aug 2025 11:57:07 +0000 Subject: [PATCH 06/13] style: format pyod_adapter to match repo yapf rules --- .../ml/anomaly/detectors/pyod_adapter.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index c9193da385a3..50f0ec298813 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -15,6 +15,12 @@ # limitations under the License. # +"""Utilities to adapt PyOD models for Beam's anomaly detection APIs. + +This module provides a ModelHandler implementation for PyOD detectors and a +factory for creating OfflineDetector wrappers around pickled PyOD models. +""" + import pickle from collections.abc import Iterable from collections.abc import Sequence @@ -66,6 +72,7 @@ class PyODModelHandler(ModelHandler[beam.Row, .. [#] https://github.com/yzhao062/pyod """ def __init__(self, model_uri: str): + super().__init__() self._model_uri = model_uri def load_model(self) -> PyODBaseDetector: @@ -78,6 +85,13 @@ def run_inference( model: PyODBaseDetector, inference_args: Optional[dict[str, Any]] = None, ) -> Iterable[PredictionResult]: + """Run inference on a batch of `beam.Row` examples. + + The handler supports vector features. Each input `beam.Row` is flattened + into a 1-D float array (expanding list/tuple/ndarray fields) and the + batch is stacked into a 2-D numpy array which is passed to PyOD's + `decision_function`. + """ def _flatten_row(row_values): for value in row_values: if isinstance(value, (list, tuple, np.ndarray)): @@ -93,14 +107,16 @@ def _flatten_row(row_values): vectorized_batch = np.stack(np_batch, axis=0) predictions = model.decision_function(vectorized_batch) - return _convert_to_result( - batch, - predictions, - model_id=self._model_uri, - ) + return _convert_to_result(batch, predictions, model_id=self._model_uri) class PyODFactory: + """Factory helpers for creating OfflineDetector instances from PyOD models. + + The factory currently only exposes a convenience method to wrap a pickled + PyOD model into an OfflineDetector with a fixed threshold taken from the + trained model. + """ @staticmethod def create_detector(model_uri: str, **kwargs) -> OfflineDetector: """A utility function to create OfflineDetector for a PyOD model. @@ -123,9 +139,7 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: threshold = float(m.threshold_) detector = OfflineDetector( - model_handler, - threshold_criterion=FixedThreshold(threshold), - **kwargs, + model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs ) # type: ignore[arg-type] return detector From 03c537700fd0d492d7e47f27faf4e345b587904f Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Tue, 19 Aug 2025 18:23:12 +0000 Subject: [PATCH 07/13] pyod: support vector features and fix lint/format issues in pyod_adapter.py --- .../ml/anomaly/detectors/pyod_adapter.py | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 50f0ec298813..c7b2533bc719 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -41,6 +41,7 @@ from apache_beam.ml.inference.utils import _convert_to_result from pyod.models.base import BaseDetector as PyODBaseDetector + # Turn the used ModelHandler into specifiable, but without lazy init. KeyedModelHandler = specifiable( # type: ignore[misc] KeyedModelHandler, @@ -56,10 +57,8 @@ @specifiable -class PyODModelHandler(ModelHandler[beam.Row, - PredictionResult, - PyODBaseDetector]): - """ModelHandler implementation for PyOD models. +class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, PyODBaseDetector]): + """ModelHandler implementation for PyOD models. The ModelHandler processes input data as `beam.Row` objects. @@ -71,55 +70,56 @@ class PyODModelHandler(ModelHandler[beam.Row, .. [#] https://github.com/yzhao062/pyod """ - def __init__(self, model_uri: str): - super().__init__() - self._model_uri = model_uri - - def load_model(self) -> PyODBaseDetector: - file = FileSystems.open(self._model_uri, "rb") - return pickle.load(file) - - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None, - ) -> Iterable[PredictionResult]: - """Run inference on a batch of `beam.Row` examples. + + def __init__(self, model_uri: str): + super().__init__() + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + file = FileSystems.open(self._model_uri, "rb") + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, Any]] = None, + ) -> Iterable[PredictionResult]: + """Run inference on a batch of `beam.Row` examples. The handler supports vector features. Each input `beam.Row` is flattened into a 1-D float array (expanding list/tuple/ndarray fields) and the batch is stacked into a 2-D numpy array which is passed to PyOD's `decision_function`. """ - def _flatten_row(row_values): - for value in row_values: - if isinstance(value, (list, tuple, np.ndarray)): - yield from value - else: - yield value - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch - ] + def _flatten_row(row_values): + for value in row_values: + if isinstance(value, (list, tuple, np.ndarray)): + yield from value + else: + yield value + + np_batch = [np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch] - # stack a batch of samples into a 2-D array for better performance - vectorized_batch = np.stack(np_batch, axis=0) - predictions = model.decision_function(vectorized_batch) + # stack a batch of samples into a 2-D array for better performance + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) - return _convert_to_result(batch, predictions, model_id=self._model_uri) + return _convert_to_result(batch, predictions, model_id=self._model_uri) class PyODFactory: - """Factory helpers for creating OfflineDetector instances from PyOD models. + """Factory helpers for creating OfflineDetector instances from PyOD models. The factory currently only exposes a convenience method to wrap a pickled PyOD model into an OfflineDetector with a fixed threshold taken from the trained model. """ - @staticmethod - def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """A utility function to create OfflineDetector for a PyOD model. + + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """A utility function to create OfflineDetector for a PyOD model. **NOTE:** This API and its implementation are currently under active development and may not be backward compatible. @@ -129,17 +129,17 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: PyOD model. **kwargs: Additional keyword arguments. """ - model_handler = ( - KeyedModelHandler( - PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( - OfflineDetector.score_prediction_adapter)) + model_handler = ( + KeyedModelHandler(PyODModelHandler(model_uri=model_uri)) + .with_postprocess_fn(OfflineDetector.score_prediction_adapter) + ) - m = model_handler.load_model() - assert isinstance(m, PyODBaseDetector) - threshold = float(m.threshold_) + m = model_handler.load_model() + assert isinstance(m, PyODBaseDetector) + threshold = float(m.threshold_) - detector = OfflineDetector( - model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs - ) # type: ignore[arg-type] + detector = OfflineDetector( + model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs + ) # type: ignore[arg-type] - return detector + return detector From f4189e7d141a7edfb8dd582e6ec8d570ff1158f1 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Wed, 20 Aug 2025 17:01:23 +0000 Subject: [PATCH 08/13] chore: format pyod_adapter with yapf (vector feature flattening only) --- .../ml/anomaly/detectors/pyod_adapter.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index c7b2533bc719..5d37aa93a492 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - """Utilities to adapt PyOD models for Beam's anomaly detection APIs. This module provides a ModelHandler implementation for PyOD detectors and a @@ -41,7 +40,6 @@ from apache_beam.ml.inference.utils import _convert_to_result from pyod.models.base import BaseDetector as PyODBaseDetector - # Turn the used ModelHandler into specifiable, but without lazy init. KeyedModelHandler = specifiable( # type: ignore[misc] KeyedModelHandler, @@ -57,7 +55,8 @@ @specifiable -class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, PyODBaseDetector]): +class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, + PyODBaseDetector]): """ModelHandler implementation for PyOD models. The ModelHandler processes input data as `beam.Row` objects. @@ -76,8 +75,8 @@ def __init__(self, model_uri: str): self._model_uri = model_uri def load_model(self) -> PyODBaseDetector: - file = FileSystems.open(self._model_uri, "rb") - return pickle.load(file) + with FileSystems.open(self._model_uri, "rb") as file: + return pickle.load(file) def run_inference( self, @@ -100,7 +99,9 @@ def _flatten_row(row_values): else: yield value - np_batch = [np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch] + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch + ] # stack a batch of samples into a 2-D array for better performance vectorized_batch = np.stack(np_batch, axis=0) @@ -129,17 +130,18 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: PyOD model. **kwargs: Additional keyword arguments. """ - model_handler = ( - KeyedModelHandler(PyODModelHandler(model_uri=model_uri)) - .with_postprocess_fn(OfflineDetector.score_prediction_adapter) - ) + model_handler = (KeyedModelHandler( + PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( + OfflineDetector.score_prediction_adapter)) m = model_handler.load_model() assert isinstance(m, PyODBaseDetector) threshold = float(m.threshold_) detector = OfflineDetector( - model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs + model_handler, + threshold_criterion=FixedThreshold(threshold), + **kwargs, ) # type: ignore[arg-type] return detector From d685cb4dadd0a43a708067791a84d7c39c9decf3 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Thu, 21 Aug 2025 14:34:33 +0000 Subject: [PATCH 09/13] Fixing linter and formate tests --- .../ml/anomaly/detectors/pyod_adapter.py | 94 +++++++++---------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 5d37aa93a492..c2e1861dbc95 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # + """Utilities to adapt PyOD models for Beam's anomaly detection APIs. This module provides a ModelHandler implementation for PyOD detectors and a @@ -55,9 +56,10 @@ @specifiable -class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, +class PyODModelHandler(ModelHandler[beam.Row, + PredictionResult, PyODBaseDetector]): - """ModelHandler implementation for PyOD models. + """ModelHandler implementation for PyOD models. The ModelHandler processes input data as `beam.Row` objects. @@ -69,58 +71,55 @@ class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, .. [#] https://github.com/yzhao062/pyod """ - - def __init__(self, model_uri: str): - super().__init__() - self._model_uri = model_uri - - def load_model(self) -> PyODBaseDetector: - with FileSystems.open(self._model_uri, "rb") as file: - return pickle.load(file) - - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None, - ) -> Iterable[PredictionResult]: - """Run inference on a batch of `beam.Row` examples. + def __init__(self, model_uri: str): + super().__init__() + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + with FileSystems.open(self._model_uri, "rb") as file: + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, Any]] = None, + ) -> Iterable[PredictionResult]: + """Run inference on a batch of `beam.Row` examples. The handler supports vector features. Each input `beam.Row` is flattened into a 1-D float array (expanding list/tuple/ndarray fields) and the batch is stacked into a 2-D numpy array which is passed to PyOD's `decision_function`. """ + def _flatten_row(row_values): + for value in row_values: + if isinstance(value, (list, tuple, np.ndarray)): + yield from value + else: + yield value - def _flatten_row(row_values): - for value in row_values: - if isinstance(value, (list, tuple, np.ndarray)): - yield from value - else: - yield value - - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch - ] + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch + ] - # stack a batch of samples into a 2-D array for better performance - vectorized_batch = np.stack(np_batch, axis=0) - predictions = model.decision_function(vectorized_batch) + # stack a batch of samples into a 2-D array for better performance + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) - return _convert_to_result(batch, predictions, model_id=self._model_uri) + return _convert_to_result(batch, predictions, model_id=self._model_uri) class PyODFactory: - """Factory helpers for creating OfflineDetector instances from PyOD models. + """Factory helpers for creating OfflineDetector instances from PyOD models. The factory currently only exposes a convenience method to wrap a pickled PyOD model into an OfflineDetector with a fixed threshold taken from the trained model. """ - - @staticmethod - def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """A utility function to create OfflineDetector for a PyOD model. + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """A utility function to create OfflineDetector for a PyOD model. **NOTE:** This API and its implementation are currently under active development and may not be backward compatible. @@ -130,18 +129,19 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: PyOD model. **kwargs: Additional keyword arguments. """ - model_handler = (KeyedModelHandler( + model_handler = ( + KeyedModelHandler( PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( OfflineDetector.score_prediction_adapter)) - m = model_handler.load_model() - assert isinstance(m, PyODBaseDetector) - threshold = float(m.threshold_) + m = model_handler.load_model() + assert isinstance(m, PyODBaseDetector) + threshold = float(m.threshold_) - detector = OfflineDetector( - model_handler, - threshold_criterion=FixedThreshold(threshold), - **kwargs, - ) # type: ignore[arg-type] + detector = OfflineDetector( + model_handler, + threshold_criterion=FixedThreshold(threshold), + **kwargs, + ) # type: ignore[arg-type] - return detector + return detector From 25d26198f613edf446a4eafa61243232b1a2ff78 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Thu, 21 Aug 2025 18:03:59 +0000 Subject: [PATCH 10/13] fix(pyod): support vector features; clean mypy unused ignores --- .../ml/anomaly/detectors/pyod_adapter.py | 39 ++++++++----------- .../ml/inference/xgboost_inference_test.py | 2 +- .../apache_beam/ml/transforms/handlers.py | 5 ++- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index c2e1861dbc95..28bfa87b7086 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -35,6 +35,7 @@ from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.thresholds import FixedThreshold from apache_beam.ml.inference.base import KeyedModelHandler +from typing import cast, Any from apache_beam.ml.inference.base import ModelHandler from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.base import _PostProcessingModelHandler @@ -113,35 +114,27 @@ def _flatten_row(row_values): class PyODFactory: """Factory helpers for creating OfflineDetector instances from PyOD models. - The factory currently only exposes a convenience method to wrap a pickled - PyOD model into an OfflineDetector with a fixed threshold taken from the - trained model. - """ + The factory currently only exposes a convenience method to wrap a pickled + PyOD model into an OfflineDetector with a fixed threshold taken from the + trained model. + """ @staticmethod def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """A utility function to create OfflineDetector for a PyOD model. - - **NOTE:** This API and its implementation are currently under active - development and may not be backward compatible. - - Args: - model_uri: The URI specifying the location of the pickled - PyOD model. - **kwargs: Additional keyword arguments. - """ - model_handler = ( - KeyedModelHandler( - PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( - OfflineDetector.score_prediction_adapter)) + """Create an OfflineDetector for a pickled PyOD model located at model_uri. - m = model_handler.load_model() + Args: + model_uri: URI to pickled PyOD model. + **kwargs: Extra keyword args forwarded to OfflineDetector. + """ + handler = KeyedModelHandler( + PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( + OfflineDetector.score_prediction_adapter) + m = handler.load_model() assert isinstance(m, PyODBaseDetector) threshold = float(m.threshold_) - detector = OfflineDetector( - model_handler, + cast(KeyedModelHandler[Any, beam.Row, Any, Any], handler), threshold_criterion=FixedThreshold(threshold), **kwargs, - ) # type: ignore[arg-type] - + ) return detector diff --git a/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py b/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py index 8e3fb77615cc..8050c4268a80 100644 --- a/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py @@ -52,7 +52,7 @@ def _compare_prediction_result(a: PredictionResult, b: PredictionResult): example_equal = numpy.array_equal(a.example.todense(), b.example.todense()) else: - example_equal = numpy.array_equal(a.example, b.example) # type: ignore[arg-type] + example_equal = numpy.array_equal(a.example, b.example) if isinstance(a.inference, dict): return all( x == y for x, y in zip(a.inference.values(), diff --git a/sdks/python/apache_beam/ml/transforms/handlers.py b/sdks/python/apache_beam/ml/transforms/handlers.py index 1e752049f6e5..97bdb4e083b4 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers.py +++ b/sdks/python/apache_beam/ml/transforms/handlers.py @@ -336,9 +336,10 @@ def _get_transformed_data_schema( if feature_type == schema_pb2.FeatureType.FLOAT: transformed_types[name] = Sequence[np.float32] elif feature_type == schema_pb2.FeatureType.INT: - transformed_types[name] = Sequence[np.int64] # type: ignore[assignment] + # Use built-in int for portability across numpy versions. + transformed_types[name] = Sequence[int] else: - transformed_types[name] = Sequence[bytes] # type: ignore[assignment] + transformed_types[name] = Sequence[bytes] return transformed_types def expand( From 79ffcd4c108ac7029da8124092794290d21b57f3 Mon Sep 17 00:00:00 2001 From: Vijay Raj Date: Thu, 21 Aug 2025 18:32:10 +0000 Subject: [PATCH 11/13] revert: restore handlers and xgboost test to master state (only keep pyod adapter change) --- .../apache_beam/ml/inference/xgboost_inference_test.py | 2 +- sdks/python/apache_beam/ml/transforms/handlers.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py b/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py index 8050c4268a80..8e3fb77615cc 100644 --- a/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/xgboost_inference_test.py @@ -52,7 +52,7 @@ def _compare_prediction_result(a: PredictionResult, b: PredictionResult): example_equal = numpy.array_equal(a.example.todense(), b.example.todense()) else: - example_equal = numpy.array_equal(a.example, b.example) + example_equal = numpy.array_equal(a.example, b.example) # type: ignore[arg-type] if isinstance(a.inference, dict): return all( x == y for x, y in zip(a.inference.values(), diff --git a/sdks/python/apache_beam/ml/transforms/handlers.py b/sdks/python/apache_beam/ml/transforms/handlers.py index 97bdb4e083b4..1e752049f6e5 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers.py +++ b/sdks/python/apache_beam/ml/transforms/handlers.py @@ -336,10 +336,9 @@ def _get_transformed_data_schema( if feature_type == schema_pb2.FeatureType.FLOAT: transformed_types[name] = Sequence[np.float32] elif feature_type == schema_pb2.FeatureType.INT: - # Use built-in int for portability across numpy versions. - transformed_types[name] = Sequence[int] + transformed_types[name] = Sequence[np.int64] # type: ignore[assignment] else: - transformed_types[name] = Sequence[bytes] + transformed_types[name] = Sequence[bytes] # type: ignore[assignment] return transformed_types def expand( From 33c3295f469c9a0c019a0e82e1d7b1c2e5d65f9d Mon Sep 17 00:00:00 2001 From: devnull Date: Sat, 23 Aug 2025 12:19:42 +0000 Subject: [PATCH 12/13] Fix(pyod): reformat and lint pyod_adapter.py (imports, typing, formatting) --- .../ml/anomaly/detectors/pyod_adapter.py | 146 ++++++++++-------- 1 file changed, 80 insertions(+), 66 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 28bfa87b7086..7a07faf8de58 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -15,32 +15,35 @@ # limitations under the License. # + """Utilities to adapt PyOD models for Beam's anomaly detection APIs. This module provides a ModelHandler implementation for PyOD detectors and a factory for creating OfflineDetector wrappers around pickled PyOD models. """ +# pylint: disable=bad-indentation,line-too-long + import pickle -from collections.abc import Iterable -from collections.abc import Sequence -from typing import Any -from typing import Optional +from collections.abc import Iterable, Sequence +from typing import Optional, cast import numpy as np +from pyod.models.base import BaseDetector as PyODBaseDetector import apache_beam as beam from apache_beam.io.filesystems import FileSystems from apache_beam.ml.anomaly.detectors.offline import OfflineDetector from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.thresholds import FixedThreshold -from apache_beam.ml.inference.base import KeyedModelHandler -from typing import cast, Any -from apache_beam.ml.inference.base import ModelHandler -from apache_beam.ml.inference.base import PredictionResult -from apache_beam.ml.inference.base import _PostProcessingModelHandler +from apache_beam.ml.inference.base import ( + KeyedModelHandler, + ModelHandler, + PredictionResult, + _PostProcessingModelHandler, +) from apache_beam.ml.inference.utils import _convert_to_result -from pyod.models.base import BaseDetector as PyODBaseDetector + # Turn the used ModelHandler into specifiable, but without lazy init. KeyedModelHandler = specifiable( # type: ignore[misc] @@ -57,10 +60,10 @@ @specifiable -class PyODModelHandler(ModelHandler[beam.Row, - PredictionResult, - PyODBaseDetector]): - """ModelHandler implementation for PyOD models. +class PyODModelHandler( + ModelHandler[beam.Row, PredictionResult, PyODBaseDetector] +): + """ModelHandler implementation for PyOD models. The ModelHandler processes input data as `beam.Row` objects. @@ -72,69 +75,80 @@ class PyODModelHandler(ModelHandler[beam.Row, .. [#] https://github.com/yzhao062/pyod """ - def __init__(self, model_uri: str): - super().__init__() - self._model_uri = model_uri - - def load_model(self) -> PyODBaseDetector: - with FileSystems.open(self._model_uri, "rb") as file: - return pickle.load(file) - - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, Any]] = None, - ) -> Iterable[PredictionResult]: - """Run inference on a batch of `beam.Row` examples. + + def __init__(self, model_uri: str): + super().__init__() + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + with FileSystems.open(self._model_uri, "rb") as file: + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, object]] = None, + ) -> Iterable[PredictionResult]: + """Run inference on a batch of `beam.Row` examples. The handler supports vector features. Each input `beam.Row` is flattened into a 1-D float array (expanding list/tuple/ndarray fields) and the batch is stacked into a 2-D numpy array which is passed to PyOD's `decision_function`. """ - def _flatten_row(row_values): - for value in row_values: - if isinstance(value, (list, tuple, np.ndarray)): - yield from value - else: - yield value - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch - ] + def _flatten_row(row_values): + for value in row_values: + if isinstance(value, (list, tuple, np.ndarray)): + yield from value + else: + yield value - # stack a batch of samples into a 2-D array for better performance - vectorized_batch = np.stack(np_batch, axis=0) - predictions = model.decision_function(vectorized_batch) + np_batch = [ + np.fromiter(_flatten_row(row), dtype=np.float64) + for row in batch + ] - return _convert_to_result(batch, predictions, model_id=self._model_uri) + # Stack a batch of samples into a 2-D array for performance. + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) + return _convert_to_result( + batch, + predictions, + model_id=self._model_uri, + ) -class PyODFactory: - """Factory helpers for creating OfflineDetector instances from PyOD models. - The factory currently only exposes a convenience method to wrap a pickled - PyOD model into an OfflineDetector with a fixed threshold taken from the - trained model. - """ - @staticmethod - def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """Create an OfflineDetector for a pickled PyOD model located at model_uri. +class PyODFactory: + """Factory helpers to create OfflineDetector instances from PyOD models. - Args: - model_uri: URI to pickled PyOD model. - **kwargs: Extra keyword args forwarded to OfflineDetector. + The factory exposes a convenience method to wrap a pickled PyOD model into + an OfflineDetector with a fixed threshold taken from the trained model. """ - handler = KeyedModelHandler( - PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( - OfflineDetector.score_prediction_adapter) - m = handler.load_model() - assert isinstance(m, PyODBaseDetector) - threshold = float(m.threshold_) - detector = OfflineDetector( - cast(KeyedModelHandler[Any, beam.Row, Any, Any], handler), - threshold_criterion=FixedThreshold(threshold), - **kwargs, - ) - return detector + + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """Create an OfflineDetector for a pickled PyOD model. + + Args: + model_uri: URI to pickled PyOD model. + **kwargs: Extra keyword args forwarded to OfflineDetector. + """ + handler = KeyedModelHandler( + PyODModelHandler(model_uri=model_uri) + ).with_postprocess_fn( + OfflineDetector.score_prediction_adapter + ) + + m = handler.load_model() + assert isinstance(m, PyODBaseDetector) + threshold = float(m.threshold_) + + detector = OfflineDetector( + cast(object, handler), + threshold_criterion=FixedThreshold(threshold), + **kwargs, + ) + return detector From 12fe54c87d980bca13221cd6378b45cfaf241f3f Mon Sep 17 00:00:00 2001 From: devnull Date: Wed, 27 Aug 2025 10:32:08 +0000 Subject: [PATCH 13/13] chore(ml-anomaly): refine PyOD adapter style, remove pylint disables, compact inference logic --- .../ml/anomaly/detectors/pyod_adapter.py | 86 ++++++------------- 1 file changed, 26 insertions(+), 60 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 7a07faf8de58..73dad118e17a 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -18,12 +18,10 @@ """Utilities to adapt PyOD models for Beam's anomaly detection APIs. -This module provides a ModelHandler implementation for PyOD detectors and a -factory for creating OfflineDetector wrappers around pickled PyOD models. +Provides a ModelHandler implementation for PyOD detectors and a factory for +creating OfflineDetector wrappers around pickled PyOD models. """ -# pylint: disable=bad-indentation,line-too-long - import pickle from collections.abc import Iterable, Sequence from typing import Optional, cast @@ -60,20 +58,16 @@ @specifiable -class PyODModelHandler( - ModelHandler[beam.Row, PredictionResult, PyODBaseDetector] -): +class PyODModelHandler(ModelHandler[beam.Row, PredictionResult, PyODBaseDetector]): """ModelHandler implementation for PyOD models. - The ModelHandler processes input data as `beam.Row` objects. - - **NOTE:** This API and its implementation are currently under active - development and may not be backward compatible. + Processes `beam.Row` inputs, flattening vector-like fields (list/tuple/ndarray) + into a single numeric feature vector per row before invoking the PyOD + model's ``decision_function``. + NOTE: Experimental; interface may change. Args: - model_uri: The URI specifying the location of the pickled PyOD model. - - .. [#] https://github.com/yzhao062/pyod + model_uri: Location of the pickled PyOD model. """ def __init__(self, model_uri: str): @@ -81,21 +75,19 @@ def __init__(self, model_uri: str): self._model_uri = model_uri def load_model(self) -> PyODBaseDetector: - with FileSystems.open(self._model_uri, "rb") as file: + with FileSystems.open(self._model_uri, 'rb') as file: return pickle.load(file) - def run_inference( - self, - batch: Sequence[beam.Row], - model: PyODBaseDetector, - inference_args: Optional[dict[str, object]] = None, + def run_inference( # type: ignore[override] + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[dict[str, object]] = None, ) -> Iterable[PredictionResult]: - """Run inference on a batch of `beam.Row` examples. + """Run inference on a batch of rows. - The handler supports vector features. Each input `beam.Row` is flattened - into a 1-D float array (expanding list/tuple/ndarray fields) and the - batch is stacked into a 2-D numpy array which is passed to PyOD's - `decision_function`. + Flattens vector-like fields, stacks the batch into a 2-D array and + returns PredictionResult objects. """ def _flatten_row(row_values): @@ -105,50 +97,24 @@ def _flatten_row(row_values): else: yield value - np_batch = [ - np.fromiter(_flatten_row(row), dtype=np.float64) - for row in batch - ] - - # Stack a batch of samples into a 2-D array for performance. + np_batch = [np.fromiter(_flatten_row(row), dtype=np.float64) for row in batch] vectorized_batch = np.stack(np_batch, axis=0) predictions = model.decision_function(vectorized_batch) - - return _convert_to_result( - batch, - predictions, - model_id=self._model_uri, - ) + return _convert_to_result(batch, predictions, model_id=self._model_uri) class PyODFactory: - """Factory helpers to create OfflineDetector instances from PyOD models. - - The factory exposes a convenience method to wrap a pickled PyOD model into - an OfflineDetector with a fixed threshold taken from the trained model. - """ + """Factory helpers to create OfflineDetector instances from PyOD models.""" @staticmethod def create_detector(model_uri: str, **kwargs) -> OfflineDetector: - """Create an OfflineDetector for a pickled PyOD model. - - Args: - model_uri: URI to pickled PyOD model. - **kwargs: Extra keyword args forwarded to OfflineDetector. - """ - handler = KeyedModelHandler( - PyODModelHandler(model_uri=model_uri) - ).with_postprocess_fn( - OfflineDetector.score_prediction_adapter - ) - - m = handler.load_model() - assert isinstance(m, PyODBaseDetector) - threshold = float(m.threshold_) - - detector = OfflineDetector( + handler = KeyedModelHandler(PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( + OfflineDetector.score_prediction_adapter) + model = handler.load_model() + assert isinstance(model, PyODBaseDetector) + threshold = float(model.threshold_) + return OfflineDetector( cast(object, handler), threshold_criterion=FixedThreshold(threshold), **kwargs, ) - return detector