# coding: utf-8
"""Python utils file for global functions."""
import warnings
from importlib import resources
from typing import List, Optional
import pandas as pd
import pyBiodatafuse.constants as Cons
from pyBiodatafuse.logging_config import get_logger
logger = get_logger(__name__)
[docs]
def get_identifier_of_interest(
    bridgedb_df: pd.DataFrame, db_source: str, keep: Optional[List] = None
) -> pd.DataFrame:
    """Get identifier of interest from BridgeDb output file.

    The rows of ``bridgedb_df`` are filtered on the target-source column so that
    only rows whose source is ``db_source`` (or one of the sources in ``keep``)
    remain. The ``keep`` list passed by the caller is never modified.

    :param bridgedb_df: DataFrame containing the output from BridgeDb
    :param db_source: identifier of interest from BridgeDB (e.g. "NCBI Gene")
    :param keep: list of additional identifier sources to keep in the output
    :returns: a DataFrame containing the identifiers of interest, with a fresh 0..n-1 index
    :raises AssertionError: if ``db_source`` is not listed in the packaged ``datasources.csv``
    """
    # Load identifier options
    with resources.path("pyBiodatafuse.resources", "datasources.csv") as df:
        identifier_options = pd.read_csv(df)["source"].tolist()

    # Check if source is in identifier options.
    # NOTE: kept as an assert so callers still see AssertionError; be aware the
    # check is skipped when Python runs with -O.
    assert db_source in identifier_options, f"Source {db_source} is not in identifier options"

    # Build a new list instead of appending to the caller's ``keep``; appending
    # in place would make the caller's list grow on every call.
    sources_to_keep = list(keep) if keep is not None else []
    sources_to_keep.append(db_source)

    # Filter rows where "target.source" is specific datasource for eg. "NCBI Gene"
    subset_df = bridgedb_df[bridgedb_df[Cons.TARGET_SOURCE_COL].isin(sources_to_keep)]

    return subset_df.reset_index(drop=True)
[docs]
def collapse_data_sources(
    data_df: pd.DataFrame,
    source_namespace: str,
    target_df: pd.DataFrame,
    common_cols: list,
    target_specific_cols: list,
    col_name: str,
) -> pd.DataFrame:
    """Collapse data sources into a single column.

    Rows of ``data_df`` whose target source equals ``source_namespace`` are
    left-merged with ``target_df``. For every merged row the values of
    ``target_specific_cols`` are packed into a dict, and the dicts belonging to
    the same ``data_df`` row are gathered into one list stored under ``col_name``.

    :param data_df: BridgeDb DataFrame containing identifiers from all sources
    :param source_namespace: identifier of interest from BridgeDB (e.g. "NCBI Gene")
    :param target_df: DataFrame containing data from an external source
    :param common_cols: list of columns that are common to both dataframes and can be used to merge
    :param target_specific_cols: list of columns that are specific to the external source
    :param col_name: name of the new column to be created
    :returns: a DataFrame containing the new data columns for a new resource
    """
    # Explicit copy: the filtered frame is modified below, and writing to a
    # slice of the caller's frame triggers SettingWithCopyWarning.
    data_df = data_df[data_df[Cons.TARGET_SOURCE_COL] == source_namespace].copy()

    if target_df.empty:
        # Nothing to merge: return the filtered identifiers with an empty data column
        data_df[col_name] = None
        return data_df.reset_index(drop=True)

    merged_df = pd.merge(data_df, target_df, on=common_cols, how="left")

    # Pack the source-specific columns of each row into a single dict
    merged_df[col_name] = merged_df[target_specific_cols].apply(lambda row: row.to_dict(), axis=1)

    # Wrap each dict in a one-element list so that summing concatenates them
    merged_df[col_name] = merged_df[col_name].apply(lambda x: [x])

    # Group by every column of the filtered BridgeDb frame and concatenate the lists
    cols_of_interest = data_df.columns.tolist()
    merged_df = merged_df.groupby(cols_of_interest)[col_name].sum().reset_index()

    return merged_df
