#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from cagecleaner.remote_run import RemoteRun
from cagecleaner.region_run import RegionRun
from cagecleaner.communication import fetch_contig_lengths, download_regions
import logging
import warnings
import shutil
import pandas as pd
import numpy as np
from pathlib import Path
warnings.filterwarnings(action = "ignore", module = "Bio")
LOG = logging.getLogger(__name__)
[docs]
class RemoteRegionRun(RemoteRun, RegionRun):
"""
Subclass orchestrating the workflow for dereplication by region sequence similary using regions downloaded from NCBI.
This class combines remote region downloading functionality with local region dereplication.
It downloads genomic regions (with optional sequence margins) from NCBI based on scaffold IDs and
coordinates in a binary table, performs MMseqs2-based sequence dereplication on the downloaded regions,
and integrates dereplication results back into the binary table. Handles contig edge cases
where regions with sequence margins extend beyond the scaffold boundaries according to user-specified behavior
(keep but clip them (permissive), or discard them (strict)).
It links NCBI assembly accession IDs to the scaffold IDs in the binary table, downloads the
corresponding genome assemblies, maps the scaffold IDs back to the downloaded assemblies, performs
whole-genome dereplication using skDER, and integrates dereplication results back into
the binary table. Unmapped scaffolds (scaffolds for which no assemby file was found) are tracked
and reported separately.
Inherits from:
RemoteRun: Intermediary class providing common downloading configurations.
RegionRun: Intermediary class providing region dereplication utilities
"""
def __init__(self, parsed_args):
"""
Initialise a RemoteRegionRun instance.
Calls the parent class constructor to set up common configuration, temporary directories,
and logging infrastructure.
Args:
parsed_args (dict): Parsed and validated command-line arguments
Returns:
None
"""
super().__init__(parsed_args)
return None
[docs]
def fetch_regions(self) -> None:
"""
Fetch genomic regions from NCBI based on scaffold coordinates with optional sequence margins.
Extracts region coordinates from the binary table and applies user-specified margins
(upstream and downstream sequence extensions). Handles regions that extend beyond contig boundaries
by either discarding them (strict mode) or clipping them (permissive mode). Fetches contig
lengths from NCBI using Entrez utilities. Logs statistics about discarded or clipped regions.
Downloads the regions.
Mutates:
self.binary_df (pd.DataFrame): Adds 'Contig_length' column from NCBI data.
self.DEREP_IN_DIR: Populates folder with downloaded gzipped genomic region FASTA files.
Returns:
None
"""
## Add sequence margins
regions = self.binary_df[['Scaffold', 'Start', 'End']].copy()
regions['Start'] -= self.margin
regions['End'] += self.margin
## First fetch the length of each contig
contig_ids = regions['Scaffold'].to_list()
lengths_df = fetch_contig_lengths(contig_ids)
regions = regions.merge(lengths_df, on = "Scaffold", how = 'inner') # Discard the ones without length
# Stop if we don't have any contig left
if lengths_df.empty:
msg = "No length information retrieved for any contig!"
LOG.error(msg)
raise RuntimeError(msg)
# Report the ones without length
regions_without = regions.merge(lengths_df, on = "Scaffold", how = 'left', indicator = True)
regions_without = regions_without[regions_without['_merge'] == 'left_only'].drop(columns = ['_merge'])
if not regions_without.empty:
LOG.warning(f'Contigs for which no length could be retrieved: {" ".join(regions_without["Scaffold"].to_list())}')
LOG.warning('These contigs will not be downloaded anyway.')
# Add the contig length column to the extended binary for easy joining later
# when merging the dereplication status with the extended binary table
self.binary_df = self.binary_df.merge(lengths_df, on = 'Scaffold', how = 'left')
## Treat contig edges upstream (put at zero, or remove)
mask_up = regions['Start'] <= 0
if self.strict_regions:
regions = regions[~mask_up]
else:
regions['Start'] = regions['Start'].clip(lower = 1)
## Treat contig edges downstream
mask_down = regions['End'] > regions['Contig_length']
if self.strict_regions:
regions = regions[~mask_down]
else:
regions['End'] = regions['End'].clip(upper = regions['Contig_length'])
## Some stats
diff_upstream = mask_up.sum()
diff_downstream = mask_down.sum()
diff = diff_upstream + diff_downstream
LOG.info(f'{diff} regions were at a contig end.')
if self.strict_regions:
LOG.info('These regions will not be downloaded in strict mode.')
## Download the regions
download_regions(regions,
directory = self.DEREP_IN_DIR,
download_workers = self.download_workers,
no_progress = self.no_progress)
return None
[docs]
def join_dereplication_with_binary(self) -> None:
"""
Join MMseqs2 dereplication clustering results with the binary table.
Reads the dereplication clustering output file and parses region coordinates. Assigns
dereplication status ('dereplication_representative' or 'redundant') based on whether
each region's accession matches its representative. Handles four possible cases of region boundary
mismatches due to region boundary adjustments during preprocessing:
1. No contig edges: Exact coordinate match after removing margins.
2. Upstream edge: Match on scaffold and end coordinate (contig was clipped at the upstream edge).
3. Downstream edge: Match on scaffold and start coordinate (contig was clipped at the downstream edge).
4. Both edges: Interval containment where the original cluster region is within the dereplicated region
(contig was clipped at both edges).
Concatenates results from all cases and removes temporary helper columns. Sorts the final
table by representative ID and dereplication status.
Mutates:
self.binary_df (pd.DataFrame): Joined with dereplication results; adds 'representative'
and 'dereplication_status' columns; removes 'Contig_length'
and 'Region' columns.
Returns:
None
Raises:
FileNotFoundError: If the dereplication table cannot be read
RuntimeError: If the dereplication table is empty, or has empty values due to an invalid filename formatting.
RuntimeError: If the binary table is empty after joining with the dereplication table
Notes:
The Region temporary column in self.binary_df was added by joining with the MMseqs2 dereplication table.
The Contig_length temporary column in self.binary_df was added by fetch_regions when the sequence margins
were added on-the-fly when fetching the regions.
This is the workflow-specific implementation of the abstract method inherited from its grandparent class Run.
"""
# Read the MMseqs2 clustering table:
path_to_cluster_file: Path = self.DEREP_OUT_DIR / "derep_cluster.tsv"
# Convert to dataframe
try:
derep_df: pd.DataFrame = pd.read_table(path_to_cluster_file,
names = ['representative', 'Region'],
header = None)
except FileNotFoundError as err:
LOG.critical('f{err}')
raise err
# Parse the dereplicated region coordinates (which includes the margin)
derep_df[['Scaffold', 'Range']] = derep_df['Region'].str.split(pat = ":", expand = True)
derep_df[['Start', 'End']] = derep_df['Range'].str.split(pat = "-", expand = True)
derep_df[['Start', 'End']] = derep_df[['Start', 'End']].astype(int)
derep_df.drop(columns = ['Range'], inplace = True)
if derep_df.isna().values.any():
msg = "Invalid filename formatting in dereplication table."
LOG.error(msg)
raise RuntimeError(msg)
if derep_df.empty:
msg = "Dereplication table is empty."
LOG.error(msg)
raise RuntimeError(msg)
# Add dereplication status column based on whether the scaffold's ID is the same as the representative's
# If there is a match between binary_df['assembly_file'] and derep_dfs['assembly'] (its index column), the representative and status is added.
derep_df['dereplication_status'] = derep_df['Region'] == derep_df['representative']
derep_df['dereplication_status'] = np.where(derep_df['dereplication_status'], 'dereplication_representative', 'redundant')
# Subtract the region margin again to merge properly with the extended binary table
binary_merged_two_edges = binary_merged_edge_up = binary_merged_edge_down = binary_merged_no_edges = pd.DataFrame()
# Case 1: no contig edges
derep_no_edges = derep_df.copy()
key = ['Scaffold', 'Start', 'End']
derep_no_edges['Start'] += self.margin
derep_no_edges['End'] -= self.margin
binary_merged_no_edges = self.binary_df.merge(derep_no_edges, on = key, how = 'inner')
remaining_binary = self.binary_df.merge(derep_no_edges[key], on = key, how = 'left', indicator = True)
remaining_binary = remaining_binary[remaining_binary['_merge'] == 'left_only'].drop(columns = ['_merge'])
# Case 2: contig edge upstream
if not remaining_binary.empty:
derep_edge_up = derep_df.copy()
key = ['Scaffold', 'End']
derep_edge_up['End'] -= self.margin
binary_merged_edge_up = remaining_binary.merge(derep_edge_up, on = key, how = 'inner')
remaining_binary = remaining_binary.merge(binary_merged_edge_up[key], on = key, how = 'left', indicator = True)
remaining_binary = remaining_binary[remaining_binary['_merge'] == 'left_only'].drop(columns = ['_merge'])
# Case 3: contig edge downstream
if not remaining_binary.empty:
derep_edge_down = derep_df.copy()
key = ['Scaffold', 'Start']
derep_edge_down['Start'] += self.margin
binary_merged_edge_down = remaining_binary.merge(derep_edge_down, on = key, how = 'inner')
remaining_binary = remaining_binary.merge(binary_merged_edge_down[key], on = key, how = 'left', indicator = True)
remaining_binary = remaining_binary[remaining_binary['_merge'] == 'left_only'].drop(columns = ['_merge'])
# Case 4: two contig edges
if not remaining_binary.empty:
derep_two_edges = derep_df.copy()
remaining_binary['Interval'] = [pd.Interval(left = crds.Start, right = crds.End, closed = 'both')
for crds in remaining_binary[['Start', 'End']].itertuples()]
derep_two_edges['Interval'] = [pd.Interval(left = drp.Start, right = drp.End, closed = 'both')
for drp in derep_two_edges[['Start', 'End']].itertuples()]
binary_merged_two_edges = remaining_binary.merge(derep_two_edges, on = 'Scaffold', how = 'inner',
suffixes = ['_bin', '_drep'])
binary_merged_two_edges = binary_merged_two_edges[binary_merged_two_edges.apply(lambda row:
row['Interval_bin']
in row['Interval_drep'],
axis = 1)]
# Concat the hits from all cases
binary_merged = pd.concat([binary_merged_no_edges, binary_merged_edge_up,
binary_merged_edge_down, binary_merged_two_edges])
self.binary_df = binary_merged
# Verify binary table is not empty
if self.binary_df.empty:
msg = "Binary table is empty after joining with the dereplication table!"
LOG.error(msg)
raise RuntimeError(msg)
# Sort by representative ID and then by dereplication status
self.binary_df = self.binary_df.sort_values(['representative', 'dereplication_status'])
self.binary_df.drop(columns = ['Contig_length', 'Region'], inplace = True)
LOG.info("Mapping done!")
return None
[docs]
def run(self):
"""
Execute the complete remote region dereplication pipeline.
Orchestrates all processing steps in sequence: fetches genomic regions from NCBI with
optional sequence margins and contig boundary handling, performs MMseqs2-based dereplication on
the downloaded regions, joins the dereplication results with the binary table, recovers hit
diversity according to user parameters, filters the original session file, and generates
output files. Cleans up the temporary directory upon completion.
Returns:
None
"""
LOG.info('--- STEP 1: Downloading genomic regions. ---')
self.fetch_regions() # Download genomic regions using ncbi_acc_download
LOG.info("--- STEP 2: Dereplicating ---")
self.dereplicate_regions() # Dereplicate.
LOG.info("--- STEP 3: Mapping dereplication clustering to binary table ---")
self.join_dereplication_with_binary() # Map each row in the binary table with its representative
LOG.info("--- STEP 4: Recovering hit diversity ---")
self.recover_hits() # Recover hits by content and score depending on user input
LOG.info("--- STEP 5: Filtering original session file. ---")
self.filter_session() # Filter the original session file, retaining only the dereplicated hits.
LOG.info("--- STEP 6: Generating output files ---")
self.generate_output() # Generate all output files.
# Remove the temporary directory:
LOG.info("Cleaning up temporary directory.")
shutil.rmtree(self.TEMP_DIR)
return None