Source code for pyBiodatafuse.annotators.pubchem

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Python file for queriying PubChem (https://pubchem.ncbi.nlm.nih.gov/)."""

import datetime
import os
import warnings
from string import Template
from typing import Any, Dict, Tuple

import pandas as pd
from SPARQLWrapper import JSON, SPARQLWrapper
from tqdm import tqdm

import pyBiodatafuse.constants as Cons
from pyBiodatafuse.utils import (
    check_columns_against_constants,
    collapse_data_sources,
    get_identifier_of_interest,
    give_annotator_warning,
)


def check_endpoint_pubchem() -> bool:
    """Check the availability of the IDSM endpoint.

    :returns: True if the endpoint is available, False otherwise.
    """
    query_string = """SELECT * WHERE {
        <http://rdf.ncbi.nlm.nih.gov/pubchem/taxonomy> ?p ?o
        }
        LIMIT 1
        """
    sparql = SPARQLWrapper(Cons.PUBCHEM_ENDPOINT)
    sparql.setOnlyConneg(True)
    sparql.setQuery(query_string)
    try:
        sparql.query()
        return True
    except BaseException:
        return False


# TODO - Add metadata function. Currently, no metadata is returned from IDSM servers
def check_version_pubchem():
    """Check the current version of the PubChem database."""
    pass


[docs] def get_protein_compound_screened(bridgedb_df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]: """Query PubChem for molecules screened on proteins as targets. :param bridgedb_df: BridgeDb output for creating the list of gene ids to query. :returns: a DataFrame containing the PubChem output and dictionary of the PubChem metadata. """ # Check if the IDSM endpoint is available api_available = check_endpoint_pubchem() if not api_available: warnings.warn( f"{Cons.PUBCHEM} endpoint is not available. Unable to retrieve data.", stacklevel=2 ) return pd.DataFrame(), {} # Record the start time start_time = datetime.datetime.now() data_df = get_identifier_of_interest(bridgedb_df, Cons.PUBCHEM_COMPOUND_INPUT_ID) protein_list_str = data_df[Cons.TARGET_COL].tolist() for i in range(len(protein_list_str)): protein_list_str[i] = '"' + protein_list_str[i] + '"' protein_list_str = list(set(protein_list_str)) query_protein_list = [] if len(protein_list_str) > 25: for i in range(0, len(protein_list_str), 25): tmp_list = protein_list_str[i : i + 25] query_protein_list.append(" ".join(f"{g}" for g in tmp_list)) else: query_protein_list.append(" ".join(f"{g}" for g in protein_list_str)) with open( os.path.dirname(__file__) + "/queries/pubchem-proteins-screend-molecule.rq", "r" ) as fin: sparql_query = fin.read() sparql = SPARQLWrapper(Cons.PUBCHEM_ENDPOINT) sparql.setReturnFormat(JSON) sparql.setOnlyConneg(True) query_count = 0 intermediate_df = pd.DataFrame() for protein_str in tqdm(query_protein_list, desc="Querying PubChem"): query_count += 1 sparql_query_template = Template(sparql_query) substit_dict = dict(protein_list=protein_str) sparql_query_template_sub = sparql_query_template.substitute(substit_dict) sparql.setQuery(sparql_query_template_sub) res = sparql.queryAndConvert() df = pd.DataFrame(res["results"]["bindings"]) for col in df: df[col] = df[col].map(lambda x: x["value"], na_action="ignore") intermediate_df = pd.concat([intermediate_df, df], ignore_index=True) # Record the end time end_time = datetime.datetime.now() """Metdata details""" # Get the current date and time current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Calculate the time elapsed time_elapsed = str(end_time - start_time) # Add the datasource, query, query time, and the date to metadata pubchem_metadata: Dict[str, Any] = { "datasource": Cons.PUBCHEM, "query": { "size": len(protein_list_str), "input_type": Cons.PUBCHEM_COMPOUND_INPUT_ID, "time": time_elapsed, "date": current_date, "url": Cons.PUBCHEM_ENDPOINT, }, } if intermediate_df.empty: warnings.warn( f"There is no annotation for your input list in {Cons.PUBCHEM}.", stacklevel=2, ) return pd.DataFrame(), pubchem_metadata intermediate_df.rename( columns={ "upProt": Cons.TARGET_COL, "assay": Cons.PUBCHEM_ASSAY_ID, "SMILES": Cons.PUBCHEM_SMILES, "InChI": Cons.PUBCHEM_INCHI, "sample_compound_name": Cons.PUBCHEM_COMPOUND_NAME, }, inplace=True, ) # Want to keep compounds tested across multiple targets intermediate_df["target_count"] = intermediate_df["target_count"].astype(int) mask = intermediate_df["target_count"] > 1 intermediate_df = intermediate_df[mask] intermediate_df.drop(columns=["target_count"], inplace=True) # Get identifiers from the URL intermediate_df[Cons.TARGET_COL] = intermediate_df[Cons.TARGET_COL].map( lambda x: x.split(Cons.PUBCHEM_UNIPROT_IRI)[-1] ) intermediate_df[Cons.PUBCHEM_ASSAY_ID] = intermediate_df[Cons.PUBCHEM_ASSAY_ID].map( lambda x: x.split(Cons.PUBCHEM_BIOASSAY_IRI)[-1] ) intermediate_df[Cons.PUBCHEM_ASSAY_ID] = intermediate_df[Cons.PUBCHEM_ASSAY_ID].apply( lambda x: x.replace("AID", "AID:") ) intermediate_df["outcome"] = intermediate_df["outcome"].map( lambda x: x.split(Cons.PUBCHEM_OUTCOME_IRI)[1] ) intermediate_df["compound_cid"] = intermediate_df["compound_cid"].map( lambda x: x.split(Cons.PUBCHEM_COMPOUND_IRI)[-1] ) intermediate_df["compound_cid"] = intermediate_df["compound_cid"].apply( lambda x: x.replace("CID", "CID:") ) intermediate_df["assay_type"] = intermediate_df["assay_type"].map(Cons.ASSAY_ENDPOINT_TYPES) intermediate_df.dropna(subset=["assay_type"], inplace=True) intermediate_df.drop_duplicates( subset=[Cons.TARGET_COL, Cons.PUBCHEM_ASSAY_ID, "compound_cid"], inplace=True ) # Check if all keys in df match the keys in OUTPUT_DICT check_columns_against_constants( data_df=intermediate_df, output_dict=Cons.PUBCHEM_COMPOUND_OUTPUT_DICT, check_values_in=[Cons.PUBCHEM_POSSIBLE_OUTCOMES, Cons.INCHI], ) # Merge the two DataFrames on the target column merged_df = collapse_data_sources( data_df=data_df, source_namespace=Cons.PUBCHEM_COMPOUND_INPUT_ID, target_df=intermediate_df, common_cols=[Cons.TARGET_COL], target_specific_cols=list(Cons.PUBCHEM_COMPOUND_OUTPUT_DICT.keys()), col_name=Cons.PUBCHEM_COMPOUND_ASSAYS_COL, ) """Update metadata""" # Calculate the number of new nodes num_new_nodes = intermediate_df["compound_cid"].nunique() # Calculate the number of new edges num_new_edges = intermediate_df.drop_duplicates(subset=[Cons.TARGET_COL, "compound_cid"]).shape[ 0 ] # Add the number of new nodes and edges to metadata pubchem_metadata[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes pubchem_metadata[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges return merged_df, pubchem_metadata
def int_response_value_types(resp_list: list, key_list: list): """Change values in response dictionaries to int to stay consistent with other Annotators. :param: resp_list: list of response dictionaries. :param: key_list: list of keys to change to int. :returns: resp_list with int values in response dictionaries on keys in key_list. """ for r in resp_list: for k in key_list: try: r[k] = int(r[k]) except ValueError: continue return resp_list