[docs]
def combine_sources(bridgedb_df: pd.DataFrame, df_list: List[pd.DataFrame]) -> pd.DataFrame:
    """Merge the annotator outputs onto the BridgeDb mapping.

    The BridgeDb rows whose target source is Ensembl or PubChem Compound form
    the base frame; if there are none, the whole BridgeDb frame is used and a
    warning is logged. Every non-empty frame in ``df_list`` is then outer-merged
    onto the base on the identifier column.

    :param bridgedb_df: BridgeDb output.
    :param df_list: list of dataframes to be combined.
    :returns: a single dataframe built from the list of dataframes
    """
    is_ensembl = bridgedb_df[Cons.TARGET_SOURCE_COL] == Cons.ENSEMBL
    is_pubchem = bridgedb_df[Cons.TARGET_SOURCE_COL] == Cons.PUBCHEM_COMPOUND
    combined = bridgedb_df[is_ensembl | is_pubchem]

    if combined.empty:  # Failed databases: KEGG
        logger.warning(
            f"Target source column does not contain any of the following: {Cons.ENSEMBL} or {Cons.PUBCHEM_COMPOUND}"
        )
        combined = bridgedb_df

    # Mapping columns that are dropped from every annotator frame before merging
    mapping_cols = [Cons.TARGET_SOURCE_COL, Cons.IDENTIFIER_SOURCE_COL, Cons.TARGET_COL]

    for annotation_df in df_list:
        if annotation_df.empty:
            continue

        dea_cols = [name for name in annotation_df.columns if name.endswith("_dea")]
        trimmed_df = annotation_df.drop(columns=mapping_cols + dea_cols, errors="ignore")
        combined = pd.merge(combined, trimmed_df, on=Cons.IDENTIFIER_COL, how="outer")

    # Keep only the first occurrence of any repeated column name
    unique_columns = ~combined.columns.duplicated()
    return combined.loc[:, unique_columns]
[docs]
def combine_with_homologs(df: pd.DataFrame, homolog_dfs: list) -> pd.DataFrame:
    """Merge a DataFrame with a list of homolog dataframes.

    The homolog column is exploded to one row per homolog, the last column of
    each homolog dataframe is joined on the homolog identifier, and the joined
    values are folded into the column of the same name (existing values win).
    The input ``df`` is not modified.

    :param df: An already combined df containing output of non-homolog annotators.
    :param homolog_dfs: List of homolog dataframes to be combined.
    :returns: Merged DataFrame with homolog-derived data added, clean of temp columns.
    """
    temp_suffix = "_temp"

    # Work on a copy so the normalisation below does not overwrite the
    # homolog column of the caller's DataFrame.
    df = df.copy()

    # A bare dict is normalised to a one-element list so explode() treats it as one homolog
    df[Cons.ENSEMBL_HOMOLOGS] = df[Cons.ENSEMBL_HOMOLOGS].apply(
        lambda x: [{"homolog": x["homolog"]}] if isinstance(x, dict) else x
    )

    exploded_df = df.explode(Cons.ENSEMBL_HOMOLOGS)
    exploded_df["homolog"] = exploded_df[Cons.ENSEMBL_HOMOLOGS].apply(
        lambda x: x["homolog"] if isinstance(x, dict) else None
    )

    # Free the "identifier" name: each homolog frame brings its own "identifier" column
    exploded_df = exploded_df.rename(columns={"identifier": "original_identifier"})

    for homolog_df in homolog_dfs:
        if homolog_df is None or homolog_df.empty:
            continue

        # Only the last column of a homolog frame is taken as its annotation data
        last_col = homolog_df.columns[-1]
        temp_df = homolog_df[["identifier", last_col]].copy()

        temp_col = f"{last_col}{temp_suffix}"
        temp_df = temp_df.rename(columns={last_col: temp_col})

        exploded_df = pd.merge(
            exploded_df, temp_df, how="left", left_on="homolog", right_on="identifier"
        )

        # Drop the join key from the right side so the next merge does not collide with it
        if "identifier" in exploded_df.columns:
            exploded_df.drop(columns=["identifier"], inplace=True)

    temp_cols = [col for col in exploded_df.columns if col.endswith(temp_suffix)]
    for col in temp_cols:
        # Strip only the trailing suffix; str.replace would also remove
        # "_temp" occurring elsewhere in the column name.
        base_col = col[: -len(temp_suffix)]
        if base_col in exploded_df.columns:
            exploded_df[base_col] = exploded_df[base_col].combine_first(exploded_df[col])
        else:
            exploded_df[base_col] = exploded_df[col]

    exploded_df.drop(columns=temp_cols, inplace=True)
    exploded_df.drop(columns=["homolog", "identifier_y"], errors="ignore", inplace=True)

    exploded_df = exploded_df.rename(columns={"original_identifier": "identifier"})

    # Re-wrap each homolog entry as a list of dicts; anything else becomes an empty list
    exploded_df[Cons.ENSEMBL_HOMOLOGS] = exploded_df[Cons.ENSEMBL_HOMOLOGS].apply(
        lambda x: (
            [x]
            if isinstance(x, dict) and "homolog" in x
            else ([{"homolog": x}] if isinstance(x, str) and pd.notnull(x) else [])
        )
    )

    # Rows without an identifier source are discarded
    exploded_df = exploded_df[~exploded_df["identifier.source"].isna()]

    return exploded_df
