# Copyright 2021-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations, annotations
import dataclasses
import glob
import json
import logging
import os
import os.path
import os.path
import os.path
import shutil
import statistics
from concurrent.futures import as_completed, ThreadPoolExecutor
from typing import Dict, List, Tuple, Union
from urllib.parse import urlparse
from zipfile import ZipFile
import google.cloud.bigquery as bigquery
import jsonlines
import math
import numpy as np
import pendulum
from airflow.exceptions import AirflowException
from glom import Coalesce, glom, SKIP
from jinja2 import Template
from academic_observatory_workflows.clearbit import clearbit_download_logo
from academic_observatory_workflows.config import project_path
from academic_observatory_workflows.github import trigger_repository_dispatch
from academic_observatory_workflows.oa_dashboard_workflow.institution_ids import INSTITUTION_IDS
from academic_observatory_workflows.oa_dashboard_workflow.release import OaDashboardRelease
from academic_observatory_workflows.wikipedia import fetch_wikipedia_descriptions
from academic_observatory_workflows.zenodo import make_draft_version, publish_new_version, Zenodo
from observatory_platform.airflow.airflow import get_airflow_connection_password
from observatory_platform.files import yield_jsonl
from observatory_platform.google.bigquery import (
bq_create_table_from_query,
bq_load_from_memory,
bq_run_query,
)
from observatory_platform.google.gcs import (
gcs_blob_name_from_path,
gcs_download_blob,
gcs_download_blobs,
gcs_upload_file,
)
from observatory_platform.jinja2_utils import render_template
[docs]INCLUSION_THRESHOLD = {"country": 5, "institution": 50}
[docs]END_YEAR = pendulum.now().year - 1
[docs]README = """# COKI Open Access Dataset
The COKI Open Access Dataset measures open access performance for {{ n_countries }} countries and {{ n_institutions }} institutions
and is available in JSON Lines format. The data is visualised at the COKI Open Access Dashboard: https://open.coki.ac/.
## Licence
[COKI Open Access Dataset](https://open.coki.ac/data/) © {{ year }} by [Curtin University](https://www.curtin.edu.au/)
is licenced under [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/)
## Citing
To cite the COKI Open Access Dashboard please use the following citation:
> Diprose, J., Hosking, R., Rigoni, R., Roelofs, A., Chien, T., Napier, K., Wilson, K., Huang, C., Handcock, R., Montgomery, L., & Neylon, C. (2023). A User-Friendly Dashboard for Tracking Global Open Access Performance. The Journal of Electronic Publishing 26(1). doi: https://doi.org/10.3998/jep.3398
If you use the website code, please cite it as below:
> James P. Diprose, Richard Hosking, Richard Rigoni, Aniek Roelofs, Kathryn R. Napier, Tuan-Yow Chien, Alex Massen-Hane, Katie S. Wilson, Lucy Montgomery, & Cameron Neylon. (2022). COKI Open Access Website. Zenodo. https://doi.org/10.5281/zenodo.6374486
If you use this dataset, please cite it as below:
> Richard Hosking, James P. Diprose, Aniek Roelofs, Tuan-Yow Chien, Lucy Montgomery, & Cameron Neylon. (2022). COKI Open Access Dataset [Data set]. Zenodo. https://doi.org/10.5281/zenodo.6399463
## Attributions
The COKI Open Access Dataset contains information from:
* [Open Alex](https://openalex.org/) which is made available under a [CC0 licence](https://creativecommons.org/publicdomain/zero/1.0/).
* [Crossref Metadata](https://www.crossref.org/documentation/metadata-plus/) via the Metadata Plus program. Bibliographic metadata is made available without copyright restriction and Crossref generated data with a [CC0 licence](https://creativecommons.org/share-your-work/public-domain/cc0/). See [metadata licence information](https://www.crossref.org/documentation/retrieve-metadata/rest-api/rest-api-metadata-license-information/) for more details.
* [Unpaywall](https://unpaywall.org/). The [Unpaywall Data Feed](https://unpaywall.org/products/data-feed) is used under license. Data is freely available from Unpaywall via the API, data dumps and as a data feed.
* [Research Organization Registry](https://ror.org/) which is made available under a [CC0 licence](https://creativecommons.org/share-your-work/public-domain/cc0/).
"""
[docs]def upload_institution_ids(*, release: OaDashboardRelease):
data = [{"ror_id": ror_id} for ror_id in INSTITUTION_IDS]
success = bq_load_from_memory(
release.institution_ids_table_id,
data,
schema_file_path=project_path("oa_dashboard_workflow", "schema", "institution_ids.json"),
)
if not success:
raise AirflowException(f"upload_institution_ids: error loading table {release.institution_ids_table_id}")
[docs]def create_entity_tables(
*, release: OaDashboardRelease, entity_types: list[str], start_year: int, end_year: int, inclusion_thresholds: dict
):
results, queries = [], []
# Query the country and institution aggregations
for entity_type in entity_types:
template_path = project_path("oa_dashboard_workflow", "sql", f"{entity_type}.sql.jinja2")
sql = render_template(
template_path,
agg_table_id=release.observatory_agg_table_id(entity_type),
start_year=start_year,
end_year=end_year,
ror_table_id=release.ror_table_id,
country_table_id=release.country_table_id,
institution_ids_table_id=release.institution_ids_table_id,
inclusion_threshold=inclusion_thresholds[entity_type],
)
dst_table_id = release.oa_dashboard_table_id(entity_type)
queries.append((sql, dst_table_id))
# Run queries, saving to BigQuery
for sql, dst_table_id in queries:
success = bq_create_table_from_query(sql=sql, table_id=dst_table_id)
results.append(success)
state = all(results)
if not state:
raise AirflowException("OaDashboardWorkflow.query failed")
[docs]def add_wiki_descriptions(*, release: OaDashboardRelease, entity_type: str):
logging.info(f"add_wiki_descriptions: {entity_type}")
# Get entities to fetch descriptions for
results = bq_run_query(
f"SELECT DISTINCT wikipedia_url FROM {release.oa_dashboard_table_id(entity_type)} WHERE wikipedia_url IS NOT NULL AND TRIM(wikipedia_url) != ''"
)
wikipedia_urls = [result["wikipedia_url"] for result in results]
# Fetch Wikipedia descriptions
results = fetch_wikipedia_descriptions(wikipedia_urls)
# Upload to BigQuery
data = [{"url": wikipedia_url, "text": description} for wikipedia_url, description in results]
desc_table_id = release.descriptions_table_id(entity_type)
success = bq_load_from_memory(
desc_table_id,
data,
schema_file_path=project_path("oa_dashboard_workflow", "schema", "descriptions.json"),
)
if not success:
raise AirflowException(f"Uploading data to {desc_table_id} table failed")
# Update entity table
template_path = project_path("oa_dashboard_workflow", "sql", "update_descriptions.sql.jinja2")
sql = render_template(
template_path,
entity_table_id=release.oa_dashboard_table_id(entity_type),
descriptions_table_id=desc_table_id,
)
bq_run_query(sql)
[docs]def download_assets(*, release: OaDashboardRelease, bucket_name: str):
# Download assets
# They are unzipped in this particular order so that images-base overwrites any files in images
blob_names = ["images.zip", "images-base.zip"]
for blob_name in blob_names:
# Download asset zip
file_path = os.path.join(release.download_folder, blob_name)
gcs_download_blob(bucket_name=bucket_name, blob_name=blob_name, file_path=file_path) # data_bucket
# Unzip into build
unzip_folder_path = os.path.join(release.build_path, "images")
with ZipFile(file_path) as zip_file:
zip_file.extractall(unzip_folder_path) # Overwrites by default
[docs]def download_institution_logos(*, release: OaDashboardRelease):
logging.info(f"download_logos: institution")
# Get entities to fetch descriptions for
entity_type = "institution"
results = bq_run_query(f"SELECT id, url FROM {release.oa_dashboard_table_id(entity_type)}")
entities = [(result["id"], result["url"]) for result in results]
# Update logos
data = fetch_institution_logos(release.build_path, entities)
# Upload to BigQuery
logos_table_id = release.logos_table_id(entity_type)
success = bq_load_from_memory(
logos_table_id,
data,
schema_file_path=project_path("oa_dashboard_workflow", "schema", "logos.json"),
)
if not success:
raise AirflowException(f"Uploading data to {logos_table_id} table failed")
# Update with entity table
template_path = project_path("oa_dashboard_workflow", "sql", "update_logos.sql.jinja2")
sql = render_template(
template_path,
entity_table_id=release.oa_dashboard_table_id(entity_type),
logos_table_id=logos_table_id,
)
bq_run_query(sql)
# Zip dataset
shutil.make_archive(os.path.join(release.out_path, "images"), "zip", os.path.join(release.build_path, "images"))
[docs]def export_tables(*, release: OaDashboardRelease, entity_types: list[str], download_bucket: str):
# Fetch data
blob_prefix = gcs_blob_name_from_path(release.download_folder)
results = []
for entity_type in entity_types:
destination_uri = f"gs://{download_bucket}/{blob_prefix}/{entity_type}-data-*.jsonl.gz"
table_id = release.oa_dashboard_table_id(entity_type)
success = bq_query_to_gcs(
query=f"SELECT * FROM {table_id} ORDER BY stats.p_outputs_open DESC",
# Uses a query to export data to make sure it is in the correct order
project_id=release.output_project_id,
destination_uri=destination_uri,
)
results.append(success)
if not all(results):
raise AirflowException("OaDashboardWorkflow.download failed")
[docs]def download_data(*, release: OaDashboardRelease, download_bucket: str):
blob_prefix = gcs_blob_name_from_path(release.download_folder)
state = gcs_download_blobs(
bucket_name=download_bucket,
prefix=blob_prefix,
destination_path=release.download_folder,
)
if not state:
raise AirflowException("OaDashboardWorkflow.download failed")
[docs]def make_draft_zenodo_version(*, zenodo_conn_id: str, zenodo_host: str, conceptrecid: int):
zenodo_token = get_airflow_connection_password(zenodo_conn_id)
zenodo = Zenodo(host=zenodo_host, access_token=zenodo_token)
make_draft_version(zenodo, conceptrecid)
[docs]def fetch_zenodo_versions(
*,
zenodo_conn_id: str,
zenodo_host: str,
conceptrecid: int,
):
# Get versions
zenodo_token = get_airflow_connection_password(zenodo_conn_id)
zenodo = Zenodo(host=zenodo_host, access_token=zenodo_token)
res = zenodo.get_versions(conceptrecid, all_versions=1)
if res.status_code != 200:
raise AirflowException(f"zenodo.get_versions status_code {res.status_code}")
zenodo_versions = [
ZenodoVersion(
pendulum.parse(version["created"]),
f"https://zenodo.org/record/{version['id']}/files/coki-oa-dataset.zip?download=1",
).to_dict()
for version in res.json()
]
return zenodo_versions
[docs]def build_datasets(
*,
release: OaDashboardRelease,
entity_types: list[str],
zenodo_versions: list[ZenodoVersion],
start_year: int,
end_year: int,
readme_text: str,
):
# Save OA Dashboard dataset
build_data_path = os.path.join(release.build_path, "data")
save_oa_dashboard_dataset(
release.download_folder, build_data_path, entity_types, zenodo_versions, start_year, end_year
)
shutil.make_archive(os.path.join(release.out_path, "data"), "zip", os.path.join(release.build_path, "data"))
# Save COKI Open Access Dataset
coki_dataset_path = os.path.join(release.transform_folder, "coki-oa-dataset")
save_zenodo_dataset(release.download_folder, coki_dataset_path, entity_types, readme_text)
shutil.make_archive(os.path.join(release.out_path, "coki-oa-dataset"), "zip", coki_dataset_path)
[docs]def publish_zenodo_version(
*,
release: OaDashboardRelease,
version: str,
bucket_name: str,
zenodo_conn_id: str,
zenodo_host: str,
conceptrecid: int,
):
zenodo_token = get_airflow_connection_password(zenodo_conn_id)
zenodo = Zenodo(host=zenodo_host, access_token=zenodo_token)
res = zenodo.get_versions(conceptrecid, all_versions=0)
if res.status_code != 200:
raise AirflowException(f"zenodo.get_versions status_code {res.status_code}")
draft = res.json()[0]
draft_id = draft["id"]
if draft["state"] != "unsubmitted":
raise AirflowException(f"Latest version is not a draft: {draft_id}")
# Download latest Zenodo version from bucket
file_name = "coki-oa-dataset.zip"
file_path = os.path.join(release.out_path, file_name)
blob_name = f"{version}/{file_name}"
success = gcs_download_blob(bucket_name=bucket_name, blob_name=blob_name, file_path=file_path)
if not success:
raise AirflowException(f"publish_zenodo_version: unable to download gs://{bucket_name}/{blob_name}")
# Publish new version
publish_new_version(zenodo, draft_id, file_path)
[docs]def upload_dataset(*, release: OaDashboardRelease, version: str, bucket_name: str):
# gcs_upload_file should always rewrite a new version of latest.zip if it exists
# object versioning on the bucket will keep the previous versions
for file_name in ["data.zip", "images.zip", "coki-oa-dataset.zip"]:
blob_name = f"{version}/{file_name}"
file_path = os.path.join(release.out_path, file_name)
gcs_upload_file(bucket_name=bucket_name, blob_name=blob_name, file_path=file_path, check_blob_hash=False)
[docs]def repository_dispatch(*, github_conn_id: str):
token = get_airflow_connection_password(github_conn_id)
event_types = ["data-update/develop", "data-update/staging", "data-update/production"]
for event_type in event_types:
trigger_repository_dispatch(
org="The-Academic-Observatory", repo_name="coki-oa-web", token=token, event_type=event_type
)
@dataclasses.dataclass
[docs]class ZenodoVersion:
[docs] release_date: pendulum.DateTime
@staticmethod
[docs] def from_dict(dict_: dict) -> ZenodoVersion:
return ZenodoVersion(release_date=pendulum.parse(dict_["release_date"]), download_url=dict_["download_url"])
[docs] def to_dict(self) -> Dict:
return {"release_date": self.release_date.strftime("%Y-%m-%d"), "download_url": self.download_url}
@dataclasses.dataclass
[docs]class Histogram:
[docs] def to_dict(self) -> Dict:
return {"data": self.data, "bins": self.bins}
@dataclasses.dataclass
[docs]class EntityHistograms:
[docs] p_outputs_open: Histogram
[docs] n_outputs_open: Histogram
[docs] def to_dict(self) -> Dict:
return {
"p_outputs_open": self.p_outputs_open.to_dict(),
"n_outputs": self.n_outputs.to_dict(),
"n_outputs_open": self.n_outputs_open.to_dict(),
}
@dataclasses.dataclass
[docs]class EntityStats:
[docs] histograms: EntityHistograms
[docs] def to_dict(self) -> Dict:
return {
"n_items": self.n_items,
"min": self.min,
"max": self.max,
"median": self.median,
"histograms": self.histograms.to_dict(),
}
@dataclasses.dataclass
[docs]class Stats:
[docs] zenodo_versions: List[ZenodoVersion]
[docs] institution: EntityStats
[docs] def to_dict(self) -> Dict:
return {
"start_year": self.start_year,
"end_year": self.end_year,
"last_updated": self.last_updated,
"zenodo_versions": [z.to_dict() for z in self.zenodo_versions],
"country": self.country.to_dict(),
"institution": self.institution.to_dict(),
}
[docs]def bq_query_to_gcs(*, query: str, project_id: str, destination_uri: str, location: str = "us") -> bool:
"""Run a BigQuery query and save the results on Google Cloud Storage.
:param query: the query string.
:param project_id: the Google Cloud project id.
:param destination_uri: the Google Cloud Storage destination uri.
:param location: the BigQuery dataset location.
:return: the status of the job.
"""
client = bigquery.Client()
# Run query
query_job: bigquery.QueryJob = client.query(query, location=location)
query_job.result()
# Create and run extraction job
source_table_id = f"{project_id}.{query_job.destination.dataset_id}.{query_job.destination.table_id}"
config = bigquery.ExtractJobConfig()
config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
config.compression = bigquery.Compression.GZIP
extract_job: bigquery.ExtractJob = client.extract_table(
source_table_id, destination_uri, job_config=config, location=location
)
extract_job.result()
return query_job.state == "DONE" and extract_job.state == "DONE"
[docs]def save_oa_dashboard_dataset(
download_folder: str,
build_data_path: str,
entity_types: List[str],
zenodo_versions: List[ZenodoVersion],
start_year: int,
end_year: int,
):
# Iterate over entity data files
index = []
stats_index = {}
for entity_type in entity_types:
entities_path = os.path.join(build_data_path, entity_type)
os.makedirs(entities_path, exist_ok=True)
entities = []
data_path = data_file_pattern(download_folder, entity_type)
for entity in yield_data_glob(data_path):
# Save entity file as JSON
entity_id = entity["id"]
output_path = os.path.join(entities_path, f"{entity_id}.json")
save_json(output_path, entity)
# Select subset of fields for index and stats calculations
subset = oa_dashboard_subset(entity)
entities.append(subset)
# Save index for entity type
path = os.path.join(build_data_path, f"{entity_type}.json")
save_json(path, entities)
stats_index[entity_type] = make_entity_stats(entities)
index += entities
# Save index
index_path = os.path.join(build_data_path, "index.json")
save_json(index_path, index)
# Make stats from index
last_updated = zenodo_versions[0].release_date.format("D MMMM YYYY")
country_stats = stats_index["country"]
institution_stats = stats_index["institution"]
stats = Stats(start_year, end_year, last_updated, zenodo_versions, country_stats, institution_stats)
output_path = os.path.join(build_data_path, "stats.json")
save_json(output_path, stats.to_dict())
logging.info(f"Saved stats data")
[docs]def save_zenodo_dataset(download_folder: str, dataset_path: str, entity_types: List[str], readme_text: str):
"""Save the COKI Open Access Dataset to a zip file.
:param download_folder: the path where the downloaded data files can be found.
:param dataset_path: the path to the folder where the dataset should be saved.
:param entity_types: the entity types.
:param readme_text: the readme text.
:return: None.
"""
os.makedirs(dataset_path, exist_ok=True)
# For each entity type, save the jsonl file
counts = {entity_type: 0 for entity_type in entity_types}
for entity_type in entity_types:
out_path = os.path.join(dataset_path, f"{entity_type}.jsonl")
with open(out_path, mode="w") as out_file:
with jsonlines.Writer(out_file) as writer:
data_path = data_file_pattern(download_folder, entity_type)
for entity in yield_data_glob(data_path):
subset = zenodo_subset(entity)
writer.write(subset)
counts[entity_type] += 1
# Save README
file_path = os.path.join(dataset_path, "README.md")
template = Template(readme_text, keep_trailing_newline=True)
rendered = template.render(
year=pendulum.now().year, n_countries=counts["country"], n_institutions=counts["institution"]
)
with open(file_path, mode="w") as f:
f.write(rendered)
[docs]def oa_dashboard_subset(item: Dict) -> Dict:
subset_spec = {
"id": "id",
"name": "name",
"logo_sm": "logo_sm",
"entity_type": "entity_type",
"region": "region",
"subregion": "subregion",
"country_code": Coalesce("country_code", default=SKIP),
"country_name": Coalesce("country_name", default=SKIP),
"institution_type": Coalesce("institution_type", default=SKIP),
"acronyms": "acronyms",
"stats": {
"n_outputs": "stats.n_outputs",
"n_outputs_open": "stats.n_outputs_open",
"n_outputs_black": "stats.n_outputs_black",
"p_outputs_open": "stats.p_outputs_open",
"p_outputs_publisher_open_only": "stats.p_outputs_publisher_open_only",
"p_outputs_both": "stats.p_outputs_both",
"p_outputs_other_platform_open_only": "stats.p_outputs_other_platform_open_only",
"p_outputs_closed": "stats.p_outputs_closed",
"p_outputs_black": "stats.p_outputs_black",
},
}
return glom(item, subset_spec)
[docs]def zenodo_subset(item: Dict):
subset_spec = {
"id": "id",
"name": "name",
"region": "region",
"subregion": "subregion",
"country_code": Coalesce("country_code", default=SKIP),
"country_name": Coalesce("country_name", default=SKIP),
"institution_type": Coalesce("institution_type", default=SKIP),
"start_year": "start_year",
"end_year": "end_year",
"acronyms": "acronyms",
"stats": "stats",
"years": "years",
}
return glom(item, subset_spec)
[docs]def save_json(path: str, data: Union[Dict, List]):
"""Save data to JSON.
:param path: the output path.
:param data: the data to save.
:return: None.
"""
with open(path, mode="w") as f:
json.dump(data, f, separators=(",", ":"))
[docs]def data_file_pattern(download_folder: str, entity_type: str):
return os.path.join(download_folder, f"{entity_type}-data-*.jsonl.gz")
[docs]def yield_data_glob(pattern: str) -> List[Dict]:
"""Load country or institution data files into a Pandas DataFrame.
:param pattern: the file path including a glob pattern.
:return: the list of dicts.
"""
file_paths = sorted(glob.glob(pattern))
for file_path in file_paths:
for entity in yield_jsonl(file_path):
yield entity
[docs]def make_entity_stats(entities: List[Dict]) -> EntityStats:
"""Calculate stats for entities.
:param entities: a list of entities.
:return: the entity stats object.
"""
p_outputs_open = np.array([entity["stats"]["p_outputs_open"] for entity in entities])
n_outputs = np.array([entity["stats"]["n_outputs"] for entity in entities])
n_outputs_open = np.array([entity["stats"]["n_outputs_open"] for entity in entities])
# Make median, min and max values
stats_median = dict(p_outputs_open=statistics.median(p_outputs_open))
stats_min = dict(
p_outputs_open=math.floor(float(np.min(p_outputs_open))),
n_outputs=int(np.min(n_outputs)),
n_outputs_open=int(np.min(n_outputs_open)),
)
stats_max = dict(
p_outputs_open=math.ceil(float(np.max(p_outputs_open))),
n_outputs=int(np.max(n_outputs)),
n_outputs_open=int(np.max(n_outputs_open)),
)
# Make histograms
data, bins = np.histogram(p_outputs_open, bins="auto")
hist_p_outputs_open = Histogram(data.tolist(), bins.tolist())
# Make log10 shifted histograms. Add 1 to values to make consistent with UI
data, bins = np.histogram(np.log10(n_outputs + 1), bins="auto")
hist_n_outputs = Histogram(data.tolist(), bins.tolist())
data, bins = np.histogram(np.log10(n_outputs_open + 1), bins="auto")
hist_n_outputs_open = Histogram(data.tolist(), bins.tolist())
return EntityStats(
n_items=len(entities),
min=stats_min,
max=stats_max,
median=stats_median,
histograms=EntityHistograms(
p_outputs_open=hist_p_outputs_open, n_outputs=hist_n_outputs, n_outputs_open=hist_n_outputs_open
),
)
[docs]def make_logo_url(*, entity_type: str, entity_id: str, size: str, fmt: str) -> str:
"""Make a logo url.
:param entity_type: the entity entity_type: country or institution.
:param entity_id: the entity id.
:param size: the size of the logo: s or l.
:param fmt: the format of the logo.
:return: the logo url.
"""
return f"logos/{entity_type}/{size}/{entity_id}.{fmt}"
[docs]def fetch_institution_logo(ror_id: str, url: str, size: str, width: int, fmt: str, build_path: str) -> Tuple[str, str]:
"""Get the path to the logo for an institution.
If the logo does not exist in the build path yet, download from the Clearbit Logo API tool.
If the logo does not exist and failed to download, the path will default to "unknown.svg".
:param ror_id: the institution's ROR id
:param url: the URL of the company domain + suffix e.g. spotify.com
:param size: the image size of the small logo for tables etc.
:param width: the width of the image.
:param fmt: the image format.
:param build_path: the build path for files of this workflow
:return: The ROR id and relative path (from build path) to the logo
"""
logo_path = "unknown.svg"
size_folder = os.path.join(build_path, "images", "logos", "institution", size)
os.makedirs(size_folder, exist_ok=True)
file_path = os.path.join(size_folder, f"{ror_id}.{fmt}")
if not os.path.isfile(file_path):
logging.debug(
f"fetch_institution_logo: downloading logo company_url={url}, file_path={file_path}, size={width}, fmt={fmt}"
)
clearbit_download_logo(company_url=url, file_path=file_path, size=width, fmt=fmt)
if os.path.isfile(file_path):
logging.debug(f"fetch_institution_logo: {file_path} exists, skipping download")
logo_path = make_logo_url(entity_type="institution", entity_id=ror_id, size=size, fmt=fmt)
return ror_id, logo_path
[docs]def clean_url(url: str) -> str:
"""Remove path and query from URL.
:param url: the url.
:return: the cleaned url.
"""
p = urlparse(url)
return f"{p.scheme}://{p.netloc}/"
[docs]def fetch_institution_logos(build_path: str, entities: List[Tuple[str, str]]) -> List[Dict]:
"""Update the index with logos, downloading logos if they don't exist.
:param build_path: the path to the build folder.
:param entities: the entities to process consisting of their id and url.
:return: None.
"""
# Get the institution logo and the path to the logo image
logging.info("Downloading logos using Clearbit")
total = len(entities)
results = {
entity_id: {"id": entity_id, "logo_sm": "unknown.svg", "logo_md": "unknown.svg", "logo_lg": "unknown.svg"}
for entity_id, url in entities
}
for size, width, fmt in [("sm", 32, "jpg"), ("md", 128, "jpg"), ("lg", 532, "png")]:
logging.info(f"Downloading logos: size={size}, width={width}, fmt={fmt}")
# Create jobs
futures = []
with ThreadPoolExecutor() as executor:
for entity_id, url in entities:
if url:
url = clean_url(url)
futures.append(
executor.submit(fetch_institution_logo, entity_id, url, size, width, fmt, build_path)
)
# Wait for results
n_downloaded = 0
for completed in as_completed(futures):
entity_id, logo_path = completed.result()
results[entity_id][f"logo_{size}"] = logo_path
n_downloaded += 1
# Print progress
p_progress = n_downloaded / total * 100
if n_downloaded % 100 == 0:
logging.info(f"Downloading logos {n_downloaded}/{total}: {p_progress:.2f}%")
logging.info("Finished downloading logos")
return list(results.values())