#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
fetch_archived_proposals.py

This script scrapes archived proposals from the OpenStreetMap wiki and extracts voting information.
It analyzes the voting patterns, counts votes by type (approve, oppose, abstain), and collects
information about the users who voted.

The script saves the data to a JSON file that can be used by the Symfony application.

Usage:
    python fetch_archived_proposals.py [--force] [--limit N]

Options:
    --force     Force refresh of all proposals, even if they have already been processed
    --limit N   Limit processing to N proposals (default: process all proposals)

Output:
    - archived_proposals.json file with voting information
"""

import argparse
import json
import logging
import os
import re
import sys
import time
from datetime import datetime
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup, NavigableString

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Constants
ARCHIVED_PROPOSALS_URL = "https://wiki.openstreetmap.org/wiki/Category:Archived_proposals"
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ARCHIVED_PROPOSALS_FILE = os.path.join(SCRIPT_DIR, "archived_proposals.json")
USER_AGENT = "OSM-Commerces/1.0 (https://github.com/yourusername/osm-commerces; your@email.com)"
RATE_LIMIT_DELAY = 1  # seconds between requests to avoid rate limiting

# Vote patterns
VOTE_PATTERNS = {
    'approve': [
        r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:approve|support|agree\s+with)\s+this\s+proposal',
        r'I\s+vote\s+(?:to\s+)?(?:approve|support)',
        r'(?:Symbol\s+support\s+vote\.svg|Symbol_support_vote\.svg)',
    ],
    'oppose': [
        r'I\s+(?:(?:strongly|fully|completely|wholeheartedly)\s+)?(?:oppose|disagree\s+with|reject|do\s+not\s+support)\s+this\s+proposal',
        r'I\s+vote\s+(?:to\s+)?(?:oppose|reject|against)',
        r'(?:Symbol\s+oppose\s+vote\.svg|Symbol_oppose_vote\.svg)',
    ],
    'abstain': [
        r'I\s+(?:have\s+comments\s+but\s+)?abstain\s+from\s+voting',
        r'I\s+(?:have\s+comments\s+but\s+)?(?:neither\s+approve\s+nor\s+oppose|am\s+neutral)',
        r'(?:Symbol\s+abstain\s+vote\.svg|Symbol_abstain_vote\.svg)',
    ]
}
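
# A vote on the wiki is typically a bullet list item; illustrative (not real) examples
# of lines the patterns above are meant to catch:
#   "I approve this proposal. --[[User:ExampleUser|ExampleUser]] 15:30, 25 December 2023"
#   "I oppose this proposal because ... --[[User:ExampleUser]] 25 December 2023 15:30"
# The Symbol_*_vote.svg alternatives are a fallback for vote templates whose rendered
# text still contains the image file name.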

def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description='Fetch and analyze archived OSM proposals')
    parser.add_argument('--force', action='store_true', help='Force refresh of all proposals')
    parser.add_argument('--limit', type=int, help='Limit processing to N proposals (default: process all)')
    return parser.parse_args()

def load_existing_data():
    """Load existing archived proposals data if available"""
    if os.path.exists(ARCHIVED_PROPOSALS_FILE):
        try:
            with open(ARCHIVED_PROPOSALS_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
                logger.info(f"Loaded {len(data.get('proposals', []))} existing proposals from {ARCHIVED_PROPOSALS_FILE}")
                return data
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"Error loading existing data: {e}")

    # Return empty structure if file doesn't exist or has errors
    return {
        'last_updated': None,
        'proposals': []
    }

def save_data(data):
    """Save data to JSON file"""
    try:
        # Update last_updated timestamp
        data['last_updated'] = datetime.now().isoformat()

        with open(ARCHIVED_PROPOSALS_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved {len(data.get('proposals', []))} proposals to {ARCHIVED_PROPOSALS_FILE}")
    except IOError as e:
        logger.error(f"Error saving data: {e}")
    except Exception as e:
        logger.error(f"Unexpected error saving data: {e}")
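
# For reference, the file written above has roughly this shape (keys as set in this
# script; values illustrative):
#   {
#     "last_updated": "2024-01-01T12:00:00",
#     "proposals": [ ...one entry per proposal, built in process_proposal()... ],
#     "statistics": { ...aggregate figures added in main()... }
#   }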

def fetch_page(url):
    """Fetch a page from the OSM wiki"""
    headers = {
        'User-Agent': USER_AGENT
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching {url}: {e}")
        return None

def get_proposal_urls():
    """Get URLs of all archived proposals"""
    logger.info(f"Fetching archived proposals list from {ARCHIVED_PROPOSALS_URL}")

    html = fetch_page(ARCHIVED_PROPOSALS_URL)
    if not html:
        return []

    soup = BeautifulSoup(html, 'html.parser')

    # Find all links in the category pages
    proposal_urls = []

    # Get proposals from the main category page
    category_content = soup.select_one('#mw-pages')
    if category_content:
        for link in category_content.select('a'):
            if link.get('title') and 'Category:' not in link.get('title'):
                proposal_urls.append({
                    'title': link.get('title'),
                    'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href'))
                })

    # Check if there are subcategories
    subcategories = soup.select('#mw-subcategories a')
    for subcat in subcategories:
        if 'Category:' in subcat.get('title', ''):
            logger.info(f"Found subcategory: {subcat.get('title')}")
            subcat_url = urljoin(ARCHIVED_PROPOSALS_URL, subcat.get('href'))

            # Fetch the subcategory page
            time.sleep(RATE_LIMIT_DELAY)  # Respect rate limits
            subcat_html = fetch_page(subcat_url)
            if subcat_html:
                subcat_soup = BeautifulSoup(subcat_html, 'html.parser')
                subcat_content = subcat_soup.select_one('#mw-pages')
                if subcat_content:
                    for link in subcat_content.select('a'):
                        if link.get('title') and 'Category:' not in link.get('title'):
                            proposal_urls.append({
                                'title': link.get('title'),
                                'url': urljoin(ARCHIVED_PROPOSALS_URL, link.get('href'))
                            })

    logger.info(f"Found {len(proposal_urls)} archived proposals")
    return proposal_urls

def extract_username(text):
    """Extract username from a signature line"""
    # Common patterns for signatures
    patterns = [
        r'--\s*\[\[User:([^|\]]+)(?:\|[^\]]+)?\]\]',  # --[[User:Username|Username]]
        r'--\s*\[\[User:([^|\]]+)\]\]',  # --[[User:Username]]
        r'--\s*\[\[User talk:([^|\]]+)(?:\|[^\]]+)?\]\]',  # --[[User talk:Username|Username]]
        r'--\s*\[\[User talk:([^|\]]+)\]\]',  # --[[User talk:Username]]
        r'--\s*\[\[Special:Contributions/([^|\]]+)(?:\|[^\]]+)?\]\]',  # --[[Special:Contributions/Username|Username]]
        r'--\s*\[\[Special:Contributions/([^|\]]+)\]\]',  # --[[Special:Contributions/Username]]
    ]

    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(1).strip()

    # If no match found with the patterns, try to find any username-like string
    match = re.search(r'--\s*([A-Za-z0-9_-]+)', text)
    if match:
        return match.group(1).strip()

    return None

def extract_date(text):
    """Extract date from a signature line"""
    # Look for common date formats in signatures
    date_patterns = [
        r'(\d{1,2}:\d{2}, \d{1,2} [A-Za-z]+ \d{4})',  # 15:30, 25 December 2023
        r'(\d{1,2} [A-Za-z]+ \d{4} \d{1,2}:\d{2})',  # 25 December 2023 15:30
        r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})',  # 2023-12-25T15:30:00
    ]

    for pattern in date_patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(1)

    return None

def determine_vote_type(text):
    """Determine the type of vote from the text"""
    text_lower = text.lower()

    for vote_type, patterns in VOTE_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, text_lower, re.IGNORECASE):
                return vote_type

    return None
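
# Illustrative behaviour of the three helpers above on a hypothetical vote line:
#   line = "I approve this proposal. --[[User:ExampleUser|ExampleUser]] 15:30, 25 December 2023"
#   determine_vote_type(line)  -> 'approve'
#   extract_username(line)     -> 'ExampleUser'
#   extract_date(line)         -> '15:30, 25 December 2023'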

def extract_votes(html):
    """Extract voting information from proposal HTML"""
    soup = BeautifulSoup(html, 'html.parser')

    # Find the voting section
    voting_section = None
    for heading in soup.find_all(['h2', 'h3']):
        heading_text = heading.get_text().lower()
        if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text:
            voting_section = heading
            break

    if not voting_section:
        logger.warning("No voting section found")
        return {
            'approve': {'count': 0, 'users': []},
            'oppose': {'count': 0, 'users': []},
            'abstain': {'count': 0, 'users': []}
        }

    # Get the content after the voting section heading
    votes_content = []
    current = voting_section.next_sibling

    # Collect all elements until the next heading or the end of the document
    while current and current.name not in ['h2', 'h3']:
        if current.name:  # Skip NavigableString objects
            votes_content.append(current)
        current = current.next_sibling

    # Process vote lists
    votes = {
        'approve': {'count': 0, 'users': []},
        'oppose': {'count': 0, 'users': []},
        'abstain': {'count': 0, 'users': []}
    }

    # For tracking vote dates to calculate duration
    all_vote_dates = []

    # Look for lists of votes
    for element in votes_content:
        if element.name == 'ul':
            for li in element.find_all('li'):
                vote_text = li.get_text()
                vote_type = determine_vote_type(vote_text)

                if vote_type:
                    username = extract_username(vote_text)
                    date = extract_date(vote_text)

                    # Extract comment by removing vote declaration and signature
                    comment = vote_text

                    # Remove vote declaration patterns
                    for pattern in VOTE_PATTERNS[vote_type]:
                        comment = re.sub(pattern, '', comment, flags=re.IGNORECASE)

                    # Remove signature
                    signature_patterns = [
                        r'--\s*\[\[User:[^]]+\]\].*$',
                        r'--\s*\[\[User talk:[^]]+\]\].*$',
                        r'--\s*\[\[Special:Contributions/[^]]+\]\].*$',
                        r'--\s*[A-Za-z0-9_-]+.*$'
                    ]
                    for pattern in signature_patterns:
                        comment = re.sub(pattern, '', comment, flags=re.IGNORECASE)

                    # Clean up the comment
                    comment = comment.strip()

                    if username:
                        votes[vote_type]['count'] += 1
                        votes[vote_type]['users'].append({
                            'username': username,
                            'date': date,
                            'comment': comment
                        })

                        # Add date to list for duration calculation if it's valid
                        if date:
                            try:
                                # Try to parse the date in different formats
                                parsed_date = None
                                for date_format in [
                                    '%H:%M, %d %B %Y',  # 15:30, 25 December 2023
                                    '%d %B %Y %H:%M',  # 25 December 2023 15:30
                                    '%Y-%m-%dT%H:%M:%S'  # 2023-12-25T15:30:00
                                ]:
                                    try:
                                        parsed_date = datetime.strptime(date, date_format)
                                        break
                                    except ValueError:
                                        continue

                                if parsed_date:
                                    all_vote_dates.append(parsed_date)
                            except Exception as e:
                                logger.warning(f"Could not parse date '{date}': {e}")

    # Calculate vote duration if we have at least two dates
    if len(all_vote_dates) >= 2:
        all_vote_dates.sort()
        first_vote = all_vote_dates[0]
        last_vote = all_vote_dates[-1]
        vote_duration_days = (last_vote - first_vote).days
        votes['first_vote'] = first_vote.strftime('%Y-%m-%d')
        votes['last_vote'] = last_vote.strftime('%Y-%m-%d')
        votes['duration_days'] = vote_duration_days

    return votes
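
# The dict returned by extract_votes() has this shape (values illustrative):
#   {
#     'approve': {'count': 2, 'users': [{'username': '...', 'date': '...', 'comment': '...'}, ...]},
#     'oppose':  {'count': 1, 'users': [...]},
#     'abstain': {'count': 0, 'users': []},
#     # present only when at least two vote dates could be parsed:
#     'first_vote': '2023-12-01', 'last_vote': '2023-12-14', 'duration_days': 13
#   }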

def extract_proposal_metadata(html, url, original_title=None):
    """Extract metadata about the proposal"""
    soup = BeautifulSoup(html, 'html.parser')

    # Get title
    title_element = soup.select_one('#firstHeading')
    extracted_title = title_element.get_text() if title_element else "Unknown Title"

    # Debug logging
    logger.debug(f"Original title: '{original_title}', Extracted title: '{extracted_title}'")

    # Check if the extracted title is a username or user page
    # This covers both "User:Username" and other user-related pages
    if (extracted_title.startswith("User:") or
            "User:" in extracted_title or
            "User talk:" in extracted_title) and original_title:
        logger.info(f"Extracted title '{extracted_title}' appears to be a user page. Using original title '{original_title}' instead.")
        title = original_title
    else:
        title = extracted_title

    # Get last modified date
    last_modified = None
    footer_info = soup.select_one('#footer-info-lastmod')
    if footer_info:
        last_modified_text = footer_info.get_text()
        match = re.search(r'(\d{1,2} [A-Za-z]+ \d{4})', last_modified_text)
        if match:
            last_modified = match.group(1)

    # Get content element for further processing
    content = soup.select_one('#mw-content-text')

    # Get proposer from the page
    proposer = None

    # Get proposal status from the page
    status = None

    # Look for table rows to find proposer and status
    if content:
        # Look for table rows
        for row in content.select('tr'):
            # Check if the row has at least two cells (th and td)
            cells = row.select('th, td')
            if len(cells) >= 2:
                # Get the header text from the first cell
                header_text = cells[0].get_text().strip().lower()

                # Check for "Proposed by:" to find proposer
                if "proposed by" in header_text:
                    # Look for user link in the next cell
                    user_link = cells[1].select_one('a[href*="/wiki/User:"]')
                    if user_link:
                        # Extract username from the link
                        href = user_link.get('href', '')
                        # Use a separate variable so the proposal title is not overwritten
                        link_title = user_link.get('title', '')

                        # Try to get username from title attribute first
                        if link_title and link_title.startswith('User:'):
                            proposer = link_title[5:]  # Remove 'User:' prefix
                        # Otherwise try to extract from href
                        elif href:
                            href_match = re.search(r'/wiki/User:([^/]+)', href)
                            if href_match:
                                proposer = href_match.group(1)

                        # If still no proposer, use the link text
                        if not proposer and user_link.get_text():
                            proposer = user_link.get_text().strip()

                        logger.info(f"Found proposer in table: {proposer}")

                # Check for "Proposal status:" to find status
                elif "proposal status" in header_text:
                    # Get the status from the next cell
                    status_cell = cells[1]

                    # First try to find a link with a category title containing status
                    status_link = status_cell.select_one('a[title*="Category:Proposals with"]')
                    if status_link:
                        # Extract status from the title attribute
                        status_match = re.search(r'Category:Proposals with "([^"]+)" status', status_link.get('title', ''))
                        if status_match:
                            status = status_match.group(1)
                            logger.info(f"Found status in table link: {status}")

                    # If no status found in link, try to get text content
                    if not status:
                        status_text = status_cell.get_text().strip()
                        # Try to match one of the known statuses
                        known_statuses = [
                            "Draft", "Proposed", "Voting", "Post-vote", "Approved",
                            "Rejected", "Abandoned", "Canceled", "Obsoleted",
                            "Inactive", "Undefined"
                        ]
                        for known_status in known_statuses:
                            if known_status.lower() in status_text.lower():
                                status = known_status
                                logger.info(f"Found status in table text: {status}")
                                break

    # If no proposer found in table, try the first paragraph method
    if not proposer:
        first_paragraph = soup.select_one('#mw-content-text p')
        if first_paragraph:
            proposer_match = re.search(r'(?:proposed|created|authored)\s+by\s+\[\[User:([^|\]]+)', first_paragraph.get_text())
            if proposer_match:
                proposer = proposer_match.group(1)
                logger.info(f"Found proposer in paragraph: {proposer}")

    # Count sections, links, and words
    section_count = len(soup.select('#mw-content-text h2, #mw-content-text h3, #mw-content-text h4')) if content else 0

    # Count links excluding user/talk pages (voting signatures)
    links = []
    if content:
        for link in content.select('a'):
            href = link.get('href', '')
            if href and not re.search(r'User:|User_talk:|Special:Contributions', href):
                links.append(href)
    link_count = len(links)

    # Approximate word count
    word_count = 0
    if content:
        # Get text content excluding navigation elements
        for nav in content.select('.navbox, .ambox, .tmbox, .mw-editsection'):
            nav.decompose()

        # Also exclude the voting section to count only the proposal content
        voting_section = None
        for heading in content.find_all(['h2', 'h3']):
            heading_text = heading.get_text().lower()
            if 'voting' in heading_text or 'votes' in heading_text or 'poll' in heading_text:
                voting_section = heading
                break

        if voting_section:
            # Remove the voting section and everything after it
            current = voting_section
            while current:
                next_sibling = current.next_sibling
                # Only call decompose() if current is not a NavigableString
                # NavigableString objects don't have a decompose() method
                if not isinstance(current, NavigableString):
                    current.decompose()
                current = next_sibling

        # Count words in the remaining content
        text = content.get_text()
        word_count = len(re.findall(r'\b\w+\b', text))

    return {
        'title': title,
        'url': url,
        'last_modified': last_modified,
        'proposer': proposer,
        'status': status,
        'section_count': section_count,
        'link_count': link_count,
        'word_count': word_count
    }

def process_proposal(proposal, force=False):
    """Process a single proposal and extract voting information"""
    url = proposal['url']
    title = proposal['title']

    logger.info(f"Processing proposal: {title}")

    # Fetch the proposal page
    html = fetch_page(url)
    if not html:
        return None

    # Extract metadata
    metadata = extract_proposal_metadata(html, url, original_title=title)

    # Extract votes
    votes = extract_votes(html)

    # Combine metadata and votes
    result = {**metadata, 'votes': votes}

    # Calculate total votes and percentages
    total_votes = votes['approve']['count'] + votes['oppose']['count'] + votes['abstain']['count']

    if total_votes > 0:
        result['total_votes'] = total_votes
        result['approve_percentage'] = round((votes['approve']['count'] / total_votes) * 100, 1)
        result['oppose_percentage'] = round((votes['oppose']['count'] / total_votes) * 100, 1)
        result['abstain_percentage'] = round((votes['abstain']['count'] / total_votes) * 100, 1)
    else:
        result['total_votes'] = 0
        result['approve_percentage'] = 0
        result['oppose_percentage'] = 0
        result['abstain_percentage'] = 0

    return result
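
# A processed proposal therefore ends up in archived_proposals.json roughly as
# (illustrative values):
#   {
#     "title": "...", "url": "...", "last_modified": "...", "proposer": "...",
#     "status": "Approved", "section_count": 5, "link_count": 12, "word_count": 800,
#     "votes": { ...as returned by extract_votes()... },
#     "total_votes": 3, "approve_percentage": 66.7,
#     "oppose_percentage": 33.3, "abstain_percentage": 0
#   }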

def main():
    """Main function to execute the script"""
    args = parse_arguments()
    force = args.force
    limit = args.limit

    logger.info("Starting fetch_archived_proposals.py")
    if limit:
        logger.info(f"Processing limited to {limit} proposals")

    # Load existing data
    data = load_existing_data()

    # Get list of proposal URLs
    proposal_urls = get_proposal_urls()

    # Apply limit if specified
    if limit and limit < len(proposal_urls):
        logger.info(f"Limiting processing from {len(proposal_urls)} to {limit} proposals")
        proposal_urls = proposal_urls[:limit]

    # Create a map of existing proposals by URL for quick lookup
    existing_proposals = {p['url']: p for p in data.get('proposals', [])}

    # Process each proposal
    new_proposals = []
    processed_count = 0
    for proposal in proposal_urls:
        url = proposal['url']
        original_title = proposal['title']

        # Skip if already processed and not forcing refresh
        if url in existing_proposals and not force:
            logger.info(f"Skipping already processed proposal: {original_title}")
            new_proposals.append(existing_proposals[url])
            continue

        # Process the proposal
        time.sleep(RATE_LIMIT_DELAY)  # Respect rate limits
        processed = process_proposal(proposal, force)

        if processed:
            # Ensure the title is preserved from the original proposal
            if processed.get('title') != original_title:
                # If the title contains "User:", extract_proposal_metadata has already
                # handled the user-page case, so only a debug message is needed here
                if "User:" in processed.get('title', ''):
                    logger.debug("Title contains 'User:' - already handled in extract_proposal_metadata")
                else:
                    logger.warning(f"Title changed during processing from '{original_title}' to '{processed.get('title')}'. Restoring original title.")
                processed['title'] = original_title

            new_proposals.append(processed)
            processed_count += 1

            # Check if we've reached the limit
            if limit and processed_count >= limit:
                logger.info(f"Reached limit of {limit} processed proposals")
                break

    # Update the data
    data['proposals'] = new_proposals

    # Calculate global statistics
    total_proposals = len(new_proposals)
    total_votes = sum(p.get('total_votes', 0) for p in new_proposals)

    # Calculate votes per proposal statistics, excluding proposals with 0 votes
    proposals_with_votes = [p for p in new_proposals if p.get('total_votes', 0) > 0]
    num_proposals_with_votes = len(proposals_with_votes)

    if num_proposals_with_votes > 0:
        # Calculate average votes per proposal (excluding proposals with 0 votes)
        votes_per_proposal = [p.get('total_votes', 0) for p in proposals_with_votes]
        avg_votes_per_proposal = round(sum(votes_per_proposal) / num_proposals_with_votes, 1)

        # Calculate median votes per proposal
        votes_per_proposal.sort()
        if num_proposals_with_votes % 2 == 0:
            # Even number of proposals, average the middle two
            median_votes_per_proposal = round((votes_per_proposal[num_proposals_with_votes // 2 - 1] +
                                               votes_per_proposal[num_proposals_with_votes // 2]) / 2, 1)
        else:
            # Odd number of proposals, take the middle one
            median_votes_per_proposal = votes_per_proposal[num_proposals_with_votes // 2]

        # Calculate standard deviation of votes per proposal
        mean = sum(votes_per_proposal) / num_proposals_with_votes
        variance = sum((x - mean) ** 2 for x in votes_per_proposal) / num_proposals_with_votes
        std_dev_votes_per_proposal = round(variance ** 0.5, 1)
    else:
        avg_votes_per_proposal = 0
        median_votes_per_proposal = 0
        std_dev_votes_per_proposal = 0

    # Count unique voters
    all_voters = set()
    for p in new_proposals:
        for vote_type in ['approve', 'oppose', 'abstain']:
            for user in p.get('votes', {}).get(vote_type, {}).get('users', []):
                if 'username' in user:
                    all_voters.add(user['username'])

    # Find most active voters
    voter_counts = {}
    for p in new_proposals:
        for vote_type in ['approve', 'oppose', 'abstain']:
            for user in p.get('votes', {}).get(vote_type, {}).get('users', []):
                if 'username' in user:
                    username = user['username']
                    if username not in voter_counts:
                        voter_counts[username] = {'total': 0, 'approve': 0, 'oppose': 0, 'abstain': 0}
                    voter_counts[username]['total'] += 1
                    voter_counts[username][vote_type] += 1

    # Sort voters by total votes
    top_voters = sorted(
        [{'username': k, **v} for k, v in voter_counts.items()],
        key=lambda x: x['total'],
        reverse=True
    )[:100]  # Top 100 voters

    # Count proposals by status
    status_counts = {}
    for p in new_proposals:
        status = p.get('status')
        if status:
            status_counts[status] = status_counts.get(status, 0) + 1
        else:
            status_counts['Unknown'] = status_counts.get('Unknown', 0) + 1

    # Ensure status_counts is never empty
    if not status_counts:
        status_counts['No Status'] = 0

    # Calculate average vote duration
    proposals_with_duration = [p for p in new_proposals if 'votes' in p and 'duration_days' in p['votes']]
    avg_vote_duration = 0
    if proposals_with_duration:
        total_duration = sum(p['votes']['duration_days'] for p in proposals_with_duration)
        avg_vote_duration = round(total_duration / len(proposals_with_duration), 1)

    # Add statistics to the data
    data['statistics'] = {
        'total_proposals': total_proposals,
        'total_votes': total_votes,
        'avg_votes_per_proposal': avg_votes_per_proposal,
        'median_votes_per_proposal': median_votes_per_proposal,
        'std_dev_votes_per_proposal': std_dev_votes_per_proposal,
        'avg_vote_duration_days': avg_vote_duration,
        'unique_voters': len(all_voters),
        'top_voters': top_voters,
        'status_distribution': status_counts
    }

    # Save the data
    save_data(data)

    logger.info("Script completed successfully")

if __name__ == "__main__":
    main()