Commit 40880f0

MRG: refactor fastgather/fastmultigather CSV output to use mpsc channels (#567)
* refactor CSV output for fastgather/fastmultigather to use mpsc
* cargo fmt
* tests mostly pass
* fix skipmer test
* upd comment
* black
* switch to min_max_scaled for rocksdb
* black
* ensure overlap is > 0
* rm print
* cleanup
* fix clippy messages about too-complex returns
* cargo fmt
* upd overlap
* fix
* fix tests
* update docs
* upd
* break test again
* do heinous dev stuff
* fix fix comment
* upd
* upd
* do not require -o after all
1 parent 48bb1c9 commit 40880f0

12 files changed: +138 -131 lines changed
doc/README.md

Lines changed: 14 additions & 20 deletions
@@ -312,9 +312,9 @@ sourmash scripts fastgather query.sig.gz database.zip -o results.csv --cores 4
 
 ### Running `fastmultigather`
 
-`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs many CSVs:
+`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs a CSV file containing all the matches.
 ```
-sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches
+sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches -o results.csv
 ```
 
 We suggest using standalone manifest CSVs wherever possible, especially if
@@ -327,32 +327,26 @@ this can be a significant time savings for large databases.
 
 #### Output files for `fastmultigather`
 
-On a database of sketches (but not on RocksDB indexes)
-`fastmultigather` will output two CSV files for each query, a
-`prefetch` file containing all overlapping matches between that query
-and the database, and a `gather` file containing the minimum
-metagenome cover for that query in the database.
+`fastmultigather` will output a gather file containing all results in
+one file, specified with `-o/--output`. `fastmultigather` gather CSVs
+provide the same columns as `fastgather`, above.
 
-The prefetch CSV will be named `{signame}.prefetch.csv`, and the
-gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is
-the name of your sourmash signature.
+In addition, on a database of sketches (but not on RocksDB indexes)
+`fastmultigather` will output a `prefetch` file containing all
+overlapping matches between that query and the database. The prefetch
+CSV will be named `{signame}.prefetch.csv`, where `{signame}` is the
+name of your sourmash signature.
 
 `--save-matches` is an optional flag that will save the matched hashes
 for each query in a separate sourmash signature
 `{signame}.matches.sig`. This can be useful for debugging or for
 further analysis.
 
-When searching against a RocksDB index, `fastmultigather` will output
-a single file containing all gather results, specified with
-`-o/--output`. No prefetch results will be output.
-
-`fastmultigather` gather CSVs provide the same columns as `fastgather`, above.
-
 **Warning:** At the moment, if two different queries have the same
-`{signame}`, the CSVs for one of the queries will be overwritten by
-the other query. The behavior here is undefined in practice, because
-of multithreading: we don't know what queries will be executed when
-or files will be written first.
+`{signame}`, the output files for one query will be overwritten by
+the results from the other query. The behavior here is undefined in
+practice, because of multithreading: we don't know what queries will
+be executed when or files will be written first.
 
 ### Running `manysearch`
 

src/fastgather.rs

Lines changed: 13 additions & 3 deletions
@@ -1,12 +1,13 @@
 /// fastgather: Run gather with a query against a list of files.
 use anyhow::Result;
+
 use sourmash::prelude::Select;
 use sourmash::selection::Selection;
 use sourmash::sketch::minhash::KmerMinHash;
 
 use crate::utils::{
-    consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch,
-    ReportType,
+    consume_query_by_gather, csvwriter_thread, load_collection, load_sketches_above_threshold,
+    write_prefetch, BranchwaterGatherResult, ReportType,
 };
 
 #[allow(clippy::too_many_arguments)]
@@ -109,6 +110,10 @@ pub fn fastgather(
             .ok();
     }
 
+    let (send, recv) =
+        std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
+    let gather_out_thrd = csvwriter_thread(recv, gather_output);
+
     // run the gather!
     consume_query_by_gather(
         query_name,
@@ -117,8 +122,13 @@ pub fn fastgather(
         scaled as u32,
         matchlist,
         threshold_hashes,
-        gather_output,
+        Some(send),
     )
     .ok();
+
+    if let Err(e) = gather_out_thrd.join() {
+        eprintln!("Unable to join internal thread: {:?}", e);
+    }
+
     Ok(())
 }
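
The channel-plus-writer-thread pattern in this hunk can be sketched with the standard library alone. This is a minimal illustration, not the project's `csvwriter_thread`: `GatherRow`, `spawn_csv_writer`, and `run_single_query` are hypothetical names, the row has only three made-up columns, and the writer collects into an in-memory buffer instead of opening the `-o` path.

```rust
use std::io::Write;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `BranchwaterGatherResult` (assumed fields).
#[derive(Debug, Clone)]
struct GatherRow {
    query_name: String,
    match_name: String,
    intersect_bp: u64,
}

/// Illustrative writer thread: drains the channel until every sender is
/// dropped, then hands back the bytes it wrote.
fn spawn_csv_writer(recv: Receiver<GatherRow>) -> JoinHandle<Vec<u8>> {
    thread::spawn(move || {
        let mut out: Vec<u8> = Vec::new();
        writeln!(out, "query_name,match_name,intersect_bp").unwrap();
        // Iterating a Receiver blocks for the next row and ends on disconnect.
        for row in recv {
            writeln!(
                out,
                "{},{},{}",
                row.query_name, row.match_name, row.intersect_bp
            )
            .unwrap();
        }
        out
    })
}

/// One producer sends two rows, then joins the writer.
fn run_single_query() -> String {
    // Bounded channel: `send` blocks once 4 rows are queued and unread.
    let (send, recv) = sync_channel::<GatherRow>(4);
    let writer = spawn_csv_writer(recv);

    for (i, name) in ["genomeA", "genomeB"].iter().enumerate() {
        send.send(GatherRow {
            query_name: "q1".to_string(),
            match_name: name.to_string(),
            intersect_bp: 1000 * (i as u64 + 1),
        })
        .unwrap();
    }

    // Dropping the only sender closes the channel so the writer can finish.
    drop(send);
    let bytes = writer.join().expect("writer thread panicked");
    String::from_utf8(bytes).unwrap()
}
```

In the hunk above the sender is moved into `consume_query_by_gather` as `Some(send)`, so it is dropped when that call returns; the sketch drops it explicitly to make the shutdown step visible.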

src/fastmultigather.rs

Lines changed: 18 additions & 20 deletions
@@ -1,6 +1,6 @@
 /// fastmultigather: Run gather for multiple queries against a list of files.
 use anyhow::Result;
-use rayon::prelude::*;
+use rayon::iter::ParallelIterator;
 
 use sourmash::prelude::{Storage, ToWriter};
 use sourmash::{selection::Selection, signature::SigsTrait};
@@ -22,7 +22,8 @@ use sourmash::sketch::minhash::KmerMinHash;
 use sourmash::sketch::Sketch;
 
 use crate::utils::{
-    consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType,
+    consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch,
+    BranchwaterGatherResult, PrefetchResult, ReportType,
 };
 
 #[allow(clippy::too_many_arguments)]
@@ -34,6 +35,7 @@ pub fn fastmultigather(
     selection: Selection,
     allow_failed_sigpaths: bool,
     save_matches: bool,
+    output_path: Option<String>,
     create_empty_results: bool,
 ) -> Result<()> {
     let _ = env_logger::try_init();
@@ -82,6 +84,13 @@ pub fn fastmultigather(
     // load against sketches into memory
     let against = against_collection.load_sketches()?;
 
+    // set up a multi-producer, single-consumer channel.
+    let (send, recv) =
+        std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
+
+    // spawn a thread that is dedicated to printing to a buffered output
+    let gather_out_thrd = csvwriter_thread(recv, output_path);
+
     // Iterate over all queries => do prefetch and gather!
     let processed_queries = AtomicUsize::new(0);
     let skipped_paths = AtomicUsize::new(0);
@@ -144,9 +153,8 @@ pub fn fastmultigather(
                     })
                     .collect();
 
-                if !matchlist.is_empty() {
+                if !matchlist.is_empty() || create_empty_results {
                     let prefetch_output = format!("{}.prefetch.csv", location);
-                    let gather_output = format!("{}.gather.csv", location);
 
                     // Save initial list of matches to prefetch output
                     write_prefetch(
@@ -166,7 +174,7 @@ pub fn fastmultigather(
                         common_scaled,
                         matchlist,
                         threshold_hashes,
-                        Some(gather_output),
+                        Some(send.clone()),
                     )
                     .ok();
 
@@ -200,21 +208,6 @@ pub fn fastmultigather(
                     }
                 } else {
                     println!("No matches to '{}'", location);
-                    if create_empty_results {
-                        let prefetch_output = format!("{}.prefetch.csv", location);
-                        let gather_output = format!("{}.gather.csv", location);
-                        // touch output files
-                        match std::fs::File::create(&prefetch_output) {
-                            Ok(_) => {}
-                            Err(e) => {
-                                eprintln!("Failed to create empty prefetch output: {}", e)
-                            }
-                        }
-                        match std::fs::File::create(&gather_output) {
-                            Ok(_) => {}
-                            Err(e) => eprintln!("Failed to create empty gather output: {}", e),
-                        }
-                    }
                 }
             }
             Err(_) => {
@@ -228,6 +221,11 @@ pub fn fastmultigather(
         }
     });
 
+    drop(send);
+    if let Err(e) = gather_out_thrd.join() {
+        eprintln!("Unable to join internal thread: {:?}", e);
+    }
+
     println!(
         "DONE. Processed {} queries total.",
         processed_queries.into_inner()
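
The `send.clone()` and `drop(send)` lines in this file matter because each worker holds its own sender, and the writer's receive loop only ends once every sender, including the original, has been dropped. The sketch below illustrates that under stated assumptions: `gather_all` is a hypothetical function, plain `String` rows replace `BranchwaterGatherResult`, and `std::thread::scope` stands in for the rayon parallel iterator.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Illustrative fan-in: one worker per query, each with a cloned sender,
/// and a single collector thread playing the role of the CSV writer.
fn gather_all(queries: &[&str]) -> Vec<String> {
    let (send, recv) = sync_channel::<String>(2);

    // The collector runs until every sender has been dropped.
    let writer = thread::spawn(move || recv.iter().collect::<Vec<String>>());

    thread::scope(|s| {
        for q in queries {
            // Each worker owns a clone, dropped when the worker exits.
            let tx = send.clone();
            s.spawn(move || {
                tx.send(format!("{q},match")).unwrap();
            });
        }
    });

    // Without this the collector would wait forever on the original sender.
    drop(send);
    writer.join().expect("writer thread panicked")
}
```

Row order depends on thread scheduling, so callers that need stable output have to sort afterwards.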

src/fastmultigather_rocksdb.rs

Lines changed: 2 additions & 5 deletions
@@ -31,12 +31,9 @@ pub fn fastmultigather_rocksdb(
     println!("Loaded DB");
 
     // grab scaled from the database.
-    let max_db_scaled = db
+    let (_, max_db_scaled) = db
         .collection()
-        .manifest()
-        .iter()
-        .map(|r| r.scaled())
-        .max()
+        .min_max_scaled()
         .expect("no records in db?!");
 
     let selection_scaled: u32 = match selection.scaled() {
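
The call site suggests `min_max_scaled()` returns an optional `(min, max)` pair that is empty when the collection has no records. The sketch below shows one way such a helper could behave; it is a free function over a slice of `u32` values written for illustration, not the sourmash method, and its signature is an assumption.

```rust
/// Illustrative helper (hypothetical signature): smallest and largest
/// `scaled` value in a single pass, or `None` for an empty input.
fn min_max_scaled(scaled_values: &[u32]) -> Option<(u32, u32)> {
    let first = *scaled_values.first()?;
    Some(
        scaled_values
            .iter()
            .fold((first, first), |(lo, hi), &s| (lo.min(s), hi.max(s))),
    )
}
```

A caller that only needs the maximum can destructure the pair as the diff does, for example `let (_, max_scaled) = min_max_scaled(&values).expect("no records");`.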

src/lib.rs

Lines changed: 1 addition & 3 deletions
@@ -155,9 +155,6 @@ fn do_fastmultigather(
             }
         }
     } else {
-        if output_path.is_some() {
-            bail!("output path specified, but not running fastmultigather against a rocksdb. See issue #239");
-        }
         match fastmultigather::fastmultigather(
             query_filenames,
             siglist_path,
@@ -166,6 +163,7 @@ fn do_fastmultigather(
             selection,
             allow_failed_sigpaths,
             save_matches,
+            output_path,
             create_empty_results,
         ) {
             Ok(_) => Ok(0),

src/manysearch.rs

Lines changed: 9 additions & 10 deletions
@@ -19,6 +19,14 @@ use sourmash::signature::SigsTrait;
 use sourmash::sketch::minhash::KmerMinHash;
 use sourmash::storage::SigStore;
 
+type AbundanceStats = (
+    Option<u64>,
+    Option<u64>,
+    Option<f64>,
+    Option<f64>,
+    Option<f64>,
+);
+
 pub fn manysearch(
     query_filepath: String,
     against_filepath: String,
@@ -174,16 +182,7 @@ pub fn manysearch(
 fn inflate_abundances(
     query: &KmerMinHash,
     against: &KmerMinHash,
-) -> Result<
-    (
-        Option<u64>,
-        Option<u64>,
-        Option<f64>,
-        Option<f64>,
-        Option<f64>,
-    ),
-    SourmashError,
-> {
+) -> Result<AbundanceStats, SourmashError> {
     let abunds: Vec<u64>;
     let sum_weighted: u64;
     let sum_all_abunds: u64 = against.sum_abunds();
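
Naming a long tuple with a `type` alias, as this hunk does, shortens the signature without changing the type; this is presumably the change behind the commit message's "too-complex returns" clippy fix. A reduced sketch of the idea follows, with a three-field alias and a hypothetical `summarize_abundances` function that is not the real `inflate_abundances` logic.

```rust
/// Shortened, illustrative alias (the real one in the diff has five fields).
type AbundanceStats = (Option<u64>, Option<u64>, Option<f64>);

/// Hypothetical function: total, maximum, and mean of a list of abundances.
/// The error type is a plain `String` here to keep the sketch self-contained.
fn summarize_abundances(abunds: &[u64]) -> Result<AbundanceStats, String> {
    if abunds.is_empty() {
        // No abundances recorded: every statistic is absent.
        return Ok((None, None, None));
    }
    let total: u64 = abunds.iter().sum();
    let max = abunds.iter().copied().max();
    let mean = total as f64 / abunds.len() as f64;
    Ok((Some(total), max, Some(mean)))
}
```

An alias is interchangeable with the tuple it names, so callers can still destructure the result positionally; a named struct would be the alternative if the fields needed labels.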

src/manysearch_rocksdb.rs

Lines changed: 2 additions & 5 deletions
@@ -36,12 +36,9 @@ pub fn manysearch_rocksdb(
     println!("Loaded DB");
 
     // grab scaled from the database.
-    let max_db_scaled = db
+    let (_, max_db_scaled) = db
         .collection()
-        .manifest()
-        .iter()
-        .map(|r| r.scaled())
-        .max()
+        .min_max_scaled()
         .expect("no records in db?!");
 
     let selection_scaled: u32 = match selection.scaled() {

src/multisearch.rs

Lines changed: 9 additions & 7 deletions
@@ -17,6 +17,14 @@ use crate::utils::multicollection::SmallSignature;
 use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType};
 use sourmash::ani_utils::ani_from_containment;
 
+type OverlapStatsReturn = (
+    f64,
+    HashMap<u64, f64>,
+    HashMap<u64, f64>,
+    HashMap<String, HashMap<u64, f64>>,
+    HashMap<u64, f64>,
+);
+
 #[derive(Default, Clone, Debug)]
 struct ProbOverlapStats {
     prob_overlap: f64,
@@ -71,13 +79,7 @@ fn compute_single_prob_overlap(
 fn compute_prob_overlap_stats(
     queries: &Vec<SmallSignature>,
     againsts: &Vec<SmallSignature>,
-) -> (
-    f64,
-    HashMap<u64, f64>,
-    HashMap<u64, f64>,
-    HashMap<String, HashMap<u64, f64>>,
-    HashMap<u64, f64>,
-) {
+) -> OverlapStatsReturn {
     let n_comparisons = againsts.len() as f64 * queries.len() as f64;
 
     // Combine all the queries and against into a single signature each

src/python/sourmash_plugin_branchwater/__init__.py

Lines changed: 3 additions & 2 deletions
@@ -277,13 +277,14 @@ def __init__(self, p):
         p.add_argument(
             "-o",
             "--output",
-            help="CSV output file for matches. Used for non-rocksdb searches only.",
+            help="CSV output file containing gather matches",
         )
         p.add_argument(
             "--create-empty-results",
+            "--create-empty-prefetch-results",
             action="store_true",
             default=False,
-            help="create empty results file(s) even if no matches",
+            help="create empty prefetch results file for each query, even if no matches (non-RockSDB only)",
         )
         p.add_argument(
             "--save-matches",
