Voici la documentation : https://geoplateforme.pages.gpf-tech.ign.fr/documentation

Skip to content
Extraits de code Groupes Projets

Vérification pour une donnée en sortie avec statut MODIFYING

Fusionnées Benoit Ducarouge a demandé de fusionner feat/fix-issue-2 vers main
Fichiers
51
+ 66
3
@@ -10,19 +10,72 @@ import logging
from pathlib import Path
# package
from gpf_check_rlt.core import get_vector_files_structure
from gpf_entrepot_toolbelt.orchestrator.models import GpfOrchestratorParameters
from gpf_entrepot_toolbelt.orchestrator.status import Status
from gpf_entrepot_toolbelt.utils.database_inspection import PgDatabaseInspector
# -- GLOBALS
# logs
logger = logging.getLogger(__name__)
# -- LOGIC
def check_data_structure(
upload_dir: Path, parameters: GpfOrchestratorParameters
) -> bool:
"""Vérifie que la structure de la donnée en entrée corresponde a la
structure de la stored_data de sortie.
Args:
upload_dir (Path): dossier contenant les données d'entrées
parameters (GpfOrchestratorParameters): fichier de parametres
"""
INSERER ICI LA LOGIQUE DU PACKAGE
"""
Returns:
bool: True si la structure est compatible, False sinon
"""
vector_files = get_vector_files_structure(upload_dir)
if vector_files is None:
return False
out_db = parameters.output_database
inspector = PgDatabaseInspector(out_db)
table_names = inspector.get_table_names_from_schema(out_db.schema_name)
if (
len(table_names) != 2
or "dataset" not in table_names
or "image" not in table_names
):
logger.error("La structure de stored_data en sortie n'est pas compatible")
return False
db_mission_fields = inspector.get_column_names(out_db.schema_name, "dataset")
table_mission_name = vector_files["table_mission"]["layer_name"]
table_mission_datasource = vector_files["table_mission"]["layer_datasource"]
table_mission = table_mission_datasource.GetLayerByName(table_mission_name)
table_mission_definition = table_mission.GetLayerDefn()
for i in range(table_mission_definition.GetFieldCount()):
field_name = table_mission_definition.GetFieldDefn(i).GetName()
if field_name.lower() not in db_mission_fields:
logger.error(
f"Le champ {field_name} du layer {table_mission_name} n'existe pas dans la stored_data de sortie."
)
return False
db_cliches_fields = inspector.get_column_names(out_db.schema_name, "image")
table_cliches_name = vector_files["table_cliches"]["layer_name"]
table_cliches_datasource = vector_files["table_cliches"]["layer_datasource"]
table_cliches = table_cliches_datasource.GetLayerByName(table_cliches_name)
table_cliches_definition = table_cliches.GetLayerDefn()
for i in range(table_cliches_definition.GetFieldCount()):
field_name = table_cliches_definition.GetFieldDefn(i).GetName()
if field_name.lower() not in db_cliches_fields:
logger.error(
f"Le champ {field_name} du layer {table_cliches_name} n'existe pas dans la stored_data de sortie."
)
return False
return True
# -- RUN
@@ -64,6 +117,16 @@ def run(
)
return Status.FAILURE
if (
parameters.output_stored_data.status == "MODIFYING"
and not check_data_structure(upload_dir, parameters)
):
logger.user_error(
"Impossible d'intégrer les données car la structure de la stored data en sortie est incompatible"
)
return Status.FAILURE
# if parameters.output_stored_data.status == "GENERATING":
# RUN
result = Status.SUCCESS
error_str = ""
Chargement en cours