-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiFastaContigAvgJudgement.py
220 lines (163 loc) · 6.73 KB
/
multiFastaContigAvgJudgement.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#!/usr/bin/python
import sys
import os
import os.path
import argparse
import re
import string
import logging
import warnings
## simpleFastaStats.py takes a single fasta-formatted DNA/RNA text file and
## outputs contig count, average contig length, N50 contig lengths, maximum contig length, and cumulative contig length
## Function: A closure for file extension checking
def ext_check(expected_ext, openner):
def extension(filename):
if not filename.lower().endswith(expected_ext):
raise ValueError()
return openner(filename)
return extension
## Function: Filename extractor from filepath
def getIsolateID(filePathString):
splitStr = re.split(pattern='/', string=filePathString)
fileNameIdx = len(splitStr) - 1
isolateString = re.split(pattern='\.', string=splitStr[fileNameIdx])
if(len(isolateString[0]) < 10):
isolateString = re.split(pattern='\.', string=splitStr[0])
return isolateString[0]
## Function: Checks existence of --outDir
def readable_dir(prospective_dir):
if not os.path.isdir(prospective_dir):
raise argparse.ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir))
if os.access(prospective_dir, os.R_OK):
if( not prospective_dir.endswith("/") ):
prospective_dir = prospective_dir + "/"
return prospective_dir
else:
raise argparse.ArgumentTypeError("readable_dir:{0} is not a readable dir".format(prospective_dir))
parser = argparse.ArgumentParser(description='compare average contig and contig counts among multiple .fasta, move lower quality assemblies to Hel', usage="multiFastaContigAvgJudgement.py filepath/input.assembly*.fasta --minLength 500(default) --format [brief(default)|verbose|tsv|csv]")
parser.add_argument("filename",type=ext_check('.fasta', argparse.FileType('r')), nargs='+')
## output folder
parser.add_argument('--outDir', '-D', type=readable_dir, required=True, action='store')
## minimum contig length
parser.add_argument("--minLength", '-min', default='500', type=int)
parser.add_argument("--format", default='brief', type = lambda s : s.lower(), choices=['tsv', 'csv', 'brief', 'verbose', 'c', 's', 'b', 'v'])
## arrays of dict type variables
#GenomeDrafts = []
#GenomeContigs = []
inFileName = []
draftContigs = []
draftGenome = {}
contigLengths = {}
idCount = 0
contigID = ""
contigStr = ""
contigCount = []
maxContig = []
contigN50 = []
drLength = 0
draftLength = []
avgContig = []
args = parser.parse_args()
helHeim = args.outDir
intMinLen = args.minLength
idxFile = 0
##### Begin logging #####
logger = logging.getLogger("multiFastaContigAvgJudgement.py")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
if(len(args.filename) < 2):
print("Input Error: Two or more .fasta files required!")
sys.exit(1)
##### Begin multiple input file loop #####
for filehandle in args.filename:
inFileName.append(getIsolateID(filehandle.name))
## First inner loop to read input file lines
for line in filehandle:
if(re.search(r'^>', line)):
if(idCount > 0):
draftGenome[contigID] = contigStr
contigID = ""
contigStr = ""
contigID = line.strip()
if(re.search(r'\(paired\)', contigID)):
contigID = contigID.replace('_(paired)', '')
if(re.search('R1_001_', contigID)):
contigID = contigID.replace('R1_001_', '')
draftContigs.append(contigID)
idCount = idCount + 1
#print(contigID)
elif(re.search(r'^(A|T|G|C|U|N)+', line)):
contigStr = contigStr + line.strip()
draftGenome[contigID] = contigStr
### End first inner loop
## Close input file
filehandle.close()
### Second inner loop to populate dict of contig lengths
for contigKey in draftGenome:
if( len(draftGenome[contigKey]) > (intMinLen - 1) ):
contigLengths[contigKey] = len(draftGenome[contigKey])
##print(contigKey + " => " + str(contigLengths[contigKey]))
### End second innner loop
count = 0
### Third inner loop to find longest contig and contig count given length > intMinLen
for contigID in sorted(contigLengths, key=contigLengths.__getitem__, reverse=True):
if( contigLengths[contigID] > (intMinLen - 1) ):
if(count == 0):
maxContig.append(contigLengths[contigID])
top = 1
count = count + 1
drLength = drLength + contigLengths[contigID]
draftLength.append(drLength)
### End third inner loop
contigCount.append(count)
avgContig.append(draftLength[idxFile]/contigCount[idxFile])
### to compute N50, find the contig that 'resides' at 1/2 of draftLength
drLength = 0
cumulativeLength = 0;
### Fourth inner loop to calculate N50
for contigID in sorted(contigLengths, key=contigLengths.__getitem__, reverse=True):
if( contigLengths[contigID] > (intMinLen - 1) ):
cumulativeLength = cumulativeLength + contigLengths[contigID]
if(cumulativeLength > (draftLength[idxFile]/2)):
contigN50.append(contigLengths[contigID])
break
### End fourth inner loop
draftContigs = []
draftGenome = {}
contigLengths = {}
idCount = 0
contigID = ""
contigStr = ""
idxFile = idxFile + 1
##### End of multiple input file loop #####
idx = 0
for idx in range(len(inFileName)):
if ( args.format == 'verbose' or args.format == 'v' ):
print("Assembly File\tMinimum Contig Length:\tcontigCount\tavgContig\tN50\tmaxContig\tdraftLength")
print("{}\t".format(inFileName[idx]), ">", intMinLen - 1 ,"bp:\t", contigCount[idx], "\t", "%.0f" % avgContig[idx], "\t", contigN50[idx], "\t", maxContig[idx], "\t", draftLength[idx])
elif( args.format == 'brief' or args.format == 'b' ):
print("Assembly\tcontigCount\tavgContig\tN50\tmaxContig")
print(inFileName[idx] + "\t" + str(contigCount[idx]) + "\t" + str("%.0f" % avgContig[idx]) + "\t" + str(contigN50[idx]) + "\t" + str(maxContig[idx]))
elif ( args.format == 'tsv' or args.format == 't'):
print(str(contigCount[idx]) + "\t" + str("%.0f" % avgContig[idx]) + "\t" + str(contigN50[idx]) + "\t" + str(maxContig[idx]))
elif ( args.format == 'csv' or args.format == 'c' ):
print(inFileName[idx] + "," + str(contigCount[idx]) + "," + str("%.0f" % avgContig[idx]) + "," + str(contigN50[idx]) + "," + str(maxContig[idx]))
idx = idx + 1
### Judgement Day ###
### All assembly files are sent to Hel except for theWorthy ###
## index of the file with optimal assembly metrics
theWorthy = 0
for helCount in range( 1, len(inFileName) ):
if(avgContig[helCount] > avgContig[theWorthy]):
theWorthy = helCount
##print(theWorthy, " ", avgContig[helCount], " ", avgContig[theWorthy])
elif(contigCount[helCount] < contigCount[theWorthy]):
theWorthy = helCount
idx = 0
for idx in range( len(inFileName) ):
if(idx != theWorthy):
os.system("mv -v {} {}".format(args.filename[idx].name, helHeim))