add quality control for links in a blog source

This commit is contained in:
Tykayn 2025-03-27 13:37:58 +01:00 committed by tykayn
parent 7b12ef533b
commit 22285e44ae
3 changed files with 424 additions and 8 deletions

226
check_links.py Normal file
View file

@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Vérifie l'accessibilité des liens trouvés dans le JSON généré par scan_links.py.
Génère un rapport des domaines inaccessibles et des articles comportant ces liens.
"""
import os
import json
import argparse
import requests
import time
from datetime import datetime
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
# Limiter les requêtes pour ne pas surcharger les serveurs
REQUEST_DELAY = 0.5 # Délai entre les requêtes vers le même domaine (en secondes)
MAX_WORKERS = 10 # Nombre maximum de threads parallèles
# User agent pour éviter d'être bloqué
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
def check_url(url):
"""Vérifie si une URL est accessible"""
try:
response = requests.head(url, headers=HEADERS, timeout=10, allow_redirects=True)
# Si HEAD échoue, essayer avec GET (certains serveurs n'acceptent pas HEAD)
if response.status_code >= 400:
response = requests.get(url, headers=HEADERS, timeout=10, allow_redirects=True, stream=True)
# Fermer immédiatement la connexion pour éviter de télécharger le contenu entier
response.close()
# Considérer comme réussi les codes 2xx et 3xx
return {
'url': url,
'status': response.status_code,
'accessible': response.status_code < 400,
'error': None
}
except requests.exceptions.RequestException as e:
return {
'url': url,
'status': None,
'accessible': False,
'error': str(e)
}
def main():
parser = argparse.ArgumentParser(description="Vérifier les liens des fichiers Org")
parser.add_argument("json_file", help="Fichier JSON à examiner, généré par scan_links.py")
parser.add_argument("--output_dir", default="link_checker_output", help="Dossier de sortie pour les rapports")
args = parser.parse_args()
json_links_report = 'links_report_'+ args.json_file + '.json'
if not os.path.exists(json_links_report):
print(f"Le fichier {json_links_report} n'existe pas.")
return
# Créer le dossier de sortie
os.makedirs(args.output_dir, exist_ok=True)
# Charger les données JSON
with open(json_links_report, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"Vérification des liens depuis {json_links_report}...")
# Collecter toutes les URLs uniques par domaine
domains_to_urls = {}
url_to_articles = {}
for article, links in data['article_links'].items():
for link in links:
url = link['url']
# S'assurer que l'URL commence par http:// ou https://
if not url.startswith(('http://', 'https://')):
if url.startswith('www.'):
url = 'https://' + url
else:
# Ignorer les liens qui ne sont pas des URLs Web
continue
# Ajouter l'URL au dictionnaire des domaines
domain = urlparse(url).netloc
if domain not in domains_to_urls:
domains_to_urls[domain] = set()
domains_to_urls[domain].add(url)
# Enregistrer les articles qui utilisent cette URL
if url not in url_to_articles:
url_to_articles[url] = []
url_to_articles[url].append(article)
print(f"Vérification de {len(url_to_articles)} URLs uniques sur {len(domains_to_urls)} domaines...")
# Vérifier les URLs avec un ThreadPoolExecutor
urls_to_check = list(url_to_articles.keys())
results = {}
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_url = {executor.submit(check_url, url): url for url in urls_to_check}
for i, future in enumerate(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results[url] = result
# Afficher la progression
if (i + 1) % 10 == 0 or i + 1 == len(urls_to_check):
print(f"Progression: {i + 1}/{len(urls_to_check)} URLs vérifiées")
# Respecter le délai entre les requêtes pour le même domaine
domain = urlparse(url).netloc
time.sleep(REQUEST_DELAY)
except Exception as e:
results[url] = {
'url': url,
'status': None,
'accessible': False,
'error': str(e)
}
# Identifier les URLs inaccessibles
inaccessible_urls = [url for url, result in results.items() if not result['accessible']]
# Collecter les articles avec des liens inaccessibles
articles_with_broken_links = {}
for url in inaccessible_urls:
for article in url_to_articles[url]:
if article not in articles_with_broken_links:
articles_with_broken_links[article] = []
# Trouver le lien original avec sa description
original_links = [link for link in data['article_links'][article] if link['url'] == url]
for link in original_links:
articles_with_broken_links[article].append({
'url': url,
'description': link['description'],
'status': results[url]['status'],
'error': results[url]['error']
})
# Générer les rapports
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 1. Rapport JSON détaillé
report_data = {
'meta': {
'generated_at': datetime.now().isoformat(),
'source_file': json_links_report,
'total_urls_checked': len(results),
'total_inaccessible_urls': len(inaccessible_urls),
'total_articles_with_broken_links': len(articles_with_broken_links)
},
'inaccessible_domains': {},
'articles_with_broken_links': articles_with_broken_links
}
# Regrouper par domaine
for url in inaccessible_urls:
domain = urlparse(url).netloc
if domain not in report_data['inaccessible_domains']:
report_data['inaccessible_domains'][domain] = []
report_data['inaccessible_domains'][domain].append({
'url': url,
'status': results[url]['status'],
'error': results[url]['error'],
'articles': url_to_articles[url]
})
# Sauvegarder le rapport JSON
json_report_path = os.path.join(args.output_dir, f'broken_links_report_{args.json_file}_{timestamp}.json')
with open(json_report_path, 'w', encoding='utf-8') as f:
json.dump(report_data, f, ensure_ascii=False, indent=2)
# 2. Rapport texte des domaines inaccessibles
domains_report_path = os.path.join(args.output_dir, f'inaccessible_domains_{args.json_file}_{timestamp}.txt')
with open(domains_report_path, 'w', encoding='utf-8') as f:
f.write(f"DOMAINES INACCESSIBLES ({len(report_data['inaccessible_domains'])}):\n")
f.write("=" * 80 + "\n\n")
for domain, urls in report_data['inaccessible_domains'].items():
f.write(f"{domain} ({len(urls)} URLs):\n")
for url_data in urls:
status = f"Status: {url_data['status']}" if url_data['status'] else ""
error = f"Error: {url_data['error']}" if url_data['error'] else ""
f.write(f" - {url_data['url']} {status} {error}\n")
f.write("\n")
# 3. Rapport des liens à changer dans chaque article
articles_report_path = os.path.join(args.output_dir, f'articles_with_broken_links_{timestamp}.txt')
with open(articles_report_path, 'w', encoding='utf-8') as f:
f.write(f"ARTICLES AVEC LIENS CASSÉS ({len(articles_with_broken_links)}):\n")
f.write("=" * 80 + "\n\n")
for article, links in articles_with_broken_links.items():
f.write(f"Fichier: {article}\n")
f.write("-" * 40 + "\n")
for link in links:
description = f" ({link['description']})" if link['description'] else ""
status = f"Status: {link['status']}" if link['status'] else ""
error = f"Error: {link['error']}" if link['error'] else ""
f.write(f" - {link['url']}{description} {status} {error}\n")
f.write("\n")
print("\nRapports générés:")
print(f"- Rapport JSON détaillé: {json_report_path}")
print(f"- Liste des domaines inaccessibles: {domains_report_path}")
print(f"- Liste des articles avec liens cassés: {articles_report_path}")
print(f"\nURLs vérifiées: {len(results)}")
print(f"URLs inaccessibles: {len(inaccessible_urls)}")
print(f"Articles avec liens cassés: {len(articles_with_broken_links)}")
if __name__ == "__main__":
main()