# Source code for academic_observatory_workflows.config

# Copyright 2020-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: James Diprose

import json
import os
from typing import Optional

import kubernetes

from observatory_platform.config import module_file_path


class Tag:
    """DAG tag.

    Namespace of string constants; each attribute holds the literal tag value.
    """

    academic_observatory = "academic-observatory"
    data_quality = "data-quality"
def project_path(*subpaths: str) -> str:
    """Make a path to a file or folder within the Academic Observatory Workflows project.

    The base directory is resolved with ``construct_module_path`` for the
    ``academic_observatory_workflows`` module and the sub paths are joined onto it.

    :param subpaths: any sub paths.
    :return: a path to a file or folder.
    :raises FileNotFoundError: propagated from ``construct_module_path`` when the
        resolved module path does not exist.
    """

    return os.path.join(construct_module_path("academic_observatory_workflows"), *subpaths)
def construct_module_path(*parts: str) -> str:
    """Constructs the full module path given parts of a path.

    The parts are joined with ``.`` to form a dotted module path, which is then
    passed to ``module_file_path`` to obtain a file system path.

    :param parts: the components of the dotted module path, in order.
    :return: the path returned by ``module_file_path`` for the dotted module path.
    :raises FileNotFoundError: if the resolved path does not exist.
    """

    # str.join accepts any iterable of strings, so the tuple needs no list() copy.
    module_path = ".".join(parts)
    file_path = module_file_path(module_path)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"construct_module_path: directory {file_path} does not exist!")

    return file_path
class TestConfig:
    """Common parameters for end to end and unit testing"""

    # Both values are read from the environment when the class body is executed;
    # os.getenv returns None when the variable is not set, hence Optional[str].
    gcp_project_id: Optional[str] = os.getenv("TEST_GCP_PROJECT_ID")
    gcp_data_location: Optional[str] = os.getenv("TEST_GCP_DATA_LOCATION")

    http_host_url: str = "host.minikube.internal"
    http_port: int = 5080
    ftp_host_url: str = "host.minikube.internal"
    ftp_port: int = 5021

    gke_image: str = "academic-observatory:test"
    gke_namespace: str = "default"

    # Keyword arguments describing a Kubernetes connection; "extra" is a JSON-encoded string.
    gke_cluster_connection: dict = dict(
        conn_id="gke_cluster",
        conn_type="kubernetes",
        extra=json.dumps(
            {
                "extra__kubernetes__namespace": "default",
                # KUBE_CONFIG_DEFAULT_LOCATION defined in kubernetes module as
                # os.environ.get('KUBECONFIG', '~/.kube/config')
                "extra__kubernetes__kube_config_path": kubernetes.config.kube_config.KUBE_CONFIG_DEFAULT_LOCATION,
                "extra__kubernetes__context": "minikube",
            }
        ),
    )