Skip to content

Commit

Permalink
Tuning for multiple columns part 3: Utility analysis for multiple aggregations (#525)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym committed Sep 12, 2024
1 parent 916bd8e commit b09c365
Show file tree
Hide file tree
Showing 14 changed files with 330 additions and 69 deletions.
10 changes: 6 additions & 4 deletions analysis/cross_partition_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,16 @@ def _per_partition_to_utility_report(
# Fill metric errors.
metric_errors = None
if dp_metrics:
assert len(per_partition_utility.metric_errors) == len(dp_metrics)
# > is when there is analysis for SUM(column1), SUM(column2) etc
assert len(per_partition_utility.metric_errors) >= len(dp_metrics)
metric_errors = []
for metric_error, dp_metric in zip(per_partition_utility.metric_errors,
dp_metrics):
for metric_error in per_partition_utility.metric_errors:
metric_errors.append(
_sum_metrics_to_metric_utility(metric_error, dp_metric,
_sum_metrics_to_metric_utility(metric_error,
metric_error.aggregation,
prob_to_keep, partition_weight))

# configuration_index is set on the next stages
return metrics.UtilityReport(configuration_index=-1,
partitions_info=partition_metrics,
metric_errors=metric_errors)
Expand Down
56 changes: 44 additions & 12 deletions analysis/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import copy
import dataclasses
from typing import Iterable, Optional, Sequence, Union
from typing import Iterable, List, Optional, Sequence, Tuple, Union

import pipeline_dp
from pipeline_dp import input_validators
Expand Down Expand Up @@ -65,19 +65,41 @@ def __post_init__(self):
raise ValueError(
"All set attributes in MultiParameterConfiguration must have "
"the same length.")
self._size = sizes[0]
if (self.min_sum_per_partition is None) != (self.max_sum_per_partition
is None):
raise ValueError(
"MultiParameterConfiguration: min_sum_per_partition and "
"max_sum_per_partition must be both set or both None.")
self._size = sizes[0]
if self.min_sum_per_partition:
# If elements of min_sum_per_partition and max_sum_per_partition are
# sequences, all of them should have the same size.
def all_elements_are_lists(a: list):
return all([isinstance(b, Sequence) for b in a])

def common_value_len(a: list) -> Optional[int]:
sizes = [len(v) for v in a]
return min(sizes) if min(sizes) == max(sizes) else None

if all_elements_are_lists(
self.min_sum_per_partition) and all_elements_are_lists(
self.max_sum_per_partition):
# multi-column case. Check that each configuration has the
# same number of elements (i.e. columns)
size1 = common_value_len(self.min_sum_per_partition)
size2 = common_value_len(self.max_sum_per_partition)
if size1 is None or size2 is None or size1 != size2:
raise ValueError("If elements of min_sum_per_partition and "
"max_sum_per_partition are sequences, then"
" they must have the same length.")

@property
def size(self):
return self._size

def get_aggregate_params(self, params: pipeline_dp.AggregateParams,
index: int) -> pipeline_dp.AggregateParams:
def get_aggregate_params(
self, params: pipeline_dp.AggregateParams, index: int
) -> Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]:
"""Returns AggregateParams with the index-th parameters."""
params = copy.copy(params)
if self.max_partitions_contributed:
Expand All @@ -86,16 +108,23 @@ def get_aggregate_params(self, params: pipeline_dp.AggregateParams,
if self.max_contributions_per_partition:
params.max_contributions_per_partition = self.max_contributions_per_partition[
index]
if self.min_sum_per_partition:
params.min_sum_per_partition = self.min_sum_per_partition[index]
if self.max_sum_per_partition:
params.max_sum_per_partition = self.max_sum_per_partition[index]
if self.noise_kind:
params.noise_kind = self.noise_kind[index]
if self.partition_selection_strategy:
params.partition_selection_strategy = self.partition_selection_strategy[
index]
return params
min_max_sum = []
if self.min_sum_per_partition:
min_sum = self.min_sum_per_partition[index]
max_sum = self.max_sum_per_partition[index]
if isinstance(min_sum, Sequence):
min_max_sum = list(zip(min_sum, max_sum))
else:
min_max_sum = [[min_sum, max_sum]]
else:
min_max_sum.append(
[params.min_sum_per_partition, params.max_sum_per_partition])
return params, min_max_sum


@dataclasses.dataclass
Expand Down Expand Up @@ -124,12 +153,15 @@ def n_configurations(self):


def get_aggregate_params(
options: UtilityAnalysisOptions
) -> Iterable[pipeline_dp.AggregateParams]:
options: UtilityAnalysisOptions
) -> Iterable[Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]]:
"""Returns AggregateParams which are specified by UtilityAnalysisOptions."""
multi_param_configuration = options.multi_param_configuration
if multi_param_configuration is None:
yield options.aggregate_params
agg_params = options.aggregate_params
yield agg_params, [[
agg_params.min_sum_per_partition, agg_params.max_sum_per_partition
]]
else:
for i in range(multi_param_configuration.size):
yield multi_param_configuration.get_aggregate_params(
Expand Down
6 changes: 4 additions & 2 deletions analysis/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,10 @@ class UtilityReportBin:
was computed. The metric can be COUNT, PRIVACY_ID_COUNT, SUM.
Attributes:
partition_size_from: lower bound of partitions size.
partition_size_to: upper bound of partitions size.
partition_size_from: lower bound of the number of privacy units in
partitions.
partition_size_to: upper (exclusive) bound of the number of privacy
units in partitions.
report: the result of utility analysis for partitions of size
[partition_size_from, partition_size_to).
"""
Expand Down
10 changes: 6 additions & 4 deletions analysis/parameter_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ def _add_dp_strategy_to_multi_parameter_configuration(
noise_kind: Optional[pipeline_dp.NoiseKind],
strategy_selector: dp_strategy_selector.DPStrategySelector) -> None:
params = [
configuration.get_aggregate_params(blueprint_params, i)
# get_aggregate_params returns a tuple (AggregateParams,
# min_max_sum_per_partitions)
# for multi-columns. DP Strategy (i.e. noise_kind, partition_selection)
# is independent from min_max_sum_per_partitions, it's fine to just get
# the first element of AggregateParam
configuration.get_aggregate_params(blueprint_params, i)[0]
for i in range(configuration.size)
]
# Initialize fields corresponding to DP strategy configuration
Expand Down Expand Up @@ -452,9 +457,6 @@ def _check_tune_args(options: TuneOptions, is_public_partitions: bool):
# Empty metrics means that partition selection tuning is performed.
raise ValueError("Empty metrics means tuning of partition selection"
" but public partitions were provided.")
elif len(metrics) > 1:
raise ValueError(
f"Tuning supports only one metric, but {metrics} given.")
else: # len(metrics) == 1
if metrics[0] not in [
pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.PRIVACY_ID_COUNT,
Expand Down
32 changes: 26 additions & 6 deletions analysis/per_partition_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import abc
import copy
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union
import numpy as np
import math

Expand All @@ -31,7 +31,7 @@

# It corresponds to the aggregating per (privacy_id, partition_key).
# (count, sum, num_partition_privacy_id_contributes).
PreaggregatedData = Tuple[int, float, int]
PreaggregatedData = Tuple[int, Union[float, Sequence[float]], int]


class UtilityAnalysisCombiner(pipeline_dp.Combiner):
Expand Down Expand Up @@ -236,15 +236,23 @@ class SumCombiner(UtilityAnalysisCombiner):
def __init__(self,
spec: budget_accounting.MechanismSpec,
params: pipeline_dp.AggregateParams,
metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM):
metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM,
i_column: Optional[int] = None):
self._spec = spec
self._params = copy.deepcopy(params)
self._params = params
self._metric = metric
self._i_column = i_column

def create_accumulator(
self, data: Tuple[np.ndarray, np.ndarray,
np.ndarray]) -> AccumulatorType:
count, partition_sum, n_partitions = data
if self._i_column is not None:
# When i_column is set, it means that this is a multi-column
# case and this combiner processes i-th column. The partition_sum
# will be a 2d np.array: n_examples*n_columns
# extract corresponding column in case of multi-column case.
partition_sum = partition_sum[:, self._i_column]
del count # not used for SumCombiner
min_bound = self._params.min_sum_per_partition
max_bound = self._params.max_sum_per_partition
Expand Down Expand Up @@ -375,16 +383,28 @@ class CompoundCombiner(pipeline_dp.combiners.CompoundCombiner):
# improvements, on converting from sparse to dense mode, the data are
# converted to NumPy arrays. And internal combiners perform NumPy vector
# aggregations.
SparseAccumulatorType = Tuple[List[int], List[float], List[int]]
SparseAccumulatorType = Tuple[List[int], Union[List[float],
List[Sequence[float]]],
List[int]]
DenseAccumulatorType = List[Any]
AccumulatorType = Tuple[Optional[SparseAccumulatorType],
Optional[DenseAccumulatorType]]

def __init__(self, combiners: Iterable['Combiner'],
n_sum_aggregations: int):
super().__init__(combiners, return_named_tuple=False)
self._n_sum_aggregations = n_sum_aggregations

def create_accumulator(self, data: PreaggregatedData) -> AccumulatorType:
if not data:
# Handle empty partitions. Only encountered when public partitions
# are used.
return (([0], [0], [0]), None)
return (([], [], []), None)
# if self._n_sum_aggregations > 1:
# empty_sum = [(0,) * self._n_sum_aggregations]
# else:
# empty_sum = [0]
# return (([0], empty_sum, [0]), None)
return (([data[0]], [data[1]], [data[2]]), None)

def _to_dense(self,
Expand Down
23 changes: 21 additions & 2 deletions analysis/tests/data_structures_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ class MultiParameterConfiguration(parameterized.TestCase):
max_sum_per_partition=None,
noise_kind=None,
partition_selection_strategy=None),
dict(testcase_name="min_sum_per_partition and max_sum_per_partition"
"have different length elements",
error_msg="If elements of min_sum_per_partition and "
"max_sum_per_partition are sequences, then they must "
"have the same length.",
max_partitions_contributed=None,
max_contributions_per_partition=None,
min_sum_per_partition=[(1, 2), (1,)],
max_sum_per_partition=[(3, 5), (2, 2)],
noise_kind=None,
partition_selection_strategy=None),
)
def test_validation(self, error_msg, max_partitions_contributed,
max_contributions_per_partition, min_sum_per_partition,
Expand All @@ -95,18 +106,26 @@ def test_get_aggregate_params(self):
selection_strategy = [
pipeline_dp.PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING
] * 3
max_sum_per_partition = [(1, 2), (3, 4), (5, 6)]
min_sum_per_partition = [(0, 0), (0, 0), (0, 1)]
multi_params = analysis.MultiParameterConfiguration(
max_partitions_contributed=max_partitions_contributed,
noise_kind=noise_kind,
partition_selection_strategy=selection_strategy)
partition_selection_strategy=selection_strategy,
min_sum_per_partition=min_sum_per_partition,
max_sum_per_partition=max_sum_per_partition)
self.assertTrue(3, multi_params.size)

expected_min_max_sum = [[(0, 1), (0, 2)], [(0, 3), (0, 4)],
[(0, 5), (1, 6)]]

for i in range(multi_params.size):
ith_params = multi_params.get_aggregate_params(params, i)
ith_params, min_max = multi_params.get_aggregate_params(params, i)
params.max_partitions_contributed = max_partitions_contributed[i]
params.noise_kind = noise_kind[i]
params.partition_selection_strategy = selection_strategy[i]
self.assertEqual(params, ith_params)
self.assertEqual(min_max, expected_min_max_sum[i])


if __name__ == '__main__':
Expand Down
6 changes: 0 additions & 6 deletions analysis/tests/parameter_tuning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,12 +576,6 @@ def test_tune_privacy_id_count(self):
" public partitions were provided",
metrics=[],
is_public_partitions=True),
dict(testcase_name="Multiple metrics",
error_msg="Tuning supports only one metric",
metrics=[
pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.PRIVACY_ID_COUNT
],
is_public_partitions=True),
dict(testcase_name="Mean is not supported",
error_msg=
"Tuning is supported only for Count, Privacy id count and Sum",
Expand Down
Loading

0 comments on commit b09c365

Please sign in to comment.