refactor: Change concurrency approach #100

Open · wants to merge 25 commits into main

Conversation

rklaehn (Collaborator) commented Jul 9, 2025

Description

Before, the concurrency approach for slots / handles for individual hashes was based on a top-level task pool and "slots" that were managed in a map in the main actor. There were some tricky race conditions for the case where a handle was "revived" while it was already executing its Drop fn, which does crucial things like writing the bitfield file. Also, there was actual parallelism per hash, which is probably not beneficial at all.

Now each hash effectively runs single-threaded, which means we can later go from actual mutexes to more lightweight synchronisation primitives like https://crates.io/crates/atomic_refcell. Unfortunately everything must still be Send because we run the whole thing on a multi-threaded executor 🤷, thank you tokio. Otherwise we could just use a RefCell.
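
A tiny illustration of that trade-off, assuming the atomic_refcell crate; this is not code from this PR, and PerHashState and bump are invented names:

```rust
use atomic_refcell::AtomicRefCell;

// illustrative per-hash state, not a type from this PR
struct PerHashState {
    counter: u64,
}

fn bump(state: &AtomicRefCell<PerHashState>) {
    // borrow_mut only checks an atomic borrow flag instead of taking a lock; it
    // panics on a conflicting borrow, which the one-task-per-hash guarantee is
    // meant to rule out
    state.borrow_mut().counter += 1;
}
```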

Now the concurrency is based on a task pool that always contains at most a single task per hash. Multiple tasks that operate on the same hash are handled concurrently, but not in parallel, using a FuturesUnordered. The drop case is handled in a cleaner way: when an actor becomes idle, it "gives back" its state to its owner, the manager actor.

If tasks are spawned while drop runs, they go into the inbox and the actor is revived immediately afterwards. The manager also keeps a pool of inactive actors to avoid reallocations.

All this is abstracted away by the entity_manager.

The entire entity_manager module could at some point become a separate crate, but for now I have inlined it so I don't need to do another crate.
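
To make the shape of this concrete, here is a minimal sketch of the lifecycle described above. It is not the actual entity_manager API: Manager, EntityState, Command, the channel capacity, and the one-second idle timeout are all invented for illustration, command handling is shown sequentially (the FuturesUnordered interleaving is a separate detail), and the loop that feeds returned state into on_returned is omitted.

```rust
use std::{collections::HashMap, time::Duration};

use tokio::sync::mpsc;

type Hash = [u8; 32];

enum Command {
    Touch, // stand-in for the real per-hash operations
}

#[derive(Default)]
struct EntityState {
    // per-hash state, e.g. file handles and the in-memory bitfield
}

struct Manager {
    // at most one live actor per hash; the sender is that actor's inbox
    live: HashMap<Hash, mpsc::Sender<Command>>,
    // state handed back by idle actors, reused to avoid reallocations
    pool: Vec<EntityState>,
    // actors hand (hash, state, inbox) back here when they go idle
    returned_tx: mpsc::Sender<(Hash, EntityState, mpsc::Receiver<Command>)>,
}

impl Manager {
    fn send(&mut self, hash: Hash, cmd: Command) {
        if !self.live.contains_key(&hash) {
            let (tx, rx) = mpsc::channel(16);
            let state = self.pool.pop().unwrap_or_default();
            tokio::spawn(entity_actor(hash, state, rx, self.returned_tx.clone()));
            self.live.insert(hash, tx);
        }
        // if the actor is currently shutting down, the command simply waits in
        // the inbox; the manager revives the actor once the state comes back
        let _ = self.live[&hash].try_send(cmd);
    }

    // called with whatever an idle actor gave back
    fn on_returned(&mut self, hash: Hash, state: EntityState, inbox: mpsc::Receiver<Command>) {
        if inbox.is_empty() {
            // really idle: forget the actor and recycle its state
            self.live.remove(&hash);
            self.pool.push(state);
        } else {
            // commands arrived while the actor was shutting down: revive it
            tokio::spawn(entity_actor(hash, state, inbox, self.returned_tx.clone()));
        }
    }
}

async fn entity_actor(
    hash: Hash,
    mut state: EntityState,
    mut inbox: mpsc::Receiver<Command>,
    returned: mpsc::Sender<(Hash, EntityState, mpsc::Receiver<Command>)>,
) {
    // everything for this hash happens inside this one task
    while let Ok(Some(cmd)) = tokio::time::timeout(Duration::from_secs(1), inbox.recv()).await {
        let _ = (&mut state, cmd); // handle the command against the per-hash state
    }
    // idle (or all senders gone): persist what needs persisting, e.g. the
    // bitfield, then give state and inbox back instead of dropping them
    let _ = returned.send((hash, state, inbox)).await;
}
```

The point is only the ownership dance: at most one task per hash, commands sent during shutdown wait in the inbox, and the manager either recycles the returned state or immediately revives the actor with it.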

Breaking Changes

Notes & open questions

Change checklist

  • Self-review.
  • Documentation updates following the style guide, if relevant.
  • Tests if relevant.
  • All breaking changes documented.


github-actions bot commented Jul 9, 2025

Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh-blobs/pr/100/docs/iroh_blobs/

Last updated: 2025-07-11T11:57:10Z

n0bot (bot) added this to iroh on Jul 9, 2025
github-project-automation (bot) moved this to 🏗 In progress in iroh on Jul 9, 2025
rklaehn (Collaborator Author) commented Jul 9, 2025

Here is where I am going with this. Now that we have the guarantee that each hash runs single-threaded (not necessarily on the same thread, thanks tokio, but on only one thread at a time), we can make concurrency much more lightweight: #101

rklaehn requested a review from Frando on July 9, 2025 09:54
Command::ListBlobs(cmd) => {
trace!("{cmd:?}");
let (tx, rx) = tokio::sync::oneshot::channel();
rklaehn (Collaborator Author):

Unrelated change!

}
for hash in to_remove {
if let Some(slot) = self.handles.remove(&hash) {
// do a quick check if the handle has become alive in the meantime, and reinsert it
rklaehn (Collaborator Author):

Getting rid of this nasty race condition handling was the whole purpose of the exercise. Also, we don't need a scheduled cleanup task anymore since the entity manager will deal with it. If an actor becomes idle it will be removed from the map after shutdown.

@@ -4,6 +4,7 @@ use iroh_metrics::{Counter, MetricsGroup};

/// Enum of metrics for the module
#[allow(missing_docs)]
#[allow(dead_code)]
Collaborator:

why is this dead code?

rklaehn (Collaborator Author):

I need to rethink what metrics to collect; many of them aren't even relevant anymore.

@@ -0,0 +1,1038 @@
#![allow(dead_code)]
Collaborator:

maybe remove this?

rklaehn (Collaborator Author):

This thing was a separate crate. I inlined it so I don't have to publish and maintain yet another crate. The dead-code part is the friendly API of the crate, for when you don't want to embed the entity manager into your own actor.

But I do want to embed the entity manager, since I don't want client -> iroh-blobs main actor -> entity manager -> entity. That saves one hop.

}
}

/// A manager for entities identified by an entity id.
Collaborator:

so you basically wrote a new actor system? 😅

rklaehn (Collaborator Author):

No, that's irpc, in a way. (At least my idea of how async boundaries should be structured.) This is very specialized, so it's more like a task manager.

You get concurrency but not parallelism within a single entity, and parallelism across separate entities. An entity here is the state for a hash.
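
A minimal sketch of that shape, with placeholder Command and handle_command types rather than the real iroh-blobs ones: every handler for one entity is polled from a single task via a FuturesUnordered, so handlers interleave at await points but never run on two threads at once.

```rust
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{mpsc, oneshot};

// placeholder command type; the real commands carry hash-specific requests
enum Command {
    Ping(oneshot::Sender<()>),
}

async fn handle_command(cmd: Command) {
    // the actual per-hash work; it may await (file io, redb, ...) freely
    match cmd {
        Command::Ping(reply) => {
            let _ = reply.send(());
        }
    }
}

/// One task per hash: handlers interleave at await points but never run on two
/// threads at the same time, so per-hash state needs no real locking.
async fn entity_task(mut inbox: mpsc::Receiver<Command>) {
    let mut in_flight = FuturesUnordered::new();
    loop {
        tokio::select! {
            // a new command starts without waiting for earlier ones to finish
            Some(cmd) = inbox.recv() => in_flight.push(handle_command(cmd)),
            // keep driving whatever is already running
            Some(()) = in_flight.next() => {}
            // inbox closed and nothing left in flight: the entity is idle
            else => break,
        }
    }
}
```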

@@ -299,6 +300,7 @@ mod tests {
let outboard_path = options.outboard_path(&bh);
let sizes_path = options.sizes_path(&bh);
let bitfield_path = options.bitfield_path(&bh);
tokio::time::sleep(Duration::from_millis(100)).await; // allow for some time for the file to be written
Collaborator:

this is very racy, isn't it?

rklaehn (Collaborator Author):

Yeah. There is some delay between when an entity becomes idle and when the bitfield gets written to disk. The bitfield is ephemeral but can be expensive to compute. You need to call shutdown before terminating the process to make sure all bitfields are persisted, but here we are looking for the file before shutting down, and there is currently no API to wait for that.

rklaehn (Collaborator Author):

Not sure what to do about this. I could add an await_idle(hash), but that is not really a generic API, so it would have to be just for the FS store.
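
One possible stopgap for the test itself (a sketch, not something this PR adds): poll for the bitfield file with a deadline instead of sleeping a fixed 100ms. wait_for_file is a hypothetical helper.

```rust
use std::{path::Path, time::Duration};

/// Poll for a file instead of sleeping a fixed amount; returns false on timeout.
async fn wait_for_file(path: &Path, timeout: Duration) -> bool {
    let deadline = tokio::time::Instant::now() + timeout;
    while tokio::time::Instant::now() < deadline {
        // metadata succeeds once the file exists
        if tokio::fs::metadata(path).await.is_ok() {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    false
}
```

Used as `assert!(wait_for_file(&bitfield_path, Duration::from_secs(2)).await);` it is still not a real synchronization point, but it only fails if the write never happens within the deadline.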

@@ -811,7 +811,7 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R
}
}
let guard = ctx.lock().await;
-let handle = guard.as_ref().and_then(|x| x.upgrade());
+let handle = guard.as_ref().map(|x| x.clone());
Collaborator:

could be just .cloned()?

rklaehn (Collaborator Author):

Yeah, but this is gone in one of the subsequent PRs anyway, so I would prefer to keep it as is...
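
For reference, the two spellings are equivalent on an Option<&T> with T: Clone; a tiny stand-alone illustration, with String as a stand-in for the real handle type:

```rust
// String is just a stand-in for the real handle type behind the guard
fn snapshot(guard: Option<&String>) -> Option<String> {
    let via_map = guard.map(|x| x.clone());
    let via_cloned = guard.cloned(); // the suggested spelling
    assert_eq!(via_map, via_cloned);
    via_cloned
}
```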

@@ -84,15 +83,16 @@ use bao_tree::{
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entity_manager::{EntityManagerState, SpawnArg};
Collaborator:

So this is the only usage of the entity_manager for now?

rklaehn (Collaborator Author):

Yeah, I wrote it generically just so I don't have to mix all the recycling logic into the actual user code in blobs. I mean, I think it would also be a useful lib, but we can take it out and publish it later.

rklaehn marked this pull request as ready for review on July 10, 2025 15:29
rklaehn (Collaborator Author) commented Jul 10, 2025

One thing I am not sure about: the entity manager gives you an async but exclusive on_shutdown fn to perform cleanup, like saving data to disk or to an async io system.

Should it also do the same for waking up, i.e. have an async per-entity callback that is called on wakeup? I did not do it because I thought you might want to do this lazily or partially inside your state, so a single callback would be restrictive. But it sure would make the two test databases in the tests much more pleasant to write...

Comment on lines +673 to +674
/// The entity manager internally uses a main actor and per-entity actors. Per entity actors
/// and their inbox queues are recycled when they become idle, to save allocations.
Member:

Is recycling entity actors in the hot path? Having this optimization costs us a bunch of complexity, another trait (Reset), and a bunch of recycle fns.
Perhaps the code could be slightly simpler but pretty much just as performant without recycling?

rklaehn (Collaborator Author):

It probably does not matter for the fs store, since that is bound by redb for small entries and file io for large entries. But my hope is that by having the entity actors super lightweight I can justify using this thing also for the in-mem store, and there it would definitely matter.

Also, the recycling kind of fell out automatically. The main complexity is that you need to give the recv side of the entity actor inbox back to the main actor (or actor state) so that it can continue to send messages while the actor is shutting down, and then revive the actor if messages were sent during shutdown.

Once you do that, it is kind of natural to just give back the rest of the state once the shutdown is complete, and it does not add that much complexity over what you have to do anyway to deal with the shutdown race condition.
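
Roughly, the recycling being discussed has this shape; Reset, EntityState, and Pool are illustrative names here, not the PR's actual items:

```rust
/// Illustrative only: not the PR's actual `Reset` trait or pool.
trait Reset {
    /// Clear the state for reuse without giving up its allocations.
    fn reset(&mut self);
}

#[derive(Default)]
struct EntityState {
    pending: Vec<u64>, // stand-in for per-entity buffers, maps, inbox capacity...
}

impl Reset for EntityState {
    fn reset(&mut self) {
        self.pending.clear(); // keeps capacity, so a revived entity reallocates nothing
    }
}

#[derive(Default)]
struct Pool<T> {
    free: Vec<T>,
}

impl<T: Reset + Default> Pool<T> {
    fn get(&mut self) -> T {
        self.free.pop().unwrap_or_default()
    }
    fn put(&mut self, mut value: T) {
        value.reset();
        self.free.push(value);
    }
}
```

Resetting instead of dropping keeps the buffers' capacity, which is what makes reviving an entity essentially allocation-free.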

rklaehn (Collaborator Author) commented Jul 11, 2025

> One thing I am not sure about: the entity manager gives you an async but exclusive on_shutdown fn to perform cleanup, like saving data to disk or to an async io system.
>
> Should it also do the same for waking up, i.e. have an async per-entity callback that is called on wakeup? I did not do it because I thought you might want to do this lazily or partially inside your state, so a single callback would be restrictive. But it sure would make the two test databases in the tests much more pleasant to write...

Nah, best handled by a tokio::sync::OnceCell or something custom.
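
A sketch of that suggestion, assuming tokio::sync::OnceCell and a made-up load_from_disk: the first command that needs the data performs the async wakeup work, and later commands reuse the result.

```rust
use tokio::sync::OnceCell;

struct EntityState {
    loaded: OnceCell<Vec<u8>>, // whatever the entity needs to load on first use
}

impl EntityState {
    async fn data(&self) -> &[u8] {
        self.loaded
            .get_or_init(|| async {
                // placeholder for the real per-entity wakeup work
                load_from_disk().await
            })
            .await
    }
}

// invented stand-in for reading the persisted state
async fn load_from_disk() -> Vec<u8> {
    Vec::new()
}
```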

Labels: none yet
Project status: 🏗 In progress
3 participants