# rss-link-app/main.py
# 2025-09-03 20:22:39 -05:00
#
# 537 lines
# 18 KiB
# Python

# main.py (v1.2) — robust feed parsing, clearer SSE progress, normalized host caching, concurrent discovery
import asyncio
import io
import json
import uuid
from collections import Counter
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse

import feedparser
import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlmodel import Field, Session, SQLModel, create_engine, select
# ------------------------------
# Settings / Constants
# ------------------------------
# Per-request timeouts: 8 s to connect; read (and the unspecified write/pool) 15 s.
REQUEST_TIMEOUT = httpx.Timeout(15.0, connect=8.0, read=15.0)

# Identify the bot, with a contact address, on every outgoing request.
_USER_AGENT = "LinkAuditBot/1.2 (+https://example.com; contact: admin@example.com)"
HEADERS = {"User-Agent": _USER_AGENT}

# Well-known feed locations probed on each host during discovery.
COMMON_FEED_PATHS = [
    # Generic locations
    "/feed",
    "/feed/",
    "/feed.xml",
    "/rss",
    "/rss.xml",
    "/rss/",
    "/atom",
    "/atom.xml",
    "/index.xml",
    # Blog sub-directory
    "/blog/feed",
    "/blog/rss",
    "/blog/rss.xml",
    "/blog/index.xml",
    # Blogger
    "/feeds/posts/default?alt=rss",
    # News sub-directory
    "/news/atom.xml",
    "/news/rss.xml",
    # Misc / WordPress query-string variants
    "/.rss",
    "/?feed=rss2",
    "/category/news/feed",
    "/?feed=atom",
]

# Content-Type substrings that mark a probed response as possibly being a feed.
FEED_MIME_HINTS = {
    "application/atom+xml",
    "application/rss+xml",
    "application/xml",
    "text/xml",
}

# Maximum number of hosts whose feed discovery runs at the same time.
DISCOVERY_CONCURRENCY = 10
# ------------------------------
# Database Models (SQLModel)
# ------------------------------
class PageCache(SQLModel, table=True):
    """Cached HTML of a fetched page, keyed by the exact URL that was requested."""

    url: str = Field(primary_key=True)
    # None when the fetch produced no body (HTTP status >= 400 or a request error);
    # fetch_page_html re-fetches such rows instead of serving them.
    html: Optional[str] = None
    # Time of the fetch, timezone-aware UTC.
    fetched_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class LinksCache(SQLModel, table=True):
    """Absolute http(s) links extracted from a page's ``<a href>`` tags, keyed by page URL."""

    url: str = Field(primary_key=True)
    links_json: str  # JSON-encoded list[str], in document order, duplicates kept
    # Time of extraction, timezone-aware UTC.
    extracted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class HostFeedCache(SQLModel, table=True):
    """Outcome of feed discovery for one host."""

    hostname: str = Field(primary_key=True)  # key produced by normalize_host() (lowercased, no leading "www.", no trailing dot)
    # None records "no feed found". NOTE(review): cache_get_host_feed() returns None for both
    # a missing row and such a row, so negative results are not served from this cache.
    feed_url: Optional[str] = None
    # Time of the check, timezone-aware UTC.
    checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class FeedRun(SQLModel, table=True):
    """One analysis run of a feed; a row is inserted when an analysis job starts."""

    # Random UUID4 string generated client-side at construction time.
    id: str = Field(primary_key=True, default_factory=lambda: str(uuid.uuid4()))
    feed_url: str
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Set when a run completes successfully; stays None otherwise.
    finished_at: Optional[datetime] = None
    summary_json: Optional[str] = None  # JSON summary written at the end of a successful run
# SQLite file relative to the working directory; all tables are created at import time.
DATABASE_URL = "sqlite:///cache.db"
engine = create_engine(DATABASE_URL, echo=False)
SQLModel.metadata.create_all(engine)
# ------------------------------
# Data models
# ------------------------------
@dataclass
class HostSummary:
    """Per-host aggregate of the outbound links found across a feed's posts."""

    hostname: str  # host key as produced by normalize_host()
    count: int = 0  # total link occurrences for this host, repeats included
    unique_links: Set[str] = field(default_factory=set)  # distinct link URLs
    link_counts: Counter = field(default_factory=Counter)  # occurrences per link URL
    feed_url: Optional[str] = None  # feed discovered for this host, if any
