# Copyright 2021-2022 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: James Diprose
from __future__ import annotations
import json
import os
import os.path
from typing import Dict
import pendulum
import requests
from airflow.exceptions import AirflowException
class Zenodo:
    """Thin client for the Zenodo deposit REST API.

    Each method sends a single HTTP request with ``requests`` and returns the
    resulting ``requests.Response`` untouched; callers are responsible for
    checking ``status_code`` and decoding the body. The access token is sent
    as the ``access_token`` query parameter and the configured timeout is
    applied to every request.
    """

    def __init__(self, host: str = "https://zenodo.org", access_token: str | None = None, timeout: int = 60):
        """Create a client.

        :param host: base URL of the Zenodo instance, with or without a trailing slash.
        :param access_token: token sent as the ``access_token`` query parameter.
        :param timeout: timeout in seconds passed to every ``requests`` call.
        """
        self.host = host
        self.access_token = access_token
        self.timeout = timeout

    def make_url(self, path: str) -> str:
        """Join the host and an API path with exactly one slash between them.

        A single trailing slash is removed from the host and a leading slash is
        added to the path when missing. Empty strings are accepted for both.

        :param path: API path, e.g. ``/api/deposit/depositions``.
        :return: the full URL.
        """
        # startswith/endswith are used instead of indexing so that an empty
        # host or path does not raise IndexError.
        host = self.host[:-1] if self.host.endswith("/") else self.host
        if not path.startswith("/"):
            path = f"/{path}"
        return f"{host}{path}"

    def get_versions(self, conceptrecid: int, all_versions: int = 0, size: int = 10, sort: str = "mostrecent"):
        """Query depositions belonging to a concept record.

        :param conceptrecid: concept record id, sent as the query ``conceptrecid:<id>``.
        :param all_versions: value of the ``all_versions`` query parameter.
        :param size: value of the ``size`` query parameter.
        :param sort: value of the ``sort`` query parameter.
        :return: the response of the GET request.
        """
        query = f"conceptrecid:{conceptrecid}"
        return requests.get(
            self.make_url("/api/deposit/depositions"),
            params={
                "q": query,
                "all_versions": all_versions,
                "access_token": self.access_token,
                "sort": sort,
                "size": size,
            },
            timeout=self.timeout,
        )

    def create_new_version(self, id: int):
        """POST to the ``newversion`` action of a deposition.

        :param id: deposition id.
        :return: the response of the POST request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}/actions/newversion")
        return requests.post(url, params={"access_token": self.access_token}, timeout=self.timeout)

    def get_deposition(self, id: int):
        """GET a single deposition.

        :param id: deposition id.
        :return: the response of the GET request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}")
        return requests.get(url, params={"access_token": self.access_token}, timeout=self.timeout)

    def delete_file(self, id: int, file_id: str):
        """DELETE one file of a deposition.

        :param id: deposition id.
        :param file_id: id of the file to delete.
        :return: the response of the DELETE request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}/files/{file_id}")
        return requests.delete(url, params={"access_token": self.access_token}, timeout=self.timeout)

    def update(self, id: int, data: Dict):
        """PUT a JSON document to a deposition.

        :param id: deposition id.
        :param data: payload; serialised with ``json.dumps`` and sent as ``application/json``.
        :return: the response of the PUT request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}")
        headers = {"Content-Type": "application/json"}
        return requests.put(
            url,
            data=json.dumps(data),
            headers=headers,
            params={"access_token": self.access_token},
            timeout=self.timeout,
        )

    def upload_file(self, id: int, file_path: str):
        """POST a local file to a deposition as a multipart upload.

        The file is sent under the form field ``file`` and its base name is
        sent as the ``name`` form field.

        :param id: deposition id.
        :param file_path: path of the local file to upload.
        :return: the response of the POST request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}/files")
        data = {"name": os.path.basename(file_path)}
        # The request runs inside the with-block so the handle is still open
        # while the body is sent and is always closed afterwards.
        with open(file_path, "rb") as file:
            return requests.post(
                url,
                data=data,
                files={"file": file},
                params={"access_token": self.access_token},
                timeout=self.timeout,
            )

    def publish(self, id: int):
        """POST to the ``publish`` action of a deposition.

        :param id: deposition id.
        :return: the response of the POST request.
        """
        url = self.make_url(f"/api/deposit/depositions/{id}/actions/publish")
        return requests.post(url, params={"access_token": self.access_token}, timeout=self.timeout)
def make_draft_version(zenodo: Zenodo, conceptrecid: int):
    """Make sure a draft deposition exists for a concept record and date-stamp it.

    The most recent deposition returned by ``zenodo.get_versions`` is used. If
    its state is ``"done"`` (published), a new version is created and its
    draft is used instead. The draft's ``publication_date`` and ``version``
    metadata are then both set to today's date (``YYYY-MM-DD``).

    :param zenodo: the Zenodo client.
    :param conceptrecid: the concept record id whose versions are queried.
    :return: the id of the draft deposition that was updated.
    :raises AirflowException: if no version exists or any Zenodo call returns
        an unexpected status code.
    """
    # Find the most recent deposition
    res = zenodo.get_versions(conceptrecid)
    if res.status_code != 200:
        raise AirflowException(f"zenodo.get_versions status_code {res.status_code}")

    versions = res.json()
    if len(versions) == 0:
        raise AirflowException("make_draft_version: at least 1 version must exist")

    latest = versions[0]
    draft_id = latest["id"]
    state = latest["state"]
    if state == "done":
        # If published then create a new draft
        res = zenodo.create_new_version(draft_id)
        if res.status_code != 201:
            raise AirflowException(f"zenodo.create_new_version status_code {res.status_code}")
        # The draft id is the last path segment of the latest_draft link;
        # rstrip guards against a trailing slash producing an empty segment.
        latest_draft_url = res.json()["links"]["latest_draft"]
        draft_id = int(latest_draft_url.rstrip("/").split("/")[-1])

    # Fetch draft deposition
    res = zenodo.get_deposition(draft_id)
    if res.status_code != 200:
        raise AirflowException(f"zenodo.get_deposition status_code {res.status_code}")

    # Update metadata: the existing metadata is sent back with only the two
    # date fields changed.
    draft = res.json()
    publication_date = pendulum.now().format("YYYY-MM-DD")
    metadata = draft["metadata"]
    metadata["publication_date"] = publication_date
    metadata["version"] = publication_date
    data = {"metadata": metadata}
    res = zenodo.update(draft_id, data)
    if res.status_code != 200:
        raise AirflowException(f"zenodo.update status_code {res.status_code}")

    # Returned so callers can pass it on without querying Zenodo again.
    return draft_id
def publish_new_version(zenodo: Zenodo, draft_id: int, file_path: str):
    """Replace every file on a draft deposition with ``file_path`` and publish it.

    Steps: fetch the deposition, delete each file it lists, upload the new
    file, then publish.

    :param zenodo: the Zenodo client.
    :param draft_id: id of the draft deposition.
    :param file_path: path of the local file to upload.
    :return: None.
    :raises FileNotFoundError: if ``file_path`` is not an existing file; raised
        before any request is made.
    :raises AirflowException: if any Zenodo call returns an unexpected status code.
    """
    # Validate the upload up front: the existing files are deleted before the
    # upload, so a bad path discovered later would leave the draft empty.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"publish_new_version: file does not exist: {file_path}")

    # Get full deposition which contains files
    res = zenodo.get_deposition(draft_id)
    if res.status_code != 200:
        raise AirflowException(f"zenodo.get_deposition status_code {res.status_code}")
    draft = res.json()

    # Delete existing files
    for file in draft["files"]:
        file_id = file["id"]
        res = zenodo.delete_file(draft_id, file_id)
        if res.status_code != 204:
            raise AirflowException(f"zenodo.delete_file status_code {res.status_code}")

    # Upload new file
    res = zenodo.upload_file(draft_id, file_path)
    if res.status_code != 201:
        raise AirflowException(f"zenodo.upload_file status_code {res.status_code}")

    # Publish
    res = zenodo.publish(draft_id)
    if res.status_code != 202:
        raise AirflowException(f"zenodo.publish status_code {res.status_code}")