
Implement sample merging #1026


Draft: wants to merge 11 commits into base: master
18 changes: 18 additions & 0 deletions Singularity
@@ -200,3 +200,21 @@ From: centos:7
%apphelp denovo
Standard pipeline with de novo assembly instead of mapping to reference
sequences.

%apphelp merge_fastqs
Combine and filter the FASTQ files from two samples into a single output file.

%applabels merge_fastqs
KIVE_INPUTS fastq1_a fastq2_a fastq1_b fastq2_b \
bad_cycles_a bad_cycles_b
KIVE_OUTPUTS fastq1_result fastq2_result
KIVE_THREADS 1
KIVE_MEMORY 200

%apprun merge_fastqs
PYTHONPATH=/opt/micall \
python -m micall.core.merge_fastqs \
"$1" "$2" "$3" "$4" \
--bad_cycles_a_csv "$5" \
--bad_cycles_b_csv "$6" \
"$7" "$8"
7 changes: 6 additions & 1 deletion docs/_data/navigation.yml
@@ -15,5 +15,10 @@
link: /design/resistance.html
- name: Genome Coverage
link: /genome_coverage.html
- name: Scripts and tools
link: /scripts.html
dropdown:
- name: Merge fastqs
link: /scripts/merge_fastqs.html
- name: GitHub
link: https://github.com/cfe-lab/MiCall
110 changes: 110 additions & 0 deletions docs/scripts/merge_fastqs.md
@@ -0,0 +1,110 @@

This script merges multiple FASTQ files into combined output files while trimming the reads based on provided quality information. Trimming removes reads with low quality scores, which could otherwise skew the analysis outcomes.

## Usage

To use the script, users must provide two mandatory CSV files: a trim plan and a merge plan. Optionally, users can also provide a CSV file listing compressed files or files that require compression. Detailed descriptions for each input file and its structure are provided below.

### Input Files

1. **Trim Plan (trimplan.csv):**
- This file lists the FASTQ files that require trimming before merging.
- It has the following column headers:
- r1: FASTQ file for read 1
- r2: FASTQ file for read 2
- ...
- rn: FASTQ file for read n
- bad_cycles (optional): A CSV file that lists bad cycles with high error rates to be omitted during the trimming process.
- Example:
```csv
r1,r2,bad_cycles
sampleA_R1.fastq,sampleA_R2.fastq,bad_cycles_sampleA.csv
sampleB_R1.fastq,sampleB_R2.fastq,
```

2. **Merge Plan (mergeplan.csv):**
- This file maps input FASTQ files to their desired output files after merging.
- It consists of the following columns:
- input: Path to the input FASTQ file to be merged.
- output: Path to the output FASTQ file after merging.
- Example:
```csv
input,output
sampleA_R1.fastq,merged_r1.fastq.gz
sampleB_R1.fastq,merged_r1.fastq.gz
sampleA_R2.fastq,merged_r2.fastq
sampleB_R2.fastq,merged_r2.fastq
```

3. **Zip File (zipfile.csv, optional):**
- Lists the input FASTQ files that are gzipped and must be uncompressed before processing, as well as the output files that should be compressed after merging.
- It has a single column:
- file: Path to the compressed file.
- Example:
```csv
file
sampleA_R1.fastq.gz
sampleB_R2.fastq.gz
merged_r1.fastq.gz
```
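- Any file not listed here is treated as uncompressed: such inputs are read as-is, and such outputs are written without compression.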

### Command-Line Arguments

The script accepts the following command-line arguments:

- **trimplan:** A CSV file containing the lists of files to be trimmed.
- **mergeplan:** A CSV file containing the merge plan.
- **--zipfile (optional):** A CSV file containing a list of files that are compressed or need to be compressed.

```bash
python "micall/core/merge_fastqs.py" "trimplan.csv" "mergeplan.csv" --zipfile "zipfile.csv"
```
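
Because the module has a standard entry point, the script can equivalently be run as `python -m micall.core.merge_fastqs`, which is how the Singularity app definition invokes it.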

### Input FASTQ files

When merging FASTQ files from samples `A` and `B`, the input files might look like this:

- **sampleA_R1.fastq:**
```
@SEQ_ID_A1
GGTAAC...
+
!!!BBB...
```

- **sampleB_R1.fastq:**
```
@SEQ_ID_B1
CCTTGG...
+
!!!BBB...
```

After running the script, `merged_r1.fastq.gz` would (once decompressed) contain:

```
@SEQ_ID_A1
GGTAAC...
+
!!!BBB...
@SEQ_ID_B1
CCTTGG...
+
!!!BBB...
```

The resulting file may also have some bases trimmed, i.e. removed from the read sequences, based on the quality information provided.

## Workflow Descriptions

The script performs the merge in a series of steps (a programmatic sketch follows the list):

1. **File Parsing:** It starts by parsing the input CSV files, which detail which FASTQ files need to be merged or trimmed and which files are compressed or must be compressed after merging.

2. **Quality Trimming:** Reads and cycles flagged as poor quality are trimmed from the FASTQ files to keep them from skewing subsequent analysis steps.

3. **File Concatenation:** The script executes a concatenating routine aligning with the merge plan, producing a single, cohesive FASTQ file from multiple input sequences.

4. **Compression Handling:** When instructed by the zip file list, the script decompresses input files before processing and compresses output files after merging, to meet file format requirements and manage disk space.

5. **Finalization:** The cleanup step happens last, removing any temporary files generated throughout the process.
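
For callers that already have the plans in memory, the same merge can be driven programmatically through the `merge_fastqs` function. The following is a minimal sketch using the example file names from the sections above; all paths are illustrative:

```python
from micall.core.merge_fastqs import merge_fastqs

# Groups of FASTQ files that are trimmed together (one tuple per sample).
trimplan = {("sampleA_R1.fastq", "sampleA_R2.fastq"),
            ("sampleB_R1.fastq", "sampleB_R2.fastq")}

# Each output file maps to the list of inputs concatenated into it.
mergeplan = {"merged_r1.fastq.gz": ["sampleA_R1.fastq", "sampleB_R1.fastq"],
             "merged_r2.fastq": ["sampleA_R2.fastq", "sampleB_R2.fastq"]}

# Outputs (or inputs) that are gzip-compressed.
zipped = {"merged_r1.fastq.gz"}

# Bad-cycles CSVs, keyed by the input file they apply to.
bad_cycles = {"sampleA_R1.fastq": "bad_cycles_sampleA.csv",
              "sampleA_R2.fastq": "bad_cycles_sampleA.csv"}

merge_fastqs(trimplan, mergeplan, zipped, bad_cycles)
```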
177 changes: 177 additions & 0 deletions micall/core/merge_fastqs.py
@@ -0,0 +1,177 @@
import argparse
import logging
import shutil
import sys
import os
import tempfile
import gzip
import csv
from typing import TextIO, List, Optional, Dict, IO, Set, Tuple

from micall.core.trim_fastqs import TrimSteps, trim


logging.basicConfig(level=logging.INFO,
format='%(asctime)s[%(levelname)s]%(name)s.%(funcName)s(): %(message)s')
logger = logging.getLogger(__name__)


class AmbiguousBadCycles(ValueError): pass
class BadTrimplanFieldNames(ValueError): pass


def concatenate_files(input_files: List[str], output_file: str) -> None:
with open(output_file, 'wb') as dst:
for input_file in input_files:
with open(input_file, 'rb') as src:
shutil.copyfileobj(src, dst)


def parse_inputs_and_merge_fastqs(trim_file: TextIO, mergeplan_file: TextIO, zip_file: Optional[TextIO]) -> None:
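    """Parse the trim plan, merge plan, and optional zip list, then run the merge."""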
mergeplan: Dict[str, List[str]] = {}
trimplan: Set[Tuple[str, ...]] = set()
zipped: Set[str] = set()
bad_cycles: Dict[str, str] = {}

mergeplan_reader = csv.DictReader(mergeplan_file)
trim_reader = csv.DictReader(trim_file)
zip_reader = csv.DictReader(zip_file) if zip_file is not None else None

for row in mergeplan_reader:
input_path = row["input"]
output_path = row["output"]

if output_path not in mergeplan:
mergeplan[output_path] = []
mergeplan[output_path].append(input_path)

no_badcycles_fields = list(sorted(
(field for field in trim_reader.fieldnames or [] if field != "bad_cycles"),
key=lambda field: field.lower()))
expected_no_badcycles_fields = [f"r{i + 1}" for i in range(len(no_badcycles_fields))]

if [field.lower() for field in no_badcycles_fields] \
!= expected_no_badcycles_fields:
raise BadTrimplanFieldNames(f"Bad field names: {no_badcycles_fields}, expected {expected_no_badcycles_fields}")

for row in trim_reader:
input_paths = tuple(row[field] for field in no_badcycles_fields)
trimplan.add(input_paths)
bad_cycles_path = row.get("bad_cycles", "")
if bad_cycles_path:
for input_path in input_paths:
bad_cycles[input_path] = bad_cycles_path

if zip_reader is not None:
for row in zip_reader:
zipped.add(row["file"])

    merge_fastqs(trimplan, mergeplan, zipped, bad_cycles)


def compress_file(input_path: str, output_path: str) -> None:
with open(input_path, 'rb') as input_file, \
open(output_path, 'wb') as output_file:
with gzip.GzipFile(fileobj=output_file, mode='wb') as gzip_file:
shutil.copyfileobj(input_file, gzip_file)


def uncompress_file(input_path: str, output_path: str) -> None:
with open(input_path, 'rb') as compressed_file, \
open(output_path, 'w+b') as ret:
with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
shutil.copyfileobj(gzip_file, ret)


def get_transitive(graph: Dict[str, str], key: str) -> str:
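    # Follow the chain of renames (original -> uncompressed -> trimmed -> ...)
    # until reaching a path that is not mapped any further.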
if key in graph:
return get_transitive(graph, graph[key])
else:
return key


def merge_fastqs(trimplan: Set[Tuple[str, ...]],
mergeplan: Dict[str, List[str]],
zipped: Set[str] = set(),
bad_cycles: Dict[str, str] = {},
) -> None:
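    """Trim and concatenate FASTQ files, decompressing and compressing as needed.

    :param trimplan: groups of FASTQ paths that are trimmed together.
    :param mergeplan: maps each output path to the input paths concatenated into it.
    :param zipped: paths of inputs and outputs that are gzip-compressed.
    :param bad_cycles: maps an input path to its bad-cycles CSV, if any.
    """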

inputs = [value for values in mergeplan.values() for value in values]
outputs = list(mergeplan.keys())
name_mapping: Dict[str, str] = {}
temporary: List[IO] = []

for output_path in outputs:
os.makedirs(os.path.dirname(output_path), exist_ok=True)

for input_path in inputs:
if input_path in zipped:
logger.info('Uncompressing %s.', input_path)
uncompressed = tempfile.NamedTemporaryFile(mode="w+b")
uncompressed_path = uncompressed.name
uncompress_file(input_path, uncompressed_path)
temporary.append(uncompressed)
name_mapping[input_path] = uncompressed_path

for to_trim in trimplan:
assert len(to_trim) > 0

trim_outputs: List[str] = []
trim_inputs: List[str] = []

all_bad_cycles_paths = set(bad_cycles[path] for path in to_trim if path in bad_cycles)
if len(all_bad_cycles_paths) == 0:
bad_cycles_path = None
elif len(all_bad_cycles_paths) == 1:
bad_cycles_path = next(iter(all_bad_cycles_paths))
else:
raise AmbiguousBadCycles(f"Ambiguous bad_cycles for {to_trim}: {all_bad_cycles_paths}.")

for path in to_trim:
path = get_transitive(name_mapping, path)
tmp = tempfile.NamedTemporaryFile()
trim_inputs.append(path)
trim_outputs.append(tmp.name)
temporary.append(tmp)
name_mapping[path] = tmp.name

logger.info('Trimming samples %s.', ','.join(map(repr, to_trim)))
trim(trim_inputs, bad_cycles_path, trim_outputs, use_gzip=False)

for output_path in mergeplan:
input_files = mergeplan[output_path]
logger.info('Merging results %s to %s.', ','.join(map(repr, input_files)), output_path)
input_files = [get_transitive(name_mapping, path) for path in input_files]
tmp = tempfile.NamedTemporaryFile()
temporary.append(tmp)
name_mapping[output_path] = tmp.name
output_path = tmp.name
concatenate_files(input_files, output_path)

for output_path in mergeplan:
concatenated = name_mapping[output_path]
if output_path in zipped:
logger.info('Compressing output %s.', output_path)
compress_file(concatenated, output_path)
else:
            # Use shutil.move rather than os.rename: the temporary file may live
            # on a different file system than the output path.
            shutil.move(concatenated, output_path)

for toclose in temporary:
        try:
            toclose.close()
        except OSError:
            # The underlying file may already have been moved to its output path.
            pass

logger.info('Done.')


def main(argv: List[str]) -> int:
    parser = argparse.ArgumentParser(description="Combine and filter the FASTQ files from multiple samples into merged output files.")
parser.add_argument("trimplan", type=argparse.FileType('rt'), help="A CSV file containing the lists of files to be trimmed.")
parser.add_argument("mergeplan", type=argparse.FileType('rt'), help="A CSV file containing merge plan.")
parser.add_argument("--zipfile", type=argparse.FileType('rt'), help="A CSV file containing a list of files that are compressed or need to be compressed.")
args = parser.parse_args(argv)

parse_inputs_and_merge_fastqs(args.trimplan, args.mergeplan, args.zipfile)
return 0


if __name__ == '__main__': sys.exit(main(sys.argv[1:]))
12 changes: 7 additions & 5 deletions micall/core/trim_fastqs.py
@@ -16,6 +16,7 @@
from pathlib import Path

import typing
from typing import Optional

from Bio import SeqIO

@@ -57,12 +58,12 @@ def parse_args():


def trim(original_fastq_filenames: typing.Sequence[str],
bad_cycles_filename: str,
bad_cycles_filename: Optional[str],
trimmed_fastq_filenames: typing.Sequence[str],
use_gzip: bool = True,
summary_file: typing.TextIO = None,
skip: typing.Tuple[str] = (),
project_code: str = None):
summary_file: Optional[typing.TextIO] = None,
skip: typing.Tuple[str, ...] = (),
project_code: Optional[str] = None):
"""

:param original_fastq_filenames: sequence of two filenames, containing
@@ -88,8 +89,9 @@ def trim(original_fastq_filenames: typing.Sequence[str],

censored_filenames = [filename + '.censored.fastq'
for filename in trimmed_fastq_filenames]
if (TrimSteps.censor in skip) or not os.path.exists(bad_cycles_filename):
if (TrimSteps.censor in skip) or bad_cycles_filename is None or not os.path.exists(bad_cycles_filename):
bad_cycles = []
logger.debug(f"Assuming no bad cycles for {original_fastq_filenames}")
else:
with open(bad_cycles_filename, 'r') as bad_cycles:
bad_cycles = list(csv.DictReader(bad_cycles))