Here is the documentation: https://geoplateforme.pages.gpf-tech.ign.fr/documentation

......@@ -136,3 +136,4 @@ _logs/
docs/misc/licenses.md
.vscode/
tests/**/output.json
......@@ -4,6 +4,10 @@ stages:
- build
- deploy
workflow:
rules:
- if: '$CI_PIPELINE_SOURCE != "merge_request_event"'
# Templates
include:
- template: Security/SAST.gitlab-ci.yml
......@@ -19,7 +23,8 @@ include:
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
PROJECT_FOLDER: "src"
DOCKER_IMG_NAME: "gpf-md5-checker"
WITH_DOCKER_JOBS: "false"
# DOCKER_IMG_NAME: "gpf-md5-checker"
# Pip's cache doesn't store the python packages
# https://pip.pypa.io/en/stable/reference/pip_install/#caching
......@@ -42,9 +47,9 @@ git-hooks:
cache:
paths:
- ${PRE_COMMIT_HOME}
only:
refs:
- merge_requests
# only:
# refs:
# - merge_requests
before_script:
- apt install git
- python3 -m pip install -U pip
......@@ -75,15 +80,19 @@ sast:
test:
stage: test
image: python:3.9-slim-buster
only:
changes:
- "**/*.py"
- ".gitlab-ci.yml"
refs:
- main
- merge_requests
- tags
# filter disabled because sonar job requires it and always runs...
# only:
# changes:
# - "**/*.py"
# - ".gitlab-ci.yml"
# - sonar-project.properties
# refs:
# - main
# - merge_requests
# - tags
before_script:
- python3 -m pip install -U -r requirements.txt
- python3 -m pip install -U -r requirements/base.txt
- python3 -m pip install -U -r requirements/testing.txt
script:
- pytest
......@@ -104,9 +113,10 @@ test:documentation:
- "docs/"
- requirements/documentation.txt
- ".gitlab-ci.yml"
refs:
- merge_requests
# refs:
# - merge_requests
before_script:
- python -m pip install -U -r requirements/base.txt
- python -m pip install -U -r requirements/documentation.txt
script:
- sphinx-build -b html -d docs/_build/cache -j auto -q docs build/docs
......@@ -143,6 +153,7 @@ build:documentation:
- main
- tags
before_script:
- python -m pip install -U -r requirements/base.txt
- python -m pip install -U -r requirements/documentation.txt
script:
- sphinx-build -b html -d docs/_build/cache -j auto -q docs build/docs
......@@ -154,6 +165,11 @@ build:documentation:
when: always
# -- DEPLOYMENT JOBS -------------------------------------------------------------------
gitlab:container:
extends: .docker-build
variables:
DOCKER_IMG_NAME: "$CI_REGISTRY_IMAGE/gpf-md5-checker"
gitlab:pypi:
stage: deploy
image: python:3.9-slim-buster
......
......@@ -16,6 +16,14 @@ Unreleased
-->
## 0.4.0 - 2022-12-06
- Make package much more generic
- Load input parameters.json
- Generate an output.json file at the end of execution
- Test coverage to 83%
- CI: add Sonarqube configuration `sonar-project.properties`
## 0.3.1 - 2022-11-23
- CI: Sonarqube template requires tests to also run on tag refs
......
......@@ -4,6 +4,10 @@
[![coverage report](https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/badges/main/coverage.svg)](https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/-/commits/main)
[![Quality Gate Status](https://sonar.gpf-tech.ign.fr/api/project_badges/measure?project=geoplateforme_scripts-verification_check-md5&metric=alert_status&token=squ_a1848e7921a855498181cc95806b2418da16a756)](https://sonar.gpf-tech.ign.fr/dashboard?id=geoplateforme_scripts-verification_check-md5)
[![Latest Release](https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/-/badges/release.svg)](https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/-/releases)
----
## Contribuer
......
......@@ -43,6 +43,7 @@ extensions = [
"sphinx.ext.extlinks",
"sphinx.ext.githubpages",
"sphinx.ext.intersphinx",
"sphinx.ext.napoleon",
# 3rd party
"myst_parser",
"sphinx_argparse_cli",
......
......@@ -13,3 +13,21 @@ Une fois l'outil installé, il est appelable en ligne de commande : *{{ cli_name
:prog: gpf-md5-checker
:title: Commandes et options
```
----
## Variables d'environnement
### Génériques
| Nom de la variable | Argument CLI correspondant | Valeur par défaut |
| :----------------- | :------------------------: | :---------------: |
| `GPF_INPUT_CONFIGURATION_FILENAME` | `--input-configuration-filename` | `parameters.json` |
| `GPF_UPLOAD_DIR` | `--upload-dir-name` | `upload` |
| `GPF_WORK_DIR` | `--work-dir-path` | |
### Spécifiques
| Nom de la variable | Argument CLI correspondant | Valeur par défaut |
| :----------------- | :------------------------: | :---------------: |
| `GPF_CHUNK_SIZE` | `--chunk-size` | `8192` |
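A minimal, illustrative Python sketch (not taken from the package docs) of how these variables are resolved, mirroring the `os.getenv` fallbacks used by the CLI shown further down in the diff:

```python
# Illustration only: resolve the settings above the way the CLI does,
# i.e. environment variable first, then the documented default.
from os import getenv

input_configuration_filename = getenv("GPF_INPUT_CONFIGURATION_FILENAME", "parameters.json")
upload_dir_name = getenv("GPF_UPLOAD_DIR", "upload")
work_dir_path = getenv("GPF_WORK_DIR")  # no default: must be set or passed on the CLI
chunk_size = int(getenv("GPF_CHUNK_SIZE", 8192))

print(input_configuration_filename, upload_dir_name, work_dir_path, chunk_size)
```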
typing-extensions>=4,<5 ; python_version < '3.11'
......@@ -44,16 +44,16 @@ setup(
version=__about__.__version__,
classifiers=[
"Development Status :: 4 - Beta",
"Environment :: Console",
"Intended Audience :: Information Technology",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Operating System :: Microsoft :: Windows :: Windows 10",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Operating System :: OS Independent",
"Topic :: Scientific/Engineering :: GIS",
"Topic :: System :: Installation/Setup",
],
# packaging
py_modules=["src"],
......
# sonar.host.url=https://sonar.gpf-tech.ign.fr
sonar.projectKey=geoplateforme_scripts-verification_check-md5
# Because Community Edition doesn't support multiple branches,
# you should only analyze your main branch.
# You can restrict analysis to your main branch by adding the branch name to the only parameter.
# only=main
# Python versions
sonar.python.version=3.9, 3.10, 3.11
# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
sonar.sources=src
# Encoding of the source code. Default is default system encoding
sonar.sourceEncoding=UTF-8
# Python configuration
sonar.language=python3
sonar.python.file.suffixes=py
sonar.python.coverage.reportPaths=coverage.xml
sonar.python.xunit.reportPath=junit/test-results.xml
sonar.coverage.exclusions=**__init__**,tests/**,*.py,docs/**
sonar.exclusions=*.xml,doc/**
#! python3 # noqa: E265
"""
Metadata about the package to easily retrieve information about it.
See: https://packaging.python.org/guides/single-sourcing-package-version/
"""Metadata bout the package to easily retrieve informations about it.
See: https://packaging.python.org/guides/single-sourcing-package-version/
"""
from datetime import date
......@@ -35,12 +35,12 @@ __uri_homepage__ = (
"https://geoplateforme.pages.gpf-tech.ign.fr/scripts-verification/check-md5/"
)
__uri_repository__ = (
"https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5"
"https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/"
)
__uri_tracker__ = "https://gitlab.gpf-tech.ign.fr/geoplateforme/scripts-verification/check-md5/issues/"
__uri_tracker__ = f"{__uri_repository__}issues/"
__uri__ = __uri_repository__
__version__ = "0.3.1"
__version__ = "0.4.0"
__version_info__ = tuple(
[
int(num) if num.isdigit() else num
......@@ -52,10 +52,9 @@ __version_info__ = tuple(
__cli_usage__ = (
"Le programme va vérifier les hash md5 contenu dans le dossier `GPF_WORK_DIR/upload` "
"Le fichier <fichier.md5> peut contenir plusieurs hash.\n"
"Chaque ligne doit être de la forme :"
"<hash_md5> <filename>"
"Entre le hash md5 et le fichier se trouve deux espaces."
"Pour maximiser la compatibilité, l'algorithme va rechercher si le fichier contient"
"32 caractères pour le hash md5 puis du texte."
"Chaque ligne doit être de la forme : <hash_md5> <filename>\n\n"
"Entre le hash md5 et le fichier se trouve deux espaces. "
"Pour maximiser la compatibilité, l'algorithme va rechercher si le fichier contient "
"32 caractères pour le hash md5 puis du texte.\n"
"Le programme s'occupe de convertir les fins de fichier Windows en unix."
)
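# --- Editorial illustration (not part of the repository) ---------------------
# A minimal sketch of the .md5 line format described in __cli_usage__ above:
# 32 hexadecimal characters for the hash, then the file name, with Windows
# line endings normalized to Unix. The helper name parse_md5_line is
# hypothetical and does not exist in the package.
import re

def parse_md5_line(line: str) -> tuple:
    """Return (md5_hash, filename) parsed from one line of a *.md5 file."""
    line = line.replace("\r\n", "\n").strip()
    match = re.match(r"^([0-9a-fA-F]{32})\s+(.+)$", line)
    if match is None:
        raise ValueError(f"Not a valid md5 line: {line!r}")
    return match.group(1).lower(), match.group(2)

# parse_md5_line("772ac1a55fab1122f3b369ee9cd31549  md5.txt")
# -> ("772ac1a55fab1122f3b369ee9cd31549", "md5.txt")
# --- end of editorial illustration --------------------------------------------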
#! python3 # noqa: E265
"""Package level."""
# submodules
from .__about__ import __version__ # noqa: F401
#! python3 # noqa: E265
"""Main CLI entrypoint.
"""
"""Main CLI entrypoint."""
# standard lib
import argparse
import logging
import sys
from datetime import datetime
from os import getenv
from pathlib import Path
from typing import List
# package
from src.__about__ import (
__author__,
__cli_usage__,
......@@ -19,15 +21,19 @@ from src.__about__ import (
__uri_homepage__,
__version__,
)
# package
from src.constants import arg_type_path_folder
from src.constants import Status, arg_type_path_folder
from src.md5sum import run
from src.orchestrator.check_livraison import check_livraison_structure
from src.orchestrator.models import GpfOrchestratorParameters, OutputDataStructure
from src.utils.dict_counter import count_dict_values
# ############################################################################
# ########## MAIN ################
# ################################
def main(command_line_arguments: List[str] = None):
"""Main CLI entrypoint"""
def main(argv: List[str] = None):
"""Main CLI entrypoint."""
# create the top-level parser
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
......@@ -55,7 +61,9 @@ def main(command_line_arguments: List[str] = None):
version=__version__,
)
# generic
parser.add_argument(
"-w",
"--workdir",
"--work-dir-path",
dest="work_dir_path",
......@@ -65,6 +73,7 @@ def main(command_line_arguments: List[str] = None):
)
parser.add_argument(
"-u",
"--uploaddir",
"--upload-dir-name",
dest="upload_dir_name",
......@@ -73,21 +82,31 @@ def main(command_line_arguments: List[str] = None):
default=getenv("GPF_UPLOAD_DIR", "upload"),
)
parser.add_argument(
"-c",
"--input-configuration-filename",
dest="input_configuration_file_name",
type=str,
help="Name (not the path) of the input configuration file.",
default=getenv("GPF_INPUT_CONFIGURATION_FILENAME", "parameters.json"),
)
# specific
parser.add_argument(
"--chunk-size",
dest="chunk_size",
type=int,
help="Input working directory. Must exist.",
help="Size of the chunk-data in octets to load in memory.",
default=getenv("GPF_CHUNK_SIZE", 8192),
)
# -- PARSE PASSED ARGUMENTS --
# get passed args and force print help if none
args = parser.parse_args(None if sys.argv[1:] else ["-h"])
# args = parser.parse_args(None if sys.argv[1:] else ["-h"])
# just get passed args
# args = parser.parse_args()
args = parser.parse_args(argv)
# set log level depending on verbosity argument
if 0 < args.verbosity < 4:
......@@ -111,14 +130,113 @@ def main(command_line_arguments: List[str] = None):
logger = logging.getLogger(__title_clean__)
logger.debug(f"Log level set: {logging.getLevelName(args.verbosity)}")
# Run
run(
work_dir=args.work_dir_path,
# -- RUN LOGIC --
# check livraison
if not check_livraison_structure(
work_dir_path=args.work_dir_path,
upload_dir_name=args.upload_dir_name,
chunk_size=args.chunk_size,
in_json_file_name=args.input_configuration_file_name,
):
error_message = (
"Upload (aka livraison) doesn't complies with the expected structure."
f"{args.work_dir_path}. Check the job logs for more details."
)
logger.error(error_message)
failed_output = OutputDataStructure(
executionId=getenv(
"CI_JOB_ID",
f"{__title_clean__}-{__version__}-{datetime.now():%Y-%m-%d_%H%M%s}",
),
status=Status.ERROR,
failures=["Bad upload (livraison) structure"],
trace=error_message,
)
try:
out_file = Path(args.work_dir_path, "output.json")
out_file.parent.mkdir(parents=True, exist_ok=True)
with out_file.open("w", encoding="UTF8") as wf:
wf.write(failed_output.to_json())
except Exception as critical_error:
# the work dir is probably not writable: log the failure and exit with the error message
logger.critical(f"Unable to write the output file. Trace: {critical_error}")
sys.exit(error_message)
# open configuration file
try:
parameters = GpfOrchestratorParameters.from_json(
Path(args.work_dir_path, args.input_configuration_file_name)
)
except Exception as error:
error_message = (
"Reading the input configuration file "
f"({args.work_dir_path}/{args.input_configuration_file_name} failed. "
f"Trace: {error}"
)
logger.error(error_message)
failed_output = OutputDataStructure(
executionId=getenv(
"CI_JOB_ID",
f"{__title_clean__}-{__version__}-{datetime.now():%Y-%m-%d_%H%M%s}",
),
status=Status.ERROR,
failures=["Bad input configuration file."],
trace=error_message,
)
out_file = Path(args.work_dir_path, "output.json")
with out_file.open("w", encoding="UTF8") as wf:
wf.write(failed_output.to_json())
sys.exit(error_message)
# create output object
result_output = OutputDataStructure(executionId=parameters.executionId)
# Run
try:
run_result = run(
work_dir=args.work_dir_path,
upload_dir_name=args.upload_dir_name,
chunk_size=args.chunk_size,
)
if run_result[0] != 0:
result_output.status = Status(run_result[0] % 2)
result_output.failures = run_result[1]
except Exception as error:
logger.error(
f"Running {__title__} (version {__version__}) failed. Trace: {error}"
)
result_output.failures = run_result[1]
result_output.status = Status.ERROR
result_output.trace = error
# write result into output file
out_file = Path(args.work_dir_path, "output.json")
with out_file.open("w", encoding="UTF8") as wf:
wf.write(result_output.to_json())
# in debug mode, print output to stdout
ct_success, ct_failure, ct_error = (
count_dict_values(run_result[1], Status.SUCCESS.name),
count_dict_values(run_result[1], Status.FAILURE.name),
count_dict_values(run_result[1], Status.ERROR.name),
)
# TODO: make this kind of output a generic function
report_term = f"""Execution report:
{'-'*40}
Success: {ct_success:8}
Failure: {ct_failure:8}
Error: {ct_error:10}
{'-'*40}
Total: {ct_success+ct_failure+ct_error:10} tested files.
"""
logger.debug(report_term)
# -- Stand alone execution
if __name__ == "__main__":
sys.exit(main()) # required by unittest
main() # required by unittest
......@@ -8,6 +8,9 @@ from enum import Enum
from pathlib import Path
from typing import Union
# package
from src.utils.check_path import check_path
class Status(Enum):
"""Output status for GPF orchestrator."""
......@@ -30,28 +33,16 @@ def arg_type_path_folder(input_path: Union[Path, str]) -> Path:
Returns:
Path: the input path as a pathlib.Path object, if it points to a valid folder.
"""
if not isinstance(input_path, (Path, str)):
if not check_path(
input_path=input_path,
must_exists=True,
must_be_a_folder=True,
must_be_readable=True,
must_be_writable=True,
raise_error=False,
):
raise ArgumentTypeError(
TypeError(
"Input path must be a valid path as "
f"pathlib.path or str, not {type(input_path)}."
)
f"{input_path} is not a valid folder path. Check the logs."
)
if isinstance(input_path, str):
try:
input_path = Path(input_path)
except Exception as exc:
raise ArgumentTypeError(
f"Converting {input_path} into Path failed. Trace: {exc}"
)
# check the path exists
if not input_path.exists():
raise ArgumentTypeError(f"{input_path.resolve()} doesn't exist.")
# check if it's a folder
if not input_path.is_dir():
raise ArgumentTypeError(f"{input_path.resolve()} is not a folder.")
return input_path
return Path(input_path)
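# --- Editorial illustration (not part of the repository) ---------------------
# Hypothetical wiring of arg_type_path_folder as an argparse type converter:
# argparse calls it on the raw CLI string and turns the ArgumentTypeError into
# a clean usage error when the folder is missing or not readable/writable.
import argparse
from os import getenv

demo_parser = argparse.ArgumentParser(description="illustration only")
demo_parser.add_argument(
    "-w",
    "--work-dir-path",
    type=arg_type_path_folder,
    default=getenv("GPF_WORK_DIR"),
)
# demo_parser.parse_args(["--work-dir-path", "/tmp"])  # CLI error if invalid folder
# --- end of editorial illustration --------------------------------------------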
......@@ -7,15 +7,15 @@
# standard lib
import hashlib
import json
import logging
import os
from collections import namedtuple
from os import getenv
from pathlib import Path
from typing import Tuple
# package
from src.constants import Status
from src.utils.check_path import check_path
# -- GLOBALS
......@@ -24,32 +24,14 @@ logger = logging.getLogger(__name__)
# -- FUNCTIONS
def testFilename(filename: str) -> bool:
"""
Simple méthode pour convertir une chaîne en Path
et vérifier si c'est un fichier.
"""
path = Path(filename)
return path.is_file()
def testDirname(dirname: str) -> bool:
"""
Simple méthode pour convertir une chaîne en Path
et vérifier si c'est un dossier.
"""
path = Path(dirname)
return path.is_dir()
def generatemd5sum(filename: str, chunksize: int = 8192) -> str:
def generate_md5_sum(filename: str, chunksize: int = 8192) -> str:
"""Génère un hash md5 du fichier filename.
Example:
Exemple (pour un fichier contenant le texte "md5")
`echo "md5" > /tmp/md5.txt`
>>> generatemd5sum("/tmp/md5.txt")
>>> generate_md5_sum("/tmp/md5.txt")
'772ac1a55fab1122f3b369ee9cd31549'
Preconditions:
......@@ -76,31 +58,21 @@ def validate(filename: str, md5digest: str, chunksize: int = 8192) -> Status:
Status.FAILURE si le hash ne correspond pas avec celui calculé
Status.SUCCESS autrement
"""
if not testFilename(filename):
if not check_path(
input_path=filename, must_be_a_file=True, must_exists=True, raise_error=False
):
logger.error(
f"{filename}: TECHNICAL ERROR: le fichier {filename} n'existe pas."
)
return Status.ERROR
result = generatemd5sum(filename, chunksize) == md5digest
result = generate_md5_sum(filename, chunksize) == md5digest
status_return = Status.SUCCESS if result is True else Status.FAILURE
logger.debug(f"{filename}: {status_return.name}")
return status_return
def countDictValues(d: dict, val) -> int:
"""
Compte le nombre de valeurs val d'un dictionnaire d
"""
result = 0
for f in d.values():
for rlist in f:
if rlist.status == val:
result += 1
return result
def checkMD5File(filename: str, status: dict, chunksize: int = 8192) -> int:
def check_md5_file(filename: Path, status: dict, chunksize: int = 8192) -> int:
"""Vérifie un fichier *.md5.
Ce genre de fichier est classiquement géneré par l'utilitaire
......@@ -152,31 +124,41 @@ def checkMD5File(filename: str, status: dict, chunksize: int = 8192) -> int:
def run(
work_dir: Path = None,
work_dir: Path,
upload_dir_name: str = "upload",
chunk_size: int = 8192,
) -> int:
) -> Tuple[int, dict]:
"""Main function running the logic.
Args:
work_dir (Path): Input working directory. The folder must exist.
upload_dir_name (str, optional): Name (not the path) of the upload directory. \
Defaults to "upload".
chunk_size (int, optional): Size of the chunk-data in octets to load in memory. \
Defaults to 8192.
Raises:
ValueError: _description_
TypeError: _description_
ValueError: _description_
FileExistsError: _description_
exc: _description_
Returns:
0 indique un SUCCESS
1 pour indiquer qu'il y a eu au moins une erreur d'un calcul md5
2 pour indiquer qu'il y a eu au moins une erreur technique
3 pour indiquer qu'il y a eu au moins une fois les deux erreurs
Tuple[int, dict]: return code and detailed status per md5 file. Return code: \
- 0: SUCCESS \
- 1: at least one md5 mismatch \
- 2: at least one technical error \
- 3: both an md5 mismatch and a technical error occurred.
"""
# checks
if not isinstance(work_dir, Path):
raise TypeError(
f"work_dir must be a pathlib.Path instance, not {type(work_dir)} ({work_dir})."
)
if not work_dir.is_dir():
raise ValueError(
f"work_dir must be a valid folder path. {work_dir.resolve()} is not."
)
if not work_dir.exists():
raise FileExistsError(f"work_dir folder doesn't exist: {work_dir.resolve()}.")
if not check_path(
input_path=work_dir,
must_be_a_folder=True,
must_exists=True,
must_be_readable=True,
raise_error=False,
):
raise ValueError(f"work_dir must be a valid folder path. {work_dir} is not.")
if not isinstance(upload_dir_name, str):
raise TypeError(f"work_dir must be a str instance, not {type(work_dir)}.")
......@@ -202,35 +184,19 @@ def run(
for entry in it:
if entry.name.endswith(".md5") and entry.is_file():
logger.debug(f"Traitement de {entry.name} :")
result |= checkMD5File(
result |= check_md5_file(
filename=Path(upload_dir_path) / entry.name,
status=status,
chunksize=chunk_size,
)
# TODO: Où exporte-t-on le fichier ?
try:
with open(Path(work_dir, "output.json"), "w") as outfile:
json.dump(status, outfile, indent="\t", sort_keys=True)
outfile.write("\n")
except EnvironmentError as exc:
logger.error(f"Impossible d'enregistrer le résultat. TECHNICAL ERROR: {exc}")
raise exc
success = countDictValues(status, Status.SUCCESS.name)
failure = countDictValues(status, Status.FAILURE.name)
error = countDictValues(status, Status.ERROR.name)
logger.debug(f"Testés : {success + failure + error}")
logger.debug(f"SUCCESS : {success}")
logger.debug(f"FAILURE : {failure}")
logger.debug(f"ERROR : {error}")
return result
return result, status
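# --- Editorial illustration (not part of the repository) ---------------------
# How the return code of run() behaves as a bit mask, as described in the
# docstring above: bit 1 flags at least one md5 mismatch, bit 2 flags at least
# one technical error, so 3 means both occurred at least once.
demo_result = 0
demo_result |= 1  # a file failed the md5 comparison
demo_result |= 2  # another file could not be read (technical error)
assert demo_result == 3
# --- end of editorial illustration --------------------------------------------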
# -- Stand alone execution
if __name__ == "__main__":
from os import getenv
print(
run(
work_dir=getenv("GPF_WORK_DIR", Path("./tests")),
......
#! python3 # noqa: E265
"""Modules related to the GPF orchestrator."""
#! python3 # noqa: E265
"""Check an upload."""
# -- IMPORTS
# standard lib
import logging
from pathlib import Path
from typing import Iterable
# package
from src.orchestrator.exceptions import UploadNotEnoughFiles
from src.utils.check_path import check_path
# -- GLOBALS
# logs
logger = logging.getLogger(__name__)
# -- FUNCTIONS
def check_minimum_file(
input_path: Path,
minimum: int = 1,
ignore_extensions: Iterable = (".md5",),
raise_error: bool = True,
) -> bool:
"""Check if the input folder path contains at least {minimum} file inside, \
beside {ignore_extensions}.
Args:
input_path (Path): folder path to look into.
minimum (int, optional): minimum number of files. Defaults to 1.
ignore_extensions (Iterable, optional): file extensions to ignore. Defaults to (".md5",).
raise_error (bool, optional): if True, it raises an exception. Defaults to True.
Raises:
UploadNotEnoughFiles: when the folder contains fewer files than the minimum set and \
raise_error is True
Returns:
bool: True if the minimum of files is reached.
"""
listing_files = [
f for f in input_path.rglob(".") if f.suffix not in ignore_extensions
]
if len(listing_files) < minimum:
error_message = (
f"Folder {input_path} doesn't contain enough files with "
f"extension other: {''.join(ignore_extensions)}."
)
if not raise_error:
logger.error(error_message)
return False
else:
raise UploadNotEnoughFiles(error_message)
else:
return True
def check_livraison_structure(
work_dir_path: Path,
upload_dir_name: str = "upload",
in_json_file_name: str = "parameters.json",
) -> bool:
"""Check that the upload (livraison in Entrepôt terminology) is valid. Mainly file \
and folder structure.
Args:
work_dir_path (Path): path to the work_dir folder.
upload_dir_name (str, optional): name of the subfolder to look for. \
Must be a subfolder of work_dir. Defaults to "upload".
in_json_file_name (str, optional): JSON filename where parameters are stored. \
Defaults to "parameters.json".
Returns:
bool: True if everything is OK
"""
result = all(
[
# check if the work_dir is a folder which exists and is r/w
check_path(
input_path=work_dir_path,
must_be_a_folder=True,
must_be_readable=True,
must_exists=True,
must_be_writable=True,
raise_error=False,
),
# check if there is a subfolder in the work_dir
check_path(
input_path=work_dir_path / upload_dir_name,
must_be_a_folder=True,
must_be_readable=True,
must_exists=True,
must_be_writable=True,
raise_error=False,
),
# check if the input JSON file exists in the work_dir
check_path(
input_path=work_dir_path / in_json_file_name,
must_be_a_file=True,
must_be_readable=True,
must_exists=True,
raise_error=False,
),
# check that the upload folder contains at least one file beside *.md5
check_minimum_file(
input_path=work_dir_path / upload_dir_name,
minimum=1,
ignore_extensions=(".md5",),
raise_error=False,
),
]
)
return result
#! python3 # noqa: E265
"""Custom exceptions."""
class OrchestratorBaseException(Exception):
"""Package parent custom exception."""
pass
class UploadNotEnoughFiles(OrchestratorBaseException):
"""When there is not enough compatible files in an upload."""
pass
#! python3 # noqa: E265
"""Object models related to the GPF orchestrator."""
from .output import OutputDataStructure # noqa: F401
from .parameters import GpfOrchestratorParameters # noqa: F401
#! python3 # noqa: E265
"""Output object with related sugar."""
# standard
import json
from dataclasses import dataclass, field
from typing import Iterable, Optional
# package
from src.constants import Status
# ############################################################################
# ########## Classes ###############
# ##################################
@dataclass
class OutputDataStructure:
"""Output data structure.
Example:
.. code-block:: python
output = OutputDataStructure(
executionId="1231544456-1546546-164565",
status=Status.FAILURE,
failures=("data.shp", "sirene.csv"),
trace="[USER] No SRS, no cry",
)
"""
executionId: str
status: Status = Status.SUCCESS
failures: Iterable[Optional[str]] = field(default=(None,))
trace: str = None
def to_dict(self) -> dict:
"""Convert dataclass object into dictionary. Supersedes dataclass.asdict to \
handle enum value.
Returns:
dict: object as dictionary
Example:
.. code-block:: python
output = OutputDataStructure(
executionId="1231544456-1546546-164565",
)
print(output.to_dict())
"""
return {
"executionId": self.executionId,
"status": self.status.value,
"failures": self.failures,
"trace": self.trace,
}
def to_json(self, status_as_name: bool = True, **kwargs) -> str:
"""Supersedes json.dumps using the dictionary returned by to_dict().
Args:
status_as_name (bool, optional): if True dumps the status as name. If not, \
as int. Defaults to True.
Returns:
str: object serialized as JSON string
Example:
.. code-block:: python
from pathlib import Path
# create output file
out_file = Path("/tmp/output.json")
out_file.parent.mkdir(parents=True, exist_ok=True)
# write into the file passing extra parameters to json.dumps
with out_file.open("w", encoding="UTF8") as wf:
wf.write(output.to_json(sort_keys=True))
"""
obj_as_dict = self.to_dict()
# handle cases when the full exception is passed
if isinstance(obj_as_dict.get("trace"), Exception):
exc = obj_as_dict.get("trace")
if isinstance(exc.args, tuple) and len(exc.args):
exc = exc.args[0]
else:
exc = getattr(exc, "message", repr(exc))
obj_as_dict["trace"] = exc
# status as name or int
if status_as_name:
obj_as_dict["status"] = self.status.name
return json.dumps(obj_as_dict, **kwargs)
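# --- Editorial illustration (not part of the repository) ---------------------
# Indicative shape of the JSON produced by to_json() for the example instance
# used in the docstrings above (values are illustrative only):
#     {
#         "executionId": "1231544456-1546546-164565",
#         "status": "FAILURE",
#         "failures": ["data.shp", "sirene.csv"],
#         "trace": "[USER] No SRS, no cry"
#     }
# --- end of editorial illustration --------------------------------------------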
# #############################################################################
# ##### Stand alone program ########
# ##################################
if __name__ == "__main__":
"""Standalone execution."""
import datetime
from dataclasses import asdict
from pathlib import Path
from src.__about__ import __title_clean__
output = OutputDataStructure(
executionId="1231544456-1546546-164565",
status=Status.FAILURE,
failures=("data.shp", "sirene.csv"),
trace="[USER] No SRS, no cry",
)
print(output)
print(asdict(output), output.to_json)
now = datetime.datetime.now()
out_file = Path(f"/tmp/{__title_clean__}/output_{now:%Y-%m-%d_%H%M%s}.json")
out_file.parent.mkdir(parents=True, exist_ok=True)
with out_file.open("w", encoding="UTF8") as wf:
wf.write(output.to_json(sort_keys=True))
#! python3 # noqa: E265
"""
Model definition for GPF orchestrator parameters and related sugar.
Author: Julien Moura (Oslandia)
"""
# #############################################################################
# ########## Libraries #############
# ##################################
# standard
import json
import logging
from pathlib import Path
from sys import version_info
from typing import Any, Dict
# Imports depending on Python version
if version_info[1] < 11:
from typing_extensions import Self
else:
from typing import Self
# Package
from src.utils.check_path import check_path
# #############################################################################
# ########## Globals ###############
# ##################################
# logs
logger = logging.getLogger(__name__)
# #############################################################################
# ########## Classes ###############
# ##################################
class GpfOrchestratorParameters:
"""Object definition for GPF orchestrator parameters."""
# optional mapping on attributes names
ATTR_MAP = {}
def __init__(
self,
executionId: str = None,
userId: str = None,
inputParameters: dict = None,
targetParameters: Dict[str, Any] = None,
technicalParameters: Dict[str, Any] = None,
# internal
json_ref_path: Path = None,
loaded_from_json: bool = False,
# extra
**kwargs,
):
"""Initialize an orchestrator parameters object."""
# default values for immutable attributes
self.loaded_from_json = loaded_from_json
# default values for attributes/properties that can be get/set
self._executionId = None
self._userId = None
self._json_ref_path = None
# if values have been passed, use them as object attributes.
# attributes are prefixed by an underscore '_'
if executionId:
self._executionId = executionId
if userId:
self._userId = userId
if inputParameters:
self._inputParameters = inputParameters
if targetParameters:
self._targetParameters = targetParameters or None
if technicalParameters:
self._technicalParameters = technicalParameters
if json_ref_path:
self._json_ref_path = json_ref_path
@classmethod
def from_json(cls, in_json_path: Path) -> Self:
"""Load object from a JSON file.
:param Path in_json_path: path to the json file
:return Self: object with attributes filled from JSON.
:example:
.. code-block:: python
parameters = GpfOrchestratorParameters.from_json(
Path("workd_dir/parameters.json")
)
print(GpfOrchestratorParameters.executionId)
"""
# checks
check_path(
input_path=in_json_path,
must_be_a_file=True,
must_exists=True,
must_be_readable=True,
)
# load JSON
with in_json_path.open(mode="r", encoding="utf8") as in_json:
data = json.load(in_json)
# map attributes names
for k, v in cls.ATTR_MAP.items():
data[k] = data.pop(v, None)
# return new instance with loaded object
return cls(
json_ref_path=in_json_path,
loaded_from_json=True,
**data,
)
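# --- Editorial illustration (not part of the repository) ---------------------
# Indicative minimal content of the parameters.json file consumed by
# from_json(); the keys mirror the constructor arguments, the values are
# made up:
#     {
#         "executionId": "1231544456-1546546-164565",
#         "userId": "user-0000",
#         "inputParameters": {},
#         "targetParameters": {},
#         "technicalParameters": {}
#     }
# --- end of editorial illustration --------------------------------------------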
@property
def executionId(self) -> str:
"""Returns the executionId.
:return str: object executionId
"""
return self._executionId
@property
def userId(self) -> str:
"""Returns the userId.
:return str: object userId
"""
return self._userId
@property
def inputParameters(self) -> dict:
"""Returns the inputParameters.
:return dict: object inputParameters
"""
return self._inputParameters
@property
def targetParameters(self) -> dict:
"""Returns the targetParameters.
:return dict: object targetParameters
"""
if self._targetParameters:
return self._targetParameters
return None
@property
def technicalParameters(self) -> dict:
"""Returns the technicalParameters.
:return dict: object technicalParameters
"""
return self._technicalParameters
# internal
@property
def is_loaded_from_json(self) -> bool:
"""Tells if the object has been loaded from a JSON file.
:return bool: True if the object has been loaded from a JSON file
"""
return self.loaded_from_json
@property
def json_ref_path(self) -> Path:
"""Returns the path to the corresponding JSON path.
:return Path: input JSON path
"""
if self.is_loaded_from_json:
return self._json_ref_path.resolve()
else:
return None
# #############################################################################
# ##### Stand alone program ########
# ##################################
if __name__ == "__main__":
"""Standalone execution."""
params = GpfOrchestratorParameters.from_json(
in_json_path=Path("tests/fixtures/orchestrator/parameters_full_good_old.json")
)
print(dir(params), params.targetParameters)
params = GpfOrchestratorParameters.from_json(
in_json_path=Path("tests/fixtures/orchestrator/parameters_full_good_new.json")
)
print(params.targetParameters, params.inputParameters)