# Copyright 2021-2022 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Aniek Roelofs, James Diprose
from __future__ import annotations
import logging
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple
import nltk
import requests
from airflow.exceptions import AirflowException
from observatory.platform.files import get_chunks
[docs]WIKI_MAX_TITLES = 20 # Set the number of titles for which wiki descriptions are retrieved at once, the API can return max 20 extracts.
[docs]def fetch_wikipedia_descriptions(wikipedia_urls: List[str]) -> List[Tuple[str, str]]:
"""Get the wikipedia descriptions for each entity (institution or country).
:param wikipedia_urls: a list of Wikipedia URLs.
:return: None.
"""
# Download 'punkt' resource, required when shortening wiki descriptions
nltk.download("punkt")
# Create list with dictionaries of max 20 ids + titles (this is wiki api max)
chunks = list(get_chunks(input_list=wikipedia_urls, chunk_size=WIKI_MAX_TITLES))
n_wikipedia_urls = len(wikipedia_urls)
n_chunks = len(chunks)
logging.info(f"Downloading {n_wikipedia_urls} wikipedia descriptions in {n_chunks} chunks.")
# Process each dictionary in separate thread to get wiki descriptions
futures, results = [], []
with ThreadPoolExecutor() as executor:
# Queue tasks
for chunk in chunks:
futures.append(executor.submit(fetch_wikipedia_descriptions_batch, chunk))
# Wait for results
for completed in as_completed(futures):
results += completed.result()
# Print progress
n_downloaded = len(results)
p_progress = n_downloaded / n_wikipedia_urls * 100
logging.info(f"Downloading descriptions {n_downloaded}/{n_wikipedia_urls}: {p_progress:.2f}%")
logging.info(f"Finished downloading wikipedia descriptions")
logging.info(f"Expected results: {n_wikipedia_urls}, actual num descriptions returned: {n_downloaded}")
if n_wikipedia_urls != n_downloaded:
raise Exception(f"Number of Wikipedia descriptions returned does not match the number of Wikipedia URLs sent")
return results
[docs]def fetch_wikipedia_descriptions_batch(wikipedia_urls: List) -> List[Tuple[str, str]]:
"""Fetch the wikipedia descriptions for a set of Wikipedia URLs
:param wikipedia_urls: a list of Wikipedia URLs.
:return: List with tuples (id, wiki description)
"""
titles = []
title_url_index = {}
for url in wikipedia_urls:
# Extract title from URL
title = url.split("wikipedia.org/wiki/")[-1].split("#")[0]
title_url_index[urllib.parse.unquote(title)] = url
# URL encode title if it is not encoded yet
if title == urllib.parse.unquote(title):
titles.append(urllib.parse.quote(title))
# Append title directly if it is already encoded and not empty
else:
titles.append(title)
# Confirm that there is a max of 20 titles, the limit for the wikipedia API
assert len(titles) <= 20
# Extract descriptions using the Wikipedia API
url = f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=extracts&titles={'%7C'.join(titles)}&redirects=1&exintro=1&explaintext=1"
response = requests.get(url)
if response.status_code != 200:
raise AirflowException(f"Unsuccessful retrieving wikipedia extracts, url: {url}")
response_json = response.json()
pages = response_json["query"]["pages"]
# Create mapping between redirected/normalized page title and original page title
redirects = {}
for title in response_json["query"].get("redirects", []):
redirects[title["to"]] = title["from"]
normalized = {}
for title in response_json["query"].get("normalized", []):
normalized[title["to"]] = title["from"]
# Resolve redirects referring to each other to 1 redirect
for key, value in redirects.copy().items():
if value in redirects:
redirects[key] = redirects.pop(value)
# Create mapping between Wikipedia URL and decoded page title.
results = []
for page_id, page in pages.items():
page_title = page["title"]
# Get page_title from redirected/normalized if it is present
page_title = redirects.get(page_title, page_title)
page_title = normalized.get(page_title, page_title)
# Link original url to description
wikipedia_url = title_url_index[urllib.parse.unquote(page_title)]
# Get description and clean up
description = page.get("extract", "")
if description:
description = remove_text_between_brackets(description)
description = shorten_text_full_sentences(description)
results.append((wikipedia_url, description))
return results
[docs]def remove_text_between_brackets(text: str) -> str:
"""Remove any text between (nested) brackets.
If there is a space after the opening bracket, this is removed as well.
E.g. 'Like this (foo, (bar)) example' -> 'Like this example'
:param text: The text to modify
:return: The modified text
"""
new_text = []
nested = 0
for char in text:
if char == "(":
nested += 1
new_text = new_text[:-1] if new_text and new_text[-1] == " " else new_text
elif (char == ")") and nested:
nested -= 1
elif nested == 0:
new_text.append(char)
return "".join(new_text).strip()
[docs]def shorten_text_full_sentences(text: str, *, char_limit: int = 300) -> str:
"""Shorten a text to as many complete sentences as possible, while the total number of characters stays below
the char_limit.
Always return at least one sentence, even if this exceeds the char_limit.
:param text: A string with the complete text
:param char_limit: The max number of characters
:return: The shortened text.
"""
# Create list of sentences
sentences = nltk.tokenize.sent_tokenize(text)
# Add sentences until char limit is reached
sentences_output = []
total_len = 0
for sentence in sentences:
total_len += len(sentence)
if (total_len > char_limit) and sentences_output:
break
sentences_output.append(sentence)
return " ".join(sentences_output)