# ------------------------------
# Utilities
# ------------------------------
def now_utc() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)
def normalize_host(host: str) -> str:
    """Canonicalize a host string for use as a grouping / cache key.

    Trims surrounding whitespace, lowercases, removes trailing dots, then drops a
    single leading ``www.``. Falsy input (``""`` or ``None``) is returned as-is.
    Ports are not removed.
    """
    if not host:
        return host
    canonical = host.strip().lower().rstrip(".")
    return canonical[4:] if canonical.startswith("www.") else canonical
def is_http_url(href: str) -> bool:
    """Return True when *href* parses with an ``http`` or ``https`` scheme.

    Input that ``urlparse`` rejects (e.g. a malformed IPv6 host) yields False
    instead of raising.
    """
    try:
        scheme = urlparse(href).scheme
    except Exception:
        return False
    return scheme in {"http", "https"}
def absolutize(href: str, base_url: str) -> Optional[str]:
    """Resolve *href* against *base_url*; return it only if the result is http(s).

    Returns None for:
    - empty hrefs;
    - same-page fragments (``#...``);
    - ``mailto:`` / ``tel:`` links;
    - anything that resolves to a non-http(s) scheme;
    - hrefs that urllib cannot parse.
    """
    if not href:
        return None
    # Attribute values scraped from HTML often carry stray spaces/newlines. Left in
    # place they defeat scheme detection on older urllib versions and create
    # distinct "duplicate" URLs (e.g. "https://x.com " vs "https://x.com").
    href = href.strip()
    if not href:
        return None
    if href.lower().startswith(("#", "mailto:", "tel:")):
        return None
    try:
        abs_url = urljoin(base_url, href)
    except (ValueError, TypeError):
        # e.g. malformed IPv6 netloc, or mixed str/bytes arguments.
        return None
    return abs_url if is_http_url(abs_url) else None
def extract_links_from_html(html: str, base_url: str) -> List[str]:
    """Return absolute http(s) URLs for every ``<a href>`` in *html*, in document order.

    Repeated links are kept; hrefs that ``absolutize`` rejects are skipped.
    """
    anchors = BeautifulSoup(html, "lxml").find_all("a", href=True)
    resolved = (absolutize(anchor.get("href"), base_url) for anchor in anchors)
    return [url for url in resolved if url]
