Skip to content

Commit

Permalink
Merge pull request #6 from HudsonAlpha/bitvector_updates
Browse files Browse the repository at this point in the history
Bitvector updates
* includes removal of match statement in the rank calls
* includes updates to needletail and triple_accel
* includes other relatively minor optimizations and refactoring
  • Loading branch information
holtjma authored Dec 23, 2020
2 parents 98ac63e + 1966a65 commit de11e61
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 207 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fmlrc"
version = "0.1.2"
version = "0.1.3"
authors = ["holtjma <[email protected]>"]
edition = "2018"
license = "MIT OR Apache-2.0"
Expand All @@ -20,12 +20,12 @@ exitcode = "1.1.2"
flate2 = "1.0.14"
libmath = "0.1.4"
log = "0.4.8"
needletail = "0.3.2"
needletail = "0.4.0"
serde_json = "1.0.58"
subprocess = "0.2.4"
tempfile = "3.1.0"
threadpool = "1.7.1"
triple_accel = "0.3.0"
triple_accel = "0.3.4"

[dev-dependencies]
criterion = "0.3"
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ The actual corrections are _nearly_ identical (there are slight differences not
However, FMLRC v2 runs in less than half the time, in terms of both real (wall-clock) time and total CPU time.
While not explicitly measured, FMLRC v2 does use ~1GB of extra memory due to the 10-mer cache (`-C 10`).

| Metric | FMLRC v1 | FMLRC v2 (`-C 10`) |
| Metric | FMLRC v1.0.0 | FMLRC2 v0.1.3 (`-C 10`) |
| - | - | - |
| Recall | 0.9825 | 0.9825 |
| Precision | 0.9815 | 0.9815 |
| Real time | 7m29.908s | **3m27.428s** |
| CPU time | 51m34.704s | **22m24.359s** |
| Recall | 0.9825 | 0.9826 |
| Precision | 0.9815 | 0.9816 |
| Real time | 7m29.908s | **3m0.885s** |
| CPU time | 51m34.704s | **20m35.792s** |

## Reference
FMLRC v2 does not currently have a pre-print or paper. If you use FMLRC v2, please cite the FMLRC v1 paper:
Expand Down
41 changes: 37 additions & 4 deletions benches/bv_bwt_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use fmlrc::bv_bwt::BitVectorBWT;
use fmlrc::bwt_converter::convert_to_vec;
use fmlrc::read_correction::bridge_kmers;
use fmlrc::ropebwt2_util::create_bwt_from_strings;
use fmlrc::stats_util::*;
use fmlrc::string_util::*;

fn get_constant_bwt() -> BitVectorBWT {
Expand Down Expand Up @@ -59,11 +60,11 @@ pub fn bench_count_kmer(c: &mut Criterion) {
black_box(bwt.count_kmer(&altered_query));
}));

