Source code for cocoatree._pipeline
from . import msa
from . import statistics
from . import deconvolution
from . import __params
import pandas as pd
import numpy as np
[docs]
def perform_sca(sequences_id, sequences,
n_components=4,
freq_regul=__params.__freq_regularization_ref,
gap_threshold=0.4, seq_threshold=0.2,
coevolution_metric="SCA", correction=None):
"""
Perform statistical coupling analysis (SCA)
Parameters
----------
sequences : list of MSA sequences to filter
sequences_id : list of the MSA's sequence identifiers
n_components : int, default: 4
gap_threshold : float [0, 1], default: 0.4
max proportion of gaps tolerated
seq_threshold : maximum fraction of gaps per sequence (default 0.2)
coevolution_metric : {'SCA', 'NMI', 'MI}, optional, default: 'SCA'
which coevolution metric to use:
- SCA: the coevolution matrix from Rivoire et al
- MI: the mutual information
- NMI: the normalized mutual information
correction : {None, 'APC', 'entropy'}, default: None
which correction to use
Returns
-------
coevolution_matrix : np.ndarray (n_filtered_pos, n_filtered_pos)
results : pd.DataFrame with the following columns
- original_msa_pos : the original MSA position
- filtered_msa_pos : the position in the filtered MSA
and for each component:
- PCk: the projection of the residue onto the kth principal component
- ICk: the projeciton of the residue onto the kth independent
component
- sector_k: wherether the residue is found to be part of sector k
"""
# Start by filtering sequences
seq_kept, seq_kept_id, pos_kept = msa.filter_sequences(
sequences, sequences_id, gap_threshold=gap_threshold,
seq_threshold=seq_threshold)
# Compute sequence weights. This is mostly to avoid recomputing it at
# several step in the pipeline and thus speed things up a bit
seq_weights, _ = msa.compute_seq_weights(seq_kept)
# Compute co-evolution matrix
if coevolution_metric == "SCA":
coevol_matrix = statistics.pairwise.compute_sca_matrix(
seq_kept,
seq_weights=seq_weights,
freq_regul=freq_regul)
elif coevolution_metric == "MI":
coevol_matrix = statistics.pairwise.compute_mutual_information_matrix(
seq_kept, seq_weights=seq_weights, freq_regul=freq_regul,
normalize=False)
elif coevolution_metric == "NMI":
coevol_matrix = statistics.pairwise.compute_mutual_information_matrix(
seq_kept, seq_weights=seq_weights, freq_regul=freq_regul)
else:
raise ValueError(
"Unknown 'coevol_metric' value. User provided"
f"{coevolution_metric}. Options are 'SCA', 'MI', 'NMI'")
# Compute correction on coevolution matrix
if correction is not None:
if correction == "APC":
_, coevol_matrix = statistics.pairwise.compute_apc(coevol_matrix)
elif correction == "entropy":
entropy_aa = statistics.position.compute_conservation(
seq_kept,
seq_weights=seq_weights)
coevol_matrix = statistics.pairwise.compute_entropy_correction(
coevol_matrix, entropy_aa)
else:
raise ValueError(
"Unknown 'correction' value. User provided"
f"{correction}. Options are 'APC', 'entropy'")
# Now, compute deconvolution
principal_components = deconvolution.extract_principal_components(
coevol_matrix)
independent_components = deconvolution.extract_independent_components(
coevol_matrix, n_components=n_components)
sectors = deconvolution.extract_sectors(
independent_components, coevol_matrix)
# Now, map everything into a nice pandas DataFrame
pos_mapping, _ = msa.map_msa_positions(len(sequences[0]), pos_kept)
results = pd.DataFrame(
{"original_msa_pos": np.arange(len(sequences[0]), dtype=int),
"filtered_msa_pos": pos_mapping.values()})
# Add PCA and ICA results
for k in range(n_components):
results.loc[~results["filtered_msa_pos"].isna(),
"PC%d" % (k+1)] = principal_components[k]
results.loc[~results["filtered_msa_pos"].isna(),
"IC%d" % (k+1)] = independent_components[k]
results["sector_%d" % (k+1)] = np.isin(
results["filtered_msa_pos"],
sectors[k])
results.loc[~results["filtered_msa_pos"].isna(),
"sector_%d" % (k+1)] = np.isin(
results.loc[~results["filtered_msa_pos"].isna(),
"filtered_msa_pos"],
sectors[k])
return coevol_matrix, results