# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
"""Pyhon binders of other concept learners."""
import subprocess
from datetime import datetime
from typing import List, Dict
from .utils import create_experiment_folder
import re
import time
import os
from .learning_problem import PosNegLPStandard


class PredictedConcept:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
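        # Example payload (mirrors the dictionary shape returned by
        # DLLearnerBinder.best_hypothesis() below):
        # {'Prediction': 'Child', 'Accuracy': 1.0, 'F-measure': 1.0,
        #  'NumClassTested': 3, 'Runtime': 3.502}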

    def __iter__(self):
        yield self.Prediction


class DLLearnerBinder:
    """DL-Learner Python binder."""

    def __init__(self, binary_path=None, model=None, kb_path=None, storage_path=".", max_runtime=3):
        try:
            assert binary_path
            assert model
            assert kb_path
        except AssertionError:
            print(f'binary_path:{binary_path}, model:{model}, kb_path:{kb_path} cannot be None')
            raise
        self.binary_path = binary_path
        self.kb_path = kb_path
        self.name = model
        self.max_runtime = max_runtime
        if storage_path is not None:
            self.storage_path = storage_path
        else:
            self.storage_path, _ = create_experiment_folder()
        self.best_predictions = None
        self.config_name_identifier = None

    def write_dl_learner_config(self, pos: List[str], neg: List[str], use_sparql=False) -> str:
        """Write a config file for dl-learner.

        Args:
            pos: A list of URIs of individuals indicating positive examples in the concept learning problem.
            neg: A list of URIs of individuals indicating negative examples in the concept learning problem.
            use_sparql: Whether to query a SPARQL endpoint instead of loading an OWL file.

        Returns:
            str: Path of the generated config file.
        """
        assert len(pos) > 0 and isinstance(pos[0], str)
        assert len(neg) > 0 and isinstance(neg[0], str)

        text = list()
        pos_string = "{ "
        neg_string = "{ "
        for i in pos:
            pos_string += "\"" + str(i) + "\","
        for j in neg:
            neg_string += "\"" + str(j) + "\","

        pos_string = pos_string[:-1]
        pos_string += "}"
        neg_string = neg_string[:-1]
        neg_string += "}"

        text.append("rendering = \"dlsyntax\"")
        text.append("// command line interface")
        text.append("cli.type = \"org.dllearner.cli.CLI\"")
        text.append("\n")
        text.append("// knowledge source definition")

        if use_sparql:
            text.append("ks.url = \"" + self.kb_path + '\"')
            text.append("ks.type = \"SPARQL endpoint\"")
            text.append("reasoner.type = \"SPARQL Reasoner\"")
            text.append("op.type = \"tdtop\"")
        else:
            text.append("ks.fileName = \"" + self.kb_path + '\"')
            text.append("ks.type = \"OWL File\"")
            text.append("reasoner.type = \"closed world reasoner\"")
            text.append("op.type = \"rho\"")
            text.append("op.useNumericDatatypes = \"false\"")
            text.append("op.useCardinalityRestrictions = \"false\"")

        text.append("reasoner.sources = { ks }")
        text.append("\n")
        text.append("lp.type = \"PosNegLPStandard\"")
        text.append("accuracyMethod.type = \"fmeasure\"")
        text.append("\n")
        text.append("lp.positiveExamples =" + pos_string)
        text.append("\n")
        text.append("lp.negativeExamples =" + neg_string)
        text.append("\n")
        text.append("alg.writeSearchTree = \"true\"")

        if self.name == 'celoe':
            text.append("alg.type = \"celoe\"")
            text.append("alg.stopOnFirstDefinition = \"true\"")
        elif self.name == 'ocel':
            text.append("alg.type = \"ocel\"")
            text.append("alg.showBenchmarkInformation = \"true\"")
        elif self.name == 'eltl':
            text.append("alg.type = \"eltl\"")
            text.append("alg.maxNrOfResults = \"1\"")
            text.append("alg.stopOnFirstDefinition = \"true\"")
        else:
            raise ValueError('Wrong algorithm chosen.')

        text.append("alg.maxExecutionTimeInSeconds = " + str(self.max_runtime))
        text.append("\n")

        path_to_config = self.storage_path + '/' + self.name + '_' + datetime.now().strftime("%Y%m%d_%H%M%S_%f") + '.conf'
        with open(path_to_config, "wb") as wb:
            for i in text:
                wb.write(i.encode("utf-8"))
                wb.write("\n".encode("utf-8"))
        return path_to_config

    def fit(self, lp: PosNegLPStandard, max_runtime: int = None, use_sparql=False):
        """Fit the dl-learner model on a given set of positive and negative examples.

        Args:
            lp: A PosNegLPStandard instance, where
                lp.pos holds the individuals indicating positive examples and
                lp.neg holds the individuals indicating negative examples.
            max_runtime: Limit to stop the algorithm after n seconds.
            use_sparql: Whether to query a SPARQL endpoint instead of loading an OWL file.

        Returns:
            self.
        """
        if max_runtime:
            self.max_runtime = max_runtime
        path_to_config = self.write_dl_learner_config(pos=[i.str for i in lp.pos],
                                                      neg=[i.str for i in lp.neg],
                                                      use_sparql=use_sparql)
        total_runtime = time.time()
        res = subprocess.run([self.binary_path, path_to_config], capture_output=True, universal_newlines=True)
        total_runtime = round(time.time() - total_runtime, 3)
        self.best_predictions = self.parse_dl_learner_output(res.stdout.splitlines(), path_to_config)
        self.best_predictions['Runtime'] = total_runtime
        return self

    def best_hypotheses(self, n: int = None) -> PredictedConcept:
        # @TODO: Convert the string prediction into an OWL class expression object.
        # e.g. {'Prediction': 'Child', 'Accuracy': 1.0, 'F-measure': 1.0, 'NumClassTested': 3, 'Runtime': 3.502}
        return PredictedConcept(**self.best_hypothesis())

    def best_hypothesis(self):
        """Return the best prediction if it exists.

        Returns:
            The prediction dictionary, or None (after printing 'No prediction found.').
        """
        if self.best_predictions:
            return self.best_predictions
        else:
            print('No prediction found.')

    def parse_dl_learner_output(self, output_of_dl_learner: List[str], file_path: str) -> Dict:
        """Parse the output received from executing dl-learner.

        Args:
            output_of_dl_learner: The output of dl-learner to parse.
            file_path: The file path to store the output.

        Returns:
            A dictionary of {'Prediction': ..., 'Accuracy': ..., 'F-measure': ...}.
        """
        solutions = None
        best_concept_str = None
        acc = -1.0
        f_measure = -1.0
        search_info = None
        num_expression_tested = -1
        # DL-Learner does not provide a unified output :(
        # ELTL  => No info pertaining to the number of concepts tested, number of retrievals, etc.
        # CELOE => Algorithm terminated successfully (time: 245ms, 188 descriptions tested, 69 nodes in the search tree).
        # OCEL  => Algorithm stopped (4505 descriptions tested).
        txt_path = file_path + '.txt'
        # (1) Store the output of dl-learner and extract solutions.
        with open(txt_path, 'w') as w:
            for th, sentence in enumerate(output_of_dl_learner):
                w.write(sentence + '\n')
                if 'solutions' in sentence and '1:' in output_of_dl_learner[th + 1]:
                    solutions = output_of_dl_learner[th:]
                if 'Algorithm' in sentence:
                    search_info = sentence
        # (2) Check whether solutions were found.
        if solutions:  # if a solution was found, check the correctness of the relevant part of the output.
            try:
                assert isinstance(solutions, list)
                assert 'solutions' in solutions[0]
                assert len(solutions) > 0
                assert '1: ' in solutions[1][:5]
            except AssertionError:
                print(type(solutions))
                print('####')
                print(solutions[0])
                print('####')
                print(len(solutions))
        else:
            # No solution found.
            print('#################')
            print('#######{}##########'.format(self.name))
            print('#################')
            for i in output_of_dl_learner[-3:-1]:
                print(i)
                if 'descriptions' in i:
                    search_info = i
            print('#################')
            print('#######{}##########'.format(self.name))
            print('#################')
            if search_info is not None:
                _ = re.findall(r'\d+ descriptions tested', search_info)
                assert len(_) == 1
                # Get the number of expressions tested.
                num_expression_tested = int(re.findall(r'\d+', _[0])[0])
            return {'Model': self.name, 'Prediction': best_concept_str, 'Accuracy': float(acc) * .01,
                    'F-measure': float(f_measure) * .01, 'NumClassTested': int(num_expression_tested)}
        # top_predictions must have the following form:
        """solutions ......:
        1: Parent (pred. acc.: 100.00 %, F-measure: 100.00 %)
        2: ⊤ (pred. acc.: 50.00 %, F-measure: 66.67 %)
        3: Person (pred. acc.: 50.00 %, F-measure: 66.67 %)
        """
        best_solution = solutions[1]
        if self.name == 'ocel':
            # OCEL reports its quality metrics differently.
            token = '(accuracy '
            start_index = len('1: ')
            end_index = best_solution.index(token)
            best_concept_str = best_solution[start_index:end_index - 1]  # -1 due to the whitespace between *) (*.
            quality_info = best_solution[end_index:]
            # best_concept_str => *Sister ⊔ (Female ⊓ (¬Granddaughter))*
            # quality_info     => *(accuracy 100%, length 16, depth 2)*
            predicted_accuracy_info = re.findall(r'accuracy \d*%', quality_info)
            assert len(predicted_accuracy_info) == 1
            assert predicted_accuracy_info[0][-1] == '%'  # percentage sign
            # OCEL reports integer percentages, so extract plain digits.
            acc = re.findall(r'\d+', predicted_accuracy_info[0])[0]
            _ = re.findall(r'\d+ descriptions tested', search_info)
            assert len(_) == 1
            # Get the number of expressions tested.
            num_expression_tested = int(re.findall(r'\d+', _[0])[0])
        elif self.name in ['celoe', 'eltl']:
            # e.g. => 1: Sister ⊔ (∃ married.Brother) (pred. acc.: 90.24%, F-measure: 91.11%)
            # Heuristic => quality info starts with *(pred. acc.: *
            token = '(pred. acc.: '
            start_index = len('1: ')
            end_index = best_solution.index(token)
            best_concept_str = best_solution[start_index:end_index - 1]  # -1 due to the whitespace between *) (*.
            quality_info = best_solution[end_index:]
            # best_concept_str => *Sister ⊔ (Female ⊓ (¬Granddaughter))*
            # quality_info     => *(pred. acc.: 79.27%, F-measure: 82.83%)*
            predicted_accuracy_info = re.findall(r'pred\. acc\.: \d+\.\d+%', quality_info)
            f_measure_info = re.findall(r'F-measure: \d+\.\d+%', quality_info)
            assert len(predicted_accuracy_info) == 1
            assert len(f_measure_info) == 1
            assert predicted_accuracy_info[0][-1] == '%'  # percentage sign
            assert f_measure_info[0][-1] == '%'  # percentage sign
            acc = re.findall(r'\d+\.?\d+', predicted_accuracy_info[0])[0]
            f_measure = re.findall(r'\d+\.?\d+', f_measure_info[0])[0]
            if search_info is not None:
                # search_info is expected to look like "Algorithm terminated successfully
                # (time: 252ms, 188 descriptions tested, 69 nodes in the search tree)."
                _ = re.findall(r'\d+ descriptions tested', search_info)
                if len(_) == 0:
                    assert self.name == 'eltl'
                else:
                    assert len(_) == 1
                    # Get the number of expressions tested.
                    num_expression_tested = int(re.findall(r'\d+', _[0])[0])
        else:
            raise ValueError
        # Map percentage values into the range [0.0, 1.0].
        return {'Prediction': best_concept_str, 'Accuracy': float(acc) * .01, 'F-measure': float(f_measure) * .01,
                'NumClassTested': int(num_expression_tested)}

    @staticmethod
    def train(dataset: List = None) -> None:
        """Dummy method; currently it does nothing."""

    def fit_from_iterable(self, dataset: List = None, max_runtime=None) -> List[Dict]:
        """Fit the dl-learner model on a list of given positive and negative examples.

        Args:
            dataset: A list of tuples (s, p, n) where
                s => string representation of the target concept,
                p => positive examples, i.e. s(p)=1, and
                n => negative examples, i.e. s(n)=0.
            max_runtime: Limit to stop the algorithm after n seconds.

        Returns:
            A list of prediction dictionaries, one per learning problem.
        """
        raise NotImplementedError
        # NOTE: The code below is unreachable and stale: fit() now expects a
        # PosNegLPStandard instance rather than pos/neg keyword arguments.
        # assert len(dataset) > 0
        # if max_runtime:
        #     assert isinstance(max_runtime, int)
        #     self.max_runtime = max_runtime
        # return [self.fit(pos=p, neg=n, max_runtime=self.max_runtime).best_hypothesis()
        #         for (s, p, n) in dataset]
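

# A minimal end-to-end usage sketch (shown as a comment since this module uses
# relative imports and is not meant to run as a script). The binary path, KB
# path, IRIs, and the 'ontolearn.binders' module path are illustrative
# assumptions, and owlapy import locations may differ across versions:
#
#   from owlapy.iri import IRI
#   from owlapy.owl_individual import OWLNamedIndividual
#   from ontolearn.binders import DLLearnerBinder
#   from ontolearn.learning_problem import PosNegLPStandard
#
#   binder = DLLearnerBinder(binary_path='./dllearner-1.5.0/bin/cli', model='celoe',
#                            kb_path='./KGs/Family/family.owl', max_runtime=3)
#   lp = PosNegLPStandard(pos={OWLNamedIndividual(IRI.create('http://example.com/family#i1'))},
#                         neg={OWLNamedIndividual(IRI.create('http://example.com/family#i2'))})
#   # fit() writes a .conf file, runs the DL-Learner CLI, and parses its stdout.
#   print(binder.fit(lp).best_hypothesis())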