#!/usr/bin/env python3
# Produces a Swagger (OpenAPI) file from a Streamlit application.
"""Generate an OpenAPI 3.0.3 document from convention-based annotations.

Two annotation styles are recognised in the scanned ``.py`` files:

* Docstring tags on top-level functions::

      @route /users/{id}
      @method GET
      @summary Short text
      @description Longer text
      @param id: int: path - identifier of the user
      @response 404: Not found

* Inline comments of the form
  ``# api: route=/x method=GET summary=...`` (whitespace-separated
  ``key=value`` pairs, so values cannot contain spaces).
"""
import argparse
import ast
import json
import os
import re
import sys
from typing import Any, Dict, List, Optional, Tuple

# Docstring tag patterns. The group names are the keys read back with
# ``m.group(...)`` in parse_docstring().
DOC_TAG_ROUTE = re.compile(r"@route\s+(?P<path>\S+)")
DOC_TAG_METHOD = re.compile(
    r"@method\s+(?P<method>GET|POST|PUT|PATCH|DELETE|OPTIONS|HEAD)",
    re.IGNORECASE,
)
DOC_TAG_SUMMARY = re.compile(r"@summary\s+(?P<summary>.+)")
DOC_TAG_DESC = re.compile(r"@desc(ription)?\s+(?P<desc>.+)")
DOC_TAG_PARAM = re.compile(
    r"@param\s+(?P<name>[a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*(?P<type>[^\s:]+)"
    r"(?:\s*:\s*(?P<where>query|path|header|cookie|body))?"
    r"(?:\s*-\s*(?P<desc>.*))?",
    re.IGNORECASE,
)
DOC_TAG_RESP = re.compile(r"@response\s+(?P<code>\d{3})\s*:\s*(?P<desc>.*)")

# Inline annotation: everything after "# api:" is captured as "kvs".
LINE_ANNOTATION = re.compile(
    r"#\s*api:\s*(?P<kvs>.+)",
    re.IGNORECASE,
)


def parse_kv_line(line: str) -> Dict[str, str]:
    """Extract ``key=value`` pairs from an inline ``# api:`` annotation.

    Args:
        line: One line of source text.

    Returns:
        A dict mapping lower-cased keys to their stripped values. Empty when
        the line carries no ``# api:`` annotation. Tokens without ``=`` are
        ignored; tokens are whitespace-separated, so a value cannot contain
        spaces.
    """
    kvs: Dict[str, str] = {}
    m = LINE_ANNOTATION.search(line)
    if not m:
        return kvs
    for part in m.group("kvs").split():
        if "=" in part:
            # Split on the first "=" only so values may themselves contain "=".
            k, v = part.split("=", 1)
            kvs[k.strip().lower()] = v.strip()
    return kvs


def parse_docstring(doc: Optional[str]) -> Dict[str, Any]:
    """Parse the ``@tag`` annotations found in a function docstring.

    Each non-empty line is tested against the tag patterns in a fixed order
    (route, method, summary, description, param, response); the first pattern
    that matches consumes the line.

    Args:
        doc: The docstring text, or ``None``.

    Returns:
        A dict that always contains ``"params"`` and ``"responses"`` lists and
        optionally ``"route"``, ``"method"`` (upper-cased), ``"summary"`` and
        ``"description"``. Repeated scalar tags keep the last value seen.
    """
    result: Dict[str, Any] = {"params": [], "responses": []}
    if not doc:
        return result

    for line in doc.splitlines():
        line = line.strip()
        if not line:
            continue

        m = DOC_TAG_ROUTE.search(line)
        if m:
            result["route"] = m.group("path")
            continue

        m = DOC_TAG_METHOD.search(line)
        if m:
            result["method"] = m.group("method").upper()
            continue

        m = DOC_TAG_SUMMARY.search(line)
        if m:
            result["summary"] = m.group("summary").strip()
            continue

        m = DOC_TAG_DESC.search(line)
        if m:
            result["description"] = m.group("desc").strip()
            continue

        m = DOC_TAG_PARAM.search(line)
        if m:
            result["params"].append(
                {
                    "name": m.group("name"),
                    "type": (m.group("type") or "string"),
                    # Location defaults to "query" when not given.
                    "in": (m.group("where") or "query").lower(),
                    "description": (m.group("desc") or "").strip(),
                }
            )
            continue

        m = DOC_TAG_RESP.search(line)
        if m:
            result["responses"].append(
                {
                    "code": m.group("code"),
                    "description": (m.group("desc") or "").strip(),
                }
            )
            continue

    return result


def guess_schema(t: str) -> Dict[str, Any]:
    """Map a loosely-written type name to a minimal JSON schema.

    Args:
        t: Type name as written in a ``@param`` tag (case-insensitive).

    Returns:
        A fresh schema dict. Unknown names fall back to ``{"type": "string"}``;
        arrays are declared with string items.
    """
    t = t.lower()
    if t in ("int", "integer"):
        return {"type": "integer"}
    if t in ("float", "number", "double"):
        return {"type": "number"}
    if t in ("bool", "boolean"):
        return {"type": "boolean"}
    if t in ("array", "list"):
        return {"type": "array", "items": {"type": "string"}}
    if t in ("object", "dict", "map"):
        return {"type": "object"}
    return {"type": "string"}


