Source code for pyBiodatafuse.annotators.wikipathways

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python file for queriying Wikipathways SPARQL endpoint ()."""

import datetime
import os
import warnings
from string import Template
from typing import Any, Dict

import pandas as pd
from SPARQLWrapper import JSON, SPARQLWrapper
from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException
from tqdm import tqdm

import pyBiodatafuse.constants as Cons
from pyBiodatafuse.utils import (
    check_columns_against_constants,
    collapse_data_sources,
    get_identifier_of_interest,
    give_annotator_warning,
)


def check_endpoint_wikipathways() -> bool:
    """Check the availability of the WikiPathways SPARQL endpoint.

    :returns: True if the endpoint is available, False otherwise.
    """
    with open(os.path.dirname(__file__) + "/queries/wikipathways-metadata.rq", "r") as fin:
        sparql_query = fin.read()

    sparql = SPARQLWrapper(Cons.WIKIPATHWAYS_ENDPOINT)
    sparql.setReturnFormat(JSON)

    sparql.setQuery(sparql_query)

    try:
        sparql.queryAndConvert()
        return True
    except SPARQLWrapperException:
        return False


def get_version_wikipathways() -> dict:
    """Get version of WikiPathways.

    :returns: a dictionary containing the version information
    """
    with open(os.path.dirname(__file__) + "/queries/wikipathways-metadata.rq", "r") as fin:
        sparql_query = fin.read()

    sparql = SPARQLWrapper(Cons.WIKIPATHWAYS_ENDPOINT)
    sparql.setReturnFormat(JSON)

    sparql.setQuery(sparql_query)
    res = sparql.queryAndConvert()

    wikipathways_version = {"source_version": res["results"]["bindings"][0]["title"]["value"]}

    return wikipathways_version


[docs] def get_gene_wikipathways( bridgedb_df: pd.DataFrame, query_interactions: bool = False, organism: str = "Homo sapiens" ) -> pd.DataFrame: """Query WikiPathways for pathways associated with genes. :param bridgedb_df: BridgeDb output for creating the list of gene ids to query :param query_interactions: Set whether to retrieve gene part_of pathways relationships (False) or all molecular interactions (True). :param organism: The organism to query. Default is "Homo sapiens". :returns: a DataFrame containing the WikiPathways output and dictionary of the WikiPathways metadata. """ # Check if the endpoint is available api_available = check_endpoint_wikipathways() if not api_available: warnings.warn( f"{Cons.WIKIPATHWAYS} SPARQL endpoint is not available. Unable to retrieve data.", stacklevel=2, ) return pd.DataFrame(), {} data_df = get_identifier_of_interest(bridgedb_df, Cons.WIKIPATHWAYS_GENE_INPUT_ID) wikipathways_version = get_version_wikipathways() gene_list = data_df[Cons.TARGET_COL].tolist() gene_list = list(set(gene_list)) query_gene_lists = [] output_dict: Dict[Any, Any] if len(gene_list) > 25: for i in range(0, len(gene_list), 25): tmp_list = gene_list[i : i + 25] query_gene_lists.append(" ".join(f'"{g}"' for g in tmp_list)) else: if query_interactions: gene_list = [f"<https://identifiers.org/ncbigene/{g}>" for g in gene_list] query_gene_lists.append(" ".join(g for g in gene_list)) col_name = "" if query_interactions: file = os.path.join("queries", "wikipathways-mims.rq") output_dict = Cons.WIKIPATHWAYS_MOLECULAR_GENE_OUTPUT_DICT col_name = Cons.WIKIPATHWAYS_MOLECULAR_COL else: file = os.path.join("queries", "wikipathways-genes-pathways.rq") output_dict = Cons.WIKIPATHWAYS_PATHWAYS_OUTPUT_DICT col_name = Cons.WIKIPATHWAYS_PATHWAY_COL with open(os.path.join(os.path.dirname(__file__), file), "r", encoding="utf-8") as fin: sparql_query = fin.read() # Record the start time start_time = datetime.datetime.now() sparql = SPARQLWrapper(Cons.WIKIPATHWAYS_ENDPOINT) sparql.setReturnFormat(JSON) intermediate_df = pd.DataFrame() for gene_list_str in tqdm(query_gene_lists, desc="Querying WikiPathways"): sparql_query_template = Template(sparql_query) substit_dict = dict() if query_interactions: substit_dict = dict(gene_list=gene_list_str, organism_name=f'"{organism}"') if not query_interactions: substit_dict = dict(gene_list=gene_list_str) sparql_query_template_sub = sparql_query_template.substitute(substit_dict) sparql.setQuery(sparql_query_template_sub) result = sparql.queryAndConvert() res = result["results"]["bindings"] # get data df = pd.DataFrame(res) for col in df: df[col] = df[col].map(lambda x: x["value"], na_action="ignore") # Retrieve the expected columns from the SPARQL query results' "vars" (deal with empty optionals) expected_columns = result["head"]["vars"] # Ensure all expected columns are present in intermediate_df for col in expected_columns: if col not in intermediate_df.columns: intermediate_df[col] = None # Add missing columns with default value None # Concatenate the new results into the intermediate DataFrame intermediate_df = pd.concat([intermediate_df, df], ignore_index=True) # Record the end time end_time = datetime.datetime.now() """Metadata details""" # Get the current date and time current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Calculate the time elapsed time_elapsed = str(end_time - start_time) # Add the datasource, query, query time, and the date to metadata wikipathways_metadata: Dict[str, Any] = { "datasource": Cons.WIKIPATHWAYS, "metadata": wikipathways_version, "query": { "size": len(gene_list), "input_type": Cons.WIKIPATHWAYS_GENE_INPUT_ID, "time": time_elapsed, "date": current_date, "url": Cons.WIKIPATHWAYS_ENDPOINT, }, } if Cons.WIKIPATHWAYS_GENE_ID not in intermediate_df.columns: warnings.warn( f"There is no annotation for your input list in {Cons.WIKIPATHWAYS}.", stacklevel=2, ) return pd.DataFrame(), wikipathways_metadata # Fix identifiers for col in Cons.WIKIPATHWAY_ID_CLEANER_DICT: if col not in intermediate_df.columns: continue intermediate_df[col] = ( intermediate_df[col].str.removeprefix(Cons.WIKIPATHWAY_ID_CLEANER_DICT[col]).fillna("") ) # Organize the annotation results as an array of dictionaries intermediate_df.rename( columns={ Cons.WIKIPATHWAYS_GENE_ID: Cons.TARGET_COL, "pathway_gene_count": Cons.PATHWAY_GENE_COUNTS, }, inplace=True, ) if Cons.PATHWAY_GENE_COUNTS in intermediate_df.columns: intermediate_df[Cons.PATHWAY_GENE_COUNTS] = pd.to_numeric( intermediate_df[Cons.PATHWAY_GENE_COUNTS], errors="coerce" ) # Add namespaces for col, namespace in Cons.WIKIPATHWAY_NAMESPACE_DICT.items(): if col not in intermediate_df.columns: continue intermediate_df[col] = intermediate_df[col].apply( lambda x, ns=namespace: f"{ns}:{x}" if x else x ) if query_interactions: intermediate_df[Cons.PATHWAY_ID] = ( intermediate_df[Cons.PATHWAY_ID] .str.removeprefix("https://identifiers.org/wikipathways/") .str.replace(r"(WP\d+)_.*", r"\1", regex=True) .str.replace("WP", f"{Cons.WIKIPATHWAY}:", regex=False) .str.strip() ) else: intermediate_df[Cons.PATHWAY_ID] = intermediate_df[Cons.PATHWAY_ID].apply( lambda x: x.replace("WP", f"{Cons.WIKIPATHWAY}:WP") if "WP" in x else x ) # Check if all keys in df match the keys in OUTPUT_DICT check_columns_against_constants( data_df=intermediate_df, output_dict=output_dict, check_values_in=[Cons.WIKIPATHWAY], ) # Merge the two DataFrames on the target column merged_df = collapse_data_sources( data_df=data_df, source_namespace=Cons.WIKIPATHWAYS_GENE_INPUT_ID, target_df=intermediate_df, common_cols=[Cons.TARGET_COL], target_specific_cols=list(output_dict.keys()), col_name=col_name, ) """Update metadata""" # Calculate the number of new nodes num_new_nodes = intermediate_df[Cons.PATHWAY_ID].nunique() # Calculate the number of new edges num_new_edges = intermediate_df.drop_duplicates( subset=[Cons.TARGET_COL, Cons.PATHWAY_ID] ).shape[0] if not query_interactions: # Check the intermediate_df if num_new_edges != len(intermediate_df): give_annotator_warning(Cons.WIKIPATHWAYS) # Add the number of new nodes and edges to metadata wikipathways_metadata[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes wikipathways_metadata[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges return merged_df, wikipathways_metadata