# coding: utf-8
"""Python file for querying the OpenTargets database (https://www.opentargets.org/)."""
import datetime
import warnings
from typing import Dict, Literal, Tuple
import numpy as np
import pandas as pd
import requests
from tqdm import tqdm
import pyBiodatafuse.constants as Cons
from pyBiodatafuse import id_mapper
from pyBiodatafuse.utils import (
check_columns_against_constants,
collapse_data_sources,
get_identifier_of_interest,
give_annotator_warning,
)
def check_endpoint_opentargets() -> bool:
"""Check the availability of the OpenTargets API endpoint.
:returns: a dictionary containing the version information
"""
query = """
query MetaInfo {
meta{
name
apiVersion{
x
y
z
}
dataVersion{
year
month
}
}
}"""
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query}).json()
if not r["data"]:
return False
return True
def get_version_opentargets() -> dict:
"""Get version of OpenTargets API.
:returns: a dictionary containing the version information
"""
query = """
query MetaInfo {
meta{
name
apiVersion{
x
y
z
}
dataVersion{
year
month
}
}
}"""
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query}).json()
year = r["data"]["meta"]["dataVersion"]["year"]
month = r["data"]["meta"]["dataVersion"]["month"]
api_version_x = r["data"]["meta"]["apiVersion"]["x"]
api_version_y = r["data"]["meta"]["apiVersion"]["y"]
api_version_z = r["data"]["meta"]["apiVersion"]["z"]
metadata = {
Cons.DATASOURCE: r["data"]["meta"]["name"],
Cons.METADATA: {
"source_version": {
"apiVersion": f"{api_version_x}.{api_version_y}.{api_version_z}",
},
"data_version": f"{year}-{month}",
},
}
return metadata
[docs]
def get_gene_go_process(
bridgedb_df: pd.DataFrame,
) -> Tuple[pd.DataFrame, dict]:
"""Get information about GO pathways associated with a genes of interest.
:param bridgedb_df: BridgeDb output for creating the list of gene ids to query
:returns: a DataFrame containing the OpenTargets output and dictionary of the query metadata.
"""
# Check if the API is available
api_available = check_endpoint_opentargets()
if not api_available:
warnings.warn(
f"{Cons.OPENTARGETS} GraphQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
data_df = get_identifier_of_interest(bridgedb_df, Cons.OPENTARGETS_GENE_INPUT_ID)
gene_ids = data_df[Cons.TARGET_COL].tolist()
# Record the start time
opentargets_version = get_version_opentargets()
start_time = datetime.datetime.now()
query_string = """
query targetPathways {
targets (ensemblIds: $ids){
id
geneOntology {
term {
id
name
}
aspect
}
}
}
"""
query_string = query_string.replace("$ids", str(gene_ids).replace("'", '"'))
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query_string}).json()
# Record the end time
end_time = datetime.datetime.now()
"""Metadata details"""
# Get the current date and time
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate the time elapsed
time_elapsed = str(end_time - start_time)
# Add version, datasource, query, query time, and the date to metadata
opentargets_version["query"] = {
"size": len(gene_ids),
"input_type": Cons.OPENTARGETS_GENE_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.OPENTARGETS_ENDPOINT,
}
# Generate the OpenTargets DataFrame
intermediate_df = pd.DataFrame()
for gene in tqdm(r["data"]["targets"], desc="Processing gene annotation"):
terms = [i["term"] for i in gene["geneOntology"]]
types = [i["aspect"] for i in gene["geneOntology"]]
path_df = pd.DataFrame(terms)
path_df[Cons.OPENTARGETS_GO_TYPE] = types
path_df = path_df.drop_duplicates()
path_df[Cons.TARGET_COL] = gene["id"]
intermediate_df = pd.concat([intermediate_df, path_df], ignore_index=True)
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.OPENTARGETS_GO_COL}.",
stacklevel=2,
)
return pd.DataFrame(), opentargets_version
intermediate_df.rename(
columns={
"id": Cons.OPENTARGETS_GO_ID,
"name": Cons.OPENTARGETS_GO_NAME,
},
inplace=True,
)
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=Cons.OPENTARGETS_GO_OUTPUT_DICT,
check_values_in=[Cons.GO],
)
# Merge the two DataFrames on the target column
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.OPENTARGETS_GENE_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=list(Cons.OPENTARGETS_GO_OUTPUT_DICT.keys()),
col_name=Cons.OPENTARGETS_GO_COL,
)
"""Update metadata"""
# Calculate the number of new nodes
num_new_nodes = intermediate_df[Cons.OPENTARGETS_GO_ID].nunique()
# Calculate the number of new edges
num_new_edges = intermediate_df.drop_duplicates(
subset=[Cons.TARGET_COL, Cons.OPENTARGETS_GO_ID]
).shape[0]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.OPENTARGETS_GO_COL)
# Add the number of new nodes and edges to metadata
opentargets_version[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
opentargets_version[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, opentargets_version
[docs]
def get_gene_reactome_pathways(
bridgedb_df: pd.DataFrame,
) -> Tuple[pd.DataFrame, dict]:
"""Get information about Reactome pathways associated with a gene.
:param bridgedb_df: BridgeDb output for creating the list of gene ids to query
:returns: a DataFrame containing the OpenTargets output and dictionary of the query metadata.
"""
# Check if the API is available
api_available = check_endpoint_opentargets()
if not api_available:
warnings.warn(
f"{Cons.OPENTARGETS} GraphQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
data_df = get_identifier_of_interest(bridgedb_df, Cons.OPENTARGETS_GENE_INPUT_ID)
gene_ids = data_df[Cons.TARGET_COL].tolist()
# Record the start time
opentargets_version = get_version_opentargets()
start_time = datetime.datetime.now()
query_string = """
query targetPathways {
targets (ensemblIds: $ids){
id
pathways {
pathway
pathwayId
}
}
}
"""
query_string = query_string.replace("$ids", str(gene_ids).replace("'", '"'))
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query_string}).json()
# Record the end time
end_time = datetime.datetime.now()
"""Metdata details"""
# Get the current date and time
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate the time elapsed
time_elapsed = str(end_time - start_time)
# Add version, datasource, query, query time, and the date to metadata
opentargets_version["query"] = {
"size": len(gene_ids),
"input_type": Cons.OPENTARGETS_GENE_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.OPENTARGETS_ENDPOINT,
}
# Generate the OpenTargets DataFrame
intermediate_df = pd.DataFrame()
for gene in tqdm(r["data"]["targets"], desc="Processing gene-pathway interactions"):
path_df = pd.DataFrame(gene[Cons.PATHWAYS])
path_df = path_df.drop_duplicates()
path_df[Cons.TARGET_COL] = gene["id"]
intermediate_df = pd.concat([intermediate_df, path_df], ignore_index=True)
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.OPENTARGETS_REACTOME_COL}.",
stacklevel=2,
)
return pd.DataFrame(), opentargets_version
intermediate_df.rename(
columns={
"pathway": Cons.PATHWAY_LABEL,
"pathwayId": Cons.PATHWAY_ID,
},
inplace=True,
)
# Fixing the pathway_id
new_ids = []
for idx in intermediate_df[Cons.PATHWAY_ID]:
if idx.startswith("R-"):
new_ids.append(f"{Cons.REACTOME}:{idx}")
elif idx.startswith("WP"):
new_ids.append(f"{Cons.WP}:{idx}")
else:
print(idx) # TODO: if this occures, then we need to check the data
new_ids.append(idx)
intermediate_df[Cons.PATHWAY_ID] = new_ids
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=Cons.OPENTARGETS_REACTOME_OUTPUT_DICT,
check_values_in=[Cons.OPENTARGETS_POSSIBLE_PATHWAY_IDS],
)
# Merge the two DataFrames on the target column
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.OPENTARGETS_GENE_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=[Cons.PATHWAY_LABEL, Cons.PATHWAY_ID],
col_name=Cons.OPENTARGETS_REACTOME_COL,
)
"""Update metadata"""
# Calculate the number of new nodes
num_new_nodes = intermediate_df[Cons.PATHWAY_ID].nunique()
# Calculate the number of new edges
num_new_edges = intermediate_df.drop_duplicates(
subset=[Cons.TARGET_COL, Cons.PATHWAY_ID]
).shape[0]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.OPENTARGETS_REACTOME_COL)
# Add the number of new nodes and edges to metadata
opentargets_version[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
opentargets_version[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, opentargets_version
def _process_compounds(
drug_df: pd.DataFrame, target_id: str, target_type: Literal["gene", "disease"]
) -> pd.DataFrame:
"""Process the compounds data from OpenTargets."""
if target_type == "gene":
cols = Cons.OPENTARET_COMPOUND_COLS + ["mechanisms_of_action"]
else:
cols = Cons.OPENTARET_COMPOUND_COLS
drug_df[cols] = drug_df["drug"].apply(pd.Series)
drug_df[Cons.TARGET_COL] = target_id
drug_df[Cons.CHEMBL_ID] = drug_df[Cons.CHEMBL_ID].astype(str)
if target_type == "gene":
drug_df[Cons.OPENTARGETS_COMPOUND_RELATION] = drug_df["mechanismOfAction"].apply(
lambda x: "inhibits" if "antagonist" in x else "activates"
)
drug_df[Cons.DRUGBANK_ID] = drug_df["cross_references"].apply(
lambda x: (
next((ref["ids"][0] for ref in x if ref["source"] == "drugbank"), None)
if isinstance(x, list)
else None
)
)
drug_df[Cons.DRUGBANK_ID] = drug_df[Cons.DRUGBANK_ID].apply(
lambda x: f"{Cons.DRUGBANK}:{x}" if x else None
)
drug_df[[Cons.OPENTARGETS_ADVERSE_EFFECT_COUNT, Cons.OPENTARGETS_ADVERSE_EFFECT]] = (
drug_df.apply(
lambda row: (
pd.Series([row["adverse_events"]["count"], row["adverse_events"]["rows"]])
if isinstance(row["adverse_events"], dict)
else pd.Series([0, None])
),
axis=1,
)
)
drug_df[Cons.OPENTARGETS_ADVERSE_EFFECT_COUNT] = drug_df[
Cons.OPENTARGETS_ADVERSE_EFFECT_COUNT
].astype(int)
drug_df.drop(columns=["drug", "cross_references", "adverse_events"], inplace=True)
return drug_df
[docs]
def get_gene_compound_interactions(
bridgedb_df: pd.DataFrame,
cache_pubchem_cid: bool = True,
) -> Tuple[pd.DataFrame, dict]:
"""Get information about drugs associated with a genes of interest.
:param bridgedb_df: BridgeDb output for creating the list of gene ids to query
:param cache_pubchem_cid: whether to cache the PubChem CID for the ChEMBL ID
:returns: a DataFrame containing the OpenTargets output and dictionary of the query metadata.
"""
# Check if the API is available
api_available = check_endpoint_opentargets()
if not api_available:
warnings.warn(
f"{Cons.OPENTARGETS} GraphQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
data_df = get_identifier_of_interest(bridgedb_df, Cons.OPENTARGETS_GENE_INPUT_ID)
gene_ids = data_df[Cons.TARGET_COL].tolist()
# Record the start time
opentargets_version = get_version_opentargets()
start_time = datetime.datetime.now()
query_string = """
query targetDrugs {
targets (ensemblIds: $ids){
id
knownDrugs {
rows {
mechanismOfAction
drug {
id
name
isApproved
maximumClinicalTrialPhase
crossReferences {
source
ids
}
adverseEvents {
count
rows {
name
}
}
mechanismsOfAction {
uniqueActionTypes
uniqueTargetTypes
}
}
}
}
}
}
"""
query_string = query_string.replace("$ids", str(gene_ids).replace("'", '"'))
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query_string}).json()
# Record the end time
end_time = datetime.datetime.now()
"""Metadata details"""
# Get the current date and time
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate the time elapsed
time_elapsed = str(end_time - start_time)
# Add version, datasource, query, query time, and the date to metadata
opentargets_version["query"] = {
"size": len(gene_ids),
"input_type": Cons.OPENTARGETS_GENE_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.OPENTARGETS_ENDPOINT,
}
# Generate the OpenTargets DataFrame
intermediate_df = pd.DataFrame()
if r.get("data") and r.get("data").get("targets"):
for gene in tqdm(r["data"]["targets"], desc="Processing gene-drug interactions"):
if not gene["knownDrugs"]:
continue
drug_info = gene["knownDrugs"]["rows"]
drug_df = pd.DataFrame(drug_info)
if drug_df.empty:
continue
drug_df = _process_compounds(drug_df, gene["id"], "gene")
intermediate_df = pd.concat([intermediate_df, drug_df], ignore_index=True)
intermediate_df = intermediate_df.drop_duplicates(
subset=[
col
for col in intermediate_df.columns
if col not in [Cons.OPENTARGETS_ADVERSE_EFFECT, "mechanisms_of_action"]
]
)
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.OPENTARGETS_GENE_COMPOUND_COL}.",
stacklevel=2,
)
return pd.DataFrame(), opentargets_version
# Fixing chembl_id to pubchem_id
chembl_ids = intermediate_df[Cons.CHEMBL_ID].values.tolist()
mapped_df, _ = id_mapper.pubchem_xref(
identifiers=chembl_ids,
identifier_type="name",
cache_res=cache_pubchem_cid,
)
mapped_df = mapped_df[[Cons.IDENTIFIER_COL, Cons.TARGET_COL]]
mapped_dict = mapped_df.set_index(Cons.IDENTIFIER_COL).to_dict()[Cons.TARGET_COL]
intermediate_df["compound_cid"] = intermediate_df[Cons.CHEMBL_ID].map(mapped_dict)
intermediate_df[Cons.CHEMBL_ID] = f"{Cons.CHEMBL}:" + intermediate_df[Cons.CHEMBL_ID]
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=Cons.OPENTARGETS_COMPOUND_OUTPUT_DICT,
check_values_in=Cons.OPENTARGETS_COMPOUND_VALUE_CHECK_LIST,
)
# Merge the two DataFrames on the target column
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.OPENTARGETS_GENE_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=list(Cons.OPENTARGETS_COMPOUND_OUTPUT_DICT.keys()),
col_name=Cons.OPENTARGETS_GENE_COMPOUND_COL,
)
# Ensure the column is correctly assigned
if Cons.OPENTARGETS_GENE_COMPOUND_COL not in merged_df.columns:
merged_df[Cons.OPENTARGETS_GENE_COMPOUND_COL] = pd.Series(
[[] for _ in range(len(merged_df))]
)
"""Update metadata"""
# Calculate the number of new nodes
num_new_nodes = intermediate_df[Cons.CHEMBL_ID].nunique()
# Calculate the number of new edges
num_new_edges = intermediate_df.drop_duplicates(subset=[Cons.TARGET_COL, Cons.CHEMBL_ID]).shape[
0
]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.OPENTARGETS_GENE_COMPOUND_COL)
# Add the number of new nodes and edges to metadata
opentargets_version[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
opentargets_version[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, opentargets_version
[docs]
def get_disease_compound_interactions(
bridgedb_df: pd.DataFrame,
cache_pubchem_cid: bool = False,
) -> Tuple[pd.DataFrame, dict]:
"""Get information about drugs associated with diseases of interest.
:param bridgedb_df: BridgeDb output for creating the list of gene ids to query.
:param cache_pubchem_cid: If True, the PubChem CID will be cached for future use.
:returns: a DataFrame containing the OpenTargets output and dictionary of the query metadata.
"""
# Check if the API is available
api_available = check_endpoint_opentargets()
if not api_available:
warnings.warn(
f"{Cons.OPENTARGETS} GraphQL endpoint is not available. Unable to retrieve data.",
stacklevel=2,
)
return pd.DataFrame(), {}
if bridgedb_df.empty:
warnings.warn(
"There is no input.",
stacklevel=2,
)
return pd.DataFrame(), {}
data_df = bridgedb_df[bridgedb_df[Cons.TARGET_SOURCE_COL] == Cons.OPENTARGETS_DISEASE_INPUT_ID]
efo_ids = data_df[Cons.TARGET_COL].tolist()
# Record the start time
opentargets_version = get_version_opentargets()
start_time = datetime.datetime.now()
query_string = """
query DiseaseDrugs{
diseases (efoIds: $efoIds) {
id
name
knownDrugs {
rows {
drug {
id
name
isApproved
maximumClinicalTrialPhase
crossReferences {
ids
source
}
adverseEvents{
count
rows{
name
}
}
}
}
}
}
}"""
final_data = []
# query in batches of 25
for i in range(0, len(efo_ids), 25):
batch_ids = efo_ids[i : i + 25]
query_string = query_string.replace("$efoIds", str(batch_ids).replace("'", '"'))
r = requests.post(Cons.OPENTARGETS_ENDPOINT, json={"query": query_string}).json()
if not r.get("data", {}).get("diseases") or r is None:
continue
final_data.append(r)
# Record the end time
end_time = datetime.datetime.now()
"""Metdata details"""
# Get the current date and time
current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Calculate the time elapsed
time_elapsed = str(end_time - start_time)
# Add version, datasource, query, query time, and the date to metadata
opentargets_version["query"] = {
"size": len(efo_ids),
"input_type": Cons.OPENTARGETS_DISEASE_INPUT_ID,
"time": time_elapsed,
"date": current_date,
"url": Cons.OPENTARGETS_ENDPOINT,
}
# Generate the OpenTargets DataFrame
intermediate_df = pd.DataFrame()
if len(final_data) == 0:
warnings.warn(
f"There is no annotation for your input list in {Cons.OPENTARGETS_DISEASE_COMPOUND_COL}.",
stacklevel=2,
)
return pd.DataFrame(), opentargets_version
for data in tqdm(final_data, desc="Processing diseases-drug interactions"):
for disease in data["data"]["diseases"]:
if not disease["knownDrugs"]:
continue
# Based on clinical trial data
drug_info = disease["knownDrugs"]["rows"]
drug_df = pd.DataFrame(drug_info)
if drug_df.empty:
continue
drug_df = _process_compounds(drug_df, disease["id"], "disease")
intermediate_df = pd.concat([intermediate_df, drug_df], ignore_index=True)
intermediate_df = intermediate_df.drop_duplicates(
subset=[
col
for col in intermediate_df.columns
if col not in [Cons.OPENTARGETS_ADVERSE_EFFECT, "mechanisms_of_action"]
]
)
if intermediate_df.empty:
warnings.warn(
f"There is no annotation for your input list in {Cons.OPENTARGETS_DISEASE_COMPOUND_COL}.",
stacklevel=2,
)
return pd.DataFrame(), opentargets_version
# Fixing chembl_id to pubchem_id
chembl_ids = intermediate_df[Cons.CHEMBL_ID].values.tolist()
mapped_df, _ = id_mapper.pubchem_xref(
identifiers=chembl_ids,
identifier_type="name",
cache_res=cache_pubchem_cid,
)
mapped_df = mapped_df[[Cons.IDENTIFIER_COL, Cons.TARGET_COL]]
mapped_dict = mapped_df.set_index(Cons.IDENTIFIER_COL).to_dict()[Cons.TARGET_COL]
intermediate_df["compound_cid"] = intermediate_df[Cons.CHEMBL_ID].map(mapped_dict)
intermediate_df[Cons.CHEMBL_ID] = f"{Cons.CHEMBL}:" + intermediate_df[Cons.CHEMBL_ID]
intermediate_df[Cons.OPENTARGETS_COMPOUND_RELATION] = Cons.OPENTARGETS_COMPOUND_DISEASE_RELATION
# Check if all keys in df match the keys in OUTPUT_DICT
check_columns_against_constants(
data_df=intermediate_df,
output_dict=Cons.OPENTARGETS_COMPOUND_OUTPUT_DICT,
check_values_in=Cons.OPENTARGETS_COMPOUND_VALUE_CHECK_LIST,
)
# Merge the two DataFrames on the target column
merged_df = collapse_data_sources(
data_df=data_df,
source_namespace=Cons.OPENTARGETS_DISEASE_INPUT_ID,
target_df=intermediate_df,
common_cols=[Cons.TARGET_COL],
target_specific_cols=list(Cons.OPENTARGETS_COMPOUND_OUTPUT_DICT.keys()),
col_name=Cons.OPENTARGETS_DISEASE_COMPOUND_COL,
)
# Fill in missing values for columns in bridgedb_df
if len(merged_df) != len(bridgedb_df):
subset_df = bridgedb_df[~bridgedb_df[Cons.TARGET_COL].isin(merged_df[Cons.TARGET_COL])]
fill_data = [[{i: np.nan for i in Cons.OPENTARGETS_COMPOUND_OUTPUT_DICT.keys()}]] * len(
subset_df
)
subset_df[Cons.OPENTARGETS_DISEASE_COMPOUND_COL] = fill_data
merged_df = pd.concat([merged_df, subset_df], ignore_index=True)
"""Update metadata"""
# Calculate the number of new nodes
num_new_nodes = intermediate_df[Cons.CHEMBL_ID].nunique()
# Calculate the number of new edges
num_new_edges = intermediate_df.drop_duplicates(subset=[Cons.TARGET_COL, Cons.CHEMBL_ID]).shape[
0
]
# Check the intermediate_df
if num_new_edges != len(intermediate_df):
give_annotator_warning(Cons.OPENTARGETS_DISEASE_COMPOUND_COL)
# Add the number of new nodes and edges to metadata
opentargets_version[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
opentargets_version[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges
return merged_df, opentargets_version