improved docstrings and additional documentation
j-bl committed Aug 30, 2023
1 parent 66aa19a commit f64d857
Showing 8 changed files with 563 additions and 264 deletions.
24 changes: 20 additions & 4 deletions README.md
@@ -1,8 +1,16 @@
# Beyond Debiasing: Actively Steering Feature Selection via Loss Regularization

## Overview
* [Overview](#overview)
* [Installation](#installation)
* [Usage in Python](#usage-in-python)
* [Repository organization](#repository)

## Overview <a name="overview"></a>

This repository provides code to use the method presented in our GCPR 2023 paper **"Beyond Debiasing: Actively Steering Feature Selection via Loss Regularization"**. If you want to get started, take a look at our [example network](regression_network.py) and the corresponding [jupyter notebook](feature_steering_example.ipynb).

If you are only interested in the implementation of the feature steering part of the loss, you can find it in `feat_steering_loss(...)` of [regression_network.py](regression_network.py).

<div align="center">
<img src="https://git.inf-cv.uni-jena.de/blunk/beyond-debiasing/raw/main/teaser.png" alt="By measuring the feature usage, we can steer the model towards (not) using features that are specifically (un-)desired." width="35%"/>
</div>
@@ -18,7 +26,9 @@ If you use our method, please cite:
year = {2023},
}

## Installation

## Installation <a name="installation"></a>

**Install with pip, Python and PyTorch 2.0+**

git clone https://git.inf-cv.uni-jena.de/blunk/beyond-debiasing.git
@@ -27,10 +37,16 @@ If you use our method, please cite:

First, create an environment with pip and Python (Anaconda environment / Python virtual environment). We recommend installing [PyTorch with CUDA support](https://pytorch.org/get-started/locally/). Then, you can install all subsequent packages via pip as described above.

## Usage in Python

## Usage in Python <a name="usage-in-python"></a>

Since our method relies on loss regularization, it is very simple to add to your own networks: you only need to modify your loss function. To help with that, we provide an [exemplary network](regression_network.py) and a [jupyter notebook](feature_steering_example.ipynb) with example code.

## Repository Organization
You can find the implementation of the feature steering part of the loss in `feat_steering_loss(...)` of [regression_network.py](regression_network.py), which is where all the magic of our method takes place.
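
As a minimal sketch of such a modified loss (hypothetical names: `lambda_steer` and the exact arguments of `feat_steering_loss(...)` are placeholders here; see [regression_network.py](regression_network.py) for the actual interface):

```python
def training_step(model, criterion, optimizer, inputs, targets, lambda_steer=1.0):
    """One optimization step: the task loss plus the feature-steering term."""
    optimizer.zero_grad()
    prediction = model(inputs)
    # The only change compared to an unregularized training loop is the added
    # regularization term; its exact signature is an assumption here.
    loss = criterion(prediction, targets) \
        + lambda_steer * model.feat_steering_loss(inputs, prediction)
    loss.backward()
    optimizer.step()
    return loss.item()
```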


## Repository <a name="repository"></a>

* Installation:
* [`requirements.txt`](requirements.txt): List of required packages for installation with pip
* Feature attribution:
44 changes: 26 additions & 18 deletions algebra.py
@@ -1,24 +1,29 @@
import numpy as np
from numpy.typing import NDArray

def has_leftinverse(matrix):
def has_leftinverse(matrix: NDArray) -> bool:
"""Returns whether the matrix is left-invertible. That is, it returns whether
a matrix A' exists for the given matrix A such that A'Ax = x for all x.
:param matrix: Matrix as np.array.
:return: Whether the given matrix has a left inverse.
:rtype: boolean.
Args:
matrix (NDArray): Matrix A as numpy array.
Returns:
bool: Whether the given matrix A has a left inverse.
"""

# A matrix can only have a left-inverse if it is of full column rank.
m, n = matrix.shape # rows, columns
m, n = matrix.shape # rows, columns
_, s, _ = np.linalg.svd(matrix)
rank = np.sum(s > np.finfo(matrix.dtype).eps)

return (rank==n and n <= m)
return rank == n and n <= m
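
A short usage sketch (assuming `has_leftinverse` as defined above): a tall matrix of full column rank is left-invertible, while a wide or rank-deficient one is not.

```python
import numpy as np

A = np.array([[1.0, 0.0],
              [0.0, 1.0],
              [1.0, 1.0]])               # 3x2 with full column rank
print(has_leftinverse(A))                # True: rank == n and n <= m
print(has_leftinverse(A.T))              # False: more columns than rows
print(has_leftinverse(np.ones((3, 2))))  # False: rank 1 < 2 columns
```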

def random_orthogonal_matrix(n, complex=False, seed=None):
"""A Random matrix distributed with Haar measure.

def random_orthogonal_matrix(
n: int, complex: bool = False, seed: int = None
) -> NDArray:
"""Random orthogonal matrix distributed with Haar measure.
Returns a random orthogonal matrix. To ensure randomness, we have to choose
from the distribution created by the Haar measure. The calculation follows
@@ -27,12 +32,13 @@ def random_orthogonal_matrix(n, complex=False, seed=None):
compact groups. In: Notices of the AMS, Vol. 54 (2007).
URL: https://arxiv.org/pdf/math-ph/0609050.
:param n: Matrix returned has dimensions nxn.
:param complex: Whether or not the returned matrix contains complex numbers.
:param seed: If int, the seed to generate reproducible results.
:return: Random matrix distributed with Haar measure.
:rtype: np.array containing floats
Args:
n (int): Matrix returned has dimensions nxn.
complex (bool, optional): Whether or not the returned matrix contains complex numbers. Defaults to False.
seed (int, optional): If int, the seed to generate reproducible results. Defaults to None.
Returns:
NDArray: Random orthogonal matrix distributed with Haar measure.
"""

if seed is not None:
@@ -41,12 +47,14 @@
# The original algorithm provided by Mezzadri is only defined for complex
# initialization.
if complex:
z = (np.random.randn(n,n) + 1j*np.random.randn(n,n))/np.lib.scimath.sqrt(2.0)
z = (np.random.randn(n, n) + 1j * np.random.randn(n, n)) / np.lib.scimath.sqrt(
2.0
)
else:
z = np.random.randn(n,n)
q,r = np.linalg.qr(z)
z = np.random.randn(n, n)
q, r = np.linalg.qr(z)
d = np.diagonal(r)
ph = d/np.absolute(d)
q = np.multiply(q,ph,q)
ph = d / np.absolute(d)
q = np.multiply(q, ph, q)

return q
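
For illustration, the orthogonality of the returned matrix can be checked numerically (a usage sketch assuming the function as defined above):

```python
import numpy as np

Q = random_orthogonal_matrix(4, seed=0)
# For a real orthogonal matrix, Q^T Q equals the identity up to float error.
print(np.allclose(Q.T @ Q, np.eye(4)))  # True
```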
34 changes: 19 additions & 15 deletions contextual_decomposition.py
@@ -45,9 +45,10 @@
import acd
import numpy as np
import torch
from torch import nn
from typing import Tuple


def get_cd_1d_by_modules(model, modules, inputs, feat_of_interest, device="cpu"):
def get_cd_1d_by_modules(modules, inputs, feat_of_interest, device="cpu"):
# Device.
inputs = inputs.to(device)

@@ -73,7 +74,7 @@ def get_cd_1d_by_modules(model, modules, inputs, feat_of_interest, device="cpu")
return relevant, irrelevant


def get_cd_1d(model, inputs, feat_of_interest, device="cpu"):
def get_cd_1d(model: nn.Module, inputs: torch.Tensor, feat_of_interest: torch.Tensor, device: str="cpu") -> Tuple[torch.Tensor, torch.Tensor]:
"""Calculates contextual decomposition scores for the given model.
The contextual decomposition performs feature attribution by decomposing
@@ -93,18 +94,21 @@ def get_cd_1d(model, inputs, feat_of_interest, device="cpu"):
Prediction of the Network = score of the features of interest
+ score of the other features
:param model: PyTorch-Model to generate the CD scores for.
:param inputs: Batched inputs to the model. Typically 2-dimensional tensor
containing the inputs for a single batch.
:param feat_of_interest: Integer or list of integers. Define which
dimensions of the input are part of the feature(s) of interest.
:param device: Device used by PyTorch (cuda / cpu).
:return: Tuple (scores_feat, scores_other). These are the scores for each
of the batched inputs. Here, scores_feat[i] + scores_other[i] = prediction[i].
Note that the feature scores are determined in a per-batch manner. Therefore,
the resulting feature scores are vectors.
:rtype: Tuple of one-dimensional tensors.
Args:
model (nn.Module): PyTorch model to generate the CD scores for.
inputs (torch.Tensor): Batched inputs to the model. Typically 2-dimensional
tensor containing the inputs for a single batch.
feat_of_interest (torch.Tensor): Integer or list of integers. Define which
dimensions of the input are part of the feature(s) of interest.
device (str, optional): Device used to store the PyTorch tensors
(cuda / cpu). Defaults to "cpu".
Returns:
Tuple[torch.Tensor, torch.Tensor]: Tuple (scores_feat, scores_other).
These are the scores for each of the batched inputs.
Here, scores_feat[i] + scores_other[i] = prediction[i].
Note that the feature scores are determined in a per-batch manner.
Therefore, the resulting feature scores are vectors.
"""

# Set model in evaluation mode.
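
A usage sketch of the decomposition property described in the docstring (the toy model and data below are hypothetical, and whether a given architecture is supported depends on the underlying ACD implementation):

```python
import torch
from torch import nn

model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
inputs = torch.randn(10, 4)  # batch of 10 observations with 4 features

# Attribute the predictions to input dimensions 0 and 1 vs. all others.
scores_feat, scores_other = get_cd_1d(model, inputs, feat_of_interest=[0, 1])

# Per observation, the two scores sum to the model prediction.
print(torch.allclose(scores_feat + scores_other,
                     model(inputs).squeeze(), atol=1e-5))
```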
69 changes: 55 additions & 14 deletions dataset_utils.py
@@ -1,37 +1,78 @@
import torch
from torch.utils.data import TensorDataset, DataLoader
from numpy.typing import NDArray
from typing import Tuple, Union

def get_dataset_from_arrays(train_features, train_outputs, test_features, test_outputs, validation_features=None, validation_outputs=None, batch_size=1):
"""
Both the train and test datasets are numpy arrays. Observations are represented
as rows, features as columns.
train_outputs and test_outputs are vectors, containing one value per row
(expected results).

def get_dataset_from_arrays(
train_features: NDArray,
train_outputs: NDArray,
test_features: NDArray,
test_outputs: NDArray,
validation_features: NDArray = None,
validation_outputs: NDArray = None,
batch_size: int = 1,
) -> Union[
Tuple[
TensorDataset, DataLoader, TensorDataset, DataLoader, TensorDataset, DataLoader
],
Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader],
]:
"""Create a dataset and dataloder from each of the datasets given as numpy arrays.
Creates a dataset and dataloader each for the train and test datasets and,
if given, also for the validation dataset. Observations are represented as
rows, while features are represented as columns. The output vectors specify
the targets / desired outputs; they contain one value per row (observation).
Args:
train_features (NDArray): Features of training dataset.
train_outputs (NDArray): Targets of training dataset.
test_features (NDArray): Features of test dataset.
test_outputs (NDArray): Targets of test dataset.
validation_features (NDArray, optional): Features of validation dataset. Defaults to None.
validation_outputs (NDArray, optional): Targets of validation dataset. Defaults to None.
batch_size (int, optional): Batch size of the created dataset. Defaults to 1.
Returns:
Union[Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader, TensorDataset, DataLoader], Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader]]: Tuple of dataset and dataloader for the training, test and, if given, validation datasets.
"""

train_inputs = torch.tensor(train_features.tolist())
train_targets = torch.FloatTensor(train_outputs)
train_dataset = torch.utils.data.TensorDataset(train_inputs, train_targets)
train_dataset = TensorDataset(train_inputs, train_targets)

test_inputs = torch.tensor(test_features.tolist())
test_targets = torch.FloatTensor(test_outputs)
test_dataset = torch.utils.data.TensorDataset(test_inputs, test_targets)
test_dataset = TensorDataset(test_inputs, test_targets)

train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=batch_size, shuffle=True, num_workers=1, pin_memory=True
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=1,
pin_memory=True,
)
test_loader = torch.utils.data.DataLoader(
test_loader = DataLoader(
test_dataset, batch_size=batch_size, shuffle=False, num_workers=1
)

if validation_features is not None:
validation_inputs = torch.tensor(validation_features.tolist())
validation_targets = torch.FloatTensor(validation_outputs)
validation_dataset = torch.utils.data.TensorDataset(validation_inputs, validation_targets)
validation_dataset = TensorDataset(validation_inputs, validation_targets)

validation_loader = torch.utils.data.DataLoader(
validation_loader = DataLoader(
validation_dataset, batch_size=batch_size, shuffle=False, num_workers=1
)

return (train_dataset, train_loader, test_dataset, test_loader, validation_dataset, validation_loader)
return (
train_dataset,
train_loader,
test_dataset,
test_loader,
validation_dataset,
validation_loader,
)
else:
return (train_dataset, train_loader, test_dataset, test_loader)
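
A usage sketch with synthetic numpy data (assuming the function as defined above; the array shapes are placeholders):

```python
import numpy as np

rng = np.random.default_rng(0)
X_train, y_train = rng.normal(size=(80, 5)), rng.normal(size=80)
X_test, y_test = rng.normal(size=(20, 5)), rng.normal(size=20)

train_set, train_loader, test_set, test_loader = get_dataset_from_arrays(
    X_train, y_train, X_test, y_test, batch_size=8
)
batch_inputs, batch_targets = next(iter(train_loader))
print(batch_inputs.shape, batch_targets.shape)  # torch.Size([8, 5]) torch.Size([8])
```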