Source code for ontolearn.learners.sparql_query_learner

# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
from typing import List

import requests
from owlapy.class_expression import OWLClassExpression
from owlapy.converter import converter
from sortedcontainers import SortedSet

from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.metrics import F1
from itertools import chain, combinations
from more_itertools import powerset


class SPARQLQueryLearner:
    """Learning SPARQL queries: Given a description logic concept (potentially generated by a concept learner),
    try to improve the fitness (e.g., F1) of the corresponding SPARQL query.

    The learner collects candidate ``FILTER`` clauses from the data properties with string values that are used
    by the positive/negative examples, and then tries combinations of these clauses on top of the query of a
    given class expression, keeping the combination with the best F1 score.

    Attributes:
        name (str): Name of the model = 'SPARQL Query Learner'
        endpoint_url (str): The URL of the SPARQL endpoint to use
        max_number_of_filters (int): Limit the number of filters combined during the improvement process
        learning_problem (PosNegLPStandard): the learning problem (sets of positive and negative examples)
        uses_complex_filters (bool): Denotes whether the learner uses complex filters
            (i.e., makes use of the values of data properties) to improve the quality
        _root_var (str): The root variable to be used in the OWL2SPARQL conversion
        _possible_filters (List[str]): A list of possible FILTERs to use to improve the quality
    """

    __slots__ = ('endpoint_url', 'max_number_of_filters', 'uses_complex_filters', 'learning_problem',
                 '_possible_filters',)

    # Third-party types are written as forward references (strings) so that the annotations are not
    # evaluated when the class body / the method definitions are executed.

    # public
    name = 'SPARQL Query Learner'
    endpoint_url: str
    max_number_of_filters: int
    learning_problem: "PosNegLPStandard"
    uses_complex_filters: bool

    # private
    _root_var = "?x"
    _possible_filters: List[str]

    def __init__(
            self,
            learning_problem: "PosNegLPStandard",
            endpoint_url: str,
            max_number_of_filters: int = 3,
            use_complex_filters: bool = True):
        """Collect the candidate FILTERs for the given learning problem by querying the endpoint.

        Args:
            learning_problem: The positive and negative examples.
            endpoint_url: The URL of the SPARQL endpoint to use.
            max_number_of_filters: Maximum number of FILTERs combined in a single candidate query.
            use_complex_filters: Whether FILTERs over the values of the data properties are also generated.

        Raises:
            Exception: If the endpoint answers one of the queries with an unexpected HTTP status.
        """
        # init slots
        self.learning_problem = learning_problem
        self.endpoint_url = endpoint_url
        self.max_number_of_filters = max_number_of_filters
        self._possible_filters = list()
        self.uses_complex_filters = use_complex_filters

        # find all data properties having string literals as objects
        properties_response = self._http_request("SELECT DISTINCT ?p WHERE { "
                                                 " ?s ?p ?o"
                                                 " FILTER(DATATYPE(?o) = <http://www.w3.org/2001/XMLSchema#string>)"
                                                 "}")
        str_properties = [result["p"]["value"] for result in properties_response.json()["results"]["bindings"]]

        # find the str properties that are used by ALL positive examples
        # (the order of str_properties is kept so that the order of the filters is reproducible;
        # all() stops querying as soon as one positive example does not use the property)
        str_properties_used_by_all_positive_examples = [
            property_str for property_str in str_properties
            if all(self._uses_property(positive_example, property_str)
                   for positive_example in self.learning_problem.pos)]

        # find the str properties that are used by ANY negative example
        # (any() stops querying as soon as one negative example uses the property)
        str_properties_used_by_any_negative_examples = [
            property_str for property_str in str_properties
            if any(self._uses_property(negative_example, property_str)
                   for negative_example in self.learning_problem.neg)]

        # create possible filters using str_properties_used_by_all_positive_examples
        # and str_properties_used_by_any_negative_examples
        for property_str in str_properties_used_by_all_positive_examples:
            self._add_filter("FILTER EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
        for property_str in str_properties_used_by_any_negative_examples:
            self._add_filter("FILTER NOT EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))

        # if enabled, the learner tries to leverage also the values of the data properties
        if self.uses_complex_filters:
            property_positive_values_dict = dict()
            property_negative_values_dict = dict()

            # for each property in str_properties, find its possible values
            for property_str in str_properties:
                # the STR values found by combining property_str with positive examples
                property_positive_values_dict[property_str] = self._property_values(
                    self.learning_problem.pos, property_str)
                # the STR values found by combining property_str with negative examples
                property_negative_values_dict[property_str] = self._property_values(
                    self.learning_problem.neg, property_str)

            # prepare possible filters by utilizing the values of the data properties pointing to str literals
            # NOTE(review): the values are interpolated into the query as they are; a value that contains a
            # double quote yields a malformed query (which _compute_f1_score then scores as a failure).
            # Escaping has to be coordinated with the backslash doubling performed by _http_request.
            var_id = 0  # gives every value-based filter its own SPARQL variable

            # iterate over the (property, positive values) pairs
            for property_str, values in property_positive_values_dict.items():
                var_id += 1
                if len(values) == 0:
                    # there are no positive examples using property_str
                    # create a FILTER that looks for individuals that do not use property_str
                    self._add_filter("FILTER NOT EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
                else:
                    # there are positive examples that use property_str
                    # create a FILTER that looks for individuals using the values encountered with
                    # positive examples
                    in_list = ['"{}"\n'.format(v) for v in values]
                    self._possible_filters.append("{} <{}> ?value_{} "
                                                  "FILTER(?value_{} IN ({}))"
                                                  .format(self._root_var, property_str, var_id, var_id,
                                                          ",".join(in_list)))

            # iterate over the (property, negative values) pairs
            for property_str, values in property_negative_values_dict.items():
                var_id += 1
                if len(values) == 0:
                    # there are no negative examples using property_str
                    # create a FILTER that looks for individuals that use property_str
                    self._add_filter("FILTER EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
                else:
                    # there are negative examples that use property_str
                    # create a FILTER that looks for individuals not using the values encountered with
                    # negative examples
                    in_list = ['"{}"\n'.format(v) for v in values]
                    self._possible_filters.append("{} <{}> ?value_{} "
                                                  "FILTER(?value_{} NOT IN ({}))"
                                                  .format(self._root_var, property_str, var_id, var_id,
                                                          ",".join(in_list)))

    # the provided concept should be generated by a concept learner that was supplied the same learning problem
    def learn_sparql_query(self, ce: "OWLClassExpression"):
        """Try to improve the SPARQL query of a class expression by adding FILTERs to it.

        The progress and the outcome are printed.

        Args:
            ce: The class expression whose SPARQL query should be improved.

        Returns:
            The best SPARQL query that was found; this is the query of ``ce`` itself if no combination of
            FILTERs achieved a better F1 score.
        """
        # template query: it is used to score the original query and it will be assigned the FILTERs
        confusion_matrix_query = converter.as_confusion_matrix_query(ce=ce,
                                                                     root_variable=self._root_var,
                                                                     positive_examples=self.learning_problem.pos,
                                                                     negative_examples=self.learning_problem.neg)
        original_query = converter.as_query(ce=ce, root_variable=self._root_var)

        # compute the f1 score of the provided query
        original_f1_score = self._compute_f1_score(confusion_matrix_query)
        print("Trying to improve the class expression `{}` "
              "and its corresponding SPARQL query `{}` with F1 score: {}"
              .format(ce, ' '.join(original_query.split()), original_f1_score))
        best_score = original_f1_score
        best_query = original_query

        # start the search
        # we try combinations of FILTERs stored in self._possible_filters by iterating their powerset
        # we first try single sets
        # we then try their combinations
        # the maximum number of sets that are combined is specified by self.max_number_of_filters
        for combination_of_filters in self._powerset_of_filters():
            if len(combination_of_filters) == 0:
                continue
            if len(combination_of_filters) > self.max_number_of_filters:
                # the powerset is generated by increasing size, thus all remaining combinations are too large
                break

            # create a SPARQL query using the FILTERs specified in combination_of_filters
            # (the FILTERs are placed in front of every VALUES clause of the template query)
            filters_str = " ".join([self._possible_filters[idx] for idx in combination_of_filters])
            learned_query_cm = confusion_matrix_query.replace("VALUES", filters_str + " VALUES")
            f1_score = self._compute_f1_score(learned_query_cm)

            # update the best score and its corresponding query
            if f1_score > best_score:
                best_score = f1_score
                # drop the last character of the original query and close the query again after the FILTERs
                best_query = original_query[:-1] + filters_str + "}"

        # print if we have found a better query
        if best_score > original_f1_score:
            print("Found a better SPARQL query `{}`, with F1 score: {}".format(' '.join(best_query.split()),
                                                                               best_score))
        else:
            print("The provided class expression and its corresponding SPARQL query could not be improved")
        return best_query

    def _add_filter(self, filter_str: str) -> None:
        """Register a candidate FILTER unless an identical one has already been registered."""
        # identical FILTERs would only add redundant combinations to the search
        if filter_str not in self._possible_filters:
            self._possible_filters.append(filter_str)

    def _uses_property(self, individual, property_str: str) -> bool:
        """Return True if the endpoint holds at least one value of property_str for the individual."""
        query_response = self._http_request("SELECT ?o WHERE {{ "
                                            " <{}> <{}> ?o"
                                            "}}".format(individual.str, property_str))
        return len(query_response.json()["results"]["bindings"]) > 0

    def _property_values(self, individuals, property_str: str):
        """Return the sorted set of the values of property_str found for the given individuals."""
        values = SortedSet()
        for individual in individuals:
            query_response = self._http_request("SELECT DISTINCT ?o WHERE {{ "
                                                " <{}> <{}> ?o"
                                                "}}".format(individual.str, property_str))
            for result in query_response.json()["results"]["bindings"]:
                values.add(result["o"]["value"])
        return values

    def _http_request(self, query_str) -> "requests.Response":
        """Send a SPARQL query to the endpoint via HTTP POST and return the response.

        Raises:
            Exception: If the HTTP status code of the response is not in the range [200, 204).
        """
        # in nctrer there are str literals that contain `\\`, which can be parsed by SPARQL (need to be escaped)
        escaped_query = query_str.replace('\\', '\\\\')
        # the body is encoded explicitly: a str body is not guaranteed to be sent as UTF-8,
        # so queries containing non-Latin-1 characters could fail or be garbled
        response = requests.post(url=self.endpoint_url,
                                 headers={"Content-Type": "application/sparql-query"},
                                 data=escaped_query.encode("utf-8"))
        status_code = response.status_code
        if not (200 <= status_code < 204):
            raise Exception("Query failed")
        return response

    def _compute_f1_score(self, query_str: str):
        """Run a confusion matrix query and return the F1 score computed from its tp/fp/fn/tn counts.

        If the query fails, the score of an all-zero confusion matrix is returned instead of raising,
        so that the search can continue with the next candidate.
        """
        try:
            stats = self._http_request(query_str).json()["results"]["bindings"][0]
            return F1().score2(tp=int(stats["tp"]["value"]),
                               fp=int(stats["fp"]["value"]),
                               fn=int(stats["fn"]["value"]),
                               tn=int(stats["tn"]["value"]))
        except Exception:
            print("Exception occurred!")
            # query failed (e.g., due to malformed string; continue)
            return F1().score2(tp=0, fp=0, fn=0, tn=0)

    def _powerset_of_filters(self):
        """Return an iterator over all tuples of indices of self._possible_filters, ordered by tuple size."""
        set_of_indices = [i for i in range(len(self._possible_filters))]
        # lazy construction of the powerset (can be early terminated)
        return chain.from_iterable(combinations(set_of_indices, r) for r in range(len(set_of_indices) + 1))