-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_hgvs_annotation_tsv.py
219 lines (178 loc) · 8.92 KB
/
generate_hgvs_annotation_tsv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from datetime import date
from os.path import exists
from sys import path
from pprint import pprint
import argparse
import os
import sys
import read_resource_jsons as read_resources
def main():
# Parse the command line
args = parse_command_line()
# Define the executable bcftools command
if args.tools_directory:
if not args.tools_directory.endswith('/'): args.tools_directory = str(args.tools_directory) + '/'
bcftools = args.tools_directory + 'bcftools/bcftools'
else:
bcftools = 'bcftools'
# Read the mosaicJson file to get information on how to process different annotations
mosaic_info = read_resources.read_mosaic_json(args.mosaic_json, args.reference)
# Open an output tsv file to write annotations to
outputFile = open('hgvs.tsv', 'w')
# Loop over the annotations that are to be uploaded to Mosaic for this resource and get the annotation name, uid and
# type
annotations = {}
uids = []
for annotation in mosaic_info['resources']['HGVS']['annotations']:
uid = mosaic_info['resources']['HGVS']['annotations'][annotation]['uid']
tagType = mosaic_info['resources']['HGVS']['annotations'][annotation]['type']
annotations[annotation] = {'uid': uid, 'type': tagType}
# Write the header line to the tsv file
print('CHROM\tSTART\tEND\tREF\tALT\t', '\t'.join(str(x['uid']) for x in annotations.values()), sep = '', file = outputFile)
# Check that the delimiter is provided to determine how to split up the compound annotation. If it is not set, then provide a
# warning and do not preceed with this annotation
try:
delimiter = mosaic_info['resources']['HGVS']['delimiter']
except:
fail('The delimiter field is not provided for resource ' + str(resource) + ' and so its annotation cannot be processed.')
# Get all the MANE transcript ids
mane_file = '/scratch/ucgd/lustre-labs/marth/scratch/calypso/data/GRCh38/reference/MANE.GRCh38.v1.3.ensembl.transcript_ids.txt'
mane = open(mane_file, 'r')
maneIds = []
for record in mane.readlines():
maneIds.append(record.rstrip())
mane.close()
# Loop over all records in the vcf file
command = bcftools + ' query -f \'%CHROM\\t%POS\\t%END\\t%REF\\t%ALT\\t%INFO/' + '\\t%INFO/'.join([mosaic_info['resources']['HGVS']['info_field']]) + '\\n\' ' + str(args.input_vcf)
for record in os.popen(command).readlines():
# Split the record on tabs
fields = record.rstrip().split('\t')
# If all values are '.', this line can be ignored
uniqueValues = set(fields[5:])
if len(uniqueValues) == 1 and list(uniqueValues)[0] == '.':
continue
# Update the chromosome and position
fields[0], fields[2] = updateCoords(fields[0], fields[2])
# If there are multiple annotations, e.g. there could be HGVS codes for multiple transcripts, break the
# annotations up
hgvsFields = fields.pop()
combinedCodes = hgvsFields.split(',') if ',' in hgvsFields else [hgvsFields]
positions = []
for annotation in mosaic_info['resources']['HGVS']['annotations']:
position = mosaic_info['resources']['HGVS']['annotations'][annotation]['position']
if position:
positions.append(position)
# Loop over the HGVS annotations
options = {}
optionalCodes = {}
for i, code in enumerate(combinedCodes):
options[i] = False
hgvsCodes = code.split(delimiter)
optionalCodes[i] = {}
for position in positions:
fullCode = hgvsCodes[int(position) - 1]
hgvsCode = fullCode.split(':')
if not fullCode: continue
if '%' in hgvsCode[1]: hgvsCode[1] = hgvsCode[1].replace('%3D', '=')
if hgvsCode[0] in maneIds: options[i] = True
optionalCodes[i][position] = hgvsCode
if len(optionalCodes[i]) != 2 and options[i]: options[i] = 'mane'
elif len(optionalCodes[i]) == 2 and not options[i]: options[i] = 'nonmane'
elif len(optionalCodes[i]) == 0: del optionalCodes[i]
# If no options were found (e.g. the HGVS codes were empty), skip this field
if len(optionalCodes) == 0: continue
# Loop over the available HGVS codes and choose the best option
topChoices = []
for i in options:
if options[i] == True: topChoices.append(i)
# If there was a single top choice, use this:
if len(topChoices) == 1:
fields = updateFields(fields, optionalCodes[topChoices[0]][positions[0]][1])
fields = updateFields(fields, optionalCodes[topChoices[0]][positions[1]][1])
# If there are multiple top choices, include all of them. These will likely come from different, overlapping genes
elif len(topChoices) > 1:
code = ''
for i in topChoices: code += str(optionalCodes[i][positions[0]][1]) + ','
code.rstrip(',')
fields = updateFields(fields, code)
code = ''
for i in topChoices: code += str(optionalCodes[i][positions[1]][1]) + ','
code.rstrip(',')
fields = updateFields(fields, code)
# If there are no top choices, look for MANE transcripts, but records that don't have both a c. and a p.
else:
maneChoices = []
for i in options:
if options[i] == 'mane': maneChoices.append(i)
# If there was a single mane, use this, otherwise pick from the options if there are
# multiple. If there are no manes, move to non-mane transcripts that have both a c. and p.
if len(maneChoices) == 1:
if positions[0] in optionalCodes[maneChoices[0]]: fields = updateFields(fields, optionalCodes[maneChoices[0]][positions[0]][1])
else: fields.append('')
if positions[1] in optionalCodes[maneChoices[0]]: fields = updateFields(fields, optionalCodes[maneChoices[0]][positions[1]][1])
else: fields.append('')
elif len(maneChoices) > 1:
i = list(optionalCodes.keys())[0]
if positions[0] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[0]][1])
else: fields.append('')
if positions[1] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[1]][1])
else: fields.append('')
else:
nonManeChoices = []
for i in options:
if options[i] == 'nonmane': nonManeChoices.append(i)
# Look through the options as before
if len(nonManeChoices) == 1:
if positions[0] in optionalCodes[nonManeChoices[0]]: fields = updateFields(fields, optionalCodes[nonManeChoices[0]][positions[0]][1])
else: fields.append('')
if positions[1] in optionalCodes[nonManeChoices[0]]: fields = updateFields(fields, optionalCodes[nonManeChoices[0]][positions[1]][1])
else: fields.append('')
elif len(nonManeChoices) > 1:
i = list(optionalCodes.keys())[0]
if positions[0] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[0]][1])
else: fields.append('')
if positions[1] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[1]][1])
else: fields.append('')
else:
# If there is a single non mane, use it, otherwise just take the first value
if len(optionalCodes) >= 1:
i = list(optionalCodes.keys())[0]
if positions[0] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[0]][1])
else: fields.append('')
if positions[1] in optionalCodes[i]: fields = updateFields(fields, optionalCodes[i][positions[1]][1])
else: fields.append('')
print('\t'.join(fields), file = outputFile)
# Close the output tsv file
outputFile.close()
# Input options
def parse_command_line():
global version
parser = argparse.ArgumentParser(description='Process the command line')
# Required arguments
parser.add_argument('--input_vcf', '-i', required = True, metavar = 'string', help = 'The input vcf file to annotate')
parser.add_argument('--reference', '-r', required = True, metavar = 'string', help = 'The genome reference file used')
parser.add_argument('--tools_directory', '-s', required = True, metavar = 'string', help = 'The path to the directory where the tools live')
parser.add_argument('--mosaic_json', '-m', required = True, metavar = 'string', help = 'The json file describing the Mosaic parameters')
return parser.parse_args()
# Update the chromosome and position in the tsv file
def updateCoords(chrom, pos):
# Check that the chromosome does not include "chr" prefix
if chrom.startswith('chr'): chrom = chrom.strip('chr')
# Add one to the end position
pos = str(int(pos) + 1)
# Return the updated values
return chrom, pos
# Add the value to the fields list ensuring that it is a valid value
def updateFields(fields, value):
# Ensuret he value is under the 255 character limit
if len(value) > 254: value = 'HGVS code too long'
# Append the value and return
fields.append(value)
return fields
# If the script fails, provide an error message and exit
def fail(message):
print(message, sep = "")
exit(1)
# Initialise global variables
if __name__ == "__main__":
main()