5
5
from Bio .SeqRecord import SeqRecord
6
6
import logging
7
7
from mappy import Aligner
8
+ from functools import cached_property
8
9
9
10
from micall .utils .contig_stitcher_context import StitcherContext
10
11
from micall .utils .consensus_aligner import Alignment
19
20
# Used as the initial max_acceptable_prob in calculate_all_paths and as the
# pessimisstic_probability of the empty ContigsPath.
ACCEPTABLE_STITCHING_PROB = Fraction (1 , 20 )
20
21
21
22
23
@dataclass(frozen=True)
class ContigWithAligner(Contig):
    """A Contig that carries a lazily built Aligner for its own sequence.

    Keeping the aligner on the contig lets repeated overlap mappings against
    the same contig reuse one Aligner object.
    """

    @cached_property
    def aligner(self) -> Aligner:
        """Aligner constructed from this contig's sequence on first access."""
        # cached_property keeps the constructed value on the instance, so
        # later accesses return the same Aligner.
        reference = str(self.seq)
        return Aligner(seq=reference, bw=500, bw_long=500, preset='map-ont')

    @staticmethod
    def make(contig: Contig) -> 'ContigWithAligner':
        """Build a ContigWithAligner from the name and sequence of *contig*."""
        return ContigWithAligner(seq=contig.seq, name=contig.name)

    @staticmethod
    def empty() -> 'ContigWithAligner':
        """Return the ContigWithAligner made from ``Contig.empty()``."""
        placeholder = Contig.empty()
        return ContigWithAligner.make(placeholder)

    def map_overlap(self, overlap: str) -> Iterator[Alignment]:
        """Yield only the primary alignments of *overlap* onto this contig."""
        hits = self.aligner.map(overlap)
        yield from (hit for hit in hits if hit.is_primary)
