# main.py (v1.2) — robust feed parsing, clearer SSE progress, normalized host caching, concurrent discovery
import asyncio
import io
import json
import re
import uuid
from collections import Counter
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse

import feedparser
import httpx
from bs4 import BeautifulSoup, UnicodeDammit
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlmodel import Field, Session, SQLModel, create_engine, select

# ------------------------------
# Settings / Constants
# ------------------------------
REQUEST_TIMEOUT = httpx.Timeout(15.0, connect=8.0, read=15.0)
HEADERS = {
    "User-Agent": "LinkAuditBot/1.2 (+https://example.com; contact: admin@example.com)"
}
COMMON_FEED_PATHS = [
    "/feed",
    "/feed/",
    "/feed.xml",
    "/rss",
    "/rss.xml",
    "/rss/",
    "/atom",
    "/atom.xml",
    "/index.xml",
    "/blog/feed",
    "/blog/rss",
    "/blog/rss.xml",
    "/blog/index.xml",
    "/feeds/posts/default?alt=rss",  # Blogger
    "/news/atom.xml",
    "/news/rss.xml",
    "/.rss",
    "/?feed=rss2",  # WP variants
    "/category/news/feed",
    "/?feed=atom",
]
FEED_MIME_HINTS = {
    "application/rss+xml",
    "application/atom+xml",
    "application/xml",
    "text/xml",
}
# Number of hosts whose feeds are discovered in parallel.
DISCOVERY_CONCURRENCY = 10
# Candidate URLs probed in parallel *per host*. 10 hosts x 8 probes stays below
# httpx's default pool of 100 connections, so probes don't fail with pool
# timeouts (which would otherwise be recorded as "host has no feed").
CANDIDATE_CONCURRENCY = 8
# Upper bound on feed-looking <a href> links tried per homepage (each costs a GET).
MAX_ANCHOR_CANDIDATES = 10
# How long a "no feed found" result is trusted before the host is re-probed.
NEGATIVE_FEED_CACHE_TTL = timedelta(days=1)
UTF8_BOM = b"\xef\xbb\xbf"
# Matches an XML declaration's encoding="us-ascii" (either quote style, any case).
_US_ASCII_DECL = re.compile(rb"""(encoding\s*=\s*["'])us-ascii(["'])""", re.IGNORECASE)
# Terminal SSE message; event_gen stops streaming after forwarding it.
SSE_DONE = "event: done\ndata: {}\n\n"


# ------------------------------
# Database Models (SQLModel)
# ------------------------------
class PageCache(SQLModel, table=True):
    url: str = Field(primary_key=True)
    html: Optional[str] = None
    fetched_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class LinksCache(SQLModel, table=True):
    url: str = Field(primary_key=True)
    links_json: str  # JSON list[str]
    extracted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class HostFeedCache(SQLModel, table=True):
    hostname: str = Field(primary_key=True)  # normalized!
    feed_url: Optional[str] = None  # None = probed, no feed found
    checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class FeedRun(SQLModel, table=True):
    id: str = Field(primary_key=True, default_factory=lambda: str(uuid.uuid4()))
    feed_url: str
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    finished_at: Optional[datetime] = None
    summary_json: Optional[str] = None  # store last summary, if desired


engine = create_engine("sqlite:///cache.db", echo=False)
SQLModel.metadata.create_all(engine)


# ------------------------------
# Data models
# ------------------------------
@dataclass
class HostSummary:
    hostname: str
    count: int = 0
    unique_links: Set[str] = field(default_factory=set)
    link_counts: Counter = field(default_factory=Counter)
    feed_url: Optional[str] = None


# ------------------------------
# Utilities
# ------------------------------
def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def normalize_host(host: str) -> str:
    """Canonicalize a host for grouping and cache keys.

    Trims whitespace, lowercases, drops a trailing dot and a leading ``www.``.
    Falsy input is returned unchanged.
    """
    if not host:
        return host
    h = host.strip().lower().rstrip(".")
    if h.startswith("www."):
        h = h[4:]
    return h


