From b09c365bf6d5799e155ed9212f9ea5d63d54411f Mon Sep 17 00:00:00 2001 From: Vadym Doroshenko <53558779+dvadym@users.noreply.github.com> Date: Thu, 12 Sep 2024 15:38:41 +0200 Subject: [PATCH] Tuning for multiple columns part 3: Utility analysis for multiple aggregations (#525) --- analysis/cross_partition_combiners.py | 10 +- analysis/data_structures.py | 56 +++++++-- analysis/metrics.py | 6 +- analysis/parameter_tuning.py | 10 +- analysis/per_partition_combiners.py | 32 +++++- analysis/tests/data_structures_test.py | 23 +++- analysis/tests/parameter_tuning_test.py | 6 - .../tests/per_partition_combiners_test.py | 68 +++++++++-- .../tests/utility_analysis_engine_test.py | 106 +++++++++++++++++- analysis/tests/utility_analysis_test.py | 11 +- analysis/utility_analysis.py | 14 +-- analysis/utility_analysis_engine.py | 27 +++-- .../run_without_frameworks_tuning.py | 28 ++++- pipeline_dp/data_extractors.py | 2 +- 14 files changed, 330 insertions(+), 69 deletions(-) diff --git a/analysis/cross_partition_combiners.py b/analysis/cross_partition_combiners.py index d7372bea..9b60674f 100644 --- a/analysis/cross_partition_combiners.py +++ b/analysis/cross_partition_combiners.py @@ -208,14 +208,16 @@ def _per_partition_to_utility_report( # Fill metric errors. metric_errors = None if dp_metrics: - assert len(per_partition_utility.metric_errors) == len(dp_metrics) + # > is when there is analysis for SUM(column1), SUM(column2) etc + assert len(per_partition_utility.metric_errors) >= len(dp_metrics) metric_errors = [] - for metric_error, dp_metric in zip(per_partition_utility.metric_errors, - dp_metrics): + for metric_error in per_partition_utility.metric_errors: metric_errors.append( - _sum_metrics_to_metric_utility(metric_error, dp_metric, + _sum_metrics_to_metric_utility(metric_error, + metric_error.aggregation, prob_to_keep, partition_weight)) + # configuration_index is set on the next stages return metrics.UtilityReport(configuration_index=-1, partitions_info=partition_metrics, metric_errors=metric_errors) diff --git a/analysis/data_structures.py b/analysis/data_structures.py index ff01af02..9b285496 100644 --- a/analysis/data_structures.py +++ b/analysis/data_structures.py @@ -15,7 +15,7 @@ import copy import dataclasses -from typing import Iterable, Optional, Sequence, Union +from typing import Iterable, List, Optional, Sequence, Tuple, Union import pipeline_dp from pipeline_dp import input_validators @@ -65,19 +65,41 @@ def __post_init__(self): raise ValueError( "All set attributes in MultiParameterConfiguration must have " "the same length.") + self._size = sizes[0] if (self.min_sum_per_partition is None) != (self.max_sum_per_partition is None): raise ValueError( "MultiParameterConfiguration: min_sum_per_partition and " "max_sum_per_partition must be both set or both None.") - self._size = sizes[0] + if self.min_sum_per_partition: + # If elements of min_sum_per_partition and max_sum_per_partition are + # sequences, all of them should have the same size. + def all_elements_are_lists(a: list): + return all([isinstance(b, Sequence) for b in a]) + + def common_value_len(a: list) -> Optional[int]: + sizes = [len(v) for v in a] + return min(sizes) if min(sizes) == max(sizes) else None + + if all_elements_are_lists( + self.min_sum_per_partition) and all_elements_are_lists( + self.max_sum_per_partition): + # multi-column case. Check that each configuration has the + # same number of elements (i.e. 
columns) + size1 = common_value_len(self.min_sum_per_partition) + size2 = common_value_len(self.max_sum_per_partition) + if size1 is None or size2 is None or size1 != size2: + raise ValueError("If elements of min_sum_per_partition and " + "max_sum_per_partition are sequences, then" + " they must have the same length.") @property def size(self): return self._size - def get_aggregate_params(self, params: pipeline_dp.AggregateParams, - index: int) -> pipeline_dp.AggregateParams: + def get_aggregate_params( + self, params: pipeline_dp.AggregateParams, index: int + ) -> Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]: """Returns AggregateParams with the index-th parameters.""" params = copy.copy(params) if self.max_partitions_contributed: @@ -86,16 +108,23 @@ def get_aggregate_params(self, params: pipeline_dp.AggregateParams, if self.max_contributions_per_partition: params.max_contributions_per_partition = self.max_contributions_per_partition[ index] - if self.min_sum_per_partition: - params.min_sum_per_partition = self.min_sum_per_partition[index] - if self.max_sum_per_partition: - params.max_sum_per_partition = self.max_sum_per_partition[index] if self.noise_kind: params.noise_kind = self.noise_kind[index] if self.partition_selection_strategy: params.partition_selection_strategy = self.partition_selection_strategy[ index] - return params + min_max_sum = [] + if self.min_sum_per_partition: + min_sum = self.min_sum_per_partition[index] + max_sum = self.max_sum_per_partition[index] + if isinstance(min_sum, Sequence): + min_max_sum = list(zip(min_sum, max_sum)) + else: + min_max_sum = [[min_sum, max_sum]] + else: + min_max_sum.append( + [params.min_sum_per_partition, params.max_sum_per_partition]) + return params, min_max_sum @dataclasses.dataclass @@ -124,12 +153,15 @@ def n_configurations(self): def get_aggregate_params( - options: UtilityAnalysisOptions -) -> Iterable[pipeline_dp.AggregateParams]: + options: UtilityAnalysisOptions +) -> Iterable[Tuple[pipeline_dp.AggregateParams, List[Tuple[float, float]]]]: """Returns AggregateParams which are specified by UtilityAnalysisOptions.""" multi_param_configuration = options.multi_param_configuration if multi_param_configuration is None: - yield options.aggregate_params + agg_params = options.aggregate_params + yield agg_params, [[ + agg_params.min_sum_per_partition, agg_params.max_sum_per_partition + ]] else: for i in range(multi_param_configuration.size): yield multi_param_configuration.get_aggregate_params( diff --git a/analysis/metrics.py b/analysis/metrics.py index f48e35b7..c828fa6f 100644 --- a/analysis/metrics.py +++ b/analysis/metrics.py @@ -272,8 +272,10 @@ class UtilityReportBin: was computed. The metric can be COUNT, PRIVACY_ID_COUNT, SUM. Attributes: - partition_size_from: lower bound of partitions size. - partition_size_to: upper bound of partitions size. + partition_size_from: lower bound of the number of privacy units in + partitions. + partition_size_to: upper (exclusive) bound of the number of privacy + units in partitions. report: the result of utility analysis for partitions of size [partition_size_from, partition_size_to). 
""" diff --git a/analysis/parameter_tuning.py b/analysis/parameter_tuning.py index 2fac701d..b5408f9e 100644 --- a/analysis/parameter_tuning.py +++ b/analysis/parameter_tuning.py @@ -244,7 +244,12 @@ def _add_dp_strategy_to_multi_parameter_configuration( noise_kind: Optional[pipeline_dp.NoiseKind], strategy_selector: dp_strategy_selector.DPStrategySelector) -> None: params = [ - configuration.get_aggregate_params(blueprint_params, i) + # get_aggregate_params returns a tuple (AggregateParams, + # min_max_sum_per_partitions) + # for multi-columns. DP Strategy (i.e. noise_kind, partition_selection) + # is independent from min_max_sum_per_partitions, it's fine to just get + # the first element of AggregateParam + configuration.get_aggregate_params(blueprint_params, i)[0] for i in range(configuration.size) ] # Initialize fields corresponding to DP strategy configuration @@ -452,9 +457,6 @@ def _check_tune_args(options: TuneOptions, is_public_partitions: bool): # Empty metrics means that partition selection tuning is performed. raise ValueError("Empty metrics means tuning of partition selection" " but public partitions were provided.") - elif len(metrics) > 1: - raise ValueError( - f"Tuning supports only one metric, but {metrics} given.") else: # len(metrics) == 1 if metrics[0] not in [ pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.PRIVACY_ID_COUNT, diff --git a/analysis/per_partition_combiners.py b/analysis/per_partition_combiners.py index b5e91e02..7861c129 100644 --- a/analysis/per_partition_combiners.py +++ b/analysis/per_partition_combiners.py @@ -16,7 +16,7 @@ import abc import copy from dataclasses import dataclass -from typing import Any, List, Optional, Tuple +from typing import Any, Iterable, List, Optional, Sequence, Tuple, Union import numpy as np import math @@ -31,7 +31,7 @@ # It corresponds to the aggregating per (privacy_id, partition_key). # (count, sum, num_partition_privacy_id_contributes). -PreaggregatedData = Tuple[int, float, int] +PreaggregatedData = Tuple[int, Union[float, Sequence[float]], int] class UtilityAnalysisCombiner(pipeline_dp.Combiner): @@ -236,15 +236,23 @@ class SumCombiner(UtilityAnalysisCombiner): def __init__(self, spec: budget_accounting.MechanismSpec, params: pipeline_dp.AggregateParams, - metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM): + metric: pipeline_dp.Metrics = pipeline_dp.Metrics.SUM, + i_column: Optional[int] = None): self._spec = spec - self._params = copy.deepcopy(params) + self._params = params self._metric = metric + self._i_column = i_column def create_accumulator( self, data: Tuple[np.ndarray, np.ndarray, np.ndarray]) -> AccumulatorType: count, partition_sum, n_partitions = data + if self._i_column is not None: + # When i_column is set, it means that this is a multi-column + # case and this combiner processes i-th column. The partition_sum + # will be a 2d np.array: n_examples*n_columns + # extract corresponding column in case of multi-column case. + partition_sum = partition_sum[:, self._i_column] del count # not used for SumCombiner min_bound = self._params.min_sum_per_partition max_bound = self._params.max_sum_per_partition @@ -375,16 +383,28 @@ class CompoundCombiner(pipeline_dp.combiners.CompoundCombiner): # improvements, on converting from sparse to dense mode, the data are # converted to NumPy arrays. And internal combiners perform NumPy vector # aggregations. 
- SparseAccumulatorType = Tuple[List[int], List[float], List[int]] + SparseAccumulatorType = Tuple[List[int], Union[List[float], + List[Sequence[float]]], + List[int]] DenseAccumulatorType = List[Any] AccumulatorType = Tuple[Optional[SparseAccumulatorType], Optional[DenseAccumulatorType]] + def __init__(self, combiners: Iterable['Combiner'], + n_sum_aggregations: int): + super().__init__(combiners, return_named_tuple=False) + self._n_sum_aggregations = n_sum_aggregations + def create_accumulator(self, data: PreaggregatedData) -> AccumulatorType: if not data: # Handle empty partitions. Only encountered when public partitions # are used. - return (([0], [0], [0]), None) + return (([], [], []), None) + # if self._n_sum_aggregations > 1: + # empty_sum = [(0,) * self._n_sum_aggregations] + # else: + # empty_sum = [0] + # return (([0], empty_sum, [0]), None) return (([data[0]], [data[1]], [data[2]]), None) def _to_dense(self, diff --git a/analysis/tests/data_structures_test.py b/analysis/tests/data_structures_test.py index 07ede3b6..c8b08f7b 100644 --- a/analysis/tests/data_structures_test.py +++ b/analysis/tests/data_structures_test.py @@ -70,6 +70,17 @@ class MultiParameterConfiguration(parameterized.TestCase): max_sum_per_partition=None, noise_kind=None, partition_selection_strategy=None), + dict(testcase_name="min_sum_per_partition and max_sum_per_partition" + "have different length elements", + error_msg="If elements of min_sum_per_partition and " + "max_sum_per_partition are sequences, then they must " + "have the same length.", + max_partitions_contributed=None, + max_contributions_per_partition=None, + min_sum_per_partition=[(1, 2), (1,)], + max_sum_per_partition=[(3, 5), (2, 2)], + noise_kind=None, + partition_selection_strategy=None), ) def test_validation(self, error_msg, max_partitions_contributed, max_contributions_per_partition, min_sum_per_partition, @@ -95,18 +106,26 @@ def test_get_aggregate_params(self): selection_strategy = [ pipeline_dp.PartitionSelectionStrategy.GAUSSIAN_THRESHOLDING ] * 3 + max_sum_per_partition = [(1, 2), (3, 4), (5, 6)] + min_sum_per_partition = [(0, 0), (0, 0), (0, 1)] multi_params = analysis.MultiParameterConfiguration( max_partitions_contributed=max_partitions_contributed, noise_kind=noise_kind, - partition_selection_strategy=selection_strategy) + partition_selection_strategy=selection_strategy, + min_sum_per_partition=min_sum_per_partition, + max_sum_per_partition=max_sum_per_partition) self.assertTrue(3, multi_params.size) + expected_min_max_sum = [[(0, 1), (0, 2)], [(0, 3), (0, 4)], + [(0, 5), (1, 6)]] + for i in range(multi_params.size): - ith_params = multi_params.get_aggregate_params(params, i) + ith_params, min_max = multi_params.get_aggregate_params(params, i) params.max_partitions_contributed = max_partitions_contributed[i] params.noise_kind = noise_kind[i] params.partition_selection_strategy = selection_strategy[i] self.assertEqual(params, ith_params) + self.assertEqual(min_max, expected_min_max_sum[i]) if __name__ == '__main__': diff --git a/analysis/tests/parameter_tuning_test.py b/analysis/tests/parameter_tuning_test.py index c5093802..d63fc2f1 100644 --- a/analysis/tests/parameter_tuning_test.py +++ b/analysis/tests/parameter_tuning_test.py @@ -576,12 +576,6 @@ def test_tune_privacy_id_count(self): " public partitions were provided", metrics=[], is_public_partitions=True), - dict(testcase_name="Multiple metrics", - error_msg="Tuning supports only one metric", - metrics=[ - pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.PRIVACY_ID_COUNT - 
], - is_public_partitions=True), dict(testcase_name="Mean is not supported", error_msg= "Tuning is supported only for Count, Privacy id count and Sum", diff --git a/analysis/tests/per_partition_combiners_test.py b/analysis/tests/per_partition_combiners_test.py index 876c44da..dbc385c7 100644 --- a/analysis/tests/per_partition_combiners_test.py +++ b/analysis/tests/per_partition_combiners_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """UtilityAnalysisCountCombinerTest.""" +import copy import dataclasses import numpy as np @@ -58,7 +59,7 @@ def _create_sparse_combiner_acc( return (counts, sums, n_partitions) -class UtilityAnalysisCountCombinerTest(parameterized.TestCase): +class CountCombinerTest(parameterized.TestCase): @parameterized.named_parameters( dict(testcase_name='empty', @@ -273,7 +274,7 @@ def _create_combiner_params_for_sum( )) -class UtilityAnalysisSumCombinerTest(parameterized.TestCase): +class SumCombinerTest(parameterized.TestCase): @parameterized.named_parameters( dict(testcase_name='empty', @@ -366,6 +367,23 @@ def test_merge(self): # Test that no type is np.float64 self.assertTrue(_check_none_are_np_float64(merged_acc)) + def test_create_accumulator_for_multi_columns(self): + params = _create_combiner_params_for_sum(0, 5) + combiner = combiners.SumCombiner(*params, i_column=1) + data = (np.array([1, 1]), np.array([[1, 10], + [2, 20]]), np.array([100, 150])) + partition_sum, clipping_to_min_error, clipping_to_max_error, expected_l0_bounding_error, var_cross_partition_error = combiner.create_accumulator( + data) + self.assertEqual(partition_sum, 30) + self.assertEqual(clipping_to_min_error, 0) + self.assertEqual(clipping_to_max_error, -20) + self.assertAlmostEqual(expected_l0_bounding_error, + -9.91666667, + delta=1e-8) + self.assertAlmostEqual(var_cross_partition_error, + 0.41305556, + delta=1e-8) + def _create_combiner_params_for_privacy_id_count() -> Tuple[ pipeline_dp.budget_accounting.MechanismSpec, pipeline_dp.AggregateParams]: @@ -381,7 +399,7 @@ def _create_combiner_params_for_privacy_id_count() -> Tuple[ )) -class UtilityAnalysisPrivacyIdCountCombinerTest(parameterized.TestCase): +class PrivacyIdCountCombinerTest(parameterized.TestCase): @parameterized.named_parameters( dict(testcase_name='empty', @@ -463,17 +481,36 @@ def test_merge(self): self.assertTrue(_check_none_are_np_float64(merged_acc)) -class UtilityAnalysisCompoundCombinerTest(parameterized.TestCase): +class CompoundCombinerTest(parameterized.TestCase): def _create_combiner(self) -> combiners.CompoundCombiner: mechanism_spec, params = _create_combiner_params_for_count() count_combiner = combiners.CountCombiner(mechanism_spec, params) return combiners.CompoundCombiner([count_combiner], - return_named_tuple=False) + n_sum_aggregations=0) + + def _create_combiner_2_columns(self) -> combiners.CompoundCombiner: + mechanism_spec, params1 = _create_combiner_params_for_sum(0, 1) + sum_combiner1 = combiners.SumCombiner(mechanism_spec, + params1, + i_column=0) + params2 = copy.deepcopy(params1) + params2.max_sum_per_partition = 5 + sum_combiner2 = combiners.SumCombiner(mechanism_spec, + params2, + i_column=1) + return combiners.CompoundCombiner([sum_combiner1, sum_combiner2], + n_sum_aggregations=2) + + def test_create_accumulator_empty_data_multi_columns(self): + + sparse, dense = self._create_combiner_2_columns().create_accumulator(()) + self.assertEqual(sparse, ([], [], [])) + self.assertIsNone(dense) def 
test_create_accumulator_empty_data(self): sparse, dense = self._create_combiner().create_accumulator(()) - self.assertEqual(sparse, ([0], [0], [0])) + self.assertEqual(sparse, ([], [], [])) self.assertIsNone(dense) def test_create_accumulator(self): @@ -485,6 +522,13 @@ def test_create_accumulator(self): self.assertEqual(([len(data)], [sum(data)], [n_partitions]), sparse) self.assertIsNone(dense) + def test_create_accumulator_2_sum_columns(self): + combiner = self._create_combiner_2_columns() + pre_aggregate_data = [1, [2, 3], 4] # count, sum, n_partitions + sparse, dense = combiner.create_accumulator(pre_aggregate_data) + self.assertEqual(([1], [[2, 3]], [4]), sparse) + self.assertIsNone(dense) + def test_to_dense(self): combiner = self._create_combiner() sparse_acc = ([1, 3], [10, 20], [100, 200]) @@ -493,6 +537,16 @@ def test_to_dense(self): self.assertEqual(2, num_privacy_ids) self.assertSequenceEqual((4, 0, -1.0, -2.98, 0.0298), count_acc) + def test_to_dense_2_columns(self): + combiner = self._create_combiner_2_columns() + sparse_acc = ([1, 3], [(10, 20), (100, 200)], [100, 200]) + dense = combiner._to_dense(sparse_acc) + num_privacy_ids, (sum1_acc, sum2_acc) = dense + self.assertEqual(2, num_privacy_ids) + self.assertSequenceEqual( + (110, 0, -108, -1.9849999999999999, 0.014875000000000001), sum1_acc) + self.assertSequenceEqual((220, 0, -210, -9.925, 0.371875), sum2_acc) + def test_merge_sparse(self): combiner = self._create_combiner() sparse_acc1 = ([1], [10], [100]) @@ -611,7 +665,7 @@ def test_two_internal_combiners(self): sum_mechanism_spec, sum_params = _create_combiner_params_for_sum(0, 5) sum_combiner = combiners.SumCombiner(sum_mechanism_spec, sum_params) combiner = combiners.CompoundCombiner([count_combiner, sum_combiner], - return_named_tuple=False) + n_sum_aggregations=1) data, n_partitions = [1, 2, 3], 100 acc = combiner.create_accumulator((len(data), sum(data), n_partitions)) diff --git a/analysis/tests/utility_analysis_engine_test.py b/analysis/tests/utility_analysis_engine_test.py index d0410b3f..8e001b13 100644 --- a/analysis/tests/utility_analysis_engine_test.py +++ b/analysis/tests/utility_analysis_engine_test.py @@ -259,7 +259,7 @@ def test_multi_parameters(self): [self.assertLen(partition_metrics, 2) for partition_metrics in output] expected_pk0 = [ - metrics.RawStatistics(privacy_id_count=2, count=1), + metrics.RawStatistics(privacy_id_count=1, count=1), metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, sum=1.0, clipping_to_min_error=0.0, @@ -278,7 +278,7 @@ def test_multi_parameters(self): noise_kind=pipeline_dp.NoiseKind.GAUSSIAN) ] expected_pk1 = [ - metrics.RawStatistics(privacy_id_count=2, count=2), + metrics.RawStatistics(privacy_id_count=1, count=2), metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, sum=2.0, clipping_to_min_error=0.0, @@ -395,6 +395,108 @@ def test_partition_sampling(self, mock_sampler_init): data_extractors=data_extractors) mock_sampler_init.assert_called_once_with(0.25) + def test_utility_analysis_for_2_columns(self): + # Arrange + aggregate_params = pipeline_dp.AggregateParams( + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, + metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM], + max_partitions_contributed=1, + max_contributions_per_partition=1, + max_sum_per_partition=0.5, + min_sum_per_partition=0) + + multi_param = analysis.MultiParameterConfiguration( + max_partitions_contributed=[1, 2], + min_sum_per_partition=[(0, 0), (0, 1)], + max_sum_per_partition=[(3, 10), (5, 20)], + ) + + budget_accountant = 
pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, + total_delta=1e-10) + + engine = utility_analysis_engine.UtilityAnalysisEngine( + budget_accountant=budget_accountant, + backend=pipeline_dp.LocalBackend()) + + # Input collection has 2 privacy id, which contributes to 1 partition + # 2 and 1 times correspondingly. + input = [(0, "pk", 2, 3), (0, "pk", 0, 0), (1, "pk", 15, 20)] + data_extractors = pipeline_dp.data_extractors.MultiValueDataExtractors( + privacy_id_extractor=lambda x: x[0], + partition_extractor=lambda x: x[1], + value_extractors=[lambda x: x[2], lambda x: x[3]]) + + options = analysis.UtilityAnalysisOptions( + epsilon=1, + delta=0, + aggregate_params=aggregate_params, + multi_param_configuration=multi_param) + output = engine.analyze(input, + options=options, + data_extractors=data_extractors, + public_partitions=["pk"]) + + budget_accountant.compute_budgets() + + output = list(output) + self.assertLen(output, 1) + # Each partition has 2 metrics (for both parameter set). + [self.assertLen(partition_metrics, 2) for partition_metrics in output] + + expected = [ + metrics.RawStatistics(privacy_id_count=2, count=3), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.SUM, + sum=17, + clipping_to_min_error=0, + clipping_to_max_error=-12, + expected_l0_bounding_error=0.0, + std_l0_bounding_error=0.0, + std_noise=52.359375, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.SUM, + sum=23, + clipping_to_min_error=0, + clipping_to_max_error=-10, + expected_l0_bounding_error=0.0, + std_l0_bounding_error=0.0, + std_noise=174.53125, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, + sum=3, + clipping_to_min_error=0.0, + clipping_to_max_error=-1.0, + expected_l0_bounding_error=0.0, + std_l0_bounding_error=0.0, + std_noise=17.453125, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.SUM, + sum=17, + clipping_to_min_error=0, + clipping_to_max_error=-10, + expected_l0_bounding_error=0.0, + std_l0_bounding_error=0.0, + std_noise=123.41223040396463, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.SUM, + sum=23, + clipping_to_min_error=0, + clipping_to_max_error=0, + expected_l0_bounding_error=0, + std_l0_bounding_error=0.0, + std_noise=493.6489216158585, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT, + sum=3, + clipping_to_min_error=0.0, + clipping_to_max_error=-1.0, + expected_l0_bounding_error=0.0, + std_l0_bounding_error=0.0, + std_noise=24.682446080792925, + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN), + ] + + self.assertSequenceEqual(output[0][1], expected) + if __name__ == '__main__': absltest.main() diff --git a/analysis/tests/utility_analysis_test.py b/analysis/tests/utility_analysis_test.py index 961fc08e..c3dc83d7 100644 --- a/analysis/tests/utility_analysis_test.py +++ b/analysis/tests/utility_analysis_test.py @@ -179,9 +179,10 @@ def test_wo_public_partitions(self, pre_aggregated: bool): ]) expected_copy = copy.deepcopy(expected) expected.utility_report_histogram = [ - metrics.UtilityReportBin(partition_size_from=20, - partition_size_to=50, - report=expected_copy) + metrics.UtilityReportBin( + partition_size_from=10, # 10 privacy ids + partition_size_to=20, + report=expected_copy) ] common.assert_dataclasses_are_equal(self, report, expected) self.assertLen(per_partition_result, 10) @@ -322,9 +323,9 @@ def 
test_unnest_metrics(self): output = list(utility_analysis._unnest_metrics(input_data)) self.assertLen(output, 4) self.assertEqual(output[0], ((0, None), input_data[0])) - self.assertEqual(output[1], ((0, 100), input_data[0])) + self.assertEqual(output[1], ((0, 1), input_data[0])) self.assertEqual(output[2], ((1, None), input_data[1])) - self.assertEqual(output[3], ((1, 100), input_data[1])) + self.assertEqual(output[3], ((1, 1), input_data[1])) @parameterized.named_parameters( dict(testcase_name="without pre-threshold", diff --git a/analysis/utility_analysis.py b/analysis/utility_analysis.py index 4ac499db..779cc241 100644 --- a/analysis/utility_analysis.py +++ b/analysis/utility_analysis.py @@ -160,7 +160,9 @@ def _pack_per_partition_metrics( where each element corresponds to one of the configuration of the input parameters. """ - n_metrics = len(utility_result) // n_configurations + assert (len(utility_result) - 1) % n_configurations == 0 + n_metrics = (len(utility_result) - + 1) // n_configurations # -1 because of raw_statistics raw_statistics = utility_result[0] # Create 'result' with empty elements. @@ -200,13 +202,11 @@ def _unnest_metrics( """Unnests metrics from different configurations.""" for i, metric in enumerate(metrics): yield ((i, None), metric) - if metrics[0].metric_errors: - partition_size = metrics[0].metric_errors[0].sum - else: - # Select partitions case. - partition_size = metrics[0].raw_statistics.privacy_id_count - # Emits metrics for computing histogram by partition size. + # Choose bucket based on the number of privacy id count. + partition_size = metrics[0].raw_statistics.privacy_id_count bucket = _get_lower_bound(partition_size) + + # Emits metrics for computing histogram by partition size. yield ((i, bucket), metric) diff --git a/analysis/utility_analysis_engine.py b/analysis/utility_analysis_engine.py index 3c0309c8..b8c0e2d8 100644 --- a/analysis/utility_analysis_engine.py +++ b/analysis/utility_analysis_engine.py @@ -101,7 +101,9 @@ def _create_compound_combiner( ) -> combiners.CompoundCombiner: # Create Utility analysis combiners. internal_combiners = [per_partition_combiners.RawStatisticsCombiner()] - for params in data_structures.get_aggregate_params(self._options): + n_sum_aggregations = 0 + for params, min_max_sum_per_partition in data_structures.get_aggregate_params( + self._options): # Each parameter configuration has own BudgetAccountant which allows # different mechanisms to be used in different configurations. 
budget_accountant = copy.deepcopy(self._budget_accountant) @@ -123,24 +125,33 @@ def _create_compound_combiner( budget_accountant.request_budget( pipeline_dp.MechanismType.GENERIC), params)) if pipeline_dp.Metrics.SUM in aggregate_params.metrics: - internal_combiners.append( - per_partition_combiners.SumCombiner( - budget_accountant.request_budget(mechanism_type), - params)) + n_sum_aggregations = len(min_max_sum_per_partition) + for i_column, (min_sum, + max_sum) in enumerate(min_max_sum_per_partition): + if len(min_max_sum_per_partition) == 1: + i_column = None # 1 column, no need to set index + sum_params = copy.deepcopy(params) + sum_params.min_sum_per_partition = min_sum + sum_params.max_sum_per_partition = max_sum + internal_combiners.append( + per_partition_combiners.SumCombiner( + budget_accountant.request_budget(mechanism_type), + sum_params, + i_column=i_column)) if pipeline_dp.Metrics.COUNT in aggregate_params.metrics: internal_combiners.append( per_partition_combiners.CountCombiner( budget_accountant.request_budget(mechanism_type), - params)) + copy.deepcopy(params))) if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics: internal_combiners.append( per_partition_combiners.PrivacyIdCountCombiner( budget_accountant.request_budget(mechanism_type), - params)) + copy.deepcopy(params))) budget_accountant.compute_budgets() return per_partition_combiners.CompoundCombiner( - internal_combiners, return_named_tuple=False) + internal_combiners, n_sum_aggregations=n_sum_aggregations) def _select_private_partitions_internal( self, col, max_partitions_contributed: int, diff --git a/examples/restaurant_visits/run_without_frameworks_tuning.py b/examples/restaurant_visits/run_without_frameworks_tuning.py index de2a38ab..26688faf 100644 --- a/examples/restaurant_visits/run_without_frameworks_tuning.py +++ b/examples/restaurant_visits/run_without_frameworks_tuning.py @@ -40,6 +40,11 @@ 'If true, the data is preaggregated before tuning') flags.DEFINE_integer('pre_threshold', None, 'Pre threshold which is used in the DP aggregation') +flags.DEFINE_boolean( + 'multiple_metrics', False, + 'If True performs tuning for COUNT, PRIVACY_ID_COUNT and ' + ' SUM(spent_minutes), SUM(spent_money). Otherwise only for ' + 'COUNT') def write_to_file(col, filename): @@ -64,18 +69,33 @@ def load_data(input_file: str) -> list: def get_aggregate_params(): - # Limit contributions to 1 per partition, contribution error will be half of the count. + metrics = [pipeline_dp.Metrics.COUNT] + if FLAGS.multiple_metrics: + metrics.extend( + [pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.SUM]) return pipeline_dp.AggregateParams( noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, - metrics=[pipeline_dp.Metrics.COUNT], + metrics=metrics, max_partitions_contributed=1, max_contributions_per_partition=1, + max_sum_per_partition=1, + min_sum_per_partition=0, pre_threshold=FLAGS.pre_threshold) def get_data_extractors(): # Specify how to extract privacy_id, partition_key and value from an # element of restaurant_visits_rows. 
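    # Illustrative aside (assuming the Row fields already used in this
    # example): with the multiple_metrics flag enabled, the two value
    # extractors configured below make each input row contribute a pair of
    # values, roughly
    #     row -> (row.spent_money, row.spent_minutes)
    # so the utility analysis reports SUM metrics for each of the two columns,
    # in addition to COUNT and PRIVACY_ID_COUNT.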
+ if FLAGS.multiple_metrics: + return pipeline_dp.data_extractors.MultiValueDataExtractors( + partition_extractor=lambda row: row.day, + privacy_id_extractor=lambda row: row.user_id, + value_extractor=None, + value_extractors=[ + lambda row: row.spent_money, + lambda row: row.spent_minutes, + ], + ) return pipeline_dp.DataExtractors( partition_extractor=lambda row: row.day, privacy_id_extractor=lambda row: row.user_id, @@ -109,7 +129,9 @@ def tune_parameters(): minimizing_function = parameter_tuning.MinimizingFunction.ABSOLUTE_ERROR parameters_to_tune = parameter_tuning.ParametersToTune( - max_partitions_contributed=True, max_contributions_per_partition=True) + max_partitions_contributed=True, + max_contributions_per_partition=True, + max_sum_per_partition=True) tune_options = parameter_tuning.TuneOptions( epsilon=1, delta=1e-5, diff --git a/pipeline_dp/data_extractors.py b/pipeline_dp/data_extractors.py index 90e23a80..2839efe9 100644 --- a/pipeline_dp/data_extractors.py +++ b/pipeline_dp/data_extractors.py @@ -31,7 +31,7 @@ class DataExtractors: @dataclasses.dataclass -class MultiValueDataExtactors(DataExtractors): +class MultiValueDataExtractors(DataExtractors): """Data extractors with multiple value extractors. Each extractor corresponds to the different value to aggregate.
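
For reference, below is a minimal end-to-end sketch of the multi-column utility
analysis that this patch enables. It closely follows the new
test_utility_analysis_for_2_columns; the input data and parameter values are
illustrative, and the import paths are assumed from how the tests reference
these modules.

    import pipeline_dp
    import analysis
    from analysis import utility_analysis_engine

    # Base parameters; the per-configuration values below override them.
    aggregate_params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],
        max_partitions_contributed=1,
        max_contributions_per_partition=1,
        min_sum_per_partition=0,
        max_sum_per_partition=0.5)

    # Two parameter configurations; each element of min/max_sum_per_partition
    # is a per-column tuple, i.e. two value columns here.
    multi_param = analysis.MultiParameterConfiguration(
        max_partitions_contributed=[1, 2],
        min_sum_per_partition=[(0, 0), (0, 1)],
        max_sum_per_partition=[(3, 10), (5, 20)])

    budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                          total_delta=1e-10)
    engine = utility_analysis_engine.UtilityAnalysisEngine(
        budget_accountant=budget_accountant,
        backend=pipeline_dp.LocalBackend())

    # Each record is (privacy_id, partition_key, value_column_1, value_column_2).
    input_data = [(0, "pk", 2, 3), (0, "pk", 0, 0), (1, "pk", 15, 20)]
    data_extractors = pipeline_dp.data_extractors.MultiValueDataExtractors(
        privacy_id_extractor=lambda x: x[0],
        partition_extractor=lambda x: x[1],
        value_extractors=[lambda x: x[2], lambda x: x[3]])

    options = analysis.UtilityAnalysisOptions(
        epsilon=1,
        delta=0,
        aggregate_params=aggregate_params,
        multi_param_configuration=multi_param)
    output = engine.analyze(input_data,
                            options=options,
                            data_extractors=data_extractors,
                            public_partitions=["pk"])
    budget_accountant.compute_budgets()

    # For every partition and parameter configuration, the per-partition result
    # contains one SumMetrics per SUM column plus one for COUNT.
    print(list(output))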