diff --git a/fiftyone/utils/beam.py b/fiftyone/utils/beam.py
index d94f8ae2d53..182ef23ff49 100644
--- a/fiftyone/utils/beam.py
+++ b/fiftyone/utils/beam.py
@@ -5,13 +5,21 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""
+import inspect
+import itertools
import logging
+import os
+
+from bson import ObjectId
import numpy as np
+from tqdm.auto import tqdm
import fiftyone.core.dataset as fod
import fiftyone.core.utils as fou
import fiftyone.core.view as fov
+import fiftyone.operators.store as foos
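+
+# Suppress gRPC log output, which can be noisy when Beam pipelines run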
+os.environ["GRPC_VERBOSITY"] = "NONE"
fou.ensure_import("apache_beam")
import apache_beam as beam
@@ -338,6 +346,255 @@ def beam_export(
)
+def beam_map(
+ sample_collection,
+ map_fcn,
+ reduce_fcn=None,
+ aggregate_fcn=None,
+ save=False,
+ shard_size=None,
+ shard_method="id",
+ num_workers=None,
+ options=None,
+ progress=False,
+ verbose=False,
+):
+ """Applies the given function to each sample in the collection via
+    `Apache Beam <https://beam.apache.org>`_.
+
+ When only a ``map_fcn`` is provided, this function effectively performs
+ the following map operation with the outer loop in parallel::
+
+ for batch_view in fou.iter_slices(sample_collection, shard_size):
+ for sample in batch_view.iter_samples(autosave=save):
+ map_fcn(sample)
+
+ When a ``reduce_fcn`` is provided, this function effectively performs the
+    following map-reduce operation with the outer loop in parallel::
+
+ reducer = reduce_fcn(...)
+
+ for batch_view in fou.iter_slices(sample_collection, shard_size):
+ for sample in batch_view.iter_samples(autosave=save):
+                sample_output = map_fcn(sample)
+
+                # Outputs are fed to reducer.add_input()
+                yield sample.id, sample_output
+
+ output = reducer.extract_output(...)
+
+    When an ``aggregate_fcn`` is provided, this function effectively performs
+ the following map-aggregate operation with the outer loop in parallel::
+
+ outputs = {}
+
+ for batch_view in fou.iter_slices(sample_collection, shard_size):
+ for sample in batch_view.iter_samples(autosave=save):
+ outputs[sample.id] = map_fcn(sample)
+
+ output = aggregate_fcn(sample_collection, outputs)
+
+ Example::
+
+ import fiftyone as fo
+ import fiftyone.utils.beam as foub
+ import fiftyone.zoo as foz
+
+ dataset = foz.load_zoo_dataset("cifar10", split="train")
+ view = dataset.select_fields("ground_truth")
+
+ #
+ # Example 1: map-save
+ #
+
+ def map_fcn(sample):
+ sample.ground_truth.label = sample.ground_truth.label.upper()
+
+ foub.beam_map(view, map_fcn, save=True, progress=True)
+
+ print(dataset.count_values("ground_truth.label"))
+
+ #
+ # Example 2: map-reduce
+ #
+
+ def map_fcn(sample):
+ return sample.ground_truth.label.lower()
+
+ class ReduceFcn(foub.ReduceFcn):
+ def create_accumulator(self):
+ from collections import Counter
+ return Counter()
+
+ def add_input(self, accumulator, input):
+ sample_id, value = input
+ accumulator[value] += 1
+ return accumulator
+
+ def merge_accumulators(self, accumulators):
+ from collections import Counter
+ accumulator = Counter()
+ for a in accumulators:
+ accumulator.update(a)
+ return accumulator
+
+ def extract_output(self, accumulator):
+ counts = dict(accumulator)
+ self._store_output(counts)
+
+ counts = foub.beam_map(view, map_fcn, reduce_fcn=ReduceFcn, progress=True)
+ print(counts)
+
+ #
+ # Example 3: map-aggregate
+ #
+
+ def map_fcn(sample):
+ return sample.ground_truth.label.lower()
+
+ def aggregate_fcn(sample_collection, values):
+ from collections import Counter
+ return dict(Counter(values.values()))
+
+ counts = foub.beam_map(view, map_fcn, aggregate_fcn=aggregate_fcn, progress=True)
+ print(counts)
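+
+        #
+        # Example 4: custom pipeline options
+        #
+
+        # A sketch of providing your own ``PipelineOptions``; the kwargs below
+        # mirror the direct runner defaults that ``beam_map()`` uses, with an
+        # illustrative worker count
+        from apache_beam.options.pipeline_options import PipelineOptions
+
+        options = PipelineOptions(
+            runner="direct",
+            direct_num_workers=8,
+            direct_running_mode="multi_processing",
+        )
+
+        counts = foub.beam_map(
+            view,
+            map_fcn,
+            aggregate_fcn=aggregate_fcn,
+            options=options,
+            progress=True,
+        )
+        print(counts)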
+
+ Args:
+ sample_collection: a
+ :class:`fiftyone.core.collections.SampleCollection`
+ map_fcn: a function to apply to each sample
+ reduce_fcn (None): an optional :class:`fiftyone.utils.beam.ReduceFcn`
+ to reduce the map outputs. See above for usage information
+ aggregate_fcn (None): an optional function to aggregate the map
+ outputs. See above for usage information
+ save (False): whether to save any sample edits applied by ``map_fcn``
+ shard_size (None): an optional number of samples to distribute to each
+ worker at a time. By default, samples are evenly distributed to
+ workers with one shard per worker
+ shard_method ("id"): whether to use IDs (``"id"``) or slices
+ (``"slice"``) to assign samples to workers
+ num_workers (None): the number of workers to use when no ``options``
+ are provided. The default is
+            :func:`fiftyone.core.utils.recommend_process_pool_workers`
+        options (None): an
+ ``apache_beam.options.pipeline_options.PipelineOptions`` that
+ configures how to run the pipeline. By default, the pipeline will
+ be run via Beam's direct runner using multiprocessing with
+ ``num_workers`` workers
+ progress (False): whether to render progress bar(s) for each worker
+ verbose (False): whether to log the Beam pipeline's messages
+
+ Returns:
+ the output of ``reduce_fcn`` or ``aggregate_fcn`` if provided, else
+ None
+ """
+ if shard_method == "slice":
+ n = len(sample_collection)
+ else:
+ ids = sample_collection.values("id")
+ n = len(ids)
+
+ if num_workers is None:
+ num_workers = fou.recommend_process_pool_workers()
+
+ # Must cap size of select(ids) stages
+ if shard_method == "id":
+ max_shard_size = fou.recommend_batch_size_for_value(
+ ObjectId(), max_size=100000
+ )
+
+ if shard_size is not None:
+ shard_size = min(shard_size, max_shard_size)
+ elif n > num_workers * max_shard_size:
+ shard_size = max_shard_size
+
+ if shard_size is not None:
+ # Fixed size shards
+ edges = list(range(0, n + 1, shard_size))
+ if edges[-1] < n:
+ edges.append(n)
+ else:
+ # Split collection into exactly `num_workers` shards
+ edges = [int(round(b)) for b in np.linspace(0, n, num_workers + 1)]
+
+ if shard_method == "slice":
+ # Slice batches
+ slices = list(zip(edges[:-1], edges[1:]))
+ else:
+ # ID batches
+ slices = [ids[i:j] for i, j in zip(edges[:-1], edges[1:])]
+
+ num_shards = len(slices)
+ batches = list(
+ zip(
+ range(num_shards),
+ itertools.repeat(num_shards),
+ slices,
+ )
+ )
+
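+    # Pass the dataset name and any serialized view stages so that each
+    # worker can rebuild the collection in its own process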
+ if isinstance(sample_collection, fov.DatasetView):
+ dataset_name = sample_collection._root_dataset.name
+ view_stages = sample_collection._serialize()
+ else:
+ dataset_name = sample_collection.name
+ view_stages = None
+
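+    # Only yield map outputs when a reducer or aggregator will consume them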
+ has_reducer = reduce_fcn is not None or aggregate_fcn is not None
+
+ map_batch = MapBatch(
+ dataset_name,
+ map_fcn,
+ view_stages=view_stages,
+ save=save,
+ return_outputs=has_reducer,
+ progress=progress,
+ )
+
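+    # Reduce/aggregate results are written to an execution store under a
+    # temporary key and read back after the pipeline completes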
+ if has_reducer:
+ result_key = f"beam_map_{str(ObjectId())}"
+
+ if reduce_fcn is not None:
+ reduce_cls = reduce_fcn
+ else:
+ reduce_cls = ReduceFcn
+
+ reduce_fn = reduce_cls(
+ dataset_name,
+ view_stages=view_stages,
+ aggregate_fcn=aggregate_fcn,
+ result_key=result_key,
+ )
+ else:
+ result_key = None
+ reduce_fn = None
+
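+    # By default, use Beam's direct runner with one subprocess per shard,
+    # capped at ``num_workers``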
+ if options is None:
+ options = PipelineOptions(
+ runner="direct",
+ direct_num_workers=min(num_workers, num_shards),
+ direct_running_mode="multi_processing",
+ )
+
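+    # Suppress Beam's logging unless verbose=True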
+ logger = logging.getLogger()
+ level = logger.level if verbose else logging.CRITICAL
+ with fou.SetAttributes(logger, level=level):
+ with beam.Pipeline(options=options) as pipeline:
+ pcoll = (
+ pipeline
+ | "InitMap" >> beam.Create(batches)
+ | "MapBatches" >> beam.ParDo(map_batch)
+ )
+
+ if reduce_fn is not None:
+ _ = pcoll | "ReduceFcn" >> beam.CombineGlobally(reduce_fn)
+
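+    # Reload so that any edits saved by the workers are reflected here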
+ sample_collection.reload()
+
+ if result_key is not None:
+ return _get_key(sample_collection, result_key)
+
+
class ImportBatch(beam.DoFn):
def __init__(
self,
@@ -508,6 +765,144 @@ def process(self, element, **kwargs):
self._sample_collection[start:stop].export(**kwargs)
+class MapBatch(beam.DoFn):
+ def __init__(
+ self,
+ dataset_name,
+ map_fcn,
+ view_stages=None,
+ save=False,
+ return_outputs=True,
+ progress=False,
+ ):
+ self.dataset_name = dataset_name
+ self.map_fcn = map_fcn
+ self.view_stages = view_stages
+ self.save = save
+ self.return_outputs = return_outputs
+ self.progress = progress
+ self._sample_collection = None
+
+ def setup(self):
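+        # Runs once in each worker process: load the dataset and rebuild
+        # the view, if any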
+ import fiftyone as fo
+ import fiftyone.core.view as fov
+
+ dataset = fo.load_dataset(self.dataset_name)
+
+ if self.view_stages:
+ sample_collection = fov.DatasetView._build(
+ dataset, self.view_stages
+ )
+ else:
+ sample_collection = dataset
+
+ self._sample_collection = sample_collection
+
+ def process(self, element, **kwargs):
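+        # Each element is a ``(shard index, num shards, batch)`` tuple
+        # created by ``beam_map()``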
+ i, num_batches, batch = element
+
+ if isinstance(batch, tuple):
+ # Slice batches
+ start, stop = batch
+ total = stop - start
+ batch_view = self._sample_collection[start:stop]
+ else:
+ # ID batches
+ sample_ids = batch
+ total = len(sample_ids)
+ batch_view = fov.make_optimized_select_view(
+ self._sample_collection, sample_ids
+ )
+
+ if self.progress:
+ desc = f"Batch {i + 1:0{len(str(num_batches))}}/{num_batches}"
+ with tqdm(total=total, desc=desc, position=i) as pbar:
+ for sample in batch_view.iter_samples(autosave=self.save):
+ result = self.map_fcn(sample)
+ if self.return_outputs:
+ yield sample.id, result
+
+ pbar.update()
+ else:
+ for sample in batch_view.iter_samples(autosave=self.save):
+ result = self.map_fcn(sample)
+ if self.return_outputs:
+ yield sample.id, result
+
+
+class ReduceFcn(beam.CombineFn):
+ def __init__(
+ self,
+ dataset_name,
+ view_stages=None,
+ aggregate_fcn=None,
+ result_key=None,
+ ):
+ self.dataset_name = dataset_name
+ self.view_stages = view_stages
+ self.aggregate_fcn = aggregate_fcn
+ self.result_key = result_key
+
+ self._sample_collection = None
+
+ def setup(self):
+ import fiftyone as fo
+ import fiftyone.core.view as fov
+
+ dataset = fo.load_dataset(self.dataset_name)
+
+ if self.view_stages:
+ sample_collection = fov.DatasetView._build(
+ dataset, self.view_stages
+ )
+ else:
+ sample_collection = dataset
+
+ self._sample_collection = sample_collection
+
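+    # By default, the accumulator maps sample IDs to their map outputs;
+    # subclasses can override these hooks to implement custom reductions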
+ def create_accumulator(self):
+ return {}
+
+ def add_input(self, accumulator, input):
+ sample_id, value = input
+ accumulator[sample_id] = value
+ return accumulator
+
+ def merge_accumulators(self, accumulators):
+ accumulator = {}
+ for a in accumulators:
+ accumulator.update(a)
+ return accumulator
+
+ def extract_output(self, accumulator):
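+        # Applies ``aggregate_fcn`` (if any) to the merged accumulator and
+        # stores the result so the caller can retrieve it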
+ if self.aggregate_fcn is None:
+ return
+
+ output = self.aggregate_fcn(self._sample_collection, accumulator)
+ if output is None:
+ return
+
+ self._store_output(output)
+
+ def _store_output(self, output):
+ if self.result_key is None:
+ return
+
+ _set_key(self._sample_collection, self.result_key, output)
+
+
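+# These helpers pass reduce/aggregate outputs from Beam workers back to the
+# caller via an execution store scoped to the dataset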
+def _set_key(sample_collection, key, value, ttl=60):
+ dataset_id = sample_collection._root_dataset._doc.id
+ store = foos.ExecutionStore.create("beam", dataset_id=dataset_id)
+ store.set(key, value, ttl=ttl)
+
+
+def _get_key(sample_collection, key):
+ dataset_id = sample_collection._root_dataset._doc.id
+ store = foos.ExecutionStore.create("beam", dataset_id=dataset_id)
+ return store.get(key)
+
+
def _pop_first(x):
xi = iter(x)
diff --git a/setup.py b/setup.py
index e359dc4f874..73f04a8521f 100644
--- a/setup.py
+++ b/setup.py
@@ -70,6 +70,7 @@ def get_version():
"starlette>=0.24.0",
"strawberry-graphql",
"tabulate",
+ "tqdm",
"xmltodict",
"universal-analytics-python3>=1.0.1,<2",
"pydash",