#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
fetch_recent_changes.py
This script fetches recent changes from the OpenStreetMap wiki for the French namespace
and stores the URLs of these pages. It specifically targets the recent changes page:
https://wiki.openstreetmap.org/w/index.php?hidebots=1&hidepreviousrevisions=1&hidecategorization=1&hideWikibase=1&hidelog=1&hidenewuserlog=1&namespace=202&limit=500&days=30&enhanced=1&title=Special:RecentChanges&urlversion=2
Usage:
python fetch_recent_changes.py [--dry-run] [--force]
Options:
--dry-run Run the script without saving the results to a file
--force Force update even if the cache is still fresh (less than 1 hour old)
Output:
- recent_changes.json: JSON file with information about recent changes in the French namespace
- Log messages about the scraping process and results
"""
import json
import argparse
import logging
import os
import re
import shutil
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Constants
# Use the directory of this script to determine the output file path
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
OUTPUT_FILE = os.path.join(SCRIPT_DIR, "recent_changes.json")
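# Query parameters of the Special:RecentChanges URL below (standard MediaWiki
# RecentChanges filters; namespace 202 is this wiki's French namespace):
#   hidebots/hidepreviousrevisions/hidecategorization/hideWikibase/
#   hidelog/hidenewuserlog=1 -> filter out bot edits, superseded revisions,
#                               categorization, Wikibase, log and new-user entries
#   namespace=202            -> the French (FR:) namespace
#   limit=500                -> up to 500 entries
#   days=30                  -> changes from the last 30 days
#   enhanced=1               -> grouped "enhanced" view (table-based markup)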
RECENT_CHANGES_URL = "https://wiki.openstreetmap.org/w/index.php?hidebots=1&hidepreviousrevisions=1&hidecategorization=1&hideWikibase=1&hidelog=1&hidenewuserlog=1&namespace=202&limit=500&days=30&enhanced=1&title=Special:RecentChanges&urlversion=2"
WIKI_BASE_URL = "https://wiki.openstreetmap.org"
CACHE_DURATION = timedelta(hours=1) # Cache duration of 1 hour


def is_cache_fresh():
    """
    Check if the cache file exists and is less than CACHE_DURATION old.

    Returns:
        bool: True if cache is fresh, False otherwise
    """
    if not os.path.exists(OUTPUT_FILE):
        return False
    try:
        with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        last_updated = datetime.fromisoformat(data.get('last_updated', '2000-01-01T00:00:00'))
        now = datetime.now()
        return (now - last_updated) < CACHE_DURATION
    except (IOError, json.JSONDecodeError, ValueError) as e:
        logger.error(f"Error checking cache freshness: {e}")
        return False


def get_page_content(url):
    """
    Get the HTML content of a page.

    Args:
        url (str): URL to fetch

    Returns:
        str: HTML content of the page, or None if the request failed
    """
    try:
        # A timeout keeps the script from hanging indefinitely on a stalled connection
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching {url}: {e}")
        return None
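
# The enhanced RecentChanges view renders each day's changes as tables. A single
# change row looks roughly like this (simplified sketch inferred from the CSS
# classes the parser targets, not exact wiki output):
#
#   <table class="mw-changeslist-line ...">
#     <tr>
#       <td class="mw-enhanced-rc">22:53</td>
#       <td class="mw-changeslist-line-inner">
#         <a class="mw-changeslist-title" href="/wiki/FR:...">FR:...</a>
#         <a class="mw-userlink" href="...">User</a>
#         <span class="mw-diff-bytes">+42</span>
#         <span class="comment">(edit summary)</span>
#       </td>
#     </tr>
#   </table>
#
# extract_recent_changes() below targets these classes, with fallbacks for
# rows that lack them.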


def extract_recent_changes(html_content):
    """
    Extract recent changes from the wiki page HTML.

    Args:
        html_content (str): HTML content of the recent changes page

    Returns:
        list: List of recent change dictionaries
    """
    if not html_content:
        return []
    soup = BeautifulSoup(html_content, 'html.parser')
    recent_changes = []

    # Find the main changes-list container (.mw-changeslist)
    changes_list = soup.find('div', class_='mw-changeslist')
    if not changes_list:
        # If not found at the top level, look inside the content area
        content_div = soup.find('div', id='mw-content-text')
        if content_div:
            changes_list = content_div.find('div', class_='mw-changeslist')
    if not changes_list:
        # Log a sketch of the HTML structure to help debug
        logger.warning("Could not find recent changes list. HTML structure:")
        body = soup.find('body')
        if body:
            content_area = body.find('div', id='content')
            if content_area:
                logger.warning(f"Content area classes: {content_area.get('class', [])}")
                main_content = content_area.find('div', id='mw-content-text')
                if main_content:
                    logger.warning(f"Main content first child: {main_content.find().name if main_content.find() else 'None'}")
        return []
logger.info(f"Found changes list with tag: {changes_list.name}, classes: {changes_list.get('class', [])}")
# Process each change item - based on the actual HTML structure
# According to the debug output, the changes are in tr elements
change_items = changes_list.find_all('tr')
# If no tr elements found directly, look for tables with class mw-changeslist-line
if not change_items:
tables = changes_list.find_all('table', class_='mw-changeslist-line')
for table in tables:
trs = table.find_all('tr')
change_items.extend(trs)
logger.info(f"Found {len(change_items)} change items")
    for item in change_items:
        # Extract the page link from the mw-changeslist-title class
        page_link = item.find('a', class_='mw-changeslist-title')
        if not page_link:
            # If not found with the specific class, try any link that looks like a page link
            inner_td = item.find('td', class_='mw-changeslist-line-inner')
            if inner_td:
                for link in inner_td.find_all('a'):
                    href = link.get('href', '')
                    if '/wiki/' in href and 'action=history' not in href and 'diff=' not in href:
                        page_link = link
                        break
        if not page_link:
            # Skip items without a page link (headers or other elements)
            continue
        page_name = page_link.get_text().strip()
        page_url = page_link.get('href', '')
        if not page_url.startswith('http'):
            page_url = WIKI_BASE_URL + page_url

        # Extract the timestamp from the mw-enhanced-rc cell
        timestamp_td = item.find('td', class_='mw-enhanced-rc')
        timestamp = timestamp_td.get_text().strip() if timestamp_td else "Unknown"

        # Extract the user from the mw-userlink link
        user_link = item.find('a', class_='mw-userlink')
        user = user_link.get_text().strip() if user_link else "Unknown"

        # Extract the edit summary from the comment span
        comment_span = item.find('span', class_='comment')
        comment = comment_span.get_text().strip() if comment_span else ""

        # Extract the change size from the mw-diff-bytes span
        size_span = item.find('span', class_='mw-diff-bytes')
        if size_span:
            change_size = size_span.get_text().strip()
        else:
            # Fall back to a size written in the row text, e.g. "(+123)" or "(-45)"
            change_size = "0"
            size_matches = re.findall(r'\(\s*([+-]?\d+)\s*\)', item.get_text())
            if size_matches:
                change_size = size_matches[0]

        recent_changes.append({
            "page_name": page_name,
            "page_url": page_url,
            "timestamp": timestamp,
            "user": user,
            "comment": comment,
            "change_size": change_size
        })
        logger.debug(f"Extracted change: {page_name} by {user}")

    logger.info(f"Extracted {len(recent_changes)} recent changes")
    return recent_changes
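
# save_results() writes to OUTPUT_FILE via a temporary file that is then moved
# into place, so a crash mid-write cannot leave a truncated recent_changes.json
# behind. The extra size/mtime/content logging exists to diagnose silent write
# failures.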


def save_results(recent_changes, dry_run=False):
    """
    Save the results to a JSON file.

    Args:
        recent_changes (list): List of recent change dictionaries
        dry_run (bool): If True, don't actually save to file

    Returns:
        bool: True if saving succeeded (or this is a dry run), False otherwise
    """
    if dry_run:
        logger.info("DRY RUN: Would have saved results to file")
        logger.info(f"Recent changes: {len(recent_changes)}")
        for change in recent_changes[:5]:  # Show only the first 5 for brevity
            logger.info(f"  - {change['page_name']}: {change['page_url']} ({change['timestamp']})")
        if len(recent_changes) > 5:
            logger.info(f"  ... and {len(recent_changes) - 5} more")
        return True

    # Log some details about the recent changes
    logger.info(f"Preparing to save {len(recent_changes)} recent changes")
    if recent_changes:
        logger.info(f"First change: {recent_changes[0]['page_name']} by {recent_changes[0]['user']}")

    # Prepare the data structure
    data = {
        "last_updated": datetime.now().isoformat(),
        "recent_changes": recent_changes
    }

    # Record the file's last modified time before saving
    before_mtime = None
    if os.path.exists(OUTPUT_FILE):
        before_mtime = os.path.getmtime(OUTPUT_FILE)
        logger.info(f"File {OUTPUT_FILE} exists, last modified at {datetime.fromtimestamp(before_mtime)}")

    try:
        json_data = json.dumps(data, indent=2, ensure_ascii=False)
        logger.info(f"JSON data to save (first 500 chars): {json_data[:500]}...")

        # Write the data to a temporary file first
        temp_file = OUTPUT_FILE + ".tmp"
        logger.info(f"Writing data to temporary file {temp_file}")
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write(json_data)

        # Verify that the temporary file was created and has content
        if os.path.exists(temp_file):
            temp_size = os.path.getsize(temp_file)
            logger.info(f"Temporary file {temp_file} created, size: {temp_size} bytes")
            with open(temp_file, 'r', encoding='utf-8') as f:
                temp_content = f.read(500)  # Read first 500 chars
            logger.info(f"Temporary file content (first 500 chars): {temp_content}...")
            # Move the temporary file to the final location
            logger.info(f"Moving temporary file to {OUTPUT_FILE}")
            shutil.move(temp_file, OUTPUT_FILE)
        else:
            logger.error(f"Failed to create temporary file {temp_file}")
            return False

        # Check that the file was actually updated
        if os.path.exists(OUTPUT_FILE):
            after_mtime = os.path.getmtime(OUTPUT_FILE)
            file_size = os.path.getsize(OUTPUT_FILE)
            logger.info(f"File {OUTPUT_FILE} exists, size: {file_size} bytes, mtime: {datetime.fromtimestamp(after_mtime)}")
            with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
                file_content = f.read(500)  # Read first 500 chars
            logger.info(f"File content (first 500 chars): {file_content}...")
            if before_mtime and after_mtime <= before_mtime:
                logger.warning(f"File {OUTPUT_FILE} was not updated (mtime did not change)")
        else:
            logger.error(f"File {OUTPUT_FILE} does not exist after saving")
            return False

        # Copy the file to the public directory
        public_file = os.path.join(os.path.dirname(os.path.dirname(OUTPUT_FILE)), 'public', os.path.basename(OUTPUT_FILE))
        logger.info(f"Copying {OUTPUT_FILE} to {public_file}")
        shutil.copy2(OUTPUT_FILE, public_file)
        if os.path.exists(public_file):
            public_size = os.path.getsize(public_file)
            logger.info(f"Public file {public_file} created, size: {public_size} bytes")
        else:
            logger.error(f"Failed to create public file {public_file}")

        logger.info(f"Successfully saved {len(recent_changes)} recent changes to {OUTPUT_FILE}")
        return True
    except IOError as e:
        logger.error(f"Error saving results to {OUTPUT_FILE}: {e}")
        return False


def main():
    """Main function to execute the script."""
    parser = argparse.ArgumentParser(description="Fetch recent changes from the OSM wiki French namespace")
    parser.add_argument("--dry-run", action="store_true", help="Run without saving results to file")
    parser.add_argument("--force", action="store_true", help="Force update even if cache is fresh")
    parser.add_argument("--debug", action="store_true", help="Save HTML content to a file for debugging")
    args = parser.parse_args()
    logger.info("Starting fetch_recent_changes.py")

    # Skip the fetch if the cache is still fresh
    if is_cache_fresh() and not args.force:
        logger.info(f"Cache is still fresh (less than {CACHE_DURATION.total_seconds() / 3600:.0f} hour(s) old)")
        logger.info("Use --force to update anyway")
        return

    # Get the recent changes page content
    html_content = get_page_content(RECENT_CHANGES_URL)
    if not html_content:
        logger.error("Failed to get recent changes page content")
        return

    # Save the HTML content to a file for debugging
    if args.debug:
        debug_file = "recent_changes_debug.html"
        try:
            with open(debug_file, 'w', encoding='utf-8') as f:
                f.write(html_content)
            logger.info(f"Saved HTML content to {debug_file} for debugging")
        except IOError as e:
            logger.error(f"Error saving HTML content to {debug_file}: {e}")

    # Log an overview of the page structure (useful when the wiki markup changes)
    soup = BeautifulSoup(html_content, 'html.parser')
    content_div = soup.find('div', id='mw-content-text')
    if content_div:
        logger.info("Found content div with id 'mw-content-text'")
        changeslist_elements = content_div.find_all(class_='mw-changeslist')
        logger.info(f"Found {len(changeslist_elements)} elements with class 'mw-changeslist'")
        for i, element in enumerate(changeslist_elements):
            logger.info(f"Element {i + 1} tag: {element.name}, classes: {element.get('class', [])}")
            # Count the kinds of elements that might contain changes
            rows = element.find_all('tr')
            divs = element.find_all('div', class_='mw-changeslist-line')
            lis = element.find_all('li')
            logger.info(f"  - Contains {len(rows)} tr elements")
            logger.info(f"  - Contains {len(divs)} div.mw-changeslist-line elements")
            logger.info(f"  - Contains {len(lis)} li elements")
            # Summarize the direct children by tag name
            children = list(element.children)
            logger.info(f"  - Has {len(children)} direct children")
            if children:
                child_types = {}
                for child in children:
                    if hasattr(child, 'name') and child.name:
                        child_types[child.name] = child_types.get(child.name, 0) + 1
                logger.info(f"  - Direct children types: {child_types}")

    # Extract recent changes
    recent_changes = extract_recent_changes(html_content)
    if not recent_changes:
        logger.warning("No recent changes found")

    # Save results
    if save_results(recent_changes, args.dry_run):
        logger.info("Script completed successfully")
    else:
        logger.error("Script completed with errors")


if __name__ == "__main__":
    main()