Skip to content

Commit 69a6091

Browse files
committed
generate: stops all the threads as soon as an error is found
Signed-off-by: Gaëtan Lehmann <[email protected]>
1 parent 6e8ee7d commit 69a6091

File tree

1 file changed

+39
-23
lines changed

1 file changed

+39
-23
lines changed

src/generate.rs

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use rand_pcg::Pcg64Mcg;
1010
use std::fs::OpenOptions;
1111
use std::io::{self, Seek as _, Write};
1212
use std::path::PathBuf;
13-
use std::sync::mpsc;
13+
use std::sync::atomic::{AtomicBool, Ordering};
14+
use std::sync::{Arc, mpsc};
1415
use std::thread;
1516
use std::time::Instant;
1617

@@ -104,37 +105,52 @@ pub fn generate(args: &GenerateArgs) -> anyhow::Result<i32> {
104105
let chunks_per_thread = num_chunks.div_ceil(num_threads as u64);
105106
let chunk_position = args.position / chunk_size as u64;
106107
let (tx, rx) = mpsc::channel::<u64>();
108+
let cancel: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
107109

108110
let handles: Vec<_> = (0..num_threads as u64)
109111
.map(|i| {
110112
let file = file.clone();
111113
let position = args.position;
112114
let tx = tx.clone();
115+
let cancel = cancel.clone();
113116
thread::spawn(move || -> anyhow::Result<_> {
114-
let mut writer = OpenOptions::new().write(true).open(file)?;
115-
let mut thread_hasher = Hasher::new();
116-
let mut rng = Pcg64Mcg::from_seed(seed);
117-
let mut buffer = vec![0; buffer_size];
118-
let start_chunk = i * chunks_per_thread + chunk_position;
119-
let end_chunk = ((i + 1) * chunks_per_thread).min(num_chunks) + chunk_position;
120-
writer.seek(io::SeekFrom::Start(start_chunk * chunk_size as u64))?;
121-
rng.advance(((start_chunk * buffer_size as u64) / 8).into());
122-
let mut total_write_size: u64 = 0;
123-
let mut progress_bytes: u64 = 0;
124-
for chunk in start_chunk..end_chunk {
125-
let write_size = ((position + stream_size - (chunk * chunk_size as u64))
126-
as usize)
127-
.min(chunk_size);
128-
generate_chunk(&mut rng, &mut buffer, write_size, &mut thread_hasher);
129-
writer.write_all(&buffer[..write_size])?;
130-
total_write_size += write_size as u64;
131-
progress_bytes += write_size as u64;
132-
if chunk % 100 == 0 {
133-
tx.send(progress_bytes)?;
134-
progress_bytes = 0;
117+
let run = || {
118+
let mut writer = OpenOptions::new().write(true).open(file)?;
119+
let mut thread_hasher = Hasher::new();
120+
let mut rng = Pcg64Mcg::from_seed(seed);
121+
let mut buffer = vec![0; buffer_size];
122+
let start_chunk = i * chunks_per_thread + chunk_position;
123+
let end_chunk =
124+
((i + 1) * chunks_per_thread).min(num_chunks) + chunk_position;
125+
writer.seek(io::SeekFrom::Start(start_chunk * chunk_size as u64))?;
126+
rng.advance(((start_chunk * buffer_size as u64) / 8).into());
127+
let mut total_write_size: u64 = 0;
128+
let mut progress_bytes: u64 = 0;
129+
for chunk in start_chunk..end_chunk {
130+
let write_size =
131+
((position + stream_size - (chunk * chunk_size as u64)) as usize)
132+
.min(chunk_size);
133+
generate_chunk(&mut rng, &mut buffer, write_size, &mut thread_hasher);
134+
writer.write_all(&buffer[..write_size])?;
135+
total_write_size += write_size as u64;
136+
progress_bytes += write_size as u64;
137+
if chunk % 100 == 0 {
138+
tx.send(progress_bytes)?;
139+
progress_bytes = 0;
140+
}
141+
if cancel.load(Ordering::Relaxed) {
142+
// just quit early
143+
return Ok((total_write_size, thread_hasher));
144+
}
135145
}
146+
Ok((total_write_size, thread_hasher))
147+
};
148+
let result = run();
149+
if result.is_err() {
150+
// tell the other thread to stop there
151+
cancel.store(true, Ordering::Relaxed);
136152
}
137-
Ok((total_write_size, thread_hasher))
153+
result
138154
})
139155
})
140156
.collect();

0 commit comments

Comments
 (0)