-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmergeTables.py
executable file
·223 lines (183 loc) · 8.47 KB
/
mergeTables.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/hpc/local/Rocky8/dhl_ec/software/tempMiniconda3envs/gwas/bin/python
### #!/usr/bin/python
#
# Merge two files into one.
#
# Description: merges two files based on some key-column into one file. The lines do not
# have to be sorted.
#
# Original written and published by:
# * Paul I.W. de Bakker, [email protected]
# * 4 July 2009
#
# Written by: Bram van Es; Utrecht, the Netherlands
# Edited by: Mike Puijk;
# Suggested by: Sander W. van der Laan; Utrecht, the Netherlands;
# Version: 2.0 beta 1
# Update date: 2023-04-28
#
# Usage: python3 mergeTables.py --in_file1 /file1.txt.gz --in_file2 /file2.txt.gz --indexID VariantID --out_file /joined.txt.gz [optional: --replace: add option to replace column contents, default: none; --verbose: add option to print verbose output (T/F), default: F]
# TO DO
# (optional: --replace) add option to replace column contents
#
# (bug): If one of the input files contains only a single column, the script will fail. This is due to the delimiter detection (it doesn't know there shouldn't be one).
# The fix to this bug would be to create 2 extra options, so that you can specify per file what delimiter to use (or to use none).
# This would also circumvent any other issues the delimiter detection might have.
# Alternatively, use "grep -f" for this use-case.
# Import libraries
import os
import sys
import subprocess
import polars as pl
import argparse
import magic
import gzip
import time
import csv
# Check for required libraries and install them if not present
# https://stackoverflow.com/questions/12332975/installing-python-module-within-code
def install(package):
    """Install ``package`` with pip for the interpreter running this script.

    Runs ``<sys.executable> -m pip install <package>`` as a subprocess.
    ``subprocess.check_call`` raises ``subprocess.CalledProcessError`` when
    the command exits with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
from argparse import RawTextHelpFormatter
# set starting time
start = time.time()
# detect file delimiter
def detect_delimiter(in_file, sample_size=10000):
    """Guess the column delimiter of an open text file.

    Reads up to ``sample_size`` characters from the current position of
    ``in_file`` and passes them to ``csv.Sniffer``.

    Args:
        in_file: File-like object opened in text mode; only ``read`` is called.
        sample_size: Maximum number of characters to sample (default 10000).

    Returns:
        The detected delimiter as a string.

    Raises:
        csv.Error: If the sniffer cannot determine a delimiter, e.g. for
            empty input.
    """
    sample = in_file.read(sample_size)

    # A sample that fills the whole window was most likely cut off in the
    # middle of a line. That partial row has fewer delimiters than the others
    # and can make the sniffer reject the real delimiter when the sample only
    # holds a handful of (very wide) rows, so keep complete lines only.
    if len(sample) == sample_size:
        last_newline = sample.rfind("\n")
        if last_newline != -1:
            sample = sample[:last_newline + 1]

    return csv.Sniffer().sniff(sample).delimiter
# detect file delimiter
# def detect_delimiter(file_path):
# with open(file_path, "rb") as f:
# # Read first 10KB of the file to determine the delimiter
# sample = f.read(10240)
# if b" " in sample:
# return " "
# else:
# return "\t"
# Parse arguments
# MIME types that this script treats as gzip-compressed input.
_GZIP_MIME_TYPES = ("application/gzip", "application/x-gzip")


def _report_elapsed(since):
    """Print the time elapsed since ``since`` (epoch seconds); return the current time."""
    now = time.time()
    print(f"Elapsed time: {time.strftime('%H:%M:%S', time.gmtime(now - since))}")
    return now


def _read_table(path, mime_type):
    """Read a delimited text table (plain or gzipped) into a polars DataFrame.

    The delimiter is sniffed from the start of the file; every column is read
    as a string (``infer_schema_length=0``).

    Returns:
        A ``(table, delimiter)`` tuple.
    """
    opener = gzip.open if mime_type in _GZIP_MIME_TYPES else open
    with opener(path, mode='rt') as handle:
        delimiter = detect_delimiter(handle)
    table = pl.read_csv(path, separator=delimiter, infer_schema_length=0)
    return table, delimiter


def _write_table(table, path):
    """Write ``table`` to ``path``; the format is chosen from the file name.

    ``*.parquet`` -> gzip-compressed parquet; ``*.gz`` -> gzip-compressed,
    space-delimited text; anything else -> plain space-delimited text.
    """
    if path.endswith(".parquet"):
        table.write_parquet(path, compression='gzip', statistics=True)
    elif path.endswith(".gz"):
        # write_csv does not compress by itself, so stream it through gzip;
        # otherwise a '.gz' file would contain uncompressed text.
        with gzip.open(path, mode='wb') as gz_handle:
            table.write_csv(gz_handle,
                            has_header=True, separator=' ', null_value='NA',
                            batch_size=1024)
    else:
        table.write_csv(path,
                        has_header=True, separator=' ', null_value='NA',
                        batch_size=1024)


# Parse arguments and run the merge
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='''
+ mergeTables 2.0 beta 1+
This script joins `in_file1` and `in_file2` based on the `indexID` column. The index column must be
the first column in both files. The `out_file` file will be compressed with gzip and written
in parquet-format when out_file ends with .parquet, otherwise a space-delimited .txt is written
(gzip-compressed when out_file ends with .gz).
The `replace` option adds the option to replace column contents. The `verbose` option adds the
option to print verbose output (T/F), default: F.
This is an example call:
python3 mergeTables.py --in_file1 /file1.txt.gz --in_file2 /file2.txt.gz --indexID VariantID --out_file /joined.txt.gz [optional: --replace VariantID; --verbose: T/F]
''',
                                     epilog='''
+ Copyright 1979-2023. Bram van Es & Sander W. van der Laan | [email protected] | https://vanderlaan.science +''',
                                     formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--in_file1', type=str, required=True)
    parser.add_argument('--in_file2', type=str, required=True)
    parser.add_argument('--indexID', type=str, required=True)
    # parser.add_argument('--replace', type=str, required=False)
    parser.add_argument('--out_file', type=str, required=True)
    parser.add_argument('--verbose', type=str, required=False)
    args = parser.parse_args()

    in_file1 = args.in_file1
    in_file2 = args.in_file2
    indexID = args.indexID
    # replace = args.replace
    out_file = args.out_file
    # Anything other than an explicit value falls back to "F" (not verbose).
    verbose = args.verbose if args.verbose else "F"
    is_verbose = verbose == "T"

    # Print some information
    print("\n+ mergeTables 2.0 beta 1 +")
    print("\n Starting merging the following files:")
    print(f" > [{in_file1}]")
    print(f" > [{in_file2}]")
    print(f"\n Index set to [{indexID}]")
    # print(f"\n Column to replace [{replace}]")
    print(f"\n Output will be written to [{out_file}]")
    if is_verbose:
        print(f"\n Verbose output is set to [{verbose}]. This will slow down the process, but prints times and file information.")
    else:
        print(f"\n Verbose output is set to [{verbose}] (default)")

    # In verbose mode each stage reports the time spent since the previous
    # stage; `start` is the module-level timestamp taken at start-up.
    checkpoint = start

    # Detect file type using magic
    # https://stackoverflow.com/questions/43580/how-do-i-check-the-file-type-of-a-file-in-python
    mime = magic.Magic(mime=True)
    mime_type_file1 = mime.from_file(in_file1)
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)
    mime_type_file2 = mime.from_file(in_file2)
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)
        print("\n Detected files types:")
        print(f" > File1 gzipped? {mime_type_file1}")
        print(f" > File2 gzipped? {mime_type_file2}")

    # Detect the delimiters and read both files using polars
    file1, file1_delimiter = _read_table(in_file1, mime_type_file1)
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)
    file2, file2_delimiter = _read_table(in_file2, mime_type_file2)
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)
        print("\n Detected the following delimiters:")
        print(f" > File1: [{file1_delimiter}].")
        print(f" > File2: [{file2_delimiter}].")
        print("\n Detected the following heads and tails:")
        print(" > File1:")
        print(file1.head(3))
        print(file1.tail(3))
        print(" > File2:")
        print(file2.head(3))
        print(file2.tail(3))

    # Fail early with a readable message when the index column is missing,
    # e.g. because of a misspelled --indexID or a mis-detected delimiter.
    for label, path, table in (("in_file1", in_file1, file1),
                               ("in_file2", in_file2, file2)):
        if indexID not in table.columns:
            sys.exit(f"\n ERROR: index column [{indexID}] was not found in {label} [{path}]. "
                     f"Columns found: {table.columns}")

    # Merge: keep only the rows whose index value occurs in both files
    new_df = file1.join(file2, on=indexID, how='inner')
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)
        print("\n Detected the following heads and tails in the merge file:")
        print(new_df.head(3))
        print(new_df.tail(3))

    # Write output file
    print("\n Writing output file...")
    _write_table(new_df, out_file)
    if is_verbose:
        checkpoint = _report_elapsed(checkpoint)

    end = time.time()
    print(f"Total elapsed time: {time.strftime('%H:%M:%S', time.gmtime(end - start))}")

    # Done
    print("\n Wow, all done. Let's have a beer buddy!\n")
    print("+ Copyright 1979-2023. Bram van Es & Sander W. van der Laan | [email protected] | https://vanderlaan.science +")