Source code for academic_observatory_workflows.config
# Copyright 2020-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: James Diprose
import json
import os
import kubernetes
from observatory_platform.config import module_file_path
[docs]class Tag:
"""DAG tag."""
[docs] academic_observatory = "academic-observatory"
[docs] data_quality = "data-quality"
[docs]def project_path(*subpaths: str) -> str:
"""Make a path to a file or folder within the Academic Observatory Workflows project.
:param subpaths: any sub paths.
:return: a path to a file or folder.
"""
return os.path.join(construct_module_path("academic_observatory_workflows"), *subpaths)
[docs]def construct_module_path(*parts: str) -> str:
"""Constructs the full module path given parts of a path."""
module_path = ".".join(list(parts))
file_path = module_file_path(module_path)
if not os.path.exists(file_path):
raise FileNotFoundError(f"construct_module_path: directory {file_path} does not exist!")
return file_path
[docs]class TestConfig:
"""Common parameters for end to end and unit testing"""
[docs] gcp_project_id: str = os.getenv("TEST_GCP_PROJECT_ID")
[docs] gcp_data_location: str = os.getenv("TEST_GCP_DATA_LOCATION")
[docs] http_host_url: str = "host.minikube.internal"
[docs] ftp_host_url: str = "host.minikube.internal"
[docs] gke_image: str = "academic-observatory:test"
[docs] gke_namespace: str = "default"
[docs] gke_cluster_connection: dict = dict(
conn_id="gke_cluster",
conn_type="kubernetes",
extra=json.dumps(
{
"extra__kubernetes__namespace": "default",
# KUBE_CONFIG_DEFAULT_LOCATION defined in kubernets module as os.environ.get('KUBECONFIG', '~/.kube/config')
"extra__kubernetes__kube_config_path": kubernetes.config.kube_config.KUBE_CONFIG_DEFAULT_LOCATION,
"extra__kubernetes__context": "minikube",
}
),
)