Source code for cagecleaner.file_utils

import re
import logging
import subprocess
import gzip
import os
from pathlib import Path
from tqdm.contrib.concurrent import thread_map
from tqdm.contrib.logging import logging_redirect_tqdm
from Bio import SeqIO
from subprocess import CalledProcessError


LOG = logging.getLogger(__name__)


[docs] def remove_suffixes(string: str | Path) -> str: """ Split off any valid sequence file suffix either in the form of .<suffix> or .<suffix>.gz. Args: string (str | Path): string from which the suffix should be split off, if any. Returns: str: the string without the sequence file suffix """ pattern = r'\.(fasta|fna|fa|gb|gbk|gbff)(\.gz)?$' # valid extensions for fasta and genbank files, potentially gzipped return re.sub(pattern, '', str(string))
[docs] def is_fasta(file: str | Path) -> bool: """ Check whether the file ends in any of the accepted fasta suffices. Gzipped allowed. Args: file (str | Path): filename to check Returns: bool: Boolean result of the check """ pattern = r'\.(fasta|fna|fa)(\.gz)?$' if re.search(pattern, str(file)) is None: return False else: return True
[docs] def is_genbank(file: str | Path) -> bool: """ Check whether the file ends in any of the accepted genbank suffices. Gzipped allowed. Args: file (str | Path): filename to check Returns: bool: Boolean result of the check """ pattern = r'\.(gbff|gbk|gb)(\.gz)?$' if re.search(pattern, str(file)) is None: return False else: return True
def _extract_one_region(row: dict, margin: int, in_dir: Path, out_dir: Path, strict: bool) -> bool: """ Extract a genomic region from a fasta file. A genomic region is extracted from a fasta file parsed using BioPython's SeqIO. The original cluster genomic coordinates are used in the filename and the sequence ID of the extracted region to make them appear in MMseqs2 dereplication table downstream in the pipeline. Extracted region's sequence files are gzipped. Regions that extend beyond contig boundaries are treated as specified by the user (strict_regions flag). When strict_regions is enabled, regions at contig edges are skipped.When disabled (permissive mode), such regions are retained but clipped to the contig boundaries. Args: row (dict): dictionary form of a row from the binary table margin (int): size of the sequence margin to add to both sides of the cluster before extraction in_dir (Path): path of the input directory containing the genome assembly files out_dir (Path): path of the output directory to which the region files will be extracted. strict (bool): flag for contig edge behaviour (strict discarding or permissive clipping) Returns: bool: Whether the extracted region was at a contig edge Raises: FileNotFoundError: If the input file cannot be found. FileNotFoundError: If the output file cannot be opened. """ contig_end = False assembly_file = Path(row['assembly_file']) scaffold = row['Scaffold'] begin_cluster = row['Start'] begin = begin_cluster - margin end_cluster = row['End'] end = end_cluster + margin in_file = in_dir / assembly_file try: with gzip.open(in_file, "rt") as handle: # Parse sequences seqs = SeqIO.to_dict(SeqIO.parse(handle, 'fasta')) except FileNotFoundError as err: LOG.error(err) raise err # Select the contig to extract the region from scaffold_to_extract_from = seqs[scaffold] # Check for contig edges length = len(scaffold_to_extract_from) if end >= length or begin < 0: contig_end = True # If strict, skip regions that are at a contig edge if strict: return contig_end end = min(end, length) begin = max(0, begin) # Extract genomic region from assembly region = scaffold_to_extract_from[begin:end] # Write in a new compressed fasta file, using the original cluster coordinates as sequence ID and filename region.id = '|'.join([scaffold_to_extract_from.id, str(begin_cluster), str(end_cluster)]) out_file = str(Path(out_dir / region.id)) + '.fasta.gz' try: with gzip.open(out_file, "wt") as out_handle: SeqIO.write(region, out_handle, "fasta") except FileNotFoundError as err: LOG.error(err) raise err return contig_end def _convert_one_genbank_to_fasta(input_output_paths: tuple[Path]) -> None: """ Convert a genbank file to a fasta file. Converts a genbank file to a fasta file using any2fasta. Gzips the new file. Args: input_output_paths (tuple[Path]): Tuple of the paths of the input genbank and the output fasta file Returns: None Raises: FileNotFoundError: If the output path cannot be opened. CalledProcessError: If the any2fasta command run failed. """ in_file, out_file = input_output_paths # Open the output file and redirect the output of any2fasta to it. try: with open(out_file, "w") as handle: try: # use -q for quiet mode, text=True because output is not in byte form. subprocess.run(['any2fasta', '-q', '-g', str(in_file)], stdout=handle, check=True, text=True) except CalledProcessError as err: LOG.error(err) raise err except FileNotFoundError as err: LOG.error(err) raise err with open(out_file, 'r') as handle: with gzip.open(out_file.with_suffix('.fasta.gz'), "wt") as compressed_handle: compressed_handle.writelines(handle) os.remove(out_file) return None
[docs] def convert_genbanks_to_fastas(in_dir: Path, out_dir: Path, workers: int = 1, no_progress: bool = False) -> None: """ Convert all genbank files in an input directory to fasta files. All genbank files in the input directory are converted to fasta files in parallel using any2fasta. Args: in_dir (Path): input directory containing the genbank files out_dir (Path): output directory where the new fasta files will be written workers (int): number of threads for parallellisation no_progress (bool): flag to disable showing a progress bar while converting Returns: None Raises: RuntimeError: If the input directory is empty. """ try: next(in_dir.iterdir()) except StopIteration: msg = "Input directory is empty!" LOG.error(msg) raise RuntimeError(msg) input_output_paths = [(i, out_dir / i.with_suffix('.fasta').name) for i in in_dir.iterdir()] LOG.info(f'Converting {len(input_output_paths)} Genbank genomes to Fasta format.') with logging_redirect_tqdm(loggers = [LOG]): thread_map(_convert_one_genbank_to_fasta, input_output_paths, max_workers = workers, leave = False, disable = no_progress) return None