map_samples() and update_samples() #5480
base: develop
Conversation
Walkthrough: This change set introduces a new configuration field, default_map_workers, along with new utilities for applying functions to samples in parallel.
This is awesome, I can check this out and run some benchmarking with evaluate_detections. Should we do another pass with Ray as well? We can do a comparison between Apache Beam vs. Pool Multiprocessing vs. Ray.
Actionable comments posted: 1
🧹 Nitpick comments (7)
fiftyone/utils/multiprocessing.py (2)
130-143: Consider clarifying concurrency boundaries or warnings for large datasets.
When engaging multiple workers, it might be beneficial to highlight memory usage or concurrency implications for massive or distributed datasets. A short note in the docstring or an in-code comment can preempt performance pitfalls for large-scale users.
238-246: Global state initialization might limit maintainability.
Assigning “sample_collection,” “map_fcn,” etc. as global variables works for process-based concurrency but can be prone to future confusion. Consider storing them in a dedicated worker context or passing them directly into each function if the code grows more complex.

fiftyone/core/config.py (1)
241-246: “default_map_workers” adds convenient user configurability.
Dynamically controlling default workers for “map_samples” can improve performance across various workloads. As a future enhancement, consider validating that “default_map_workers” does not exceed CPU core counts to prevent potential oversubscription.

fiftyone/core/collections.py (1)
3217-3307: Implementation looks good with some minor suggestions.
The parallel processing implementation is well-structured and handles both mapping and map-reduce operations effectively. A few suggestions for improvement:
- Consider adding input validation for shard_method to ensure only valid values are passed
- Consider adding type hints for better IDE support and code maintainability
- Consider adding a warning when num_workers=1 is explicitly set, as this disables parallelization

Example type hints:

    def map_samples(
        self,
        map_fcn: Callable[[Sample], Any],
        reduce_fcn: Optional[
            Callable[[SampleCollection, Dict[str, Any]], Any]
        ] = None,
        save: Optional[bool] = None,
        num_workers: Optional[int] = None,
        shard_size: Optional[int] = None,
        shard_method: Literal["id", "slice"] = "id",
        progress: Optional[Union[bool, Callable]] = None,
    ) -> Optional[Any]:

docs/source/user_guide/config.rst (1)
97-100: New Config Field Addition: default_map_workers
The new configuration row is well integrated in the table and clearly shows the environment variable (FIFTYONE_DEFAULT_MAP_WORKERS), the default value (None), and the connection to the map_samples() method. For enhanced clarity, consider adding a brief note (if possible) on scenarios or performance implications when a user might want to override this default.

docs/source/user_guide/using_datasets.rst (2)
5640-5658: Performance Metrics Output Example
The text output block that shows batch processing speeds and the aggregated label counts effectively communicates the performance gains from parallel processing. This real-world metric example reinforces the efficiency of the new approach.
It would improve clarity to add a brief inline comment before the output block noting that these metrics are illustrative and may vary depending on system configuration.
5691-5700: Map-Reduce Example with Reduce Function
The second code example demonstrates how to perform a map-reduce operation by having the mapping function return a lowercased label, with the reduce function aggregating these results using Python’s Counter. This example clearly shows how to leverage the reduce functionality for further processing of the outputs.
A short inline comment explaining that the reduce function aggregates label counts could help users unfamiliar with map-reduce patterns.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- docs/source/user_guide/config.rst (3 hunks)
- docs/source/user_guide/using_datasets.rst (1 hunks)
- fiftyone/core/collections.py (3 hunks)
- fiftyone/core/config.py (1 hunks)
- fiftyone/core/odm/database.py (3 hunks)
- fiftyone/core/utils.py (1 hunks)
- fiftyone/utils/multiprocessing.py (1 hunks)
- setup.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py
231-231: Ambiguous variable name: l
(E741)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: test / test-app
- GitHub Check: build
🔇 Additional comments (11)
fiftyone/utils/multiprocessing.py (2)
19-47: Good use of docstrings and examples to illustrate map vs map-reduce.
The in-code examples clearly explain the two usage patterns. This thorough documentation will help users adopt the new method quickly.
279-294: Verify autosave behavior and concurrency side-effects.
When using “iter_samples(autosave=save),” any sample modifications in parallel workers might lead to conflicting updates if multiple batches contain overlapping data. Confirm that your dataset usage and indexing strategy prevent concurrency collisions.

setup.py (1)
73-73: Confirm that “tqdm” is suitably pinned or versioned.
The new “tqdm” dependency is necessary for progress bars. Ensure that your pinned version aligns with other dependencies or older installed versions of “tqdm” in user environments to avoid conflicts.

fiftyone/core/odm/database.py (3)
20-20: LGTM!
The change to import the entire mongoengine module improves code clarity by making the source of functionality explicit.
222-222: LGTM!
The change to use mongoengine.connect() is consistent with the updated import and maintains the same functionality.
247-252: LGTM!
The new _disconnect() function properly handles database cleanup by resetting global clients and disconnecting all mongoengine connections.
2297-2300: LGTM!
The added documentation clearly explains the OS-specific default behavior for process pool workers, which improves developer understanding.

fiftyone/core/collections.py (1)
3330-3351: LGTM! Clean and efficient implementation.
The single-process mapping implementation is well-structured and handles all cases correctly, with proper defaults and error handling.
docs/source/user_guide/config.rst (2)
175-177: JSON Example Update for default_map_workers
The JSON output snippet now includes the "default_map_workers": null field correctly, ensuring that the configuration example is consistent with the new documentation entry.
226-228: CLI Config Output Consistency
The CLI configuration output also reflects the new default_map_workers field with its value set to null. This consistency will help users verify that their config changes are applied uniformly across both Python and CLI outputs.
5615-5638: New Map-Reduce Operations Section Introduction
This new section is a clear and valuable addition that explains the benefits of using the :meth:`map_samples()` method. The introductory text and the first code example—which converts sample labels to uppercase—are concise, well-formatted, and directly illustrate how users can modify sample data in parallel.
Actionable comments posted: 0
♻️ Duplicate comments (1)
fiftyone/utils/multiprocessing.py (1)
313-313: 🛠️ Refactor suggestion
Rename the ambiguous parameter "l" to a more descriptive name.
Using "l" can be confusing and conflicts with PEP8 guidance.
Apply this diff:

    -def _init_worker(dataset_name, view_stages, m, s, r, p, l):
    +def _init_worker(dataset_name, view_stages, m, s, r, p, lock_obj):

🧰 Tools
🪛 Ruff (0.8.2)
313-313: Ambiguous variable name: l (E741)
🧹 Nitpick comments (2)
fiftyone/core/collections.py (1)
3218-3401: LGTM! Well-structured implementation of parallel sample processing.
The implementation provides a robust way to apply functions to samples in parallel with good defaults and flexibility. The docstring is comprehensive with clear examples.
A few suggestions for improvement:

     def map_samples(
         self,
         map_fcn,
         reduce_fcn=None,
         save=None,
         num_workers=None,
         shard_size=None,
         shard_method="id",
         progress=None,
    +    # Add type hints
     ):
    +    # Add validation
    +    if shard_method not in ("id", "slice"):
    +        raise ValueError("shard_method must be 'id' or 'slice'")
    +
    +    if shard_size is not None and shard_size <= 0:
    +        raise ValueError("shard_size must be > 0")
    +
    +    # Validate map_fcn signature
    +    if not callable(map_fcn):
    +        raise TypeError("map_fcn must be callable")

docs/source/user_guide/using_datasets.rst (1)
5813-5833: Setting Values for Batch Edits
A new section on using set_values() is introduced to demonstrate how to update a field (or embedded field) on all samples using a single batch operation. The code is well written and emphasizes the efficiency gains over per-sample iteration. One suggestion would be to include a brief note about verifying type consistency when using set_values(), as mismatches could cause runtime errors.
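For readers skimming this review, a minimal sketch of the batch-edit pattern that set_values() enables (the dataset and field names here are illustrative, not taken from the PR):

```python
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("quickstart")

# Compute one value per sample (aligned with the collection's order), then
# write them all back in a single batch operation instead of saving each
# sample individually
values = [len(p) for p in dataset.values("filepath")]
dataset.set_values("filepath_length", values)
```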
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- docs/source/user_guide/using_datasets.rst (1 hunks)
- fiftyone/__public__.py (1 hunks)
- fiftyone/core/collections.py (4 hunks)
- fiftyone/utils/multiprocessing.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/__public__.py
260-260: .utils.multiprocessing.ReduceFcn imported but unused
Remove unused import: .utils.multiprocessing.ReduceFcn (F401)
fiftyone/utils/multiprocessing.py
313-313: Ambiguous variable name: l
(E741)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-app
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: lint / eslint
- GitHub Check: build
🔇 Additional comments (14)
fiftyone/__public__.py (1)
260-260: LGTM!
The addition of ReduceFcn to the public interface is appropriate as it supports the new map-reduce functionality.
🧰 Tools
🪛 Ruff (0.8.2)
260-260: .utils.multiprocessing.ReduceFcn imported but unused
Remove unused import: .utils.multiprocessing.ReduceFcn (F401)
fiftyone/utils/multiprocessing.py (3)
20-147: Well-structured implementation with comprehensive documentation!
The function effectively handles both map and map-reduce operations with proper resource management and flexible progress tracking.
223-255: Clean and extensible reducer implementation!
The class provides a well-documented interface for both function-based and class-based reduction strategies.
257-310: LGTM! Helper functions are well-implemented.
The helper functions handle batch distribution, worker initialization, and batch processing effectively.
Also applies to: 341-377

fiftyone/core/collections.py (1)
3371-3401: LGTM! Clean implementation of single-process sample mapping.
The helper method handles both mapping and reduction cases efficiently with proper initialization of reducers.
docs/source/user_guide/using_datasets.rst (9)
5616-5625: Introduction of Map-Reduce Operations Section
This new section introduces the map-reduce functionality via the map_samples() method. The heading and the brief description clearly explain that the method iterates over samples in a collection, applies a user-defined mapping function (and optionally a reduce function), and can report performance metrics.
5626-5642: Mapping Example Code Block
The provided Python code snippet demonstrates a basic usage of map_samples(). It loads a CIFAR-10 dataset for training, selects the “ground_truth” field, and applies a mapping function that converts the label to uppercase. The inclusion of a code-block with line numbers helps the reader follow the example, and the subsequent text block of sample performance outputs reinforces the efficiency claims.
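For context, a sketch of the kind of example being described, assuming the map_samples() signature proposed in this PR (the exact docs snippet may differ):

```python
import fiftyone as fo
import fiftyone.zoo as foz

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

def map_fcn(sample):
    # runs in parallel worker processes; edits are persisted because save=True
    sample.ground_truth.label = sample.ground_truth.label.upper()

view.map_samples(map_fcn, save=True)
```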
5642-5661: Output Block and Explanatory Notes
The textual output (showing batch speeds for each batch) is a practical touch that helps users understand the performance improvements achieved using this method. Additionally, the brief list of properties and considerations of map_samples() (such as nondeterministic order and avoiding global state modifications) is well stated.
5681-5687: Alternative Iteration with iter_samples()
A short snippet is provided that illustrates how one might alternatively iterate over samples using iter_samples() with autosave and progress reporting enabled. This clearly contrasts the traditional iterative approach with the new, parallelized map_samples() method, reinforcing the performance benefits.
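For comparison, a single-process version of the same edit using iter_samples() with autosave and progress reporting might look like:

```python
for sample in view.iter_samples(autosave=True, progress=True):
    sample.ground_truth.label = sample.ground_truth.label.upper()
```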
5692-5709: Reduce Function Example
The documentation offers a “Reduce function” example that shows how to use a standard function (employing collections.Counter) as the reduce_fcn parameter. This example is clear, concise, and effectively demonstrates a common reduction pattern in batch processing.
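A sketch of that pattern, assuming reduce_fcn receives the collection and a dict mapping sample IDs to the per-sample map outputs (as in the snippet quoted later in this thread):

```python
from collections import Counter

def map_fcn(sample):
    return sample.ground_truth.label.lower()

def reduce_fcn(sample_collection, values):
    # values: {sample_id: map_fcn output}
    return Counter(values.values())

counts = view.map_samples(map_fcn, reduce_fcn=reduce_fcn)
print(counts)
```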
5710-5775: Reduce Class Example
An alternative approach is demonstrated via a custom ReduceFcn class. This version explicitly defines an init(), update(), and finalize() method to aggregate mapping outputs. The example is well structured, clearly separating the accumulator’s initialization, per-batch updates, and final consolidation of results. This version will appeal to users looking for a more object‐oriented approach to specifying custom reduce behavior.
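A rough sketch of what such a subclass might look like, based on the init()/update()/finalize() interface described here (the actual ReduceFcn base class in the PR may differ in details):

```python
import fiftyone as fo
from collections import Counter

class CountLabels(fo.ReduceFcn):
    def init(self):
        # initialize the accumulator
        self.accumulator = Counter()

    def update(self, outputs):
        # outputs: {sample_id: map output} for one batch of samples
        self.accumulator.update(outputs.values())

    def finalize(self):
        # consolidate and return the final result
        return self.accumulator

counts = view.map_samples(map_fcn, reduce_fcn=CountLabels)
```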
5776-5801: Worker Configuration Explanation
The passage following the reduce examples outlines multiple ways to configure the number of workers for the map_samples() method. It explains setting a default via the FiftyOne configuration, manually overriding with the num_workers parameter, and the fallback to recommend_process_pool_workers() if not set. This guidance is practical and will assist users in optimizing parallel performance.
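In sketch form, the three configuration paths described above:

```python
import fiftyone as fo

# Option 1: set a global default (also configurable via the
# FIFTYONE_DEFAULT_MAP_WORKERS environment variable)
fo.config.default_map_workers = 8

# Option 2: override per call
view.map_samples(map_fcn, num_workers=4)

# Option 3: leave unset and fall back to
# fiftyone.core.utils.recommend_process_pool_workers()
view.map_samples(map_fcn)
```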
5802-5812: Shard Size Option
The example showing how to pass a custom shard_size to the map_samples() method is clear and useful. It demonstrates that users can control the number of samples sent per worker, which might be helpful for tuning performance especially on large datasets.
5889-5913: Setting Label Values
The final section covers how to use set_label_values() for performing batch updates specifically on label fields. The example clearly illustrates obtaining sample label IDs, constructing a mapping, and updating a new “random” attribute on each label. This section is informative and neatly ties into the overall theme of efficient batch processing in datasets.
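A sketch of the described workflow (the attribute name is illustrative):

```python
import random

view = dataset.select_fields("ground_truth")

# build a mapping from label ID -> attribute value
values = {
    sample.ground_truth.id: random.random()
    for sample in view.iter_samples()
}

# set a new "random" attribute on each label in a single batch operation
dataset.set_label_values("ground_truth.random", values)
```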
    def map_fcn(sample):
        return sample.ground_truth.label.lower()

    def reduce_fcn(sample_collection, values):
I’m pretty uncomfortable naming this “reduce”. We put ourselves in an awkward position moving forward if we name this “reduce” since it is not a “reduce” function. If we introduce a proper reduce function later, what would we name it? Might users familiar with reduce find this confusing? Let’s please name it “aggregate” (or something else) instead of misnaming it using a word that already has a different meaning. And let’s not refer to “map-reduce” if we aren’t performing a map reduce operation.
Note that reduce_fcn supports two options:
- This version you've commented on, which I agree is more like an "aggregate function"
- A ReduceFcn subclass, which is modeled after Beam's CombineFn and is like a traditional reduce operator

For clarity, I could get onboard with adding a new aggregate_fcn argument that callers must use if they want to provide an "aggregate function" like this. This would reserve reduce_fcn for the true "reduce" functionality. Thinking ahead, if we want to use map_samples(map_fcn, reduce_fcn=) internally in the SDK and support swapping out the multiprocessing backend for distributed computing backends (Beam, Ray, etc), we'd need to find some kind of universal interface for defining reduce functions, so that the caller's reduction could be performed by any backend. Comparing the ReduceFcn classes from this PR and #5449, this feels tricky to achieve (for example, this PR's reducer operates in the main process, while Beam's does not; this changes how they are implemented).
If we think it is important to support multiple backends in the future, it might make sense to ship map_samples(map_fcn, aggregate_fcn=) and NOT include a reduce_fcn= argument just yet.
After all, most reductions could be expressed idiomatically as FiftyOne aggregations.
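For example, the label-counting reduction discussed in this thread could be written as a built-in aggregation rather than a custom reduce function:

```python
# count occurrences of each distinct label without any custom reduce logic
counts = dataset.count_values("ground_truth.label")
```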
Agree with Josh and your suggestion to not include reduce. Even the beam docs call it aggregate as it's not really the same thing as a traditional reduce most people are familiar with. Also embedding reduce inside of map() really doesn't make sense for other backends that have a true reducer like spark
Added aggregate_fcn as discussed.
Actionable comments posted: 1
🧹 Nitpick comments (4)
fiftyone/core/collections.py (4)
13-13: Add copyright year range
The copyright notice should use a year range to indicate both the initial creation and latest modification years.

    -| Copyright 2017-2025, Voxel51, Inc.
    +| Copyright 2017-2024, Voxel51, Inc.
3218-3404: Review the new map_samples() method implementation
The new map_samples() method provides a powerful way to parallelize operations across samples using multiprocessing. The implementation looks solid with good error handling and configuration options. A few suggestions for improvement:
- Consider adding input validation for num_workers to ensure it's a positive integer
- Add validation for shard_size to ensure it's a positive integer when provided
- Consider adding a timeout parameter to handle hanging worker processes

     def map_samples(
         self,
         map_fcn,
         reduce_fcn=None,
         save=None,
         num_workers=None,
         shard_size=None,
         shard_method="id",
         progress=None,
    +    timeout=None,
     ):
         """Applies the given function to each sample in the collection.

         Args:
             ...
    +        timeout (None): optional timeout in seconds for worker processes
         """
    +    if num_workers is not None and not isinstance(num_workers, int):
    +        raise ValueError("num_workers must be a positive integer")
    +
    +    if shard_size is not None and not isinstance(shard_size, int):
    +        raise ValueError("shard_size must be a positive integer")

         if num_workers is None:
             num_workers = fo.config.default_map_workers
3218-3268: Improve docstring with more examples
The docstring would benefit from additional examples showing common use cases and error handling.

     def map_samples(self, map_fcn, reduce_fcn=None, save=None, num_workers=None, shard_size=None, shard_method="id", progress=None):
         """Applies the given function to each sample in the collection.

         Examples::

             import fiftyone as fo
             import fiftyone.zoo as foz

             dataset = foz.load_zoo_dataset("cifar10", split="train")
             view = dataset.select_fields("ground_truth")

    +        # Example 4: Error handling
    +        def map_fcn(sample):
    +            if not hasattr(sample, "ground_truth"):
    +                raise ValueError("Sample missing ground truth")
    +            return sample.ground_truth.label
    +
    +        try:
    +            dataset.map_samples(map_fcn)
    +        except ValueError as e:
    +            print(f"Error: {e}")
    +
    +        # Example 5: Using timeout
    +        def slow_map_fcn(sample):
    +            time.sleep(10)
    +            return sample.ground_truth.label
    +
    +        # Will timeout after 5 seconds
    +        dataset.map_samples(slow_map_fcn, timeout=5)
3363-3372: Add type hints to improve code readability
Consider adding type hints to the function parameters and return value to improve code readability and enable better IDE support.

    +from typing import Optional, Union, Callable, Any

     def _map_samples_single(
         self,
    -    map_fcn,
    -    reduce_fcn=None,
    -    save=None,
    -    progress=False,
    +    map_fcn: Callable[[Any], Any],
    +    reduce_fcn: Optional[Union[Callable, type]] = None,
    +    save: Optional[bool] = None,
    +    progress: Union[bool, str] = False,
    +) -> Optional[Any]:
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- fiftyone/core/collections.py (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-app
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: lint / eslint
- GitHub Check: build
@brimoor Can this launch non-daemon processes?
I think we need to test on a large dataset for which the optimization is really needed. I don't see any mention of how to configure the runners and so I'm worried that just using the direct runner will quickly hit memory issues (https://beam.apache.org/documentation/runners/direct/)
@kaixi-wang this PR uses multiprocessing, not Beam. What makes you think there will be memory issues? For me, running the example code in the PR description achieves an ~8x speedup over the same operation using a single worker.
@jacobsela can you elaborate on what you're asking?
Oh yeah sorry. Mixed up the PRs. But yes in your example, there “should” be no issue since you are only using values on the sample you are iterating over. However, based on my recent attempts/frustrations to make/do anything with large datasets, I’m cautious in recommending a solution for big data without verifying/testing it… although I do trust that you know this part of the code significantly better than me. Additionally, the concern with memory specifically stems from thinking about the case where external data/side inputs are required. Since each process worker will require a copy of all the same inputs (except for samples) and there doesn’t appear to be direct support for selectively partitioning/streaming or sharing objects, this version still might not be ideal for solving the common pain point of adding fields/setting values that are not computed directly from the existing fields.
@jacobsela are you thinking about potential issues relating to nested multiprocessing and that daemon processes can’t create child processes? For example, if the map fn were to call some method like ‘compute_’ that used multiprocessing under the hood?
@brimoor @kaixi-wang Exactly, daemon processes aren't able to launch their own child processes. I wrote my own similar util for this purpose and because it uses Pool it ran into the same issue. If we want to parallelize model inference/training/anything that uses a data loader it is pretty useful to be able to launch more nested child processes.
@jacobsela : that might be a little bit out of scope of what we want to do here, on one hand, I think it's reasonable to assume that
@minhtuev I understand, I don't mean for this to be an explicit request. I was just wondering for my own use cases. FWIW the way
@jacobsela : thanks for the input, I have created a ticket to investigate all usage patterns of
@brimoor : thinking about it, I really like the idea of having different backends for map_samples(). The backend options can include:
Actionable comments posted: 1
♻️ Duplicate comments (1)
fiftyone/core/collections.py (1)
3218-3414: ⚠️ Potential issue: Add error handling to protect against exceptions in map_fcn
The _map_samples_single() implementation should handle errors gracefully to prevent crashes. Consider wrapping the map function call in a try-except block.

     def _map_samples_single(
         self,
         map_fcn,
         reduce_fcn=None,
         aggregate_fcn=None,
         save=None,
         progress=False,
     ):
         if save is None:
             save = reducer is None

         if reducer is not None:
             reducer.init()

         if progress == "global":
             progress = True

         for sample in self.iter_samples(autosave=save, progress=progress):
    +        try:
             sample_output = map_fcn(sample)
    +        except Exception as e:
    +            raise ValueError(
    +                f"Error applying map_fcn to sample {sample.id}: {str(e)}"
    +            ) from e

             if reducer is not None and sample_output is not None:
                 reducer.update({sample.id: sample_output})

         if reducer is not None:
             return reducer.finalize()
🧹 Nitpick comments (5)
fiftyone/utils/multiprocessing.py (3)
151-224: Consider adding error handling for invalid progress callback.
While the implementation is solid, the progress callback handling could be more robust.
Add validation for the progress callback:

     elif callable(progress):
    +    if not hasattr(progress, '__call__'):
    +        raise ValueError("progress callback must be callable")
         global_progress = progress
         progress = False
226-259: Consider adding type hints for better code maintainability.
The ReduceFcn class is well-designed but could benefit from type hints to improve code maintainability and IDE support.
Add type hints:

    -    def __init__(self, sample_collection, aggregate_fcn=None):
    +    def __init__(self, sample_collection: "fo.core.collections.SampleCollection",
    +                 aggregate_fcn: Optional[Callable] = None) -> None:
             self.sample_collection = sample_collection
             self.aggregate_fcn = aggregate_fcn
             self.accumulator = None

    -    def update(self, outputs):
    +    def update(self, outputs: Dict[str, Any]) -> None:
327-327: Document MongoDB connection handling behavior.
The code properly disconnects MongoDB clients before initializing workers, but this behavior should be documented.
Add a docstring explaining the MongoDB connection handling:

     def _init_worker(dataset_name, view_stages, m, s, r, p, l):
    +    """Initialize a worker process.
    +
    +    Note: This function automatically disconnects and recreates MongoDB connections
    +    to ensure each worker process has its own connection pool, following PyMongo's
    +    multiprocessing guidelines.
    +    """
         from tqdm.auto import tqdm

fiftyone/core/collections.py (2)
3355-3382: Consider using a context manager for the multiprocessing pool
The multiprocessing pool should be properly managed to ensure cleanup of resources.

     def map_samples(self, ...):
         if num_workers is None:
             if multiprocessing.current_process().daemon:
                 num_workers = 1
             elif fo.config.default_map_workers is not None:
                 num_workers = fo.config.default_map_workers
             else:
                 num_workers = fou.recommend_process_pool_workers()

         if num_workers is not None and num_workers <= 1:
             return self._map_samples_single(...)

    +    with multiprocessing.Pool(num_workers) as pool:
             return foum.map_samples(
                 self,
                 map_fcn,
                 reduce_fcn=reduce_fcn,
                 aggregate_fcn=aggregate_fcn,
                 save=save,
                 num_workers=num_workers,
                 shard_size=shard_size,
                 shard_method=shard_method,
                 progress=progress,
             )
3229-3353: Enhance docstring with performance considerations
The docstring should include information about performance implications and best practices.
Add the following to the docstring:

     """
    +Performance Considerations:
    +---------------------------
    +- The optimal num_workers depends on CPU cores and I/O patterns
    +- For CPU-bound tasks, use num_workers = cpu_count
    +- For I/O-bound tasks, num_workers can be higher
    +- Large shard_size improves throughput but increases memory usage
    +- Small shard_size reduces memory but increases overhead
     """
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- fiftyone/core/collections.py (4 hunks)
- fiftyone/utils/multiprocessing.py (1 hunks)
- setup.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- setup.py
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py
9-9: multiprocessing imported but unused
Remove unused import: multiprocessing (F401)
316-316: Ambiguous variable name: l (E741)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-app
- GitHub Check: e2e / test-e2e
- GitHub Check: lint / eslint
- GitHub Check: build / build
- GitHub Check: build
🔇 Additional comments (3)
fiftyone/utils/multiprocessing.py (3)
1-20: LGTM! Well-organized imports and module setup.
The imports are appropriately chosen for the module's functionality, including essential dependencies for multiprocessing, database operations, and progress tracking.
🧰 Tools
🪛 Ruff (0.8.2)
9-9: multiprocessing imported but unused
Remove unused import: multiprocessing (F401)
21-150: Excellent documentation with comprehensive examples!
The docstring provides clear explanations and practical examples demonstrating all three operation modes: map, map-reduce, and map-aggregate. The parameter descriptions are thorough and the examples are executable.
316-316: Rename the ambiguous parameter "l" to a more descriptive name.
Using "l" as a parameter name can be confusing and conflicts with PEP8 guidance.
🧰 Tools
🪛 Ruff (0.8.2)
316-316: Ambiguous variable name: l (E741)
Actionable comments posted: 0
🧹 Nitpick comments (11)
fiftyone/utils/multiprocessing.py (4)
171-178: Consider adding a warning when running in daemon mode.
When running in a daemon process, the function falls back to single-worker mode without notifying the user. This could lead to unexpected performance characteristics.

     if multiprocessing.current_process().daemon:
    +    fou.logger.warning(
    +        "Running in daemon mode - falling back to single worker processing"
    +    )
         num_workers = 1
232-239: Fix boolean comparison style.
The code uses an explicit comparison to False which is not Pythonic.

    -    elif progress != False:
    +    elif progress:

🧰 Tools
🪛 Ruff (0.8.2)
232-232: Avoid inequality comparisons to False; use if progress: for truth checks
Replace with progress (E712)
: Add timeout configuration for queue operations.The hardcoded 0.01 second timeout in the queue polling loop could be made configurable for different workloads.
Consider adding a
queue_timeout
parameter to allow users to tune this value based on their specific use case.
566-572
: Consider using atomic operations for counters.The code uses explicit locking for counter increments which could be replaced with atomic operations for better performance.
Consider using
multiprocessing.Value('i', 0, lock=True)
to create atomic counters instead of explicit locking.tests/unittests/view_tests.py (3)
113-153
: Test coverage looks good, consider adding error case tests.The test_update_samples method provides good coverage of the basic functionality, but could be enhanced with:
- Tests for invalid update functions
- Tests for error handling when workers fail
- Tests for memory usage with large datasets
154-236
: Test coverage looks good, consider adding edge case tests.The test_map_samples method effectively tests the core functionality with both map-reduce and map-aggregate operations. Consider adding:
- Tests for empty datasets
- Tests for invalid map/reduce/aggregate functions
- Tests for memory usage with large datasets
- Tests for handling worker failures
8-8
: Add Counter import to imports section.Move the Counter import to the standard library imports section at the top for better organization.
-from collections import Counter + from copy import deepcopy from datetime import date, datetime, timedelta import math +from collections import Countertests/unittests/dataset_tests.py (2)
1026-1066
: LGTM! Well-structured test for concurrent sample updates.The test provides good coverage of different worker configurations and sharding methods. It effectively validates the bounds of integer fields after updates.
Consider adding assertions to verify that the updates are atomic by checking that no intermediate values exist between the expected bounds.
def test_update_samples(self): dataset = fo.Dataset() dataset.add_samples( [fo.Sample(filepath="image%d.jpg" % i, int=i) for i in range(50)] ) self.assertTupleEqual(dataset.bounds("int"), (0, 49)) def update_fcn(sample): sample.int += 1 dataset.update_samples(update_fcn, num_workers=2) self.assertTupleEqual(dataset.bounds("int"), (1, 50)) + # Verify no intermediate values exist + self.assertNotIn(0, dataset.distinct("int")) + self.assertNotIn(51, dataset.distinct("int"))
1067-1148
: LGTM! Comprehensive test coverage for mapping operations.The test effectively validates map-reduce and map-aggregate operations with different worker configurations and sharding methods.
Consider adding test cases for:
- Error handling when map function raises an exception
- Memory usage with large datasets
- Cancellation of long-running operations
Example test case for error handling:
def test_map_samples_error_handling(self): dataset = fo.Dataset() dataset.add_samples( [fo.Sample(filepath="image%d.jpg" % i) for i in range(50)] ) def map_fcn_with_error(sample): if sample.filepath == "image25.jpg": raise ValueError("Simulated error") return sample.filepath.upper() with self.assertRaises(ValueError): dataset.map_samples(map_fcn_with_error, num_workers=2)fiftyone/core/collections.py (2)
3216-3279
: Consider adding parameter validation forupdate_fcn
The
update_samples()
method should validate thatupdate_fcn
is callable before proceeding with the operation.def update_samples( self, update_fcn, num_workers=None, shard_size=None, shard_method="id", progress=None, ): + if not callable(update_fcn): + raise ValueError("update_fcn must be a callable") return self._map_samples( update_fcn, return_outputs=False, save=True, num_workers=num_workers, shard_size=shard_size, shard_method=shard_method, progress=progress, )
3281-3452
: Add type validation for reduce_fcn and aggregate_fcnThe
map_samples()
method should validate the types ofreduce_fcn
andaggregate_fcn
to ensure they match the expected interfaces.def map_samples( self, map_fcn, reduce_fcn=None, aggregate_fcn=None, save=False, num_workers=None, shard_size=None, shard_method="id", progress=None, ): + if not callable(map_fcn): + raise ValueError("map_fcn must be a callable") + + if reduce_fcn is not None and not ( + inspect.isclass(reduce_fcn) or callable(reduce_fcn) + ): + raise ValueError("reduce_fcn must be a class or callable") + + if aggregate_fcn is not None and not callable(aggregate_fcn): + raise ValueError("aggregate_fcn must be a callable") return self._map_samples( map_fcn, reduce_fcn=reduce_fcn, aggregate_fcn=aggregate_fcn, save=save, num_workers=num_workers, shard_size=shard_size, shard_method=shard_method, progress=progress, )
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
docs/source/release-notes.rst
(3 hunks)docs/source/user_guide/using_datasets.rst
(2 hunks)fiftyone/core/collections.py
(2 hunks)fiftyone/core/utils.py
(3 hunks)fiftyone/utils/multiprocessing.py
(1 hunks)tests/unittests/dataset_tests.py
(2 hunks)tests/unittests/view_tests.py
(2 hunks)
✅ Files skipped from review due to trivial changes (1)
- docs/source/release-notes.rst
🚧 Files skipped from review as they are similar to previous changes (2)
- fiftyone/core/utils.py
- docs/source/user_guide/using_datasets.rst
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py
232-232: Avoid inequality comparisons to False
; use if progress:
for truth checks
Replace with progress
(E712)
499-499: Ambiguous variable name: l
(E741)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-app
- GitHub Check: lint / eslint
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: build
🔇 Additional comments (4)
fiftyone/utils/multiprocessing.py (3)
459-468
: Good use of batch size optimization!The code intelligently caps the shard size based on ObjectId value recommendations, which helps prevent MongoDB query document size limits.
499-499
: Rename ambiguous parameter.The parameter
l
is ambiguous and conflicts with PEP8 guidance.-def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, l): +def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, lock): - if l is not None: - tqdm.set_lock(l) + if lock is not None: + tqdm.set_lock(lock)Also applies to: 531-532
🧰 Tools
🪛 Ruff (0.8.2)
499-499: Ambiguous variable name:
l
(E741)
514-517
: Good handling of MongoDB connections!The code properly disconnects and recreates MongoDB clients for each worker process, following best practices for PyMongo with multiprocessing.
fiftyone/core/collections.py (1)
3428-3452
: Implementation looks goodThe
_map_samples()
helper method correctly delegates tofoum.map_samples()
with appropriate parameter passing.
    @@ -3212,6 +3213,243 @@ def _set_labels(self, field_name, sample_ids, label_docs, progress=False):
        def _delete_labels(self, ids, fields=None):
            self._dataset.delete_labels(ids=ids, fields=fields)

        def update_samples(
Added an update_samples()
method for the case where the user just wants to apply sample edits to a collection.
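A sketch of the intended usage, assuming the signature described in this PR:

```python
def update_fcn(sample):
    sample.ground_truth.label = sample.ground_truth.label.upper()

# edits are applied and saved in parallel worker processes
dataset.update_samples(update_fcn, num_workers=4)
```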
    for batch_view in fou.iter_slices(sample_collection, shard_size):
        for sample in batch_view.iter_samples(autosave=save):
            sample_output = map_fcn(sample)
            yield sample.id, sample_output
@j053y dataset.map_samples(map_fcn)
now returns a generator as suggested ✅
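A sketch of consuming the generator, per the snippet quoted above:

```python
def map_fcn(sample):
    return sample.ground_truth.label

# outputs are yielded as (sample_id, output) pairs as workers complete them
for sample_id, label in dataset.map_samples(map_fcn):
    print(sample_id, label)
```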
    @@ -2294,6 +2292,10 @@ def recommend_thread_pool_workers(num_workers=None):
     def recommend_process_pool_workers(num_workers=None):
         """Recommends a number of workers for a process pool.

         By default, ``multiprocessing.cpu_count()`` workers will be recommended if
         you are running on macOS or Linux, while a single worker will be
         recommended on Windows.
TODO: can we use num_workers=multiprocessing.cpu_count() by default instead of num_workers=1 for Windows?
Created a ticket to track this, will test tomorrow
        - None, otherwise
        """
        if num_workers is None:
            if multiprocessing.current_process().daemon:
Automatically defaulting to no multiprocessing when this code is invoked in a daemon process ✅
We should add logging here, in case users are expecting multiprocessing output.
    if return_outputs:
        batch_count = multiprocessing.Value("i", 0)
        sample_count = None
        queue = multiprocessing.Queue()
@j053y using a queue to implement process communication as suggested ✅
Will run a benchmark today to test the performance of the queue implementation ✅
    initargs=(
        dataset_name,
        view_stages,
        pickle.dumps(map_fcn),
Using dill to serialize the function allows us to support map_fcn that are lambda functions with 'spawn' contexts ✅
Without this, the example code in the docs/PR descriptions only works for 'fork'
contexts, which aren't available on Windows and will apparently be removed entirely in Python 3.14
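A minimal illustration of the difference, assuming only the dill package (the PR's actual serialization code is not shown here):

```python
import dill

map_fcn = lambda sample: sample.filepath.upper()

# dill can serialize lambdas and closures, so the payload can be shipped to
# 'spawn'-based worker processes and reconstructed there
payload = dill.dumps(map_fcn)
restored = dill.loads(payload)

# the standard pickle module would raise an error for a lambda, which is why
# 'spawn' contexts otherwise require an importable, module-level function
```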
I can run a test tomorrow to see if this works on Windows as well
        ),
    )

    pb = fou.ProgressBar(total=num_samples, progress=progress)
Using multiprocessing.Queue/Value allows for a single global progress bar in the main context ✅
Users can optionally pass progress="workers" if they want to see the per-worker progress bars 😎
Nice!
Actionable comments posted: 0
🔭 Outside diff range comments (1)
fiftyone/utils/multiprocessing.py (1)
535-573
: 🛠️ Refactor suggestionConsider adding error handling for map_fcn execution.
The
_map_batch
function should handle potential exceptions frommap_fcn
to prevent worker crashes and provide meaningful error messages.def _map_batch(input): i, num_batches, batch = input + + def safe_map_fcn(sample): + try: + return map_fcn(sample) + except Exception as e: + return f"Error processing sample {sample.id}: {str(e)}" if isinstance(batch, tuple): # Slice batches start, stop = batch total = stop - start batch_view = sample_collection[start:stop] else: # ID batches sample_ids = batch total = len(sample_ids) batch_view = fov.make_optimized_select_view( sample_collection, sample_ids ) if progress: desc = f"Batch {i + 1:0{len(str(num_batches))}}/{num_batches}" with tqdm(total=total, desc=desc, position=i) as pb: for sample in batch_view.iter_samples(autosave=save): - sample_output = map_fcn(sample) + sample_output = safe_map_fcn(sample) if queue is not None: queue.put((sample.id, sample_output)) pb.update() else: for sample in batch_view.iter_samples(autosave=save): - sample_output = map_fcn(sample) + sample_output = safe_map_fcn(sample) if queue is not None: queue.put((sample.id, sample_output)) if sample_count is not None: with sample_count.get_lock(): sample_count.value += 1
♻️ Duplicate comments (1)
fiftyone/utils/multiprocessing.py (1)
231-231: 🛠️ Refactor suggestion: Rename parameter 'l' to improve readability.
The parameter name 'l' is ambiguous and could be confused with the number '1'.
-def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, l): +def _init_worker(dataset_name, view_stages, m, bc, sc, q, s, p, lock_obj):
🧹 Nitpick comments (8)
fiftyone/utils/multiprocessing.py (1)
232-232: Simplify boolean comparison.
Replace inequality comparison with False with a direct boolean check.
🪛 Ruff (0.8.2)
232-232: Avoid inequality comparisons to
False
; useif progress:
for truth checksReplace with
progress
(E712)
tests/unittests/view_tests.py (2)
112-153
: LGTM! Well-structured test for update_samples()The test thoroughly validates the update_samples() functionality with different worker configurations and shard methods. The test coverage includes:
- Multiple workers with different shard configurations
- Main process execution
- Verification of results through bounds checking
Consider adding test cases for:
+ # Test error handling + def test_update_samples_errors(self): + dataset = fo.Dataset() + dataset.add_samples( + [fo.Sample(filepath="image%d.jpg" % i, int=i) for i in range(50)] + ) + + # Test invalid num_workers + with self.assertRaises(ValueError): + dataset.update_samples(lambda x: x, num_workers=-1) + + # Test invalid shard_size + with self.assertRaises(ValueError): + dataset.update_samples(lambda x: x, shard_size=0)
154-236
: LGTM! Comprehensive test for map_samples()The test effectively validates the map_samples() functionality with:
- Custom mapping function
- Custom reduce function
- Custom aggregate function
- Multiple worker configurations
- Different shard methods
Consider adding test cases for:
+ # Test error handling and edge cases + def test_map_samples_errors(self): + dataset = fo.Dataset() + dataset.add_samples( + [fo.Sample(filepath="image%d.jpg" % i) for i in range(50)] + ) + + # Test invalid map function + with self.assertRaises(ValueError): + dataset.map_samples(None) + + # Test empty dataset + empty_dataset = fo.Dataset() + results = list(empty_dataset.map_samples(lambda x: x.filepath)) + self.assertEqual(len(results), 0)tests/unittests/dataset_tests.py (2)
1026-1066
: Add test coverage for edge cases in update_samples().The test has good basic coverage but would benefit from additional test cases:
- Test error handling for invalid update functions
- Test behavior with empty dataset
- Verify update_samples() works with different field types beyond integers
- Add assertions to verify sample order is preserved
Here's a suggested addition to improve test coverage:
def test_update_samples_edge_cases(self): dataset = fo.Dataset() # Test empty dataset dataset.update_samples(lambda x: x, num_workers=2) self.assertEqual(len(dataset), 0) # Test invalid update function with self.assertRaises(ValueError): dataset.update_samples(None) # Test different field types dataset.add_samples([ fo.Sample(filepath=f"image{i}.jpg", int_field=i, str_field="test", float_field=1.0) for i in range(5) ]) def update_fcn(sample): sample.int_field += 1 sample.str_field += "_updated" sample.float_field *= 2 dataset.update_samples(update_fcn, num_workers=2) # Verify updates across types self.assertEqual(dataset.bounds("int_field"), (1, 5)) self.assertTrue(all("_updated" in s for s in dataset.values("str_field"))) self.assertEqual(dataset.bounds("float_field"), (2.0, 2.0))
1067-1148
: Enhance test coverage for map_samples() edge cases.While the test covers the main functionality well, consider adding:
- Test error handling for invalid mapping/reduce functions
- Test behavior with empty dataset
- Test with more complex data types and transformations
- Verify order preservation with different worker counts
Here's a suggested addition to improve test coverage:
def test_map_samples_edge_cases(self): dataset = fo.Dataset() # Test empty dataset results = dataset.map_samples(lambda x: x.id, num_workers=2) self.assertEqual(len(list(results)), 0) # Test invalid functions with self.assertRaises(ValueError): dataset.map_samples(None) with self.assertRaises(ValueError): dataset.map_samples(lambda x: x.id, reduce_fcn=None) # Test complex transformations dataset.add_samples([ fo.Sample( filepath=f"image{i}.jpg", detections=fo.Detections( detections=[fo.Detection(label="cat", confidence=0.9)] ) ) for i in range(5) ]) # Map detections to new format def map_fcn(sample): return { "id": sample.id, "labels": [d.label for d in sample.detections.detections] } results = dataset.map_samples( map_fcn, num_workers=2, shard_size=2 ) mapped = list(results) self.assertEqual(len(mapped), 5) self.assertTrue(all(r["labels"] == ["cat"] for r in mapped))fiftyone/core/collections.py (1)
3216-3451
: Consider adding error handling for worker failures.While the implementation is solid, it would be good to add explicit error handling for worker process failures to provide better feedback to users.
def _map_samples( self, map_fcn, reduce_fcn=None, aggregate_fcn=None, return_outputs=True, save=False, num_workers=None, shard_size=None, shard_method="id", progress=None, ): + try: return foum.map_samples( self, map_fcn, reduce_fcn=reduce_fcn, aggregate_fcn=aggregate_fcn, return_outputs=return_outputs, save=save, num_workers=num_workers, shard_size=shard_size, shard_method=shard_method, progress=progress, ) + except Exception as e: + raise ValueError( + f"Error during parallel processing: {str(e)}" + ) from edocs/source/user_guide/using_datasets.rst (2)
5734-5948: Comprehensive Coverage of Map-Reduce Operations
The “Map-reduce operations” section introduces the new map_samples() functionality with robust examples for both the reduce and aggregate function patterns. The two group tabs (one for a custom ReduceFcn subclass and another for an aggregate function) are explained clearly, and the inclusion of progress bar examples helps illustrate the parallel processing behavior. Consider adding a short summary at the very start of the section that briefly compares when to use a reduce function versus an aggregate function, which may help users decide which approach best suits their needs.
6027-6047: Clear Usage of set_label_values() for Bulk Label Updates
The “Setting label values” section clearly describes how to update label attributes in bulk using the set_label_values() method. The accompanying code example makes the process intuitive. One minor suggestion: you might add a brief note regarding maintaining consistency of label IDs when applying such bulk updates, in case users need to consider schema implications.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
docs/source/release-notes.rst
(3 hunks)docs/source/user_guide/using_datasets.rst
(2 hunks)fiftyone/core/collections.py
(2 hunks)fiftyone/core/utils.py
(3 hunks)fiftyone/utils/multiprocessing.py
(1 hunks)tests/unittests/dataset_tests.py
(2 hunks)tests/unittests/view_tests.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- fiftyone/core/utils.py
- docs/source/release-notes.rst
🧰 Additional context used
🪛 Ruff (0.8.2)
fiftyone/utils/multiprocessing.py
232-232: Avoid inequality comparisons to False
; use if progress:
for truth checks
Replace with progress
(E712)
499-499: Ambiguous variable name: l
(E741)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: build
🔇 Additional comments (11)
fiftyone/utils/multiprocessing.py (5)
171-178: LGTM! Smart handling of daemon processes and worker configuration.
The code correctly handles daemon processes by defaulting to single-process execution, and respects the global configuration for default workers.
459-468: LGTM! Smart handling of shard sizes.
The code intelligently caps the shard size based on ObjectId value recommendations and optimizes for the number of workers.
514-516: LGTM! Proper MongoDB client handling in multiprocessing.
The code correctly disconnects and recreates MongoDB clients in worker processes, following PyMongo's best practices for multiprocessing.
247-247: LGTM! Smart use of dill for function serialization.
Using dill instead of the standard pickle module enables support for lambda functions in 'spawn' contexts.
257-257: LGTM! Efficient progress tracking implementation.
The code effectively uses multiprocessing.Queue/Value for a unified progress bar in the main context, with an option for per-worker progress bars.

fiftyone/core/collections.py (3)
3216-3279: LGTM! Well-designed parallel processing method for updating samples.
The update_samples() method provides an efficient way to apply updates to samples in parallel. The implementation is clean and the docstring provides clear examples.
3281-3426: LGTM! Comprehensive implementation of parallel mapping operations.
The map_samples() method provides flexible parallel processing capabilities with support for map, map-reduce and map-aggregate operations. The implementation is robust and the docstring includes clear examples for each use case.
3428-3451: LGTM! Clean internal implementation.
The _map_samples() method provides a clean internal implementation that delegates to the multiprocessing utilities. The code is concise and well-structured.

docs/source/user_guide/using_datasets.rst (3)
5543-5567: Excellent Explanation for Save Contexts
The new “Save contexts” section provides a thorough explanation of how to efficiently batch sample saves using a context manager. The code snippet is clear and demonstrates the benefits compared to per-sample saves (i.e. fewer database connections).
5616-5641: Clear and Informative “Updating Samples” Section
The documentation for the update_samples() method is well written. It details the usage of the method—including leveraging multiprocessing—with illustrative code, explanations of parameters like shard_size and num_workers, and a reminder to select only the required fields for performance.
5951-5971: Efficient Batch Field Editing with set_values()
The “Setting values” section succinctly demonstrates how to use the set_values() method to assign values to a field across all samples efficiently. The step-by-step example comparing it to individually saving samples is well presented and reinforces the performance benefits of a batch update strategy.
        save=False,
        progress=None,
    ):
        if reduce_fcn is not None:
We probably should raise an error/warning if both reduce_fcn and aggregate_fcn
are passed in
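A sketch of the kind of guard being suggested:

```python
# hypothetical check inside map_samples()
if reduce_fcn is not None and aggregate_fcn is not None:
    raise ValueError(
        "Only one of `reduce_fcn` and `aggregate_fcn` may be provided"
    )
```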
        num_workers (None): the number of workers to use. The default is
            :meth:`fiftyone.core.utils.recommend_process_pool_workers`. If this
            value is <= 1, all work is done in the main process
        shard_size (None): an optional number of samples to distribute to each
I think we should consider removing shard_size since it is confusing to the users. If num_workers * shard_size < n, we will just end up using max_shard_size anyway :) so might as well not need it
Change log
Adds two new methods to the SampleCollection interface that can leverage multiprocessing pools to perform efficient operations:
- update_samples(): applies an update_fcn to each sample and saves any sample edits
- map_samples(): applies a map_fcn to each sample in the collection in parallel, optionally returning the outputs as a generator or applying a reduce_fcn or aggregate_fcn to the outputs

Notes
- By default, multiprocessing.cpu_count() workers are used to parallelize the work, unless the methods are invoked from daemon processes, in which case multiprocessing is automatically disabled.
- The new fo.config.default_map_workers setting allows for global configuration of the number of workers used by these methods
- Both 'spawn' and 'fork' processes are supported
- The shard_size and shard_method options allow for optionally configuring how batching is performed

Refer to the included documentation for full details.

Example 1: update
You can optionally show per-worker progress bars via progress="workers":
Example 2: map
You can optionally show per-worker progress bars via progress="workers":
Example 3: map-reduce
You can optionally show per-worker progress bars via progress="workers":
Example 4: map-aggregate
You can optionally show per-worker progress bars via progress="workers":
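A rough sketch of the four patterns above, assuming the signatures discussed in this thread (the actual example code in the PR description and docs may differ):

```python
import fiftyone as fo
import fiftyone.zoo as foz
from collections import Counter

dataset = foz.load_zoo_dataset("cifar10", split="train")
view = dataset.select_fields("ground_truth")

# 1. update: apply an edit to every sample and save it
def update_fcn(sample):
    sample.ground_truth.label = sample.ground_truth.label.upper()

view.update_samples(update_fcn, progress="workers")

# 2. map: per-sample outputs are returned as a generator
def map_fcn(sample):
    return sample.ground_truth.label.lower()

labels = dict(view.map_samples(map_fcn))

# 3. map-reduce: a ReduceFcn subclass accumulates the outputs
class CountLabels(fo.ReduceFcn):
    def init(self):
        self.accumulator = Counter()

    def update(self, outputs):
        self.accumulator.update(outputs.values())

    def finalize(self):
        return self.accumulator

counts = view.map_samples(map_fcn, reduce_fcn=CountLabels)

# 4. map-aggregate: a plain function aggregates the collected outputs
def aggregate_fcn(sample_collection, values):
    return Counter(values.values())

counts = view.map_samples(map_fcn, aggregate_fcn=aggregate_fcn)
```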