# Copyright 2020-2025 Curtin University
# Copyright 2024-2025 UC Curation Center (California Digital Library)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import argparse
import glob
import json
import logging
import os
from collections import OrderedDict
from concurrent.futures import as_completed, ProcessPoolExecutor
from json import JSONEncoder
from pathlib import Path
from typing import Any, Iterator, List, Tuple

import json_lines
import jsonlines
from bigquery_schema_generator.generate_schema import flatten_schema_map, SchemaGenerator
def yield_jsonl(file_path: str):
    """Lazily iterate over the rows of a JSON Lines file.

    The file is read through ``json_lines.open``; according to this function's contract a
    gz-compressed file is extracted transparently. Each row is yielded as it is read.

    :param file_path: the path to the JSON lines file.
    :return: generator of rows, one per line of the file.
    """
    with json_lines.open(file_path) as reader:
        yield from reader
def merge_schema_maps(to_add: OrderedDict, old: OrderedDict) -> OrderedDict:
    """Fold one schema map produced by bigquery_schema_generator into an accumulated one.

    If ``old`` is empty, a shallow copy of ``to_add`` is returned. Otherwise ``old`` is updated
    in place and returned: top-level keys that only appear in ``to_add`` are added unchanged, and
    keys present in both are combined with ``SchemaGenerator.merge_schema_entry``.

    :param to_add: The incoming schema map to add to the existing "old" schema map.
    :param old: The schema map accumulated so far (may be empty).
    :return: The merged schema map.
    """
    generator = SchemaGenerator()
    if not old:
        # Nothing accumulated yet: start from a copy of the first schema map.
        return to_add.copy()

    for name, entry in to_add.items():
        if name not in old:
            # Brand-new top-level field.
            old[name] = entry
            continue
        # Field seen before: let the library reconcile the two entries.
        old[name] = generator.merge_schema_entry(old_schema_entry=old[name], new_schema_entry=entry)
    return old
def flatten_schema(schema_map: OrderedDict) -> List[dict]:
    """Convert a nested schema map into a BigQuery-style schema built from plain Python objects.

    ``flatten_schema_map`` produces the BigQuery schema (a list of field definitions). Round-tripping
    it through a JSON string replaces any nested ``OrderedDict`` instances with regular ``dict``s.

    :param schema_map: The generated schema map from SchemaGenerator.
    :return: A BigQuery style schema: a list of field dictionaries.
    """
    flattened = flatten_schema_map(
        schema_map,
        keep_nulls=False,
        sorted_schema=True,
        infer_mode=True,
        input_format="json",
    )
    # json.dumps with default settings is equivalent to JSONEncoder().encode.
    return json.loads(json.dumps(flattened))
def sort_schema(input_file: Path):
    """Sort a BigQuery JSON schema file in place.

    The file must hold a JSON list of field definitions. Fields are sorted by ``name``, recursively
    inside ``RECORD`` fields that have ``fields``. The keys of each field are reordered as
    ``name``, ``type``, ``mode``, ``description``, ``fields``. Any other keys (e.g. ``policyTags``)
    are kept after those, in their original order, rather than being dropped. The result is written
    back to the same file with a 2-space indent.

    :param input_file: path of the schema file; it is overwritten.
    :raises KeyError: if a field definition has no ``name`` key.
    """
    key_order = ["name", "type", "mode", "description", "fields"]

    def sort_schema_func(schema):
        sorted_schema = []
        for field in sorted(schema, key=lambda x: x["name"]):
            entry = {k: field[k] for k in key_order if k in field}
            # Preserve keys outside key_order instead of silently discarding them.
            entry.update({k: v for k, v in field.items() if k not in entry})
            if entry.get("type") == "RECORD" and "fields" in entry:
                entry["fields"] = sort_schema_func(entry["fields"])
            sorted_schema.append(entry)
        return sorted_schema

    # JSON is UTF-8; don't depend on the platform's default encoding.
    with open(input_file, mode="r", encoding="utf-8") as f:
        data = json.load(f)

    sorted_json_schema = sort_schema_func(data)

    with open(input_file, mode="w", encoding="utf-8") as f:
        json.dump(sorted_json_schema, f, indent=2)
def list_jsonl_files(folder_path, pattern: str = "*.jsonl.gz"):
    """Recursively list files under ``folder_path`` whose name matches ``pattern``.

    :param folder_path: folder to search (``str`` or ``Path``). Glob metacharacters in it
        (``*``, ``?``, ``[``) are matched literally.
    :param pattern: glob pattern applied to names at any depth, including directly inside
        ``folder_path``. Defaults to gzipped JSON Lines files.
    :return: matching paths as strings, sorted so that processing order is reproducible.
    """
    # Escape the folder part so e.g. "data[1]" is not treated as a character class.
    search = os.path.join(glob.escape(str(folder_path)), "**", pattern)
    return sorted(glob.glob(search, recursive=True))
