# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
"""Triple store representations."""
import logging
import re
from itertools import chain
from typing import Iterable, Set, Optional, Generator, Union, Tuple, Callable
import requests
from owlapy import owl_expression_to_sparql
from owlapy.class_expression import *
from owlapy.class_expression import OWLThing
from owlapy.iri import IRI
from owlapy.owl_axiom import (
OWLObjectPropertyRangeAxiom,
OWLObjectPropertyDomainAxiom,
OWLDataPropertyRangeAxiom,
OWLDataPropertyDomainAxiom,
OWLClassAxiom,
OWLEquivalentClassesAxiom, OWLAxiom,
)
from owlapy.owl_datatype import OWLDatatype
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.owl_literal import OWLLiteral
from owlapy.owl_ontology import OWLOntologyID
from owlapy.abstracts import AbstractOWLOntology, AbstractOWLReasoner
from owlapy.owl_property import (
OWLDataProperty,
OWLObjectPropertyExpression,
OWLObjectInverseOf,
OWLObjectProperty,
OWLProperty,
)
from requests import Response
from requests.exceptions import RequestException, JSONDecodeError
from owlapy.converter import Owl2SparqlConverter
from ontolearn.knowledge_base import KnowledgeBase
import traceback
from collections import Counter
logger = logging.getLogger(__name__)
rdfs_prefix = "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n "
owl_prefix = "PREFIX owl: <http://www.w3.org/2002/07/owl#>\n "
rdf_prefix = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n "
xsd_prefix = "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n"
[docs]
def is_valid_url(url) -> bool:
"""
Check the validity of a URL.
Args:
url (str): The url to validate.
Returns:
True if url is not None, and it passes the regex check.
"""
regex = re.compile(
r"^https?://" # http:// or https://
r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|" # domain...
r"localhost|" # localhost...
r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})" # ...or ip
r"(?::\d+)?" # optional port
r"(?:/?|[/?]\S+)$",
re.IGNORECASE,
)
return url is not None and regex.search(url)
[docs]
def send_http_request_to_ts_and_fetch_results(triplestore_address: str, query: str, return_type: Callable):
"""
Execute the SPARQL query in the given triplestore_address and return the result as the given return_type.
Args:
triplestore_address (str): The triplestore address where the query will be executed.
query (str): SPARQL query where the root variable should be '?x'.
return_type (Callable): OWLAPY class as type. e.g. OWLClass, OWLNamedIndividual, etc.
Returns:
Generator containing the results of the query as the given type.
"""
try:
response = requests.post(triplestore_address, data={"query": query})
except RequestException as e:
raise RequestException(
f"Make sure the server is running on the `triplestore_address` = '{triplestore_address}'"
f". Check the error below:"
f"\n -->Error: {e}"
)
try:
if return_type == OWLLiteral:
yield from unwrap(response)
else:
yield from [return_type(i) for i in unwrap(response) if i is not None]
# return [return_type(IRI.create(i['x']['value'])) for i in
# response.json()['results']['bindings']]
except JSONDecodeError as e:
raise JSONDecodeError(
f"Something went wrong with decoding JSON from the response. Check for typos in "
f"the `triplestore_address` = '{triplestore_address}' otherwise the error is likely "
f"caused by an internal issue. \n -->Error: {e}"
)
[docs]
def unwrap(result: Response):
json = result.json()
vars_ = list(json["head"]["vars"])
for b in json["results"]["bindings"]:
val = []
for v in vars_:
if b[v]["type"] == "uri":
val.append(IRI.create(b[v]["value"]))
elif b[v]["type"] == "bnode":
continue
elif b[v]["type"] == "literal" and "datatype" in b[v]:
val.append(
OWLLiteral(b[v]["value"], OWLDatatype(IRI.create(b[v]["datatype"])))
)
elif b[v]["type"] == "literal" and "datatype" not in b[v]:
continue
elif b[v]["type"] == "literal" and "datatype" in b[v]:
val.append(
OWLLiteral(b[v]["value"], OWLDatatype(IRI.create(b[v]["datatype"])))
)
elif b[v]["type"] == "literal" and "datatype" not in b[v]:
continue
else:
raise NotImplementedError(
f"Seems like this kind of data is not handled: {b[v]}"
)
if len(val) == 1:
yield val.pop()
else:
yield None
[docs]
def suf(direct: bool):
"""Put the star for rdfs properties depending on direct param"""
return " " if direct else "* "
[docs]
class TripleStoreOntology(AbstractOWLOntology):
def __init__(self, triplestore_address: str):
assert is_valid_url(triplestore_address), (
"You should specify a valid URL in the following argument: "
"'triplestore_address' of class `TripleStore`")
self.url = triplestore_address
[docs]
def classes_in_signature(self) -> Iterable[OWLClass]:
query = owl_prefix + "SELECT DISTINCT ?x WHERE {?x a owl:Class.}"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass)
[docs]
def data_properties_in_signature(self) -> Iterable[OWLDataProperty]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:DatatypeProperty.}"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLDataProperty)
[docs]
def object_properties_in_signature(self) -> Iterable[OWLObjectProperty]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:ObjectProperty.}"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
[docs]
def individuals_in_signature(self) -> Iterable[OWLNamedIndividual]:
query = owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a owl:NamedIndividual.}"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLNamedIndividual)
[docs]
def equivalent_classes_axioms(self, c: OWLClass) -> Iterable[OWLEquivalentClassesAxiom]:
# TODO:CD: Please fit the query into a single line
query = (
owl_prefix
+ "SELECT DISTINCT ?x"
+ "WHERE { ?x owl:equivalentClass "
+ f"<{c.str}>."
+ "FILTER(?x != "
+ f"<{c.str}>)}}"
)
for cls in send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass):
yield OWLEquivalentClassesAxiom([c, cls])
[docs]
def general_class_axioms(self) -> Iterable[OWLClassAxiom]:
# TODO:CD: What does general class axiom mean ? Please document this function.
# / RE:AB: Doc strings in the base class
raise NotImplementedError("Currently, ")
[docs]
def data_property_domain_axioms(self, pe: OWLDataProperty) -> Iterable[OWLDataPropertyDomainAxiom]:
domains = self.get_property_domains(pe)
if len(domains) == 0:
yield OWLDataPropertyDomainAxiom(pe, OWLThing)
else:
for dom in domains:
yield OWLDataPropertyDomainAxiom(pe, dom)
[docs]
def data_property_range_axioms(
self, pe: OWLDataProperty
)-> Iterable[OWLDataPropertyRangeAxiom]:
query = f"{rdfs_prefix}SELECT DISTINCT ?x WHERE {{ <{pe.str}> rdfs:range ?x. }}"
for rng in send_http_request_to_ts_and_fetch_results(self.url, query, OWLDatatype):
yield OWLDataPropertyRangeAxiom(pe, rng)
[docs]
def object_property_domain_axioms(
self, pe: OWLObjectProperty
) -> Iterable[OWLObjectPropertyDomainAxiom]:
domains = self.get_property_domains(pe)
if len(domains) == 0:
yield OWLObjectPropertyDomainAxiom(pe, OWLThing)
else:
for dom in domains:
yield OWLObjectPropertyDomainAxiom(pe, dom)
[docs]
def object_property_range_axioms(self, pe: OWLObjectProperty) -> Iterable[OWLObjectPropertyRangeAxiom]:
query = rdfs_prefix + "SELECT ?x WHERE { " + f"<{pe.str}>" + " rdfs:range ?x. }"
# TODO: CD: Why do we need to use set operation ?! \ RE:AB: In order to calculate its length im converting to set
ranges = set(send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass))
if len(ranges) == 0:
yield OWLObjectPropertyRangeAxiom(pe, OWLThing)
else:
for rng in ranges:
yield OWLObjectPropertyRangeAxiom(pe, rng)
[docs]
def get_property_domains(self, pe: OWLProperty)->Set:
if isinstance(pe, OWLObjectProperty) or isinstance(pe, OWLDataProperty):
query = (
rdfs_prefix
+ "SELECT ?x WHERE { "
+ f"<{pe.str}>"
+ " rdfs:domain ?x. }"
)
# TODO: CD: Why do we need to use set operation ?!
domains = set(send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass))
return domains
else:
raise NotImplementedError
[docs]
def get_owl_ontology_manager(self):
# no manager for this kind of Ontology
# @TODO:CD: Please document this class method / RE:AB: Doc strings in the base class
pass
[docs]
def get_ontology_id(self) -> OWLOntologyID:
# @TODO:CD: Please document this class method / RE:AB: Doc strings in the base class
# query = (rdf_prefix + owl_prefix +
# "SELECT ?ontologyIRI WHERE { ?ontology rdf:type owl:Ontology . ?ontology rdf:about ?ontologyIRI .}")
# return list(get_results_from_ts(self.url, query, OWLOntologyID)).pop()
raise NotImplementedError
[docs]
def add_axiom(self, axiom: Union[OWLAxiom, Iterable[OWLAxiom]]):
"""Cant modify a triplestore ontology. Implemented because of the base class."""
pass
[docs]
def remove_axiom(self, axiom: Union[OWLAxiom, Iterable[OWLAxiom]]):
"""Cant modify a triplestore ontology. Implemented because of the base class."""
pass
[docs]
def __eq__(self, other):
if isinstance(other, type(self)):
return self.url == other.url
return NotImplemented
[docs]
def __hash__(self):
return hash(self.url)
[docs]
def __repr__(self):
return f"TripleStoreOntology({self.url})"
[docs]
class TripleStoreReasoner(AbstractOWLReasoner):
def __init__(self, ontology: TripleStoreOntology):
self.ontology = ontology
self.url = self.ontology.url
self._owl2sparql_converter = Owl2SparqlConverter()
[docs]
def data_property_domains(
self, pe: OWLDataProperty, direct: bool = False
) -> Iterable[OWLClassExpression]:
domains = {
d.get_domain() for d in self.ontology.data_property_domain_axioms(pe)
}
sub_domains = set(chain.from_iterable([self.sub_classes(d) for d in domains]))
yield from domains - sub_domains
if not direct:
yield from sub_domains
[docs]
def object_property_domains(
self, pe: OWLObjectProperty, direct: bool = False
) -> Iterable[OWLClassExpression]:
domains = {
d.get_domain() for d in self.ontology.object_property_domain_axioms(pe)
}
sub_domains = set(chain.from_iterable([self.sub_classes(d) for d in domains]))
yield from domains - sub_domains
if not direct:
yield from sub_domains
[docs]
def object_property_ranges(
self, pe: OWLObjectProperty, direct: bool = False
) -> Iterable[OWLClassExpression]:
ranges = {r.get_range() for r in self.ontology.object_property_range_axioms(pe)}
sub_ranges = set(chain.from_iterable([self.sub_classes(d) for d in ranges]))
yield from ranges - sub_ranges
if not direct:
yield from sub_ranges
[docs]
def equivalent_classes(
self, ce: OWLClassExpression, only_named: bool = True
) -> Iterable[OWLClassExpression]:
if only_named:
if isinstance(ce, OWLClass):
query = (
owl_prefix
+ "SELECT DISTINCT ?x "
+ "WHERE { {?x owl:equivalentClass "
+ f"<{ce.str}>.}}"
+ "UNION {"
+ f"<{ce.str}>"
+ " owl:equivalentClass ?x.}"
+ "FILTER(?x != "
+ f"<{ce.str}>)}}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass)
else:
print(f"Equivalent classes for complex class expressions is not implemented\t{ce}")
# raise NotImplementedError(f"Equivalent classes for complex class expressions is not implemented\t{ce}")
yield from {}
else:
raise NotImplementedError("Finding equivalent complex classes is not implemented")
[docs]
def disjoint_classes(
self, ce: OWLClassExpression, only_named: bool = True
) -> Iterable[OWLClassExpression]:
if only_named:
if isinstance(ce, OWLClass):
query = (
owl_prefix
+ " SELECT DISTINCT ?x "
+ "WHERE { "
+ f"<{ce.str}>"
+ " owl:disjointWith ?x .}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass)
else:
raise NotImplementedError(
"Disjoint classes for complex class expressions is not implemented"
)
else:
raise NotImplementedError(
"Finding disjoint complex classes is not implemented"
)
[docs]
def different_individuals(
self, ind: OWLNamedIndividual
) -> Iterable[OWLNamedIndividual]:
query = (
owl_prefix
+ rdf_prefix
+ "SELECT DISTINCT ?x \n"
+ "WHERE{ ?allDifferent owl:distinctMembers/rdf:rest*/rdf:first ?x.\n"
+ "?allDifferent owl:distinctMembers/rdf:rest*/rdf:first"
+ f"<{ind.str}>"
+ ".\n"
+ "FILTER(?x != "
+ f"<{ind.str}>"
+ ")}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLNamedIndividual)
[docs]
def same_individuals(self, ind: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
query = (
owl_prefix
+ "SELECT DISTINCT ?x "
+ "WHERE {{ ?x owl:sameAs "
+ f"<{ind.str}>"
+ " .}"
+ "UNION { "
+ f"<{ind.str}>"
+ " owl:sameAs ?x.}}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLNamedIndividual)
[docs]
def equivalent_object_properties(
self, op: OWLObjectPropertyExpression
) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (
owl_prefix
+ "SELECT DISTINCT ?x "
+ "WHERE { {?x owl:equivalentProperty "
+ f"<{op.str}>.}}"
+ "UNION {"
+ f"<{op.str}>"
+ " owl:equivalentProperty ?x.}"
+ "FILTER(?x != "
+ f"<{op.str}>)}}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (
owl_prefix
+ "SELECT DISTINCT ?x "
+ "WHERE { ?inverseProperty owl:inverseOf "
+ f"<{op.get_inverse().str}> ."
+ " {?x owl:equivalentProperty ?inverseProperty .}"
+ "UNION { ?inverseProperty owl:equivalentClass ?x.}"
+ "FILTER(?x != ?inverseProperty }>)}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
[docs]
def equivalent_data_properties(
self, dp: OWLDataProperty
) -> Iterable[OWLDataProperty]:
query = (
owl_prefix
+ "SELECT DISTINCT ?x"
+ "WHERE { {?x owl:equivalentProperty "
+ f"<{dp.str}>.}}"
+ "UNION {"
+ f"<{dp.str}>"
+ " owl:equivalentProperty ?x.}"
+ "FILTER(?x != "
+ f"<{dp.str}>)}}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLDataProperty)
[docs]
def data_property_values(
self, ind: OWLNamedIndividual, pe: OWLDataProperty, direct: bool = True
) -> Iterable[OWLLiteral]:
query = "SELECT ?x WHERE { " + f"<{ind.str}>" + f"<{pe.str}>" + " ?x . }"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLLiteral)
if not direct:
for prop in self.sub_data_properties(pe):
yield from self.data_property_values(ind, prop, True)
[docs]
def object_property_values(
self,
ind: OWLNamedIndividual,
pe: OWLObjectPropertyExpression,
direct: bool = True,
) -> Iterable[OWLNamedIndividual]:
if isinstance(pe, OWLObjectProperty):
query = "SELECT ?x WHERE { " + f"<{ind.str}> " + f"<{pe.str}>" + " ?x . }"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLNamedIndividual)
elif isinstance(pe, OWLObjectInverseOf):
query = (
owl_prefix
+ "SELECT ?x WHERE { ?inverseProperty owl:inverseOf "
+ f"<{pe.get_inverse().str}>."
+ f"<{ind.str}> ?inverseProperty ?x . }}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLNamedIndividual)
if not direct:
for prop in self.sub_object_properties(pe):
yield from self.object_property_values(ind, prop, True)
[docs]
def flush(self) -> None:
pass
[docs]
def instances(
self, ce: OWLClassExpression, direct: bool = False, seen_set: Set = None
) -> Iterable[OWLNamedIndividual]:
if not seen_set:
seen_set = set()
seen_set.add(ce)
ce_to_sparql = self._owl2sparql_converter.as_query("?x", ce)
if not direct:
ce_to_sparql = ce_to_sparql.replace(
"?x a ",
"?x a ?some_cls. \n ?some_cls "
"<http://www.w3.org/2000/01/rdf-schema#subClassOf>* ",
)
yield from send_http_request_to_ts_and_fetch_results(self.url, ce_to_sparql, OWLNamedIndividual)
if not direct:
for cls in self.equivalent_classes(ce):
if cls not in seen_set:
seen_set.add(cls)
yield from self.instances(cls, direct, seen_set)
[docs]
def sub_classes(
self, ce: OWLClassExpression, direct: bool = False, only_named: bool = True
) -> Iterable[OWLClassExpression]:
if not only_named:
raise NotImplementedError("Finding anonymous subclasses not implemented")
if isinstance(ce, OWLClass):
query = (
rdfs_prefix
+ "SELECT ?x WHERE { ?x rdfs:subClassOf"
+ suf(direct)
+ f"<{ce.str}>"
+ ". }"
)
results = list(send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass))
if ce in results:
results.remove(ce)
yield from results
else:
raise NotImplementedError(
"Subclasses of complex classes retrieved via triple store is not implemented"
)
# query = "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> " \
# "SELECT DISTINCT ?x WHERE { ?x rdfs:subClassOf" + suf(direct) + " ?c. \n" \
# "?s a ?c . \n"
# ce_to_sparql_statements = self._owl2sparql_converter.convert("?s", ce)
# for s in ce_to_sparql_statements:
# query = query + s + "\n"
# query = query + "}"
# yield from get_results_from_ts(self._triplestore_address, query, OWLClass)
[docs]
def super_classes(
self, ce: OWLClassExpression, direct: bool = False, only_named: bool = True
) -> Iterable[OWLClassExpression]:
if not only_named:
raise NotImplementedError("Finding anonymous superclasses not implemented")
if isinstance(ce, OWLClass):
if ce == OWLThing:
return []
query = (
rdfs_prefix
+ "SELECT ?x WHERE { "
+ f"<{ce.str}>"
+ " rdfs:subClassOf"
+ suf(direct)
+ "?x. }"
)
results = list(send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass))
if ce in results:
results.remove(ce)
if (not direct and OWLThing not in results) or len(results) == 0:
results.append(OWLThing)
yield from results
else:
raise NotImplementedError(
"Superclasses of complex classes retrieved via triple store is not "
"implemented"
)
[docs]
def disjoint_object_properties(
self, op: OWLObjectPropertyExpression
) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (
owl_prefix
+ rdf_prefix
+ "SELECT DISTINCT ?x \n"
+ "WHERE{ ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n"
+ "?AllDisjointProperties owl:members/rdf:rest*/rdf:first"
+ f"<{op.str}>"
+ ".\n"
+ "FILTER(?x != "
+ f"<{op.str}>"
+ ")}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (
owl_prefix
+ " SELECT DISTINCT ?x "
+ "WHERE { ?inverseProperty owl:inverseOf "
+ f"<{op.get_inverse().str}> ."
+ " ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n"
+ " ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?inverseProperty.\n"
+ " FILTER(?x != ?inverseProperty)}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
[docs]
def disjoint_data_properties(
self, dp: OWLDataProperty
) -> Iterable[OWLDataProperty]:
query = (
owl_prefix
+ rdf_prefix
+ "SELECT DISTINCT ?x \n"
+ "WHERE{ ?AllDisjointProperties owl:members/rdf:rest*/rdf:first ?x.\n"
+ "?AllDisjointProperties owl:members/rdf:rest*/rdf:first"
+ f"<{dp.str}>"
+ ".\n"
+ "FILTER(?x != "
+ f"<{dp.str}>"
+ ")}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLDataProperty)
[docs]
def all_data_property_values(
self, pe: OWLDataProperty, direct: bool = True
) -> Iterable[OWLLiteral]:
query = "SELECT DISTINCT ?x WHERE { ?y" + f"<{pe.str}>" + " ?x . }"
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLLiteral)
if not direct:
for prop in self.sub_data_properties(pe):
yield from self.all_data_property_values(prop, True)
[docs]
def sub_data_properties(
self, dp: OWLDataProperty, direct: bool = False
) -> Iterable[OWLDataProperty]:
query = (
rdfs_prefix
+ "SELECT ?x WHERE { ?x rdfs:subPropertyOf"
+ suf(direct)
+ f"<{dp.str}>"
+ ". }"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLDataProperty)
[docs]
def super_data_properties(
self, dp: OWLDataProperty, direct: bool = False
) -> Iterable[OWLDataProperty]:
query = (
rdfs_prefix
+ "SELECT ?x WHERE {"
+ f"<{dp.str}>"
+ " rdfs:subPropertyOf"
+ suf(direct)
+ " ?x. }"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLDataProperty)
[docs]
def sub_object_properties(
self, op: OWLObjectPropertyExpression, direct: bool = False
) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (
rdfs_prefix
+ "SELECT ?x WHERE { ?x rdfs:subPropertyOf"
+ suf(direct)
+ f"<{op.str}> . FILTER(?x != "
+ f"<{op.str}>) }}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (
rdfs_prefix
+ "SELECT ?x "
+ "WHERE { ?inverseProperty owl:inverseOf "
+ f"<{op.get_inverse().str}> ."
+ " ?x rdfs:subPropertyOf"
+ suf(direct)
+ " ?inverseProperty . }"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
[docs]
def super_object_properties(
self, op: OWLObjectPropertyExpression, direct: bool = False
) -> Iterable[OWLObjectPropertyExpression]:
if isinstance(op, OWLObjectProperty):
query = (
rdfs_prefix
+ "SELECT ?x WHERE {"
+ f"<{op.str}>"
+ " rdfs:subPropertyOf"
+ suf(direct)
+ " ?x. FILTER(?x != "
+ f"<{op.str}>) }}"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
elif isinstance(op, OWLObjectInverseOf):
query = (
rdfs_prefix
+ "SELECT ?x "
+ "WHERE { ?inverseProperty owl:inverseOf "
+ f"<{op.get_inverse().str}> ."
+ " ?inverseProperty rdfs:subPropertyOf"
+ suf(direct)
+ "?x . }"
)
yield from send_http_request_to_ts_and_fetch_results(self.url, query, OWLObjectProperty)
[docs]
def types(
self, ind: OWLNamedIndividual, direct: bool = False
) -> Iterable[OWLClass]:
if direct:
query = "SELECT ?x WHERE {" + f"<{ind.str}> a" + " ?x. }"
else:
query = (
rdfs_prefix + "SELECT DISTINCT ?x WHERE {" + f"<{ind.str}> a ?cls. "
" ?cls rdfs:subClassOf* ?x}"
)
yield from [
i
for i in send_http_request_to_ts_and_fetch_results(self.url, query, OWLClass)
if i != OWLClass(IRI("http://www.w3.org/2002/07/owl#", "NamedIndividual"))
]
[docs]
def get_root_ontology(self) -> AbstractOWLOntology:
return self.ontology
[docs]
def is_isolated(self):
# not needed here
pass
#######################################################################################################################
# See https://github.com/dice-group/Ontolearn/issues/451 for the decision behind this seperation
[docs]
class TripleStoreReasonerOntology:
def __init__(self, url: str = None):
assert url is not None, "URL cannot be None"
self.url = url
[docs]
def __str__(self):
return f"TripleStoreReasonerOntology:{self.url}"
[docs]
def query(self, sparql_query: str):
return requests.Session().post(self.url, data={"query": sparql_query})
[docs]
def are_owl_concept_disjoint(self, c: OWLClass, cc: OWLClass) -> bool:
query = f"""{owl_prefix}ASK WHERE {{<{c.str}> owl:disjointWith <{cc.str}> .}}"""
# Workaround self.query doesn't work for ASK at the moment
return (
requests.Session().post(self.url, data={"query": query}).json()["boolean"]
)
[docs]
def abox(self, str_iri: str) -> Generator[
Tuple[
Tuple[OWLNamedIndividual, OWLProperty, OWLClass],
Tuple[OWLObjectProperty, OWLObjectProperty, OWLNamedIndividual],
Tuple[OWLObjectProperty, OWLDataProperty, OWLLiteral],
],
None,
None,
]:
"""@TODO:"""
sparql_query = f"SELECT DISTINCT ?p ?o WHERE {{ <{str_iri}> ?p ?o }}"
subject_ = OWLNamedIndividual(str_iri)
for binding in self.query(sparql_query).json()["results"]["bindings"]:
p, o = binding["p"], binding["o"]
# ORDER MATTERS
if p["value"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
yield subject_, OWLProperty(
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
), OWLClass(o["value"])
elif o["type"] == "uri":
#################################################################
# IMPORTANT
# Can we assume that if o has URI and is not owl class, then o can be considered as an individual ?
#################################################################
yield subject_, OWLObjectProperty(p["value"]), OWLNamedIndividual(
o["value"]
)
elif o["type"] == "literal":
if data_type := o.get("datatype", None):
if data_type == "http://www.w3.org/2001/XMLSchema#boolean":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=bool(o["value"]))
elif data_type == "http://www.w3.org/2001/XMLSchema#integer":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=float(o["value"]))
elif data_type == "http://www.w3.org/2001/XMLSchema#nonNegativeInteger":
# TODO: We do not have http://www.w3.org/2001/XMLSchema#nonNegativeInteger implemented
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=float(o["value"]))
elif data_type == "http://www.w3.org/2001/XMLSchema#double":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=float(o["value"]))
else:
# TODO: Unclear for the time being.
# print(f"Currently this type of literal is not supported:{o} but can done easily let us know :)")
continue
"""
# TODO: Converting a SPARQL query becomes an issue with strings.
elif data_type == "http://www.w3.org/2001/XMLSchema#string":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=repr(o["value"]))
elif data_type == "http://www.w3.org/2001/XMLSchema#date":
yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=repr(o["value"]))
"""
else:
# print(f"Currently this type of literal is not supported:{o} but can done easily let us know :)")
continue
# yield subject_, OWLDataProperty(p["value"]), OWLLiteral(value=repr(o["value"]))
else:
raise RuntimeError(f"Unrecognized type {subject_} ({p}) ({o})")
[docs]
def classes_in_signature(self) -> Iterable[OWLClass]:
query = owl_prefix + """SELECT DISTINCT ?x WHERE { ?x a owl:Class }"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def most_general_classes(self) -> Iterable[OWLClass]:
"""At least it has single subclass and there is no superclass"""
query = f"""{rdf_prefix}{rdfs_prefix}{owl_prefix} SELECT ?x WHERE {{
?concept rdf:type owl:Class .
FILTER EXISTS {{ ?x rdfs:subClassOf ?z . }}
FILTER NOT EXISTS {{ ?y rdfs:subClassOf ?x . }}
}}
"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def least_general_named_concepts(self) -> Generator[OWLClass, None, None]:
"""At least it has single superclass and there is no subclass"""
query = f"""{rdf_prefix}{rdfs_prefix}{owl_prefix} SELECT ?concept WHERE {{
?concept rdf:type owl:Class .
FILTER EXISTS {{ ?concept rdfs:subClassOf ?x . }}
FILTER NOT EXISTS {{ ?y rdfs:subClassOf ?concept . }}
}}"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["concept"]["value"])
[docs]
def get_direct_parents(self, named_concept: OWLClass):
"""Father rdf:subClassOf Person"""
assert isinstance(named_concept, OWLClass)
str_named_concept = f"<{named_concept.str}>"
query = f"""{rdfs_prefix} SELECT ?x WHERE {{ {str_named_concept} rdfs:subClassOf ?x . }} """
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def subconcepts(self, named_concept: OWLClass, direct=True):
assert isinstance(named_concept, OWLClass)
str_named_concept = f"<{named_concept.str}>"
if direct:
query = f"""{rdfs_prefix} SELECT ?x WHERE {{ ?x rdfs:subClassOf* {str_named_concept}. }} """
else:
query = f"""{rdf_prefix} SELECT ?x WHERE {{ ?x rdf:subClassOf {str_named_concept}. }} """
for str_iri in self.query(query):
yield OWLClass(str_iri)
[docs]
def get_type_individuals(self, individual: str):
query = f"""SELECT DISTINCT ?x WHERE {{ <{individual}> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?x }}"""
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLClass(binding["x"]["value"])
[docs]
def instances(
self, expression: OWLClassExpression, named_individuals: bool = False
) -> Generator[OWLNamedIndividual, None, None]:
assert isinstance(expression, OWLClassExpression)
try:
sparql_query = owl_expression_to_sparql(expression=expression,
named_individuals=named_individuals)
except Exception as exc:
print(f"Error at converting {expression} into sparql")
traceback.print_exception(exc)
print(f"Error at converting {expression} into sparql")
raise RuntimeError("Couldn't convert")
try:
# TODO:Be aware of the implicit inference of x being OWLNamedIndividual!
for binding in self.query(sparql_query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
except:
print(self.query(sparql_query).text)
raise RuntimeError
[docs]
def individuals_in_signature(self) -> Generator[OWLNamedIndividual, None, None]:
# owl:OWLNamedIndividual is often missing: Perhaps we should add union as well
query = (
owl_prefix + "SELECT DISTINCT ?x\n " + "WHERE {?x a ?y. ?y a owl:Class.}"
)
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
[docs]
def data_properties_in_signature(self) -> Iterable[OWLDataProperty]:
query = (
owl_prefix + "SELECT DISTINCT ?x " + "WHERE {?x a owl:DatatypeProperty.}"
)
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def object_properties_in_signature(self) -> Iterable[OWLObjectProperty]:
query = owl_prefix + "SELECT DISTINCT ?x " + "WHERE {?x a owl:ObjectProperty.}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLObjectProperty(binding["x"]["value"])
[docs]
def boolean_data_properties(self):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x rdfs:range xsd:boolean}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def double_data_properties(self):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x rdfs:range xsd:double}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLDataProperty(binding["x"]["value"])
[docs]
def range_of_double_data_properties(self, prop: OWLDataProperty):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?z <{prop.str}> ?x}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLLiteral(value=float(binding["x"]["value"]))
[docs]
def domain_of_double_data_properties(self, prop: OWLDataProperty):
query = f"{rdf_prefix}\n{rdfs_prefix}\n{xsd_prefix}SELECT DISTINCT ?x WHERE {{?x <{prop.str}> ?z}}"
for binding in self.query(query).json()["results"]["bindings"]:
yield OWLNamedIndividual(binding["x"]["value"])
[docs]
class TripleStore(KnowledgeBase):
url: str
ontology: TripleStoreOntology
reasoner: TripleStoreReasoner
def __init__(self, url: str=None):
assert url is not None, "url must be string"
self.url = url
self.g = TripleStoreReasonerOntology(url=self.url)
self.ontology = TripleStoreOntology(self.url)
self.reasoner = TripleStoreReasoner(self.ontology)
super().__init__( ontology=self.ontology, reasoner=self.reasoner, load_class_hierarchy=False)
[docs]
def __str__(self):
return f"TripleStore:{self.g}"
[docs]
def get_direct_sub_concepts(self, concept: OWLClass) -> Iterable[OWLClass]:
assert isinstance(concept, OWLClass)
yield from self.reasoner.sub_classes(concept, direct=True)
def get_direct_parents(self, concept: OWLClassExpression) -> Iterable[OWLClass]:
assert isinstance(concept, OWLClass)
yield from self.reasoner.super_classes(concept, direct=True)
[docs]
def get_all_direct_sub_concepts(self, concept: OWLClassExpression) -> Iterable[OWLClassExpression]:
assert isinstance(concept, OWLClass)
yield from self.reasoner.sub_classes(concept, direct=True)
def get_all_sub_concepts(self, concept: OWLClassExpression) -> Iterable[OWLClassExpression]:
assert isinstance(concept, OWLClass)
yield from self.reasoner.sub_classes(concept, direct=False)
def get_concepts(self) -> Iterable[OWLClass]:
yield from self.ontology.classes_in_signature()
@property
def concepts(self) -> Iterable[OWLClass]:
yield from self.ontology.classes_in_signature()
[docs]
def contains_class(self, concept: OWLClassExpression) -> bool:
assert isinstance(concept, OWLClass)
return concept in self.ontology.classes_in_signature()
[docs]
def most_general_object_properties(
self, *, domain: OWLClassExpression, inverse: bool = False) -> Iterable[OWLObjectProperty]:
assert isinstance(domain, OWLClassExpression)
func: Callable
func = (
self.get_object_property_ranges
if inverse
else self.get_object_property_domains
)
inds_domain = self.individuals_set(domain)
for prop in self.ontology.object_properties_in_signature():
if domain.is_owl_thing() or inds_domain <= self.individuals_set(func(prop)):
yield prop
@property
def object_properties(self) -> Iterable[OWLObjectProperty]:
yield from self.ontology.object_properties_in_signature()
[docs]
def get_object_properties(self) -> Iterable[OWLObjectProperty]:
yield from self.ontology.object_properties_in_signature()
@property
def data_properties(self) -> Iterable[OWLDataProperty]:
yield from self.ontology.data_properties_in_signature()
[docs]
def get_data_properties(
self, ranges: Set[OWLDatatype] = None
) -> Iterable[OWLDataProperty]:
if ranges is not None:
for dp in self.ontology.data_properties_in_signature():
if self.get_data_property_ranges(dp) & ranges:
yield dp
else:
yield from self.ontology.data_properties_in_signature()
def __abox_expression(self, individual: OWLNamedIndividual) -> Generator[
Union[
OWLClass,
OWLObjectSomeValuesFrom,
OWLObjectMinCardinality,
OWLDataSomeValuesFrom,
],
None,
None,
]:
"""
Return OWL Class Expressions obtained from all set of triples where an input OWLNamedIndividual is subject.
Retrieve all triples (i,p,o) where p \in Resources, and o \in [Resources, Literals] and return the followings
1- Owl Named Classes: C(i)=1.
2- ObjectSomeValuesFrom Nominals: \exists r. {a, b, ..., d}, e.g. (i r, a) exists.
3- OWLObjectSomeValuesFrom over named classes: \exists r. C s.t. x \in {a, b, ..., d} C(x)=1.
4- OWLObjectMinCardinality over named classes: ≥ c r. C
5- OWLDataSomeValuesFrom over literals: \exists r. {literal_a, ..., literal_b}
"""
object_property_to_individuals = dict()
data_property_to_individuals = dict()
# To no return duplicate objects.
quantifier_gate = set()
# (1) Iterate over triples where individual is in the subject position.
for s, p, o in self.g.abox(str_iri=individual.str):
if isinstance(p, OWLProperty) and isinstance(o, OWLClass):
##############################################################
# RETURN OWLClass
##############################################################
yield o
elif isinstance(p, OWLObjectProperty) and isinstance(o, OWLNamedIndividual):
##############################################################
# Store for \exist r. {i, ..., j} and OWLObjectMinCardinality over type counts
##############################################################
object_property_to_individuals.setdefault(p, []).append(o)
elif isinstance(p, OWLDataProperty) and isinstance(o, OWLLiteral):
##############################################################
# Store for \exist r. {literal, ..., another literal}
##############################################################
data_property_to_individuals.setdefault(p, []).append(o)
else:
raise RuntimeError(
f"Unrecognized triples to expression mappings {p}{o}"
)
# Iterating over the mappings of object properties to individuals.
for (
object_property,
list_owl_individuals,
) in object_property_to_individuals.items():
# RETURN: \exists r. {x1,x33, .., x8} => Existential restriction over nominals
yield OWLObjectSomeValuesFrom(
property=object_property, filler=OWLObjectOneOf(list_owl_individuals)
)
owl_class: OWLClass
count: int
for owl_class, count in Counter(
[
type_i
for i in list_owl_individuals
for type_i in self.get_types(ind=i, direct=True)
]
).items():
existential_quantifier = OWLObjectSomeValuesFrom(
property=object_property, filler=owl_class
)
if existential_quantifier in quantifier_gate:
"Do nothing"
else:
##############################################################
# RETURN: \exists r. C => Existential quantifiers over Named OWL Class
##############################################################
quantifier_gate.add(existential_quantifier)
yield existential_quantifier
object_min_cardinality = OWLObjectMinCardinality(
cardinality=count, property=object_property, filler=owl_class
)
if object_min_cardinality in quantifier_gate:
"Do nothing"
else:
##############################################################
# RETURN: ≥ c r. C => OWLObjectMinCardinality over Named OWL Class
##############################################################
quantifier_gate.add(object_min_cardinality)
yield object_min_cardinality
# Iterating over the mappings of data properties to individuals.
for data_property, list_owl_literal in data_property_to_individuals.items():
##############################################################
# RETURN: \exists r. {literal, ..., another literal} => Existential quantifiers over Named OWL Class
##############################################################
# if list_owl_literal is {True, False) doesn't really make sense OWLDataSomeValuesFrom
# Perhaps, if
yield OWLDataSomeValuesFrom(
property=data_property, filler=OWLDataOneOf(list_owl_literal)
)
[docs]
def abox(self, individual: OWLNamedIndividual, mode: str = "native"):
"""
Get all axioms of a given individual being a subject entity
Args:
individual (OWLNamedIndividual): An individual
mode (str): The return format.
1) 'native' -> returns triples as tuples of owlapy objects,
2) 'iri' -> returns triples as tuples of IRIs as string,
3) 'axiom' -> triples are represented by owlapy axioms.
4) 'expression' -> unique owl class expressions based on (1).
Returns: Iterable of tuples or owlapy axiom, depending on the mode.
"""
assert mode in [
"native",
"iri",
"axiom",
"expression",
], "Valid modes are: 'native', 'iri' or 'axiom', 'expression'"
if mode == "native":
yield from self.g.abox(str_iri=individual.str)
elif mode == "expression":
yield from self.__abox_expression(individual)
elif mode == "axiom":
raise NotImplementedError("Axioms should be checked.")
[docs]
def are_owl_concept_disjoint(self, c: OWLClass, cc: OWLClass) -> bool:
assert isinstance(c, OWLClass) and isinstance(cc, OWLClass)
return self.g.are_owl_concept_disjoint(c, cc)
[docs]
def get_concepts(self) -> Generator[OWLClass, None, None]:
yield from self.ontology.classes_in_signature()
[docs]
def get_classes_in_signature(self) -> Generator[OWLClass, None, None]:
yield from self.ontology.classes_in_signature()
[docs]
def get_most_general_classes(self) -> Generator[OWLClass, None, None]:
yield from self.g.most_general_classes()
[docs]
def get_boolean_data_properties(self):
yield from self.g.boolean_data_properties()
[docs]
def get_double_data_properties(self):
yield from self.g.double_data_properties()
[docs]
def get_range_of_double_data_properties(self, prop: OWLDataProperty):
yield from self.g.range_of_double_data_properties(prop)
[docs]
def individuals(
self,
concept: Optional[OWLClassExpression] = None,
named_individuals: bool = False,
) -> Generator[OWLNamedIndividual, None, None]:
"""Given an OWL class expression, retrieve all individuals belonging to it.
Args:
concept: Class expression of which to list individuals.
named_individuals: flag for returning only owl named individuals in the SPARQL mapping
Returns:
Generator of individuals belonging to the given class.
"""
# Jean: yield from self.ontology.individuals_in_signature() mostly returns an empty generator, so I commented it out
#if concept is None or concept.is_owl_thing():
# yield from self.ontology.individuals_in_signature()
#else:
yield from self.g.instances(concept, named_individuals=named_individuals)
[docs]
def get_types(self, ind: OWLNamedIndividual, direct: True) -> Generator[OWLClass, None, None]:
if not direct:
raise NotImplementedError("Inferring indirect types not available")
return self.g.get_type_individuals(ind.str)
[docs]
def get_all_sub_concepts(self, concept: OWLClass, direct=True):
yield from self.g.subconcepts(concept, direct)
[docs]
def classes_in_signature(self):
yield from self.ontology.classes_in_signature()
[docs]
def get_direct_parents(self, c: OWLClass):
yield from self.g.get_direct_parents(c)
[docs]
def most_general_named_concepts(self):
yield from self.g.most_general_named_concepts()
[docs]
def least_general_named_concepts(self):
yield from self.g.least_general_named_concepts()
[docs]
def query(self, sparql: str):
yield from self.g.query(sparql_query=sparql)