diff --git a/protocols/MasterServer.py b/protocols/MasterServer.py
index db42eb1..aeb04de 100644
--- a/protocols/MasterServer.py
+++ b/protocols/MasterServer.py
@@ -76,23 +76,39 @@ def _fetch_url(self, url: str):
         return str_data
 
     def _bulk_write(self, updates: list):
-        # Chunk size for bulk write
+        # Nothing to do for an empty batch
+        if len(updates) == 0:
+            print("No updates to perform.")
+            return
+
+        # Size the worker pool for the parallel writes: cap at 32
+        # threads, following the os.cpu_count() + 4 heuristic
         max_workers = min(32, os.cpu_count() + 4)
+
+        # Ceiling division, so the updates are spread evenly and each
+        # worker receives at most one chunk
         chunk_size = -(-len(updates) // max_workers)
 
-        # Split the updates into chunks
+        # Split the updates into one chunk per worker
         update_chunks = [updates[i:i + chunk_size]
-                         for i in range(0, len(updates), chunk_size)]
+                         for i in range(0, len(updates), chunk_size)]
 
+        # Progress bar that tracks individual updates, not chunks
         pbar = tqdm(total=len(updates), desc=f"[{self.key}] Bulk Write")
 
+        # Worker: write one chunk and advance the progress bar
         def perform_bulk_write(i: int):
+            # ordered=False lets MongoDB apply the operations in any order
             self.collection.bulk_write(update_chunks[i], ordered=False)
+            # Advance the bar by the number of updates just written
             pbar.update(len(update_chunks[i]))
 
+        # Fan the chunks out across the thread pool
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            results = executor.map(perform_bulk_write, range(max_workers))
+            # Map over the real chunk count, which can be < max_workers
+            results = list(executor.map(perform_bulk_write, range(len(update_chunks))))
 
+        # All chunks have been written once the executor context exits
         return results
 
     def _remove_old_documents(self, minutes: int):
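
Note on the iteration fix above: with ceiling-division chunking, the number of chunks can be smaller than `max_workers` (for example, 33 updates across 32 workers gives `chunk_size = 2` and only 17 chunks), so `executor.map(perform_bulk_write, range(max_workers))` would raise `IndexError` inside the worker threads; and because `executor.map` returns a lazy iterator, those exceptions would only surface if the result were consumed. Iterating over the actual chunk count and wrapping the map in `list()` fixes both. The sketch below shows the same chunk-and-fan-out pattern as a self-contained script that maps directly over the chunks. The connection string, database/collection names, and document fields are illustrative placeholders, not taken from the repo; only `UpdateOne`, `bulk_write`, `ThreadPoolExecutor`, and `tqdm` reflect the APIs actually used in the diff.

```python
import os
from concurrent.futures import ThreadPoolExecutor

from pymongo import MongoClient, UpdateOne
from tqdm import tqdm

# Illustrative connection and namespace; not taken from the repo
client = MongoClient("mongodb://localhost:27017")
collection = client["master"]["servers"]

# Hypothetical batch of upserts standing in for the real update list
updates = [
    UpdateOne({"address": f"10.0.0.{i}"}, {"$set": {"alive": True}}, upsert=True)
    for i in range(1, 255)
]

max_workers = min(32, os.cpu_count() + 4)     # same cap as the diff
chunk_size = -(-len(updates) // max_workers)  # ceiling division
chunks = [updates[i:i + chunk_size]
          for i in range(0, len(updates), chunk_size)]

pbar = tqdm(total=len(updates), desc="Bulk Write")

def write_chunk(chunk):
    # ordered=False: MongoDB may apply the operations in any order
    collection.bulk_write(chunk, ordered=False)
    pbar.update(len(chunk))

# Mapping over the chunks themselves (not range(max_workers)) keeps the
# iteration count in step with the data, regardless of the pool size
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    list(executor.map(write_chunk, chunks))  # consume to surface worker errors
pbar.close()
```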