# ------------------------------
# Networking
# ------------------------------
async def fetch_text(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """GET *url* (following redirects) and return the decoded body.

    Returns None when the final status is >= 400 or the request fails for any
    reason; this is a deliberate best-effort fetch.

    Decoding is left to httpx. ``Response.text`` uses the charset from the
    Content-Type header when present and the client's default encoding
    otherwise. The server's declaration is therefore respected without forcing
    UTF-8. (The previous ``r.apparent_encoding`` fallback was a requests-only
    attribute that httpx lacks, and was unreachable because httpx's
    ``encoding`` is never None.)
    """
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code >= 400:
            return None
        return r.text
    except Exception:
        # Invalid URLs, timeouts and transport errors all mean "no page".
        return None
async def fetch_bytes(client: httpx.AsyncClient, url: str) -> Optional[Tuple[bytes, Optional[str]]]:
    """GET *url* (following redirects) and return ``(raw_body, content_type_header)``.

    Returns None when the final status is >= 400 or the request raises.
    """
    try:
        resp = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if resp.status_code >= 400:
            return None
        return resp.content, resp.headers.get("content-type")
    except Exception:
        return None
async def fetch_head_ok(client: httpx.AsyncClient, url: str) -> Tuple[bool, Optional[str]]:
    """Check whether *url* is reachable.

    Tries HEAD first, then falls back to GET (some servers reject or mishandle
    HEAD). Returns ``(True, content_type)`` for the first attempt whose final
    status is < 400, or ``(False, None)`` if neither succeeds. Request errors
    count as failures.
    """
    for send in (client.head, client.get):
        try:
            resp = await send(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        except Exception:
            continue
        if resp.status_code < 400:
            return True, resp.headers.get("content-type")
    return False, None
# ------------------------------
# Cache helpers
# ------------------------------
def cache_get_page(url: str) -> Optional[PageCache]:
    """Return the cached page row for *url*, or None if it was never stored."""
    with Session(engine) as session:
        found = session.get(PageCache, url)
    return found
def cache_set_page(url: str, html: Optional[str]):
    """Insert or replace the cached HTML for *url*, stamped with the current UTC time."""
    row = PageCache(url=url, html=html, fetched_at=now_utc())
    with Session(engine) as session:
        session.merge(row)
        session.commit()
def cache_get_links(url: str) -> Optional[List[str]]:
    """Return the cached link list for *url*.

    Returns None when there is no row or its JSON cannot be decoded.
    """
    with Session(engine) as session:
        row = session.get(LinksCache, url)
        if row is None:
            return None
        raw = row.links_json
    try:
        return json.loads(raw)
    except Exception:
        return None
def cache_set_links(url: str, links: List[str]):
    """Upsert the extracted links for *url*, serialized as a JSON array."""
    encoded = json.dumps(links)
    with Session(engine) as session:
        session.merge(LinksCache(url=url, links_json=encoded, extracted_at=now_utc()))
        session.commit()
def cache_get_host_feed(hostname: str) -> Optional[str]:
    """Return the cached feed URL for *hostname* (normalized before lookup).

    None is returned both when no row exists and when the row records "no feed".
    """
    with Session(engine) as session:
        row = session.get(HostFeedCache, normalize_host(hostname))
        return None if row is None else row.feed_url
def cache_set_host_feed(hostname: str, feed_url: Optional[str]):
    """Upsert the discovery result for *hostname* (normalized); None records "no feed found"."""
    record = HostFeedCache(hostname=normalize_host(hostname), feed_url=feed_url, checked_at=now_utc())
    with Session(engine) as session:
        session.merge(record)
        session.commit()
# ------------------------------
# Cached fetch/extract
# ------------------------------
async def fetch_page_html(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Return the HTML for *url*, serving it from the page cache when possible.

    Cached rows without HTML (earlier failed fetches) are ignored and the page
    is fetched again. Every network attempt, successful or not, is written back
    to the cache.
    """
    cached = cache_get_page(url)
    if cached is not None and cached.html:
        return cached.html
    fresh = await fetch_text(client, url)
    cache_set_page(url, fresh)
    return fresh
async def get_links_for_page(client: httpx.AsyncClient, url: str) -> List[str]:
    """Return the outbound http(s) links of the page at *url*, using the links cache.

    A successful extraction is cached, even one that yields no links. A failed
    fetch returns ``[]`` WITHOUT caching. Transient errors (timeouts, 5xx) are
    therefore retried on a later run, instead of the page being recorded as
    link-less forever. This matches fetch_page_html, which also re-fetches pages
    whose earlier fetch failed.
    """
    cached = cache_get_links(url)
    if cached is not None:
        return cached
    html = await fetch_page_html(client, url)
    if html is None:
        return []
    links = extract_links_from_html(html, url)
    cache_set_links(url, links)
    return links
# ------------------------------
# Robust feed parsing
# ------------------------------
def _absolute_entry_urls(parsed, feed_url: str) -> List[str]:
    """Return *parsed*'s entry URLs resolved against *feed_url*, deduplicated in order."""
    resolved: List[str] = []
    for url in _entries_to_urls(parsed):
        try:
            # feedparser cannot resolve relative <link>s when parsing from memory
            # (it has no document URL), so resolve them against the feed URL here.
            resolved.append(urljoin(feed_url, url))
        except ValueError:
            resolved.append(url)
    return list(dict.fromkeys(resolved))


async def fetch_feed_entries(client: httpx.AsyncClient, feed_url: str) -> List[str]:
    """Download *feed_url* and return its entry URLs, deduplicated, in feed order.

    The body is fetched as raw bytes and parsed from memory; feedparser picks
    the encoding from the BOM / XML declaration. If the original bytes do not
    parse cleanly, repaired variants are tried:

    1. leading UTF-8 BOMs and whitespace removed (a common cause of
       "XML declaration not at start");
    2. additionally, an ``encoding="us-ascii"`` declaration rewritten to
       ``utf-8``.

    If no variant parses cleanly, the first variant that still produced entries
    is accepted. feedparser sets ``bozo`` for many real-world feeds whose
    entries are perfectly usable.

    Raises:
        ValueError: the feed could not be downloaded, or no variant yielded a
            usable feed.
    """
    got = await fetch_bytes(client, feed_url)
    if not got:
        raise ValueError("Could not download the feed.")
    content, _ctype = got

    bom = b"\xef\xbb\xbf"
    cleaned = content
    while cleaned.startswith(bom):
        cleaned = cleaned[len(bom):]
    cleaned = cleaned.lstrip(b" \t\r\n")
    variants = [
        content,
        cleaned,
        cleaned.replace(b'encoding="us-ascii"', b'encoding="utf-8"'),
    ]

    first_parse = None
    lenient = None
    tried: Set[bytes] = set()
    for data in variants:
        if data in tried:  # identical bytes parse identically; skip repeats
            continue
        tried.add(data)
        # Wrap in BytesIO: given raw bytes, feedparser first tries to open() them as
        # a local *filename*, which must never happen with server-controlled data.
        parsed = feedparser.parse(io.BytesIO(data))
        if first_parse is None:
            first_parse = parsed
        if not parsed.bozo and (parsed.feed or parsed.entries):
            return _absolute_entry_urls(parsed, feed_url)
        if lenient is None and parsed.entries:
            lenient = parsed

    if lenient is not None:
        return _absolute_entry_urls(lenient, feed_url)
    raise ValueError(f"Could not parse feed: {getattr(first_parse, 'bozo_exception', 'unknown parse error')}")
def _entries_to_urls(parsed) -> List[str]:
urls: List[str] = []
for e in parsed.entries:
if getattr(e, "link", None):
urls.append(e.link)
elif getattr(e, "id", None) and is_http_url(e.id):
urls.append(e.id)
seen, out = set(), []
for u in urls:
if u not in seen:
seen.add(u)
out.append(u)
return out
# ------------------------------
# Feed discovery (normalized + concurrent)
# ------------------------------
def _declared_feed_links(soup: BeautifulSoup, page_url: str) -> List[str]:
    """Return unique absolute URLs of ``<link rel="alternate">`` tags whose type mentions rss/atom/xml."""
    found: List[str] = []
    for tag in soup.find_all("link", rel=True, href=True):
        rel_attr = tag.get("rel")
        if isinstance(rel_attr, list):
            rels = {r.lower() for r in rel_attr if r}
        else:
            rels = {str(rel_attr).lower()}
        typ = str(tag.get("type", "")).lower()
        if "alternate" in rels and any(mt in typ for mt in ("rss", "atom", "xml")):
            url = absolutize(tag.get("href"), page_url)
            if url:
                found.append(url)
    return list(dict.fromkeys(found))


def _feedish_anchor_links(soup: BeautifulSoup, page_url: str) -> List[str]:
    """Return unique absolute URLs of ``<a>`` tags whose href mentions rss, atom or feed."""
    found: List[str] = []
    for anchor in soup.find_all("a", href=True):
        href = anchor.get("href", "")
        if any(tok in href.lower() for tok in ("rss", "atom", "feed")):
            url = absolutize(href, page_url)
            if url:
                found.append(url)
    return list(dict.fromkeys(found))


async def discover_feed_for_host(
    client: httpx.AsyncClient,
    hostname: str,
    *,
    max_concurrent_probes: int = 8,
) -> Optional[str]:
    """Find an RSS/Atom feed URL for *hostname*, caching the outcome per normalized host.

    Strategy:
      1. Probe every COMMON_FEED_PATHS entry on https/http, with and without
         ``www.``, concurrently. The first candidate that responds with a
         feed-like (or missing) Content-Type and parses cleanly wins; the
         remaining probes are cancelled.
      2. Otherwise, scan each base's home page for ``<link rel="alternate">``
         feed declarations, then for ``<a>`` links mentioning rss/atom/feed.

    Candidate bodies are downloaded with the shared async *client* and parsed
    from memory. feedparser is never given a URL, because it would then make
    its own blocking HTTP request on the event loop, without this module's
    timeouts or headers.

    Args:
        client: shared HTTP client.
        hostname: raw host; normalized with normalize_host() for the cache key
            and the probed URLs.
        max_concurrent_probes: cap on simultaneous step-1 probes for this host.
            Without it, every candidate (4 bases x all paths) is launched at
            once per host. Those requests then compete for the client's finite
            connection pool and can fail with pool timeouts.

    Returns:
        The discovered feed URL, or None. Both outcomes are written to the host
        cache.
    """
    host_key = normalize_host(hostname)
    cached = cache_get_host_feed(host_key)
    if cached is not None:
        return cached

    # normalize_host() already stripped any leading "www.", so both variants are always tried.
    bases = [
        f"https://{host_key}",
        f"http://{host_key}",
        f"https://www.{host_key}",
        f"http://www.{host_key}",
    ]

    async def parses_as_feed(url: str) -> bool:
        """Download *url* and report whether feedparser accepts it without errors."""
        got = await fetch_bytes(client, url)
        if not got:
            return False
        # BytesIO so feedparser never interprets the body as a local filename.
        parsed = feedparser.parse(io.BytesIO(got[0]))
        return not parsed.bozo and bool(parsed.feed or parsed.entries)

    probe_sem = asyncio.Semaphore(max(1, max_concurrent_probes))

    async def try_candidate(url: str) -> Optional[str]:
        """Return *url* if it is reachable, feed-typed (or untyped) and parses as a feed."""
        async with probe_sem:
            ok, ctype = await fetch_head_ok(client, url)
            if not ok:
                return None
            if ctype and not any(mt in ctype.lower() for mt in FEED_MIME_HINTS):
                return None
            return url if await parses_as_feed(url) else None

    tasks = [
        asyncio.create_task(try_candidate(base + path))
        for base in bases
        for path in COMMON_FEED_PATHS
    ]
    try:
        for next_finished in asyncio.as_completed(tasks):
            found = await next_finished
            if found:
                cache_set_host_feed(host_key, found)
                return found
    finally:
        # Stop the losing probes instead of letting them keep hitting the host.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    for base in bases:
        home = base + "/"
        html = await fetch_page_html(client, home)
        if not html:
            continue
        soup = BeautifulSoup(html, "lxml")
        for feed_url in _declared_feed_links(soup, home) + _feedish_anchor_links(soup, home):
            if await parses_as_feed(feed_url):
                cache_set_host_feed(host_key, feed_url)
                return feed_url

    cache_set_host_feed(host_key, None)
    return None
# ------------------------------
# SSE plumbing
# ------------------------------
class Job:
    """A background analysis job whose progress is queued as ready-to-send SSE frames."""

    def __init__(self, feed_url: str):
        self.id = str(uuid.uuid4())
        self.feed_url = feed_url
        # Pre-formatted SSE frames, drained by the /events/{job_id} stream.
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.done = asyncio.Event()

    async def emit(self, event: str, data: dict):
        """Queue one SSE frame whose data line is the JSON envelope ``{event, data, ts}``."""
        envelope = {
            "event": event,
            "data": data,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        frame = f"event: {event}\ndata: {json.dumps(envelope)}\n\n"
        await self.queue.put(frame)

    async def finish(self):
        """Flag the job as complete, then queue the terminal ``done`` frame."""
        self.done.set()
        await self.queue.put("event: done\ndata: {}\n\n")


# Process-local registry of jobs by id (not persisted).
JOBS: Dict[str, Job] = {}
def _summarize_hosts(links: List[str]) -> List[HostSummary]:
    """Group *links* by normalized host and return the summaries, most-linked host first."""
    by_host: Dict[str, HostSummary] = {}
    for link in links:
        host = normalize_host(urlparse(link).netloc)
        if not host:
            continue
        summary = by_host.get(host)
        if summary is None:
            summary = by_host[host] = HostSummary(hostname=host)
        summary.count += 1
        summary.unique_links.add(link)
        summary.link_counts[link] += 1
    return sorted(by_host.values(), key=lambda s: s.count, reverse=True)


def _host_card_context(hs: HostSummary) -> dict:
    """Build the template context for one host card."""
    return {
        "hostname": hs.hostname,
        "count": hs.count,
        "unique_link_count": len(hs.unique_links),
        "links": sorted(hs.unique_links),
        # Only links that occur more than once are called out as "top".
        "top_links": [
            {"url": url, "count": cnt}
            for url, cnt in hs.link_counts.most_common()
            if cnt > 1
        ],
        "feed_url": hs.feed_url,
    }


def _record_run_finished(run_id: str, summary: dict) -> None:
    """Store *summary* and the finish time on the FeedRun row whose id is *run_id*."""
    with Session(engine) as sess:
        fr = sess.get(FeedRun, run_id)
        if fr is None:
            return
        fr.summary_json = json.dumps(summary)
        fr.finished_at = now_utc()
        sess.add(fr)
        sess.commit()


async def run_analysis_job(job: Job):
    """Run the full link audit for ``job.feed_url``, streaming progress through *job*.

    Events emitted (via ``job.emit``):
    * ``status`` / ``posts`` / ``post_progress``: the feed is downloaded and
      each post's outbound links are collected, sequentially, so progress
      arrives in order.
    * ``hosts`` / ``host_card``: links are grouped by host, and each host's
      feed is discovered with up to DISCOVERY_CONCURRENCY hosts in flight. A
      ``status`` heartbeat is sent every 3 s meanwhile.
    * ``summary`` on success, or ``error`` with the exception message.

    ``job.finish()`` always runs, including when setup itself fails (the
    database insert, or HTTP client creation with ``http2=True``). SSE
    consumers therefore never wait forever.
    """
    try:
        with Session(engine) as sess:
            fr = FeedRun(feed_url=job.feed_url)
            # Capture the id now: it identifies THIS run's row later, even if other
            # jobs for the same feed URL are running concurrently.
            run_id = fr.id
            sess.add(fr)
            sess.commit()

        async with httpx.AsyncClient(http2=True) as client:
            await job.emit("status", {"stage": "feed", "message": "Downloading and parsing feed…"})
            post_urls = await fetch_feed_entries(client, job.feed_url)
            total_posts = len(post_urls)
            await job.emit("posts", {"count": total_posts})

            all_links: List[str] = []
            for idx, post_url in enumerate(post_urls, start=1):
                await job.emit("status", {"stage": "posts", "message": f"Fetching post {idx}/{total_posts}"})
                all_links.extend(await get_links_for_page(client, post_url))
                await job.emit("post_progress", {"current": idx, "total": total_posts, "post_url": post_url})

            hosts_sorted = _summarize_hosts(all_links)
            total_hosts = len(hosts_sorted)
            await job.emit("hosts", {"count": total_hosts})

            sem = asyncio.Semaphore(DISCOVERY_CONCURRENCY)
            max_count = max((h.count for h in hosts_sorted), default=1)

            async def work(hs: HostSummary, idx: int):
                async with sem:
                    await job.emit("status", {"stage": "discover", "message": f"Discovering feed for {hs.hostname} ({idx}/{total_hosts})"})
                    hs.feed_url = await discover_feed_for_host(client, hs.hostname)
                    html = render_host_card(_host_card_context(hs), max_count, index=idx)
                    await job.emit("host_card", {"html": html, "index": idx, "total": total_hosts})

            tasks = [asyncio.create_task(work(hs, i)) for i, hs in enumerate(hosts_sorted, start=1)]

            async def heartbeat():
                while any(not t.done() for t in tasks):
                    await job.emit("status", {"stage": "discover", "message": "Still discovering host feeds…"})
                    await asyncio.sleep(3)

            hb = asyncio.create_task(heartbeat())
            try:
                await asyncio.gather(*tasks)
            finally:
                # gather() returns at the first failure; stop the heartbeat and any
                # remaining host tasks rather than leaving them running after the
                # job has already reported its result.
                hb.cancel()
                for t in tasks:
                    t.cancel()

        summary = {
            "feed_url": job.feed_url,
            "post_count": len(post_urls),
            "hosts": [h.hostname for h in hosts_sorted],
            "fetched_at": datetime.now(timezone.utc).isoformat(),
        }
        _record_run_finished(run_id, summary)
        await job.emit("summary", summary)
    except Exception as e:
        # Some exceptions stringify to "", which would leave the client with a blank error.
        await job.emit("error", {"message": str(e) or type(e).__name__})
    finally:
        await job.finish()
# ------------------------------
# Template rendering for components
# ------------------------------
templates = Jinja2Templates(directory="templates")


class _OfflineRequest:
    """Minimal stand-in for a request object when rendering outside an HTTP request.

    Exposes only an empty ``state`` attribute.
    """

    def __init__(self):
        self.state = type("s", (), {})()


def render_host_card(host: dict, max_count: int, index: int) -> str:
    """Render ``components/host_card.html`` for one host and return the HTML string.

    Called from background jobs, where no real request exists, so an
    ``_OfflineRequest`` is passed as ``request``.
    NOTE(review): anything the template reads from ``request`` beyond ``state``
    (e.g. ``url_for``) is unavailable here. Confirm host_card.html does not need it.

    Args:
        host: card context (hostname, counts, links, top_links, feed_url).
        max_count: largest per-host link count of the run, passed to the template.
        index: 1-based rank of the host.
    """
    template = templates.get_template("components/host_card.html")
    return template.render(request=_OfflineRequest(), host=host, max_count=max_count, index=index)
# ------------------------------
# FastAPI app + routes
# ------------------------------
app = FastAPI(title="RSS Link Audit", version="1.2.0")
# Static assets are served from ./static under the /static prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page template."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
# Strong references to in-flight analysis tasks. The event loop keeps only weak
# references to tasks, so a fire-and-forget task can be garbage-collected mid-run.
_BACKGROUND_TASKS: Set[asyncio.Task] = set()


@app.post("/start", response_class=JSONResponse)
async def start(feed_url: str = Form(...)):
    """Register a new analysis job, launch it in the background, and return its id.

    The client follows progress by connecting to ``/events/{job_id}``.
    """
    job = Job(feed_url)
    JOBS[job.id] = job
    task = asyncio.create_task(run_analysis_job(job))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job.id}
@app.get("/events/{job_id}")
async def sse(job_id: str):
    """Stream a job's queued frames to the client as Server-Sent Events.

    Sends a ``hello`` frame first, then every queued frame in order, plus a
    ``ping`` after each 30 s of silence. The stream ends only after the job has
    finished AND its queue is empty. Because ``finish()`` enqueues ``done``
    last, the ``summary``/``error``/``done`` frames are never dropped for a
    client that lags behind.

    A finished job is removed from JOBS when its stream ends for any reason,
    including a client disconnect. An unfinished job is kept so the client can
    reconnect.

    Raises:
        HTTPException: 404 for an unknown (or already consumed) job id.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")

    async def event_gen():
        try:
            yield f"event: hello\ndata: {{\"job_id\":\"{job.id}\"}}\n\n"
            while True:
                try:
                    item = await asyncio.wait_for(job.queue.get(), timeout=30.0)
                except asyncio.TimeoutError:
                    yield "event: ping\ndata: {}\n\n"
                    continue
                yield item
                if job.done.is_set() and job.queue.empty():
                    break
        finally:
            # Runs on normal completion and when the server closes the generator
            # after a client disconnect.
            if job.done.is_set():
                JOBS.pop(job.id, None)

    return StreamingResponse(event_gen(), media_type="text/event-stream")
def _drain_job_outcome(job: Job) -> Tuple[Optional[dict], Optional[str]]:
    """Empty *job*'s queue and return ``(summary, error_message)`` found in its frames."""
    summary: Optional[dict] = None
    error: Optional[str] = None
    while not job.queue.empty():
        frame = job.queue.get_nowait()
        for line in frame.split("\n"):
            if not line.startswith("data: "):
                continue
            try:
                envelope = json.loads(line[len("data: "):])
            except ValueError:
                continue
            event = envelope.get("event")
            if event == "summary":
                summary = envelope.get("data")
            elif event == "error":
                error = (envelope.get("data") or {}).get("message", "unknown error")
    return summary, error


@app.post("/api/analyze", response_class=JSONResponse)
async def analyze_api(payload: Dict):
    """Run an analysis synchronously and report its outcome.

    Request body: ``{"feed_url": "..."}``. Responds ``{"ok": true, "summary": {...}}``
    on success, or ``{"ok": false, "error": "..."}`` when the job reported an
    error. (Previously it answered ``ok: true`` unconditionally and discarded
    the result.)

    Raises:
        HTTPException: 400 when ``feed_url`` is missing.
    """
    feed_url = payload.get("feed_url")
    if not feed_url:
        raise HTTPException(status_code=400, detail="Missing 'feed_url'")
    job = Job(feed_url)
    await run_analysis_job(job)
    summary, error = _drain_job_outcome(job)
    if error is not None:
        return JSONResponse(content={"ok": False, "error": error})
    return JSONResponse(content={"ok": True, "summary": summary})
@app.get("/healthz")
async def healthz():
    """Liveness probe; always reports ok."""
    return dict(ok=True)