# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import re
from logging import getLogger
from typing import List, Tuple, Dict, Any, Union
from collections import namedtuple
from hdt import HDTDocument
from deeppavlov.core.common.file import load_pickle
from deeppavlov.core.commands.utils import expand_path
from deeppavlov.core.common.registry import register
# Module-level logger, named after this module.
log = getLogger(__name__)
@register('wiki_parser')
class WikiParser:
    """This class extracts relations, objects or triplets from a Wikidata HDT file or a pickled dump.

    With ``file_format="hdt"`` the data is queried through ``HDTDocument.search_triples``.
    With ``file_format="pickle"`` the data is a dict that maps an entity id to a dict with
    optional ``"forw"`` / ``"backw"`` entries; each entry is either a list of rows or a
    compressed string (see :meth:`uncompress`). The first element of a row is the relation,
    the remaining elements are the connected entities/values.
    """

    # Time-of-day suffix of Wikidata dates, removed before dates are compared.
    _MIDNIGHT_SUFFIX = "T00:00:00Z"

    def __init__(self, wiki_filename: str, file_format: str = "hdt", lang: str = "@en", **kwargs) -> None:
        """
        Args:
            wiki_filename: file with Wikidata
            file_format: format of Wikidata file, ``"hdt"`` or ``"pickle"``
            lang: language tag that labels must end with, e.g. ``"@en"``
            **kwargs: accepted and ignored

        Raises:
            ValueError: if ``file_format`` is neither ``"hdt"`` nor ``"pickle"``
        """
        self.description_rel = "http://schema.org/description"
        self.file_format = file_format
        self.wiki_filename = str(expand_path(wiki_filename))
        # Cache filled by parse_triplets; defined for every format so that
        # helpers reading it never fail with AttributeError.
        self.parsed_document = {}
        if self.file_format == "hdt":
            self.document = HDTDocument(self.wiki_filename)
        elif self.file_format == "pickle":
            self.document = load_pickle(self.wiki_filename)
        else:
            raise ValueError("Unsupported file format")
        self.lang = lang

    def __call__(self, parser_info_list: List[str], queries_list: List[Any]) -> List[Any]:
        """Run a batch of commands, one per ``(parser_info, query)`` pair.

        Supported values of ``parser_info``:
            ``"query_execute"``: ``query`` is the arguments of :meth:`execute` followed by a
                ``return_if_found`` flag; when the flag is truthy and the result is non-empty,
                the outputs collected so far are returned immediately
            ``"find_rels"``: ``query`` is the arguments of :meth:`find_rels`; the found
                relations are concatenated to the output (not appended as a list)
            ``"find_label"``: ``query`` is the arguments of :meth:`find_label`
            ``"find_triplets"``: for HDT ``query`` is unpacked into ``search_triples``,
                for pickle it is used as a key of the document
            ``"parse_triplets"`` (pickle only): ``query`` is an iterable of entity ids to
                pre-parse; ``"ok"`` is appended to the output

        Raises:
            ValueError: for any other ``parser_info``
        """
        wiki_parser_output = []
        for parser_info, query in zip(parser_info_list, queries_list):
            if parser_info == "query_execute":
                *query_to_execute, return_if_found = query
                candidate_output = self.execute(*query_to_execute)
                wiki_parser_output.append(candidate_output)
                if return_if_found and candidate_output:
                    return wiki_parser_output
            elif parser_info == "find_rels":
                wiki_parser_output += self.find_rels(*query)
            elif parser_info == "find_label":
                wiki_parser_output.append(self.find_label(*query))
            elif parser_info == "find_triplets":
                if self.file_format == "hdt":
                    tr, c = self.document.search_triples(*query)
                    wiki_parser_output.append(list(tr))
                else:
                    wiki_parser_output.append(self.document.get(query, {}))
            elif parser_info == "parse_triplets" and self.file_format == "pickle":
                for entity in query:
                    self.parse_triplets(entity)
                wiki_parser_output.append("ok")
            else:
                raise ValueError("Unsupported query type")
        return wiki_parser_output

    def execute(self, what_return: List[str],
                query_seq: List[List[str]],
                filter_info: List[Tuple[str]] = None,
                order_info: namedtuple = None) -> List[List[str]]:
        """Execute a sequence of triple patterns and return the requested variables.

        Let us consider an example of the question "What is the deepest lake in Russia?"
        with the corresponding SPARQL query
        "SELECT ?ent WHERE { ?ent wdt:P31 wd:T1 . ?ent wdt:R1 ?obj . ?ent wdt:R2 wd:E1 } ORDER BY ASC(?obj) LIMIT 5"

        arguments:
            what_return: ["?obj"]
            query_seq: [["?ent", "http://www.wikidata.org/prop/direct/P17", "http://www.wikidata.org/entity/Q159"],
                        ["?ent", "http://www.wikidata.org/prop/direct/P31", "http://www.wikidata.org/entity/Q23397"],
                        ["?ent", "http://www.wikidata.org/prop/direct/P4511", "?obj"]]
            filter_info: []
            order_info: order_info(variable='?obj', sorting_order='asc')

        Args:
            what_return: variables to return; if the last item starts with ``"count"``,
                the number of found combinations is returned in its place
            query_seq: triple patterns, elements starting with ``"?"`` are variables
            filter_info: ``(variable, value)`` pairs; a combination is kept only if
                ``value`` is contained in the value bound to ``variable``
            order_info: object with ``variable`` and ``sorting_order`` attributes; when
                given, only the first combination in that order is kept. The value of
                the sort variable is converted in place (date string or ``float``)

        Returns:
            list with one list of values per found combination, ``[]`` if nothing matches
        """
        # The default is None; normalise it so the membership test below cannot fail.
        filter_info = filter_info or []
        combs = []
        if "qualifier" not in filter_info:
            for n, query in enumerate(query_seq):
                unknown_elem_positions = [(pos, elem) for pos, elem in enumerate(query) if elem.startswith('?')]
                # n = 0, query = ["?ent", "http://www.wikidata.org/prop/direct/P17",
                #                 "http://www.wikidata.org/entity/Q159"]
                #        unknown_elem_positions = [(0, "?ent")]
                # n = 1, query = ["?ent", "http://www.wikidata.org/prop/direct/P31",
                #                 "http://www.wikidata.org/entity/Q23397"]
                #        unknown_elem_positions = [(0, "?ent")]
                # n = 2, query = ["?ent", "http://www.wikidata.org/prop/direct/P4511", "?obj"]
                #        unknown_elem_positions = [(0, "?ent"), (2, "?obj")]
                if n == 0:
                    combs = self.search(query, unknown_elem_positions)
                    # combs = [{"?ent": "http://www.wikidata.org/entity/Q5513"}, ...]
                    continue
                if not combs:
                    # Nothing to extend: the result stays empty for every later pattern.
                    break
                known_elements = [elem for elem in query if elem in combs[0]]
                extended_combs = []
                if known_elements:
                    for comb in combs:
                        # n = 1
                        # query = ["?ent", "http://www.wikidata.org/prop/direct/P31",
                        #          "http://www.wikidata.org/entity/Q23397"]
                        # comb = {"?ent": "http://www.wikidata.org/entity/Q5513"}
                        # known_elements = ["?ent"]
                        # filled_query = ["http://www.wikidata.org/entity/Q5513",
                        #                 "http://www.wikidata.org/prop/direct/P31",
                        #                 "http://www.wikidata.org/entity/Q23397"]
                        # extended_combs = [{"?ent": "http://www.wikidata.org/entity/Q5513"}, ...]
                        #
                        # Every already-bound variable is substituted at once and by exact
                        # match, so that the pattern is joined on all shared variables and
                        # "?ent" never rewrites part of a longer name such as "?ent2".
                        filled_query = [comb[elem] if elem in known_elements else elem for elem in query]
                        for new_comb in self.search(filled_query, unknown_elem_positions):
                            extended_combs.append({**comb, **new_comb})
                combs = extended_combs

        if combs and filter_info:
            for filter_elem, filter_value in filter_info:
                combs = [comb for comb in combs if filter_value in comb[filter_elem]]

        # `combs` is re-checked before each step because filtering may have emptied it.
        if combs and order_info and not isinstance(order_info, list) and order_info.variable is not None:
            reverse = order_info.sorting_order == "desc"
            sort_elem = order_info.variable
            for comb in combs:
                value_str = comb[sort_elem].split('^^')[0].strip('"')
                if value_str.endswith(self._MIDNIGHT_SUFFIX):
                    # Cut the suffix itself; str.strip would treat it as a set of characters
                    # and also eat trailing zeros of the day, e.g. "2000-01-10" -> "2000-01-1".
                    comb[sort_elem] = value_str[:-len(self._MIDNIGHT_SUFFIX)]
                else:
                    comb[sort_elem] = float(value_str)
            combs = sorted(combs, key=lambda x: x[sort_elem], reverse=reverse)
            combs = [combs[0]]

        if combs:
            if what_return[-1].startswith("count"):
                combs = [[combs[0][key] for key in what_return[:-1]] + [len(combs)]]
            else:
                combs = [[elem[key] for key in what_return] for elem in combs]

        return combs

    def search(self, query: List[str], unknown_elem_positions: List[Tuple[int, str]]) -> List[Dict[str, str]]:
        """Find the triplets matching one triple pattern.

        Args:
            query: ``[subject, relation, object]``; elements starting with ``"?"`` are wildcards
            unknown_elem_positions: ``(position, variable)`` pairs telling which element of
                a found triplet is bound to which variable

        Returns:
            one dict ``{variable: value}`` per matching triplet
        """
        subj, rel, obj = ["" if elem.startswith('?') else elem for elem in query]
        if self.file_format == "hdt":
            triplets, c = self.document.search_triples(subj, rel, obj)
            if rel == self.description_rel:
                triplets = [triplet for triplet in triplets if triplet[2].endswith(self.lang)]
        else:
            # Stays empty when neither the subject nor the object is given.
            triplets = []
            if subj:
                subj_id, forw_rows = self.find_triplets(subj, "forw")
                triplets = [[subj_id, row[0], value] for row in forw_rows for value in row[1:]]
                if obj:
                    # Both ends are fixed: keep the subject constraint and filter by object.
                    obj_id = obj.split('/')[-1]
                    triplets = [triplet for triplet in triplets if triplet[2] in (obj, obj_id)]
            elif obj:
                obj_id, backw_rows = self.find_triplets(obj, "backw")
                triplets = [[value, row[0], obj_id] for row in backw_rows for value in row[1:]]
            if rel:
                if rel == self.description_rel:
                    # NOTE(review): the key is hard-coded to English regardless of self.lang.
                    triplets = [triplet for triplet in triplets if triplet[1] == "descr_en"]
                else:
                    rel = rel.split('/')[-1]
                    triplets = [triplet for triplet in triplets if triplet[1] == rel]
        return [{elem: triplet[pos] for pos, elem in unknown_elem_positions} for triplet in triplets]

    def _strip_lang(self, value: str) -> str:
        """Return ``value`` without a trailing ``self.lang`` tag, if it has one."""
        if self.lang and value.endswith(self.lang):
            return value[:-len(self.lang)]
        return value

    def find_label(self, entity: str, question: str) -> str:
        """Return a human-readable label for an entity id, URI or literal.

        Args:
            entity: entity id (``"Q5513"``), entity URI, label with a language tag,
                typed literal (``'"+1642"^^<...#decimal>'``) or a plain number
            question: question text, forwarded to :meth:`format_date`

        Returns:
            the label or formatted value, ``"Not Found"`` if nothing fits.
            For date values the result of :meth:`format_date` is returned unchanged.
        """
        entity = str(entity).replace('"', '')
        if self.file_format == "hdt":
            if entity.startswith("Q"):
                # example: "Q5513"
                entity = "http://www.wikidata.org/entity/" + entity
                # "http://www.wikidata.org/entity/Q5513"

            if entity.startswith("http://www.wikidata.org/entity/"):
                labels, c = self.document.search_triples(entity, "http://www.w3.org/2000/01/rdf-schema#label", "")
                # labels = [["http://www.wikidata.org/entity/Q5513", "http://www.w3.org/2000/01/rdf-schema#label",
                #            '"Lake Baikal"@en'], ...]
                for label in labels:
                    if label[2].endswith(self.lang):
                        return self._strip_lang(label[2]).replace('"', '')

            elif entity.endswith(self.lang):
                # entity: '"Lake Baikal"@en'
                # The tag is cut by its real length, so tags other than 3 characters work too.
                return self._strip_lang(entity)

            elif "^^" in entity:
                # examples:
                #     '"1799-06-06T00:00:00Z"^^<http://www.w3.org/2001/XMLSchema#dateTime>' (date)
                #     '"+1642"^^<http://www.w3.org/2001/XMLSchema#decimal>' (number)
                entity = entity.split("^^")[0]
                for token in ["T00:00:00Z", "+"]:
                    entity = entity.replace(token, '')
                return self.format_date(entity, question)

            elif entity.isdigit():
                return entity

        if self.file_format == "pickle":
            if entity:
                if entity.startswith("Q"):
                    triplets = self.document.get(entity, {}).get("forw", [])
                    triplets = self.uncompress(triplets)
                    for triplet in triplets:
                        if triplet[0] == "name_en":
                            return triplet[1]
                else:
                    return self.format_date(entity, question)

        return "Not Found"

    def format_date(self, entity, question):
        """Format a date-like string for an answer.

        Args:
            entity: string that may contain a ``year-month-day`` date
            question: question text; if it contains "how old", the age is returned

        Returns:
            * ``int`` number of years between the current year and the date, for "how old" questions
            * ``"DD Month YYYY"`` if the date has a valid month and day
            * the year alone if the month or day is zero or the date is not a valid calendar date
            * ``entity`` without leading ``+``/``-`` signs if it contains no date
        """
        date_info = re.findall(r"([\d]{3,4})-([\d]{1,2})-([\d]{1,2})", entity)
        if not date_info:
            return entity.lstrip('+-')

        year, month, day = date_info[0]
        if "how old" in question.lower():
            return datetime.datetime.now().year - int(year)
        if int(month) and int(day):
            # The date is built from integers: strptime("%Y") accepts only 4-digit years,
            # while the pattern above also matches 3-digit ones.
            try:
                date = datetime.date(int(year), int(month), int(day))
            except ValueError:
                return year
            return date.strftime("%d %B %Y")
        return year

    def find_alias(self, entity: str) -> List[str]:
        """Return the alternative labels of an entity URI in the configured language.

        Aliases are read with ``search_triples``, so an empty list is returned for
        any format other than HDT and for anything that is not an entity URI.
        """
        aliases = []
        if self.file_format == "hdt" and entity.startswith("http://www.wikidata.org/entity/"):
            labels, cardinality = self.document.search_triples(entity,
                                                               "http://www.w3.org/2004/02/skos/core#altLabel", "")
            aliases = [self._strip_lang(label[2]).strip('"') for label in labels if label[2].endswith(self.lang)]
        return aliases

    def find_rels(self, entity: str, direction: str, rel_type: str = "no_type") -> List[str]:
        """Return the relations connected to an entity.

        Args:
            entity: entity id, e.g. ``"Q159"``
            direction: ``"forw"`` to use the entity as the subject; for HDT any other
                value uses it as the object, for pickle it is the key of the entity record
            rel_type: for HDT, the part of the relation URI after
                ``http://www.wikidata.org/prop/``; with ``"no_type"`` only relations
                starting with ``http://www.wikidata.org/prop/P`` are returned

        Returns:
            relation URIs for HDT, relation ids starting with ``"P"`` for pickle
        """
        rels = []
        if self.file_format == "hdt":
            if direction == "forw":
                query = [f"http://www.wikidata.org/entity/{entity}", "", ""]
            else:
                query = ["", "", f"http://www.wikidata.org/entity/{entity}"]
            triplets, c = self.document.search_triples(*query)
            if rel_type != "no_type":
                start_str = f"http://www.wikidata.org/prop/{rel_type}"
            else:
                start_str = "http://www.wikidata.org/prop/P"
            rels = [triplet[1] for triplet in triplets if triplet[1].startswith(start_str)]
        if self.file_format == "pickle":
            triplets = self.document.get(entity, {}).get(direction, [])
            triplets = self.uncompress(triplets)
            rels = [triplet[0] for triplet in triplets if triplet[0].startswith("P")]
        return rels

    def uncompress(self, triplets: Union[str, List[List[str]]]) -> List[List[str]]:
        """Turn a compressed string of rows into a list of rows.

        A string is split into rows on tabs and every row is split into elements;
        anything that is not a string is returned unchanged.
        """
        if isinstance(triplets, str):
            # NOTE(review): element separator kept exactly as found in this file;
            # confirm it against the format of the pickled data.
            triplets = [triplet.strip().split(" ") for triplet in triplets.split('\t')]
        return triplets

    def parse_triplets(self, entity):
        """Uncompress the ``"forw"`` and ``"backw"`` rows of an entity and cache them in ``parsed_document``."""
        triplets = self.document.get(entity, {})
        for direction in ["forw", "backw"]:
            if direction in triplets:
                dir_triplets = self.uncompress(triplets[direction])
                self.parsed_document.setdefault(entity, {})[direction] = dir_triplets

    def find_triplets(self, subj: str, direction: str) -> Tuple[str, List[List[str]]]:
        """Return the entity id and its rows for the given direction.

        Args:
            subj: entity id or URI; only the part after the last ``/`` is used
            direction: ``"forw"`` or ``"backw"``

        Returns:
            ``(entity id, rows)``; rows come from the ``parsed_document`` cache when the
            entity is there, otherwise they are read from the document and uncompressed
        """
        subj = subj.split('/')[-1]
        if subj in self.parsed_document:
            triplets = self.parsed_document.get(subj, {}).get(direction, [])
        else:
            triplets = self.document.get(subj, {}).get(direction, [])
            triplets = self.uncompress(triplets)
        return subj, triplets