def host_of_url(url: str) -> str:
    """Return the normalized ``host[:port]`` of *url*, or "" if it has none.

    Any ``user:password@`` prefix is dropped so credentials never end up in
    host keys, discovery URLs, or rendered output.
    """
    try:
        netloc = urlparse(url).netloc
    except ValueError:
        return ""
    return normalize_host(netloc.rpartition("@")[2])


def is_http_url(href: str) -> bool:
    """True when *href* is an absolute http(s) URL that names a host."""
    try:
        p = urlparse(href)
    except ValueError:  # e.g. malformed IPv6 brackets
        return False
    return p.scheme in ("http", "https") and bool(p.netloc)


def absolutize(href: str, base_url: str) -> Optional[str]:
    """Resolve *href* against *base_url*; return it only if it is an http(s) URL.

    Fragment-only, ``mailto:`` and ``tel:`` links are rejected up front; any
    other non-http result (``javascript:``, ``file:`` …) is filtered out too.
    """
    if not href or href.startswith(("#", "mailto:", "tel:")):
        return None
    try:
        abs_url = urljoin(base_url, href)
    except ValueError:
        return None
    return abs_url if is_http_url(abs_url) else None


def extract_links_from_html(html: str, base_url: str) -> List[str]:
    """Return every http(s) ``<a href>`` target in *html*, absolutized, in document order.

    Duplicates are kept on purpose: callers count link occurrences.
    """
    soup = BeautifulSoup(html, "lxml")
    links: List[str] = []
    for a in soup.find_all("a", href=True):
        u = absolutize(a.get("href"), base_url)
        if u:
            links.append(u)
    return links


