
map_samples() and update_samples() #5480

Open · wants to merge 6 commits into develop
Conversation

@brimoor (Contributor) commented Feb 9, 2025

Change log

Adds two new methods to the SampleCollection interface that can leverage multiprocessing pools to perform efficient operations:

  • update_samples(): applies an update_fcn to each sample and saves any sample edits
  • map_samples(): applies a map_fcn to each sample in the collection in parallel, optionally returning the outputs as a generator or applying a reduce_fcn or aggregate_fcn to the outputs

Notes

  • by default multiprocessing.cpu_count() workers are used to parallelize the work, unless the methods are invoked from daemon processes, in which case multiprocessing is automatically disabled.
  • a new fo.config.default_map_workers setting allows for global configuration of the number of workers used by these methods
  • the methods support both 'spawn' and 'fork' start methods
  • shard_size and shard_method options allow for optionally configuring how batching is performed

Refer to the included documentation for full details.
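
As a quick illustration of the options above, the worker and sharding settings can be combined per call (a sketch based on the parameters listed in this PR; the specific values are arbitrary):

import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def update_fcn(sample):
    sample.ground_truth.label = sample.ground_truth.label.upper()

# Optionally set a global default number of workers
fo.config.default_map_workers = 8

# Or override the worker count and batching behavior for a single call
view.update_samples(
    update_fcn,
    num_workers=4,      # takes precedence over fo.config.default_map_workers
    shard_size=1000,    # samples per batch sent to each worker
    shard_method="id",  # how batches are constructed
)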

Example 1: update

import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def update_fcn(sample):
    sample.ground_truth.label = sample.ground_truth.label.upper()

view.update_samples(update_fcn)

print(dataset.count_values("ground_truth.label"))
 100% |████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [3.3s elapsed, 0s remaining, 15.2K samples/s]      
{'DEER': 5000, 'HORSE': 5000, 'AIRPLANE': 5000, 'FROG': 5000, 'BIRD': 5000, 'AUTOMOBILE': 5000, 'SHIP': 5000, 'CAT': 5000, 'TRUCK': 5000, 'DOG': 5000}

You can optionally show per-worker progress bars via progress="workers":

view.update_samples(update_fcn, progress="workers")
Batch 04/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 899.01it/s]
Batch 03/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 894.90it/s]
Batch 11/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 900.14it/s]
Batch 06/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 895.61it/s]
Batch 13/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 903.09it/s]
Batch 08/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 895.33it/s]
Batch 05/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 893.26it/s]
Batch 02/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 889.17it/s]
Batch 01/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 888.16it/s]
Batch 09/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 893.69it/s]
Batch 12/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 896.80it/s]
Batch 14/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 903.28it/s]
Batch 10/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 893.63it/s]
Batch 07/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 891.26it/s]
Batch 15/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 905.06it/s]
Batch 16/16: 100%|██████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 911.72it/s]
{'DEER': 5000, 'HORSE': 5000, 'AIRPLANE': 5000, 'FROG': 5000, 'BIRD': 5000, 'AUTOMOBILE': 5000, 'SHIP': 5000, 'CAT': 5000, 'TRUCK': 5000, 'DOG': 5000}

Example 2: map

from collections import Counter

import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def map_fcn(sample):
    return sample.ground_truth.label.upper()

counts = Counter()
for _, label in view.map_samples(map_fcn):
    counts[label] += 1

print(dict(counts))
 100% |████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [3.6s elapsed, 0s remaining, 29.3K samples/s]      
{'FROG': 5000, 'TRUCK': 5000, 'CAT': 5000, 'AIRPLANE': 5000, 'SHIP': 5000, 'DEER': 5000, 'AUTOMOBILE': 5000, 'BIRD': 5000, 'HORSE': 5000, 'DOG': 5000}

You can optionally show per-worker progress bars via progress="workers":

counts = Counter()
for _, label in view.map_samples(map_fcn, progress="workers"):
    counts[label] += 1

print(dict(counts))
Batch 01/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1259.59it/s]
Batch 04/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1264.46it/s]
Batch 06/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1264.68it/s]
Batch 02/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1248.29it/s]
Batch 05/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1253.60it/s]
Batch 03/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1249.05it/s]
Batch 08/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1257.83it/s]
Batch 10/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1258.37it/s]
Batch 09/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1254.71it/s]
Batch 07/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1249.48it/s]
Batch 11/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1257.21it/s]
Batch 12/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1260.26it/s]
Batch 15/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1269.78it/s]
Batch 13/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1263.44it/s]
Batch 14/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1262.08it/s]
Batch 16/16: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:02<00:00, 1279.08it/s]
{'FROG': 5000, 'CAT': 5000, 'SHIP': 5000, 'TRUCK': 5000, 'AIRPLANE': 5000, 'DEER': 5000, 'HORSE': 5000, 'BIRD': 5000, 'AUTOMOBILE': 5000, 'DOG': 5000}

Example 3: map-reduce

from collections import Counter

import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def map_fcn(sample):
    return sample.ground_truth.label.upper()

class ReduceFcn(fo.ReduceFcn):
    def init(self):
        self.accumulator = Counter()

    def add(self, sample_id, output):
        self.accumulator[output] += 1

    def finalize(self):
        return dict(self.accumulator)

counts = view.map_samples(map_fcn, reduce_fcn=ReduceFcn)
print(counts)
 100% |████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [3.6s elapsed, 0s remaining, 28.2K samples/s]      
{'FROG': 5000, 'CAT': 5000, 'TRUCK': 5000, 'SHIP': 5000, 'AIRPLANE': 5000, 'DEER': 5000, 'HORSE': 5000, 'BIRD': 5000, 'AUTOMOBILE': 5000, 'DOG': 5000}

You can optionally show per-worker progress bars via progress="workers":

counts = view.map_samples(map_fcn, reduce_fcn=ReduceFcn, progress="workers")
print(counts)
Batch 02/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 939.93it/s]
Batch 01/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 930.13it/s]
Batch 03/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 931.15it/s]
Batch 04/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 931.22it/s]
Batch 09/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 934.58it/s]
Batch 05/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 926.89it/s]
Batch 06/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 927.09it/s]
Batch 10/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 930.45it/s]
Batch 07/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 924.83it/s]
Batch 08/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 924.96it/s]
Batch 12/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 927.47it/s]
Batch 13/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 928.50it/s]
Batch 11/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 924.44it/s]
Batch 14/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 932.36it/s]
Batch 16/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 938.89it/s]
Batch 15/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 936.41it/s]
{'FROG': 5000, 'CAT': 5000, 'TRUCK': 5000, 'DEER': 5000, 'AUTOMOBILE': 5000, 'BIRD': 5000, 'HORSE': 5000, 'SHIP': 5000, 'AIRPLANE': 5000, 'DOG': 5000}

Example 4: map-aggregate

from collections import Counter

import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def map_fcn(sample):
    return sample.ground_truth.label.upper()

def aggregate_fcn(sample_collection, values):
    return dict(Counter(values.values()))

counts = view.map_samples(map_fcn, aggregate_fcn=aggregate_fcn)
print(counts)
 100% |████████████████████████████████████████████████████████████████████████████████████| 50000/50000 [3.6s elapsed, 0s remaining, 27.1K samples/s]      
{'FROG': 5000, 'TRUCK': 5000, 'DEER': 5000, 'AUTOMOBILE': 5000, 'BIRD': 5000, 'HORSE': 5000, 'SHIP': 5000, 'CAT': 5000, 'AIRPLANE': 5000, 'DOG': 5000}

You can optionally show per-worker progress bars via progress="workers":

counts = view.map_samples(map_fcn, aggregate_fcn=aggregate_fcn, progress="workers")
print(counts)
Batch 02/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 865.92it/s]
Batch 01/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 859.73it/s]
Batch 03/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 858.47it/s]
Batch 05/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 858.49it/s]
Batch 06/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 856.41it/s]
Batch 04/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 852.13it/s]
Batch 07/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 852.47it/s]
Batch 08/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 853.20it/s]
Batch 09/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 852.46it/s]
Batch 11/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 851.89it/s]
Batch 13/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 852.86it/s]
Batch 10/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 849.53it/s]
Batch 12/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 851.02it/s]
Batch 14/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 851.98it/s]
Batch 15/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 853.88it/s]
Batch 16/16: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 3125/3125 [00:03<00:00, 854.53it/s]
{'FROG': 5000, 'TRUCK': 5000, 'DEER': 5000, 'CAT': 5000, 'AUTOMOBILE': 5000, 'SHIP': 5000, 'AIRPLANE': 5000, 'BIRD': 5000, 'HORSE': 5000, 'DOG': 5000}

Summary by CodeRabbit

  • New Features

    • Introduced a new configuration option that lets users specify the number of workers to use for processing samples.
    • Enhanced dataset operations with improved support for parallel batch updates and mappings.
  • Documentation

    • Updated guidance on configuring parallel processing and clarified batch processing terminology (now referred to as “save contexts”).
  • Tests

    • Expanded test coverage to ensure robust and efficient parallel processing of dataset samples.
  • Chores

    • Added new package dependencies to bolster multiprocessing support and progress tracking.

coderabbitai bot commented Feb 9, 2025

Walkthrough

This change set introduces a new configuration field, default_map_workers, set via the environment variable FIFTYONE_DEFAULT_MAP_WORKERS with a default value of None. The SampleCollection class now includes a map_samples() method supporting both mapping and map-reduce operations, with corresponding updates in documentation including a dedicated section and examples. Additionally, multiprocessing utilities have been added in a new module, database connection handling has been updated in the ODM layer, and new dependencies (dill and tqdm) have been added to setup.py. Minor documentation and description clarifications were also made.

Changes

  • docs/source/user_guide/config.rst, docs/source/user_guide/using_datasets.rst: Added new configuration field (default_map_workers) and a new "Updating samples" section with examples for using update_samples(). Renamed section "Efficient batch edits" to "Save contexts."
  • fiftyone/core/collections.py, fiftyone/core/config.py: Introduced the update_samples() and map_samples() methods in SampleCollection and added the default_map_workers attribute in FiftyOneConfig.
  • fiftyone/core/odm/database.py: Updated database connection logic to use the full mongoengine module and added a new _disconnect function for closing connections.
  • fiftyone/core/utils.py: Updated the recommend_process_pool_workers function to clarify OS-specific behavior.
  • fiftyone/utils/multiprocessing.py: Added a new module with multiprocessing utilities including the functions map_samples, _init_batches, _init_worker, and _map_batch.
  • setup.py: Added "dill" and "tqdm" to the INSTALL_REQUIRES dependency list.
  • fiftyone/__public__.py: Added import for ReduceFcn from .utils.multiprocessing.
  • docs/source/release-notes.rst: Updated references from "efficient-batch-edits" to "save contexts."

Possibly related PRs

  • Adding created_at, last_modified_at, and read-only fields #4597: both PRs enhance the SampleCollection class and its sample-processing methods.
  • add support for detection.mask_path #5120: both PRs touch sample processing and worker configuration in FiftyOne.
  • Adding builtin operators for more of the FO interface #4830: both PRs involve parallel processing and worker configuration in map_samples().

Suggested labels

feature, cleaning

Suggested reviewers

  • minhtuev



@brimoor brimoor requested a review from Anddraca February 9, 2025 03:10
@brimoor brimoor changed the title from "Add map_samples()" to "map_samples()" on Feb 9, 2025
@minhtuev (Contributor) commented Feb 9, 2025

This is awesome, I can check this out and run some benchmarking with evaluate_detections. Should we do another pass with Ray as well? We can do a comparison between Apache Beam vs. Pool Multiprocessing vs. Ray.

@brimoor brimoor mentioned this pull request Feb 9, 2025
@brimoor brimoor marked this pull request as ready for review February 9, 2025 05:54
@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (7)
fiftyone/utils/multiprocessing.py (2)

130-143: Consider clarifying concurrency boundaries or warnings for large datasets.
When engaging multiple workers, it might be beneficial to highlight memory usage or concurrency implications for massive or distributed datasets. A short note in the docstring or an in-code comment can preempt performance pitfalls for large-scale users.


238-246: Global state initialization might limit maintainability.
Assigning “sample_collection,” “map_fcn,” etc. as global variables works for process-based concurrency but can be prone to future confusion. Consider storing them in a dedicated worker context or passing them directly into each function if the code grows more complex.

fiftyone/core/config.py (1)

241-246: “default_map_workers” adds convenient user configurability.
Dynamically controlling default workers for “map_samples” can improve performance across various workloads. As a future enhancement, consider validating that “default_map_workers” does not exceed CPU core counts to prevent potential oversubscription.

fiftyone/core/collections.py (1)

3217-3307: Implementation looks good with some minor suggestions.

The parallel processing implementation is well-structured and handles both mapping and map-reduce operations effectively. A few suggestions for improvement:

  1. Consider adding input validation for shard_method to ensure only valid values are passed
  2. Consider adding type hints for better IDE support and code maintainability
  3. Consider adding a warning when num_workers=1 is explicitly set, as this disables parallelization

Example type hints:

def map_samples(
    self,
    map_fcn: Callable[[Sample], Any],
    reduce_fcn: Optional[Callable[[SampleCollection, Dict[str, Any]], Any]] = None,
    save: Optional[bool] = None,
    num_workers: Optional[int] = None,
    shard_size: Optional[int] = None,
    shard_method: Literal["id", "slice"] = "id",
    progress: Optional[Union[bool, Callable]] = None,
) -> Optional[Any]:
docs/source/user_guide/config.rst (1)

97-100: New Config Field Addition: default_map_workers
The new configuration row is well integrated in the table and clearly shows the environment variable (FIFTYONE_DEFAULT_MAP_WORKERS), the default value (None), and the connection to the map_samples() method. For enhanced clarity, consider adding a brief note (if possible) on scenarios or performance implications when a user might want to override this default.

docs/source/user_guide/using_datasets.rst (2)

5640-5658: Performance Metrics Output Example

The text output block that shows batch processing speeds and the aggregated label counts effectively communicates the performance gains from parallel processing. This real-world metric example reinforces the efficiency of the new approach.
It would improve clarity to add a brief inline comment before the output block noting that these metrics are illustrative and may vary depending on system configuration.


5691-5700: Map-Reduce Example with Reduce Function

The second code example demonstrates how to perform a map-reduce operation by having the mapping function return a lowercased label, with the reduce function aggregating these results using Python’s Counter. This example clearly shows how to leverage the reduce functionality for further processing of the outputs.
A short inline comment explaining that the reduce function aggregates label counts could help users unfamiliar with map-reduce patterns.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 21468df and 9b9909a.

📒 Files selected for processing (8)
  • docs/source/user_guide/config.rst (3 hunks)
  • docs/source/user_guide/using_datasets.rst (1 hunks)
  • fiftyone/core/collections.py (3 hunks)
  • fiftyone/core/config.py (1 hunks)
  • fiftyone/core/odm/database.py (3 hunks)
  • fiftyone/core/utils.py (1 hunks)
  • fiftyone/utils/multiprocessing.py (1 hunks)
  • setup.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py

231-231: Ambiguous variable name: l

(E741)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: test / test-app
  • GitHub Check: build
🔇 Additional comments (11)
fiftyone/utils/multiprocessing.py (2)

19-47: Good use of docstrings and examples to illustrate map vs map-reduce.
The in-code examples clearly explain the two usage patterns. This thorough documentation will help users adopt the new method quickly.


279-294: Verify autosave behavior and concurrency side-effects.
When using “iter_samples(autosave=save),” any sample modifications in parallel workers might lead to conflicting updates if multiple batches contain overlapping data. Confirm that your dataset usage and indexing strategy prevent concurrency collisions.

setup.py (1)

73-73: Confirm that “tqdm” is suitably pinned or versioned.
The new “tqdm” dependency is necessary for progress bars. Ensure that your pinned version aligns with other dependencies or older installed versions of “tqdm” in user environments to avoid conflicts.

fiftyone/core/odm/database.py (3)

20-20: LGTM!

The change to import the entire mongoengine module improves code clarity by making the source of functionality explicit.


222-222: LGTM!

The change to use mongoengine.connect() is consistent with the updated import and maintains the same functionality.


247-252: LGTM!

The new _disconnect() function properly handles database cleanup by resetting global clients and disconnecting all mongoengine connections.

fiftyone/core/utils.py (1)

2297-2300: LGTM!

The added documentation clearly explains the OS-specific default behavior for process pool workers, which improves developer understanding.

fiftyone/core/collections.py (1)

3330-3351: LGTM! Clean and efficient implementation.

The single-process mapping implementation is well-structured and handles all cases correctly, with proper defaults and error handling.

docs/source/user_guide/config.rst (2)

175-177: JSON Example Update for default_map_workers
The JSON output snippet now includes the "default_map_workers": null field correctly, ensuring that the configuration example is consistent with the new documentation entry.


226-228: CLI Config Output Consistency
The CLI configuration output also reflects the new default_map_workers field with its value set to null. This consistency will help users verify that their config changes are applied uniformly across both Python and CLI outputs.

docs/source/user_guide/using_datasets.rst (1)

5615-5638: New Map-Reduce Operations Section Introduction

This new section is a clear and valuable addition that explains the benefits of using the :meth:map_samples() method. The introductory text and the first code example—which converts sample labels to uppercase—are concise, well-formatted, and directly illustrate how users can modify sample data in parallel.

@coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
fiftyone/utils/multiprocessing.py (1)

313-313: 🛠️ Refactor suggestion

Rename the ambiguous parameter "l" to a more descriptive name.

Using "l" can be confusing and conflicts with PEP8 guidance.

Apply this diff:

-def _init_worker(dataset_name, view_stages, m, s, r, p, l):
+def _init_worker(dataset_name, view_stages, m, s, r, p, lock_obj):
🧰 Tools
🪛 Ruff (0.8.2)

313-313: Ambiguous variable name: l

(E741)

🧹 Nitpick comments (2)
fiftyone/core/collections.py (1)

3218-3401: LGTM! Well-structured implementation of parallel sample processing.

The implementation provides a robust way to apply functions to samples in parallel with good defaults and flexibility. The docstring is comprehensive with clear examples.

A few suggestions for improvement:

 def map_samples(
     self,
     map_fcn,
     reduce_fcn=None,
     save=None,
     num_workers=None,
     shard_size=None, 
     shard_method="id",
     progress=None,
+    # Add type hints
 ):
+    # Add validation
+    if shard_method not in ("id", "slice"):
+        raise ValueError("shard_method must be 'id' or 'slice'")
+    
+    if shard_size is not None and shard_size <= 0:
+        raise ValueError("shard_size must be > 0")
+
+    # Validate map_fcn signature
+    if not callable(map_fcn):
+        raise TypeError("map_fcn must be callable")
docs/source/user_guide/using_datasets.rst (1)

5813-5833: Setting Values for Batch Edits
A new section on using set_values() is introduced to demonstrate how to update a field (or embedded field) on all samples using a single batch operation. The code is well written and emphasizes the efficiency gains over per-sample iteration. One suggestion would be to include a brief note about verifying type consistency when using set_values(), as mismatches could cause runtime errors.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9b9909a and 818a040.

📒 Files selected for processing (4)
  • docs/source/user_guide/using_datasets.rst (1 hunks)
  • fiftyone/__public__.py (1 hunks)
  • fiftyone/core/collections.py (4 hunks)
  • fiftyone/utils/multiprocessing.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/__public__.py

260-260: .utils.multiprocessing.ReduceFcn imported but unused

Remove unused import: .utils.multiprocessing.ReduceFcn

(F401)

fiftyone/utils/multiprocessing.py

313-313: Ambiguous variable name: l

(E741)

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-app
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: lint / eslint
  • GitHub Check: build
🔇 Additional comments (14)
fiftyone/__public__.py (1)

260-260: LGTM!

The addition of ReduceFcn to the public interface is appropriate as it supports the new map-reduce functionality.

🧰 Tools
🪛 Ruff (0.8.2)

260-260: .utils.multiprocessing.ReduceFcn imported but unused

Remove unused import: .utils.multiprocessing.ReduceFcn

(F401)

fiftyone/utils/multiprocessing.py (3)

20-147: Well-structured implementation with comprehensive documentation!

The function effectively handles both map and map-reduce operations with proper resource management and flexible progress tracking.


223-255: Clean and extensible reducer implementation!

The class provides a well-documented interface for both function-based and class-based reduction strategies.


257-310: LGTM! Helper functions are well-implemented.

The helper functions handle batch distribution, worker initialization, and batch processing effectively.

Also applies to: 341-377

fiftyone/core/collections.py (1)

3371-3401: LGTM! Clean implementation of single-process sample mapping.

The helper method handles both mapping and reduction cases efficiently with proper initialization of reducers.

docs/source/user_guide/using_datasets.rst (9)

5616-5625: Introduction of Map-Reduce Operations Section
This new section introduces the map-reduce functionality via the map_samples() method. The heading and the brief description clearly explain that the method iterates over samples in a collection, applies a user-defined mapping function (and optionally a reduce function), and can report performance metrics.


5626-5642: Mapping Example Code Block
The provided Python code snippet demonstrates a basic usage of map_samples(). It loads a CIFAR-10 dataset for training, selects the “ground_truth” field, and applies a mapping function that converts the label to uppercase. The inclusion of a code-block with line numbers helps the reader follow the example, and the subsequent text block of sample performance outputs reinforces the efficiency claims.


5642-5661: Output Block and Explanatory Notes
The textual output (showing batch speeds for each batch) is a practical touch that helps users understand the performance improvements achieved using this method. Additionally, the brief list of properties and considerations of map_samples() (such as nondeterministic order and avoiding global state modifications) is well stated.


5681-5687: Alternative Iteration with iter_samples()
A short snippet is provided that illustrates how one might alternatively iterate over samples using iter_samples() with autosave and progress reporting enabled. This clearly contrasts the traditional iterative approach with the new, parallelized map_samples() method, reinforcing the performance benefits.


5692-5709: Reduce Function Example
The documentation offers a “Reduce function” example that shows how to use a standard function (employing collections.Counter) as the reduce_fcn parameter. This example is clear, concise, and effectively demonstrates a common reduction pattern in batch processing.


5710-5775: Reduce Class Example
An alternative approach is demonstrated via a custom ReduceFcn class. This version explicitly defines an init(), update(), and finalize() method to aggregate mapping outputs. The example is well structured, clearly separating the accumulator’s initialization, per-batch updates, and final consolidation of results. This version will appeal to users looking for a more object‐oriented approach to specifying custom reduce behavior.


5776-5801: Worker Configuration Explanation
The passage following the reduce examples outlines multiple ways to configure the number of workers for the map_samples() method. It explains setting a default via the FiftyOne configuration, manually overriding with the num_workers parameter, and the fallback to recommend_process_pool_workers() if not set. This guidance is practical and will assist users in optimizing parallel performance.


5802-5812: Shard Size Option
The example showing how to pass a custom shard_size to the map_samples() method is clear and useful. It demonstrates that users can control the number of samples sent per worker, which might be helpful for tuning performance especially on large datasets.


5889-5913: Setting Label Values
The final section covers how to use set_label_values() for performing batch updates specifically on label fields. The example clearly illustrates obtaining sample label IDs, constructing a mapping, and updating a new “random” attribute on each label. This section is informative and neatly ties into the overall theme of efficient batch processing in datasets.
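
For reference, the pattern that section describes might look roughly like this with set_label_values() (a sketch; the "random" attribute is an illustrative new label attribute):

import random

import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")

# Gather the IDs of all ground truth detections
label_ids = dataset.values("ground_truth.detections.id", unwind=True)

# Build a mapping from label ID to value and apply it in one batch operation
values = {_id: random.random() for _id in label_ids}
dataset.set_label_values("ground_truth.detections.random", values)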

@brimoor brimoor requested a review from jleven February 9, 2025 18:55
def map_fcn(sample):
    return sample.ground_truth.label.lower()

def reduce_fcn(sample_collection, values):

A contributor commented:

I’m pretty uncomfortable naming this “reduce”. We put ourselves in an awkward position moving forward if we name this “reduce” since it is not a “reduce” function. If we introduce a proper reduce function later, what would we name it? Might users familiar with reduce find this confusing? Let’s please name it “aggregate” (or something else) instead of misnaming it using a word that already has a different meaning. And let’s not refer to “map-reduce” if we aren’t performing a map reduce operation.

@brimoor (Author) replied Feb 10, 2025:

Note that reduce_fcn supports two options:

  • This version you've commented on, which I agree is more like an "aggregate function"
  • A ReduceFcn subclass, which is modeled after Beam's CombineFn and is like a traditional reduce operator

For clarity, I could get onboard with adding a new aggregate_fcn argument that callers must use if they want to provide an "aggregate function" like this.

This would reserve reduce_fcn for the true "reduce" functionality. Thinking ahead, if we want to use map_samples(map_fcn, reduce_fcn=) internally in the SDK and support swapping out the multiprocessing backend for distributed computing backends (Beam, Ray, etc), we'd need to find some kind of universal interface for defining reduce functions, so that the caller's reduction could be performed by any backend. Comparing the ReduceFcn classes from this PR and #5449, this feels tricky to achieve (for example, this PR's reducer operates in the main process, while Beam's does not; this changes how they are implemented).

@brimoor (Author) replied:

If we think it is important to support multiple backends in the future, it might make sense to ship map_samples(map_fcn, aggregate_fcn=) and NOT include a reduce_fcn= argument just yet.

After all, most reductions could be expressed idiomatically as FiftyOne aggregations.

A contributor replied:

Agree with Josh and your suggestion to not include reduce. Even the Beam docs call it aggregate, as it's not really the same thing as the traditional reduce most people are familiar with. Also, embedding reduce inside of map() really doesn't make sense for other backends that have a true reducer, like Spark.

@brimoor (Author) replied:

Added aggregate_fcn as discussed.

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
fiftyone/core/collections.py (4)

13-13: Add copyright year range

The copyright notice should use a year range to indicate both the initial creation and latest modification years.

-| Copyright 2017-2025, Voxel51, Inc.
+| Copyright 2017-2024, Voxel51, Inc.

3218-3404: Review the new map_samples() method implementation

The new map_samples() method provides a powerful way to parallelize operations across samples using multiprocessing. The implementation looks solid with good error handling and configuration options. A few suggestions for improvement:

  1. Consider adding input validation for num_workers to ensure it's a positive integer
  2. Add validation for shard_size to ensure it's a positive integer when provided
  3. Consider adding a timeout parameter to handle hanging worker processes
def map_samples(
    self,
    map_fcn,
    reduce_fcn=None, 
    save=None,
    num_workers=None,
    shard_size=None,
    shard_method="id",
    progress=None,
+   timeout=None,
):
    """Applies the given function to each sample in the collection.
    
    Args:
        ...
+       timeout (None): optional timeout in seconds for worker processes
    """
+   if num_workers is not None and not isinstance(num_workers, int):
+       raise ValueError("num_workers must be a positive integer")
+   
+   if shard_size is not None and not isinstance(shard_size, int):
+       raise ValueError("shard_size must be a positive integer")

    if num_workers is None:
        num_workers = fo.config.default_map_workers

3218-3268: Improve docstring with more examples

The docstring would benefit from additional examples showing common use cases and error handling.

def map_samples(self, map_fcn, reduce_fcn=None, save=None, num_workers=None, 
    shard_size=None, shard_method="id", progress=None):
    """Applies the given function to each sample in the collection.

    Examples::

        import fiftyone as fo
        import fiftyone.zoo as foz

        dataset = foz.load_zoo_dataset("cifar10", split="train")
        view = dataset.select_fields("ground_truth")

+       # Example 4: Error handling
+       def map_fcn(sample):
+           if not hasattr(sample, "ground_truth"):
+               raise ValueError("Sample missing ground truth")
+           return sample.ground_truth.label
+
+       try:
+           dataset.map_samples(map_fcn)
+       except ValueError as e:
+           print(f"Error: {e}")
+
+       # Example 5: Using timeout
+       def slow_map_fcn(sample):
+           time.sleep(10)
+           return sample.ground_truth.label
+
+       # Will timeout after 5 seconds
+       dataset.map_samples(slow_map_fcn, timeout=5)

3363-3372: Add type hints to improve code readability

Consider adding type hints to the function parameters and return value to improve code readability and enable better IDE support.

+from typing import Optional, Union, Callable, Any

def _map_samples_single(
    self,
-   map_fcn,
-   reduce_fcn=None,
-   save=None,
-   progress=False,
+   map_fcn: Callable[[Any], Any],
+   reduce_fcn: Optional[Union[Callable, type]] = None,
+   save: Optional[bool] = None,
+   progress: Union[bool, str] = False,
+) -> Optional[Any]:
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 818a040 and df308f7.

📒 Files selected for processing (1)
  • fiftyone/core/collections.py (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-app
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: lint / eslint
  • GitHub Check: build

@jacobsela (Contributor) commented:

@brimoor Can this launch non-daemon processes?

@kaixi-wang (Contributor) left a comment:

I think we need to test on a large dataset for which the optimization is really needed. I don't see any mention of how to configure the runners and so I'm worried that just using the direct runner will quickly hit memory issues (https://beam.apache.org/documentation/runners/direct/)

@brimoor (Author) commented Feb 16, 2025:

@kaixi-wang this PR uses multiprocessing.Pool. Beam experiments are in #5449.

What makes you think there will be memory issues? iter_samples(autosave=True) does not store things in memory beyond what is sent in each save batch, which by default is around 1MB per worker.

For me, running the example code in the PR description achieves an ~8x speedup over the same operation using a single iter_samples(autosave=True) call when using num_workers=multiprocessing.cpu_count()=16 on my machine. And I see no reason those gains would change for larger datasets.
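
For context, the single-process baseline being compared against looks roughly like this (a sketch using the existing iter_samples API and the same example as the PR description):

import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

# Sequential iteration with batched saves (the ~1MB-per-batch behavior noted above)
for sample in view.iter_samples(autosave=True, progress=True):
    sample.ground_truth.label = sample.ground_truth.label.upper()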

@brimoor (Author) commented Feb 16, 2025:

@brimoor Can this launch non-daemon processes?

@jacobsela can you elaborate on what you're asking?

@kaixi-wang (Contributor) commented:

@kaixi-wang this PR uses multiprocessing.Pool. Beam experiments are in #5449.

What makes you think there will be memory issues? iter_samples(autosave=True) does not store things in memory beyond what is sent in each save batch, which by default is around 1MB per worker.

For me, running the example code in the PR description achieves an ~8x speedup over the same operation using a single iter_samples(autosave=True) call when using num_workers=multiprocessing.cpu_count()=16 on my machine. And I see no reason those gains would change for larger datasets.

Oh yeah sorry. Mixed up the PRs. But yes in your example, there “should” be no issue since you are only using values on the sample you are iterating over. However, based on my recent attempts/frustrations to make/do anything with large datasets, I’m cautious in recommending a solution for big data without verifying/testing it… although I do trust that you know this part of the code significantly better than me

Additionally, the concern with memory specifically stems from thinking about the case where external data/side inputs are required. Since each process worker will require a copy of all the same inputs (except for samples) and there doesn’t appear to be direct support for selectively partitioning/streaming or sharing objects, this version still might not be ideal for solving the common pain point of adding fields/setting values that are not computed directly from the existing fields.

@kaixi-wang (Contributor) commented:

@brimoor Can this launch non-daemon processes?

@jacobsela are you thinking about potential issues relating to nested multiprocessing and that daemon processes can’t create child processes? For example, if the map fn were to call some method like ‘compute_’ that used multiprocessing under the hood ?

@jacobsela (Contributor) commented:

@brimoor @kaixi-wang Exactly, daemon processes aren't able to launch their own child processes. I wrote my own similar util for this purpose and because it uses Pool it ran into the same issue.

If we want to parallelize model inference/training/anything that uses a data loader it is pretty useful to be able to launch more nested child processes.
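
A minimal sketch of the limitation under discussion, assuming CPython's standard multiprocessing behavior (Pool workers are daemonic, and daemonic processes may not have children):

import multiprocessing

def child():
    print("child ran")

def worker(_):
    # Raises AssertionError: daemonic processes are not allowed to have children
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        pool.map(worker, range(2))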

@minhtuev (Contributor) commented:

@jacobsela: that might be a little bit out of scope of what we want to do here. On one hand, I think it's reasonable to assume that map_fcn does not launch its own child processes. On the other hand, since map_samples is designed to be a replacement for iter_samples, do we use iter_samples anywhere in inference/training?

@jacobsela (Contributor) commented:

@minhtuev I understand, I don't mean for this to be an explicit request. I was just wondering for my own use cases. FWIW the way compute_embeddings and apply_model are currently implemented does use iter_samples.

@minhtuev (Contributor) commented:

@jacobsela: thanks for the input. I have created a ticket to investigate all usage patterns of iter_samples in FiftyOne this week, so we can take stock of how many of these we can translate to map_samples.

@minhtuev (Contributor) commented:

@brimoor: thinking about it, I really like the idea of having different backends for map_samples!

For example, we could have map_samples(..., backend="apache_beam") and then run it with Beam!

The backend options can include:

  • threads
  • multiprocessing_core
  • apache_beam
  • ray (!)

@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
fiftyone/core/collections.py (1)

3218-3414: ⚠️ Potential issue

Add error handling to protect against exceptions in map_fcn

The _map_samples_single() implementation should handle errors gracefully to prevent crashes. Consider wrapping the map function call in a try-except block.

def _map_samples_single(
    self,
    map_fcn,
    reduce_fcn=None,
    aggregate_fcn=None,
    save=None,
    progress=False,
):
    if save is None:
        save = reducer is None

    if reducer is not None:
        reducer.init()

    if progress == "global":
        progress = True

+   try:
        for sample in self.iter_samples(autosave=save, progress=progress):
+           try:
                sample_output = map_fcn(sample)
+           except Exception as e:
+               raise ValueError(
+                   f"Error applying map_fcn to sample {sample.id}: {str(e)}"
+               ) from e
            if reducer is not None and sample_output is not None:
                reducer.update({sample.id: sample_output})

        if reducer is not None:
            return reducer.finalize()
+   except Exception as e:
+       raise ValueError(f"Error in map_samples: {str(e)}") from e
🧹 Nitpick comments (5)
fiftyone/utils/multiprocessing.py (3)

151-224: Consider adding error handling for invalid progress callback.

While the implementation is solid, the progress callback handling could be more robust.

Add validation for the progress callback:

 elif callable(progress):
+    if not hasattr(progress, '__call__'):
+        raise ValueError("progress callback must be callable")
     global_progress = progress
     progress = False

226-259: Consider adding type hints for better code maintainability.

The ReduceFcn class is well-designed but could benefit from type hints to improve code maintainability and IDE support.

Add type hints:

-    def __init__(self, sample_collection, aggregate_fcn=None):
+    def __init__(self, sample_collection: "fo.core.collections.SampleCollection", 
+                 aggregate_fcn: Optional[Callable] = None) -> None:
         self.sample_collection = sample_collection
         self.aggregate_fcn = aggregate_fcn
         self.accumulator = None

-    def update(self, outputs):
+    def update(self, outputs: Dict[str, Any]) -> None:

327-327: Document MongoDB connection handling behavior.

The code properly disconnects MongoDB clients before initializing workers, but this behavior should be documented.

Add a docstring explaining the MongoDB connection handling:

 def _init_worker(dataset_name, view_stages, m, s, r, p, l):
+    """Initialize a worker process.
+    
+    Note: This function automatically disconnects and recreates MongoDB connections
+    to ensure each worker process has its own connection pool, following PyMongo's
+    multiprocessing guidelines.
+    """
     from tqdm.auto import tqdm
fiftyone/core/collections.py (2)

3355-3382: Consider using a context manager for the multiprocessing pool

The multiprocessing pool should be properly managed to ensure cleanup of resources.

def map_samples(self, ...):
    if num_workers is None:
        if multiprocessing.current_process().daemon:
            num_workers = 1
        elif fo.config.default_map_workers is not None:
            num_workers = fo.config.default_map_workers
        else:
            num_workers = fou.recommend_process_pool_workers()

    if num_workers is not None and num_workers <= 1:
        return self._map_samples_single(...)

+   with multiprocessing.Pool(num_workers) as pool:
        return foum.map_samples(
            self,
            map_fcn,
            reduce_fcn=reduce_fcn,
            aggregate_fcn=aggregate_fcn,
            save=save,
            num_workers=num_workers,
            shard_size=shard_size,
            shard_method=shard_method,
            progress=progress,
        )

3229-3353: Enhance docstring with performance considerations

The docstring should include information about performance implications and best practices.

Add the following to the docstring:

"""
+Performance Considerations:
+---------------------------
+- The optimal num_workers depends on CPU cores and I/O patterns
+- For CPU-bound tasks, use num_workers = cpu_count
+- For I/O-bound tasks, num_workers can be higher
+- Large shard_size improves throughput but increases memory usage
+- Small shard_size reduces memory but increases overhead
"""
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df308f7 and a52e268.

📒 Files selected for processing (3)
  • fiftyone/core/collections.py (4 hunks)
  • fiftyone/utils/multiprocessing.py (1 hunks)
  • setup.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • setup.py
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py

9-9: multiprocessing imported but unused

Remove unused import: multiprocessing

(F401)


316-316: Ambiguous variable name: l

(E741)

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-app
  • GitHub Check: e2e / test-e2e
  • GitHub Check: lint / eslint
  • GitHub Check: build / build
  • GitHub Check: build
🔇 Additional comments (3)
fiftyone/utils/multiprocessing.py (3)

1-20: LGTM! Well-organized imports and module setup.

The imports are appropriately chosen for the module's functionality, including essential dependencies for multiprocessing, database operations, and progress tracking.

🧰 Tools
🪛 Ruff (0.8.2)

9-9: multiprocessing imported but unused

Remove unused import: multiprocessing

(F401)


21-150: Excellent documentation with comprehensive examples!

The docstring provides clear explanations and practical examples demonstrating all three operation modes: map, map-reduce, and map-aggregate. The parameter descriptions are thorough and the examples are executable.


316-316: Rename the ambiguous parameter "l" to a more descriptive name.

Using "l" as a parameter name can be confusing and conflicts with PEP8 guidance.

🧰 Tools
🪛 Ruff (0.8.2)

316-316: Ambiguous variable name: l

(E741)

@brimoor brimoor changed the title from "map_samples()" to "map_samples() and update_samples()" on Feb 23, 2025

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (11)
fiftyone/utils/multiprocessing.py (4)

171-178: Consider adding a warning when running in daemon mode.

When running in a daemon process, the function falls back to single-worker mode without notifying the user. This could lead to unexpected performance characteristics.

 if multiprocessing.current_process().daemon:
+    fou.logger.warning(
+        "Running in daemon mode - falling back to single worker processing"
+    )
     num_workers = 1

232-239: Fix boolean comparison style.

The code uses an explicit comparison to False which is not Pythonic.

-    elif progress != False:
+    elif progress:
🧰 Tools
🪛 Ruff (0.8.2)

232-232: Avoid inequality comparisons to False; use if progress: for truth checks

Replace with progress

(E712)


292-300: Add timeout configuration for queue operations.

The hardcoded 0.01 second timeout in the queue polling loop could be made configurable for different workloads.

Consider adding a queue_timeout parameter to allow users to tune this value based on their specific use case.
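
For reference, one way this could look — a minimal sketch in which queue_timeout and _drain_outputs are hypothetical names used only for illustration, not the PR's actual API:

import queue as queue_module

def _drain_outputs(output_queue, num_workers, queue_timeout=0.01):
    # Poll the queue with a configurable interval instead of a hardcoded 0.01s
    num_done = 0
    while num_done < num_workers:
        try:
            item = output_queue.get(timeout=queue_timeout)
        except queue_module.Empty:
            continue  # nothing ready yet; poll again

        if item is None:  # sentinel put by a worker when it finishes
            num_done += 1
        else:
            yield item  # (sample_id, output) pair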


566-572: Consider using atomic operations for counters.

The code uses explicit locking for counter increments which could be replaced with atomic operations for better performance.

Consider using multiprocessing.Value('i', 0, lock=True) to create atomic counters instead of explicit locking.
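
Worth noting when weighing this suggestion: even a Value created with lock=True does not make += atomic, because the increment is a read-modify-write, so the explicit get_lock() guard used in the current code is still required. A standalone sketch of the safe pattern:

from multiprocessing import Value

count = Value("i", 0)  # shared integer counter

def increment(counter, n=1):
    # `counter.value += n` alone can drop updates under concurrency,
    # so the read-modify-write is wrapped in the counter's own lock
    with counter.get_lock():
        counter.value += n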

tests/unittests/view_tests.py (3)

113-153: Test coverage looks good, consider adding error case tests.

The test_update_samples method provides good coverage of the basic functionality, but could be enhanced with:

  • Tests for invalid update functions
  • Tests for error handling when workers fail
  • Tests for memory usage with large datasets

154-236: Test coverage looks good, consider adding edge case tests.

The test_map_samples method effectively tests the core functionality with both map-reduce and map-aggregate operations. Consider adding:

  • Tests for empty datasets
  • Tests for invalid map/reduce/aggregate functions
  • Tests for memory usage with large datasets
  • Tests for handling worker failures

8-8: Add Counter import to imports section.

Move the Counter import to the standard library imports section at the top for better organization.

-from collections import Counter
+
 from copy import deepcopy
 from datetime import date, datetime, timedelta
 import math
+from collections import Counter
tests/unittests/dataset_tests.py (2)

1026-1066: LGTM! Well-structured test for concurrent sample updates.

The test provides good coverage of different worker configurations and sharding methods. It effectively validates the bounds of integer fields after updates.

Consider adding assertions to verify that the updates are atomic by checking that no intermediate values exist between the expected bounds.

 def test_update_samples(self):
     dataset = fo.Dataset()
     dataset.add_samples(
         [fo.Sample(filepath="image%d.jpg" % i, int=i) for i in range(50)]
     )

     self.assertTupleEqual(dataset.bounds("int"), (0, 49))

     def update_fcn(sample):
         sample.int += 1

     dataset.update_samples(update_fcn, num_workers=2)
     self.assertTupleEqual(dataset.bounds("int"), (1, 50))
+    # Verify no intermediate values exist
+    self.assertNotIn(0, dataset.distinct("int"))
+    self.assertNotIn(51, dataset.distinct("int"))

1067-1148: LGTM! Comprehensive test coverage for mapping operations.

The test effectively validates map-reduce and map-aggregate operations with different worker configurations and sharding methods.

Consider adding test cases for:

  1. Error handling when map function raises an exception
  2. Memory usage with large datasets
  3. Cancellation of long-running operations

Example test case for error handling:

def test_map_samples_error_handling(self):
    dataset = fo.Dataset()
    dataset.add_samples(
        [fo.Sample(filepath="image%d.jpg" % i) for i in range(50)]
    )

    def map_fcn_with_error(sample):
        if sample.filepath == "image25.jpg":
            raise ValueError("Simulated error")
        return sample.filepath.upper()

    with self.assertRaises(ValueError):
        dataset.map_samples(map_fcn_with_error, num_workers=2)
fiftyone/core/collections.py (2)

3216-3279: Consider adding parameter validation for update_fcn

The update_samples() method should validate that update_fcn is callable before proceeding with the operation.

def update_samples(
    self,
    update_fcn,
    num_workers=None,
    shard_size=None,
    shard_method="id",
    progress=None,
):
+   if not callable(update_fcn):
+       raise ValueError("update_fcn must be a callable")

    return self._map_samples(
        update_fcn,
        return_outputs=False,
        save=True,
        num_workers=num_workers,
        shard_size=shard_size,
        shard_method=shard_method,
        progress=progress,
    )

3281-3452: Add type validation for reduce_fcn and aggregate_fcn

The map_samples() method should validate the types of reduce_fcn and aggregate_fcn to ensure they match the expected interfaces.

def map_samples(
    self,
    map_fcn,
    reduce_fcn=None,
    aggregate_fcn=None,
    save=False,
    num_workers=None,
    shard_size=None,
    shard_method="id",
    progress=None,
):
+   if not callable(map_fcn):
+       raise ValueError("map_fcn must be a callable")
+
+   if reduce_fcn is not None and not (
+       inspect.isclass(reduce_fcn) or callable(reduce_fcn)
+   ):
+       raise ValueError("reduce_fcn must be a class or callable")
+
+   if aggregate_fcn is not None and not callable(aggregate_fcn):
+       raise ValueError("aggregate_fcn must be a callable")

    return self._map_samples(
        map_fcn,
        reduce_fcn=reduce_fcn,
        aggregate_fcn=aggregate_fcn,
        save=save,
        num_workers=num_workers,
        shard_size=shard_size,
        shard_method=shard_method,
        progress=progress,
    )
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a52e268 and 166b7c4.

📒 Files selected for processing (7)
  • docs/source/release-notes.rst (3 hunks)
  • docs/source/user_guide/using_datasets.rst (2 hunks)
  • fiftyone/core/collections.py (2 hunks)
  • fiftyone/core/utils.py (3 hunks)
  • fiftyone/utils/multiprocessing.py (1 hunks)
  • tests/unittests/dataset_tests.py (2 hunks)
  • tests/unittests/view_tests.py (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • docs/source/release-notes.rst
🚧 Files skipped from review as they are similar to previous changes (2)
  • fiftyone/core/utils.py
  • docs/source/user_guide/using_datasets.rst
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py

232-232: Avoid inequality comparisons to False; use if progress: for truth checks

Replace with progress

(E712)


499-499: Ambiguous variable name: l

(E741)

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-app
  • GitHub Check: lint / eslint
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build
🔇 Additional comments (4)
fiftyone/utils/multiprocessing.py (3)

459-468: Good use of batch size optimization!

The code intelligently caps the shard size based on ObjectId value recommendations, which helps prevent MongoDB query document size limits.


499-499: Rename ambiguous parameter.

The parameter l is ambiguous and conflicts with PEP8 guidance.

-def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, l):
+def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, lock):

-    if l is not None:
-        tqdm.set_lock(l)
+    if lock is not None:
+        tqdm.set_lock(lock)

Also applies to: 531-532

🧰 Tools
🪛 Ruff (0.8.2)

499-499: Ambiguous variable name: l

(E741)


514-517: Good handling of MongoDB connections!

The code properly disconnects and recreates MongoDB clients for each worker process, following best practices for PyMongo with multiprocessing.

fiftyone/core/collections.py (1)

3428-3452: Implementation looks good

The _map_samples() helper method correctly delegates to foum.map_samples() with appropriate parameter passing.

@brimoor brimoor requested review from kaixi-wang, minhtuev and jleven and removed request for kaixi-wang February 23, 2025 20:02
@@ -3212,6 +3213,243 @@ def _set_labels(self, field_name, sample_ids, label_docs, progress=False):
    def _delete_labels(self, ids, fields=None):
        self._dataset.delete_labels(ids=ids, fields=fields)

def update_samples(
@brimoor brimoor Feb 23, 2025

Added an update_samples() method for the case where the user just wants to apply sample edits to a collection.

for batch_view in fou.iter_slices(sample_collection, shard_size):
    for sample in batch_view.iter_samples(autosave=save):
        sample_output = map_fcn(sample)
        yield sample.id, sample_output
@brimoor brimoor Feb 23, 2025

@j053y dataset.map_samples(map_fcn) now returns a generator as suggested ✅
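
For example, the returned generator can be consumed lazily; this snippet assumes the outputs are (sample_id, output) pairs, mirroring the excerpt above:

import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")

def map_fcn(sample):
    return sample.filepath.upper()

# lazily iterate over the mapped outputs without materializing them all
for sample_id, output in dataset.map_samples(map_fcn):
    print(sample_id, output)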

@@ -2294,6 +2292,10 @@ def recommend_thread_pool_workers(num_workers=None):
def recommend_process_pool_workers(num_workers=None):
"""Recommends a number of workers for a process pool.

By default, ``multiprocessing.cpu_count()`` workers will be recommended if
you are running on macOS or Linux, while a single worker will be
recommended on Windows.
@brimoor brimoor Feb 23, 2025

TODO: can we use num_workers=multiprocessing.cpu_count() by default instead of num_workers=1 for Windows?

@minhtuev minhtuev Feb 24, 2025

Created a ticket to track this, will test tomorrow
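
For reference, a rough sketch of the platform-dependent default the docstring describes (illustrative only, not the actual implementation); the open question above is whether the Windows branch could also return cpu_count():

import multiprocessing
import sys

def recommend_process_pool_workers(num_workers=None):
    if num_workers is None:
        if multiprocessing.current_process().daemon:
            num_workers = 1  # daemon processes cannot create child processes
        elif sys.platform.startswith("win"):
            num_workers = 1  # conservative Windows default (see TODO above)
        else:
            num_workers = multiprocessing.cpu_count()

    return num_workers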

- None, otherwise
"""
if num_workers is None:
if multiprocessing.current_process().daemon:
Contributor Author

Automatically defaulting to no multiprocessing when this code is invoked in a daemon process ✅

Contributor

We should add a log message here, in case users are expecting multiprocessing to be used

if return_outputs:
    batch_count = multiprocessing.Value("i", 0)
    sample_count = None
    queue = multiprocessing.Queue()
Contributor Author

@j053y using a queue to implement process communication as suggested ✅

Contributor

Will run a benchmark today to test the performance of the queue implementation ✅
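
A self-contained micro-benchmark along these lines (names and message shape are illustrative and independent of FiftyOne) could look like:

import multiprocessing
import time

def _producer(output_queue, n):
    for i in range(n):
        output_queue.put((i, "output"))  # stand-in for a (sample_id, output) pair
    output_queue.put(None)  # sentinel marking the end of the stream

def benchmark_queue(n=100_000):
    output_queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=_producer, args=(output_queue, n))

    start = time.perf_counter()
    proc.start()

    received = 0
    while True:
        item = output_queue.get()
        if item is None:
            break
        received += 1

    proc.join()
    elapsed = time.perf_counter() - start
    print(f"{received} messages in {elapsed:.2f}s ({received / elapsed:,.0f} msg/s)")

if __name__ == "__main__":
    benchmark_queue()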

initargs=(
    dataset_name,
    view_stages,
    pickle.dumps(map_fcn),
@brimoor brimoor Feb 23, 2025

Using dill to serialize the function allows us to support map_fcn values that are lambda functions with 'spawn' contexts ✅

Without this, the example code in the docs/PR description only works with 'fork' contexts, which aren't available on Windows and will no longer be the default start method on Linux as of Python 3.14

Contributor

I can run a test tomorrow to see if this works on Windows as well
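
As a standalone illustration of why dill matters here (independent of the PR's code): the stdlib pickle module cannot serialize a lambda, which is exactly what shipping map_fcn to 'spawn' workers requires:

import pickle
import dill

map_fcn = lambda sample: sample["label"].upper()

try:
    pickle.dumps(map_fcn)
except Exception as e:
    print(f"pickle failed: {e}")  # lambdas are not picklable with stdlib pickle

payload = dill.dumps(map_fcn)   # dill serializes lambdas and closures
restored = dill.loads(payload)  # what a spawned worker would do on its end
print(restored({"label": "cat"}))  # CAT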

),
)

pb = fou.ProgressBar(total=num_samples, progress=progress)
@brimoor brimoor Feb 23, 2025

Using multiprocessing.Queue/Value allows for a single global progress bar in the main context ✅

Users can optionally pass progress="workers" if they want to see the per-worker progress bars 😎

Contributor

Nice!
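
A standalone sketch of the same idea outside FiftyOne (all names are illustrative): each worker bumps a shared counter, and the main process polls it to drive a single tqdm bar:

import multiprocessing
import time

from tqdm import tqdm

def _worker(sample_count, num_samples):
    for _ in range(num_samples):
        time.sleep(0.001)  # stand-in for per-sample work
        with sample_count.get_lock():
            sample_count.value += 1

def main(num_workers=4, samples_per_worker=250):
    total = num_workers * samples_per_worker
    sample_count = multiprocessing.Value("i", 0)

    procs = [
        multiprocessing.Process(target=_worker, args=(sample_count, samples_per_worker))
        for _ in range(num_workers)
    ]
    for p in procs:
        p.start()

    with tqdm(total=total) as pb:
        while pb.n < total:
            pb.update(sample_count.value - pb.n)  # advance bar to the shared count
            time.sleep(0.05)

    for p in procs:
        p.join()

if __name__ == "__main__":
    main()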

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
fiftyone/utils/multiprocessing.py (1)

535-573: 🛠️ Refactor suggestion

Consider adding error handling for map_fcn execution.

The _map_batch function should handle potential exceptions from map_fcn to prevent worker crashes and provide meaningful error messages.

 def _map_batch(input):
     i, num_batches, batch = input
+    
+    def safe_map_fcn(sample):
+        try:
+            return map_fcn(sample)
+        except Exception as e:
+            return f"Error processing sample {sample.id}: {str(e)}"

     if isinstance(batch, tuple):
         # Slice batches
         start, stop = batch
         total = stop - start
         batch_view = sample_collection[start:stop]
     else:
         # ID batches
         sample_ids = batch
         total = len(sample_ids)
         batch_view = fov.make_optimized_select_view(
             sample_collection, sample_ids
         )

     if progress:
         desc = f"Batch {i + 1:0{len(str(num_batches))}}/{num_batches}"
         with tqdm(total=total, desc=desc, position=i) as pb:
             for sample in batch_view.iter_samples(autosave=save):
-                sample_output = map_fcn(sample)
+                sample_output = safe_map_fcn(sample)
                 if queue is not None:
                     queue.put((sample.id, sample_output))

                 pb.update()
     else:
         for sample in batch_view.iter_samples(autosave=save):
-            sample_output = map_fcn(sample)
+            sample_output = safe_map_fcn(sample)
             if queue is not None:
                 queue.put((sample.id, sample_output))

             if sample_count is not None:
                 with sample_count.get_lock():
                     sample_count.value += 1
♻️ Duplicate comments (1)
fiftyone/utils/multiprocessing.py (1)

231-231: 🛠️ Refactor suggestion

Rename parameter 'l' to improve readability.

The parameter name 'l' is ambiguous and could be confused with the number '1'.

-def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, l):
+def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, lock_obj):
🧹 Nitpick comments (8)
fiftyone/utils/multiprocessing.py (1)

232-232: Simplify boolean comparison.

Replace inequality comparison with False with a direct boolean check.

-elif progress != False:
+elif progress:
🧰 Tools
🪛 Ruff (0.8.2)

232-232: Avoid inequality comparisons to False; use if progress: for truth checks

Replace with progress

(E712)

tests/unittests/view_tests.py (2)

112-153: LGTM! Well-structured test for update_samples()

The test thoroughly validates the update_samples() functionality with different worker configurations and shard methods. The test coverage includes:

  • Multiple workers with different shard configurations
  • Main process execution
  • Verification of results through bounds checking

Consider adding test cases for:

+    # Test error handling
+    def test_update_samples_errors(self):
+        dataset = fo.Dataset()
+        dataset.add_samples(
+            [fo.Sample(filepath="image%d.jpg" % i, int=i) for i in range(50)]
+        )
+
+        # Test invalid num_workers
+        with self.assertRaises(ValueError):
+            dataset.update_samples(lambda x: x, num_workers=-1)
+
+        # Test invalid shard_size
+        with self.assertRaises(ValueError):
+            dataset.update_samples(lambda x: x, shard_size=0)

154-236: LGTM! Comprehensive test for map_samples()

The test effectively validates the map_samples() functionality with:

  • Custom mapping function
  • Custom reduce function
  • Custom aggregate function
  • Multiple worker configurations
  • Different shard methods

Consider adding test cases for:

+    # Test error handling and edge cases
+    def test_map_samples_errors(self):
+        dataset = fo.Dataset()
+        dataset.add_samples(
+            [fo.Sample(filepath="image%d.jpg" % i) for i in range(50)]
+        )
+
+        # Test invalid map function
+        with self.assertRaises(ValueError):
+            dataset.map_samples(None)
+
+        # Test empty dataset
+        empty_dataset = fo.Dataset()
+        results = list(empty_dataset.map_samples(lambda x: x.filepath))
+        self.assertEqual(len(results), 0)
tests/unittests/dataset_tests.py (2)

1026-1066: Add test coverage for edge cases in update_samples().

The test has good basic coverage but would benefit from additional test cases:

  1. Test error handling for invalid update functions
  2. Test behavior with empty dataset
  3. Verify update_samples() works with different field types beyond integers
  4. Add assertions to verify sample order is preserved

Here's a suggested addition to improve test coverage:

def test_update_samples_edge_cases(self):
    dataset = fo.Dataset()
    
    # Test empty dataset
    dataset.update_samples(lambda x: x, num_workers=2)
    self.assertEqual(len(dataset), 0)
    
    # Test invalid update function
    with self.assertRaises(ValueError):
        dataset.update_samples(None)
        
    # Test different field types
    dataset.add_samples([
        fo.Sample(filepath=f"image{i}.jpg", 
                 int_field=i,
                 str_field="test",
                 float_field=1.0)
        for i in range(5)
    ])
    
    def update_fcn(sample):
        sample.int_field += 1
        sample.str_field += "_updated" 
        sample.float_field *= 2
        
    dataset.update_samples(update_fcn, num_workers=2)
    
    # Verify updates across types
    self.assertEqual(dataset.bounds("int_field"), (1, 5))
    self.assertTrue(all("_updated" in s for s in dataset.values("str_field")))
    self.assertEqual(dataset.bounds("float_field"), (2.0, 2.0))

1067-1148: Enhance test coverage for map_samples() edge cases.

While the test covers the main functionality well, consider adding:

  1. Test error handling for invalid mapping/reduce functions
  2. Test behavior with empty dataset
  3. Test with more complex data types and transformations
  4. Verify order preservation with different worker counts

Here's a suggested addition to improve test coverage:

def test_map_samples_edge_cases(self):
    dataset = fo.Dataset()
    
    # Test empty dataset
    results = dataset.map_samples(lambda x: x.id, num_workers=2)
    self.assertEqual(len(list(results)), 0)
    
    # Test invalid functions
    with self.assertRaises(ValueError):
        dataset.map_samples(None)
    
    with self.assertRaises(ValueError):
        dataset.map_samples(lambda x: x.id, reduce_fcn=None)
        
    # Test complex transformations
    dataset.add_samples([
        fo.Sample(
            filepath=f"image{i}.jpg",
            detections=fo.Detections(
                detections=[fo.Detection(label="cat", confidence=0.9)]
            )
        )
        for i in range(5)
    ])
    
    # Map detections to new format
    def map_fcn(sample):
        return {
            "id": sample.id,
            "labels": [d.label for d in sample.detections.detections]
        }
        
    results = dataset.map_samples(
        map_fcn,
        num_workers=2,
        shard_size=2
    )
    
    mapped = list(results)
    self.assertEqual(len(mapped), 5)
    self.assertTrue(all(r["labels"] == ["cat"] for r in mapped))
fiftyone/core/collections.py (1)

3216-3451: Consider adding error handling for worker failures.

While the implementation is solid, it would be good to add explicit error handling for worker process failures to provide better feedback to users.

def _map_samples(
    self,
    map_fcn,
    reduce_fcn=None,
    aggregate_fcn=None,
    return_outputs=True,
    save=False,
    num_workers=None,
    shard_size=None,
    shard_method="id",
    progress=None,
):
+   try:
        return foum.map_samples(
            self,
            map_fcn,
            reduce_fcn=reduce_fcn,
            aggregate_fcn=aggregate_fcn,
            return_outputs=return_outputs,
            save=save,
            num_workers=num_workers,
            shard_size=shard_size,
            shard_method=shard_method,
            progress=progress,
        )
+   except Exception as e:
+       raise ValueError(
+           f"Error during parallel processing: {str(e)}"
+       ) from e
docs/source/user_guide/using_datasets.rst (2)

5734-5948: Comprehensive Coverage of Map-Reduce Operations
The “Map-reduce operations” section introduces the new map_samples() functionality with robust examples for both the reduce and aggregate function patterns. The two group tabs (one for a custom ReduceFcn subclass and another for an aggregate function) are explained clearly, and the inclusion of progress bar examples helps illustrate the parallel processing behavior. Consider adding a short summary at the very start of the section that briefly compares when to use a reduce function versus an aggregate function, which may help users decide which approach best suits their needs.


6027-6047: Clear Usage of set_label_values() for Bulk Label Updates
The “Setting label values” section clearly describes how to update label attributes in bulk using the set_label_values() method. The accompanying code example makes the process intuitive. One minor suggestion: you might add a brief note regarding maintaining consistency of label IDs when applying such bulk updates, in case users need to consider schema implications.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 166b7c4 and e4e9b67.

📒 Files selected for processing (7)
  • docs/source/release-notes.rst (3 hunks)
  • docs/source/user_guide/using_datasets.rst (2 hunks)
  • fiftyone/core/collections.py (2 hunks)
  • fiftyone/core/utils.py (3 hunks)
  • fiftyone/utils/multiprocessing.py (1 hunks)
  • tests/unittests/dataset_tests.py (2 hunks)
  • tests/unittests/view_tests.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • fiftyone/core/utils.py
  • docs/source/release-notes.rst
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py

232-232: Avoid inequality comparisons to False; use if progress: for truth checks

Replace with progress

(E712)


499-499: Ambiguous variable name: l

(E741)

⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build
🔇 Additional comments (11)
fiftyone/utils/multiprocessing.py (5)

171-178: LGTM! Smart handling of daemon processes and worker configuration.

The code correctly handles daemon processes by defaulting to single-process execution, and respects the global configuration for default workers.


459-468: LGTM! Smart handling of shard sizes.

The code intelligently caps the shard size based on ObjectId value recommendations and optimizes for the number of workers.


514-516: LGTM! Proper MongoDB client handling in multiprocessing.

The code correctly disconnects and recreates MongoDB clients in worker processes, following PyMongo's best practices for multiprocessing.


247-247: LGTM! Smart use of dill for function serialization.

Using dill instead of the standard pickle module enables support for lambda functions in 'spawn' contexts.


257-257: LGTM! Efficient progress tracking implementation.

The code effectively uses multiprocessing.Queue/Value for a unified progress bar in the main context, with an option for per-worker progress bars.

fiftyone/core/collections.py (3)

3216-3279: LGTM! Well-designed parallel processing method for updating samples.

The update_samples() method provides an efficient way to apply updates to samples in parallel. The implementation is clean and the docstring provides clear examples.


3281-3426: LGTM! Comprehensive implementation of parallel mapping operations.

The map_samples() method provides flexible parallel processing capabilities with support for map, map-reduce and map-aggregate operations. The implementation is robust and the docstring includes clear examples for each use case.


3428-3451: LGTM! Clean internal implementation.

The _map_samples() method provides a clean internal implementation that delegates to the multiprocessing utilities. The code is concise and well-structured.

docs/source/user_guide/using_datasets.rst (3)

5543-5567: Excellent Explanation for Save Contexts
The new “Save contexts” section provides a thorough explanation of how to efficiently batch sample saves using a context manager. The code snippet is clear and demonstrates the benefits compared to per‑sample saves (i.e. fewer database connections).


5616-5641: Clear and Informative “Updating Samples” Section
The documentation for the update_samples() method is well written. It details the usage of the method—including leveraging multiprocessing—with illustrative code, explanations of parameters like shard_size and num_workers, and a reminder to select only the required fields for performance.


5951-5971: Efficient Batch Field Editing with set_values()
The “Setting values” section succinctly demonstrates how to use the set_values() method to assign values to a field across all samples efficiently. The step-by-step example comparing it to individually saving samples is well presented and reinforces the performance benefits of a batch update strategy.

    save=False,
    progress=None,
):
    if reduce_fcn is not None:
Contributor

We should probably raise an error or warning if both reduce_fcn and aggregate_fcn are passed in
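
Something along these lines, for example (placement and wording are illustrative only):

def _validate_fcns(reduce_fcn, aggregate_fcn):
    # reject ambiguous calls that supply both post-processing functions
    if reduce_fcn is not None and aggregate_fcn is not None:
        raise ValueError(
            "Only one of `reduce_fcn` and `aggregate_fcn` may be provided"
        )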

num_workers (None): the number of workers to use. The default is
:meth:`fiftyone.core.utils.recommend_process_pool_workers`. If this
value is <= 1, all work is done in the main process
shard_size (None): an optional number of samples to distribute to each
Contributor

I think we should consider removing shard_size since it is confusing to users. If num_workers * shard_size < n, we will just end up using max_shard_size anyway :) so we might as well not need it
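
If shard_size were dropped, the batch size could simply be derived from the worker count; a hypothetical sketch (function and parameter names are illustrative, not the PR's code):

import math

def compute_shard_size(num_samples, num_workers, max_shard_size=None):
    # split the samples evenly across the workers...
    shard_size = math.ceil(num_samples / num_workers)

    # ...but never exceed the cap imposed by e.g. ObjectId batching limits
    if max_shard_size is not None:
        shard_size = min(shard_size, max_shard_size)

    return shard_size

print(compute_shard_size(50000, 16))        # 3125
print(compute_shard_size(50000, 16, 2000))  # 2000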
