osm-labo/wiki_compare/fetch_archived_proposals.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
fetch_archived_proposals.py
This script scrapes archived proposals from the OpenStreetMap wiki and extracts voting information.
It analyzes the voting patterns, counts votes by type (approve, oppose, abstain), and collects
information about the users who voted.
The script saves the data to a JSON file that can be used by the Symfony application.
Usage:
    python fetch_archived_proposals.py [--force] [--limit N]

Options:
    --force     Force refresh of all proposals, even if they have already been processed
    --limit N   Limit processing to N proposals (default: process all proposals)

Output:
    - archived_proposals.json file with voting information
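
The generated file has roughly the following shape (abridged, with illustrative
placeholder values):

    {
      "last_updated": "2024-01-01T12:00:00",
      "proposals": [
        {
          "title": "Proposal title",
          "url": "https://wiki.openstreetmap.org/wiki/...",
          "proposer": "SomeUser",
          "status": "Approved",
          "votes": {
            "approve": {"count": 12, "users": [{"username": "...", "date": "...", "comment": "..."}]},
            "oppose": {"count": 2, "users": []},
            "abstain": {"count": 1, "users": []}
          },
          "total_votes": 15,
          "approve_percentage": 80.0
        }
      ],
      "statistics": {"total_proposals": 1, "unique_voters": 14, "top_voters": []}
    }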
"""
import argparse
import json
import logging
import os
import re
import sys
import time
from datetime import datetime
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup, NavigableString
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Constants
ARCHIVED_PROPOSALS_URL = "https://wiki.openstreetmap.org/wiki/Category:Archived_proposals"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ARCHIVED_PROPOSALS_FILE = os.path.join(SCRIPT_DIR, "archived_proposals.json")
USER_AGENT = "OSM-Commerces/1.0 (https://github.com/yourusername/osm-commerces; your@email.com)"
RATE_LIMIT_DELAY = 1 # seconds between requests to avoid rate limiting
# Vote patterns
VOTE_PATTERNS = {
'approve': [
r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:approve|support|agree\s+with)\s+this\s+proposal',
r'I\s+vote\s+(?:to\s+)?(?:approve|support)',
r'(?:Symbol\s+support\s+vote\.svg|Symbol_support_vote\.svg)',
],
'oppose': [
r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:oppose|disagree\s+with|reject|do\s+not\s+support)\s+this\s+proposal',
r'I\s+vote\s+(?:to\s+)?(?:oppose|reject|against)',
r'(?:Symbol\s+oppose\s+vote\.svg|Symbol_oppose_vote\.svg)',
],
'abstain': [
r'I\s+(?:have\s+comments\s+but\s+)?abstain\s+from\s+voting',
r'I\s+(?:have\s+comments\s+but\s+)?(?:neither\s+approve\s+nor\s+oppose|am\s+neutral)',
r'(?:Symbol\s+abstain\s+vote\.svg|Symbol_abstain_vote\.svg)',
]
}
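# Illustrative example of a vote line these patterns and the helper functions below are
# meant to handle (username and timestamp are placeholders, not real data):
#
#   "I approve this proposal. --[[User:ExampleUser|ExampleUser]] 15:30, 25 December 2023 (UTC)"
#
# determine_vote_type() classifies this line as 'approve', extract_username() returns
# "ExampleUser" and extract_date() returns "15:30, 25 December 2023".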
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='Fetch and analyze archived OSM proposals')
parser.add_argument('--force', action='store_true', help='Force refresh of all proposals')
parser.add_argument('--limit', type=int, help='Limit processing to N proposals (default: process all)')
return parser.parse_args()
def load_existing_data():
"""Load existing archived proposals data if available"""
if os.path.exists(ARCHIVED_PROPOSALS_FILE):
try:
with open(ARCHIVED_PROPOSALS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"Loaded {len(data.get('proposals', []))} existing proposals from {ARCHIVED_PROPOSALS_FILE}")
return data
except (json.JSONDecodeError, IOError) as e:
logger.error(f"Error loading existing data: {e}")
# Return empty structure if file doesn't exist or has errors
return {
'last_updated': None,
'proposals': []
}
def save_data(data):
"""Save data to JSON file"""
try:
# Update last_updated timestamp
data['last_updated'] = datetime.now().isoformat()
with open(ARCHIVED_PROPOSALS_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.info(f"Saved {len(data.get('proposals', []))} proposals to {ARCHIVED_PROPOSALS_FILE}")
except IOError as e:
logger.error(f"Error saving data: {e}")
except Exception as e:
logger.error(f"Unexpected error saving data: {e}")
def fetch_page(url):
"""Fetch a page from the OSM wiki"""
headers = {
'User-Agent': USER_AGENT
}
try:
        # Use a timeout so a stalled connection does not hang the whole run
        response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching {url}: {e}")
return None
def get_proposal_urls():
"""Get URLs of all archived proposals"""
logger.info(f"Fetching archived proposals list from {ARCHIVED_PROPOSALS_URL}")
html = fetch_page(ARCHIVED_PROPOSALS_URL)
if not html:
return []
soup = BeautifulSoup(html, 'html.parser')
# Find all links in the category pages
proposal_urls = []
# Get proposals from the main category page
category_content = soup.select_one('#mw-pages')
if category_content:
for link in category_content.select('a'):
if link.get('title') and 'Category:' not in link.get('title'):
proposal_urls.append({
'title': link.get('title'),
'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href'))
})
# Check if there are subcategories
subcategories = soup.select('#mw-subcategories a')
for subcat in subcategories:
if 'Category:' in subcat.get('title', ''):
logger.info(f"Found subcategory: {subcat.get('title')}")
subcat_url = urljoin(ARCHIVED_PROPOSALS_URL, subcat.get('href'))
# Fetch the subcategory page
time.sleep(RATE_LIMIT_DELAY) # Respect rate limits
subcat_html = fetch_page(subcat_url)
if subcat_html:
subcat_soup = BeautifulSoup(subcat_html, 'html.parser')
subcat_content = subcat_soup.select_one('#mw-pages')
if subcat_content:
for link in subcat_content.select('a'):
if link.get('title') and 'Category:' not in link.get('title'):
proposal_urls.append({
'title': link.get('title'),
'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href'))
})
logger.info(f"Found {len(proposal_urls)} archived proposals")
return proposal_urls
def extract_username(text):
"""Extract username from a signature line"""
# Common patterns for signatures
patterns = [
r'--\s*\[\[User:([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[User:Username|Username]]
r'--\s*\[\[User:([^|\]]+)\]\]', # --[[User:Username]]
r'--\s*\[\[User talk:([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[User talk:Username|Username]]
r'--\s*\[\[User talk:([^|\]]+)\]\]', # --[[User talk:Username]]
r'--\s*\[\[Special:Contributions/([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[Special:Contributions/Username|Username]]
r'--\s*\[\[Special:Contributions/([^|\]]+)\]\]', # --[[Special:Contributions/Username]]
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip()
# If no match found with the patterns, try to find any username-like string
match = re.search(r'--\s*([A-Za-z0-9_-]+)', text)
if match:
return match.group(1).strip()
return None
def extract_date(text):
"""Extract date from a signature line"""
# Look for common date formats in signatures
date_patterns = [
r'(\d{1,2}:\d{2}, \d{1,2} [A-Za-z]+ \d{4})', # 15:30, 25 December 2023
r'(\d{1,2} [A-Za-z]+ \d{4} \d{1,2}:\d{2})', # 25 December 2023 15:30
r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})', # 2023-12-25T15:30:00
]
for pattern in date_patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return None
def determine_vote_type(text):
    """Determine the type of vote from the text"""
    # re.IGNORECASE already makes the match case-insensitive, so the text is used as-is
    for vote_type, patterns in VOTE_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return vote_type
    return None
def extract_votes(html):
"""Extract voting information from proposal HTML"""
soup = BeautifulSoup(html, 'html.parser')
# Find the voting section
voting_section = None
for heading in soup.find_all(['h2', 'h3']):
heading_text = heading.get_text().lower()
if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text:
voting_section = heading
break
if not voting_section:
logger.warning("No voting section found")
return {
'approve': {'count': 0, 'users': []},
'oppose': {'count': 0, 'users': []},
'abstain': {'count': 0, 'users': []}
}
# Get the content after the voting section heading
votes_content = []
current = voting_section.next_sibling
# Collect all elements until the next heading or the end of the document
    # getattr() is used because NavigableString nodes may not expose a .name attribute
    while current is not None and getattr(current, 'name', None) not in ('h2', 'h3'):
        if getattr(current, 'name', None):  # skip NavigableString objects
            votes_content.append(current)
        current = current.next_sibling
# Process vote lists
votes = {
'approve': {'count': 0, 'users': []},
'oppose': {'count': 0, 'users': []},
'abstain': {'count': 0, 'users': []}
}
# For tracking vote dates to calculate duration
all_vote_dates = []
# Look for lists of votes
for element in votes_content:
if element.name == 'ul':
for li in element.find_all('li'):
vote_text = li.get_text()
vote_type = determine_vote_type(vote_text)
if vote_type:
username = extract_username(vote_text)
date = extract_date(vote_text)
# Extract comment by removing vote declaration and signature
comment = vote_text
# Remove vote declaration patterns
for pattern in VOTE_PATTERNS[vote_type]:
comment = re.sub(pattern, '', comment, flags=re.IGNORECASE)
# Remove signature
signature_patterns = [
r'--\s*\[\[User:[^]]+\]\].*$',
r'--\s*\[\[User talk:[^]]+\]\].*$',
r'--\s*\[\[Special:Contributions/[^]]+\]\].*$',
r'--\s*[A-Za-z0-9_-]+.*$'
]
for pattern in signature_patterns:
comment = re.sub(pattern, '', comment, flags=re.IGNORECASE)
# Clean up the comment
comment = comment.strip()
if username:
votes[vote_type]['count'] += 1
votes[vote_type]['users'].append({
'username': username,
'date': date,
'comment': comment
})
# Add date to list for duration calculation if it's valid
if date:
try:
# Try to parse the date in different formats
parsed_date = None
for date_format in [
'%H:%M, %d %B %Y', # 15:30, 25 December 2023
'%d %B %Y %H:%M', # 25 December 2023 15:30
'%Y-%m-%dT%H:%M:%S' # 2023-12-25T15:30:00
]:
try:
parsed_date = datetime.strptime(date, date_format)
break
except ValueError:
continue
if parsed_date:
all_vote_dates.append(parsed_date)
except Exception as e:
logger.warning(f"Could not parse date '{date}': {e}")
# Calculate vote duration if we have at least two dates
if len(all_vote_dates) >= 2:
all_vote_dates.sort()
first_vote = all_vote_dates[0]
last_vote = all_vote_dates[-1]
vote_duration_days = (last_vote - first_vote).days
votes['first_vote'] = first_vote.strftime('%Y-%m-%d')
votes['last_vote'] = last_vote.strftime('%Y-%m-%d')
votes['duration_days'] = vote_duration_days
return votes
def extract_proposal_metadata(html, url, original_title=None):
"""Extract metadata about the proposal"""
soup = BeautifulSoup(html, 'html.parser')
# Get title
title_element = soup.select_one('#firstHeading')
extracted_title = title_element.get_text() if title_element else "Unknown Title"
# Debug logging
logger.debug(f"Original title: '{original_title}', Extracted title: '{extracted_title}'")
# Check if the extracted title is a username or user page
# This covers both "User:Username" and other user-related pages
if (extracted_title.startswith("User:") or
"User:" in extracted_title or
"User talk:" in extracted_title) and original_title:
logger.info(f"Extracted title '{extracted_title}' appears to be a user page. Using original title '{original_title}' instead.")
title = original_title
else:
title = extracted_title
# Get last modified date
last_modified = None
footer_info = soup.select_one('#footer-info-lastmod')
if footer_info:
last_modified_text = footer_info.get_text()
match = re.search(r'(\d{1,2} [A-Za-z]+ \d{4})', last_modified_text)
if match:
last_modified = match.group(1)
# Get content element for further processing
content = soup.select_one('#mw-content-text')
# Get proposer from the page
proposer = None
# Get proposal status from the page
status = None
# Look for table rows to find proposer and status
if content:
# Look for table rows
for row in content.select('tr'):
# Check if the row has at least two cells (th and td)
cells = row.select('th, td')
if len(cells) >= 2:
# Get the header text from the first cell
header_text = cells[0].get_text().strip().lower()
# Check for "Proposed by:" to find proposer
if "proposed by" in header_text:
# Look for user link in the next cell
user_link = cells[1].select_one('a[href*="/wiki/User:"]')
if user_link:
                        # Extract the username from the link; a dedicated variable is used
                        # so the proposal title determined above is not overwritten
                        href = user_link.get('href', '')
                        link_title = user_link.get('title', '')
                        # Try to get the username from the title attribute first
                        if link_title and link_title.startswith('User:'):
                            proposer = link_title[5:]  # Remove the 'User:' prefix
# Otherwise try to extract from href
elif href:
href_match = re.search(r'/wiki/User:([^/]+)', href)
if href_match:
proposer = href_match.group(1)
# If still no proposer, use the link text
if not proposer and user_link.get_text():
proposer = user_link.get_text().strip()
logger.info(f"Found proposer in table: {proposer}")
# Check for "Proposal status:" to find status
elif "proposal status" in header_text:
# Get the status from the next cell
status_cell = cells[1]
# First try to find a link with a category title containing status
status_link = status_cell.select_one('a[title*="Category:Proposals with"]')
if status_link:
# Extract status from the title attribute
status_match = re.search(r'Category:Proposals with "([^"]+)" status', status_link.get('title', ''))
if status_match:
status = status_match.group(1)
logger.info(f"Found status in table link: {status}")
# If no status found in link, try to get text content
if not status:
status_text = status_cell.get_text().strip()
# Try to match one of the known statuses
known_statuses = [
"Draft", "Proposed", "Voting", "Post-vote", "Approved",
"Rejected", "Abandoned", "Canceled", "Obsoleted",
"Inactive", "Undefined"
]
for known_status in known_statuses:
if known_status.lower() in status_text.lower():
status = known_status
logger.info(f"Found status in table text: {status}")
break
# If no proposer found in table, try the first paragraph method
if not proposer:
first_paragraph = soup.select_one('#mw-content-text p')
if first_paragraph:
proposer_match = re.search(r'(?:proposed|created|authored)\s+by\s+\[\[User:([^|\]]+)', first_paragraph.get_text())
if proposer_match:
proposer = proposer_match.group(1)
logger.info(f"Found proposer in paragraph: {proposer}")
# Count sections, links, and words
section_count = len(soup.select('#mw-content-text h2, #mw-content-text h3, #mw-content-text h4')) if content else 0
# Count links excluding user/talk pages (voting signatures)
links = []
if content:
for link in content.select('a'):
href = link.get('href', '')
if href and not re.search(r'User:|User_talk:|Special:Contributions', href):
links.append(href)
link_count = len(links)
# Approximate word count
word_count = 0
if content:
# Get text content excluding navigation elements
for nav in content.select('.navbox, .ambox, .tmbox, .mw-editsection'):
nav.decompose()
# Also exclude the voting section to count only the proposal content
voting_section = None
for heading in content.find_all(['h2', 'h3']):
heading_text = heading.get_text().lower()
if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text:
voting_section = heading
break
if voting_section:
# Remove the voting section and everything after it
current = voting_section
while current:
next_sibling = current.next_sibling
# Only call decompose() if current is not a NavigableString
# NavigableString objects don't have a decompose() method
if not isinstance(current, NavigableString):
current.decompose()
current = next_sibling
# Count words in the remaining content
text = content.get_text()
word_count = len(re.findall(r'\b\w+\b', text))
return {
'title': title,
'url': url,
'last_modified': last_modified,
'proposer': proposer,
'status': status,
'section_count': section_count,
'link_count': link_count,
'word_count': word_count
}
def process_proposal(proposal, force=False):
    """Process a single proposal and extract voting information.

    The force flag is accepted for interface consistency but is not used here;
    the decision to skip already-processed proposals is made in main().
    """
url = proposal['url']
title = proposal['title']
logger.info(f"Processing proposal: {title}")
# Fetch the proposal page
html = fetch_page(url)
if not html:
return None
# Extract metadata
metadata = extract_proposal_metadata(html, url, original_title=title)
# Extract votes
votes = extract_votes(html)
# Combine metadata and votes
result = {**metadata, 'votes': votes}
# Calculate total votes and percentages
total_votes = votes['approve']['count'] + votes['oppose']['count'] + votes['abstain']['count']
if total_votes > 0:
result['total_votes'] = total_votes
result['approve_percentage'] = round((votes['approve']['count'] / total_votes) * 100, 1)
result['oppose_percentage'] = round((votes['oppose']['count'] / total_votes) * 100, 1)
result['abstain_percentage'] = round((votes['abstain']['count'] / total_votes) * 100, 1)
else:
result['total_votes'] = 0
result['approve_percentage'] = 0
result['oppose_percentage'] = 0
result['abstain_percentage'] = 0
return result
def main():
"""Main function to execute the script"""
args = parse_arguments()
force = args.force
limit = args.limit
logger.info("Starting fetch_archived_proposals.py")
if limit:
logger.info(f"Processing limited to {limit} proposals")
# Load existing data
data = load_existing_data()
# Get list of proposal URLs
proposal_urls = get_proposal_urls()
# Apply limit if specified
if limit and limit < len(proposal_urls):
logger.info(f"Limiting processing from {len(proposal_urls)} to {limit} proposals")
proposal_urls = proposal_urls[:limit]
# Create a map of existing proposals by URL for quick lookup
existing_proposals = {p['url']: p for p in data.get('proposals', [])}
# Process each proposal
new_proposals = []
processed_count = 0
for proposal in proposal_urls:
url = proposal['url']
original_title = proposal['title']
# Skip if already processed and not forcing refresh
if url in existing_proposals and not force:
logger.info(f"Skipping already processed proposal: {original_title}")
new_proposals.append(existing_proposals[url])
continue
# Process the proposal
time.sleep(RATE_LIMIT_DELAY) # Respect rate limits
processed = process_proposal(proposal, force)
if processed:
# Ensure the title is preserved from the original proposal
if processed.get('title') != original_title:
# Check if the title contains "User:" - if it does, we've already handled it in extract_proposal_metadata
# and don't need to log a warning
if "User:" in processed.get('title', ''):
logger.debug(f"Title contains 'User:' - already handled in extract_proposal_metadata")
else:
logger.warning(f"Title changed during processing from '{original_title}' to '{processed.get('title')}'. Restoring original title.")
processed['title'] = original_title
new_proposals.append(processed)
processed_count += 1
# Check if we've reached the limit
if limit and processed_count >= limit:
logger.info(f"Reached limit of {limit} processed proposals")
break
# Update the data
data['proposals'] = new_proposals
# Calculate global statistics
total_proposals = len(new_proposals)
total_votes = sum(p.get('total_votes', 0) for p in new_proposals)
# Calculate votes per proposal statistics, excluding proposals with 0 votes
proposals_with_votes = [p for p in new_proposals if p.get('total_votes', 0) > 0]
num_proposals_with_votes = len(proposals_with_votes)
if num_proposals_with_votes > 0:
# Calculate average votes per proposal (excluding proposals with 0 votes)
votes_per_proposal = [p.get('total_votes', 0) for p in proposals_with_votes]
avg_votes_per_proposal = round(sum(votes_per_proposal) / num_proposals_with_votes, 1)
# Calculate median votes per proposal
votes_per_proposal.sort()
if num_proposals_with_votes % 2 == 0:
# Even number of proposals, average the middle two
median_votes_per_proposal = round((votes_per_proposal[num_proposals_with_votes // 2 - 1] +
votes_per_proposal[num_proposals_with_votes // 2]) / 2, 1)
else:
# Odd number of proposals, take the middle one
median_votes_per_proposal = votes_per_proposal[num_proposals_with_votes // 2]
# Calculate standard deviation of votes per proposal
mean = sum(votes_per_proposal) / num_proposals_with_votes
variance = sum((x - mean) ** 2 for x in votes_per_proposal) / num_proposals_with_votes
std_dev_votes_per_proposal = round((variance ** 0.5), 1)
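        # Note: apart from the rounding, this is the population standard deviation
        # (it divides by N), i.e. what statistics.pstdev(votes_per_proposal) would return.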
else:
avg_votes_per_proposal = 0
median_votes_per_proposal = 0
std_dev_votes_per_proposal = 0
# Count unique voters
all_voters = set()
for p in new_proposals:
for vote_type in ['approve', 'oppose', 'abstain']:
for user in p.get('votes', {}).get(vote_type, {}).get('users', []):
if 'username' in user:
all_voters.add(user['username'])
# Find most active voters
voter_counts = {}
for p in new_proposals:
for vote_type in ['approve', 'oppose', 'abstain']:
for user in p.get('votes', {}).get(vote_type, {}).get('users', []):
if 'username' in user:
username = user['username']
if username not in voter_counts:
voter_counts[username] = {'total': 0, 'approve': 0, 'oppose': 0, 'abstain': 0}
voter_counts[username]['total'] += 1
voter_counts[username][vote_type] += 1
# Sort voters by total votes
top_voters = sorted(
[{'username': k, **v} for k, v in voter_counts.items()],
key=lambda x: x['total'],
reverse=True
)[:100] # Top 100 voters
# Count proposals by status
status_counts = {}
for p in new_proposals:
status = p.get('status')
if status:
status_counts[status] = status_counts.get(status, 0) + 1
else:
status_counts['Unknown'] = status_counts.get('Unknown', 0) + 1
# Ensure status_counts is never empty
if not status_counts:
status_counts['No Status'] = 0
# Calculate average vote duration
proposals_with_duration = [p for p in new_proposals if 'votes' in p and 'duration_days' in p['votes']]
avg_vote_duration = 0
if proposals_with_duration:
total_duration = sum(p['votes']['duration_days'] for p in proposals_with_duration)
avg_vote_duration = round(total_duration / len(proposals_with_duration), 1)
# Add statistics to the data
data['statistics'] = {
'total_proposals': total_proposals,
'total_votes': total_votes,
'avg_votes_per_proposal': avg_votes_per_proposal,
'median_votes_per_proposal': median_votes_per_proposal,
'std_dev_votes_per_proposal': std_dev_votes_per_proposal,
'avg_vote_duration_days': avg_vote_duration,
'unique_voters': len(all_voters),
'top_voters': top_voters,
'status_distribution': status_counts
}
# Save the data
save_data(data)
logger.info("Script completed successfully")
if __name__ == "__main__":
main()