mirror of
https://forge.chapril.org/tykayn/osm-commerces
synced 2025-10-04 17:04:53 +02:00
295 lines
No EOL
11 KiB
Python
Executable file
295 lines
No EOL
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Script pour analyser les polygones de villes et générer des fichiers JSON d'analyse.
|
|
|
|
Ce script:
|
|
1. Parcourt tous les fichiers de polygones dans le dossier "polygons"
|
|
2. Pour chaque polygone, vérifie si un fichier d'analyse JSON existe déjà
|
|
3. Si non, utilise loop_thematics_history_in_zone_to_counts.py pour extraire les données
|
|
4. Sauvegarde les résultats dans un fichier JSON avec une analyse de complétion
|
|
5. Ajoute une date de création à chaque analyse
|
|
|
|
Usage:
|
|
python analyze_city_polygons.py
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import glob
|
|
import subprocess
|
|
import argparse
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Import des thèmes depuis loop_thematics_history_in_zone_to_counts.py
|
|
from loop_thematics_history_in_zone_to_counts import THEMES
|
|
|
|
# Chemins des répertoires
|
|
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
POLYGONS_DIR = os.path.join(SCRIPT_DIR, "polygons")
|
|
OSM_DATA_FILE = os.path.join(SCRIPT_DIR, "osm_data", "france-latest.osm.pbf")
|
|
ANALYSIS_DIR = os.path.join(SCRIPT_DIR, "city_analysis")
|
|
TEMP_DIR = os.path.join(SCRIPT_DIR, "temp")
|
|
OUTPUT_DIR = os.path.join(SCRIPT_DIR, "resultats")
|
|
|
|
def ensure_directories_exist():
|
|
"""
|
|
Vérifie que les répertoires nécessaires existent, sinon les crée.
|
|
"""
|
|
os.makedirs(POLYGONS_DIR, exist_ok=True)
|
|
os.makedirs(ANALYSIS_DIR, exist_ok=True)
|
|
os.makedirs(TEMP_DIR, exist_ok=True)
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
print(f"Dossier d'analyses: {ANALYSIS_DIR}")
|
|
|
|
def analysis_exists(insee_code):
|
|
"""
|
|
Vérifie si l'analyse pour le code INSEE donné existe déjà.
|
|
|
|
Args:
|
|
insee_code (str): Le code INSEE de la commune
|
|
|
|
Returns:
|
|
bool: True si l'analyse existe, False sinon
|
|
"""
|
|
analysis_file = os.path.join(ANALYSIS_DIR, f"analyse_commune_{insee_code}.json")
|
|
return os.path.isfile(analysis_file)
|
|
|
|
def run_command(command):
|
|
"""
|
|
Exécute une commande shell et retourne la sortie.
|
|
|
|
Args:
|
|
command (str): La commande à exécuter
|
|
|
|
Returns:
|
|
str: La sortie de la commande ou None en cas d'erreur
|
|
"""
|
|
print(f"Exécution: {command}")
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
shell=True,
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
)
|
|
return result.stdout
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Erreur lors de l'exécution de la commande: {e}")
|
|
print(f"Sortie de la commande: {e.stdout}")
|
|
print(f"Erreur de la commande: {e.stderr}")
|
|
return None
|
|
|
|
def extract_data_for_polygon(poly_file, insee_code):
|
|
"""
|
|
Extrait les données pour un polygone donné en utilisant loop_thematics_history_in_zone_to_counts.py.
|
|
|
|
Args:
|
|
poly_file (str): Chemin vers le fichier de polygone
|
|
insee_code (str): Code INSEE de la commune
|
|
|
|
Returns:
|
|
dict: Dictionnaire contenant les données extraites ou None en cas d'erreur
|
|
"""
|
|
try:
|
|
# Vérifier que le fichier OSM existe
|
|
if not os.path.isfile(OSM_DATA_FILE):
|
|
print(f"Erreur: Le fichier OSM {OSM_DATA_FILE} n'existe pas.")
|
|
return None
|
|
|
|
# Exécuter loop_thematics_history_in_zone_to_counts.py pour extraire les données
|
|
# Nous utilisons --max-dates=1 pour obtenir uniquement les données les plus récentes
|
|
command = f"python3 {os.path.join(SCRIPT_DIR, 'loop_thematics_history_in_zone_to_counts.py')} --input {OSM_DATA_FILE} --poly {poly_file} --output-dir {OUTPUT_DIR} --temp-dir {TEMP_DIR} --max-dates=1"
|
|
run_command(command)
|
|
|
|
# Récupérer les résultats depuis les fichiers CSV générés
|
|
zone_name = Path(poly_file).stem
|
|
data = {"themes": {}}
|
|
|
|
for theme_name in THEMES.keys():
|
|
csv_file = os.path.join(OUTPUT_DIR, f"{zone_name}_{theme_name}.csv")
|
|
|
|
if os.path.exists(csv_file):
|
|
# Lire le fichier CSV et extraire les données les plus récentes
|
|
with open(csv_file, "r") as f:
|
|
lines = f.readlines()
|
|
if len(lines) > 1: # S'assurer qu'il y a des données (en-tête + au moins une ligne)
|
|
headers = lines[0].strip().split(",")
|
|
latest_data = lines[-1].strip().split(",")
|
|
|
|
# Créer un dictionnaire pour ce thème
|
|
theme_data = {}
|
|
for i, header in enumerate(headers):
|
|
if i < len(latest_data):
|
|
# Convertir en nombre si possible
|
|
try:
|
|
value = latest_data[i]
|
|
if value and value != "":
|
|
theme_data[header] = float(value) if "." in value else int(value)
|
|
else:
|
|
theme_data[header] = 0
|
|
except ValueError:
|
|
theme_data[header] = latest_data[i]
|
|
|
|
data["themes"][theme_name] = theme_data
|
|
|
|
return data
|
|
except Exception as e:
|
|
print(f"Erreur lors de l'extraction des données pour {insee_code}: {e}")
|
|
return None
|
|
|
|
def create_analysis(poly_file):
|
|
"""
|
|
Crée une analyse pour un fichier de polygone donné.
|
|
|
|
Args:
|
|
poly_file (str): Chemin vers le fichier de polygone
|
|
|
|
Returns:
|
|
str: Chemin vers le fichier d'analyse créé ou None en cas d'erreur
|
|
"""
|
|
try:
|
|
# Extraire le code INSEE du nom du fichier (format: commune_XXXXX.poly)
|
|
poly_filename = os.path.basename(poly_file)
|
|
if poly_filename.startswith("commune_") and poly_filename.endswith(".poly"):
|
|
insee_code = poly_filename[8:-5] # Enlever "commune_" et ".poly"
|
|
else:
|
|
print(f"Format de nom de fichier non reconnu: {poly_filename}")
|
|
return None
|
|
|
|
print(f"Création de l'analyse pour la commune {insee_code}...")
|
|
|
|
# Extraire les données pour ce polygone
|
|
data = extract_data_for_polygon(poly_file, insee_code)
|
|
if not data:
|
|
return None
|
|
|
|
# Ajouter des métadonnées à l'analyse
|
|
data["metadata"] = {
|
|
"insee_code": insee_code,
|
|
"creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"polygon_file": poly_filename,
|
|
"osm_data_file": os.path.basename(OSM_DATA_FILE)
|
|
}
|
|
|
|
# Calculer les statistiques de complétion globales
|
|
total_objects = 0
|
|
total_completion = 0
|
|
theme_count = 0
|
|
|
|
for theme_name, theme_data in data["themes"].items():
|
|
if "nombre_total" in theme_data and theme_data["nombre_total"] > 0:
|
|
total_objects += theme_data["nombre_total"]
|
|
if "pourcentage_completion" in theme_data:
|
|
total_completion += theme_data["pourcentage_completion"] * theme_data["nombre_total"]
|
|
theme_count += 1
|
|
|
|
if total_objects > 0:
|
|
data["metadata"]["total_objects"] = total_objects
|
|
data["metadata"]["average_completion"] = total_completion / total_objects if total_objects > 0 else 0
|
|
data["metadata"]["theme_count"] = theme_count
|
|
|
|
# Sauvegarder l'analyse dans un fichier JSON
|
|
analysis_file = os.path.join(ANALYSIS_DIR, f"analyse_commune_{insee_code}.json")
|
|
with open(analysis_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"Analyse pour la commune {insee_code} sauvegardée dans {analysis_file}")
|
|
return analysis_file
|
|
except Exception as e:
|
|
print(f"Erreur lors de la création de l'analyse: {e}")
|
|
return None
|
|
|
|
def main():
|
|
"""
|
|
Fonction principale du script.
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description="Analyse les polygones de villes et génère des fichiers JSON d'analyse."
|
|
)
|
|
parser.add_argument(
|
|
"--force", "-f", action="store_true", help="Force la recréation des analyses existantes"
|
|
)
|
|
parser.add_argument(
|
|
"--single", "-s", help="Traite uniquement le polygone spécifié (code INSEE)"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
# S'assurer que les répertoires nécessaires existent
|
|
ensure_directories_exist()
|
|
|
|
# Récupérer les fichiers de polygones à traiter
|
|
if args.single:
|
|
# Traiter uniquement le polygone spécifié
|
|
single_poly_file = os.path.join(POLYGONS_DIR, f"commune_{args.single}.poly")
|
|
if not os.path.isfile(single_poly_file):
|
|
print(f"Erreur: Le fichier de polygone pour la commune {args.single} n'existe pas.")
|
|
return 1
|
|
poly_files = [single_poly_file]
|
|
print(f"Mode test: traitement uniquement du polygone {args.single}")
|
|
else:
|
|
# Traiter tous les polygones
|
|
poly_files = glob.glob(os.path.join(POLYGONS_DIR, "commune_*.poly"))
|
|
if not poly_files:
|
|
print("Aucun fichier de polygone trouvé dans le dossier 'polygons'.")
|
|
return 1
|
|
|
|
# Pour le test, limiter à 3 polygones
|
|
test_mode = False # Mettre à True pour limiter le traitement à quelques polygones
|
|
if test_mode:
|
|
# Trier les polygones par taille et prendre les 3 plus petits
|
|
poly_files = sorted(poly_files, key=os.path.getsize)[:3]
|
|
print(f"Mode test: traitement limité à {len(poly_files)} polygones")
|
|
for poly_file in poly_files:
|
|
print(f" - {os.path.basename(poly_file)}")
|
|
|
|
# Compteurs pour les statistiques
|
|
total = len(poly_files)
|
|
existing = 0
|
|
created = 0
|
|
failed = 0
|
|
|
|
# Pour chaque fichier de polygone, créer une analyse si elle n'existe pas déjà
|
|
for i, poly_file in enumerate(poly_files, 1):
|
|
# Extraire le code INSEE du nom du fichier
|
|
poly_filename = os.path.basename(poly_file)
|
|
insee_code = poly_filename[8:-5] # Enlever "commune_" et ".poly"
|
|
|
|
print(f"\nTraitement du polygone {i}/{total}: {poly_filename}")
|
|
|
|
if analysis_exists(insee_code) and not args.force:
|
|
print(f"L'analyse pour la commune {insee_code} existe déjà.")
|
|
existing += 1
|
|
continue
|
|
|
|
# Créer l'analyse
|
|
result = create_analysis(poly_file)
|
|
|
|
if result:
|
|
created += 1
|
|
else:
|
|
failed += 1
|
|
|
|
# Afficher les statistiques
|
|
print("\nRésumé:")
|
|
print(f"Total des polygones traités: {total}")
|
|
print(f"Analyses déjà existantes: {existing}")
|
|
print(f"Analyses créées avec succès: {created}")
|
|
print(f"Échecs: {failed}")
|
|
|
|
return 0 # Succès
|
|
except KeyboardInterrupt:
|
|
print("\nOpération annulée par l'utilisateur.")
|
|
return 1 # Erreur
|
|
except Exception as e:
|
|
print(f"Erreur inattendue: {e}")
|
|
return 1 # Erreur
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main()) |