#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from cagecleaner.local_run import LocalRun
from cagecleaner.region_run import RegionRun
from cagecleaner.file_utils import _extract_one_region
import logging
import shutil
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm.contrib.concurrent import thread_map
from tqdm.contrib.logging import logging_redirect_tqdm
LOG = logging.getLogger(__name__)
[docs]
class LocalRegionRun(LocalRun, RegionRun):
"""
Subclass orchestrating the workflow for dereplication by region similarity using regions
extracted from local sources.
This class combines local file handling with region-based dereplication workflows.
It extracts genomic regions of interest (with optional sequence margins) from local
assembly files, performs MMseqs2-based sequence dereplication on the extracted regions,
and integrates dereplication results back into the binary table. Handles contig edge
cases where regions with sequence margins extend beyond scaffold boundaries according
to user-specified behavior (keep but clip them (permissive), or discard them (strict)).
Inherits from:
LocalRun: Intermediary class providing local file handling utilities.
RegionRun: Intermediary class providing region dereplication utilities.
"""
def __init__(self, parsed_args):
"""
Initialise a LocalRegionRun instance.
Calls the parent class constructor to set up common configuration, temporary directories,
and logging infrastructure.
Args:
parsed_args (dict): Parsed and validated command-line arguments
Returns:
None
"""
super().__init__(parsed_args)
return None
[docs]
def join_dereplication_with_binary(self) -> None:
"""
Join dereplication clustering results with the binary table.
Reads the MMseqs2 clustering output file and joins it with the binary table based on
scaffold ID and region coordinates (Start, End). This associates each extracted region
in the binary table with its dereplication status (representative or redundant) and
representative region identifier. The resulting table is sorted by representative and
dereplication status for clarity.
The clustering table is parsed to extract scaffold and coordinate information from
compound region identifiers. Dereplication status is determined by comparing each
region's identifier with its assigned representative.
Mutates:
self.binary_df (pd.DataFrame): Updated in-place with additional columns for
'representative' and 'dereplication_status', and sorted by these columns.
The temporary 'Region' column is removed after processing.
Returns:
None
Raises:
FileNotFoundError: If the dereplication table cannot be read
RuntimeError: If the dereplication table is empty, or has empty values due to an invalid filename formatting.
RuntimeError: If the binary table is empty after joining with the dereplication table
Notes:
The Region temporary column in self.binary_df was added by joining with the MMseqs2 dereplication table.
This is the workflow-specific implementation of the abstract method inherited from its grandparent class Run.
"""
# Parse the MMseqs clustering table
path_to_cluster_file: Path = self.DEREP_OUT_DIR / "derep_cluster.tsv"
try:
derep_df: pd.DataFrame = pd.read_table(path_to_cluster_file,
names = ['representative', 'Region'],
header = None
)
except FileNotFoundError as err:
LOG.critical(f'{err}')
raise err
# Determine dereplication status
derep_df[['Scaffold', 'Start', 'End']] = derep_df['Region'].str.split(pat = "|", expand = True)
derep_df[['Start', 'End']] = derep_df[['Start', 'End']].astype(int)
derep_df['dereplication_status'] = derep_df['Region'] == derep_df['representative']
derep_df['dereplication_status'] = np.where(derep_df['dereplication_status'], 'dereplication_representative', 'redundant')
if derep_df.isna().values.any():
msg = "Invalid filename formatting in dereplication table."
LOG.error(msg)
raise RuntimeError(msg)
if derep_df.empty:
msg = "Dereplication table is empty."
LOG.error(msg)
raise RuntimeError(msg)
# Add dereplication status columns to binary table by joining with the clustering table
LOG.debug("Joining MMseqs2 clustering table and cblaster binary table.")
self.binary_df = self.binary_df.merge(derep_df, on = ["Scaffold", 'Start', 'End'])
self.binary_df.drop(columns = ['Region'], inplace = True)
# Verify binary table is not empty
if self.binary_df.empty:
msg = "Binary table is empty after joining with the dereplication table!"
LOG.error(msg)
raise RuntimeError(msg)
# Sort by representative ID and then by dereplication status
self.binary_df = self.binary_df.sort_values(['representative', 'dereplication_status'])
LOG.info("Mapping done!")
return None
[docs]
def run(self):
"""
Execute the complete local region-based dereplication pipeline.
Orchestrates all processing steps in sequence: stages local genome assemblies for region extraction,
extracts genomic regions of interest from the staged genomes, runs MMseqs2 dereplication on the
extracted regions, joins the dereplication clustering results with the binary table, recovers hit
diversity information from the dereplication output, filters the original session based on dereplication
results, and generates final output files with dereplication metadata.
Cleans up temporary working directories upon completion.
Returns:
None
"""
LOG.info("--- STEP 1: Staging genomes for dereplication. ---")
self.prepare_genomes()
LOG.info("--- STEP 2: Extracting genomic regions. ---")
self.extract_regions()
LOG.info("--- STEP 3: Dereplicating. ---")
self.dereplicate_regions()
LOG.info("--- STEP 4: Mapping dereplication output to binary table. ---")
self.join_dereplication_with_binary()
LOG.info("--- STEP 5: Recovering hit diversity. ---")
self.recover_hits()
LOG.info("--- STEP 6: Filtering session file. ---")
self.filter_session()
LOG.info("--- STEP 7: Generating output files")
self.generate_output()
# Remove the temporary directory:
LOG.info("Cleaning up temporary directory.")
shutil.rmtree(self.TEMP_DIR)
return None