# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
from typing import List
import requests
from owlapy.class_expression import OWLClassExpression
from owlapy.converter import converter
from sortedcontainers import SortedSet
from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.metrics import F1
from itertools import chain, combinations
from more_itertools import powerset
class SPARQLQueryLearner:
"""Learning SPARQL queries: Given a description logic concept (potentially generated by a concept learner),
try to improve the fittness (e.g., F1) of the corresponding SPARQL query.
Attributes:
name (str): Name of the model = 'SPARQL Query Learner'
endpoint_url (string): The URL of the SPARQL endpoint to use
max_number_of_filters (int): Limit the number of filters combined during the improvement process
learning_problem (PosNegLPStandard): the learning problem (sets of positive and negative examples)
uses_complex_filters (bool): Denotes whether the learner uses complex filters
(i.e., makes use of the values of data properties) to improve the quality
_root_var (str): The root variable to be used in the OWL2SPARQL conversion
_possible_filters (List[str]): A list of possible FILTERs to use to improve the quality
"""
__slots__ = ('endpoint_url',
'max_number_of_filters',
'uses_complex_filters',
'learning_problem',
'_possible_filters',
)
# public
name = 'SPARQL Query Learner'
endpoint_url: str
max_number_of_filters: int
learning_problem: PosNegLPStandard
uses_complex_filters: bool
# private
_root_var = "?x"
_possible_filters: List[str]
def __init__(
self,
learning_problem: PosNegLPStandard,
endpoint_url: str,
max_number_of_filters: int = 3,
use_complex_filters: bool = True):
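        """Create a new SPARQL query learner.

        Note that the constructor already queries the SPARQL endpoint in order to collect
        candidate FILTER expressions for the given learning problem.

        Args:
            learning_problem (PosNegLPStandard): The learning problem (sets of positive and negative examples).
            endpoint_url (str): The URL of the SPARQL endpoint to use.
            max_number_of_filters (int): Limit on the number of FILTERs combined during the improvement process.
            use_complex_filters (bool): Whether to also use the values of data properties when building FILTERs.
        """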
# init slots
self.learning_problem = learning_problem
self.endpoint_url = endpoint_url
self.max_number_of_filters = max_number_of_filters
self._possible_filters = list()
self.uses_complex_filters = use_complex_filters
str_properties = list()
property_positive_values_dict = dict()
property_negative_values_dict = dict()
# find all data properties having string literals as objects
properties_response = self._http_request("SELECT DISTINCT ?p WHERE { "
" ?s ?p ?o"
" FILTER(DATATYPE(?o) = <http://www.w3.org/2001/XMLSchema#string>)"
"}")
for result in properties_response.json()["results"]["bindings"]:
str_properties.append(result["p"]["value"])
# find the set of str properties that are used by ALL positive examples
str_properties_used_by_all_positive_examples = set(str_properties)
for property_str in str_properties:
for positive_example in self.learning_problem.pos:
query_response = self._http_request("SELECT ?o WHERE {{ "
" <{}> <{}> ?o"
"}}".format(positive_example.str, property_str))
if len(query_response.json()["results"]["bindings"]) > 0:
# the positive example uses the property
continue
# the positive example does not use the property -> remove it
str_properties_used_by_all_positive_examples.discard(property_str)
        # find the set of str properties that are used by ANY negative example
str_properties_used_by_any_negative_examples = set()
for property_str in str_properties:
for negative_example in self.learning_problem.neg:
query_response = self._http_request("SELECT ?o WHERE {{ "
" <{}> <{}> ?o"
"}}".format(negative_example.str, property_str))
if len(query_response.json()["results"]["bindings"]) > 0:
# the negative example uses the property
str_properties_used_by_any_negative_examples.add(property_str)
        # create possible FILTERs using the sets str_properties_used_by_all_positive_examples
        # and str_properties_used_by_any_negative_examples
for property_str in str_properties_used_by_all_positive_examples:
self._possible_filters.append("FILTER EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
for property_str in str_properties_used_by_any_negative_examples:
self._possible_filters.append("FILTER NOT EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
        # if enabled, the learner also tries to leverage the values of the data properties
if self.uses_complex_filters:
# for each property in str_properties, find its possible values
for property_str in str_properties:
# stores the STR values found by combining property_str with positive examples
property_positive_values_dict[property_str] = SortedSet()
for positive_example in self.learning_problem.pos:
query_response = self._http_request("SELECT DISTINCT ?o WHERE {{ "
" <{}> <{}> ?o"
"}}".format(positive_example.str, property_str))
for result in query_response.json()["results"]["bindings"]:
property_positive_values_dict[property_str].add(result["o"]["value"])
# stores the STR values found by combining property_str with negative examples
property_negative_values_dict[property_str] = SortedSet()
for negative_example in self.learning_problem.neg:
query_response = self._http_request("SELECT DISTINCT ?o WHERE {{ "
" <{}> <{}> ?o"
"}}".format(negative_example.str, property_str))
for result in query_response.json()["results"]["bindings"]:
property_negative_values_dict[property_str].add(result["o"]["value"])
# prepare possible filters by utilizing the values of the data properties pointing to str literals
var_id = 0
# iterate over the (property, positive values) pairs
for property_str, values in property_positive_values_dict.items():
var_id += 1
if len(values) == 0:
# there are no positive examples using property_str
# create a FILTER that looks for individuals that do not use property_str
self._possible_filters.append("FILTER NOT EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
else:
# there are positive examples that use property_str
# create a FILTER that looks for individuals using the values encountered with positive examples
in_list = [ '"{}"\n'.format(v) for v in property_positive_values_dict[property_str]]
self._possible_filters.append("{} <{}> ?value_{} "
"FILTER(?value_{} IN ({}))"
.format(self._root_var, property_str, var_id, var_id, ",".join(in_list)))
# iterate over the (property, negative values) pairs
for property_str in property_negative_values_dict:
var_id += 1
if len(property_negative_values_dict[property_str]) == 0:
# there are no negative examples using property_str
# create a FILTER that looks for individuals that use property_str
self._possible_filters.append("FILTER EXISTS {{ {} <{}> [] }}".format(self._root_var, property_str))
else:
# there are negative examples that use property_str
                    # create a FILTER that looks for individuals not using the values encountered with negative examples
                    in_list = ['"{}"'.format(v) for v in property_negative_values_dict[property_str]]
self._possible_filters.append("{} <{}> ?value_{} "
"FILTER(?value_{} NOT IN ({}))"
.format(self._root_var, property_str, var_id, var_id, ",".join(in_list)))
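        # Illustrative example (hypothetical property IRI and values): for a property <http://example.org/name>
        # whose positive examples carry the values "Alice" and "Bob", the code above adds a complex filter
        # of the form
        #   ?x <http://example.org/name> ?value_1 FILTER(?value_1 IN ("Alice","Bob"))
        # where the variable suffix is a per-property counter (var_id).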
# the provided concept should be generated by a concept learner that was supplied the same learning problem
def learn_sparql_query(self, ce: OWLClassExpression):
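        """Try to improve the F1 score of the SPARQL query corresponding to the given class expression
        by appending FILTERs to it. The original query and the best query found are printed.

        Args:
            ce (OWLClassExpression): The class expression to start from. It should have been produced by a
                concept learner that was supplied the same learning problem.
        """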
# compute the f1 score of the provided query
confusion_matrix_query = converter.as_confusion_matrix_query(ce=ce,
root_variable=self._root_var,
positive_examples=self.learning_problem.pos,
negative_examples=self.learning_problem.neg)
        original_query = converter.as_query(ce=ce, root_variable=self._root_var)
original_f1_score = self._compute_f1_score(confusion_matrix_query)
print("Trying to improve the class expression `{}`"
"and its corresponding SPARQL query `{}` with F1 score: {}"
.format(ce, ' '.join(original_query.split()), original_f1_score))
best_score = original_f1_score
        # template query: the confusion matrix query computed above is reused as the template
        # into which the candidate FILTERs will be injected
best_query = original_query
# start the search
        # we try combinations of the FILTERs stored in self._possible_filters by iterating over their powerset:
        # single FILTERs first, then combinations of increasing size;
        # the maximum number of FILTERs that are combined is specified by self.max_number_of_filters
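        # Illustrative example: with four possible FILTERs and max_number_of_filters = 2, the index
        # combinations tried are (0,), (1,), (2,), (3,), (0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3);
        # the loop below stops as soon as a larger combination is produced.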
        for combination_of_filters in powerset(range(len(self._possible_filters))):
if len(combination_of_filters) == 0:
continue
if len(combination_of_filters) > self.max_number_of_filters:
break
# create a SPARQL query using the FILTERs specified in combination_of_filters
learned_query_cm = confusion_matrix_query.replace("VALUES", " ".join([self._possible_filters[idx] for idx in combination_of_filters]) + " VALUES")
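            # Note: the confusion matrix query produced by converter.as_confusion_matrix_query is
            # assumed to contain VALUES blocks enumerating the examples; the candidate FILTERs are
            # spliced in directly before each occurrence of "VALUES".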
f1_score = self._compute_f1_score(learned_query_cm)
# update the best score and its corresponding query
if f1_score > best_score:
best_score = f1_score
                # append the FILTERs right before the closing '}' of the original query
                best_query = original_query[:-1] + " ".join([self._possible_filters[idx] for idx in combination_of_filters]) + "}"
# print if we have found a better query
if (best_score > original_f1_score):
print("Found a better SPARQL query `{}`, with F1 score: {}".format(' '.join(best_query.split()), best_score))
else:
print("The provided class expression and its corresponding SPARQL query could not be improved")
def _http_request(self, query_str) -> requests.Response:
response = requests.post(url=self.endpoint_url,
headers={"Content-Type": "application/sparql-query"},
                                 # double the backslashes: some datasets (e.g., nctrer) contain string
                                 # literals with `\`, which cannot be parsed by SPARQL unless escaped
                                 data=query_str.replace('\\', '\\\\'))
status_code = response.status_code
if not (200 <= status_code < 204):
raise Exception("Query failed")
return response
def _compute_f1_score(self, query_str: str):
try:
stats = self._http_request(query_str).json()["results"]["bindings"][0]
return F1().score2(tp=int(stats["tp"]["value"]),
fp=int(stats["fp"]["value"]),
fn=int(stats["fn"]["value"]),
tn=int(stats["tn"]["value"]))
        except Exception:
            # the query failed (e.g., due to a malformed string literal); fall back to an
            # all-zero confusion matrix and continue with the next candidate query
            print("Exception occurred while evaluating the query; falling back to an all-zero confusion matrix")
return F1().score2(tp=0, fp=0, fn=0, tn=0)
    def _powerset_of_filters(self):
        set_of_indices = list(range(len(self._possible_filters)))
        # lazy construction of the powerset, ordered by subset size (allows early termination);
        # equivalent to more_itertools.powerset(set_of_indices)
        return chain.from_iterable(combinations(set_of_indices, r) for r in range(len(set_of_indices) + 1))
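

# -----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): the endpoint URL, the individuals and
# the class expression `ce` below are hypothetical placeholders; in practice `ce`
# would typically come from a concept learner (e.g., CELOE) that was run on the
# same learning problem. Note that constructing the learner already queries the
# endpoint to collect candidate FILTERs.
#
#   from owlapy.iri import IRI
#   from owlapy.owl_individual import OWLNamedIndividual
#
#   pos = {OWLNamedIndividual(IRI.create("http://example.org/ind_pos"))}
#   neg = {OWLNamedIndividual(IRI.create("http://example.org/ind_neg"))}
#   lp = PosNegLPStandard(pos=pos, neg=neg)
#   learner = SPARQLQueryLearner(learning_problem=lp,
#                                endpoint_url="http://localhost:3030/ds/sparql")
#   learner.learn_sparql_query(ce=ce)  # ce: an OWLClassExpression
# -----------------------------------------------------------------------------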