def remove_empty_dicts(arr):
    """Return a new list with the empty dictionaries of ``arr`` removed.

    Every non-dict item (including ``None`` and empty lists) is kept, and order is preserved.
    """
    return [element for element in arr if not isinstance(element, dict) or element]
def remove_nulls_from_list_field(obj: dict, field: str):
    """Drop ``None`` entries from the list stored under ``field``, modifying ``obj`` in place.

    Does nothing when ``field`` is absent. A falsy value (e.g. ``None``) is replaced by an empty list.
    Other falsy elements such as ``0`` or ``""`` are kept.

    :param obj: the record to modify.
    :param field: key of the list field to clean.
    """
    if field not in obj:
        return
    values = obj[field] or []
    obj[field] = list(filter(lambda element: element is not None, values))
def clean_string_or_bool(value):
    """Return ``None`` for ``None``, booleans and blank strings; otherwise return ``value`` unchanged.

    Non-blank strings are returned as-is (not stripped).
    """
    if isinstance(value, bool) or value is None:
        return None
    if isinstance(value, str) and not value.strip():
        return None
    return value
def normalize_to_string_or_none(value):
    """Convert ``value`` to ``str``, mapping ``None`` and blank strings to ``None``.

    Non-blank strings are returned unchanged (no trimming). Any other value goes through ``str()``.
    """
    is_blank_string = isinstance(value, str) and value.strip() == ""
    return None if value is None or is_blank_string else str(value)
def filter_non_empty_dicts(arr):
    """Filters out empty dictionaries from a list, returning a new list.

    Non-dict items (including ``None`` and empty lists) are always kept, in their original order.
    """
    kept = []
    for entry in arr:
        if isinstance(entry, dict) and not entry:
            continue
        kept.append(entry)
    return kept
def normalize_affiliations_and_identifiers(obj, field):
    """Normalizes the ``affiliation`` and ``nameIdentifiers`` lists of each entry in ``obj[field]``.

    ``obj[field]`` is expected to be a list of dicts (presumably creators/contributors; confirm
    against callers). For every dict entry, both keys are rewritten as lists with empty dicts
    removed (via ``filter_non_empty_dicts``):

    * a missing key or a ``None`` value becomes ``[]``;
    * a single non-list value (one dict, or a plain string) is wrapped in a one-element list,
      so a string is no longer split into characters;
    * a list (or tuple) is kept, minus its empty dicts.

    Non-dict entries are left untouched. ``obj[field]`` is always assigned, so a missing or
    ``None`` field ends up as ``[]``.

    :param obj: the record to modify in place.
    :param field: key of the list to normalize.
    """
    items = obj.get(field) or []
    for item in items:
        if not isinstance(item, dict):
            # Nulls/scalars inside the list have no sub-keys to normalize.
            continue
        for key in ("affiliation", "nameIdentifiers"):
            value = item.get(key)
            if value is None:
                value = []
            elif not isinstance(value, (list, tuple)):
                # Wrap single objects; iterating a string would split it into characters.
                value = [value]
            item[key] = filter_non_empty_dicts(value)
    obj[field] = items
def normalize_identifier_fields(obj: dict, field: str, subfield: str):
    """Normalizes ``subfield`` of every entry in ``obj[field]`` to a string, in place.

    Values are converted with ``normalize_to_string_or_none``, so ``None`` and blank strings
    become ``None``. Entries without ``subfield`` and non-dict entries are left unchanged. A missing
    or ``None`` ``obj[field]`` is a no-op.

    :param obj: the record to modify.
    :param field: key of the list of entries.
    :param subfield: key inside each entry whose value is normalized.
    """
    for item in obj.get(field) or []:
        # Only dicts carry subfields; for a str entry `subfield in item` would be a substring test.
        if isinstance(item, dict) and subfield in item:
            item[subfield] = normalize_to_string_or_none(item[subfield])
def get_chunks(*, input_list: List[Any], chunk_size: int = 8) -> Iterator[List[Any]]:
    """Generator that splits a list into consecutive chunks of a fixed size.

    The last chunk holds the remainder and may be shorter. An empty list yields nothing.

    :param input_list: Input list.
    :param chunk_size: Size of chunks; must be at least 1.
    :return: The chunks of the input list, in order.
    :raises ValueError: if ``chunk_size`` is less than 1 (raised when iteration starts).
    """
    if chunk_size < 1:
        # A negative step would silently yield nothing; zero would fail inside range().
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
    for start in range(0, len(input_list), chunk_size):
        yield input_list[start : start + chunk_size]
