# main.py (v1.2) — robust feed parsing, clearer SSE progress, normalized host caching, concurrent discovery
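"""RSS Link Audit.

Run notes (a sketch of the expected environment, not a guarantee):
  - third-party deps used below: fastapi (plus jinja2 and python-multipart for
    templates and form parsing), httpx (with the `h2` extra, since the client is
    created with http2=True), feedparser, beautifulsoup4 + lxml, sqlmodel
  - expects a `templates/` directory containing `index.html` and
    `components/host_card.html`, and a `static/` directory for mounted assets
  - run locally with:  uvicorn main:app --reload

A SQLite cache (`cache.db`) is created next to this file on first import.
"""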
import asyncio
import json
import uuid
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse

import httpx
import feedparser
from bs4 import BeautifulSoup

from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from sqlmodel import Field, SQLModel, create_engine, Session, select


# ------------------------------
# Settings / Constants
# ------------------------------
REQUEST_TIMEOUT = httpx.Timeout(15.0, connect=8.0, read=15.0)
HEADERS = {
    "User-Agent": "LinkAuditBot/1.2 (+https://example.com; contact: admin@example.com)"
}

COMMON_FEED_PATHS = [
    "/feed", "/feed/", "/feed.xml",
    "/rss", "/rss.xml", "/rss/",
    "/atom", "/atom.xml",
    "/index.xml",
    "/blog/feed", "/blog/rss", "/blog/rss.xml", "/blog/index.xml",
    "/feeds/posts/default?alt=rss",  # Blogger
    "/news/atom.xml", "/news/rss.xml",
    "/.rss", "/?feed=rss2",  # WP variants
    "/category/news/feed", "/?feed=atom",
]

FEED_MIME_HINTS = {
    "application/rss+xml",
    "application/atom+xml",
    "application/xml",
    "text/xml",
}

DISCOVERY_CONCURRENCY = 10


# ------------------------------
# Database Models (SQLModel)
# ------------------------------
class PageCache(SQLModel, table=True):
    url: str = Field(primary_key=True)
    html: Optional[str] = None
    fetched_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class LinksCache(SQLModel, table=True):
    url: str = Field(primary_key=True)
    links_json: str  # JSON list[str]
    extracted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class HostFeedCache(SQLModel, table=True):
    hostname: str = Field(primary_key=True)  # normalized!
    feed_url: Optional[str] = None  # None means "checked, but no feed was found"
    checked_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class FeedRun(SQLModel, table=True):
    id: str = Field(primary_key=True, default_factory=lambda: str(uuid.uuid4()))
    feed_url: str
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    finished_at: Optional[datetime] = None
    summary_json: Optional[str] = None  # store last summary, if desired


engine = create_engine("sqlite:///cache.db", echo=False)
SQLModel.metadata.create_all(engine)


# ------------------------------
# Data models
# ------------------------------
@dataclass
class HostSummary:
    hostname: str
    count: int = 0
    unique_links: Set[str] = field(default_factory=set)
    link_counts: Counter = field(default_factory=Counter)
    feed_url: Optional[str] = None


# ------------------------------
# Utilities
# ------------------------------
def now_utc() -> datetime:
    return datetime.now(timezone.utc)


def normalize_host(host: str) -> str:
    if not host:
        return host
    h = host.strip().lower().rstrip(".")
    if h.startswith("www."):
        h = h[4:]
    return h


def is_http_url(href: str) -> bool:
    try:
        p = urlparse(href)
        return p.scheme in ("http", "https")
    except Exception:
        return False


def absolutize(href: str, base_url: str) -> Optional[str]:
    if not href:
        return None
    if href.startswith("#") or href.startswith("mailto:") or href.startswith("tel:"):
        return None
    try:
        abs_url = urljoin(base_url, href)
        if is_http_url(abs_url):
            return abs_url
    except Exception:
        return None
    return None
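

# Illustrative examples of the helpers above (not executed):
#   normalize_host("WWW.Example.COM.")                     -> "example.com"
#   absolutize("/about", "https://example.com/post")       -> "https://example.com/about"
#   absolutize("mailto:hi@example.com", "https://example.com/")  -> None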


def extract_links_from_html(html: str, base_url: str) -> List[str]:
    soup = BeautifulSoup(html, "lxml")
    links: List[str] = []
    for a in soup.find_all("a", href=True):
        u = absolutize(a.get("href"), base_url)
        if u:
            links.append(u)
    return links


# ------------------------------
# Networking
# ------------------------------
async def fetch_text(client: httpx.AsyncClient, url: str) -> Optional[str]:
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            # Respect the encoding the server declared; only fall back to UTF-8
            # when no charset was provided (httpx has no `apparent_encoding`).
            if r.encoding is None:
                r.encoding = "utf-8"
            return r.text
    except Exception:
        return None
    return None


async def fetch_bytes(client: httpx.AsyncClient, url: str) -> Optional[Tuple[bytes, Optional[str]]]:
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            ctype = r.headers.get("content-type")
            return r.content, ctype
    except Exception:
        return None
    return None


async def fetch_head_ok(client: httpx.AsyncClient, url: str) -> Tuple[bool, Optional[str]]:
    # Try HEAD first; fall back to GET for servers that reject or mishandle HEAD.
    try:
        r = await client.head(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            return True, r.headers.get("content-type")
    except Exception:
        pass
    try:
        r = await client.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT, follow_redirects=True)
        if r.status_code < 400:
            return True, r.headers.get("content-type")
    except Exception:
        pass
    return False, None


# ------------------------------
# Cache helpers
# ------------------------------
def cache_get_page(url: str) -> Optional[PageCache]:
    with Session(engine) as sess:
        return sess.get(PageCache, url)


def cache_set_page(url: str, html: Optional[str]):
    with Session(engine) as sess:
        # merge() acts as an upsert keyed on the primary-key column.
        sess.merge(PageCache(url=url, html=html, fetched_at=now_utc()))
        sess.commit()


def cache_get_links(url: str) -> Optional[List[str]]:
    with Session(engine) as sess:
        row = sess.get(LinksCache, url)
        if not row:
            return None
        try:
            return json.loads(row.links_json)
        except Exception:
            return None


def cache_set_links(url: str, links: List[str]):
    with Session(engine) as sess:
        sess.merge(LinksCache(url=url, links_json=json.dumps(links), extracted_at=now_utc()))
        sess.commit()


def cache_get_host_feed(hostname: str) -> Tuple[bool, Optional[str]]:
    """Return (checked, feed_url).

    `checked` distinguishes "never looked at this host" from "looked and found
    nothing", so negative results are cached as well.
    """
    host_key = normalize_host(hostname)
    with Session(engine) as sess:
        row = sess.get(HostFeedCache, host_key)
        if row is None:
            return False, None
        return True, row.feed_url


def cache_set_host_feed(hostname: str, feed_url: Optional[str]):
    host_key = normalize_host(hostname)
    with Session(engine) as sess:
        sess.merge(HostFeedCache(hostname=host_key, feed_url=feed_url, checked_at=now_utc()))
        sess.commit()


# ------------------------------
# Cached fetch/extract
# ------------------------------
async def fetch_page_html(client: httpx.AsyncClient, url: str) -> Optional[str]:
    cached = cache_get_page(url)
    if cached and cached.html:
        return cached.html
    html = await fetch_text(client, url)
    cache_set_page(url, html)
    return html


async def get_links_for_page(client: httpx.AsyncClient, url: str) -> List[str]:
    cached = cache_get_links(url)
    if cached is not None:
        return cached
    html = await fetch_page_html(client, url)
    if not html:
        cache_set_links(url, [])
        return []
    links = extract_links_from_html(html, url)
    cache_set_links(url, links)
    return links


# ------------------------------
# Robust feed parsing
# ------------------------------
async def fetch_feed_entries(client: httpx.AsyncClient, feed_url: str) -> List[str]:
    """
    Fetch the feed as bytes and let feedparser infer the encoding.
    Retry a couple of fallbacks for mismatched encoding declarations.
    """
    got = await fetch_bytes(client, feed_url)
    if not got:
        raise ValueError("Could not download the feed.")
    content, ctype = got

    parsed = feedparser.parse(content)
    if parsed.bozo == 0 and (parsed.feed or parsed.entries):
        return _entries_to_urls(parsed)
    # Fallback 1: strip a UTF-8 BOM
    cleaned = content.lstrip(b"\xef\xbb\xbf")
    if cleaned != content:
        parsed2 = feedparser.parse(cleaned)
        if parsed2.bozo == 0 and (parsed2.feed or parsed2.entries):
            return _entries_to_urls(parsed2)
    # Fallback 2: replace a us-ascii declaration with utf-8
    try:
        cleaned2 = cleaned.replace(b'encoding="us-ascii"', b'encoding="utf-8"')
        parsed3 = feedparser.parse(cleaned2)
        if parsed3.bozo == 0 and (parsed3.feed or parsed3.entries):
            return _entries_to_urls(parsed3)
    except Exception:
        pass
    raise ValueError(f"Could not parse feed: {getattr(parsed, 'bozo_exception', 'unknown parse error')}")


def _entries_to_urls(parsed) -> List[str]:
    urls: List[str] = []
    for e in parsed.entries:
        if getattr(e, "link", None):
            urls.append(e.link)
        elif getattr(e, "id", None) and is_http_url(e.id):
            urls.append(e.id)
    # De-duplicate while preserving first-seen order.
    seen, out = set(), []
    for u in urls:
        if u not in seen:
            seen.add(u)
            out.append(u)
    return out
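

# Illustrative use of the parser above (assumes a running event loop and a shared
# AsyncClient; the feed URL is a placeholder):
#
#   async with httpx.AsyncClient() as client:
#       post_urls = await fetch_feed_entries(client, "https://example.com/feed.xml")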


# ------------------------------
# Feed discovery (normalized + concurrent)
# ------------------------------
async def discover_feed_for_host(client: httpx.AsyncClient, hostname: str) -> Optional[str]:
    host_key = normalize_host(hostname)
    checked, cached = cache_get_host_feed(host_key)
    if checked:
        return cached  # may be None: previously checked, no feed found

    # normalize_host() strips any leading "www.", so probe both variants.
    bases = [
        f"https://{host_key}",
        f"http://{host_key}",
        f"https://www.{host_key}",
        f"http://www.{host_key}",
    ]

    async def parses_as_feed(url: str) -> bool:
        # Download with the shared async client and hand feedparser the bytes;
        # feedparser.parse(url) would fetch synchronously and block the event loop.
        got = await fetch_bytes(client, url)
        if not got:
            return False
        parsed = feedparser.parse(got[0])
        return parsed.bozo == 0 and bool(parsed.feed or parsed.entries)

    async def try_candidate(url: str) -> Optional[str]:
        ok, ctype = await fetch_head_ok(client, url)
        if ok and (not ctype or any(mt in ctype for mt in FEED_MIME_HINTS)):
            if await parses_as_feed(url):
                return url
        return None

    # Pass 1: probe common feed paths concurrently; stop at the first hit.
    tasks = [
        asyncio.create_task(try_candidate(base + path))
        for base in bases
        for path in COMMON_FEED_PATHS
    ]
    try:
        for fut in asyncio.as_completed(tasks):
            res = await fut
            if res:
                cache_set_host_feed(host_key, res)
                return res
    finally:
        # Cancel whatever is still in flight once we have an answer.
        for t in tasks:
            t.cancel()

    # Pass 2: scrape the homepage for <link rel="alternate"> hints and feed-ish anchors.
    for base in bases:
        html = await fetch_page_html(client, base + "/")
        if not html:
            continue
        soup = BeautifulSoup(html, "lxml")
        for link in soup.find_all("link", rel=True, href=True):
            rels = link.get("rel")
            if isinstance(rels, list):
                rels = {r.lower() for r in rels if r}
            else:
                rels = {str(rels).lower()}
            typ = str(link.get("type", "")).lower()
            href = link.get("href")
            if "alternate" in rels and any(mt in typ for mt in ("rss", "atom", "xml")):
                feed_url = urljoin(base + "/", href)
                if await parses_as_feed(feed_url):
                    cache_set_host_feed(host_key, feed_url)
                    return feed_url
        for a in soup.find_all("a", href=True):
            href = a.get("href", "")
            if any(tok in href.lower() for tok in ("rss", "atom", "feed")):
                feed_url = urljoin(base + "/", href)
                ok, _ = await fetch_head_ok(client, feed_url)
                if ok and await parses_as_feed(feed_url):
                    cache_set_host_feed(host_key, feed_url)
                    return feed_url

    cache_set_host_feed(host_key, None)
    return None


# ------------------------------
# SSE plumbing
# ------------------------------
class Job:
    def __init__(self, feed_url: str):
        self.id = str(uuid.uuid4())
        self.feed_url = feed_url
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.done = asyncio.Event()

    async def emit(self, event: str, data: dict):
        payload = {"event": event, "data": data, "ts": datetime.now(timezone.utc).isoformat()}
        await self.queue.put(f"event: {event}\ndata: {json.dumps(payload)}\n\n")

    async def finish(self):
        self.done.set()
        await self.queue.put("event: done\ndata: {}\n\n")
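

# What emit() puts on the wire, for reference (illustrative values):
#
#   event: status
#   data: {"event": "status", "data": {"stage": "feed", "message": "Downloading and parsing feed…"}, "ts": "2025-01-01T00:00:00+00:00"}
#
# Each frame ends with a blank line, so browsers can consume it with EventSource.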


JOBS: Dict[str, Job] = {}


async def run_analysis_job(job: Job):
    with Session(engine) as sess:
        fr = FeedRun(feed_url=job.feed_url)
        sess.add(fr)
        sess.commit()

    async with httpx.AsyncClient(http2=True) as client:
        try:
            await job.emit("status", {"stage": "feed", "message": "Downloading and parsing feed…"})
            post_urls = await fetch_feed_entries(client, job.feed_url)
            await job.emit("posts", {"count": len(post_urls)})

            all_links: List[str] = []
            for idx, post_url in enumerate(post_urls, start=1):
                await job.emit("status", {"stage": "posts", "message": f"Fetching post {idx}/{len(post_urls)}"})
                links = await get_links_for_page(client, post_url)
                all_links.extend(links)
                await job.emit("post_progress", {"current": idx, "total": len(post_urls), "post_url": post_url})

            host_map: Dict[str, HostSummary] = {}
            for link in all_links:
                # hostname (unlike netloc) drops any port and credentials
                host = normalize_host(urlparse(link).hostname or "")
                if not host:
                    continue
                hs = host_map.setdefault(host, HostSummary(hostname=host))
                hs.count += 1
                hs.unique_links.add(link)
                hs.link_counts[link] += 1

            hosts_sorted = sorted(host_map.values(), key=lambda s: s.count, reverse=True)
            await job.emit("hosts", {"count": len(hosts_sorted)})

            sem = asyncio.Semaphore(DISCOVERY_CONCURRENCY)
            max_count = max((h.count for h in hosts_sorted), default=1)

            async def work(hs: HostSummary, idx: int, total: int):
                async with sem:
                    await job.emit("status", {"stage": "discover", "message": f"Discovering feed for {hs.hostname} ({idx}/{total})"})
                    feed = await discover_feed_for_host(client, hs.hostname)
                    hs.feed_url = feed
                    host_dict = {
                        "hostname": hs.hostname,
                        "count": hs.count,
                        "unique_link_count": len(hs.unique_links),
                        "links": sorted(hs.unique_links),
                        "top_links": [
                            {"url": url, "count": cnt}
                            for url, cnt in hs.link_counts.most_common()
                            if cnt > 1
                        ],
                        "feed_url": hs.feed_url,
                    }
                    html = render_host_card(host_dict, max_count, index=idx)
                    await job.emit("host_card", {"html": html, "index": idx, "total": total})

            tasks = [asyncio.create_task(work(hs, i, len(hosts_sorted))) for i, hs in enumerate(hosts_sorted, start=1)]

            async def heartbeat():
                while any(not t.done() for t in tasks):
                    await job.emit("status", {"stage": "discover", "message": "Still discovering host feeds…"})
                    await asyncio.sleep(3)

            hb = asyncio.create_task(heartbeat())
            await asyncio.gather(*tasks)
            hb.cancel()

            summary = {
                "feed_url": job.feed_url,
                "post_count": len(post_urls),
                "hosts": [h.hostname for h in hosts_sorted],
                "fetched_at": datetime.now(timezone.utc).isoformat(),
            }
            with Session(engine) as sess:
                fr = sess.exec(
                    select(FeedRun)
                    .where(FeedRun.feed_url == job.feed_url)
                    .order_by(FeedRun.started_at.desc())
                ).first()
                if fr:
                    fr.summary_json = json.dumps(summary)
                    fr.finished_at = datetime.now(timezone.utc)
                    sess.add(fr)
                    sess.commit()

            await job.emit("summary", summary)
        except Exception as e:
            await job.emit("error", {"message": str(e)})
        finally:
            await job.finish()


# ------------------------------
# Template rendering for components
# ------------------------------
templates = Jinja2Templates(directory="templates")


def render_host_card(host: dict, max_count: int, index: int) -> str:
    # The component template is rendered outside a request cycle, so pass a
    # minimal stand-in object in case the template references `request`.
    class DummyRequest:
        def __init__(self):
            self.state = type("s", (), {})()

    html = templates.get_template("components/host_card.html").render(
        request=DummyRequest(), host=host, max_count=max_count, index=index
    )
    return html


# ------------------------------
# FastAPI app + routes
# ------------------------------
app = FastAPI(title="RSS Link Audit", version="1.2.0")
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/start", response_class=JSONResponse)
async def start(feed_url: str = Form(...)):
    job = Job(feed_url)
    JOBS[job.id] = job
    asyncio.create_task(run_analysis_job(job))
    return {"job_id": job.id}


@app.get("/events/{job_id}")
|
|
async def sse(job_id: str):
|
|
job = JOBS.get(job_id)
|
|
if not job:
|
|
raise HTTPException(404, "Job not found")
|
|
|
|
async def event_gen():
|
|
yield f"event: hello\ndata: {{\"job_id\":\"{job.id}\"}}\n\n"
|
|
while True:
|
|
try:
|
|
item = await asyncio.wait_for(job.queue.get(), timeout=30.0)
|
|
yield item
|
|
if job.done.is_set():
|
|
break
|
|
except asyncio.TimeoutError:
|
|
yield "event: ping\ndata: {}\n\n"
|
|
JOBS.pop(job.id, None)
|
|
|
|
return StreamingResponse(event_gen(), media_type="text/event-stream")
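

# Quick manual check from a terminal (illustrative; assumes the app is running
# locally on port 8000):
#
#   curl -X POST -d "feed_url=https://example.com/feed.xml" http://localhost:8000/start
#   curl -N http://localhost:8000/events/<job_id from the previous response>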


@app.post("/api/analyze", response_class=JSONResponse)
async def analyze_api(payload: Dict):
    feed_url = payload.get("feed_url")
    if not feed_url:
        raise HTTPException(status_code=400, detail="Missing 'feed_url'")
    job = Job(feed_url)
    # Runs the job inline (this request blocks until it finishes); progress
    # events are queued on the Job but not consumed by any SSE client.
    await run_analysis_job(job)
    return JSONResponse(content={"ok": True})


@app.get("/healthz")
async def healthz():
    return {"ok": True}
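

if __name__ == "__main__":
    # Convenience entry point for local runs; a minimal sketch that assumes
    # `uvicorn` is installed (it is not used elsewhere in this module).
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)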