Voici la documentation : https://geoplateforme.pages.gpf-tech.ign.fr/documentation

Skip to content
Extraits de code Groupes Projets

feat(s3): upload files to output stored data S3

Fusionnées Jean-Marie KERLOCH a demandé de fusionner feature/files_upload_to_s3 vers main
2 fichiers
+ 88
11
Comparer les modifications
  • Côte à côte
  • En ligne
Fichiers
2
@@ -16,6 +16,7 @@ from gpf_entrepot_toolbelt.orchestrator.models import GpfOrchestratorParameters
from gpf_entrepot_toolbelt.orchestrator.models.stored_data import GpfStoredData
from gpf_entrepot_toolbelt.orchestrator.models.upload import GpfUpload
from gpf_entrepot_toolbelt.orchestrator.status import Status
from gpf_entrepot_toolbelt.orchestrator.storage.s3_client import GpfS3Client
from gpf_entrepot_toolbelt.utils.check_path import check_path
from gpf_entrepot_toolbelt.utils.get_file_informations import generate_md5_sum
@@ -191,6 +192,24 @@ def get_file_information(file_name: Path, output_stored_data: GpfStoredData) ->
return dict_info
def upload_file_to_s3(input_dir: Path, s3_client: GpfS3Client) -> None:
"""Retourne les informations d'une livraison
Args:
input_dir (Path): Répertoire à upload dans S3
s3_client (GpfS3Client): client S3 associé à la stored data de sortie
"""
for path in input_dir.iterdir():
if path.is_dir():
upload_file_to_s3(path, s3_client)
else:
# Stockage des fichiers 'a plat' : on ne garde pas l'arborescence en entrée.
s3_client.upload_single_file(
Path(path),
key_name=f"{s3_client.prefix}/{path.name}",
)
# -- RUN
def run(work_dir: Path, parameters: GpfOrchestratorParameters) -> Tuple[Status, str]:
"""Main function running the logic.
@@ -234,23 +253,26 @@ def run(work_dir: Path, parameters: GpfOrchestratorParameters) -> Tuple[Status,
# Récupération des informations sur les données
descriptor_file_info_list += get_information(upload_dir, output_stored_data)
# Vérification des noms uniques
# Vérification nom interdit
file_names = [file_info["file_name"] for file_info in descriptor_file_info_list]
if "__descriptor.json" in file_names:
return Status.FAILURE, f"'__descriptor.json' can't be used as filename"
# Vérification des noms uniques
count_dict = {name: file_names.count(name) for name in file_names}
duplicated_files = [k for k, v in count_dict.items() if v > 1]
if len(duplicated_files) != 0:
return Status.FAILURE, f"Duplicated file names: {','.join(duplicated_files)}"
# TODO : insertion des données dans le bucket S3
# Insertion des données dans le bucket S3
s3_client = output_stored_data.storage_client
for upload in uploads:
# copie sur le S3
s3_path = "le_bon_chemin_S3"
# Storage.copy(livraison["le_chemin"], s3_path)
upload_dir = work_dir / "upload" / upload._id
upload_file_to_s3(upload_dir, s3_client)
# Ajout du fichier __descriptor.json dans le S3
with open(Path(work_dir / "__descriptor.json"), "w") as desc_file:
desc_file.write(json.dumps(descriptor_file_info_list))
s3_client = output_stored_data.storage_client
s3_client.upload_single_file(
Path(work_dir / "__descriptor.json"),
key_name=f"{s3_client.prefix}/__descriptor.json",
@@ -262,6 +284,8 @@ def run(work_dir: Path, parameters: GpfOrchestratorParameters) -> Tuple[Status,
total_size = sum(
[file_info["file_length"] for file_info in descriptor_file_info_list]
)
# Ajout de la taille du fichier "__descriptor.json"
total_size += Path(work_dir / "__descriptor.json").stat().st_size
# type_infos :
# files_number : nombre de fichiers total dans la donnée en sortie (à l'exclusion du __descriptor.json)
Chargement en cours