Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alignment and query cleanups #271

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 1 addition & 40 deletions metagraph/src/cli/config/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ Config::Config(int argc, char *argv[]) {
alignment_edit_distance = true;
} else if (!strcmp(argv[i], "--max-hull-depth")) {
max_hull_depth = atoll(get_value(i++));
} else if (!strcmp(argv[i], "--batch-align")) {
batch_align = true;
} else if (!strcmp(argv[i], "--align-length")) {
alignment_length = atoi(get_value(i++));
} else if (!strcmp(argv[i], "--align-queue-size")) {
Expand Down Expand Up @@ -460,12 +458,6 @@ Config::Config(int argc, char *argv[]) {
print_usage_and_exit = true;
}

// only the best alignment is used in query
// |alignment_num_alternative_paths| must be set to 1
if (identity == QUERY && align_sequences
&& alignment_num_alternative_paths != 1)
print_usage_and_exit = true;

if (identity == ALIGN && infbase.empty())
print_usage_and_exit = true;

Expand All @@ -485,8 +477,7 @@ Config::Config(int argc, char *argv[]) {
if ((identity == QUERY || identity == SERVER_QUERY) && infbase.empty())
print_usage_and_exit = true;

if ((identity == QUERY || identity == SERVER_QUERY || identity == ALIGN)
&& alignment_num_alternative_paths == 0) {
if (alignment_num_alternative_paths == 0) {
std::cerr << "Error: align-alternative-alignments must be > 0" << std::endl;
print_usage_and_exit = true;
}
Expand Down Expand Up @@ -1078,7 +1069,6 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
#if ! _PROTEIN_GRAPH
fprintf(stderr, "\t --fwd-and-reverse \tfor each input sequence, query its reverse complement as well [off]\n");
#endif
fprintf(stderr, "\t --align \t\talign sequences instead of mapping k-mers [off]\n");
fprintf(stderr, "\t --sparse \t\tuse row-major sparse matrix for row annotation [off]\n");
fprintf(stderr, "\n");
fprintf(stderr, "\t --count-labels \t\tcount labels for k-mers from querying sequences [off]\n");
Expand All @@ -1087,40 +1077,11 @@ void Config::print_usage(const std::string &prog_name, IdentityType identity) {
fprintf(stderr, "\t --discovery-fraction [FLOAT] fraction of labeled k-mers required for annotation [0.7]\n");
fprintf(stderr, "\t --labels-delimiter [STR]\tdelimiter for annotation labels [\":\"]\n");
fprintf(stderr, "\t --suppress-unlabeled \tdo not show results for sequences missing in graph [off]\n");
// fprintf(stderr, "\t-d --distance [INT] \tmax allowed alignment distance [0]\n");
fprintf(stderr, "\n");
fprintf(stderr, "\t-p --parallel [INT] \tuse multiple threads for computation [1]\n");
// fprintf(stderr, "\t --cache-size [INT] \tnumber of uncompressed rows to store in the cache [0]\n");
fprintf(stderr, "\t --fast \t\tquery in batches [off]\n");
fprintf(stderr, "\t --batch-size \tquery batch size (number of base pairs) [100000000]\n");
fprintf(stderr, "\n");
fprintf(stderr, "Available options for --align:\n");
fprintf(stderr, "\t --align-both-strands \t\t\treturn best alignments for either input sequence or its reverse complement [off]\n");
// fprintf(stderr, "\t --align-alternative-alignments \tthe number of alternative paths to report per seed [1]\n");
fprintf(stderr, "\t --align-min-path-score [INT]\t\t\tthe minimum score that a reported path can have [0]\n");
fprintf(stderr, "\t --align-edit-distance \t\t\tuse unit costs for scoring matrix [off]\n");
fprintf(stderr, "\t --align-queue-size [INT]\t\t\tmaximum size of the priority queue for alignment [20]\n");
fprintf(stderr, "\t --align-vertical-bandwidth [INT]\t\tmaximum width of a window to consider in alignment step [inf]\n");
fprintf(stderr, "\t --align-max-nodes-per-seq-char [FLOAT]\tmaximum number of nodes to consider per sequence character [10.0]\n");
fprintf(stderr, "\t --align-max-ram [FLOAT]\t\tmaximum amount of RAM used per alignment in MB [200.0]\n");
fprintf(stderr, "\n");
fprintf(stderr, "\t --batch-align \t\talign against query graph [off]\n");
fprintf(stderr, "\t --max-hull-forks [INT]\tmaximum number of forks to take when expanding query graph [4]\n");
fprintf(stderr, "\t --max-hull-depth [INT]\tmaximum number of steps to traverse when expanding query graph [max_nodes_per_seq_char * max_seq_len]\n");
fprintf(stderr, "\n");
fprintf(stderr, "Advanced options for scoring:\n");
fprintf(stderr, "\t --align-match-score [INT]\t\t\tpositive match score [2]\n");
fprintf(stderr, "\t --align-mm-transition-penalty [INT]\t\tpositive transition penalty (DNA only) [3]\n");
fprintf(stderr, "\t --align-mm-transversion-penalty [INT]\tpositive transversion penalty (DNA only) [3]\n");
fprintf(stderr, "\t --align-gap-open-penalty [INT]\t\tpositive gap opening penalty [5]\n");
fprintf(stderr, "\t --align-gap-extension-penalty [INT]\t\tpositive gap extension penalty [2]\n");
fprintf(stderr, "\t --align-min-cell-score [INT]\t\t\tthe minimum value that a cell in the alignment table can hold [0]\n");
fprintf(stderr, "\t --align-xdrop [INT]\t\t\t\tthe maximum difference between the current and the best alignment [27]\n");
fprintf(stderr, "\n");
fprintf(stderr, "Advanced options for seeding:\n");
fprintf(stderr, "\t --align-min-seed-length [INT]\t\tthe minimum length of a seed [graph k]\n");
fprintf(stderr, "\t --align-max-seed-length [INT]\t\tthe maximum length of a seed [graph k]\n");
fprintf(stderr, "\t --align-max-num-seeds-per-locus [INT]\tthe maximum number of allowed inexact seeds per locus [inf]\n");
} break;
case SERVER_QUERY: {
fprintf(stderr, "Usage: %s server_query -i <GRAPH> -a <ANNOTATION> [options]\n\n", prog_name.c_str());
Expand Down
1 change: 0 additions & 1 deletion metagraph/src/cli/config/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class Config {
bool dump_text_anno = false;
bool sparse = false;
bool fast = false;
bool batch_align = false;
bool count_labels = false;
bool suppress_unlabeled = false;
bool clear_dummy = false;
Expand Down
84 changes: 8 additions & 76 deletions metagraph/src/cli/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,20 @@
#include "common/threads/threading.hpp"
#include "common/vectors/vector_algorithm.hpp"
#include "annotation/representation/annotation_matrix/static_annotators_def.hpp"
#include "graph/alignment/dbg_aligner.hpp"
#include "graph/representation/hash/dbg_hash_ordered.hpp"
#include "graph/representation/succinct/dbg_succinct.hpp"
#include "graph/representation/succinct/boss_construct.hpp"
#include "seq_io/sequence_io.hpp"
#include "config/config.hpp"
#include "load/load_graph.hpp"
#include "load/load_annotated_graph.hpp"
#include "cli/align.hpp"


namespace mtg {
namespace cli {

const size_t kRowBatchSize = 100'000;
const bool kPrefilterWithBloom = true;
const char ALIGNED_SEQ_HEADER_FORMAT[] = "{}:{}:{}:{}";

using namespace mtg::annot::binmat;
using namespace mtg::graph;
Expand All @@ -40,15 +37,10 @@ typedef typename mtg::graph::DeBruijnGraph::node_index node_index;

QueryExecutor::QueryExecutor(const Config &config,
const graph::AnnotatedDBG &anno_graph,
std::unique_ptr<graph::align::DBGAlignerConfig>&& aligner_config,
ThreadPool &thread_pool)
: config_(config),
anno_graph_(anno_graph),
aligner_config_(std::move(aligner_config)),
thread_pool_(thread_pool) {
if (aligner_config_ && aligner_config_->forward_and_reverse_complement)
throw std::runtime_error("Error: align_both_strands must be off when querying");
}
thread_pool_(thread_pool) {}

std::string QueryExecutor::execute_query(const std::string &seq_name,
const std::string &sequence,
Expand Down Expand Up @@ -800,17 +792,7 @@ int query_graph(Config *config) {

Timer timer;

std::unique_ptr<align::DBGAlignerConfig> aligner_config;
if (config->align_sequences) {
assert(config->alignment_num_alternative_paths == 1u
&& "only the best alignment is used in query");

aligner_config.reset(new align::DBGAlignerConfig(
initialize_aligner_config(graph->get_k(), *config)
));
}

QueryExecutor executor(*config, *anno_graph, std::move(aligner_config), thread_pool);
QueryExecutor executor(*config, *anno_graph, thread_pool);

// iterate over input files
for (const auto &file : files) {
Expand All @@ -824,51 +806,17 @@ int query_graph(Config *config) {
return 0;
}

void align_sequence(std::string &name, std::string &seq,
const DeBruijnGraph &graph,
const align::DBGAlignerConfig &aligner_config) {
auto alignments
= build_aligner(graph, aligner_config)->align(seq);

assert(alignments.size() <= 1 && "Only the best alignment is needed");

if (alignments.size()) {
auto &match = alignments[0];
// sequence for querying -- the best alignment
if (match.get_offset()) {
seq = graph.get_node_sequence(match[0]).substr(0, match.get_offset())
+ match.get_sequence();
} else {
seq = const_cast<std::string&&>(match.get_sequence());
}

name = fmt::format(ALIGNED_SEQ_HEADER_FORMAT, name, seq,
match.get_score(), match.get_cigar().to_string());

} else {
// no alignment was found
// the original sequence will be queried
name = fmt::format(ALIGNED_SEQ_HEADER_FORMAT, name, seq,
0, fmt::format("{}S", seq.length()));
}
}

std::string query_sequence(size_t id, std::string name, std::string seq,
const AnnotatedDBG &anno_graph,
const Config &config,
const align::DBGAlignerConfig *aligner_config) {
if (aligner_config) {
align_sequence(name, seq, anno_graph.get_graph(), *aligner_config);
}

const Config &config) {
return QueryExecutor::execute_query(fmt::format_int(id).str() + '\t' + name, seq,
config.count_labels, config.print_signature,
config.suppress_unlabeled, config.num_top_labels,
config.discovery_fraction, config.anno_labels_delimiter,
anno_graph);
}

void QueryExecutor::query_fasta(const string &file,
void QueryExecutor::query_fasta(const std::string &file,
const std::function<void(const std::string &)> &callback) {
logger->trace("Parsing sequences from file '{}'", file);

Expand All @@ -885,9 +833,8 @@ void QueryExecutor::query_fasta(const string &file,
size_t seq_count = 0;

for (const seq_io::kseq_t &kseq : fasta_parser) {
thread_pool_.enqueue([&](const auto&... args) {
callback(query_sequence(args..., anno_graph_,
config_, aligner_config_.get()));
thread_pool_.enqueue([&](size_t id, std::string &name, std::string &seq) {
callback(query_sequence(id, name, seq, anno_graph_, config_));
}, seq_count++, std::string(kseq.name.s), std::string(kseq.seq.s));
}

Expand Down Expand Up @@ -918,19 +865,6 @@ ::batched_query_fasta(seq_io::FastaParser &fasta_parser,
num_bytes_read += it->seq.l;
}

if (aligner_config_ && !config_.batch_align) {
logger->trace("Aligning sequences from batch against the full graph...");
batch_timer.reset();

#pragma omp parallel for num_threads(get_num_threads()) schedule(dynamic)
for (size_t i = 0; i < seq_batch.size(); ++i) {
align_sequence(seq_batch[i].first, seq_batch[i].second,
anno_graph_.get_graph(), *aligner_config_);
}
logger->trace("Sequences alignment took {} sec", batch_timer.elapsed());
batch_timer.reset();
}

auto query_graph = construct_query_graph(
anno_graph_,
[&](auto callback) {
Expand All @@ -939,8 +873,7 @@ ::batched_query_fasta(seq_io::FastaParser &fasta_parser,
}
},
get_num_threads(),
anno_graph_.get_graph().is_canonical_mode() || config_.canonical,
aligner_config_ && config_.batch_align ? &config_ : NULL
anno_graph_.get_graph().is_canonical_mode() || config_.canonical
);

logger->trace("Query graph constructed for batch of sequences"
Expand All @@ -951,8 +884,7 @@ ::batched_query_fasta(seq_io::FastaParser &fasta_parser,
#pragma omp parallel for num_threads(get_num_threads()) schedule(dynamic)
for (size_t i = 0; i < seq_batch.size(); ++i) {
callback(query_sequence(seq_count++, seq_batch[i].first, seq_batch[i].second,
*query_graph, config_,
config_.batch_align ? aligner_config_.get() : NULL));
*query_graph, config_));
}

logger->trace("Batch of {} bytes from '{}' queried in {} sec", num_bytes_read,
Expand Down
5 changes: 0 additions & 5 deletions metagraph/src/cli/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ namespace seq_io {

namespace graph {
class AnnotatedDBG;
namespace align {
class DBGAlignerConfig;
}
}


Expand Down Expand Up @@ -49,7 +46,6 @@ class QueryExecutor {
public:
QueryExecutor(const Config &config,
const graph::AnnotatedDBG &anno_graph,
std::unique_ptr<graph::align::DBGAlignerConfig>&& aligner_config,
ThreadPool &thread_pool);

void query_fasta(const std::string &file_path,
Expand All @@ -68,7 +64,6 @@ class QueryExecutor {
private:
const Config &config_;
const graph::AnnotatedDBG &anno_graph_;
std::unique_ptr<graph::align::DBGAlignerConfig> aligner_config_;
ThreadPool &thread_pool_;

void batched_query_fasta(mtg::seq_io::FastaParser &fasta_parser,
Expand Down
21 changes: 1 addition & 20 deletions metagraph/src/cli/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ std::string convert_query_response_to_json(const std::string &ret_str) {
if (!query_desc_parts.empty())
res_obj[SEQ_DESCRIPTION_JSON_FIELD] = query_desc_parts[0];

if (query_desc_parts.size() > 1) {
// we aligned first, so extracting aligned sequence and score:

res_obj[SEQUENCE_JSON_FIELD] = query_desc_parts[1];
res_obj[SCORE_JSON_FIELD] = (int)atoi(query_desc_parts[2].c_str());
res_obj[CIGAR_JSON_FIELD] = query_desc_parts[3];
}

res_obj["results"] = Json::Value(Json::arrayValue);

for (size_t i = 2; i < parts.size(); ++i) {
Expand Down Expand Up @@ -129,10 +121,6 @@ std::string process_search_request(const std::string &received_message,
config.discovery_fraction
= json.get("discovery_fraction", config.discovery_fraction).asDouble();

config.alignment_max_nodes_per_seq_char = json.get(
"max_num_nodes_per_seq_char",
config.alignment_max_nodes_per_seq_char).asDouble();

if (config.discovery_fraction < 0.0 || config.discovery_fraction > 1.0) {
throw std::domain_error(
"Discovery fraction should be within [0, 1.0]. Instead got "
Expand All @@ -143,13 +131,6 @@ std::string process_search_request(const std::string &received_message,
config.num_top_labels = json.get("num_labels", config.num_top_labels).asInt();
config.fast = json.get("fast", config.fast).asBool();

std::unique_ptr<graph::align::DBGAlignerConfig> aligner_config;
if (json.get("align", false).asBool()) {
aligner_config.reset(new graph::align::DBGAlignerConfig(
initialize_aligner_config(anno_graph.get_graph().get_k(), config)
));
}

std::ostringstream oss;
std::mutex oss_mutex;

Expand All @@ -163,7 +144,7 @@ std::string process_search_request(const std::string &received_message,

// dummy pool doing everything in the caller thread
ThreadPool dummy_pool(0);
QueryExecutor engine(config, anno_graph, std::move(aligner_config), dummy_pool);
QueryExecutor engine(config, anno_graph, dummy_pool);

engine.query_fasta(tf.name(),
[&](const std::string &res) {
Expand Down