41
+
42
+
22
43
@dataclass (frozen = True )
23
44
class ContigsPath :
24
45
# Contig representing all combined contigs in the path.
25
- whole : Contig
46
+ whole : ContigWithAligner
26
47
27
48
# IDs of contigs that comprise this path.
28
49
parts_ids : FrozenSet [int ]
@@ -44,7 +65,7 @@ def is_empty(self) -> bool:
44
65
45
66
@staticmethod
46
67
def empty () -> 'ContigsPath' :
47
- return ContigsPath (Contig .empty (), frozenset (),
68
+ return ContigsPath (ContigWithAligner .empty (), frozenset (),
48
69
probability = Fraction (1 ),
49
70
pessimisstic_probability = ACCEPTABLE_STITCHING_PROB )
50
71
@@ -58,29 +79,29 @@ class Overlap:
58
79
shift : int
59
80
60
81
61
# Memoized results of get_overlap, keyed by (left.id, right.id).
# A stored None records that the pair was compared and has no overlap.
GET_OVERLAP_CACHE: MutableMapping[Tuple[int, int], Optional[Overlap]] = {}


def get_overlap(finder: OverlapFinder, left: ContigWithAligner, right: ContigWithAligner) -> Optional[Overlap]:
    """Return the maximum overlap between *left* and *right*, if there is one.

    Results are memoized in GET_OVERLAP_CACHE so that each (left, right)
    pair is passed to find_maximum_overlap at most once.

    :param finder: helper object forwarded to find_maximum_overlap.
    :param left: first contig of the pair.
    :param right: second contig of the pair.
    :return: an Overlap holding the shift reported by find_maximum_overlap,
        or None when either sequence is empty or the shift is 0.
    """
    if len(left.seq) == 0 or len(right.seq) == 0:
        return None

    key = (left.id, right.id)
    # NOTE(review): assumes contig ids are unique and stable for the life of
    # the process, and that the result does not depend on which finder
    # instance is passed in - confirm against Contig.id and OverlapFinder.
    # None is a valid cached value, so test membership rather than
    # comparing the result of .get() against None.
    if key in GET_OVERLAP_CACHE:
        return GET_OVERLAP_CACHE[key]

    shift = find_maximum_overlap(left.seq, right.seq, finder=finder)
    ret = Overlap(shift) if shift != 0 else None
    GET_OVERLAP_CACHE[key] = ret
    return ret
78
99
79
100
80
101
def try_combine_contigs (finder : OverlapFinder ,
81
102
max_acceptable_prob : Fraction ,
82
- a : Contig , b : Contig ,
83
- ) -> Optional [Tuple [Contig , Fraction ]]:
103
+ a : ContigWithAligner , b : ContigWithAligner ,
104
+ ) -> Optional [Tuple [ContigWithAligner , Fraction ]]:
84
105
# TODO: Memoize this function.
85
106
# Two-layer caching seems most optimal:
86
107
# first by key=contig.id, then by key=contig.seq.
@@ -128,25 +149,25 @@ def try_combine_contigs(finder: OverlapFinder,
128
149
right_initial_overlap = right .seq [:abs (shift )]
129
150
130
151
if len (left_initial_overlap ) < len (right_initial_overlap ):
131
- left_overlap_alignments = map_overlap_onto_candidate (str (right_initial_overlap ), str ( left . seq ))
152
+ left_overlap_alignments = left . map_overlap (str (right_initial_overlap ))
132
153
left_cutoff = min ((al .r_st for al in left_overlap_alignments ), default = None )
133
154
if left_cutoff is None :
134
155
logger .debug ("Overlap alignment between %s and %s failed." , a .unique_name , b .unique_name )
135
156
return None
136
157
137
- right_overlap_alignments = map_overlap_onto_candidate (str (left_initial_overlap ), str ( right . seq ))
158
+ right_overlap_alignments = right . map_overlap (str (left_initial_overlap ))
138
159
right_cutoff = max ((al .r_en for al in right_overlap_alignments ), default = None )
139
160
if right_cutoff is None :
140
161
logger .debug ("Overlap alignment between %s and %s failed." , a .unique_name , b .unique_name )
141
162
return None
142
163
else :
143
- right_overlap_alignments = map_overlap_onto_candidate (str (left_initial_overlap ), str ( right . seq ))
164
+ right_overlap_alignments = right . map_overlap (str (left_initial_overlap ))
144
165
right_cutoff = max ((al .r_en for al in right_overlap_alignments ), default = None )
145
166
if right_cutoff is None :
146
167
logger .debug ("Overlap alignment between %s and %s failed." , a .unique_name , b .unique_name )
147
168
return None
148
169
149
- left_overlap_alignments = map_overlap_onto_candidate (str (right_initial_overlap ), str ( left . seq ))
170
+ left_overlap_alignments = left . map_overlap (str (right_initial_overlap ))
150
171
left_cutoff = min ((al .r_st for al in left_overlap_alignments ), default = None )
151
172
if left_cutoff is None :
152
173
logger .debug ("Overlap alignment between %s and %s failed." , a .unique_name , b .unique_name )
@@ -186,7 +207,7 @@ def try_combine_contigs(finder: OverlapFinder,
186
207
right_overlap_chunk = '' .join (x for x in aligned_right [max_concordance_index :] if x != '-' )
187
208
188
209
result_seq = left_remainder + left_overlap_chunk + right_overlap_chunk + right_remainder
189
- result_contig = Contig (None , result_seq )
210
+ result_contig = ContigWithAligner (None , result_seq )
190
211
191
212
logger .debug ("Joined %s and %s together in a contig %s with lengh %s." ,
192
213
a .unique_name , b .unique_name ,
@@ -198,7 +219,7 @@ def try_combine_contigs(finder: OverlapFinder,
198
219
def extend_by_1 (finder : OverlapFinder ,
199
220
max_acceptable_prob : Fraction ,
200
221
path : ContigsPath ,
201
- candidate : Contig ,
222
+ candidate : ContigWithAligner ,
202
223
) -> Iterator [ContigsPath ]:
203
224
if path .has_contig (candidate ):
204
225
return
@@ -217,7 +238,7 @@ def extend_by_1(finder: OverlapFinder,
217
238
218
239
def calc_extension (finder : OverlapFinder ,
219
240
max_acceptable_prob : Fraction ,
220
- contigs : Sequence [Contig ],
241
+ contigs : Sequence [ContigWithAligner ],
221
242
path : ContigsPath ,
222
243
) -> Iterator [ContigsPath ]:
223
244
@@ -228,7 +249,7 @@ def calc_extension(finder: OverlapFinder,
228
249
def calc_multiple_extensions (finder : OverlapFinder ,
229
250
max_acceptable_prob : Fraction ,
230
251
paths : Iterable [ContigsPath ],
231
- contigs : Sequence [Contig ],
252
+ contigs : Sequence [ContigWithAligner ],
232
253
) -> Iterator [ContigsPath ]:
233
254
for path in paths :
234
255
yield from calc_extension (finder , max_acceptable_prob , contigs , path )
@@ -249,7 +270,7 @@ def filter_extensions(existing: MutableMapping[str, ContigsPath],
249
270
yield from ret .values ()
250
271
251
272
252
- def calculate_all_paths (contigs : Sequence [Contig ]) -> Iterator [ContigsPath ]:
273
+ def calculate_all_paths (contigs : Sequence [ContigWithAligner ]) -> Iterator [ContigsPath ]:
253
274
max_acceptable_prob = ACCEPTABLE_STITCHING_PROB
254
275
existing : MutableMapping [str , ContigsPath ] = {}
255
276
finder = OverlapFinder .make ('ACTG' )
@@ -290,12 +311,12 @@ def calculate_all_paths(contigs: Sequence[Contig]) -> Iterator[ContigsPath]:
290
311
max_acceptable_prob = max (x .pessimisstic_probability for x in paths )
291
312
292
313
293
def find_most_probable_path(contigs: Sequence[ContigWithAligner]) -> ContigsPath:
    """Return the path with the highest ContigsPath.score for *contigs*.

    Candidate paths come from calculate_all_paths. max() raises ValueError
    if that iterator yields nothing.
    """
    return max(calculate_all_paths(contigs), key=ContigsPath.score)
296
317
297
318
298
- def stitch_consensus (contigs : Iterable [Contig ]) -> Iterable [ Contig ]:
319
+ def stitch_consensus (contigs : Iterable [ContigWithAligner ]) -> Iterator [ ContigWithAligner ]:
299
320
remaining = tuple (contigs )
300
321
while remaining :
301
322
most_probable = find_most_probable_path (remaining )
@@ -304,7 +325,7 @@ def stitch_consensus(contigs: Iterable[Contig]) -> Iterable[Contig]:
304
325
if not most_probable .has_contig (contig ))
305
326
306
327
307
- def write_contigs (output_fasta : TextIO , contigs : Iterable [Contig ]):
328
+ def write_contigs (output_fasta : TextIO , contigs : Iterable [ContigWithAligner ]):
308
329
records = (SeqRecord (Seq .Seq (contig .seq ),
309
330
description = '' ,
310
331
id = contig .unique_name ,
@@ -313,9 +334,9 @@ def write_contigs(output_fasta: TextIO, contigs: Iterable[Contig]):
313
334
SeqIO .write (records , output_fasta , "fasta" )
314
335
315
336
316
def read_contigs(input_fasta: TextIO) -> Iterable[ContigWithAligner]:
    """Lazily yield one ContigWithAligner per FASTA record in *input_fasta*.

    Each contig takes its name and sequence from the parsed record.
    """
    records = SeqIO.parse(input_fasta, "fasta")
    yield from (ContigWithAligner(name=entry.name, seq=entry.seq)
                for entry in records)
319
340
320
341
321
342
def referenceless_contig_stitcher (input_fasta : TextIO ,
0 commit comments