def build_operation(meta: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Build an OpenAPI operation object from parsed endpoint metadata.

    Args:
        meta: Endpoint metadata as produced by parse_docstring() or by the
            inline-annotation branch of scan_file().

    Returns:
        A ``(method, operation)`` tuple where ``method`` is the lower-cased
        HTTP method (``"get"`` when missing or ``None``) and ``operation`` is
        the operation dict. Parameters located in ``body`` are gathered into a
        single JSON ``requestBody`` object; parameters with an unrecognised
        location are dropped. When no response is declared, a default
        ``200: OK`` response is used.
    """
    # "or" (rather than a .get default) also covers an explicit None value.
    method = (meta.get("method") or "GET").lower()
    summary = meta.get("summary")
    description = meta.get("description")
    params = meta.get("params", [])
    responses = meta.get("responses", [])

    op: Dict[str, Any] = {
        "tags": ["streamlit"],
        "parameters": [],
        "responses": {"200": {"description": "OK"}},
    }
    if summary:
        op["summary"] = summary
    if description:
        op["description"] = description

    body_properties: Dict[str, Dict[str, Any]] = {}
    for p in params:
        where = p.get("in", "query")
        name = p.get("name")
        desc = p.get("description", "")
        schema = guess_schema(p.get("type", "string"))

        if where == "body":
            # Keep the documented description on the property instead of
            # discarding it.
            prop = dict(schema)
            if desc:
                prop["description"] = desc
            body_properties[name] = prop
        elif where in ("query", "path", "header", "cookie"):
            op["parameters"].append(
                {
                    "name": name,
                    "in": where,
                    # Only path parameters are marked as required.
                    "required": where == "path",
                    "description": desc,
                    "schema": schema,
                }
            )

    if body_properties:
        op["requestBody"] = {
            "required": True,
            "content": {
                "application/json": {
                    "schema": {
                        "type": "object",
                        "properties": body_properties,
                    }
                }
            },
        }

    if responses:
        # Declared responses replace the default 200 entry entirely.
        op["responses"] = {
            r["code"]: {"description": r["description"] or ""} for r in responses
        }

    return method, op


def scan_file(path: str) -> List[Dict[str, Any]]:
    """Collect the endpoint descriptions declared in one Python file.

    Args:
        path: Path of the file to scan; it is read as UTF-8.

    Returns:
        A list of endpoint metadata dicts: first those found in docstrings of
        top-level (sync or async) functions, then those found in inline
        ``# api:`` annotations. A file that cannot be decoded or parsed is
        skipped on a best-effort basis and yields an empty list.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            src = f.read()
        tree = ast.parse(src, filename=path)
    except (SyntaxError, UnicodeDecodeError, ValueError):
        # Best effort: one unreadable file must not abort the whole scan.
        return []

    endpoints: List[Dict[str, Any]] = []

    # 1) Docstrings of top-level functions.
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            meta = parse_docstring(ast.get_docstring(node))
            if "route" in meta:
                endpoints.append(meta)

    # 2) Inline annotations: # api: route=/x method=GET summary=...
    for line in src.splitlines():
        # parse_kv_line() returns {} when the line has no annotation.
        kv = parse_kv_line(line)
        if "route" in kv:
            endpoints.append(
                {
                    "route": kv.get("route"),
                    "method": (kv.get("method") or "GET").upper(),
                    "summary": kv.get("summary"),
                    "description": kv.get("description"),
                    "params": [],
                    "responses": [],
                }
            )

    return endpoints


def generate_openapi(
    src_dir: str, title: str, version: str, server_url: Optional[str]
) -> Dict[str, Any]:
    """Walk ``src_dir`` and assemble the OpenAPI document.

    Args:
        src_dir: Root directory scanned recursively for ``.py`` files.
        title: Value of ``info.title``.
        version: Value of ``info.version``.
        server_url: Optional URL added as the single ``servers`` entry.

    Returns:
        The OpenAPI 3.0.3 document as a dict. When the same route and method
        are declared more than once, the declaration scanned last wins.
    """
    spec: Dict[str, Any] = {
        "openapi": "3.0.3",
        "info": {"title": title, "version": version},
        "paths": {},
    }
    if server_url:
        spec["servers"] = [{"url": server_url}]

    for root, dirs, files in os.walk(src_dir):
        # Sorting in place makes the traversal, and therefore the output and
        # the "last wins" rule, deterministic.
        dirs.sort()
        for fname in sorted(files):
            if not fname.endswith(".py"):
                continue
            for meta in scan_file(os.path.join(root, fname)):
                route = meta.get("route")
                if not route:
                    continue
                method, op = build_operation(meta)
                spec["paths"].setdefault(route, {})[method] = op

    return spec


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point: generate the spec and write it as JSON.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The process exit code (always ``0`` on normal completion).
    """
    parser = argparse.ArgumentParser(
        description="Génère un fichier OpenAPI (Swagger) depuis une app Streamlit par conventions."
    )
    parser.add_argument(
        "--src",
        default=os.environ.get("STREAMLIT_SRC", "./streamlit"),
        help="Répertoire source de l'app Streamlit",
    )
    parser.add_argument(
        "--out",
        default="openapi.json",
        help="Chemin du fichier de sortie OpenAPI JSON",
    )
    parser.add_argument("--title", default="Streamlit API", help="Titre de l'API")
    parser.add_argument("--version", default="0.1.0", help="Version de l'API")
    parser.add_argument(
        "--server-url",
        default=os.environ.get("OPENAPI_SERVER_URL"),
        help="URL du serveur à inclure dans le spec",
    )
    args = parser.parse_args(argv)

    spec = generate_openapi(args.src, args.title, args.version, args.server_url)

    with open(args.out, "w", encoding="utf-8") as f:
        json.dump(spec, f, ensure_ascii=False, indent=2)

    print(f"OpenAPI généré: {args.out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())