diff --git a/README.md b/README.md
index 56967c7..99c8391 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,16 @@
# Beyond Debiasing: Actively Steering Feature Selection via Loss Regularization
-## Overview
+* [Overview](#overview)
+* [Installation](#installation)
+* [Usage in Python](#usage-in-python)
+* [Repository organization](#repository-organization)
+
+## Overview
+
This repository provides code to use the method presented in our GCPR 2023 paper **"Beyond Debiasing: Actively Steering Feature Selection via Loss Regularization"**. If you want to get started, take a look at our [example network](regression_network.py) and the corresponding [Jupyter notebook](feature_steering_example.ipynb).
+If you are only interested in the implementation of the feature steering part of the loss, you can find it in `feat_steering_loss(...)` of [regression_network.py](regression_network.py).
+
@@ -18,7 +26,9 @@ If you use our method, please cite:
year = {2023},
}
-## Installation
+
+## Installation
+
**Install with pip, Python and PyTorch 2.0+**
git clone https://git.inf-cv.uni-jena.de/blunk/beyond-debiasing.git
@@ -27,10 +37,16 @@ If you use our method, please cite:
First, create an environment with pip and Python (Anaconda environment / Python virtual environment). We recommend installing [PyTorch with CUDA support](https://pytorch.org/get-started/locally/). Then, you can install all remaining packages via pip as described above.
-## Usage in Python
+
+## Usage in Python
+
Since our method relies on loss regularization, it is very simple to add to your own networks - you only need to modify your loss function. To help with that, we provide an [exemplary network](regression_network.py) and a [Jupyter notebook](feature_steering_example.ipynb) with example code.
-## Repository Organization
+You can find the implementation of the feature steering part of the loss in `feat_steering_loss(...)` of [regression_network.py](regression_network.py), which is where all the magic of our method takes place.
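+
+A minimal sketch of what this looks like in practice (the parameter values below
+are illustrative; `input_shape` is assumed here to be the number of input features):
+
+```python
+from regression_dataset import make_regression_dataset
+from regression_network import RegressionNetwork
+
+# Create a small redundant regression dataset (2000 samples in total).
+train_dataloader, test_dataloader, validation_dataloader, _ = make_regression_dataset(
+    n_features_low_dim=3,  # informative low-dimensional variables
+    n_high_dim=9,          # features after the dimensionality expansion
+    n_train=1400,
+    n_test=300,
+    n_validation=300,
+    batch_size=50,
+    seed=42,
+)
+
+# Configure the feature steering part of the loss.
+feat_steering_config = {
+    "attrib_mode": "cmi",        # or "contextual_decomposition"
+    "steering_mode": "loss_l2",  # or "loss_l1", "none"
+    "encourage": [0, 1, 2],      # indices of features to encourage
+    "discourage": [],            # indices of features to discourage
+    "lambda": 100.0,             # weight factor of the regularization
+}
+
+# Train the example network with the feature steering loss.
+mlp = RegressionNetwork(input_shape=9)
+mlp.train(train_dataloader, feat_steering_config, epochs=90, learning_rate=0.01)
+```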
+
+
+## Repository Organization
+
* Installation:
* [`requirements.txt`](requirements.txt): List of required packages for installation with pip
* Feature attribution:
diff --git a/algebra.py b/algebra.py
index 73095f6..195853a 100644
--- a/algebra.py
+++ b/algebra.py
@@ -1,24 +1,29 @@
import numpy as np
+from numpy.typing import NDArray
-def has_leftinverse(matrix):
+def has_leftinverse(matrix: NDArray) -> bool:
"""Returns whether the matrix is left-invertible. That is, it returns whether
a matrix A' exists for the given matrix A such that A'Ax=x for all x.
- :param matrix: Matrix as np.array.
- :return: Whether the given matrix has a left inverse.
- :rtype: boolean.
+ Args:
+ matrix (NDArray): Matrix A as numpy array.
+ Returns:
+ bool: Whether the given matrix A has a left inverse.
"""
# A matrix can only have a left-inverse if it is of full column rank.
- m, n = matrix.shape # rows, columns
+ m, n = matrix.shape # rows, columns
_, s, _ = np.linalg.svd(matrix)
rank = np.sum(s > np.finfo(matrix.dtype).eps)
- return (rank==n and n <= m)
+ return rank == n and n <= m
-def random_orthogonal_matrix(n, complex=False, seed=None):
- """A Random matrix distributed with Haar measure.
+
+def random_orthogonal_matrix(
+ n: int, complex: bool = False, seed: int = None
+) -> NDArray:
+ """Random orthogonal matrix distributed with Haar measure.
Returns a random orthogonal matrix. To ensure randomness, we have to choose
from the distribution created by the Haar measure. The calculation follows
@@ -27,12 +32,13 @@ def random_orthogonal_matrix(n, complex=False, seed=None):
compact groups. In: NOTICES of the AMS, Vol. 54 (5) (2007).
URL: https://arxiv.org/pdf/math-ph/0609050.
- :param n: Matrix returned has dimensions nxn.
- :param complex: Whether or not the returned matrix contains complex numbers.
- :param seed: If int the seed to generate a reproducible results.
- :return: Random matrix distributed with Haar measure.
- :rtype: np.array containing floats
+ Args:
+ n (int): Matrix returned has dimensions nxn.
+ complex (bool, optional): Whether or not the returned matrix contains complex numbers. Defaults to False.
+        seed (int, optional): If given, the seed to generate reproducible results. Defaults to None.
+ Returns:
+ NDArray: Random orthogonal matrix distributed with Haar measure.
"""
if not seed is None:
@@ -41,12 +47,14 @@ def random_orthogonal_matrix(n, complex=False, seed=None):
# The original algorithm provided by Mezzadri is only defined for complex
# initialization.
if complex:
- z = (np.random.randn(n,n) + 1j*np.random.randn(n,n))/np.lib.scimath.sqrt(2.0)
+ z = (np.random.randn(n, n) + 1j * np.random.randn(n, n)) / np.lib.scimath.sqrt(
+ 2.0
+ )
else:
- z = np.random.randn(n,n)
- q,r = np.linalg.qr(z)
+ z = np.random.randn(n, n)
+ q, r = np.linalg.qr(z)
d = np.diagonal(r)
- ph = d/np.absolute(d)
- q = np.multiply(q,ph,q)
+ ph = d / np.absolute(d)
+ q = np.multiply(q, ph, q)
return q
diff --git a/contextual_decomposition.py b/contextual_decomposition.py
index 8c93d6a..55cf3be 100644
--- a/contextual_decomposition.py
+++ b/contextual_decomposition.py
@@ -45,9 +45,10 @@
import acd
import numpy as np
import torch
+from torch import nn
+from typing import Tuple
-
-def get_cd_1d_by_modules(model, modules, inputs, feat_of_interest, device="cpu"):
+def get_cd_1d_by_modules(modules, inputs, feat_of_interest, device="cpu"):
# Device.
inputs = inputs.to(device)
@@ -73,7 +74,7 @@ def get_cd_1d_by_modules(model, modules, inputs, feat_of_interest, device="cpu")
return relevant, irrelevant
-def get_cd_1d(model, inputs, feat_of_interest, device="cpu"):
+def get_cd_1d(model: nn.Module, inputs: torch.Tensor, feat_of_interest: torch.Tensor, device: str="cpu") -> Tuple[torch.Tensor, torch.Tensor]:
"""Calculates contextual decomposition scores for the given model.
The contextual decomposition performs feature attribution by decomposing
@@ -93,18 +94,21 @@ def get_cd_1d(model, inputs, feat_of_interest, device="cpu"):
Prediction of the Network = score of the features of interest
+ score of the other features
- :param model: PyTorch-Model to generate the CD scores for.
- :param inputs: Batched inputs to the model. Typically 2-dimensional tensor
- containing the inputs for a single batch.
- :param feat_of_interest: Integer or list of integers. Define which
- dimensions of the input are part of the feature(s) of interest.
- :param device: Device used by PyTorch (cuda / cpu).
- :return: Tuple (scores_feat, scores_other). These are the scores for each
- of the batched inputs. Here, scores_feat[i] + scores_other[i]=prediction[i].
- Note that the feature scores are determined in a per-batch manner. Therefore,
- the resulting feature scores are vectors.
- :rtype: Tupel of one-dimensional tensors.
-
+ Args:
+ model (nn.Module): PyTorch model to generate the CD scores for.
+ inputs (torch.Tensor): Batched inputs to the model. Typically 2-dimensional
+ tensor containing the inputs for a single batch.
+ feat_of_interest (torch.Tensor): Integer or list of integers. Define which
+ dimensions of the input are part of the feature(s) of interest.
+ device (str, optional): Device used to store the PyTorch tensors
+ (cuda / cpu). Defaults to "cpu".
+
+ Returns:
+ Tuple[torch.Tensor, torch.Tensor]: Tuple (scores_feat, scores_other).
+ These are the scores for each of the batched inputs.
+ Here, scores_feat[i] + scores_other[i]=prediction[i].
+ Note that the feature scores are determined in a per-batch manner.
+ Therefore, the resulting feature scores are vectors.
"""
# Set model in evaluation mode.
diff --git a/dataset_utils.py b/dataset_utils.py
index ee6b039..322c09a 100644
--- a/dataset_utils.py
+++ b/dataset_utils.py
@@ -1,37 +1,78 @@
import torch
+from torch.utils.data import TensorDataset, DataLoader
+from numpy.typing import NDArray
+from typing import Tuple, Union
-def get_dataset_from_arrays(train_features, train_outputs, test_features, test_outputs, validation_features=None, validation_outputs=None, batch_size=1):
- """
- Both test and train dataset are numpy arrays. Observations are represented
- as rows, features as columns.
- train_targets and test_targets are vectors, containing one value per row
- (expected results).
+
+def get_dataset_from_arrays(
+ train_features: NDArray,
+ train_outputs: NDArray,
+ test_features: NDArray,
+ test_outputs: NDArray,
+ validation_features: NDArray = None,
+ validation_outputs: NDArray = None,
+ batch_size: int = 1,
+) -> Union[
+ Tuple[
+ TensorDataset, DataLoader, TensorDataset, DataLoader, TensorDataset, DataLoader
+ ],
+ Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader],
+]:
+ """Create a dataset and dataloder from each of the datasets given as numpy arrays.
+
+ Creates dataset and dataloader for train, test and if given also validation
+ dataset. Observations are represented as rows, while features are represented
+ as columns. The output vectors specify the targets / desired outputs. They are
+ vectors containing one value per row (observation).
+
+ Args:
+ train_features (NDArray): Features of training dataset.
+ train_outputs (NDArray): Targets of training dataset.
+ test_features (NDArray): Features of test dataset.
+ test_outputs (NDArray): Targets of test dataset.
+ validation_features (NDArray, optional): Features of validation dataset. Defaults to None.
+ validation_outputs (NDArray, optional): Targets of validation dataset. Defaults to None.
+ batch_size (int, optional): Batch size of the created dataset. Defaults to 1.
+
+ Returns:
+        Union[Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader, TensorDataset, DataLoader], Tuple[TensorDataset, DataLoader, TensorDataset, DataLoader]]:
+            Tuple of dataset and dataloader for the training, test and, if given,
+            also the validation dataset.
"""
train_inputs = torch.tensor(train_features.tolist())
train_targets = torch.FloatTensor(train_outputs)
- train_dataset = torch.utils.data.TensorDataset(train_inputs, train_targets)
+ train_dataset = TensorDataset(train_inputs, train_targets)
test_inputs = torch.tensor(test_features.tolist())
test_targets = torch.FloatTensor(test_outputs)
- test_dataset = torch.utils.data.TensorDataset(test_inputs, test_targets)
+ test_dataset = TensorDataset(test_inputs, test_targets)
- train_loader = torch.utils.data.DataLoader(
- train_dataset, batch_size=batch_size, shuffle=True, num_workers=1, pin_memory=True
+ train_loader = DataLoader(
+ train_dataset,
+ batch_size=batch_size,
+ shuffle=True,
+ num_workers=1,
+ pin_memory=True,
)
- test_loader = torch.utils.data.DataLoader(
+ test_loader = DataLoader(
test_dataset, batch_size=batch_size, shuffle=False, num_workers=1
)
if not validation_features is None:
validation_inputs = torch.tensor(validation_features.tolist())
validation_targets = torch.FloatTensor(validation_outputs)
- validation_dataset = torch.utils.data.TensorDataset(validation_inputs, validation_targets)
+ validation_dataset = TensorDataset(validation_inputs, validation_targets)
- validation_loader = torch.utils.data.DataLoader(
+ validation_loader = DataLoader(
validation_dataset, batch_size=batch_size, shuffle=False, num_workers=1
)
- return (train_dataset, train_loader, test_dataset, test_loader, validation_dataset, validation_loader)
+ return (
+ train_dataset,
+ train_loader,
+ test_dataset,
+ test_loader,
+ validation_dataset,
+ validation_loader,
+ )
else:
return (train_dataset, train_loader, test_dataset, test_loader)
diff --git a/feature_steering_example.ipynb b/feature_steering_example.ipynb
index 30a4017..7ff7b50 100644
--- a/feature_steering_example.ipynb
+++ b/feature_steering_example.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 1,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -30,6 +30,12 @@
"\n",
"You can choose to generate feature attributions with the feature attribution method provided by Reimers et al. based on both **contextual decomposition** and **conditional mutual information**. Additionally, you can choose other hyperparameters such as the weight factor $\\lambda$ and the norm that is applied (L1 / L2 norm).\n",
"\n",
+ "\n",
+ "\n",
+ "If you are only interested in the actual implementation of our method, take a look at `feat_steering_loss(...)` in `regression_network.py`, where the feature steering part of the loss is calculated.\n",
+ "\n",
+ "\n",
+ "\n",
"## Dataset\n",
"We create a small regression dataset with redundant variables as described in our paper. That is, the created dataset has 9 input variables with a redundancy of 3 variables. In total, we generate 2000 samples, of which 1400 are used for training.\n",
"\n",
@@ -38,7 +44,7 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -59,7 +65,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -90,7 +96,7 @@
},
{
"cell_type": "code",
- "execution_count": 4,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -134,59 +140,29 @@
"* The feature attributions $c_i$ are generated based on the feature attribution method proposed by Reimers et al. For this, the attribution modes `cmi` for feature attribution based on the (transformed) conditional mutual information and `contextual_decomposition` for feature attribution performed with contextual decomposition are available.\n",
"* Feature steering can be performed with feature attributions weighted with L1 norm (`loss_l1`) and L2 norm (`loss_l2`). That is, this modifies the norm applied for $|| \\cdot ||$.\n",
"* The indices of the features that shall be encouraged or discouraged (defining $D$ and $E$) are passed as lists.\n",
- "* The weight factor $\\lambda$ is specified as `lambda`."
+ "* The weight factor $\\lambda$ is specified as `lambda`.\n",
+ "\n",
+ "\n",
+ "\n",
+ "**Implementation:**\n",
+ "\n",
+ "If you want to take a closer look at the implementation of this feature steering loss, take a look at the function `feat_steering_loss(...)` in `regression_network.py`. Here, you can find the calculation of the feature steering part of the loss - which is what you need to add to your own network in order to apply our method.\n",
+ "\n",
+ ""
]
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": null,
"metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "Loss (per sample) after epoch 1: 4712089.607142857\n",
- "Loss (per sample) after epoch 2: 4480544.142857143\n",
- "Loss (per sample) after epoch 3: 4258867.017857143\n",
- "Loss (per sample) after epoch 4: 4050848.214285714\n",
- "Loss (per sample) after epoch 5: 3851129.5714285714\n",
- "Loss (per sample) after epoch 6: 3662716.375\n",
- "Loss (per sample) after epoch 7: 3484030.3214285714\n",
- "Loss (per sample) after epoch 8: 3317511.035714286\n",
- "Loss (per sample) after epoch 9: 3158147.5714285714\n",
- "Loss (per sample) after epoch 10: 3006144.9821428573\n",
- "Loss (per sample) after epoch 11: 2864496.8035714286\n",
- "Loss (per sample) after epoch 12: 2727674.410714286\n",
- "Loss (per sample) after epoch 13: 2597680.910714286\n",
- "Loss (per sample) after epoch 14: 2478867.535714286\n",
- "Loss (per sample) after epoch 15: 2361367.4553571427\n",
- "Loss (per sample) after epoch 16: 2251085.125\n",
- "Loss (per sample) after epoch 17: 2148403.6160714286\n"
- ]
- },
- {
- "ename": "KeyboardInterrupt",
- "evalue": "",
- "output_type": "error",
- "traceback": [
- "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
- "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
- "Cell \u001b[0;32mIn[9], line 13\u001b[0m\n\u001b[1;32m 4\u001b[0m feat_steering_config \u001b[39m=\u001b[39m {\n\u001b[1;32m 5\u001b[0m \u001b[39m\"\u001b[39m\u001b[39mattrib_mode\u001b[39m\u001b[39m\"\u001b[39m: \u001b[39m\"\u001b[39m\u001b[39mcmi\u001b[39m\u001b[39m\"\u001b[39m,\n\u001b[1;32m 6\u001b[0m \u001b[39m\"\u001b[39m\u001b[39msteering_mode\u001b[39m\u001b[39m\"\u001b[39m: \u001b[39m\"\u001b[39m\u001b[39mloss_l2\u001b[39m\u001b[39m\"\u001b[39m,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 9\u001b[0m \u001b[39m\"\u001b[39m\u001b[39mlambda\u001b[39m\u001b[39m\"\u001b[39m: \u001b[39m100.0\u001b[39m, \u001b[39m# Adapt accordingly for CMI / CD\u001b[39;00m\n\u001b[1;32m 10\u001b[0m }\n\u001b[1;32m 12\u001b[0m \u001b[39m# Train the network.\u001b[39;00m\n\u001b[0;32m---> 13\u001b[0m mlp\u001b[39m.\u001b[39;49mtrain(train_dataloader, feat_steering_config, epochs, learning_rate)\n",
- "File \u001b[0;32m~/OneDrive/Publikationen/2023 - Feature Steering GCPR/Offizielles Repository/beyond-debiasing/regression_network.py:169\u001b[0m, in \u001b[0;36mRegressionNetwork.train\u001b[0;34m(self, train_dataloader, feat_steering_config, epochs, learning_rate)\u001b[0m\n\u001b[1;32m 166\u001b[0m \u001b[39mif\u001b[39;00m loss\u001b[39m.\u001b[39misnan():\n\u001b[1;32m 167\u001b[0m \u001b[39mraise\u001b[39;00m \u001b[39mValueError\u001b[39;00m(\u001b[39m\"\u001b[39m\u001b[39mThe loss of your model is nan. Thus, no reasonable gradient can be computed!\u001b[39m\u001b[39m\"\u001b[39m)\n\u001b[0;32m--> 169\u001b[0m loss\u001b[39m.\u001b[39;49mbackward()\n\u001b[1;32m 170\u001b[0m optimizer\u001b[39m.\u001b[39mstep()\n\u001b[1;32m 172\u001b[0m \u001b[39m# Print statistics.\u001b[39;00m\n",
- "File \u001b[0;32m~/anaconda3/envs/featuresteering-minimal/lib/python3.11/site-packages/torch/_tensor.py:487\u001b[0m, in \u001b[0;36mTensor.backward\u001b[0;34m(self, gradient, retain_graph, create_graph, inputs)\u001b[0m\n\u001b[1;32m 477\u001b[0m \u001b[39mif\u001b[39;00m has_torch_function_unary(\u001b[39mself\u001b[39m):\n\u001b[1;32m 478\u001b[0m \u001b[39mreturn\u001b[39;00m handle_torch_function(\n\u001b[1;32m 479\u001b[0m Tensor\u001b[39m.\u001b[39mbackward,\n\u001b[1;32m 480\u001b[0m (\u001b[39mself\u001b[39m,),\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 485\u001b[0m inputs\u001b[39m=\u001b[39minputs,\n\u001b[1;32m 486\u001b[0m )\n\u001b[0;32m--> 487\u001b[0m torch\u001b[39m.\u001b[39;49mautograd\u001b[39m.\u001b[39;49mbackward(\n\u001b[1;32m 488\u001b[0m \u001b[39mself\u001b[39;49m, gradient, retain_graph, create_graph, inputs\u001b[39m=\u001b[39;49minputs\n\u001b[1;32m 489\u001b[0m )\n",
- "File \u001b[0;32m~/anaconda3/envs/featuresteering-minimal/lib/python3.11/site-packages/torch/autograd/__init__.py:200\u001b[0m, in \u001b[0;36mbackward\u001b[0;34m(tensors, grad_tensors, retain_graph, create_graph, grad_variables, inputs)\u001b[0m\n\u001b[1;32m 195\u001b[0m retain_graph \u001b[39m=\u001b[39m create_graph\n\u001b[1;32m 197\u001b[0m \u001b[39m# The reason we repeat same the comment below is that\u001b[39;00m\n\u001b[1;32m 198\u001b[0m \u001b[39m# some Python versions print out the first line of a multi-line function\u001b[39;00m\n\u001b[1;32m 199\u001b[0m \u001b[39m# calls in the traceback and some print out the last line\u001b[39;00m\n\u001b[0;32m--> 200\u001b[0m Variable\u001b[39m.\u001b[39;49m_execution_engine\u001b[39m.\u001b[39;49mrun_backward( \u001b[39m# Calls into the C++ engine to run the backward pass\u001b[39;49;00m\n\u001b[1;32m 201\u001b[0m tensors, grad_tensors_, retain_graph, create_graph, inputs,\n\u001b[1;32m 202\u001b[0m allow_unreachable\u001b[39m=\u001b[39;49m\u001b[39mTrue\u001b[39;49;00m, accumulate_grad\u001b[39m=\u001b[39;49m\u001b[39mTrue\u001b[39;49;00m)\n",
- "\u001b[0;31mKeyboardInterrupt\u001b[0m: "
- ]
- }
- ],
+ "outputs": [],
"source": [
"# Training configuration.\n",
"learning_rate = 0.01\n",
"epochs = 90\n",
"feat_steering_config = {\n",
- " \"attrib_mode\": \"cmi\",\n",
- " \"steering_mode\": \"loss_l2\",\n",
+ " \"attrib_mode\": \"cmi\", # contextual_decomposition\n",
+ " \"steering_mode\": \"loss_l2\", # loss_l1, none\n",
" \"encourage\": [0, 1, 2],\n",
" \"discourage\": [],\n",
" \"lambda\": 100.0, # Adapt accordingly for CMI / CD\n",
diff --git a/mixed_cmi_estimator.py b/mixed_cmi_estimator.py
index 067bb58..361987e 100644
--- a/mixed_cmi_estimator.py
+++ b/mixed_cmi_estimator.py
@@ -50,11 +50,16 @@
)
-def get_dist_array(data):
- """Returns pairwise distances for all columns of data measured with the
- manhattan distance.
+def get_dist_array(data: torch.Tensor) -> torch.Tensor:
+ """Returns pairwise distances for all columns of the input matrix measured
+    with the Manhattan distance.
- Variables are represented by COLUMNS, observations by ROWS.
+ Args:
+ data (torch.Tensor): Input matrix. Variables are represented by COLUMNS,
+ observations by ROWS.
+
+ Returns:
+        torch.Tensor: Pairwise (Manhattan) distances.
"""
# Works for 1D and 2d data.
@@ -79,7 +84,7 @@ def get_epsilon_distance(k, disArray):
"""Based on a tensor of pairwise distances per observation (tensor is
three-dimensional, quadratic, symmetric).
"""
- # data = torch.tensor([[0, 8, 2], [0, 8, 2], [0, 8, 2]])
+
if disArray.size()[0] == 1:
disArray = disArray.transpose(dim0=0, dim1=1)
N = disArray.size()[0]
@@ -115,10 +120,7 @@ def find_inter_cluster(eleInEachClass):
def con_entro_estimator(data, k, dN):
- """Estimates entropy of quantitative data.
-
- Variables are represented by COLUMNS, observations by ROWS.
- """
+ """Estimates entropy of quantitative data."""
# If only one row, count the number of columns (= length of the vector).
if data.size()[0] == 1:
@@ -153,14 +155,28 @@ def con_entro_estimator(data, k, dN):
return entropy
-def mixed_entro_estimator(data, dimCon, dimDis, k=0.1):
- """Estimates the entropie of the mixed variables with the given dimensions.
+def mixed_entro_estimator(
+    data: torch.Tensor, dimCon: torch.Tensor, dimDis: torch.Tensor, k: float = 0.1
+) -> torch.Tensor:
+ """Estimates the entropy of the mixed variables with the given dimensions.
+    dimCon and dimDis contain the indices of the columns that correspond to
+    quantitative and qualitative variables, respectively.
Variables are represented by COLUMNS, observations by ROWS.
Args:
- k (float, optional): Neighborhood size is calculated as
- max(1, round(k*#all_neighbors)). k should be < 1 and defaults to 0.1.
+ data (torch.Tensor): Data with variables represented as columns and
+ observations represented as rows.
+ dimCon (torch.Tensor): Indices of data for the columns corresponding to
+ quantitative variables.
+ dimDis (torch.Tensor): Indices of data for the columns corresponding to
+ qualitative variables.
+        k (float, optional): Neighborhood size is calculated as
+ max(1, round(k*#all_neighbors)). k should be < 1 and defaults to 0.1.
+
+ Returns:
+ torch.Tensor: Entropy estimate.
"""
# Get number of quantitative variables and number of observations.
@@ -175,7 +191,7 @@ def mixed_entro_estimator(data, dimCon, dimDis, k=0.1):
if len(dimDis) != 0:
dataDis = torch.index_select(data, 1, dimDis)
if len(dimCon) == 0 and len(dimDis) == 0:
- # print('Input data is NULL!!!')
+ # Input data is null.
pass
# Calculate the entropy for the extracted data.
@@ -267,7 +283,14 @@ def mixed_entro_estimator(data, dimCon, dimDis, k=0.1):
return res
-def _mixed_cmi_model(data, xind, yind, zind, is_categorical, k=0.1):
+def _mixed_cmi_model(
+ data: torch.Tensor,
+ xind: torch.Tensor,
+ yind: torch.Tensor,
+ zind: torch.Tensor,
+ is_categorical: torch.Tensor,
+    k: float = 0.1,
+) -> torch.Tensor:
"""Estimates the CMI from qualitative and quantitative data.
The conditional mutual information I(X;Y|Z) is -> 0 if X and Y are
@@ -279,28 +302,30 @@ def _mixed_cmi_model(data, xind, yind, zind, is_categorical, k=0.1):
an Associated Conditional Independence Test. In: Entropy (Basel, Switzerland)
24 (9). DOI: 10.3390/e24091234.
The implementation follows their R implementation licensed under the MIT license:
- https://github.com/leizan/CMIh2022/blob/main/method.R
-
- :param data: Observations of the variables x, y, and z. Variables are
- represented by COLUMNS, observations by ROWS.
- :param xind: One-dimensional tensor that contains a list of the indices
- corresponding to the columns of data containing the observations of the
- variable x.
- :param yind: One-dimensional tensor that contains a list of the indices
- corresponding to the columns of data containing the observations of the
- variable y.
- :param zind: One-dimensional tensor that contains a list of the indices
- corresponding to the columns of data containing the observations of the
- #variable z.
- :param is_categorical: One-dimensional tensor that contains a list of
- the indices corresponding to the columns of data that contain qualitative
- (=categorical) data. All other columns are expected to contain quantitative
- data.
- :param k: Neighborhood size for KNN. Calculated as
- Neighborhood size = max(1, round(k * #All neighbors)).
- :return: Estimate of the CMI I(X;Y|Z).
- :rtype: float.
+    https://github.com/leizan/CMIh2022/blob/main/method.R
+
+    Args:
+ data (torch.Tensor): Observations of the variables x, y, and z. Variables are
+ represented by COLUMNS, observations by ROWS.
+ xind (torch.Tensor): One-dimensional tensor that contains a list of the indices
+ corresponding to the columns of data containing the observations of the
+ variable x.
+ yind (torch.Tensor): One-dimensional tensor that contains a list of the indices
+ corresponding to the columns of data containing the observations of the
+ variable y.
+ zind (torch.Tensor): One-dimensional tensor that contains a list of the indices
+ corresponding to the columns of data containing the observations of the
+ variable z.
+ is_categorical (torch.Tensor): One-dimensional tensor that contains a list of
+ the indices corresponding to the columns of data that contain qualitative
+ (=categorical) data. All other columns are expected to contain quantitative
+ data.
+        k (float, optional): Neighborhood size for kNN. Calculated as
+ Neighborhood size = max(1, round(k * #All neighbors)). k should be < 1
+ and defaults to 0.1.
+
+ Returns:
+ torch.Tensor: Estimate of the CMI I(X;Y|Z).
"""
# data: Variables are represented by COLUMNS, obervations by ROWS.
@@ -366,16 +391,27 @@ def _mixed_cmi_model(data, xind, yind, zind, is_categorical, k=0.1):
def mixed_cmi_model(
- feature, output, target, feature_is_categorical, target_is_categorical
-):
- """Estimates the CMI from qualitative and quantitative data.
+ feature: torch.Tensor,
+ output: torch.Tensor,
+ target: torch.Tensor,
+ feature_is_categorical: bool,
+ target_is_categorical: bool,
+) -> torch.Tensor:
+ """Estimates the CMI for a set of a feature of interest, model output and
+ desired output.
+
+ Estimates the conditional mutual information CMI(feature, output | target).
+ Here, both the feature and the target are allowed to be both qualitative and
+ quantitative variables. The model output has to be quantitative. All input
+ tensors are only allowed to be one-dimensional tensors and describe batched
+ observations.
The conditional mutual information I(X;Y|Z) is -> 0 if X and Y are
- dissimilar and -> inf if they are similar.
- Here, the resulting CMI is only differentiable w.r.t. to non-categorical
- inputs (creating a histogram in a differentiable manner is not really
- reasonable).
- All input tensors are only allowed to be one-dimensional tensors.
+    dissimilar and -> inf if they are similar. Please note that the returned CMI
+    is only differentiable with respect to non-categorical variables, i.e., if
+    you specify one of the three input variables as categorical, you cannot
+    differentiate with respect to it afterward. This is because a histogram is
+    created in this case, which cannot be done in a differentiable manner.
Method:
Zan, Lei; Meynaoui, Anouar; Assaad, Charles K.; Devijver, Emilie; Gaussier,
@@ -384,7 +420,21 @@ def mixed_cmi_model(
24 (9). DOI: 10.3390/e24091234.
The implementation follows their R implementation and was adapted for
differentiability w.r.t. quantitative variables:
- https://github.com/leizan/CMIh2022/blob/main/method.R
+ https://github.com/leizan/CMIh2022/blob/main/method.R
+
+ Args:
+ feature (torch.Tensor): One-dimensional feature vector.
+ output (torch.Tensor): One-dimensional model output vector.
+ target (torch.Tensor): One-dimensional target vector.
+ feature_is_categorical (bool): Whether the feature is a categorical variable.
+ target_is_categorical (bool): Whether the target is a categorical variable.
+
+ Raises:
+ ValueError: Raised if one of feature, output and target is not a
+ one-dimensional tensor.
+
+ Returns:
+ torch.Tensor: Estimate of CMI(feature, output | target).
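+
+    Example (illustrative; random quantitative data with 100 batched scalar
+    observations):
+        >>> feature = torch.randn(100)
+        >>> target = torch.randn(100)
+        >>> output = target + 0.1 * torch.randn(100)
+        >>> cmi = mixed_cmi_model(feature, output, target,
+        ...                       feature_is_categorical=False,
+        ...                       target_is_categorical=False)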
"""
if any([z.dim() != 1 for z in (feature, output, target)]):
diff --git a/regression_dataset.py b/regression_dataset.py
index 68d7cd9..4095738 100644
--- a/regression_dataset.py
+++ b/regression_dataset.py
@@ -1,49 +1,104 @@
import os
+from torch.utils.data import DataLoader
import numpy as np
+from numpy.typing import NDArray
+from typing import Tuple
from algebra import random_orthogonal_matrix
from make_regression import make_regression
from dataset_utils import get_dataset_from_arrays
-def make_regression_dataset(high_dim_transform=True, n_features_low_dim=4, n_uninformative_low_dim=0, n_high_dim = 128, noise_on_high_dim_snrdb=None,
- noise_on_output=0.0, n_train=50000, n_test=10000, n_validation=10000, normalize=False, seed = None, batch_size = 10, log_coefs = False):
- """
- The input variables are standard normal distributed and the coefficients
- of the regression task follow a standard uniform distribution. The bias
- of the regression problem is set to zero.
- Since a linear combination of normal distributed random variables is
- again normal distributed, the target variable of the regresssion is also
- normal distributed.
- The random variables are distributed similarly to the input variables.
-
- high_dim_transform:
- Whether a high-dimensional transformation shall be performed. In case
- no high-dimensional transformation is desired, all inputs like
- n_high_dim, ... are ignored and instead the identity transformation
- is used.
- noise_on_high_dim_snrdb is still used to add noise after the identity
- transformation.
- n_low_dim:
- Number of informative low-dimensional variables.
- n_informative_low_dim:
- Number of uninformative low-dimensional variables.
- noise_on_high_dim_snrdb:
- Additive gaussian noise is applied to the regression input variables after transformation into
- a higher dimension. Here, the variance of each input variable is determined and the noise is
- added so that the SNR corresponds to the given value (in dB). A reasonable value can for instance
- be 10 or 40.
- noise_on_output:
- Standard deviation of the gaussian noise (zero mean) applied to the output (before normalization).
+
+def make_regression_dataset(
+ high_dim_transform: bool = True,
+ n_features_low_dim: int = 4,
+ n_uninformative_low_dim: int = 0,
+ n_high_dim: int = 128,
+ noise_on_high_dim_snrdb: float = None,
+ noise_on_output: float = 0.0,
+ n_train: int = 50000,
+ n_test: int = 10000,
+ n_validation: int = 10000,
+ normalize: bool = False,
+ seed: int = None,
+ batch_size: int = 10,
+ log_coefs: bool = False,
+) -> Tuple[DataLoader, DataLoader, DataLoader, NDArray]:
+ """Creates a redundant regression dataset based on the specified parameters.
+
+ The dataset generation process follows the one described in our paper. First,
+ a low-dimensional linear regression problem is generated. Its input variables
+ are standard normal-distributed and the coefficients of the regression task
+ follow a standard-uniform distribution. The bias of the regression problem
+ is set to zero. Since a linear combination of normal-distributed random variables
+    is again normal-distributed, the target variable of the regression is also
+ normal-distributed. The uninformative variables are distributed similarly
+ to the input variables.
+ Afterward, the input variables of the regression problem generated in the
+ previous step are transformed into a higher-dimensional feature space if
+ high_dim_transform is specified. That is, redundancy is introduced.
+
+ Args:
+ high_dim_transform (bool, optional): Whether a high-dimensional transformation
+ shall be performed. In case no high-dimensional transformation is
+ desired, all inputs like n_high_dim, ... are ignored and instead the
+ identity transformation is used. noise_on_high_dim_snrdb is still
+ used to add noise after the identity transformation. Defaults to True.
+ n_features_low_dim (int, optional): Number of informative low-dimensional
+ variables. Defaults to 4.
+ n_uninformative_low_dim (int, optional): Number of uninformative
+ low-dimensional variables. Defaults to 0.
+ n_high_dim (int, optional): Number of features after transformation into
+ higher-dimensional feature space. Defaults to 128.
+ noise_on_high_dim_snrdb (float, optional): Additive gaussian noise is
+ applied to the regression input variables after transformation into
+ a higher dimension. Here, the variance of each input variable is
+ determined and the noise is added so that the SNR corresponds
+ to the given value (in dB). A reasonable value would be for instance
+ 10 or 40. Defaults to None.
+ noise_on_output (float, optional): Standard deviation of the gaussian
+ noise (zero mean) applied to the output (before normalization).
+ Defaults to 0.0.
+ n_train (int, optional): Number of samples generated for the training
+ dataset. Defaults to 50000.
+ n_test (int, optional): Number of samples generated for the test
+ dataset. Defaults to 10000.
+ n_validation (int, optional): Number of samples generated for the
+ validation dataset. Defaults to 10000.
+ normalize (bool, optional): Whether the target variable should be
+ normalized to mean zero and unit variance. Normalization is based on
+ statistics calculated over all samples generated in total. Defaults
+ to False.
+ seed (int, optional): If specified, the seed to generate a reproducible
+ dataset. Defaults to None.
+ batch_size (int, optional): Batch size of the created dataset. Defaults
+ to 10.
+ log_coefs (bool, optional): Whether the coefficients used to generate
+ the regression problem should be logged into a separate file. If
+ specified, logged to "lowdim_regression_coefficients.list" in the
+ current working directory. Defaults to False.
+
+ Raises:
+        ValueError: If high_dim_transform is False, n_high_dim has to be None.
+ ValueError: noise_on_high_dim_snrdb has to be strictly positive. For no
+ noise, specify None.
+
+ Returns:
+ Tuple[DataLoader, DataLoader, DataLoader, NDArray]: Tuple containing the
+ dataloader for training, validation and test dataset and the matrix
+ used to perform the dimensionality expansion.
"""
# Generate a random regression problem.
n_samples = n_train + n_validation + n_test
- features, output, coefficients = _regression_dataset(n_features=n_features_low_dim,
- n_uninformative=n_uninformative_low_dim,
- n_samples=n_samples,
- noise_on_output=noise_on_output,
- seed=seed)
+ features, output, coefficients = _regression_dataset(
+ n_features=n_features_low_dim,
+ n_uninformative=n_uninformative_low_dim,
+ n_samples=n_samples,
+ noise_on_output=noise_on_output,
+ seed=seed,
+ )
if log_coefs:
- f = open(os.path.join(os.getcwd(), "lowdim_regression_coefficients.list"),'w')
+ f = open(os.path.join(os.getcwd(), "lowdim_regression_coefficients.list"), "w")
f.writelines(f"{coef}\n" for coef in coefficients)
f.close()
@@ -56,10 +111,15 @@ def make_regression_dataset(high_dim_transform=True, n_features_low_dim=4, n_uni
# Expand to high dimensional problem.
# If not desired, we apply the identity transformation.
if high_dim_transform:
- features, transformation_matrix = _inverse_pca_dataset(features, n_high_dim, seed=seed)
+ features, transformation_matrix = _inverse_pca_dataset(
+ features, n_high_dim, seed=seed
+ )
else:
if not n_high_dim is None:
- raise ValueError("When no dimensionality expansion is performed, the number of high dimensional features should not be set.")
+ raise ValueError(
+ "When no dimensionality expansion is performed, the number of \
+ high dimensional features should not be set."
+ )
transformation_matrix = np.identity(n=n_features_low_dim)
# Add noise if specified.
@@ -72,40 +132,76 @@ def make_regression_dataset(high_dim_transform=True, n_features_low_dim=4, n_uni
# With SNR = 10^(SNR in dB / 10):
# <-> Noise Variance = E(S^2) / (10^(SNR in dB / 10))
#
- if noise_on_high_dim_snrdb == 0.0:
- raise ValueError("A SNR of zero equals infite noise. For no noise specify \'None\'.")
- signal_second_moments = np.mean(features ** 2, axis=0)
+ if not noise_on_high_dim_snrdb > 0:
+ raise ValueError(
+ "SNR has to be strictly positive. Remember, that a SNR of zero \
+ equals infitely large noise. For no noise specify 'None'."
+ )
+ signal_second_moments = np.mean(features**2, axis=0)
noise_variances = signal_second_moments / (10 ** (noise_on_high_dim_snrdb / 10))
noise_stds = np.sqrt(noise_variances)
np.random.seed(seed=seed)
- features += features + np.random.normal(loc=0.0, scale=noise_stds, size=features.shape)
+    features += np.random.normal(
+        loc=0.0, scale=noise_stds, size=features.shape
+    )
# Divide into test, train, validation.
- _, train_dataloader, _, test_dataloader, _, validation_dataloader = get_dataset_from_arrays(train_features=features[:n_train],
- train_outputs=output[:n_train],
- test_features=features[n_train:n_train + n_test],
- test_outputs=output[n_train:n_train + n_test],
- validation_features=features[n_train + n_test:],
- validation_outputs=output[n_train + n_test:],
- batch_size=batch_size)
+ (
+ _,
+ train_dataloader,
+ _,
+ test_dataloader,
+ _,
+ validation_dataloader,
+ ) = get_dataset_from_arrays(
+ train_features=features[:n_train],
+ train_outputs=output[:n_train],
+ test_features=features[n_train : n_train + n_test],
+ test_outputs=output[n_train : n_train + n_test],
+ validation_features=features[n_train + n_test :],
+ validation_outputs=output[n_train + n_test :],
+ batch_size=batch_size,
+ )
- return (train_dataloader, test_dataloader, validation_dataloader, transformation_matrix)
+ return (
+ train_dataloader,
+ test_dataloader,
+ validation_dataloader,
+ transformation_matrix,
+ )
-def _regression_dataset(n_features, n_samples, n_uninformative=0, noise_on_output=0.0, seed=None):
+
+def _regression_dataset(
+ n_features: int,
+ n_samples: int,
+ n_uninformative: int = 0,
+ noise_on_output: float = 0.0,
+ seed: int = None,
+) -> Tuple[NDArray, NDArray, NDArray]:
"""Generates a random regression dataset with n_features parameters.
- :param n_features: Number of coefficients of the regression problem /
- dimensions of the input.
- :param n_uninformative: Number of noise variables (uninformative variables).
- The uninformative variables are distributed similarly to the coefficients of
- the regression problem.
- :param n_samples: Number of samples generated for the regression problem.
- :param seed: If int the seed to generate a reproducible regression dataset.
- :return: Coefficients of the regression problem and generated samples.
- :rtype: Tuple of (inputs of the generated samples, outputs of the generated
- samples, coefficients of the regression problem used to generate the samples).
+ n_features describes the total number of features. If n_uninformative > 0,
+ not all of these features are relevant for the generated regression problem.
+ The regression coefficients are drawn standard-uniformly.
+
+ Args:
+ n_features (int): Number of coefficients of the regression problem /
+ dimensions of the input.
+ n_samples (int): Number of samples generated for the regression problem.
+ n_uninformative (int, optional): Number of noise variables (uninformative
+ variables). The uninformative variables are distributed similarly to
+ the coefficients of the regression problem. Defaults to 0.
+ noise_on_output (float, optional): std of gaussian noise applied to the
+ output. Defaults to 0.0.
+ seed (int, optional): If specified, the seed to generate a reproducible
+ regression dataset. Defaults to None.
+ Returns:
+ Tuple[NDArray, NDArray, NDArray]: Coefficients of the regression problem
+ and generated samples. Tuple of (inputs of the generated samples,
+ outputs of the generated samples, coefficients of the regression problem
+ used to generate the samples).
"""
# n_samples: Number of observations to estimate coefficients
@@ -115,35 +211,43 @@ def _regression_dataset(n_features, n_samples, n_uninformative=0, noise_on_outpu
# features that are used to build the linear model
# n_targets: Number of dimensions of the output vector
# noise: std of gaussian noise applied to output
- features, output, coef = make_regression( n_samples = n_samples,
- n_features = n_features + n_uninformative,
- n_informative = n_features,
- n_targets = 1,
- noise = noise_on_output,
- coef = True,
- random_state = seed)
+ features, output, coef = make_regression(
+ n_samples=n_samples,
+ n_features=n_features + n_uninformative,
+ n_informative=n_features,
+ n_targets=1,
+ noise=noise_on_output,
+ coef=True,
+ random_state=seed,
+ )
return (features, output, coef)
-def _inverse_pca_dataset(features, n_high_dim_features, seed=None):
- """Input data is interpreted as output of a random PCA, transforms the data
- with an inverse transformation of a random PCA.
- :param features: Dataset to transform to higher dimension. Rows are
- interpreted as observations and columns as input features.
- :param n_high_dim_features: Number of output features per observation.
- :param seed: If int the seed to generate a reproducible dataset.
- :return: Transformed dataset and matrix used for transformation.
- :rtype: Tuple of (transformed dataset, transformation matrix).
+def _inverse_pca_dataset(
+ features: NDArray, n_high_dim_features: int, seed: int = None
+) -> Tuple[NDArray, NDArray]:
+ """Input data is interpreted as output of a PCA. We perform an "inverse PCA"
+ with a random transformation matrix.
+
+ Args:
+ features (NDArray): Dataset to transform to higher dimension. Rows are
+ interpreted as observations and columns as input features.
+ n_high_dim_features (int): Number of output features per observation.
+ seed (int, optional): If specified, the seed to generate a reproducible
+ dataset. Defaults to None.
+ Returns:
+ Tuple[NDArray, NDArray]: Tuple of (transformed dataset, transformation matrix).
"""
+
# Generate transformation to higher dimension.
# For theoretical reasons the change of basis matrix has to have a left inverse,
# but when doing PCA this is guaranteed even if we only care about the first
# k eigenvectors - in this case, the first k columns.
n_low_dim_features = features.shape[1]
change_of_basis_matrix = random_orthogonal_matrix(n_high_dim_features, seed=seed)
- transformation_matrix = change_of_basis_matrix[:,:n_low_dim_features]
+ transformation_matrix = change_of_basis_matrix[:, :n_low_dim_features]
# Apply transformation to higher dimension to the observational data.
high_dim_features = _matrix_transform_dataset(features, transformation_matrix)
@@ -153,23 +257,28 @@ def _inverse_pca_dataset(features, n_high_dim_features, seed=None):
return (high_dim_features, transformation_matrix)
-def _matrix_transform_dataset(features, transformation_matrix):
+
+def _matrix_transform_dataset(
+ features: NDArray, transformation_matrix: NDArray
+) -> NDArray:
"""Applies a given transformation matrix A to a dataset.
- The transformation matrix is applied to the rows x of the dataset via Ax=x'.
- :param type features: Dataset to apply transformation to. Rows are
- interpreted as observations and columns as input features.
- :param type transformation_matrix: Matrix used for transformation.
- :return: Transformed input dataset.
- :rtype: np.array
+ The transformation matrix A is applied to the rows x of the dataset via Ax=x'.
+
+ Args:
+ features (NDArray): Dataset to apply transformation to. Rows are
+ interpreted as observations and columns as input features.
+ transformation_matrix (NDArray): Matrix used for transformation.
+ Returns:
+ NDArray: Transformed input dataset.
"""
# features: rows are observations, columns the features.
# (transformed features to higher dimension, matrix used for transformation)
n_dim_features = features.shape[1]
- assert(transformation_matrix.shape[1] == n_dim_features)
+ assert transformation_matrix.shape[1] == n_dim_features
# Apply transformation to higher dimension to the observational data.
# Instead of individually applying the change of basis matrix via B*x=x' we
diff --git a/regression_network.py b/regression_network.py
index d19acb5..ce818aa 100644
--- a/regression_network.py
+++ b/regression_network.py
@@ -1,10 +1,43 @@
+"""Example for a network that uses our feature steering method.
+
+Essentially, all of the method's magic happens in the feat_steering_loss(...)
+function, which calculates the feature steering portion of the loss that is
+later added to the standard maximum-likelihood loss.
+Implementation-wise, it is very important to make sure that the feature steering
+part of the loss is calculated in a differentiable manner. Depending on the
+types of variables, this can be difficult when estimating the CMI.
+"""
+
import torch
from torch import nn
+from torch.utils.data import DataLoader
from contextual_decomposition import get_cd_1d_by_modules
from mixed_cmi_estimator import mixed_cmi_model
class RegressionNetwork(nn.Module):
- def __init__(self, input_shape, n_hidden_layers=2, hidden_dim_size=32, device='cpu'):
+    def __init__(self, input_shape: torch.Tensor, n_hidden_layers: int = 2, hidden_dim_size: int = 32, device: str = "cpu"):
+ """Creates a new RegressionNetwork.
+
+        The regression network consists of an input layer and one or more hidden
+        layers of equal size, followed by a scalar output. All hidden layers
+        use the ReLU activation function.
+ The linear layers are initialized with Xavier initialization for the
+ weights and zero initialization for the biases.
+
+ Args:
+ input_shape (torch.Tensor): Input shape of the network.
+ n_hidden_layers (int, optional): Number of hidden layers. Defaults
+ to 2.
+ hidden_dim_size (int, optional): Size of the hidden linear layers.
+ Defaults to 32.
+ device (str, optional): Device used by PyTorch to store tensors for
+ computation. Defaults to 'cpu'.
+
+ Raises:
+ ValueError: The RegressionNetwork has to have at least one hidden
+ layer.
+ """
+
super().__init__()
self.device = device
@@ -41,7 +74,34 @@ def __init__(self, input_shape, n_hidden_layers=2, hidden_dim_size=32, device='c
def forward(self, x):
return self.layers(x)
- def feat_steering_loss(self, inputs, targets, outputs, feat_steering_config=None):
+    def feat_steering_loss(self, inputs: torch.Tensor, targets: torch.Tensor, outputs: torch.Tensor, feat_steering_config: dict = None) -> torch.Tensor:
+ """Returns feature steering loss.
+
+ This function is where all the magic of our method takes place.
+
+ The feature steering part of the loss depends on the configuration provided in
+        feat_steering_config. Here, you can specify the steering mode (none,
+        loss_l1, loss_l2) and the feature attribution mode (contextual_decomposition,
+        cmi). If cmi is specified, the CMI estimate is transformed as described
+ in the paper. Also, you can specify which features shall be encouraged and
+ discouraged (each as a list of indices).
+
+ Args:
+ inputs (torch.Tensor): Inputs to the network.
+ targets (torch.Tensor): Targets for the specified inputs.
+ outputs (torch.Tensor): Outputs generated by the network for the
+ specified inputs.
+ feat_steering_config (dict, optional): Configuration how feature steering
+ shall be performed. For more details, see above. Defaults to None.
+
+ Raises:
+ ValueError: Invalid norm for feature steering.
+ ValueError: Invalid feature attribution mode.
+
+ Returns:
+ torch.Tensor: Differentiable feature steering loss per sample.
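+
+        Example feat_steering_config (illustrative values; the keys match
+        those used in the example notebook):
+            {"attrib_mode": "cmi", "steering_mode": "loss_l2",
+             "encourage": [0, 1, 2], "discourage": [], "lambda": 100.0}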
+ """
+
# Get configuration for feature steering.
# Do not perform feature steering if it is not desired.
if feat_steering_config["steering_mode"] == "none":
@@ -54,36 +114,32 @@ def feat_steering_loss(self, inputs, targets, outputs, feat_steering_config=None
# Feature attribution.
if feat_steering_config["attrib_mode"] == "contextual_decomposition":
- scores_feat_to_encourage, _ = get_cd_1d_by_modules(self, self.layers, inputs, feat_to_encourage, device=self.device)
- scores_feat_to_discourage, _ = get_cd_1d_by_modules(self, self.layers, inputs, feat_to_discourage, device=self.device)
+ scores_feat_to_encourage, _ = get_cd_1d_by_modules(self.layers, inputs, feat_to_encourage, device=self.device)
+ scores_feat_to_discourage, _ = get_cd_1d_by_modules(self.layers, inputs, feat_to_discourage, device=self.device)
elif feat_steering_config["attrib_mode"] == "cmi":
# Estimate CMI.
if len(feat_to_encourage) > 0:
scores_feat_to_encourage = torch.stack([mixed_cmi_model(inputs[:,feat], outputs, targets, feature_is_categorical=False, target_is_categorical=False) for feat in feat_to_encourage], 0)
- # scores_feat_to_encourage = torch.stack([get_continuous_cmi(inputs[:,feat], outputs, z=targets, knn=0.2, seed=42) for feat in feat_to_encourage], 0)
else:
scores_feat_to_encourage = torch.tensor([]).float()
if len(feat_to_discourage) > 0:
scores_feat_to_discourage = torch.stack([mixed_cmi_model(inputs[:,feat], outputs, targets, feature_is_categorical=False, target_is_categorical=False) for feat in feat_to_discourage], 0)
- # scores_feat_to_discourage = torch.stack([get_continuous_cmi(inputs[:,feat], outputs, z=targets, knn=0.2, seed=42) for feat in feat_to_discourage], 0)
else:
scores_feat_to_discourage = torch.tensor([]).float()
# Transform to [0,1].
# NOTE: Even though in theory CMI >= 0, in practice our estimates
- # can be smaler than zero.
- # Make sure that sqrt does not receive values < 0. In analogy to the
- # Straight-Through Estimators (STEs) we apply our transformation only
- # to inputs >= 0 and use the identity transformation for inputs < 0.
+        # can be smaller than zero. Therefore, we need to avoid passing
+ # values < 0 to the sqrt.
+ # In analogy to Straight-Through Estimators (STEs) we apply our
+ # transformation only to inputs > 0 and use the identity transformation
+ # for inputs <= 0.
scores_feat_to_encourage[scores_feat_to_encourage > 0] = torch.sqrt(1 - torch.exp(-2*scores_feat_to_encourage[scores_feat_to_encourage > 0]))
scores_feat_to_discourage[scores_feat_to_discourage > 0] = torch.sqrt(1 - torch.exp(-2*scores_feat_to_discourage[scores_feat_to_discourage > 0]))
- else:
- raise NotImplementedError("The selected feature attribution mode is not yet implemented!")
-
- # Small corrections:
- # If there are no features to en- or discourage, we can explicitly set their contribution to 0.
+ # Numerical stability: If there are no features to en- or discourage,
+ # we can explicitly set their contribution to 0.
if len(feat_to_encourage) == 0:
scores_feat_to_encourage = torch.tensor(0)
if len(feat_to_discourage) == 0:
@@ -91,34 +147,55 @@ def feat_steering_loss(self, inputs, targets, outputs, feat_steering_config=None
# Feature steering.
if feat_steering_config["attrib_mode"] == "cmi":
- # With the CMI estimates we can have negative values even though in theory CMI >= 0.
- # L1 / L2 would emphasize them, but we want values < 0 to result in a smaller loss.
- # L1-Loss:
- # We know that our values should be almost > 0. Therefore, we apply the absolute value
- # only to values >= 0 and the identity transformation to all others (analogous to
- # Straight-Through Estimators, keeps gradients).
- # In practice, this results in ignoring the absolute value.
- #
- # L2-Loss:
- # Here, we also only square for values >= 0 and the identity transformation to all
- # others.
+ # The transformed CMI estimates can be negative. Since applying
+ # L1 / L2 norm would emphasize them, we only apply the norm to
+ # transformed estimates >= 0.
+ # To all other values, similarly to above the identity transformation
+ # is applied (analogous to Straight-Through Estimators, keeps
+ # gradients).
+ # In practice, this means that for L1 norm we perform the identity
+ # transformation.
if feat_steering_config["steering_mode"] == "loss_l2":
scores_feat_to_encourage[scores_feat_to_encourage >= 0] = torch.square(scores_feat_to_encourage[scores_feat_to_encourage >= 0])
scores_feat_to_discourage[scores_feat_to_discourage >= 0] = torch.square(scores_feat_to_discourage[scores_feat_to_discourage >= 0])
return feat_steering_config["lambda"] * (torch.sum(scores_feat_to_discourage) - torch.sum(scores_feat_to_encourage)) / inputs.size()[0] # Average over Batch
+ # Apply weight factor lambda.
if feat_steering_config["lambda"] == 0:
return torch.tensor(0.0)
elif feat_steering_config["steering_mode"] == "loss_l1":
feat_steering_loss = feat_steering_config["lambda"] * (torch.sum(torch.abs(scores_feat_to_discourage)) - torch.sum(torch.abs(scores_feat_to_encourage)))
elif feat_steering_config["steering_mode"] == "loss_l2":
feat_steering_loss = feat_steering_config["lambda"] * (torch.sum(torch.square(scores_feat_to_discourage)) - torch.sum(torch.square(scores_feat_to_encourage)))
- else:
- raise NotImplementedError("The selected feature steering mode is not yet implemented!")
return feat_steering_loss / inputs.size()[0] # Average over Batch
- def loss(self, inputs, targets, outputs, feat_steering_config=None):
+    def loss(self, inputs: torch.Tensor, targets: torch.Tensor, outputs: torch.Tensor, feat_steering_config: dict = None) -> torch.Tensor:
+ """Loss function of the network.
+
+ The loss of the network is composed of the standard maximum-likelihood
+ loss and the feature steering portion of the loss.
+ feat_steering_config specifies how the feature steering portion of the
+ loss shall be calculated.
+
+ Args:
+ inputs (torch.Tensor): Inputs to the network.
+ targets (torch.Tensor): Targets for the specified inputs.
+ outputs (torch.Tensor): Outputs generated by the network for the
+ specified inputs.
+ feat_steering_config (dict, optional): Configuration how feature steering
+ shall be performed. If None, no feature steering is performed.
+ Defaults to None.
+
+ Raises:
+ ValueError: Feature steering portion of the loss is nan. Therefore,
+ this portion of the loss has no reasonable gradient, which would
+ cause problems in later steps of the training process.
+
+ Returns:
+ torch.Tensor: Total loss per sample.
+ """
+
# For MSE make sure that outputs is a 1D tensor. That is, we need to
# prevent tensors of shape torch.Size([batch_size, 1]).
if len(outputs.size()) > 1:
@@ -134,11 +211,31 @@ def loss(self, inputs, targets, outputs, feat_steering_config=None):
else:
feat_steering_loss = self.feat_steering_loss(inputs, targets, outputs, feat_steering_config=feat_steering_config)
if feat_steering_loss.isnan():
- raise ValueError("The feature steering loss of your model is nan. Thus, no reasonable gradient can be computed! \
- The feature steering config was: " + str(feat_steering_config) + ".")
+ raise ValueError("The feature steering loss of your model is nan.\
+ Thus, no reasonable gradient can be computed.")
return loss + feat_steering_loss
- def train(self, train_dataloader, feat_steering_config, epochs=90, learning_rate=0.01):
+    def train(self, train_dataloader: DataLoader, feat_steering_config: dict, epochs: int = 90, learning_rate: float = 0.01):
+ """Performs training of the RegressionNetwork.
+
+ The network is trained using PyTorch's AdamW optimizer with default
+ parameters except for the learning rate.
+
+ Args:
+ train_dataloader (DataLoader): Contains the samples used for training.
+ feat_steering_config (dict): Configuration how feature steering
+ shall be performed. If None, no feature steering is performed.
+ epochs (int, optional): Number of epochs the network is trained.
+ Defaults to 90.
+ learning_rate (float, optional): Learning rate. Defaults to 0.01.
+
+ Raises:
+ ValueError: If the output of the network contains nan for a given input,
+ no reasonable loss can be computed.
+ ValueError: If the loss of the network is infinity / nan, no reasonable
+ gradient can be computed for optimization.
+ """
+
optimizer = torch.optim.AdamW(self.layers.parameters(), lr=learning_rate)
for epoch in range(epochs):
@@ -161,10 +258,8 @@ def train(self, train_dataloader, feat_steering_config, epochs=90, learning_rate
loss = self.loss(inputs, targets, outputs, feat_steering_config=feat_steering_config)
# Perform backward pass and modify weights accordingly.
- if loss == torch.inf:
- raise ValueError("The loss of your model is inf. Thus, no reasonable gradient can be computed!")
- if loss.isnan():
- raise ValueError("The loss of your model is nan. Thus, no reasonable gradient can be computed!")
+ if loss == torch.inf or loss.isnan():
+ raise ValueError("The loss of your model is inf / nan. Thus, no reasonable gradient can be computed!")
loss.backward()
optimizer.step()