diff --git a/alibi_detect/od/_lof.py b/alibi_detect/od/_lof.py new file mode 100644 index 000000000..0196dae40 --- /dev/null +++ b/alibi_detect/od/_lof.py @@ -0,0 +1,217 @@ +from typing import Callable, Union, Optional, Dict, Any, List, Tuple +from typing import TYPE_CHECKING +from typing_extensions import Literal + +import numpy as np + +from alibi_detect.base import outlier_prediction_dict +from alibi_detect.exceptions import _catch_error as catch_error +from alibi_detect.od.base import TransformProtocol, TransformProtocolType +from alibi_detect.base import BaseDetector, FitMixin, ThresholdMixin +from alibi_detect.od.pytorch import LOFTorch, Ensembler +from alibi_detect.od.base import get_aggregator, get_normalizer, NormalizerLiterals, AggregatorLiterals +from alibi_detect.utils.frameworks import BackendValidator +from alibi_detect.version import __version__ + + +if TYPE_CHECKING: + import torch + + +backends = { + 'pytorch': (LOFTorch, Ensembler) +} + + +class LOF(BaseDetector, FitMixin, ThresholdMixin): + def __init__( + self, + k: Union[int, np.ndarray, List[int], Tuple[int]], + kernel: Optional[Callable] = None, + normalizer: Optional[Union[TransformProtocolType, NormalizerLiterals]] = 'PValNormalizer', + aggregator: Union[TransformProtocol, AggregatorLiterals] = 'AverageAggregator', + backend: Literal['pytorch'] = 'pytorch', + device: Optional[Union[Literal['cuda', 'gpu', 'cpu'], 'torch.device']] = None, + ) -> None: + """ + Local Outlier Factor (LOF) outlier detector. + + The LOF detector is a non-parametric method for outlier detection. It computes the local density + deviation of a given data point with respect to its neighbors. It considers as outliers the + samples that have a substantially lower density than their neighbors. + + The detector can be initialized with `k` a single value or an array of values. If `k` is a single value then + the score method uses the distance/kernel similarity to the k-th nearest neighbor. If `k` is an array of + values then the score method uses the distance/kernel similarity to each of the specified `k` neighbors. + In the latter case, an `aggregator` must be specified to aggregate the scores. + + Note that, in the multiple k case, a normalizer can be provided. If a normalizer is passed then it is fit in + the `infer_threshold` method and so this method must be called before the `predict` method. If this is not + done an exception is raised. If `k` is a single value then the predict method can be called without first + calling `infer_threshold` but only scores will be returned and not outlier predictions. + + Parameters + ---------- + k + Number of nearest neighbors to compute distance to. `k` can be a single value or + an array of integers. If an array is passed, an aggregator is required to aggregate + the scores. If `k` is a single value we compute the local outlier factor for that `k`. + Otherwise if `k` is a list then we compute and aggregate the local outlier factor for each + value in `k`. + kernel + Kernel function to use for outlier detection. If ``None``, `torch.cdist` is used. + Otherwise if a kernel is specified then instead of using `torch.cdist` the kernel + defines the k nearest neighbor distance. + normalizer + Normalizer to use for outlier detection. If ``None``, no normalization is applied. + For a list of available normalizers, see :mod:`alibi_detect.od.pytorch.ensemble`. + aggregator + Aggregator to use for outlier detection. Can be set to ``None`` if `k` is a single + value. For a list of available aggregators, see :mod:`alibi_detect.od.pytorch.ensemble`. + backend + Backend used for outlier detection. Defaults to ``'pytorch'``. Options are ``'pytorch'``. + device + Device type used. The default tries to use the GPU and falls back on CPU if needed. + Can be specified by passing either ``'cuda'``, ``'gpu'``, ``'cpu'`` or an instance of + ``torch.device``. + + Raises + ------ + ValueError + If `k` is an array and `aggregator` is None. + NotImplementedError + If choice of `backend` is not implemented. + """ + super().__init__() + + backend_str: str = backend.lower() + BackendValidator( + backend_options={'pytorch': ['pytorch']}, + construct_name=self.__class__.__name__ + ).verify_backend(backend_str) + + backend_cls, ensembler_cls = backends[backend] + ensembler = None + + if aggregator is None and isinstance(k, (list, np.ndarray, tuple)): + raise ValueError('If `k` is a `np.ndarray`, `list` or `tuple`, ' + 'the `aggregator` argument cannot be ``None``.') + + if isinstance(k, (list, np.ndarray, tuple)): + ensembler = ensembler_cls( + normalizer=get_normalizer(normalizer), + aggregator=get_aggregator(aggregator) + ) + + self.backend = backend_cls(k, kernel=kernel, ensembler=ensembler, device=device) + + # set metadata + self.meta['detector_type'] = 'outlier' + self.meta['data_type'] = 'numeric' + self.meta['online'] = False + + def fit(self, x_ref: np.ndarray) -> None: + """Fit the detector on reference data. + + Parameters + ---------- + x_ref + Reference data used to fit the detector. + """ + self.backend.fit(self.backend._to_tensor(x_ref)) + + @catch_error('NotFittedError') + @catch_error('ThresholdNotInferredError') + def score(self, x: np.ndarray) -> np.ndarray: + """Score `x` instances using the detector. + + Computes the local outlier factor for each point in `x`. This is the density of each point `x` + relative to those of its neighbors in `x_ref`. If `k` is an array of values then the score for + each `k` is aggregated using the ensembler. + + Parameters + ---------- + x + Data to score. The shape of `x` should be `(n_instances, n_features)`. + + Returns + ------- + Outlier scores. The shape of the scores is `(n_instances,)`. The higher the score, the more anomalous the \ + instance. + + Raises + ------ + NotFittedError + If called before detector has been fit. + ThresholdNotInferredError + If k is a list and a threshold was not inferred. + """ + score = self.backend.score(self.backend._to_tensor(x)) + score = self.backend._ensembler(score) + return self.backend._to_numpy(score) + + @catch_error('NotFittedError') + def infer_threshold(self, x: np.ndarray, fpr: float) -> None: + """Infer the threshold for the LOF detector. + + The threshold is computed so that the outlier detector would incorrectly classify `fpr` proportion of the + reference data as outliers. + + Parameters + ---------- + x + Reference data used to infer the threshold. + fpr + False positive rate used to infer the threshold. The false positive rate is the proportion of + instances in `x` that are incorrectly classified as outliers. The false positive rate should + be in the range ``(0, 1)``. + + Raises + ------ + ValueError + Raised if `fpr` is not in ``(0, 1)``. + NotFittedError + If called before detector has been fit. + """ + self.backend.infer_threshold(self.backend._to_tensor(x), fpr) + + @catch_error('NotFittedError') + @catch_error('ThresholdNotInferredError') + def predict(self, x: np.ndarray) -> Dict[str, Any]: + """Predict whether the instances in `x` are outliers or not. + + Scores the instances in `x` and if the threshold was inferred, returns the outlier labels and p-values as well. + + Parameters + ---------- + x + Data to predict. The shape of `x` should be `(n_instances, n_features)`. + + Returns + ------- + Dictionary with keys 'data' and 'meta'. 'data' contains the outlier scores. If threshold inference was \ + performed, 'data' also contains the threshold value, outlier labels and p-vals . The shape of the scores is \ + `(n_instances,)`. The higher the score, the more anomalous the instance. 'meta' contains information about \ + the detector. + + Raises + ------ + NotFittedError + If called before detector has been fit. + ThresholdNotInferredError + If k is a list and a threshold was not inferred. + """ + outputs = self.backend.predict(self.backend._to_tensor(x)) + output = outlier_prediction_dict() + output['data'] = { + **output['data'], + **self.backend._to_numpy(outputs) + } + output['meta'] = { + **output['meta'], + 'name': self.__class__.__name__, + 'detector_type': 'outlier', + 'online': False, + 'version': __version__, + } + return output diff --git a/alibi_detect/od/pytorch/__init__.py b/alibi_detect/od/pytorch/__init__.py index b5be430c2..e18f7de4e 100644 --- a/alibi_detect/od/pytorch/__init__.py +++ b/alibi_detect/od/pytorch/__init__.py @@ -1,6 +1,7 @@ from alibi_detect.utils.missing_optional_dependency import import_optional KNNTorch = import_optional('alibi_detect.od.pytorch.knn', ['KNNTorch']) +LOFTorch = import_optional('alibi_detect.od.pytorch.lof', ['LOFTorch']) MahalanobisTorch = import_optional('alibi_detect.od.pytorch.mahalanobis', ['MahalanobisTorch']) KernelPCATorch, LinearPCATorch = import_optional('alibi_detect.od.pytorch.pca', ['KernelPCATorch', 'LinearPCATorch']) Ensembler = import_optional('alibi_detect.od.pytorch.ensemble', ['Ensembler']) diff --git a/alibi_detect/od/pytorch/lof.py b/alibi_detect/od/pytorch/lof.py new file mode 100644 index 000000000..055af2d18 --- /dev/null +++ b/alibi_detect/od/pytorch/lof.py @@ -0,0 +1,164 @@ +from typing import Optional, Union, List, Tuple +from typing_extensions import Literal +import numpy as np +import torch + +from alibi_detect.od.pytorch.ensemble import Ensembler +from alibi_detect.od.pytorch.base import TorchOutlierDetector + + +class LOFTorch(TorchOutlierDetector): + def __init__( + self, + k: Union[np.ndarray, List, Tuple, int], + kernel: Optional[torch.nn.Module] = None, + ensembler: Optional[Ensembler] = None, + device: Optional[Union[Literal['cuda', 'gpu', 'cpu'], 'torch.device']] = None, + ): + """PyTorch backend for LOF detector. + + Parameters + ---------- + k + Number of nearest neighbors used to compute the local outlier factor. `k` can be a single + value or an array of integers. If `k` is a single value the score method uses the + distance/kernel similarity to the `k`-th nearest neighbor. If `k` is a list then it uses + the distance/kernel similarity to each of the specified `k` neighbors. + kernel + If a kernel is specified then instead of using `torch.cdist` the kernel defines the `k` nearest + neighbor distance. + ensembler + If `k` is an array of integers then the ensembler must not be ``None``. Should be an instance + of :py:obj:`alibi_detect.od.pytorch.ensemble.ensembler`. Responsible for combining + multiple scores into a single score. + device + Device type used. The default tries to use the GPU and falls back on CPU if needed. + Can be specified by passing either ``'cuda'``, ``'gpu'``, ``'cpu'`` or an instance of + ``torch.device``. + """ + TorchOutlierDetector.__init__(self, device=device) + self.kernel = kernel + self.ensemble = isinstance(k, (np.ndarray, list, tuple)) + self.ks = torch.tensor(k) if self.ensemble else torch.tensor([k], device=self.device) + self.ensembler = ensembler + + def forward(self, x: torch.Tensor) -> torch.Tensor: + """Detect if `x` is an outlier. + + Parameters + ---------- + x + `torch.Tensor` with leading batch dimension. + + Returns + ------- + `torch.Tensor` of ``bool`` values with leading batch dimension. + + Raises + ------ + ThresholdNotInferredError + If called before detector has had `infer_threshold` method called. + """ + raw_scores = self.score(x) + scores = self._ensembler(raw_scores) + if not torch.jit.is_scripting(): + self.check_threshold_inferred() + preds = scores > self.threshold + return preds + + def _make_mask(self, reachabilities: torch.Tensor): + """Generate a mask for computing the average reachability. + + If k is an array then we need to compute the average reachability for each k separately. To do + this we use a mask to weight the reachability of each k-close neighbor by 1/k and the rest to 0. + """ + mask = torch.zeros_like(reachabilities[0], device=self.device) + for i, k in enumerate(self.ks): + mask[:k, i] = torch.ones(k, device=self.device)/k + return mask + + def _compute_K(self, x, y): + """Compute the distance matrix matrix between `x` and `y`.""" + return torch.exp(-self.kernel(x, y)) if self.kernel is not None else torch.cdist(x, y) + + def score(self, x: torch.Tensor) -> torch.Tensor: + """Computes the score of `x` + + Parameters + ---------- + x + The tensor of instances. First dimension corresponds to batch. + + Returns + ------- + Tensor of scores for each element in `x`. + + Raises + ------ + NotFittedError + If called before detector has been fit. + """ + self.check_fitted() + + # compute the distance matrix between x and x_ref + K = self._compute_K(x, self.x_ref) + + # compute k nearest neighbors for maximum k in self.ks + max_k = torch.max(self.ks) + bot_k_items = torch.topk(K, int(max_k), dim=1, largest=False) + bot_k_inds, bot_k_dists = bot_k_items.indices, bot_k_items.values + + # To compute the reachabilities we get the k-distances of each object in the instances + # k nearest neighbors. Then we take the maximum of their k-distances and the distance + # to the instance. + lower_bounds = self.knn_dists_ref[bot_k_inds] + reachabilities = torch.max(bot_k_dists[:, :, None], lower_bounds) + + # Compute the average reachability for each instance. We use a mask to manage each k in + # self.ks separately. + mask = self._make_mask(reachabilities) + avg_reachabilities = (reachabilities*mask[None, :, :]).sum(1) + + # Compute the LOF score for each instance. Note we don't take 1/avg_reachabilities as + # avg_reachabilities is the denominator in the LOF formula. + factors = (self.ref_inv_avg_reachabilities[bot_k_inds] * mask[None, :, :]).sum(1) + lofs = (avg_reachabilities * factors) + return lofs if self.ensemble else lofs[:, 0] + + def fit(self, x_ref: torch.Tensor): + """Fits the detector + + Parameters + ---------- + x_ref + The Dataset tensor. + """ + # compute the distance matrix + K = self._compute_K(x_ref, x_ref) + # set diagonal to max distance to prevent torch.topk from returning the instance itself + K += torch.eye(len(K), device=self.device) * torch.max(K) + + # compute k nearest neighbors for maximum k in self.ks + max_k = torch.max(self.ks) + bot_k_items = torch.topk(K, int(max_k), dim=1, largest=False) + bot_k_inds, bot_k_dists = bot_k_items.indices, bot_k_items.values + + # store the k-distances for each instance for each k. + self.knn_dists_ref = bot_k_dists[:, self.ks-1] + + # To compute the reachabilities we get the k-distances of each object in the instances + # k nearest neighbors. Then we take the maximum of their k-distances and the distance + # to the instance. + lower_bounds = self.knn_dists_ref[bot_k_inds] + reachabilities = torch.max(bot_k_dists[:, :, None], lower_bounds) + + # Compute the average reachability for each instance. We use a mask to manage each k in + # self.ks separately. + mask = self._make_mask(reachabilities) + avg_reachabilities = (reachabilities*mask[None, :, :]).sum(1) + + # Compute the inverse average reachability for each instance. + self.ref_inv_avg_reachabilities = 1/avg_reachabilities + + self.x_ref = x_ref + self._set_fitted() diff --git a/alibi_detect/od/tests/test__knn/test__knn_backend.py b/alibi_detect/od/tests/test__knn/test__knn_backend.py index 1c9727116..37f8c3216 100644 --- a/alibi_detect/od/tests/test__knn/test__knn_backend.py +++ b/alibi_detect/od/tests/test__knn/test__knn_backend.py @@ -56,7 +56,7 @@ def test_knn_torch_backend_ensemble(ensembler): def test_knn_torch_backend_ensemble_ts(tmp_path, ensembler): """ - Test the knn torch backend can be initalized as an ensemble and + Test the knn torch backend can be initialized as an ensemble and torchscripted, as well as saved and loaded to and from disk. """ @@ -78,7 +78,7 @@ def test_knn_torch_backend_ensemble_ts(tmp_path, ensembler): def test_knn_torch_backend_ts(tmp_path): """ - Test the knn torch backend can be initalized and torchscripted, as well as + Test the knn torch backend can be initialized and torchscripted, as well as saved and loaded to and from disk. """ diff --git a/alibi_detect/od/tests/test__lof/test__lof.py b/alibi_detect/od/tests/test__lof/test__lof.py new file mode 100644 index 000000000..f46d126d4 --- /dev/null +++ b/alibi_detect/od/tests/test__lof/test__lof.py @@ -0,0 +1,269 @@ +import pytest +import numpy as np +import torch + +from alibi_detect.od._lof import LOF +from alibi_detect.od.pytorch.ensemble import AverageAggregator, TopKAggregator, MaxAggregator, \ + MinAggregator, ShiftAndScaleNormalizer, PValNormalizer +from alibi_detect.exceptions import NotFittedError, ThresholdNotInferredError + +from sklearn.datasets import make_moons + + +def make_lof_detector(k=5, aggregator=None, normalizer=None): + lof_detector = LOF( + k=k, aggregator=aggregator, + normalizer=normalizer + ) + x_ref = np.random.randn(100, 2) + lof_detector.fit(x_ref) + lof_detector.infer_threshold(x_ref, 0.1) + return lof_detector + + +def test_unfitted_lof_single_score(): + lof_detector = LOF(k=10) + x = np.array([[0, 10], [0.1, 0]]) + x_ref = np.random.randn(100, 2) + + # test infer_threshold raises exception when not fitted + with pytest.raises(NotFittedError) as err: + _ = lof_detector.infer_threshold(x_ref, 0.1) + assert str(err.value) == 'LOF has not been fit!' + + # test score raises exception when not fitted + with pytest.raises(NotFittedError) as err: + _ = lof_detector.score(x) + assert str(err.value) == 'LOF has not been fit!' + + # test predict raises exception when not fitted + with pytest.raises(NotFittedError) as err: + _ = lof_detector.predict(x) + assert str(err.value) == 'LOF has not been fit!' + + +def test_fitted_lof_score(): + """ + Test fitted but not threshold inferred non-ensemble detectors can still score data using the predict method. + Unlike the ensemble detectors, the non-ensemble detectors do not require the ensembler to be fit in the + infer_threshold method. See the test_fitted_lof_ensemble_score test for the ensemble case. + """ + lof_detector = LOF(k=10) + x_ref = np.random.randn(100, 2) + lof_detector.fit(x_ref) + x = np.array([[0, 10], [0.1, 0]]) + y = lof_detector.predict(x) + y = y['data'] + assert y['instance_score'][0] > y['instance_score'][1] + assert not y['threshold_inferred'] + assert y['threshold'] is None + assert y['is_outlier'] is None + assert y['p_value'] is None + + +def test_fitted_lof_ensemble_score(): + """ + Test fitted but not threshold inferred ensemble detectors correctly raise an error when calling + the predict method. This is because the ensembler is fit in the infer_threshold method. + """ + lof_detector = LOF(k=[10, 14, 18]) + x_ref = np.random.randn(100, 2) + lof_detector.fit(x_ref) + x = np.array([[0, 10], [0.1, 0]]) + with pytest.raises(ThresholdNotInferredError): + lof_detector.predict(x) + + with pytest.raises(ThresholdNotInferredError): + lof_detector.score(x) + + +def test_incorrect_lof_ensemble_init(): + # test lof ensemble with aggregator passed as None raises exception + + with pytest.raises(ValueError) as err: + LOF(k=[8, 9, 10], aggregator=None) + assert str(err.value) == ('If `k` is a `np.ndarray`, `list` or `tuple`, ' + 'the `aggregator` argument cannot be ``None``.') + + +def test_fitted_lof_predict(): + """ + Test that a detector fitted on data and with threshold inferred correctly, will score + and label outliers, as well as return the p-values using the predict method. Also Check + that the score method gives the same results. + """ + + lof_detector = make_lof_detector(k=10) + x_ref = np.random.randn(100, 2) + lof_detector.infer_threshold(x_ref, 0.1) + x = np.array([[0, 10], [0, 0.1]]) + + y = lof_detector.predict(x) + y = y['data'] + scores = lof_detector.score(x) + assert np.all(y['instance_score'] == scores) + assert y['instance_score'][0] > y['instance_score'][1] + assert y['threshold_inferred'] + assert y['threshold'] is not None + assert y['p_value'].all() + assert (y['is_outlier'] == [True, False]).all() + + +@pytest.mark.parametrize("aggregator", [AverageAggregator, lambda: TopKAggregator(k=7), + MaxAggregator, MinAggregator]) +@pytest.mark.parametrize("normalizer", [ShiftAndScaleNormalizer, PValNormalizer, lambda: None]) +def test_unfitted_lof_ensemble(aggregator, normalizer): + lof_detector = LOF( + k=[8, 9, 10], + aggregator=aggregator(), + normalizer=normalizer() + ) + x = np.array([[0, 10], [0.1, 0]]) + + # Test unfit lof ensemble raises exception when calling predict method. + with pytest.raises(NotFittedError) as err: + _ = lof_detector.predict(x) + assert str(err.value) == 'LOF has not been fit!' + + +@pytest.mark.parametrize("aggregator", [AverageAggregator, lambda: TopKAggregator(k=7), + MaxAggregator, MinAggregator]) +@pytest.mark.parametrize("normalizer", [ShiftAndScaleNormalizer, PValNormalizer, lambda: None]) +def test_fitted_lof_ensemble(aggregator, normalizer): + lof_detector = LOF( + k=[8, 9, 10], + aggregator=aggregator(), + normalizer=normalizer() + ) + x_ref = np.random.randn(100, 2) + lof_detector.fit(x_ref) + x = np.array([[0, 10], [0, 0.1]]) + + # test ensemble raises ThresholdNotInferredError if only fit and not threshold inferred and + # the normalizer is not None. + if normalizer() is not None: + with pytest.raises(ThresholdNotInferredError): + lof_detector.predict(x) + else: + lof_detector.predict(x) + + +@pytest.mark.parametrize("aggregator", [AverageAggregator, lambda: TopKAggregator(k=7), + MaxAggregator, MinAggregator]) +@pytest.mark.parametrize("normalizer", [ShiftAndScaleNormalizer, PValNormalizer, lambda: None]) +def test_fitted_lof_ensemble_predict(aggregator, normalizer): + lof_detector = make_lof_detector( + k=[8, 9, 10], + aggregator=aggregator(), + normalizer=normalizer() + ) + x = np.array([[0, 10], [0, 0.1]]) + + # test fitted detectors with inferred thresholds can score data using the predict method. + y = lof_detector.predict(x) + y = y['data'] + assert y['threshold_inferred'] + assert y['threshold'] is not None + assert y['p_value'].all() + assert (y['is_outlier'] == [True, False]).all() + + # test fitted detectors with inferred thresholds can score data using the score method. + scores = lof_detector.score(x) + assert np.all(y['instance_score'] == scores) + + +@pytest.mark.parametrize("aggregator", [AverageAggregator, lambda: TopKAggregator(k=7), + MaxAggregator, MinAggregator]) +@pytest.mark.parametrize("normalizer", [ShiftAndScaleNormalizer, PValNormalizer, lambda: None]) +def test_lof_ensemble_torch_script(aggregator, normalizer): + lof_detector = make_lof_detector(k=[5, 6, 7], aggregator=aggregator(), normalizer=normalizer()) + ts_lof = torch.jit.script(lof_detector.backend) + x = torch.tensor([[0, 10], [0, 0.1]]) + + # test torchscripted ensemble lof detector can be saved and loaded correctly. + y = ts_lof(x) + assert torch.all(y == torch.tensor([True, False])) + + +def test_lof_single_torchscript(): + lof_detector = make_lof_detector(k=5) + ts_lof = torch.jit.script(lof_detector.backend) + x = torch.tensor([[0, 10], [0, 0.1]]) + + # test torchscripted single lof detector can be saved and loaded correctly. + y = ts_lof(x) + assert torch.all(y == torch.tensor([True, False])) + + +@pytest.mark.parametrize("aggregator", [AverageAggregator, lambda: TopKAggregator(k=7), + MaxAggregator, MinAggregator, lambda: 'AverageAggregator', + lambda: 'TopKAggregator', lambda: 'MaxAggregator', + lambda: 'MinAggregator']) +@pytest.mark.parametrize("normalizer", [ShiftAndScaleNormalizer, PValNormalizer, lambda: None, + lambda: 'ShiftAndScaleNormalizer', lambda: 'PValNormalizer']) +def test_lof_ensemble_integration(tmp_path, aggregator, normalizer): + """Test lof ensemble detector on moons dataset. + + Tests ensemble lof detector with every combination of aggregator and normalizer on the moons dataset. + Fits and infers thresholds in each case. Verifies that the detector can correctly detect inliers + and outliers and that it can be serialized using the torchscript. + """ + + lof_detector = LOF( + k=[10, 14, 18], + aggregator=aggregator(), + normalizer=normalizer() + ) + X_ref, _ = make_moons(1001, shuffle=True, noise=0.05, random_state=None) + X_ref, x_inlier = X_ref[0:1000], X_ref[1000][None] + lof_detector.fit(X_ref) + lof_detector.infer_threshold(X_ref, 0.1) + result = lof_detector.predict(x_inlier) + result = result['data']['is_outlier'][0] + assert not result + + x_outlier = np.array([[-1, 1.5]]) + result = lof_detector.predict(x_outlier) + result = result['data']['is_outlier'][0] + assert result + + ts_lof = torch.jit.script(lof_detector.backend) + x = torch.tensor([x_inlier[0], x_outlier[0]], dtype=torch.float32) + y = ts_lof(x) + assert torch.all(y == torch.tensor([False, True])) + + ts_lof.save(tmp_path / 'lof.pt') + lof_detector = torch.load(tmp_path / 'lof.pt') + y = lof_detector(x) + assert torch.all(y == torch.tensor([False, True])) + + +def test_lof_integration(tmp_path): + """Test lof detector on moons dataset. + + Tests lof detector on the moons dataset. Fits and infers thresholds and verifies that the detector can + correctly detect inliers and outliers. Checks that it can be serialized using the torchscript. + """ + lof_detector = LOF(k=18) + X_ref, _ = make_moons(1001, shuffle=True, noise=0.05, random_state=None) + X_ref, x_inlier = X_ref[0:1000], X_ref[1000][None] + lof_detector.fit(X_ref) + lof_detector.infer_threshold(X_ref, 0.1) + result = lof_detector.predict(x_inlier) + result = result['data']['is_outlier'][0] + assert not result + + x_outlier = np.array([[-1, 1.5]]) + result = lof_detector.predict(x_outlier) + result = result['data']['is_outlier'][0] + assert result + + ts_lof = torch.jit.script(lof_detector.backend) + x = torch.tensor([x_inlier[0], x_outlier[0]], dtype=torch.float32) + y = ts_lof(x) + assert torch.all(y == torch.tensor([False, True])) + + ts_lof.save(tmp_path / 'lof.pt') + lof_detector = torch.load(tmp_path / 'lof.pt') + y = lof_detector(x) + assert torch.all(y == torch.tensor([False, True])) diff --git a/alibi_detect/od/tests/test__lof/test__lof_backend.py b/alibi_detect/od/tests/test__lof/test__lof_backend.py new file mode 100644 index 000000000..fd41e7c6d --- /dev/null +++ b/alibi_detect/od/tests/test__lof/test__lof_backend.py @@ -0,0 +1,220 @@ +import pytest +import torch + +from alibi_detect.od.pytorch.lof import LOFTorch +from alibi_detect.utils.pytorch.kernels import GaussianRBF +from alibi_detect.od.pytorch.ensemble import Ensembler, PValNormalizer, AverageAggregator +from alibi_detect.exceptions import NotFittedError, ThresholdNotInferredError + + +@pytest.fixture(scope='function') +def ensembler(request): + return Ensembler( + normalizer=PValNormalizer(), + aggregator=AverageAggregator() + ) + + +def test_lof_torch_backend(): + """ + Test the lof torch backend can be correctly initialized, fit and used to + predict outliers. + """ + + lof_torch = LOFTorch(k=5) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + outputs = lof_torch.predict(x) + assert outputs.instance_score.shape == (3, ) + assert outputs.is_outlier is None + assert outputs.p_value is None + scores = lof_torch.score(x) + assert torch.all(scores == outputs.instance_score) + + lof_torch.infer_threshold(x_ref, 0.1) + outputs = lof_torch.predict(x) + assert torch.all(outputs.is_outlier == torch.tensor([False, False, True])) + assert torch.all(lof_torch(x) == torch.tensor([False, False, True])) + + +def test_lof_torch_backend_ensemble(ensembler): + """ + Test the lof torch backend can be correctly initialized as an ensemble, fit + on data and used to predict outliers. + """ + + lof_torch = LOFTorch(k=[4, 5], ensembler=ensembler) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + lof_torch.infer_threshold(x_ref, 0.1) + outputs = lof_torch.predict(x) + assert torch.all(outputs.is_outlier == torch.tensor([False, False, True])) + assert torch.all(lof_torch(x) == torch.tensor([False, False, True])) + + +def test_lof_torch_backend_ensemble_ts(tmp_path, ensembler): + """ + Test the lof torch backend can be initialized as an ensemble and + torch scripted, as well as saved and loaded to and from disk. + """ + + lof_torch = LOFTorch(k=[4, 5], ensembler=ensembler) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + lof_torch.infer_threshold(x_ref, 0.1) + pred_1 = lof_torch(x) + lof_torch = torch.jit.script(lof_torch) + pred_2 = lof_torch(x) + assert torch.all(pred_1 == pred_2) + + lof_torch.save(tmp_path / 'lof_torch.pt') + lof_torch = torch.load(tmp_path / 'lof_torch.pt') + pred_2 = lof_torch(x) + assert torch.all(pred_1 == pred_2) + + +def test_lof_torch_backend_ts(tmp_path): + """ + Test the lof torch backend can be initialized and torch scripted, as well as + saved and loaded to and from disk. + """ + + lof_torch = LOFTorch(k=7) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + lof_torch.infer_threshold(x_ref, 0.1) + pred_1 = lof_torch(x) + lof_torch = torch.jit.script(lof_torch) + pred_2 = lof_torch(x) + assert torch.all(pred_1 == pred_2) + + lof_torch.save(tmp_path / 'lof_torch.pt') + lof_torch = torch.load(tmp_path / 'lof_torch.pt') + pred_2 = lof_torch(x) + assert torch.all(pred_1 == pred_2) + + +def test_lof_kernel(ensembler): + """ + Test the lof torch backend can be correctly initialized with a kernel, fit + on data and used to predict outliers. + """ + + kernel = GaussianRBF(sigma=torch.tensor((1))) + lof_torch = LOFTorch(k=[4, 5], kernel=kernel, ensembler=ensembler) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + lof_torch.infer_threshold(x_ref, 0.1) + outputs = lof_torch.predict(x) + assert torch.all(outputs.is_outlier == torch.tensor([0, 0, 1])) + assert torch.all(lof_torch(x) == torch.tensor([0, 0, 1])) + + +@pytest.mark.skip(reason="Can't convert GaussianRBF to torch script due to torch script type constraints") +def test_lof_kernel_ts(ensembler): + """ + Test the lof torch backend can be correctly initialized with a kernel, + and torch scripted, as well as saved and loaded to and from disk. + """ + + kernel = GaussianRBF(sigma=torch.tensor((0.25))) + lof_torch = LOFTorch(k=[4, 5], kernel=kernel, ensembler=ensembler) + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + x = torch.randn((3, 10)) * torch.tensor([[1], [1], [100]]) + lof_torch.infer_threshold(x_ref, 0.1) + pred_1 = lof_torch(x) + lof_torch = torch.jit.script(lof_torch) + pred_2 = lof_torch(x) + assert torch.all(pred_1 == pred_2) + + +def test_lof_torch_backend_ensemble_fit_errors(ensembler): + """Tests the correct errors are raised when using the LOFTorch backend as an ensemble.""" + lof_torch = LOFTorch(k=[4, 5], ensembler=ensembler) + + # Test that the backend raises an error if it is not fitted before + # calling forward method. + x = torch.randn((1, 10)) + with pytest.raises(NotFittedError) as err: + lof_torch(x) + assert str(err.value) == 'LOFTorch has not been fit!' + + # Test that the backend raises an error if it is not fitted before + # predicting. + with pytest.raises(NotFittedError) as err: + lof_torch.predict(x) + assert str(err.value) == 'LOFTorch has not been fit!' + + # Test the backend updates fitted flag on fit. + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + assert lof_torch.fitted + + # Test that the backend raises an if the forward method is called without the + # threshold being inferred. + with pytest.raises(ThresholdNotInferredError) as err: + lof_torch(x) + assert str(err.value) == 'LOFTorch has no threshold set, call `infer_threshold` to fit one!' + + # Test that the backend can call predict without the threshold being inferred. + with pytest.raises(ThresholdNotInferredError) as err: + lof_torch.predict(x) + assert str(err.value) == 'LOFTorch has no threshold set, call `infer_threshold` to fit one!' + + +def test_lof_torch_backend_fit_errors(): + """Tests the correct errors are raised when using the LOFTorch backend as a single detector.""" + lof_torch = LOFTorch(k=4) + + # Test that the backend raises an error if it is not fitted before + # calling forward method. + x = torch.randn((1, 10)) + with pytest.raises(NotFittedError) as err: + lof_torch(x) + assert str(err.value) == 'LOFTorch has not been fit!' + + # Test that the backend raises an error if it is not fitted before + # predicting. + with pytest.raises(NotFittedError) as err: + lof_torch.predict(x) + assert str(err.value) == 'LOFTorch has not been fit!' + + # Test the backend updates fitted flag on fit. + x_ref = torch.randn((1024, 10)) + lof_torch.fit(x_ref) + assert lof_torch.fitted + + # Test that the backend raises an if the forward method is called without the + # threshold being inferred. + with pytest.raises(ThresholdNotInferredError) as err: + lof_torch(x) + assert str(err.value) == 'LOFTorch has no threshold set, call `infer_threshold` to fit one!' + + # Test that the backend can call predict without the threshold being inferred. + lof_torch.predict(x) + + +def test_lof_infer_threshold_value_errors(): + """Tests the correct errors are raised when using incorrect choice of fpr for the LOFTorch backend detector.""" + lof_torch = LOFTorch(k=4) + x = torch.randn((1024, 10)) + lof_torch.fit(x) + + # fpr must be greater than 1/len(x) otherwise it excludes all points in the reference dataset + with pytest.raises(ValueError) as err: + lof_torch.infer_threshold(x, 1/1025) + assert str(err.value) == '`fpr` must be greater than `1/len(x)=0.0009765625`.' + + # fpr must be between 0 and 1 + with pytest.raises(ValueError) as err: + lof_torch.infer_threshold(x, 1.1) + assert str(err.value) == '`fpr` must be in `(0, 1)`.' + + lof_torch.infer_threshold(x, 0.99) + lof_torch.infer_threshold(x, 1/1023) diff --git a/alibi_detect/tests/test_dep_management.py b/alibi_detect/tests/test_dep_management.py index 60431e82d..f6c056a97 100644 --- a/alibi_detect/tests/test_dep_management.py +++ b/alibi_detect/tests/test_dep_management.py @@ -136,6 +136,7 @@ def test_od_backend_dependencies(opt_dep): ('KernelPCATorch', ['torch', 'keops']), ('LinearPCATorch', ['torch', 'keops']), ('GMMTorch', ['torch', 'keops']), + ('LOFTorch', ['torch', 'keops']), ]: dependency_map[dependency] = relations from alibi_detect.od import pytorch as od_pt_backend