# ------------------------------
# Networking
# ------------------------------
async def fetch_text(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """GET *url* and return its decoded body, or None on any failure / status >= 400.

    A charset from the Content-Type header is respected. Without one, httpx
    would silently assume UTF-8, so the bytes are handed to bs4's
    ``UnicodeDammit`` instead, which also honors BOMs and ``<meta charset>``.
    """
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code >= 400:
            return None
        if r.charset_encoding is None:
            decoded = UnicodeDammit(r.content, is_html=True).unicode_markup
            if decoded is not None:
                return decoded
        return r.text
    except Exception:
        # Best-effort: network errors, bad charset names, etc. all mean "no page".
        return None


async def fetch_bytes(client: httpx.AsyncClient, url: str) -> Optional[Tuple[bytes, Optional[str]]]:
    """GET *url*; return ``(body, content_type)`` on status < 400, else None (best-effort)."""
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            return r.content, r.headers.get("content-type")
    except Exception:
        return None
    return None


async def fetch_head_ok(client: httpx.AsyncClient, url: str) -> Tuple[bool, Optional[str]]:
    """Check that *url* is reachable.

    Tries HEAD first; if that errors or returns >= 400, retries with GET
    (some servers reject HEAD). Returns ``(True, content_type)`` on the first
    status < 400, otherwise ``(False, None)``.
    """
    try:
        r = await client.head(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            return True, r.headers.get("content-type")
    except Exception:
        pass
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            return True, r.headers.get("content-type")
    except Exception:
        pass
    return False, None


# ------------------------------
# Cache helpers
# ------------------------------
def cache_get_page(url: str) -> Optional[PageCache]:
    """Return the cached PageCache row for *url*, or None."""
    with Session(engine) as sess:
        return sess.get(PageCache, url)


def cache_set_page(url: str, html: Optional[str]):
    """Upsert the cached HTML for *url* (None records a failed fetch)."""
    with Session(engine) as sess:
        sess.merge(PageCache(url=url, html=html, fetched_at=now_utc()))
        sess.commit()


def cache_get_links(url: str) -> Optional[List[str]]:
    """Return cached extracted links for *url*, or None if absent or unreadable."""
    with Session(engine) as sess:
        row = sess.get(LinksCache, url)
        if not row:
            return None
        try:
            return json.loads(row.links_json)
        except ValueError:
            return None


def cache_set_links(url: str, links: List[str]):
    """Upsert the extracted link list for *url*."""
    with Session(engine) as sess:
        sess.merge(LinksCache(url=url, links_json=json.dumps(links), extracted_at=now_utc()))
        sess.commit()


def cache_get_host_feed(hostname: str) -> Optional[str]:
    """Return the cached feed URL for *hostname*.

    Note: None means either "never probed" or "probed, no feed". Use
    ``cache_lookup_host_feed`` when the two must be told apart.
    """
    host_key = normalize_host(hostname)
    with Session(engine) as sess:
        row = sess.get(HostFeedCache, host_key)
        return row.feed_url if row else None


def cache_lookup_host_feed(
    hostname: str, negative_ttl: timedelta = NEGATIVE_FEED_CACHE_TTL
) -> Tuple[bool, Optional[str]]:
    """Look up *hostname* in the feed cache.

    Returns ``(found, feed_url)``. A positive entry is always a hit. A
    negative entry (feed_url None) is a hit only while younger than
    *negative_ttl*, so transient failures are eventually retried.
    """
    host_key = normalize_host(hostname)
    with Session(engine) as sess:
        row = sess.get(HostFeedCache, host_key)
        if row is None:
            return False, None
        if row.feed_url:
            return True, row.feed_url
        checked = row.checked_at
        if checked.tzinfo is None:
            # SQLite drops tzinfo on round-trip; values are written in UTC.
            checked = checked.replace(tzinfo=timezone.utc)
        if now_utc() - checked < negative_ttl:
            return True, None
        return False, None


def cache_set_host_feed(hostname: str, feed_url: Optional[str]):
    """Upsert the discovery result for *hostname* (None = no feed found)."""
    host_key = normalize_host(hostname)
    with Session(engine) as sess:
        sess.merge(HostFeedCache(hostname=host_key, feed_url=feed_url, checked_at=now_utc()))
        sess.commit()


# ------------------------------
# Cached fetch/extract
# ------------------------------
async def fetch_page_html(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Return HTML for *url* from cache, fetching and caching it on a miss.

    Failed fetches are cached as None but re-attempted on the next call.
    """
    cached = cache_get_page(url)
    if cached and cached.html:
        return cached.html
    html = await fetch_text(client, url)
    cache_set_page(url, html)
    return html


async def get_links_for_page(client: httpx.AsyncClient, url: str) -> List[str]:
    """Return the outgoing http(s) links of the page at *url* (cached).

    When the page cannot be fetched, ``[]`` is returned but NOT cached, so a
    transient failure does not permanently hide the page's links.
    """
    cached = cache_get_links(url)
    if cached is not None:
        return cached
    html = await fetch_page_html(client, url)
    if not html:
        return []
    links = extract_links_from_html(html, url)
    cache_set_links(url, links)
    return links


# ------------------------------
# Robust feed parsing
# ------------------------------
def _parse_feed_bytes(content: bytes, content_type: Optional[str]):
    """Parse raw feed bytes with feedparser.

    The server's Content-Type is forwarded so feedparser can pick the right
    encoding. The bytes are wrapped in a file-like object so feedparser reads
    them directly instead of first treating the input as a URL or file path.
    """
    headers = {"content-type": content_type} if content_type else {}
    return feedparser.parse(io.BytesIO(content), response_headers=headers)


def _is_clean_feed(parsed) -> bool:
    """True when feedparser reported no problems and found a feed or entries."""
    return not parsed.bozo and bool(parsed.feed or parsed.entries)


async def fetch_feed_entries(client: httpx.AsyncClient, feed_url: str) -> List[str]:
    """Download *feed_url* and return its entry URLs (absolute, de-duplicated, in order).

    The feed is fetched as bytes and parsed with the server's Content-Type.
    Fallbacks for common breakage are tried in order: strip a UTF-8 BOM, then
    rewrite an ``encoding="us-ascii"`` declaration to utf-8. If no variant
    parses cleanly, the first variant that still yielded entries is accepted:
    feedparser flags recoverable issues (e.g. CharacterEncodingOverride) as
    "bozo" while extracting entries just fine.

    Raises:
        ValueError: the feed could not be downloaded, or no variant produced entries.
    """
    got = await fetch_bytes(client, feed_url)
    if not got:
        raise ValueError("Could not download the feed.")
    content, ctype = got

    variants: List[bytes] = [content]
    stripped = content
    if stripped.startswith(UTF8_BOM):
        stripped = stripped[len(UTF8_BOM):]
        variants.append(stripped)
    redeclared = _US_ASCII_DECL.sub(rb"\1utf-8\2", stripped, count=1)
    if redeclared != stripped:
        variants.append(redeclared)

    attempts = []
    for data in variants:
        parsed = _parse_feed_bytes(data, ctype)
        if _is_clean_feed(parsed):
            return _entries_to_urls(parsed, feed_url)
        attempts.append(parsed)

    # Lenient pass: accept entries despite parser warnings.
    for parsed in attempts:
        if parsed.entries:
            return _entries_to_urls(parsed, feed_url)

    first = attempts[0]
    raise ValueError(f"Could not parse feed: {getattr(first, 'bozo_exception', 'unknown parse error')}")


def _entries_to_urls(parsed, base_url: str = "") -> List[str]:
    """Collect one http(s) URL per feed entry, de-duplicated in first-seen order.

    Each entry's ``link`` is resolved against *base_url* (the feed URL), so
    relative links work. If there is no usable link, the entry ``id`` is used
    when it is already an absolute http(s) URL.
    """
    urls: List[str] = []
    for e in parsed.entries:
        url = absolutize(getattr(e, "link", None) or "", base_url)
        if url is None:
            ident = getattr(e, "id", None)
            url = ident if ident and is_http_url(ident) else None
        if url:
            urls.append(url)
    return list(dict.fromkeys(urls))


# ------------------------------
# Feed discovery (normalized + concurrent)
# ------------------------------
async def _is_valid_feed(client: httpx.AsyncClient, url: str) -> bool:
    """Fetch *url* over http(s) and report whether it parses cleanly as a feed.

    Uses httpx, never feedparser's own fetcher. That fetcher blocks the event
    loop and would also follow non-http schemes such as ``file:`` taken from
    untrusted HTML.
    """
    if not is_http_url(url):
        return False
    got = await fetch_bytes(client, url)
    if not got:
        return False
    content, ctype = got
    return _is_clean_feed(_parse_feed_bytes(content, ctype))


async def _probe_common_paths(client: httpx.AsyncClient, bases: List[str]) -> Optional[str]:
    """Probe every base + COMMON_FEED_PATHS combination; return the first valid feed.

    At most CANDIDATE_CONCURRENCY probes run at once. Remaining probes are
    cancelled as soon as one succeeds.
    """
    sem = asyncio.Semaphore(CANDIDATE_CONCURRENCY)

    async def try_candidate(url: str) -> Optional[str]:
        async with sem:
            ok, ctype = await fetch_head_ok(client, url)
            if not ok:
                return None
            # A missing Content-Type is given the benefit of the doubt.
            if ctype and not any(mt in ctype.lower() for mt in FEED_MIME_HINTS):
                return None
            return url if await _is_valid_feed(client, url) else None

    tasks = [
        asyncio.create_task(try_candidate(base + path))
        for base in bases
        for path in COMMON_FEED_PATHS
    ]
    try:
        for next_done in asyncio.as_completed(tasks):
            res = await next_done
            if res:
                return res
        return None
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


def _feed_link_candidates(soup: BeautifulSoup, page_url: str) -> List[str]:
    """List feed URL candidates advertised by a homepage.

    ``<link rel="alternate">`` entries with an rss/atom/xml type come first.
    They are followed by up to MAX_ANCHOR_CANDIDATES ``<a>`` hrefs mentioning
    rss/atom/feed. Only http(s) URLs are returned.
    """
    advertised: List[str] = []
    for link in soup.find_all("link", rel=True, href=True):
        rels = link.get("rel")
        if isinstance(rels, list):
            rels = {r.lower() for r in rels if r}
        else:
            rels = {str(rels).lower()}
        typ = str(link.get("type", "")).lower()
        if "alternate" in rels and any(mt in typ for mt in ("rss", "atom", "xml")):
            u = absolutize(link.get("href"), page_url)
            if u:
                advertised.append(u)

    anchors: List[str] = []
    for a in soup.find_all("a", href=True):
        if len(anchors) >= MAX_ANCHOR_CANDIDATES:
            break
        href = a.get("href", "")
        if any(tok in href.lower() for tok in ("rss", "atom", "feed")):
            u = absolutize(href, page_url)
            if u and u not in anchors:
                anchors.append(u)
    return advertised + anchors


async def _scan_homepages(client: httpx.AsyncClient, bases: List[str]) -> Optional[str]:
    """Look for a feed advertised on each base's homepage; return the first valid one."""
    tried: Set[str] = set()
    for base in bases:
        home = base + "/"
        html = await fetch_page_html(client, home)
        if not html:
            continue
        soup = BeautifulSoup(html, "lxml")
        for candidate in _feed_link_candidates(soup, home):
            if candidate in tried:
                continue
            tried.add(candidate)
            if await _is_valid_feed(client, candidate):
                return candidate
    return None


async def discover_feed_for_host(client: httpx.AsyncClient, hostname: str) -> Optional[str]:
    """Find a feed URL for *hostname*, or None; the result is cached per normalized host.

    Tries https/http with and without ``www.``. Well-known feed paths are
    probed first (concurrently), then homepages are scanned. Negative results
    are cached for NEGATIVE_FEED_CACHE_TTL.
    """
    host_key = normalize_host(hostname)
    if not host_key:
        return None
    found, cached = cache_lookup_host_feed(host_key)
    if found:
        return cached

    # normalize_host already stripped "www.", so both variants are always tried.
    bases = [
        f"https://{host_key}",
        f"http://{host_key}",
        f"https://www.{host_key}",
        f"http://www.{host_key}",
    ]
    feed_url = await _probe_common_paths(client, bases)
    if feed_url is None:
        feed_url = await _scan_homepages(client, bases)
    cache_set_host_feed(host_key, feed_url)
    return feed_url


# ------------------------------
# SSE plumbing
# ------------------------------
class Job:
    """One analysis run whose progress is streamed to a client as SSE messages."""

    def __init__(self, feed_url: str):
        self.id = str(uuid.uuid4())
        self.feed_url = feed_url
        # Pre-formatted SSE messages, buffered until the client drains them.
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.done = asyncio.Event()
        # Message of the exception that aborted the run, if any.
        self.error: Optional[str] = None

    async def emit(self, event: str, data: dict):
        """Queue one SSE message; *data* is wrapped with the event name and a UTC timestamp."""
        payload = {"event": event, "data": data, "ts": datetime.now(timezone.utc).isoformat()}
        await self.queue.put(f"event: {event}\ndata: {json.dumps(payload)}\n\n")

    async def finish(self):
        """Mark the job finished and queue the terminal ``done`` message."""
        self.done.set()
        await self.queue.put(SSE_DONE)


JOBS: Dict[str, Job] = {}
# Strong references to fire-and-forget tasks: the event loop keeps only weak
# ones, so an unreferenced task may be garbage-collected mid-run.
_BACKGROUND_TASKS: Set["asyncio.Task"] = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Start *coro* as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


def _group_links_by_host(links: List[str]) -> List[HostSummary]:
    """Aggregate *links* per normalized host, sorted by link count (descending, stable)."""
    host_map: Dict[str, HostSummary] = {}
    for link in links:
        host = host_of_url(link)
        if not host:
            continue
        hs = host_map.setdefault(host, HostSummary(hostname=host))
        hs.count += 1
        hs.unique_links.add(link)
        hs.link_counts[link] += 1
    return sorted(host_map.values(), key=lambda s: s.count, reverse=True)


def _host_card_payload(hs: HostSummary) -> dict:
    """Build the template context dict for one host card."""
    return {
        "hostname": hs.hostname,
        "count": hs.count,
        "unique_link_count": len(hs.unique_links),
        "links": sorted(hs.unique_links),
        # Only links that occur more than once are "top" links.
        "top_links": [
            {"url": url, "count": cnt}
            for url, cnt in hs.link_counts.most_common()
            if cnt > 1
        ],
        "feed_url": hs.feed_url,
    }


async def _discover_and_render(job: Job, client: httpx.AsyncClient, hosts_sorted: List[HostSummary]):
    """Discover each host's feed (bounded concurrency) and stream a rendered card per host.

    A failure for one host is reported as a status message and does not abort
    the run. If anything still raises, the heartbeat and all outstanding
    workers are cancelled.
    """
    sem = asyncio.Semaphore(DISCOVERY_CONCURRENCY)
    max_count = max((h.count for h in hosts_sorted), default=1)
    total = len(hosts_sorted)

    async def work(hs: HostSummary, idx: int):
        async with sem:
            await job.emit("status", {"stage": "discover", "message": f"Discovering feed for {hs.hostname} ({idx}/{total})"})
            try:
                hs.feed_url = await discover_feed_for_host(client, hs.hostname)
            except Exception as exc:
                hs.feed_url = None
                await job.emit("status", {"stage": "discover", "message": f"Feed discovery failed for {hs.hostname}: {exc}"})
            html = render_host_card(_host_card_payload(hs), max_count, index=idx)
            await job.emit("host_card", {"html": html, "index": idx, "total": total})

    tasks = [asyncio.create_task(work(hs, i)) for i, hs in enumerate(hosts_sorted, start=1)]

    async def heartbeat():
        while any(not t.done() for t in tasks):
            await job.emit("status", {"stage": "discover", "message": "Still discovering host feeds…"})
            await asyncio.sleep(3)

    hb = asyncio.create_task(heartbeat())
    try:
        await asyncio.gather(*tasks)
    finally:
        hb.cancel()
        for t in tasks:
            t.cancel()


async def run_analysis_job(job: Job) -> Optional[dict]:
    """Run a full analysis for ``job.feed_url``, streaming progress through *job*.

    Steps: parse the feed, collect links from every post, group them by host,
    discover each host's feed, and record the summary on this run's FeedRun
    row. Any error is emitted as an ``error`` event and stored on
    ``job.error``. The ``done`` event is always emitted.

    Returns:
        The summary dict on success, or None if the run failed.
    """
    # Remember our own row id so the final update cannot hit a concurrent run.
    run_id = str(uuid.uuid4())
    with Session(engine) as sess:
        sess.add(FeedRun(id=run_id, feed_url=job.feed_url))
        sess.commit()

    summary: Optional[dict] = None
    try:
        # Client creation is inside the try so a failure here still reaches finish().
        async with httpx.AsyncClient(http2=True) as client:
            await job.emit("status", {"stage": "feed", "message": "Downloading and parsing feed…"})
            post_urls = await fetch_feed_entries(client, job.feed_url)
            total_posts = len(post_urls)
            await job.emit("posts", {"count": total_posts})

            all_links: List[str] = []
            for idx, post_url in enumerate(post_urls, start=1):
                await job.emit("status", {"stage": "posts", "message": f"Fetching post {idx}/{total_posts}"})
                all_links.extend(await get_links_for_page(client, post_url))
                await job.emit("post_progress", {"current": idx, "total": total_posts, "post_url": post_url})

            hosts_sorted = _group_links_by_host(all_links)
            await job.emit("hosts", {"count": len(hosts_sorted)})
            await _discover_and_render(job, client, hosts_sorted)

        summary = {
            "feed_url": job.feed_url,
            "post_count": len(post_urls),
            "hosts": [h.hostname for h in hosts_sorted],
            "fetched_at": now_utc().isoformat(),
        }
        with Session(engine) as sess:
            fr = sess.get(FeedRun, run_id)
            if fr:
                fr.summary_json = json.dumps(summary)
                fr.finished_at = now_utc()
                sess.add(fr)
                sess.commit()
        await job.emit("summary", summary)
    except Exception as e:
        summary = None
        job.error = str(e) or type(e).__name__
        await job.emit("error", {"message": job.error})
    finally:
        await job.finish()
    return summary


# ------------------------------
# Template rendering for components
# ------------------------------
templates = Jinja2Templates(directory="templates")


def render_host_card(host: dict, max_count: int, index: int) -> str:
    """Render ``components/host_card.html`` for one host.

    Templates may reference ``request``. Outside a real request, a minimal
    stand-in exposing an empty ``.state`` is passed instead.
    """
    req = SimpleNamespace(state=SimpleNamespace())
    return templates.get_template("components/host_card.html").render(
        request=req, host=host, max_count=max_count, index=index
    )


# ------------------------------
# FastAPI app + routes
# ------------------------------
app = FastAPI(title="RSS Link Audit", version="1.2.0")
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/start", response_class=JSONResponse)
async def start(feed_url: str = Form(...)):
    """Start a background analysis job; progress is streamed from ``/events/{job_id}``."""
    feed_url = feed_url.strip()
    if not is_http_url(feed_url):
        raise HTTPException(status_code=400, detail="'feed_url' must be an http(s) URL")
    job = Job(feed_url)
    JOBS[job.id] = job
    _spawn_background(run_analysis_job(job))
    return {"job_id": job.id}


@app.get("/events/{job_id}")
async def sse(job_id: str):
    """Stream a job's queued SSE messages until its ``done`` message, with 30 s keep-alive pings."""
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")

    async def event_gen():
        try:
            yield f"event: hello\ndata: {{\"job_id\":\"{job.id}\"}}\n\n"
            while True:
                try:
                    item = await asyncio.wait_for(job.queue.get(), timeout=30.0)
                except asyncio.TimeoutError:
                    yield "event: ping\ndata: {}\n\n"
                    continue
                yield item
                # Stop only on the terminal message itself: `done` is set before
                # that message is queued, so checking the flag would drop events
                # still waiting in the queue.
                if item == SSE_DONE:
                    break
        finally:
            # Runs on normal completion and on client disconnect. A job that is
            # still running stays registered so the client can reconnect.
            if job.done.is_set():
                JOBS.pop(job.id, None)

    return StreamingResponse(event_gen(), media_type="text/event-stream")


@app.post("/api/analyze", response_class=JSONResponse)
async def analyze_api(payload: Dict):
    """Run an analysis synchronously and return its summary (or the error message)."""
    feed_url = payload.get("feed_url")
    if not feed_url:
        raise HTTPException(status_code=400, detail="Missing 'feed_url'")
    if not isinstance(feed_url, str) or not is_http_url(feed_url.strip()):
        raise HTTPException(status_code=400, detail="'feed_url' must be an http(s) URL")
    job = Job(feed_url.strip())
    summary = await run_analysis_job(job)
    if summary is None:
        return JSONResponse(content={"ok": False, "error": job.error})
    return JSONResponse(content={"ok": True, "summary": summary})


@app.get("/healthz")
async def healthz():
    return {"ok": True}