diff --git a/fiftyone/utils/beam.py b/fiftyone/utils/beam.py
index d94f8ae2d53..182ef23ff49 100644
--- a/fiftyone/utils/beam.py
+++ b/fiftyone/utils/beam.py
@@ -5,13 +5,21 @@
 | `voxel51.com <https://voxel51.com/>`_
 |
 """
+import inspect
+import itertools
 import logging
+import os
+
+from bson import ObjectId
 import numpy as np
+from tqdm.auto import tqdm
 
 import fiftyone.core.dataset as fod
 import fiftyone.core.utils as fou
 import fiftyone.core.view as fov
+import fiftyone.operators.store as foos
 
+os.environ["GRPC_VERBOSITY"] = "NONE"
 fou.ensure_import("apache_beam")
 import apache_beam as beam
@@ -338,6 +346,255 @@ def beam_export(
     )
 
 
+def beam_map(
+    sample_collection,
+    map_fcn,
+    reduce_fcn=None,
+    aggregate_fcn=None,
+    save=False,
+    shard_size=None,
+    shard_method="id",
+    num_workers=None,
+    options=None,
+    progress=False,
+    verbose=False,
+):
+    """Applies the given function to each sample in the collection via
+    `Apache Beam <https://beam.apache.org>`_.
+
+    When only a ``map_fcn`` is provided, this function effectively performs
+    the following map operation with the outer loop in parallel::
+
+        for batch_view in fou.iter_slices(sample_collection, shard_size):
+            for sample in batch_view.iter_samples(autosave=save):
+                map_fcn(sample)
+
+    When a ``reduce_fcn`` is provided, this function effectively performs the
+    following map-reduce operation with the outer loop in parallel::
+
+        reducer = reduce_fcn(...)
+
+        for batch_view in fou.iter_slices(sample_collection, shard_size):
+            for sample in batch_view.iter_samples(autosave=save):
+                sample_output = map_fcn(sample)
+
+                # Outputs are fed to reducer.add_input()
+                yield sample.id, sample_output
+
+        output = reducer.extract_output(...)
+
+    When an ``aggregate_fcn`` is provided, this function effectively performs
+    the following map-aggregate operation with the outer loop in parallel::
+
+        outputs = {}
+
+        for batch_view in fou.iter_slices(sample_collection, shard_size):
+            for sample in batch_view.iter_samples(autosave=save):
+                outputs[sample.id] = map_fcn(sample)
+
+        output = aggregate_fcn(sample_collection, outputs)
+
+    Example::
+
+        import fiftyone as fo
+        import fiftyone.utils.beam as foub
+        import fiftyone.zoo as foz
+
+        dataset = foz.load_zoo_dataset("cifar10", split="train")
+        view = dataset.select_fields("ground_truth")
+
+        #
+        # Example 1: map-save
+        #
+
+        def map_fcn(sample):
+            sample.ground_truth.label = sample.ground_truth.label.upper()
+
+        foub.beam_map(view, map_fcn, save=True, progress=True)
+
+        print(dataset.count_values("ground_truth.label"))
+
+        #
+        # Example 2: map-reduce
+        #
+
+        def map_fcn(sample):
+            return sample.ground_truth.label.lower()
+
+        class ReduceFcn(foub.ReduceFcn):
+            def create_accumulator(self):
+                from collections import Counter
+
+                return Counter()
+
+            def add_input(self, accumulator, input):
+                sample_id, value = input
+                accumulator[value] += 1
+                return accumulator
+
+            def merge_accumulators(self, accumulators):
+                from collections import Counter
+
+                accumulator = Counter()
+                for a in accumulators:
+                    accumulator.update(a)
+
+                return accumulator
+
+            def extract_output(self, accumulator):
+                counts = dict(accumulator)
+                self._store_output(counts)
+
+        counts = foub.beam_map(
+            view, map_fcn, reduce_fcn=ReduceFcn, progress=True
+        )
+        print(counts)
+
+        #
+        # Example 3: map-aggregate
+        #
+
+        def map_fcn(sample):
+            return sample.ground_truth.label.lower()
+
+        def aggregate_fcn(sample_collection, values):
+            from collections import Counter
+
+            return dict(Counter(values.values()))
+
+        counts = foub.beam_map(
+            view, map_fcn, aggregate_fcn=aggregate_fcn, progress=True
+        )
+        print(counts)
+
+    Args:
+        sample_collection: a
+            :class:`fiftyone.core.collections.SampleCollection`
+        map_fcn: a function to apply to each sample
+        reduce_fcn (None): an optional :class:`fiftyone.utils.beam.ReduceFcn`
+            to reduce the map outputs. See above for usage information
+        aggregate_fcn (None): an optional function to aggregate the map
+            outputs. See above for usage information
+        save (False): whether to save any sample edits applied by ``map_fcn``
+        shard_size (None): an optional number of samples to distribute to each
+            worker at a time. By default, samples are evenly distributed to
+            workers with one shard per worker
+        shard_method ("id"): whether to use IDs (``"id"``) or slices
+            (``"slice"``) to assign samples to workers
+        num_workers (None): the number of workers to use when no ``options``
+            are provided. The default is
+            :meth:`fiftyone.core.utils.recommend_process_pool_workers`
+        options (None): an
+            ``apache_beam.options.pipeline_options.PipelineOptions`` that
+            configures how to run the pipeline. By default, the pipeline will
+            be run via Beam's direct runner using multiprocessing with
+            ``num_workers`` workers
+        progress (False): whether to render progress bar(s) for each worker
+        verbose (False): whether to log the Beam pipeline's messages
+
+    Returns:
+        the output of ``reduce_fcn`` or ``aggregate_fcn`` if provided, else
+        None
+    """
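+    # Shard assignment below: "slice" shards are contiguous [start, stop)
+    # ranges of the collection, while "id" shards are explicit lists of
+    # sample IDs, whose size must be capped because each shard is
+    # materialized as a select(ids) view stage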
+    if shard_method == "slice":
+        n = len(sample_collection)
+    else:
+        ids = sample_collection.values("id")
+        n = len(ids)
+
+    if num_workers is None:
+        num_workers = fou.recommend_process_pool_workers()
+
+    # Must cap size of select(ids) stages
+    if shard_method == "id":
+        max_shard_size = fou.recommend_batch_size_for_value(
+            ObjectId(), max_size=100000
+        )
+
+        if shard_size is not None:
+            shard_size = min(shard_size, max_shard_size)
+        elif n > num_workers * max_shard_size:
+            shard_size = max_shard_size
+
+    if shard_size is not None:
+        # Fixed size shards
+        edges = list(range(0, n + 1, shard_size))
+        if edges[-1] < n:
+            edges.append(n)
+    else:
+        # Split collection into exactly `num_workers` shards
+        edges = [int(round(b)) for b in np.linspace(0, n, num_workers + 1)]
+
+    if shard_method == "slice":
+        # Slice batches
+        slices = list(zip(edges[:-1], edges[1:]))
+    else:
+        # ID batches
+        slices = [ids[i:j] for i, j in zip(edges[:-1], edges[1:])]
+
+    num_shards = len(slices)
+    batches = list(
+        zip(
+            range(num_shards),
+            itertools.repeat(num_shards),
+            slices,
+        )
+    )
+
+    if isinstance(sample_collection, fov.DatasetView):
+        dataset_name = sample_collection._root_dataset.name
+        view_stages = sample_collection._serialize()
+    else:
+        dataset_name = sample_collection.name
+        view_stages = None
+
+    has_reducer = reduce_fcn is not None or aggregate_fcn is not None
+
+    map_batch = MapBatch(
+        dataset_name,
+        map_fcn,
+        view_stages=view_stages,
+        save=save,
+        return_outputs=has_reducer,
+        progress=progress,
+    )
+
+    if has_reducer:
+        result_key = f"beam_map_{str(ObjectId())}"
+
+        if reduce_fcn is not None:
+            reduce_cls = reduce_fcn
+        else:
+            reduce_cls = ReduceFcn
+
+        reduce_fn = reduce_cls(
+            dataset_name,
+            view_stages=view_stages,
+            aggregate_fcn=aggregate_fcn,
+            result_key=result_key,
+        )
+    else:
+        result_key = None
+        reduce_fn = None
+
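+    # Default execution: Beam's direct runner, using multiprocessing with
+    # one local worker process per shard (capped at `num_workers`)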
+    if options is None:
+        options = PipelineOptions(
+            runner="direct",
+            direct_num_workers=min(num_workers, num_shards),
+            direct_running_mode="multi_processing",
+        )
+
+    logger = logging.getLogger()
+    level = logger.level if verbose else logging.CRITICAL
+    with fou.SetAttributes(logger, level=level):
+        with beam.Pipeline(options=options) as pipeline:
+            pcoll = (
+                pipeline
+                | "InitMap" >> beam.Create(batches)
+                | "MapBatches" >> beam.ParDo(map_batch)
+            )
+
+            if reduce_fn is not None:
+                _ = pcoll | "ReduceFcn" >> beam.CombineGlobally(reduce_fn)
+
+    sample_collection.reload()
+
+    if result_key is not None:
+        return _get_key(sample_collection, result_key)
+
+
 class ImportBatch(beam.DoFn):
     def __init__(
         self,
@@ -508,6 +765,144 @@ def process(self, element, **kwargs):
         self._sample_collection[start:stop].export(**kwargs)
 
 
+class MapBatch(beam.DoFn):
+    def __init__(
+        self,
+        dataset_name,
+        map_fcn,
+        view_stages=None,
+        save=False,
+        return_outputs=True,
+        progress=False,
+    ):
+        self.dataset_name = dataset_name
+        self.map_fcn = map_fcn
+        self.view_stages = view_stages
+        self.save = save
+        self.return_outputs = return_outputs
+        self.progress = progress
+
+        self._sample_collection = None
+
+    def setup(self):
+        import fiftyone as fo
+        import fiftyone.core.view as fov
+
+        dataset = fo.load_dataset(self.dataset_name)
+
+        if self.view_stages:
+            sample_collection = fov.DatasetView._build(
+                dataset, self.view_stages
+            )
+        else:
+            sample_collection = dataset
+
+        self._sample_collection = sample_collection
+
+    def process(self, element, **kwargs):
+        i, num_batches, batch = element
+
+        if isinstance(batch, tuple):
+            # Slice batches
+            start, stop = batch
+            total = stop - start
+            batch_view = self._sample_collection[start:stop]
+        else:
+            # ID batches
+            sample_ids = batch
+            total = len(sample_ids)
+            batch_view = fov.make_optimized_select_view(
+                self._sample_collection, sample_ids
+            )
+
+        if self.progress:
+            desc = f"Batch {i + 1:0{len(str(num_batches))}}/{num_batches}"
+            with tqdm(total=total, desc=desc, position=i) as pbar:
+                for sample in batch_view.iter_samples(autosave=self.save):
+                    result = self.map_fcn(sample)
+                    if self.return_outputs:
+                        yield sample.id, result
+
+                    pbar.update()
+        else:
+            for sample in batch_view.iter_samples(autosave=self.save):
+                result = self.map_fcn(sample)
+                if self.return_outputs:
+                    yield sample.id, result
+
+
+class ReduceFcn(beam.CombineFn):
+    def __init__(
+        self,
+        dataset_name,
+        view_stages=None,
+        aggregate_fcn=None,
+        result_key=None,
+    ):
+        self.dataset_name = dataset_name
+        self.view_stages = view_stages
+        self.aggregate_fcn = aggregate_fcn
+        self.result_key = result_key
+
+        self._sample_collection = None
+
+    def setup(self):
+        import fiftyone as fo
+        import fiftyone.core.view as fov
+
+        dataset = fo.load_dataset(self.dataset_name)
+
+        if self.view_stages:
+            sample_collection = fov.DatasetView._build(
+                dataset, self.view_stages
+            )
+        else:
+            sample_collection = dataset
+
+        self._sample_collection = sample_collection
+
+    def create_accumulator(self):
+        return {}
+
+    def add_input(self, accumulator, input):
+        sample_id, value = input
+        accumulator[sample_id] = value
+        return accumulator
+
+    def merge_accumulators(self, accumulators):
+        accumulator = {}
+        for a in accumulators:
+            accumulator.update(a)
+
+        return accumulator
+
+    def extract_output(self, accumulator):
+        if self.aggregate_fcn is None:
+            return
+
+        output = self.aggregate_fcn(self._sample_collection, accumulator)
+        if output is None:
+            return
+
+        self._store_output(output)
+
+    def _store_output(self, output):
+        if self.result_key is None:
+            return
+
+        _set_key(self._sample_collection, self.result_key, output)
+
+
+def _set_key(sample_collection, key, value, ttl=60):
+    dataset_id = sample_collection._root_dataset._doc.id
+    store = foos.ExecutionStore.create("beam", dataset_id=dataset_id)
+    store.set(key, value, ttl=ttl)
+
+
+def _get_key(sample_collection, key):
+    dataset_id = sample_collection._root_dataset._doc.id
+    store = foos.ExecutionStore.create("beam", dataset_id=dataset_id)
+    return store.get(key)
+
+
 def _pop_first(x):
     xi = iter(x)
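A note on the two storage helpers above: the reduce path hands its output
back to the caller through FiftyOne's execution store rather than through
the Beam pipeline itself, since ``extract_output()`` runs in a worker
process. A minimal sketch of the round trip that ``_set_key()`` and
``_get_key()`` perform, assuming a loaded ``dataset`` (per the ``ttl=60``
default, stored results expire after 60 seconds)::

    import fiftyone as fo
    import fiftyone.operators.store as foos

    dataset = fo.load_dataset("my-dataset")  # hypothetical dataset name

    store = foos.ExecutionStore.create("beam", dataset_id=dataset._doc.id)
    store.set("result_key", {"cat": 5000}, ttl=60)  # as in _set_key()
    print(store.get("result_key"))  # as in _get_key()
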
diff --git a/setup.py b/setup.py
index e359dc4f874..73f04a8521f 100644
--- a/setup.py
+++ b/setup.py
@@ -70,6 +70,7 @@ def get_version():
     "starlette>=0.24.0",
     "strawberry-graphql",
     "tabulate",
+    "tqdm",
     "xmltodict",
    "universal-analytics-python3>=1.0.1,<2",
     "pydash",
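
For completeness, a sketch of overriding the default pipeline options via
``beam_map()``'s ``options`` argument, reusing the ``view``, ``map_fcn``, and
``aggregate_fcn`` from the docstring examples; the direct runner settings
below mirror the defaults that ``beam_map()`` constructs when no options are
provided, and any other Beam runner's options could be substituted::

    from apache_beam.options.pipeline_options import PipelineOptions

    import fiftyone.utils.beam as foub

    options = PipelineOptions(
        runner="direct",
        direct_num_workers=4,
        direct_running_mode="multi_processing",
    )

    counts = foub.beam_map(
        view,
        map_fcn,
        aggregate_fcn=aggregate_fcn,
        options=options,
        progress=True,
    )
    print(counts)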