#!/usr/bin/env python3
# -*- coding: utf-8 -*-
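"""
Scan the OSM Wiki recent changes feed (Tag namespace) for edits that delete a
suspiciously large share of a page's content, and record them in
suspicious_deletions.json next to this script.
"""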

import requests
from bs4 import BeautifulSoup
import json
import logging
import argparse
import os
from datetime import datetime
from urllib.parse import urlparse, parse_qs

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# URL for recent changes in OSM Wiki (namespace 202 is for Tag pages)
RECENT_CHANGES_URL = "https://wiki.openstreetmap.org/w/index.php?hidebots=1&hidenewpages=1&hidecategorization=1&hideWikibase=1&hidelog=1&hidenewuserlog=1&namespace=202&limit=250&days=30&enhanced=1&title=Special:RecentChanges&urlversion=2"

# Threshold for suspicious deletions (percentage of total content)
DELETION_THRESHOLD_PERCENT = 5.0
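# Note: parse_suspicious_deletions() additionally ignores edits that remove
# 100 characters or fewer, regardless of this percentage threshold.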

# Base URL for OSM Wiki
WIKI_BASE_URL = "https://wiki.openstreetmap.org"

def fetch_recent_changes():
    """
    Fetch the recent changes page from OSM Wiki
    """
    logger.info(f"Fetching recent changes from {RECENT_CHANGES_URL}")
    try:
        # A timeout keeps the script from hanging if the wiki is unresponsive
        response = requests.get(RECENT_CHANGES_URL, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching recent changes: {e}")
        return None

def fetch_page_content(page_title):
    """
    Fetch the content of a wiki page to count characters
    """
    url = f"{WIKI_BASE_URL}/wiki/{page_title}"
    logger.info(f"Fetching page content from {url}")
    try:
        # A timeout keeps the script from hanging if the wiki is unresponsive
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching page content: {e}")
        return None

def count_page_characters(html_content):
    """
    Count the total number of characters in the wiki page content
    """
    if not html_content:
        return 0

    soup = BeautifulSoup(html_content, 'html.parser')

    # Find the main content div
    content_div = soup.select_one('#mw-content-text')
    if not content_div:
        return 0

    # Get all text content
    text_content = content_div.get_text(strip=True)

    # Count characters
    char_count = len(text_content)
    logger.info(f"Page has {char_count} characters")

    return char_count

def generate_diff_url(page_title, oldid):
    """
    Generate URL to view the diff of a specific revision
    """
    return f"{WIKI_BASE_URL}/w/index.php?title={page_title}&diff=prev&oldid={oldid}"

def generate_history_url(page_title):
    """
    Generate URL to view the history of a page
    """
    return f"{WIKI_BASE_URL}/w/index.php?title={page_title}&action=history"
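
# For example, generate_diff_url("Tag:amenity=restaurant", "123456") and
# generate_history_url("Tag:amenity=restaurant") would produce (illustrative
# page title and revision id):
#   https://wiki.openstreetmap.org/w/index.php?title=Tag:amenity=restaurant&diff=prev&oldid=123456
#   https://wiki.openstreetmap.org/w/index.php?title=Tag:amenity=restaurant&action=history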

def load_existing_deletions():
    """
    Load existing suspicious deletions from the JSON file
    """
    output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'suspicious_deletions.json')
    existing_pages = set()

    try:
        if os.path.exists(output_file):
            with open(output_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
                if 'deletions' in data:
                    for deletion in data['deletions']:
                        if 'page_title' in deletion:
                            existing_pages.add(deletion['page_title'])
            logger.info(f"Loaded {len(existing_pages)} existing pages from {output_file}")
        else:
            logger.info(f"No existing file found at {output_file}")
    except Exception as e:
        logger.error(f"Error loading existing deletions: {e}")

    return existing_pages

def parse_suspicious_deletions(html_content):
    """
    Parse the HTML content to find suspicious deletions
    """
    if not html_content:
        return []

    # Load existing pages from the JSON file
    existing_pages = load_existing_deletions()

    soup = BeautifulSoup(html_content, 'html.parser')
    suspicious_deletions = []

    # Find all change list lines
    change_lines = soup.select('.mw-changeslist .mw-changeslist-line')
    logger.info(f"Found {len(change_lines)} change lines to analyze")

    for line in change_lines:
        # Look for deletion indicators
        deletion_indicator = line.select_one('.mw-plusminus-neg')
        if deletion_indicator:
            # Extract the deletion size
            deletion_text = deletion_indicator.text.strip()
            try:
                # MediaWiki usually renders the size change with a Unicode
                # minus sign (U+2212); normalise it so the parsed value keeps
                # its negative sign.
                deletion_text = deletion_text.replace('\u2212', '-')
                # Remove any non-numeric characters except minus sign
                deletion_size = int(''.join(c for c in deletion_text if c.isdigit() or c == '-'))

                # Skip if deletion size is not greater than 100 characters
                if abs(deletion_size) <= 100:
                    logger.info(f"Skipping deletion with size {deletion_size} (not > 100 characters)")
                    continue

                # Get the page title and URL
                title_element = line.select_one('.mw-changeslist-title')
                if title_element:
                    page_title = title_element.text.strip()

                    # Skip if page is already in the JSON file
                    if page_title in existing_pages:
                        logger.info(f"Skipping {page_title} (already in JSON file)")
                        continue

                    page_url = title_element.get('href', '')
                    if not page_url.startswith('http'):
                        page_url = f"{WIKI_BASE_URL}{page_url}"

                    # Extract oldid from the URL if available
                    oldid = None
                    if 'oldid=' in page_url:
                        parsed_url = urlparse(page_url)
                        query_params = parse_qs(parsed_url.query)
                        if 'oldid' in query_params:
                            oldid = query_params['oldid'][0]

                    # Fetch the page content to count characters
                    page_html = fetch_page_content(page_title)
                    total_chars = count_page_characters(page_html)
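
                    # Note: deletion_size reflects the change in wikitext bytes
                    # reported by the recent-changes list, while total_chars
                    # counts rendered text characters, so the percentage below
                    # is only a rough heuristic.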
                    # Calculate deletion percentage
                    deletion_percentage = 0
                    if total_chars > 0:
                        deletion_percentage = (abs(deletion_size) / total_chars) * 100

                    # If deletion percentage is significant
                    if deletion_percentage > DELETION_THRESHOLD_PERCENT:
                        # Get the timestamp
                        timestamp_element = line.select_one('.mw-changeslist-date')
                        timestamp = timestamp_element.text.strip() if timestamp_element else ""

                        # Get the user who made the change
                        user_element = line.select_one('.mw-userlink')
                        user = user_element.text.strip() if user_element else "Unknown"

                        # Get the comment if available
                        comment_element = line.select_one('.comment')
                        comment = comment_element.text.strip() if comment_element else ""

                        # Generate diff and history URLs
                        diff_url = generate_diff_url(page_title, oldid) if oldid else ""
                        history_url = generate_history_url(page_title)

                        suspicious_deletions.append({
                            'page_title': page_title,
                            'page_url': page_url,
                            'diff_url': diff_url,
                            'history_url': history_url,
                            'deletion_size': deletion_size,
                            'total_chars': total_chars,
                            'deletion_percentage': round(deletion_percentage, 2),
                            'timestamp': timestamp,
                            'user': user,
                            'comment': comment
                        })
                        logger.info(f"Found suspicious deletion: {page_title} ({deletion_size} chars, {deletion_percentage:.2f}% of content)")
            except ValueError:
                logger.warning(f"Could not parse deletion size from: {deletion_text}")

    return suspicious_deletions
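
# The JSON file written below has the following shape (values are illustrative):
# {
#   "last_updated": "2025-01-01T12:00:00",
#   "deletions": [
#     {
#       "page_title": "Tag:amenity=restaurant",
#       "page_url": "https://wiki.openstreetmap.org/wiki/Tag:amenity=restaurant",
#       "diff_url": "...",
#       "history_url": "...",
#       "deletion_size": -1234,
#       "total_chars": 20000,
#       "deletion_percentage": 6.17,
#       "timestamp": "12:34, 1 January 2025",
#       "user": "ExampleUser",
#       "comment": "trimmed examples section"
#     }
#   ]
# }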

def save_suspicious_deletions(suspicious_deletions):
    """
    Save the suspicious deletions to a JSON file
    """
    output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'suspicious_deletions.json')

    # Add timestamp to the data
    data = {
        'last_updated': datetime.now().isoformat(),
        'deletions': suspicious_deletions
    }

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    logger.info(f"Saved {len(suspicious_deletions)} suspicious deletions to {output_file}")
    return output_file

def main():
    parser = argparse.ArgumentParser(description='Detect suspicious deletions in OSM Wiki recent changes')
    parser.add_argument('--dry-run', action='store_true', help='Print results without saving to file')
    args = parser.parse_args()

    html_content = fetch_recent_changes()
    if html_content:
        suspicious_deletions = parse_suspicious_deletions(html_content)

        if args.dry_run:
            logger.info(f"Found {len(suspicious_deletions)} suspicious deletions:")
            for deletion in suspicious_deletions:
                logger.info(f"- {deletion['page_title']}: {deletion['deletion_size']} chars by {deletion['user']}")
        else:
            output_file = save_suspicious_deletions(suspicious_deletions)
            logger.info(f"Results saved to {output_file}")
    else:
        logger.error("Failed to fetch recent changes. Exiting.")
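
# Typical invocations (assuming this script is saved as detect_deletions.py):
#   python3 detect_deletions.py            # update suspicious_deletions.json
#   python3 detect_deletions.py --dry-run  # log findings without writing the file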

if __name__ == "__main__":
    main()