#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Detect suspicious large deletions on OSM Wiki Tag pages via Special:RecentChanges
and record them in suspicious_deletions.json."""

import requests
from bs4 import BeautifulSoup
import json
import logging
import argparse
import os
from datetime import datetime
from urllib.parse import urlparse, parse_qs, quote

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# URL for recent changes in OSM Wiki (namespace 202 is for Tag pages)
RECENT_CHANGES_URL = "https://wiki.openstreetmap.org/w/index.php?hidebots=1&hidenewpages=1&hidecategorization=1&hideWikibase=1&hidelog=1&hidenewuserlog=1&namespace=202&limit=250&days=30&enhanced=1&title=Special:RecentChanges&urlversion=2"

# Threshold for suspicious deletions (percentage of total content)
DELETION_THRESHOLD_PERCENT = 5.0

# Base URL for OSM Wiki
WIKI_BASE_URL = "https://wiki.openstreetmap.org"


def fetch_recent_changes():
    """
    Fetch the recent changes page from OSM Wiki
    """
    logger.info(f"Fetching recent changes from {RECENT_CHANGES_URL}")
    try:
        response = requests.get(RECENT_CHANGES_URL)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching recent changes: {e}")
        return None


def fetch_page_content(page_title):
    """
    Fetch the content of a wiki page to count characters
    """
    # Page titles are displayed with spaces; MediaWiki page URLs use underscores,
    # so normalize and percent-encode the title before fetching.
    url = f"{WIKI_BASE_URL}/wiki/{quote(page_title.replace(' ', '_'))}"
    logger.info(f"Fetching page content from {url}")
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching page content: {e}")
        return None


def count_page_characters(html_content):
    """
    Count the total number of characters in the wiki page content
    """
    if not html_content:
        return 0

    soup = BeautifulSoup(html_content, 'html.parser')

    # Find the main content div
    content_div = soup.select_one('#mw-content-text')
    if not content_div:
        return 0

    # Get all text content
    text_content = content_div.get_text(strip=True)

    # Count characters
    char_count = len(text_content)
    logger.info(f"Page has {char_count} characters")
    return char_count


def generate_diff_url(page_title, oldid):
    """
    Generate URL to view the diff of a specific revision
    """
    return f"{WIKI_BASE_URL}/w/index.php?title={page_title}&diff=prev&oldid={oldid}"


def generate_history_url(page_title):
    """
    Generate URL to view the history of a page
    """
    return f"{WIKI_BASE_URL}/w/index.php?title={page_title}&action=history"


def load_existing_deletions():
    """
    Load existing suspicious deletions from the JSON file
    """
    output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'suspicious_deletions.json')
    existing_pages = set()

    try:
        if os.path.exists(output_file):
            with open(output_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
                if 'deletions' in data:
                    for deletion in data['deletions']:
                        if 'page_title' in deletion:
                            existing_pages.add(deletion['page_title'])
            logger.info(f"Loaded {len(existing_pages)} existing pages from {output_file}")
        else:
            logger.info(f"No existing file found at {output_file}")
    except Exception as e:
        logger.error(f"Error loading existing deletions: {e}")

    return existing_pages


def parse_suspicious_deletions(html_content):
    """
    Parse the HTML content to find suspicious deletions
    """
    if not html_content:
        return []

    # Load existing pages from the JSON file
    existing_pages = load_existing_deletions()

    soup = BeautifulSoup(html_content, 'html.parser')
    suspicious_deletions = []

    # Find all change list lines
    change_lines = soup.select('.mw-changeslist .mw-changeslist-line')
logger.info(f"Found {len(change_lines)} change lines to analyze") for line in change_lines: # Look for deletion indicators deletion_indicator = line.select_one('.mw-plusminus-neg') if deletion_indicator: # Extract the deletion size deletion_text = deletion_indicator.text.strip() try: # Remove any non-numeric characters except minus sign deletion_size = int(''.join(c for c in deletion_text if c.isdigit() or c == '-')) # Skip if deletion size is not greater than 100 characters if abs(deletion_size) <= 100: logger.info(f"Skipping deletion with size {deletion_size} (not > 100 characters)") continue # Get the page title and URL title_element = line.select_one('.mw-changeslist-title') if title_element: page_title = title_element.text.strip() # Skip if page is already in the JSON file if page_title in existing_pages: logger.info(f"Skipping {page_title} (already in JSON file)") continue page_url = title_element.get('href', '') if not page_url.startswith('http'): page_url = f"{WIKI_BASE_URL}{page_url}" # Extract oldid from the URL if available oldid = None if 'oldid=' in page_url: parsed_url = urlparse(page_url) query_params = parse_qs(parsed_url.query) if 'oldid' in query_params: oldid = query_params['oldid'][0] # Fetch the page content to count characters page_html = fetch_page_content(page_title) total_chars = count_page_characters(page_html) # Calculate deletion percentage deletion_percentage = 0 if total_chars > 0: deletion_percentage = (abs(deletion_size) / total_chars) * 100 # If deletion percentage is significant if deletion_percentage > DELETION_THRESHOLD_PERCENT: # Get the timestamp timestamp_element = line.select_one('.mw-changeslist-date') timestamp = timestamp_element.text.strip() if timestamp_element else "" # Get the user who made the change user_element = line.select_one('.mw-userlink') user = user_element.text.strip() if user_element else "Unknown" # Get the comment if available comment_element = line.select_one('.comment') comment = comment_element.text.strip() if comment_element else "" # Generate diff and history URLs diff_url = generate_diff_url(page_title, oldid) if oldid else "" history_url = generate_history_url(page_title) suspicious_deletions.append({ 'page_title': page_title, 'page_url': page_url, 'diff_url': diff_url, 'history_url': history_url, 'deletion_size': deletion_size, 'total_chars': total_chars, 'deletion_percentage': round(deletion_percentage, 2), 'timestamp': timestamp, 'user': user, 'comment': comment }) logger.info(f"Found suspicious deletion: {page_title} ({deletion_size} chars, {deletion_percentage:.2f}% of content)") except ValueError: logger.warning(f"Could not parse deletion size from: {deletion_text}") return suspicious_deletions def save_suspicious_deletions(suspicious_deletions): """ Save the suspicious deletions to a JSON file """ output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'suspicious_deletions.json') # Add timestamp to the data data = { 'last_updated': datetime.now().isoformat(), 'deletions': suspicious_deletions } with open(output_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"Saved {len(suspicious_deletions)} suspicious deletions to {output_file}") return output_file def main(): parser = argparse.ArgumentParser(description='Detect suspicious deletions in OSM Wiki recent changes') parser.add_argument('--dry-run', action='store_true', help='Print results without saving to file') args = parser.parse_args() html_content = fetch_recent_changes() if html_content: 
        suspicious_deletions = parse_suspicious_deletions(html_content)

        if args.dry_run:
            logger.info(f"Found {len(suspicious_deletions)} suspicious deletions:")
            for deletion in suspicious_deletions:
                logger.info(f"- {deletion['page_title']}: {deletion['deletion_size']} chars by {deletion['user']}")
        else:
            output_file = save_suspicious_deletions(suspicious_deletions)
            logger.info(f"Results saved to {output_file}")
    else:
        logger.error("Failed to fetch recent changes. Exiting.")


if __name__ == "__main__":
    main()