Skip to content

Commit 6e8ee7d

Browse files
committed
validate: stops all the threads as soon as an error is found
Signed-off-by: Gaëtan Lehmann <[email protected]>
1 parent b6d3f13 commit 6e8ee7d

File tree

1 file changed

+34
-18
lines changed

1 file changed

+34
-18
lines changed

src/validate.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use parse_size::parse_size;
88
use std::fs::File;
99
use std::io::{self, Read, Seek};
1010
use std::path::PathBuf;
11-
use std::sync::mpsc;
11+
use std::sync::atomic::{AtomicBool, Ordering};
12+
use std::sync::{Arc, mpsc};
1213
use std::thread;
1314
use std::time::Instant;
1415

@@ -78,31 +79,46 @@ pub fn validate(args: &ValidateArgs) -> anyhow::Result<i32> {
7879
let chunks_per_thread = num_chunks.div_ceil(num_threads as u64);
7980
let chunk_position = args.position / chunk_size as u64;
8081
let (tx, rx) = mpsc::channel::<u64>();
82+
let cancel: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
8183

8284
let handles: Vec<_> = (0..num_threads as u64)
8385
.map(|i| {
8486
let file = file.clone();
8587
let tx = tx.clone();
88+
let cancel = cancel.clone();
8689
thread::spawn(move || -> anyhow::Result<_> {
87-
let mut file = File::open(file)?;
88-
let mut thread_hasher = Hasher::new();
89-
let start_chunk = i * chunks_per_thread + chunk_position;
90-
let end_chunk = ((i + 1) * chunks_per_thread).min(num_chunks) + chunk_position;
91-
let mut buffer = vec![0; chunk_size];
92-
file.seek(io::SeekFrom::Start(start_chunk * chunk_size as u64))?;
93-
let mut total_read_size: u64 = 0;
94-
let mut progress_bytes: u64 = 0;
95-
for chunk in start_chunk..end_chunk {
96-
let read_size = read_exact_or_eof(&mut file, &mut buffer)?;
97-
validate_chunk(chunk, &buffer[..read_size], &mut thread_hasher)?;
98-
total_read_size += read_size as u64;
99-
progress_bytes += read_size as u64;
100-
if chunk % 100 == 0 {
101-
tx.send(progress_bytes)?;
102-
progress_bytes = 0;
90+
let run = || {
91+
let mut file = File::open(file)?;
92+
let mut thread_hasher = Hasher::new();
93+
let start_chunk = i * chunks_per_thread + chunk_position;
94+
let end_chunk =
95+
((i + 1) * chunks_per_thread).min(num_chunks) + chunk_position;
96+
let mut buffer = vec![0; chunk_size];
97+
file.seek(io::SeekFrom::Start(start_chunk * chunk_size as u64))?;
98+
let mut total_read_size: u64 = 0;
99+
let mut progress_bytes: u64 = 0;
100+
for chunk in start_chunk..end_chunk {
101+
let read_size = read_exact_or_eof(&mut file, &mut buffer)?;
102+
validate_chunk(chunk, &buffer[..read_size], &mut thread_hasher)?;
103+
total_read_size += read_size as u64;
104+
progress_bytes += read_size as u64;
105+
if chunk % 100 == 0 {
106+
tx.send(progress_bytes)?;
107+
progress_bytes = 0;
108+
}
109+
if cancel.load(Ordering::Relaxed) {
110+
// just quit early
111+
return Ok((total_read_size, thread_hasher));
112+
}
103113
}
114+
Ok((total_read_size, thread_hasher))
115+
};
116+
let result = run();
117+
if result.is_err() {
118+
// tell the other thread to stop there
119+
cancel.store(true, Ordering::Relaxed);
104120
}
105-
Ok((total_read_size, thread_hasher))
121+
result
106122
})
107123
})
108124
.collect();

0 commit comments

Comments
 (0)