[docs]
def check_columns_against_constants(
    data_df: pd.DataFrame, output_dict: dict, check_values_in: list
) -> None:
    """Check if columns in the data source output DataFrame match expected types and values from a dictionary of constants.

    Problems are reported with ``warnings.warn``; nothing is returned. For a
    column listed in ``check_values_in`` the constant named ``<COLUMN>`` (upper
    case) in ``pyBiodatafuse.constants`` is read as a ``|``-separated list of
    allowed prefixes, and a warning is emitted unless every non-null value is a
    string starting with one of them. The prefix check is skipped when every
    value in the column is an ``int``.

    :param data_df: DataFrame to check.
    :param output_dict: Dictionary containing expected types for columns.
    :param check_values_in: List of column names to check values against constants.
    :raises ImportError: if a column in ``check_values_in`` has no matching constant.
    """
    for col, expected_type in output_dict.items():
        if col not in data_df.columns:
            warnings.warn(f"Column '{col}' is missing in the DataFrame.", stacklevel=2)
            continue

        if not data_df[col].dropna().apply(type).eq(expected_type).all():
            warnings.warn(
                f"Not all values in column '{col}' have the correct type '{expected_type}'.",
                stacklevel=2,
            )

        if col not in check_values_in:
            continue

        # Look the constant up on the already-imported constants module.
        # The former exec()-import followed by locals() lookup depended on exec
        # writing into function locals, which Python 3.13 no longer does.
        constant_name = col.upper()
        try:
            starts_with = getattr(Cons, constant_name)
        except AttributeError as err:
            # Keep the exception type the import-based lookup used to raise
            raise ImportError(
                f"cannot import name '{constant_name}' from 'pyBiodatafuse.constants'"
            ) from err

        if data_df[col].apply(type).eq(int).all():
            # Purely integer columns carry no prefix to validate
            continue

        prefixes = tuple(starts_with.split("|"))
        # Non-string values count as a mismatch instead of raising AttributeError
        has_valid_prefix = (
            data_df[col]
            .dropna()
            .apply(lambda value: isinstance(value, str) and value.startswith(prefixes))
        )
        if not has_valid_prefix.all():
            warnings.warn(
                f"All values in column '{col}' do not start with '{starts_with}'.",
                stacklevel=2,
            )
[docs]
def give_annotator_warning(annotator_name: str) -> None:
    """Emit a warning asking the user to report a suspicious intermediate_df.

    :param annotator_name: name of the annotator inserted into the warning text
    """
    message = f"The intermediate_df in {annotator_name} annotator should be checked, please create an issue on https://github.com/BioDataFuse/pyBiodatafuse/issues/."
    # stacklevel=2 attributes the warning to the caller of this helper
    warnings.warn(message, stacklevel=2)