learn-searchcraft/ingest_thoughts.py
2025-11-22 19:58:34 -06:00

189 lines
5.6 KiB
Python

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "httpx",
# "beautifulsoup4",
# "python-dateutil",
# ]
# ///
import asyncio
import os
import subprocess
from collections import Counter, defaultdict

import httpx
from bs4 import BeautifulSoup
from dateutil import parser as date_parser
SEARCHCRAFT_URL = "http://0.0.0.0:8000"
INDEX_NAME = "thoughts-links"
SQLITE_DB = "~/.config/thoughts/database2.db"
response_counter = Counter()
failure_reasons = defaultdict(list)
def get_links():
command = f'sqlite3 {SQLITE_DB} "select link from post"'
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return [
line.strip()
for line in result.stdout.strip().split("\n")
if line.strip() and line.startswith("http")
]
def extract_content(html: str) -> str:
soup = BeautifulSoup(html, "html.parser")
selectors = [
"article",
"div[itemprop=articleBody]",
"main",
"div[class*=post]",
"div[class*=article]",
"body",
]
for selector in selectors:
el = soup.select_one(selector)
if el and el.get_text(strip=True):
return el.get_text("\n", strip=True)
return soup.get_text("\n", strip=True)
def extract_metadata(soup: BeautifulSoup) -> dict:
def get_meta(properties: list[str]) -> str | None:
for prop in properties:
tag = soup.find("meta", attrs={"property": prop}) or soup.find(
"meta", attrs={"name": prop}
)
if tag and tag.get("content"):
return tag["content"].strip()
return None
title = (
get_meta(["og:title", "twitter:title", "title"])
or (soup.title.string.strip() if soup.title else None)
or (soup.find("h1").get_text(strip=True) if soup.find("h1") else None)
)
description = get_meta(
["og:description", "description", "twitter:description"]
) or (soup.find("p").get_text(strip=True) if soup.find("p") else None)
image = get_meta(["og:image", "twitter:image", "image"])
created_at_raw = get_meta(
[
"article:published_time",
"og:published_time",
"date",
"published_time",
"pubdate",
]
) or (
soup.find("time", {"datetime": True}).get("datetime")
if soup.find("time", {"datetime": True})
else None
)
created_at = None
if created_at_raw:
try:
created_at = date_parser.parse(created_at_raw)
except Exception:
pass
return {
"title": title or "",
"description": description or "",
"image": image or "",
"created_at": created_at,
}
async def fetch_and_parse(client, url):
try:
resp = await client.get(url, timeout=10)
response_counter[resp.status_code] += 1
if resp.status_code in {402, 403, 429}:
reasons = {
402: "Payment Required",
403: "Forbidden (Crawler?)",
429: "Too Many Requests",
}
failure_reasons[reasons[resp.status_code]].append(url)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
body = extract_content(resp.text)
meta = extract_metadata(soup)
return {
"id": str(url),
"url": str(url),
"body": str(body),
"title": str(meta["title"]),
"description": str(meta["description"]),
"image": str(meta["image"]),
# "created_at": meta["created_at"],
}
except httpx.TimeoutException:
failure_reasons["Timeout"].append(url)
except httpx.ConnectError:
failure_reasons["Connection Error"].append(url)
except httpx.RequestError as e:
failure_reasons["Request Error"].append(f"{url}: {str(e)}")
except httpx.HTTPStatusError as e:
failure_reasons["HTTP Error"].append(f"{url}: {str(e)}")
except Exception as e:
failure_reasons["Unknown"].append(f"{url}: {str(e)}")
return None
async def upload_documents():
links = get_links()
print(f"\U0001f517 Found {len(links)} links")
async with httpx.AsyncClient(
headers={"User-Agent": "Mozilla/5.0 (compatible; LinkFetcher/1.0)"}
) as client:
tasks = [fetch_and_parse(client, url) for url in links]
results = await asyncio.gather(*tasks)
documents = [doc for doc in results if doc]
print(f"✅ Parsed {len(documents)} valid documents")
print("\n📊 Response Summary:")
for code, count in response_counter.items():
print(f" {code}: {count}")
total = sum(response_counter.values())
failures = total - response_counter[200]
print(f"\n❌ Failure Rate: {failures}/{total} ({failures / total:.2%})")
if failure_reasons:
print("\n🔍 Failure Reasons:")
for reason, urls in failure_reasons.items():
print(f" {reason}: {len(urls)}")
# Upload if needed
if documents:
response = await client.post(
f"{SEARCHCRAFT_URL}/index/{INDEX_NAME}/documents", json=documents
)
response.raise_for_status()
print("Uploaded:", response.json())
commit_resp = await client.post(
f"{SEARCHCRAFT_URL}/index/{INDEX_NAME}/commit"
)
commit_resp.raise_for_status()
print("Committed:", commit_resp.json())
else:
breakpoint()
if __name__ == "__main__":
asyncio.run(upload_documents())