# Source code for pyBiodatafuse.annotators.wikidata

# -*- coding: utf-8 -*-


"""Python file for querying the Wikidata database (https://www.wikidata.org/)."""

import datetime
import os
import warnings
from string import Template
from typing import Any, Dict

import pandas as pd
from SPARQLWrapper import JSON, SPARQLWrapper
from SPARQLWrapper.SPARQLExceptions import SPARQLWrapperException

import pyBiodatafuse.constants as Cons
from pyBiodatafuse.utils import (
    collapse_data_sources,
    get_identifier_of_interest,
    give_annotator_warning,
)


def check_endpoint_wikidata() -> bool:
    """Check the availability of the Wikidata SPARQL endpoint.

    The bundled test query (``queries/wikidata-test.rq`` next to this module) is
    sent to ``Cons.WIKIDATA_ENDPOINT``. The endpoint is reported as unavailable
    when the query raises a SPARQLWrapper error or a lower-level network error.

    :returns: True if the endpoint is available, False otherwise.
    """
    # Join path components instead of concatenating with "/" so an empty
    # module directory does not turn the path into a root-anchored one.
    query_path = os.path.join(os.path.dirname(__file__), "queries", "wikidata-test.rq")
    with open(query_path, "r") as fin:
        sparql_query = fin.read()

    sparql = SPARQLWrapper(Cons.WIKIDATA_ENDPOINT)
    sparql.setReturnFormat(JSON)
    sparql.setQuery(sparql_query)

    try:
        sparql.queryAndConvert()
    except (SPARQLWrapperException, OSError):
        # OSError covers urllib's URLError/HTTPError, refused or reset
        # connections and socket timeouts; an availability check should
        # answer False for those rather than propagate them.
        return False
    return True


# TODO: Fetch this version information from the server instead of deriving it from the current date
def get_version_wikidata() -> dict:
    """Build the version metadata for the Wikidata content.

    The version is derived from the local current date rather than queried
    from the server; year and month are reported as zero-padded strings.

    :returns: a dictionary containing the (data) version information
    """
    today = datetime.datetime.now()
    data_version = {
        "year": f"{today.year:04d}",
        "month": f"{today.month:02d}",
    }
    return {"metadata": {"data_version": {"dataVersion": data_version}}}


# TODO: Remove this functionality
def get_gene_cellular_component(bridgedb_df: pd.DataFrame):
    """Get cellular component information and Wikidata identifiers for a gene's encoded protein.

    The NCBI Gene identifiers found in the BridgeDb output are queried against
    the Wikidata SPARQL endpoint in batches, and the results are collapsed into
    a single annotation column on the input identifiers.

    :param bridgedb_df: BridgeDb output for creating the list of gene ids to query
    :returns: a DataFrame containing the Wikidata output and dictionary of the query metadata.
        If the endpoint is unavailable, an empty DataFrame and an empty dictionary are
        returned. If the query yields no cellular components, an empty DataFrame is
        returned together with the query metadata.
    """
    # Check if the Wikidata API is available
    if not check_endpoint_wikidata():
        warnings.warn(
            f"{Cons.WIKIDATA} SPARQL endpoint is not available. Unable to retrieve data.",
            stacklevel=2,
        )
        return pd.DataFrame(), {}

    # Record the start time
    start_time = datetime.datetime.now()

    # Add version to metadata file
    wikidata_version = get_version_wikidata()

    data_df = get_identifier_of_interest(bridgedb_df, Cons.NCBI_GENE)

    # De-duplicate while keeping first-seen order so the batches are reproducible
    gene_list = list(dict.fromkeys(data_df[Cons.TARGET_COL].tolist()))

    # Query in batches of 25 quoted identifiers; an empty gene list produces
    # no batch at all, so no query is sent for it.
    batch_size = 25
    query_gene_lists = [
        " ".join(f'"{g}"' for g in gene_list[i : i + batch_size])
        for i in range(0, len(gene_list), batch_size)
    ]

    query_path = os.path.join(
        os.path.dirname(__file__), "queries", "wikidata-genes-cellularComponent.rq"
    )
    with open(query_path, "r") as fin:
        # The template does not change between batches, so build it once
        sparql_query_template = Template(fin.read())

    sparql = SPARQLWrapper(Cons.WIKIDATA_ENDPOINT)
    sparql.setReturnFormat(JSON)

    result_frames = []
    for gene_list_str in query_gene_lists:
        sparql.setQuery(sparql_query_template.substitute(gene_list=gene_list_str))
        res = sparql.queryAndConvert()

        df = pd.DataFrame(res["results"]["bindings"])
        # Each binding is a dictionary; keep only its "value" entry
        for col in df:
            df[col] = df[col].map(lambda x: x["value"], na_action="ignore")

        result_frames.append(df)

    # Concatenate once at the end instead of re-copying the accumulated rows per batch
    if result_frames:
        intermediate_df = pd.concat(result_frames, ignore_index=True)
    else:
        intermediate_df = pd.DataFrame()

    # Record the end time and build metadata
    end_time = datetime.datetime.now()
    current_date = end_time.strftime("%Y-%m-%d %H:%M:%S")
    time_elapsed = str(end_time - start_time)

    # Add the datasource, query, query time, and the date to metadata
    wikidata_metadata: Dict[str, Any] = {
        "datasource": Cons.WIKIDATA,
        "metadata": {"source_version": wikidata_version},
        "query": {
            "size": len(gene_list),
            "time": time_elapsed,
            "date": current_date,
            "url": Cons.WIKIDATA_ENDPOINT,
        },
    }

    if "cellularComp" not in intermediate_df.columns:
        return pd.DataFrame(), wikidata_metadata

    # Organize the annotation results as an array of dictionaries
    intermediate_df = intermediate_df.rename(
        columns={
            "cellularComp": Cons.WIKIDATA_ID_COL,
            "cellularCompLabel": Cons.WIKIDATA_LABEL_COL,
            "go": Cons.GO_ID,
        }
    )
    intermediate_df = intermediate_df.rename(columns={"geneId": Cons.TARGET_COL})

    # Merge the two DataFrames on the target column
    merged_df = collapse_data_sources(
        data_df=data_df,
        source_namespace=Cons.NCBI_GENE,
        target_df=intermediate_df,
        common_cols=[Cons.TARGET_COL],
        target_specific_cols=list(Cons.WIKIDATA_OUTPUT_DICT.keys()),
        col_name=Cons.WIKIDATA_CC_COL,
    )

    # Calculate the number of new nodes
    num_new_nodes = intermediate_df[Cons.WIKIDATA_ID_COL].nunique()

    # Calculate the number of new edges
    num_new_edges = intermediate_df.drop_duplicates(
        subset=[Cons.TARGET_COL, Cons.WIKIDATA_ID_COL]
    ).shape[0]

    # Warn when the result contains duplicated gene/component pairs
    if num_new_edges != len(intermediate_df):
        give_annotator_warning(Cons.WIKIDATA)

    # Add the number of new nodes and edges to metadata
    wikidata_metadata[Cons.QUERY][Cons.NUM_NODES] = num_new_nodes
    wikidata_metadata[Cons.QUERY][Cons.NUM_EDGES] = num_new_edges

    return merged_df, wikidata_metadata