From 9574a7ef418557e09e5f007022780e65211b8114 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sat, 10 Aug 2024 22:29:27 +0200 Subject: [PATCH] fix disk bench --- packages/disk/Cargo.toml | 2 +- packages/disk/benches/disk_benchmark.rs | 23 ++++++++++++++++------- packages/disk/src/disk/tasks/context.rs | 20 ++++++++++++++------ packages/disk/src/disk/tasks/mod.rs | 7 +++---- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/packages/disk/Cargo.toml b/packages/disk/Cargo.toml index 98b9ee6a8..6b2c43583 100644 --- a/packages/disk/Cargo.toml +++ b/packages/disk/Cargo.toml @@ -31,7 +31,7 @@ thiserror = "1" [dev-dependencies] tracing-subscriber = "0" rand = "0" -criterion = { version = "0", features = ["async_futures"] } +criterion = { version = "0", features = ["async_tokio"] } [[bench]] diff --git a/packages/disk/benches/disk_benchmark.rs b/packages/disk/benches/disk_benchmark.rs index 181480d8c..5fce17400 100644 --- a/packages/disk/benches/disk_benchmark.rs +++ b/packages/disk/benches/disk_benchmark.rs @@ -2,8 +2,8 @@ use std::fs; use std::sync::Arc; use bytes::BytesMut; -use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use disk::error::TorrentError; use disk::fs::NativeFileSystem; use disk::fs_cache::FileHandleCache; use disk::{Block, BlockMetadata, DiskManagerBuilder, FileSystem, IDiskMessage, InfoHash, ODiskMessage}; @@ -51,18 +51,25 @@ where { let mut block_send_guard = block_send.lock().await; block_send_guard.send(IDiskMessage::AddTorrent(metainfo)).await.unwrap(); - } // MutexGuard is dropped here + } while let Some(res_message) = { let mut block_recv_guard = block_recv.lock().await; block_recv_guard.next().await } { - match res_message.unwrap() { + let error = match res_message.unwrap() { ODiskMessage::TorrentAdded(_) => { break; } - ODiskMessage::FoundGoodPiece(_, _) => (), - _ => panic!("Didn't Receive TorrentAdded"), + ODiskMessage::FoundGoodPiece(_, _) => continue, + ODiskMessage::TorrentError(_, error) => error, + + other => panic!("should receive `TorrentAdded` or `FoundGoodPiece`, but got: {other:?}"), + }; + + match error { + TorrentError::ExistingInfoHash { .. } => break, + other => panic!("should receive `TorrentAdded` or `FoundGoodPiece`, but got: {other:?}"), } } } @@ -125,7 +132,7 @@ where match res_message.unwrap() { ODiskMessage::BlockProcessed(_) => blocks_sent -= 1, ODiskMessage::FoundGoodPiece(_, _) | ODiskMessage::FoundBadPiece(_, _) => (), - _ => panic!("Unexpected Message Received In process_blocks"), + other => panic!("should receive `BlockProcessed`, `FoundGoodPiece` or `FoundBadPiece`, but got: {other:?}"), } if blocks_sent == 0 { @@ -168,9 +175,11 @@ fn bench_process_file_with_fs( block_recv: block_recv.clone(), }; + let runner = &tokio::runtime::Runtime::new().unwrap(); + c.bench_with_input(id, &Arc::new(data), |b, i| { let metainfo_clone = metainfo.clone(); - b.to_async(FuturesExecutor).iter(move || { + b.to_async(runner).iter(move || { let data = i.clone(); let metainfo = metainfo_clone.clone(); async move { diff --git a/packages/disk/src/disk/tasks/context.rs b/packages/disk/src/disk/tasks/context.rs index aee391e43..6bc5dd0dd 100644 --- a/packages/disk/src/disk/tasks/context.rs +++ b/packages/disk/src/disk/tasks/context.rs @@ -1,3 +1,4 @@ +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -72,20 +73,27 @@ where &self.fs } - pub fn insert_torrent(&self, file: Metainfo, state: &Arc>) -> bool { + pub fn insert_torrent( + &self, + file: Metainfo, + state: &Arc>, + ) -> Result { let mut write_torrents = self .torrents .write() .expect("bip_disk: DiskManagerContext::insert_torrents Failed To Write Torrent"); let hash = file.info().info_hash(); - let hash_not_exists = !write_torrents.contains_key(&hash); - if hash_not_exists { - write_torrents.insert(hash, MetainfoState::new(file, state.clone())); - } + let entry = write_torrents.entry(hash); - hash_not_exists + match entry { + Entry::Occupied(key) => Err((hash, key.get().clone())), + Entry::Vacant(vac) => { + vac.insert(MetainfoState::new(file, state.clone())); + Ok(hash) + } + } } pub async fn update_torrent<'a, C, D>(self, hash: InfoHash, with_state: C) -> Option diff --git a/packages/disk/src/disk/tasks/mod.rs b/packages/disk/src/disk/tasks/mod.rs index 97951139e..526d6ea16 100644 --- a/packages/disk/src/disk/tasks/mod.rs +++ b/packages/disk/src/disk/tasks/mod.rs @@ -72,10 +72,9 @@ where // In case we are resuming a download, we need to send the diff for the newly added torrent send_piece_diff(&init_state, info_hash, sender, true).await; - if context.insert_torrent(file, &init_state) { - Ok(()) - } else { - Err(TorrentError::ExistingInfoHash { hash: info_hash }) + match context.insert_torrent(file, &init_state) { + Ok(_) => Ok(()), + Err((hash, _)) => Err(TorrentError::ExistingInfoHash { hash }), } }