c.bench_function("prefix_kmer", |b| b.iter(|| {
c.bench_function("prefix_kmer_noalloc", |b| b.iter(|| {
black_box(bwt.prefix_kmer_noalloc(&query[1..], &vec![1, 2, 3, 5], &mut counts));
}));

c.bench_function("absent_prefix_kmer", |b| b.iter(|| {
c.bench_function("absent_prefix_kmer_noalloc", |b| b.iter(|| {
black_box(bwt.prefix_kmer_noalloc(&altered_query[1..], &vec![1, 2, 3, 5], &mut counts));
}));

Expand All @@ -83,12 +84,44 @@ pub fn bench_fixed_counts(c: &mut Criterion) {

//this is the correct sequence, alter it below
let query = convert_stoi(&"AACGGATCAAGCTTACCAGTATTTACGT");
let altered_query = convert_stoi(&"AACGGATCAAGCTTACCAGTATTTACGA");
let mut counts: Vec<u64> = vec![0; 4];

let mut rev_query: Vec<u8> = vec![0; query.len()-1];
for (i, c) in query[1..].iter().rev().enumerate() {
rev_query[i] = *c;
}
let mut rev_altered_query: Vec<u8> = vec![0; altered_query.len()-1];
for (i, c) in altered_query[1..].iter().rev().enumerate() {
rev_altered_query[i] = *c;
}

let subquery = &query[..query.len()-1];
c.bench_function("postfix_kmer_noalloc_fixed", |b| b.iter(|| {
black_box(bwt.postfix_kmer_noalloc_fixed(&query[..query.len()-1], &mut counts));
black_box(bwt.postfix_kmer_noalloc_fixed(subquery, &mut counts));
}));

c.bench_function("absent_postfix_kmer_noalloc_fixed", |b| b.iter(|| {
black_box(bwt.postfix_kmer_noalloc_fixed(&altered_query, &mut counts));
}));

c.bench_function("prefix_revkmer_noalloc_fixed", |b| b.iter(|| {
black_box(bwt.prefix_revkmer_noalloc_fixed(&rev_query, &mut counts));
}));

c.bench_function("absent_prefix_revkmer_noalloc_fixed", |b| b.iter(|| {
black_box(bwt.prefix_revkmer_noalloc_fixed(&rev_altered_query, &mut counts));
}));
}

pub fn bench_stats_util(c: &mut Criterion) {
let test: Vec<u64> = vec![3; 20000];
c.bench_function("calculate_bounded_median", |b| b.iter(|| {
for x in 0..7 {
black_box(calculate_bounded_median(&test, x));
}
}));
}

criterion_group!(benches, bench_string_util, bench_count_kmer, bench_fixed_counts);
criterion_group!(benches, bench_string_util, bench_count_kmer, bench_fixed_counts, bench_stats_util);
criterion_main!(benches);
93 changes: 48 additions & 45 deletions src/bin/fmlrc2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern crate needletail;

use clap::{Arg, App, value_t, values_t};
use log::{info, error};
use needletail::parse_sequence_path;
use needletail::parse_fastx_file;
use std::fs::File;
use std::sync::{Arc, mpsc};
use threadpool::ThreadPool;
Expand Down Expand Up @@ -158,7 +158,7 @@ fn main() {
error!("--begin_index set to value larger than --end_index");
std::process::exit(exitcode::DATAERR);
}
kmer_sizes.sort();
kmer_sizes.sort_unstable();
info!("\tk-mer sizes: {:?}", kmer_sizes);
info!("\tabs. mininimum count: {}", min_count);
info!("\tdyn. minimimum fraction: {}", min_frac);
Expand Down Expand Up @@ -208,57 +208,60 @@ fn main() {
let mut results_received: u64 = 0;

info!("Starting read correction processes...");
let parsing_result = parse_sequence_path(
long_read_fn,
|_| {},
|record| {
if read_index >= begin_id && read_index < end_id {
//if we've filled our queue, then we should wait until we get some results back
if jobs_queued - results_received >= JOB_SLOTS {
let rx_value: CorrectionResults = rx.recv().unwrap();
if verbose_mode {
info!("Job #{:?}: {:.2} -> {:.2}", rx_value.read_index, rx_value.avg_before, rx_value.avg_after);
match parse_fastx_file(&long_read_fn) {
Ok(mut fastx_reader) => {
while let Some(raw_record) = fastx_reader.next() {
let record = match raw_record {
Ok(record) => { record },
Err(e) => {
error!("Invalid record while parsing long read file: {:?}", e);
std::process::exit(exitcode::IOERR);
}
match fasta_writer.write_correction(rx_value) {
Ok(()) => {},
Err(e) => {
error!("Failed while writing read correction: {:?}", e);
std::process::exit(exitcode::IOERR);
};
if read_index >= begin_id && read_index < end_id {
//if we've filled our queue, then we should wait until we get some results back
if jobs_queued - results_received >= JOB_SLOTS {
let rx_value: CorrectionResults = rx.recv().unwrap();
if verbose_mode {
info!("Job #{:?}: {:.2} -> {:.2}", rx_value.read_index, rx_value.avg_before, rx_value.avg_after);
}
match fasta_writer.write_correction(rx_value) {
Ok(()) => {},
Err(e) => {
error!("Failed while writing read correction: {:?}", e);
std::process::exit(exitcode::IOERR);
}
};
results_received += 1;
if results_received % UPDATE_INTERVAL == 0 {
info!("Processed {} reads...", results_received);
}
};
results_received += 1;
if results_received % UPDATE_INTERVAL == 0 {
info!("Processed {} reads...", results_received);
}
}

//clone the transmit channel and submit the pool job
let tx = tx.clone();
let arc_bwt = arc_bwt.clone();
let arc_params = arc_params.clone();
let read_data: LongReadFA = LongReadFA {
read_index: jobs_queued,
label: String::from_utf8((*record.id).to_vec()).unwrap(),
seq: String::from_utf8((*record.seq).to_vec()).unwrap()
};
//println!("Submitting {:?}", jobs_queued);
pool.execute(move|| {
let correction_results: CorrectionResults = correction_job(arc_bwt, read_data, arc_params);
tx.send(correction_results).expect("channel will be there waiting for the pool");
});
jobs_queued += 1;
//clone the transmit channel and submit the pool job
let tx = tx.clone();
let arc_bwt = arc_bwt.clone();
let arc_params = arc_params.clone();
let read_data: LongReadFA = LongReadFA {
read_index: jobs_queued,
label: String::from_utf8(record.id().to_vec()).unwrap(),
seq: String::from_utf8(record.seq().to_vec()).unwrap()
};
//println!("Submitting {:?}", jobs_queued);
pool.execute(move|| {
let correction_results: CorrectionResults = correction_job(arc_bwt, read_data, arc_params);
tx.send(correction_results).expect("channel will be there waiting for the pool");
});
jobs_queued += 1;
}
read_index += 1;
}
read_index += 1;
}
);

match parsing_result {
Ok(_) => {},
},
Err(e) => {
error!("Failed to parse long read file: {:?}", e);
error!("Failed to open long read file: {:?}", e);
std::process::exit(exitcode::IOERR);
}
};
}

while results_received < jobs_queued {
let rx_value: CorrectionResults = rx.recv().unwrap();
Expand Down
Loading

0 comments on commit de11e61

Please sign in to comment.