#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Python file for querying the AOP Wiki RDF SPARQL endpoint.
This module provides functionality to query the AOP Wiki RDF SPARQL endpoint for
Adverse Outcome Pathways (AOPs) associated with genes and compounds.
"""
import datetime
import os
import warnings
from string import Template
from typing import Any, Dict, Tuple
import pandas as pd
from SPARQLWrapper import JSON, SPARQLWrapper
from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException
from tqdm import tqdm
import pyBiodatafuse.constants as Cons
from pyBiodatafuse.logging_config import get_logger
from pyBiodatafuse.utils import (
check_columns_against_constants,
collapse_data_sources,
get_identifier_of_interest,
give_annotator_warning,
)
# Pre-requisite:
QUERY_LIMIT = 25
QUERY_COMPOUND = os.path.join(os.path.dirname(__file__), "queries", "aopwiki-compound.rq")
QUERY_COMPOUND_SIMPLE = os.path.join(
os.path.dirname(__file__), "queries", "aopwiki-compound-simple.rq"
)
QUERY_GENE = os.path.join(os.path.dirname(__file__), "queries", "aopwiki-gene.rq")
QUERY_GENE_SIMPLE = os.path.join(os.path.dirname(__file__), "queries", "aopwiki-gene-simple.rq")
logger = get_logger(__name__)
def read_sparql_file(file_path: str) -> str:
"""Read a SPARQL query file.
:param file_path: the path to the SPARQL query file
:returns: the content of the SPARQL query file
"""
with open(file_path, "r") as fin:
sparql_query = fin.read()
return sparql_query
def check_endpoint_aopwiki() -> bool:
"""Check the availability of the AOP Wiki SPARQL endpoint.
:returns: True if the endpoint is available, False otherwise.
"""
try:
sparql = SPARQLWrapper(Cons.AOPWIKI_ENDPOINT)
sparql.setReturnFormat(JSON)
sparql.setQuery("SELECT * WHERE { ?s ?p ?o } LIMIT 1")
sparql.queryAndConvert()
return True
except SPARQLWrapperException:
return False
[docs]
def get_aops_gene(bridgedb_df: pd.DataFrame, pathway: bool = False) -> Tuple[pd.DataFrame, dict]:
"""Query for AOPs associated with genes from AOP Wiki RDF.
:param bridgedb_df: BridgeDb output for creating the list of gene ids to query
:param pathway: if True, retrieve full pathway information including upstream/downstream key events.
If False (default), retrieve simplified AOP information.
:returns: a DataFrame containing the AOP Wiki RDF output and dictionary of the AOP Wiki RDF metadata
"""
# Check if the endpoint is available
if not check_endpoint_aopwiki():
warnings.warn(
f"{Cons.AOPWIKIRDF} SPARQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
# Record the start time
start_time = datetime.datetime.now()
# Step 1: Identifier mapping and harmonization
data_df = get_identifier_of_interest(bridgedb_df, Cons.AOPWIKI_GENE_INPUT_ID)
gene_list = data_df[Cons.TARGET_COL].unique().tolist()
# Step 2: Prepare target list and batch queries
query_batches = [
" ".join(f'"{target}"' for target in gene_list[i : i + QUERY_LIMIT])
for i in range(0, len(gene_list), QUERY_LIMIT)
]
# Step 3: Run SPARQL queries
sparql = SPARQLWrapper(Cons.AOPWIKI_ENDPOINT)
sparql.setReturnFormat(JSON)
# Select query file based on pathway parameter
query_file = QUERY_GENE if pathway else QUERY_GENE_SIMPLE
intermediate_df = pd.DataFrame()
for batch in tqdm(query_batches, desc=f"Querying {Cons.AOPWIKIRDF} for genes"):
# Prepare the substitution dictionary
substit_dict = {
"genes": str(['"' + target.replace('"', "") + '"' for target in batch.split(" ")])
.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(",", "")
}
# Load and substitute the query template
with open(query_file, "r") as f:
query = Template(f.read()).substitute(substit_dict)
# Execute the query and process results
sparql.setQuery(query)
res = sparql.queryAndConvert()
res_df = pd.DataFrame(
[
{k: (v["value"] if "value" in v else "") for k, v in item.items()}
for item in res["results"]["bindings"]
]
)
# Retrieve the expected columns from the SPARQL query results' "vars"
expected_columns = res["head"]["vars"]
# Ensure all expected columns are present in intermediate_df
for col in expected_columns:
if col not in intermediate_df.columns:
intermediate_df[col] = None
# Concatenate the new results into the intermediate DataFrame
intermediate_df = pd.concat([intermediate_df, res_df], ignore_index=True)
# Record the end time
end_time = datetime.datetime.now()
# Step 4: Check if the query returned any results
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.AOPWIKIRDF}.",
stacklevel=2,
)
return pd.DataFrame(), {}
# Step 5: Clean and process the results
input_col = Cons.AOPWIKI_GENE_INPUT_ID
output_dict = Cons.AOPWIKI_GENE_OUTPUT_DICT if pathway else Cons.AOPWIKI_GENE_OUTPUT_DICT_SIMPLE
for key in output_dict.keys():
if key in intermediate_df.columns:
intermediate_df[key] = intermediate_df[key].apply(
lambda x: x.split("/")[-1] if isinstance(x, str) and "http" in x else x
)
intermediate_df.rename(columns={input_col: Cons.TARGET_COL}, inplace=True)
intermediate_df = intermediate_df.drop_duplicates()
# Step 6: Generate metadata
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
time_elapsed = str(end_time - start_time)
metadata_dict: Dict[str, Any] = {
"datasource": Cons.AOPWIKIRDF,
"query": {
"size": len(gene_list),
"input_type": Cons.AOPWIKI_GENE_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.AOPWIKI_ENDPOINT,
"pathway": pathway,
},
}
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=output_dict,
check_values_in=[Cons.AOPWIKI_VALUE_CHECK_LIST],
)
# Step 7: Integrate into main dataframe
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.AOPWIKI_GENE_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=list(output_dict.keys()),
col_name=Cons.AOPWIKI_GENE_COL,
)
# Calculate the number of new nodes and edges
num_new_nodes = intermediate_df[Cons.TARGET_COL].nunique()
num_new_edges = intermediate_df.shape[0]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.AOPWIKIRDF)
# Add the number of new nodes and edges to metadata
metadata_dict[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
metadata_dict[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, metadata_dict
[docs]
def get_aops_compound(
bridgedb_df: pd.DataFrame, pathway: bool = False
) -> Tuple[pd.DataFrame, dict]:
"""Query for AOPs associated with compounds from AOP Wiki RDF.
:param bridgedb_df: BridgeDb output for creating the list of compound ids to query
:param pathway: if True, retrieve full pathway information including upstream/downstream key events.
If False (default), retrieve simplified AOP information.
:returns: a DataFrame containing the AOP Wiki RDF output and dictionary of the AOP Wiki RDF metadata
"""
# Check if the endpoint is available
if not check_endpoint_aopwiki():
warnings.warn(
f"{Cons.AOPWIKIRDF} SPARQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
# Record the start time
start_time = datetime.datetime.now()
# Step 1: Identifier mapping and harmonization
data_df = get_identifier_of_interest(bridgedb_df, Cons.AOPWIKI_COMPOUND_INPUT_ID)
compound_list = data_df[Cons.TARGET_COL].unique().tolist()
# Step 2: Prepare target list and batch queries
query_batches = [
" ".join(f'"{target}"' for target in compound_list[i : i + QUERY_LIMIT])
for i in range(0, len(compound_list), QUERY_LIMIT)
]
# Step 3: Run SPARQL queries
sparql = SPARQLWrapper(Cons.AOPWIKI_ENDPOINT)
sparql.setReturnFormat(JSON)
# Select query file based on pathway parameter
query_file = QUERY_COMPOUND if pathway else QUERY_COMPOUND_SIMPLE
intermediate_df = pd.DataFrame()
for batch in tqdm(query_batches, desc=f"Querying {Cons.AOPWIKIRDF} for compounds"):
# Prepare the substitution dictionary
substit_dict = {
"compounds": str(
[
"<https://identifiers.org/pubchem.compound/" + target.replace('"', "") + ">"
for target in batch.split(" ")
]
)
.replace("[", "")
.replace("]", "")
.replace("'", "")
.replace(",", "")
}
# Load and substitute the query template
with open(query_file, "r") as f:
query = Template(f.read()).substitute(substit_dict)
# Execute the query and process results
sparql.setQuery(query)
res = sparql.queryAndConvert()
res_df = pd.DataFrame(
[
{k: (v["value"] if "value" in v else "") for k, v in item.items()}
for item in res["results"]["bindings"]
]
)
# Retrieve the expected columns from the SPARQL query results' "vars"
expected_columns = res["head"]["vars"]
# Ensure all expected columns are present in intermediate_df
for col in expected_columns:
if col not in intermediate_df.columns:
intermediate_df[col] = None
# Concatenate the new results into the intermediate DataFrame
intermediate_df = pd.concat([intermediate_df, res_df], ignore_index=True)
# Record the end time
end_time = datetime.datetime.now()
# Step 4: Check if the query returned any results
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.AOPWIKI_COMPOUND_COL}.",
stacklevel=2,
)
return pd.DataFrame(), {}
# Step 5: Clean and process the results
input_col = "pubchem_compound"
output_dict = (
Cons.AOPWIKI_COMPOUND_OUTPUT_DICT if pathway else Cons.AOPWIKI_COMPOUND_OUTPUT_DICT_SIMPLE
)
# Clean URLs in output columns
for key in output_dict.keys():
if key in intermediate_df.columns:
intermediate_df[key] = intermediate_df[key].apply(
lambda x: x.split("/")[-1] if isinstance(x, str) and "http" in x else x
)
# Clean the compound ID column
if input_col in intermediate_df.columns:
intermediate_df[input_col] = intermediate_df[input_col].apply(lambda x: x.split("/")[-1])
intermediate_df.rename(columns={input_col: Cons.TARGET_COL}, inplace=True)
intermediate_df = intermediate_df.drop_duplicates()
# Step 6: Generate metadata
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
time_elapsed = str(end_time - start_time)
metadata_dict: Dict[str, Any] = {
"datasource": Cons.AOPWIKIRDF,
"query": {
"size": len(compound_list),
"input_type": Cons.AOPWIKI_COMPOUND_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.AOPWIKI_ENDPOINT,
"pathway": pathway,
},
}
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=output_dict,
check_values_in=[],
)
# Step 7: Integrate into main dataframe
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.AOPWIKI_COMPOUND_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=list(output_dict.keys()),
col_name=Cons.AOPWIKI_COMPOUND_COL,
)
# Calculate the number of new nodes and edges
num_new_nodes = intermediate_df[Cons.TARGET_COL].nunique()
num_new_edges = intermediate_df.drop_duplicates(subset=[Cons.TARGET_COL]).shape[0]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.AOPWIKI_COMPOUND_COL)
# Add the number of new nodes and edges to metadata
metadata_dict[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
metadata_dict[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, metadata_dict
[docs]
def get_aops(
bridgedb_df: pd.DataFrame,
pathway: bool = False,
) -> Tuple[pd.DataFrame, dict]:
"""Query for AOPs associated with genes or compounds.
:param bridgedb_df: BridgeDb output for creating the list of gene/compound ids to query
:param pathway: if True, retrieve full pathway information including upstream/downstream key events.
If False (default), retrieve simplified AOP information.
:raises ValueError: if the input identifiers are not recognized or if they are not admitted gene or compound identifiers
:returns: a DataFrame containing the AOP Wiki RDF output and dictionary of the AOP Wiki RDF metadata
"""
# Find the matching type based on which input_identifier we find
if (
Cons.AOPWIKI_GENE_INPUT_ID in bridgedb_df["identifier.source"].values
or Cons.AOPWIKI_GENE_INPUT_ID in bridgedb_df["target.source"].values
):
return get_aops_gene(bridgedb_df, pathway=pathway)
elif (
Cons.AOPWIKI_COMPOUND_INPUT_ID in bridgedb_df["identifier.source"].values
or Cons.AOPWIKI_COMPOUND_INPUT_ID in bridgedb_df["target.source"].values
):
return get_aops_compound(bridgedb_df, pathway=pathway)
else:
raise ValueError(
f"Input identifiers must be either '{Cons.AOPWIKI_GENE_INPUT_ID}' or '{Cons.AOPWIKI_COMPOUND_INPUT_ID}'"
)