def generate_schema_for_dataset(input_folder: Path, output_folder: Path, max_workers: int):
    """Transform every ``*.jsonl.gz`` file under ``input_folder`` and write one merged BigQuery schema.

    Files found by ``list_jsonl_files`` are processed in chunks of 500. Each chunk gets its own
    ``ProcessPoolExecutor``, whose workers run ``transform(input_path, output_path)`` (``transform``
    is defined outside this view). The output path mirrors the file's location under
    ``output_folder`` with the final ``.gz`` dropped (``a/b.jsonl.gz`` -> ``a/b.jsonl``).

    Each result is unpacked as ``(input_path, empty_file, schema_map, schema_error)``. For non-empty
    files:

    * schema errors are appended to ``input_folder/errors.txt`` and printed;
    * the file's schema map is merged into the running one (merge failures are printed and skipped).

    Progress is printed after every file. Finally the merged schema is flattened, written to
    ``input_folder/schema.json``, and sorted in place with ``sort_schema``.

    :param input_folder: folder searched recursively for input files; also receives errors.txt and schema.json.
    :param output_folder: root folder for the transformed files.
    :param max_workers: maximum number of worker processes per chunk.
    """
    merged_schema_map = OrderedDict()
    files = list_jsonl_files(str(input_folder))
    total_files = len(files)
    errors_path = Path(input_folder) / "errors.txt"
    processed = 0

    for chunk in get_chunks(input_list=files, chunk_size=500):
        # NOTE(review): a fresh pool per chunk presumably bounds worker lifetime/memory; confirm.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for input_path in chunk:
                relative_path = Path(input_path).relative_to(input_folder)
                # Drop only the trailing ".gz"; str.replace would also rewrite ".jsonl.gz" inside directory names.
                output_path = str(output_folder / relative_path.with_suffix(""))
                futures.append(executor.submit(transform, input_path, output_path))

            for future in as_completed(futures):
                # An exception raised inside a worker is re-raised here and aborts the run.
                input_path, empty_file, schema_map, schema_error = future.result()
                if not empty_file:
                    if schema_error:
                        msg = f"File {input_path}: {schema_error}"
                        with open(errors_path, mode="a") as f:
                            f.write(f"{msg}\n")
                        print(msg)

                    # Merge the schemas from each process. Each data file could have more fields than others.
                    try:
                        merged_schema_map = merge_schema_maps(to_add=schema_map, old=merged_schema_map)
                    except Exception as e:
                        # Best effort: a schema map that cannot be merged is reported and skipped.
                        print(f"merge_schema_maps error: {e}")

                processed += 1
                percent = processed / total_files * 100
                print(f"Progress: {processed} / {total_files}, {percent:.2f}%")

    # Flatten schema from nested OrderedDicts to a regular Bigquery schema.
    merged_schema = flatten_schema(schema_map=merged_schema_map)

    # Save schema to file, then sort it in place.
    generated_schema_path = os.path.join(input_folder, "schema.json")
    with open(generated_schema_path, mode="w") as f_out:
        json.dump(merged_schema, f_out, indent=2)
    sort_schema(Path(generated_schema_path))
def check_directory(path):
    """argparse ``type=`` callback that accepts only existing directories.

    :param path: the raw command-line value.
    :return: ``path`` converted to a ``Path``.
    :raises argparse.ArgumentTypeError: if ``path`` is not an existing directory.
    """
    directory = Path(path)
    if directory.is_dir():
        return directory
    raise argparse.ArgumentTypeError(f"The directory {path} does not exist.")
if __name__ == "__main__":
    # Command line tool to transform DataCite files and generate a first pass at a BigQuery schema.
    parser = argparse.ArgumentParser(description="Transform DataCite files and generate a BigQuery schema.")

    # Required arguments; both folders must already exist (validated by check_directory).
    parser.add_argument("input_folder", type=check_directory, help="The input folder path")
    parser.add_argument("output_folder", type=check_directory, help="The output folder path")

    # Optional argument with default value from os.cpu_count()
    parser.add_argument(
        "--max_workers",
        type=int,
        default=os.cpu_count(),
        help="The maximum number of workers (default: number of CPUs)",
    )

    # Parse the arguments and run the pipeline.
    args = parser.parse_args()
    generate_schema_for_dataset(args.input_folder, args.output_folder, args.max_workers)