#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ fetch_archived_proposals.py This script scrapes archived proposals from the OpenStreetMap wiki and extracts voting information. It analyzes the voting patterns, counts votes by type (approve, oppose, abstain), and collects information about the users who voted. The script saves the data to a JSON file that can be used by the Symfony application. Usage: python fetch_archived_proposals.py [--force] [--limit N] Options: --force Force refresh of all proposals, even if they have already been processed --limit N Limit processing to N proposals (default: process all proposals) Output: - archived_proposals.json file with voting information """ import argparse import json import logging import os import re import sys import time from datetime import datetime from urllib.parse import urljoin import requests from bs4 import BeautifulSoup, NavigableString # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Constants ARCHIVED_PROPOSALS_URL = "https://wiki.openstreetmap.org/wiki/Category:Archived_proposals" import os SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) ARCHIVED_PROPOSALS_FILE = os.path.join(SCRIPT_DIR, "archived_proposals.json") USER_AGENT = "OSM-Commerces/1.0 (https://github.com/yourusername/osm-commerces; your@email.com)" RATE_LIMIT_DELAY = 1 # seconds between requests to avoid rate limiting # Vote patterns VOTE_PATTERNS = { 'approve': [ r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:approve|support|agree\s+with)\s+this\s+proposal', r'I\s+vote\s+(?:to\s+)?(?:approve|support)', r'(?:Symbol\s+support\s+vote\.svg|Symbol_support_vote\.svg)', ], 'oppose': [ r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:oppose|disagree\s+with|reject|do\s+not\s+support)\s+this\s+proposal', r'I\s+vote\s+(?:to\s+)?(?:oppose|reject|against)', r'(?:Symbol\s+oppose\s+vote\.svg|Symbol_oppose_vote\.svg)', ], 'abstain': [ r'I\s+(?:have\s+comments\s+but\s+)?abstain\s+from\s+voting', r'I\s+(?:have\s+comments\s+but\s+)?(?:neither\s+approve\s+nor\s+oppose|am\s+neutral)', r'(?:Symbol\s+abstain\s+vote\.svg|Symbol_abstain_vote\.svg)', ] } def parse_arguments(): """Parse command line arguments""" parser = argparse.ArgumentParser(description='Fetch and analyze archived OSM proposals') parser.add_argument('--force', action='store_true', help='Force refresh of all proposals') parser.add_argument('--limit', type=int, help='Limit processing to N proposals (default: process all)') return parser.parse_args() def load_existing_data(): """Load existing archived proposals data if available""" if os.path.exists(ARCHIVED_PROPOSALS_FILE): try: with open(ARCHIVED_PROPOSALS_FILE, 'r', encoding='utf-8') as f: data = json.load(f) logger.info(f"Loaded {len(data.get('proposals', []))} existing proposals from {ARCHIVED_PROPOSALS_FILE}") return data except (json.JSONDecodeError, IOError) as e: logger.error(f"Error loading existing data: {e}") # Return empty structure if file doesn't exist or has errors return { 'last_updated': None, 'proposals': [] } def save_data(data): """Save data to JSON file""" try: # Update last_updated timestamp data['last_updated'] = datetime.now().isoformat() with open(ARCHIVED_PROPOSALS_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) logger.info(f"Saved {len(data.get('proposals', []))} proposals to {ARCHIVED_PROPOSALS_FILE}") except IOError as e: logger.error(f"Error saving data: {e}") except Exception as e: logger.error(f"Unexpected error saving data: {e}") def fetch_page(url): """Fetch a page from the OSM wiki""" headers = { 'User-Agent': USER_AGENT } try: response = requests.get(url, headers=headers) response.raise_for_status() return response.text except requests.exceptions.RequestException as e: logger.error(f"Error fetching {url}: {e}") return None def get_proposal_urls(): """Get URLs of all archived proposals""" logger.info(f"Fetching archived proposals list from {ARCHIVED_PROPOSALS_URL}") html = fetch_page(ARCHIVED_PROPOSALS_URL) if not html: return [] soup = BeautifulSoup(html, 'html.parser') # Find all links in the category pages proposal_urls = [] # Get proposals from the main category page category_content = soup.select_one('#mw-pages') if category_content: for link in category_content.select('a'): if link.get('title') and 'Category:' not in link.get('title'): proposal_urls.append({ 'title': link.get('title'), 'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href')) }) # Check if there are subcategories subcategories = soup.select('#mw-subcategories a') for subcat in subcategories: if 'Category:' in subcat.get('title', ''): logger.info(f"Found subcategory: {subcat.get('title')}") subcat_url = urljoin(ARCHIVED_PROPOSALS_URL, subcat.get('href')) # Fetch the subcategory page time.sleep(RATE_LIMIT_DELAY) # Respect rate limits subcat_html = fetch_page(subcat_url) if subcat_html: subcat_soup = BeautifulSoup(subcat_html, 'html.parser') subcat_content = subcat_soup.select_one('#mw-pages') if subcat_content: for link in subcat_content.select('a'): if link.get('title') and 'Category:' not in link.get('title'): proposal_urls.append({ 'title': link.get('title'), 'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href')) }) logger.info(f"Found {len(proposal_urls)} archived proposals") return proposal_urls def extract_username(text): """Extract username from a signature line""" # Common patterns for signatures patterns = [ r'--\s*\[\[User:([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[User:Username|Username]] r'--\s*\[\[User:([^|\]]+)\]\]', # --[[User:Username]] r'--\s*\[\[User talk:([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[User talk:Username|Username]] r'--\s*\[\[User talk:([^|\]]+)\]\]', # --[[User talk:Username]] r'--\s*\[\[Special:Contributions/([^|\]]+)(?:\|[^\]]+)?\]\]', # --[[Special:Contributions/Username|Username]] r'--\s*\[\[Special:Contributions/([^|\]]+)\]\]', # --[[Special:Contributions/Username]] ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1).strip() # If no match found with the patterns, try to find any username-like string match = re.search(r'--\s*([A-Za-z0-9_-]+)', text) if match: return match.group(1).strip() return None def extract_date(text): """Extract date from a signature line""" # Look for common date formats in signatures date_patterns = [ r'(\d{1,2}:\d{2}, \d{1,2} [A-Za-z]+ \d{4})', # 15:30, 25 December 2023 r'(\d{1,2} [A-Za-z]+ \d{4} \d{1,2}:\d{2})', # 25 December 2023 15:30 r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})', # 2023-12-25T15:30:00 ] for pattern in date_patterns: match = re.search(pattern, text) if match: return match.group(1) return None def determine_vote_type(text): """Determine the type of vote from the text""" text_lower = text.lower() for vote_type, patterns in VOTE_PATTERNS.items(): for pattern in patterns: if re.search(pattern, text_lower, re.IGNORECASE): return vote_type return None def extract_votes(html): """Extract voting information from proposal HTML""" soup = BeautifulSoup(html, 'html.parser') # Find the voting section voting_section = None for heading in soup.find_all(['h2', 'h3']): heading_text = heading.get_text().lower() if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text: voting_section = heading break if not voting_section: logger.warning("No voting section found") return { 'approve': {'count': 0, 'users': []}, 'oppose': {'count': 0, 'users': []}, 'abstain': {'count': 0, 'users': []} } # Get the content after the voting section heading votes_content = [] current = voting_section.next_sibling # Collect all elements until the next heading or the end of the document while current and not current.name in ['h2', 'h3']: if current.name: # Skip NavigableString objects votes_content.append(current) current = current.next_sibling # Process vote lists votes = { 'approve': {'count': 0, 'users': []}, 'oppose': {'count': 0, 'users': []}, 'abstain': {'count': 0, 'users': []} } # For tracking vote dates to calculate duration all_vote_dates = [] # Look for lists of votes for element in votes_content: if element.name == 'ul': for li in element.find_all('li'): vote_text = li.get_text() vote_type = determine_vote_type(vote_text) if vote_type: username = extract_username(vote_text) date = extract_date(vote_text) # Extract comment by removing vote declaration and signature comment = vote_text # Remove vote declaration patterns for pattern in VOTE_PATTERNS[vote_type]: comment = re.sub(pattern, '', comment, flags=re.IGNORECASE) # Remove signature signature_patterns = [ r'--\s*\[\[User:[^]]+\]\].*$', r'--\s*\[\[User talk:[^]]+\]\].*$', r'--\s*\[\[Special:Contributions/[^]]+\]\].*$', r'--\s*[A-Za-z0-9_-]+.*$' ] for pattern in signature_patterns: comment = re.sub(pattern, '', comment, flags=re.IGNORECASE) # Clean up the comment comment = comment.strip() if username: votes[vote_type]['count'] += 1 votes[vote_type]['users'].append({ 'username': username, 'date': date, 'comment': comment }) # Add date to list for duration calculation if it's valid if date: try: # Try to parse the date in different formats parsed_date = None for date_format in [ '%H:%M, %d %B %Y', # 15:30, 25 December 2023 '%d %B %Y %H:%M', # 25 December 2023 15:30 '%Y-%m-%dT%H:%M:%S' # 2023-12-25T15:30:00 ]: try: parsed_date = datetime.strptime(date, date_format) break except ValueError: continue if parsed_date: all_vote_dates.append(parsed_date) except Exception as e: logger.warning(f"Could not parse date '{date}': {e}") # Calculate vote duration if we have at least two dates if len(all_vote_dates) >= 2: all_vote_dates.sort() first_vote = all_vote_dates[0] last_vote = all_vote_dates[-1] vote_duration_days = (last_vote - first_vote).days votes['first_vote'] = first_vote.strftime('%Y-%m-%d') votes['last_vote'] = last_vote.strftime('%Y-%m-%d') votes['duration_days'] = vote_duration_days return votes def extract_proposal_metadata(html, url): """Extract metadata about the proposal""" soup = BeautifulSoup(html, 'html.parser') # Get title title_element = soup.select_one('#firstHeading') title = title_element.get_text() if title_element else "Unknown Title" # Get last modified date last_modified = None footer_info = soup.select_one('#footer-info-lastmod') if footer_info: last_modified_text = footer_info.get_text() match = re.search(r'(\d{1,2} [A-Za-z]+ \d{4})', last_modified_text) if match: last_modified = match.group(1) # Get content element for further processing content = soup.select_one('#mw-content-text') # Get proposer from the page proposer = None # Get proposal status from the page status = None # Look for table rows to find proposer and status if content: # Look for table rows for row in content.select('tr'): # Check if the row has at least two cells (th and td) cells = row.select('th, td') if len(cells) >= 2: # Get the header text from the first cell header_text = cells[0].get_text().strip().lower() # Check for "Proposed by:" to find proposer if "proposed by" in header_text: # Look for user link in the next cell user_link = cells[1].select_one('a[href*="/wiki/User:"]') if user_link: # Extract username from the link href = user_link.get('href', '') title = user_link.get('title', '') # Try to get username from title attribute first if title and title.startswith('User:'): proposer = title[5:] # Remove 'User:' prefix # Otherwise try to extract from href elif href: href_match = re.search(r'/wiki/User:([^/]+)', href) if href_match: proposer = href_match.group(1) # If still no proposer, use the link text if not proposer and user_link.get_text(): proposer = user_link.get_text().strip() logger.info(f"Found proposer in table: {proposer}") # Check for "Proposal status:" to find status elif "proposal status" in header_text: # Get the status from the next cell status_cell = cells[1] # First try to find a link with a category title containing status status_link = status_cell.select_one('a[title*="Category:Proposals with"]') if status_link: # Extract status from the title attribute status_match = re.search(r'Category:Proposals with "([^"]+)" status', status_link.get('title', '')) if status_match: status = status_match.group(1) logger.info(f"Found status in table link: {status}") # If no status found in link, try to get text content if not status: status_text = status_cell.get_text().strip() # Try to match one of the known statuses known_statuses = [ "Draft", "Proposed", "Voting", "Post-vote", "Approved", "Rejected", "Abandoned", "Canceled", "Obsoleted", "Inactive", "Undefined" ] for known_status in known_statuses: if known_status.lower() in status_text.lower(): status = known_status logger.info(f"Found status in table text: {status}") break # If no proposer found in table, try the first paragraph method if not proposer: first_paragraph = soup.select_one('#mw-content-text p') if first_paragraph: proposer_match = re.search(r'(?:proposed|created|authored)\s+by\s+\[\[User:([^|\]]+)', first_paragraph.get_text()) if proposer_match: proposer = proposer_match.group(1) logger.info(f"Found proposer in paragraph: {proposer}") # Count sections, links, and words section_count = len(soup.select('#mw-content-text h2, #mw-content-text h3, #mw-content-text h4')) if content else 0 # Count links excluding user/talk pages (voting signatures) links = [] if content: for link in content.select('a'): href = link.get('href', '') if href and not re.search(r'User:|User_talk:|Special:Contributions', href): links.append(href) link_count = len(links) # Approximate word count word_count = 0 if content: # Get text content excluding navigation elements for nav in content.select('.navbox, .ambox, .tmbox, .mw-editsection'): nav.decompose() # Also exclude the voting section to count only the proposal content voting_section = None for heading in content.find_all(['h2', 'h3']): heading_text = heading.get_text().lower() if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text: voting_section = heading break if voting_section: # Remove the voting section and everything after it current = voting_section while current: next_sibling = current.next_sibling # Only call decompose() if current is not a NavigableString # NavigableString objects don't have a decompose() method if not isinstance(current, NavigableString): current.decompose() current = next_sibling # Count words in the remaining content text = content.get_text() word_count = len(re.findall(r'\b\w+\b', text)) return { 'title': title, 'url': url, 'last_modified': last_modified, 'proposer': proposer, 'status': status, 'section_count': section_count, 'link_count': link_count, 'word_count': word_count } def process_proposal(proposal, force=False): """Process a single proposal and extract voting information""" url = proposal['url'] title = proposal['title'] logger.info(f"Processing proposal: {title}") # Fetch the proposal page html = fetch_page(url) if not html: return None # Extract metadata metadata = extract_proposal_metadata(html, url) # Extract votes votes = extract_votes(html) # Combine metadata and votes result = {**metadata, 'votes': votes} # Calculate total votes and percentages total_votes = votes['approve']['count'] + votes['oppose']['count'] + votes['abstain']['count'] if total_votes > 0: result['total_votes'] = total_votes result['approve_percentage'] = round((votes['approve']['count'] / total_votes) * 100, 1) result['oppose_percentage'] = round((votes['oppose']['count'] / total_votes) * 100, 1) result['abstain_percentage'] = round((votes['abstain']['count'] / total_votes) * 100, 1) else: result['total_votes'] = 0 result['approve_percentage'] = 0 result['oppose_percentage'] = 0 result['abstain_percentage'] = 0 return result def main(): """Main function to execute the script""" args = parse_arguments() force = args.force limit = args.limit logger.info("Starting fetch_archived_proposals.py") if limit: logger.info(f"Processing limited to {limit} proposals") # Load existing data data = load_existing_data() # Get list of proposal URLs proposal_urls = get_proposal_urls() # Apply limit if specified if limit and limit < len(proposal_urls): logger.info(f"Limiting processing from {len(proposal_urls)} to {limit} proposals") proposal_urls = proposal_urls[:limit] # Create a map of existing proposals by URL for quick lookup existing_proposals = {p['url']: p for p in data.get('proposals', [])} # Process each proposal new_proposals = [] processed_count = 0 for proposal in proposal_urls: url = proposal['url'] original_title = proposal['title'] # Skip if already processed and not forcing refresh if url in existing_proposals and not force: logger.info(f"Skipping already processed proposal: {original_title}") new_proposals.append(existing_proposals[url]) continue # Process the proposal time.sleep(RATE_LIMIT_DELAY) # Respect rate limits processed = process_proposal(proposal, force) if processed: # Ensure the title is preserved from the original proposal if processed.get('title') != original_title: logger.warning(f"Title changed during processing from '{original_title}' to '{processed.get('title')}'. Restoring original title.") processed['title'] = original_title new_proposals.append(processed) processed_count += 1 # Check if we've reached the limit if limit and processed_count >= limit: logger.info(f"Reached limit of {limit} processed proposals") break # Update the data data['proposals'] = new_proposals # Calculate global statistics total_proposals = len(new_proposals) total_votes = sum(p.get('total_votes', 0) for p in new_proposals) avg_votes_per_proposal = round(total_votes / total_proposals, 1) if total_proposals > 0 else 0 # Count unique voters all_voters = set() for p in new_proposals: for vote_type in ['approve', 'oppose', 'abstain']: for user in p.get('votes', {}).get(vote_type, {}).get('users', []): if 'username' in user: all_voters.add(user['username']) # Find most active voters voter_counts = {} for p in new_proposals: for vote_type in ['approve', 'oppose', 'abstain']: for user in p.get('votes', {}).get(vote_type, {}).get('users', []): if 'username' in user: username = user['username'] if username not in voter_counts: voter_counts[username] = {'total': 0, 'approve': 0, 'oppose': 0, 'abstain': 0} voter_counts[username]['total'] += 1 voter_counts[username][vote_type] += 1 # Sort voters by total votes top_voters = sorted( [{'username': k, **v} for k, v in voter_counts.items()], key=lambda x: x['total'], reverse=True )[:100] # Top 100 voters # Count proposals by status status_counts = {} for p in new_proposals: status = p.get('status') if status: status_counts[status] = status_counts.get(status, 0) + 1 else: status_counts['Unknown'] = status_counts.get('Unknown', 0) + 1 # Calculate average vote duration proposals_with_duration = [p for p in new_proposals if 'votes' in p and 'duration_days' in p['votes']] avg_vote_duration = 0 if proposals_with_duration: total_duration = sum(p['votes']['duration_days'] for p in proposals_with_duration) avg_vote_duration = round(total_duration / len(proposals_with_duration), 1) # Add statistics to the data data['statistics'] = { 'total_proposals': total_proposals, 'total_votes': total_votes, 'avg_votes_per_proposal': avg_votes_per_proposal, 'avg_vote_duration_days': avg_vote_duration, 'unique_voters': len(all_voters), 'top_voters': top_voters, 'status_distribution': status_counts } # Save the data save_data(data) logger.info("Script completed successfully") if __name__ == "__main__": main()