This commit is contained in:
Waylon Walker 2023-06-22 16:02:03 -05:00
parent 8480cd3d24
commit e86e432102
No known key found for this signature in database
GPG key ID: 66E2BF2B4190EFE4
11 changed files with 352 additions and 69 deletions

View file

@ -1,4 +1,5 @@
from pathlib import Path
from typing import Annotated
import alembic
from alembic.config import Config
@ -6,6 +7,7 @@ import copier
import typer
from learn_sql_model.cli.common import verbose_callback
from learn_sql_model.config import get_config
model_app = typer.Typer()
@ -40,11 +42,18 @@ def create_revision(
callback=verbose_callback,
help="show the log messages",
),
message: str = typer.Option(
prompt=True,
),
message: Annotated[
str,
typer.Option(
"--message",
"-m",
prompt=True,
),
] = None,
):
alembic_cfg = Config("alembic.ini")
config = get_config()
alembic_cfg.set_main_option("sqlalchemy.url", config.database_url)
alembic.command.revision(
config=alembic_cfg,
message=message,
@ -63,7 +72,17 @@ def checkout(
revision: str = typer.Option("head"),
):
alembic_cfg = Config("alembic.ini")
alembic.command.upgrade(config=alembic_cfg, revision="head")
config = get_config()
alembic_cfg.set_main_option("sqlalchemy.url", config.database_url)
alembic.command.upgrade(config=alembic_cfg, revision=revision)
@model_app.command()
def status():
alembic_cfg = Config("alembic.ini")
config = get_config()
alembic_cfg.set_main_option("sqlalchemy.url", config.database_url)
alembic.command.current(config=alembic_cfg)
@model_app.command()

View file

@ -0,0 +1,151 @@
import sqlite3
from graphviz import Digraph
from learn_sql_model.config import get_config
config = get_config()
def generate_er_diagram(output_path):
# Connect to the SQLite database
database_path = config.database_url.replace("sqlite:///", "")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Get the table names from the database
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
# Create a new Digraph
dot = Digraph(format="png")
dot.attr(rankdir="TD")
# Iterate over the tables
for table in tables:
table_name = table[0]
dot.node(table_name, shape="box")
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
# Add the columns to the table node
for column in columns:
column_name = column[1]
dot.node(f"{table_name}.{column_name}", label=column_name, shape="oval")
dot.edge(table_name, f"{table_name}.{column_name}")
# Check for foreign key relationships
cursor.execute(f"PRAGMA foreign_key_list({table_name});")
foreign_keys = cursor.fetchall()
# Add dotted lines for foreign key relationships
for foreign_key in foreign_keys:
from_column = foreign_key[3]
to_table = foreign_key[2]
to_column = foreign_key[4]
dot.node(f"{to_table}.{to_column}", shape="oval")
dot.edge(
f"{table_name}.{from_column}", f"{to_table}.{to_column}", style="dotted"
)
# Render and save the diagram
dot.render(output_path.replace(".png", ""), cleanup=True)
# Close the database connection
cursor.close()
conn.close()
def generate_er_markdown(output_path, er_diagram_path):
# Connect to the SQLite database
database_path = config.database_url.replace("sqlite:///", "")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# Get the table names from the database
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
with open(output_path, "w") as f:
# Write the ER Diagram image
f.write(f"![ER Diagram]({er_diagram_path})\n\n---\n\n")
# Iterate over the tables
for table in tables:
table_name = table[0]
f.write(f"## Table: {table_name}\n\n")
# Get the table columns
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
f.write("### First 5 rows\n\n")
cursor.execute(f"SELECT * FROM {table_name} LIMIT 5;")
rows = cursor.fetchall()
f.write(f'| {" | ".join([c[1] for c in columns])} |\n')
f.write("|")
for column in columns:
# ---
f.write(f'{"-"*(len(column[1]) + 2)}|')
f.write("\n")
for row in rows:
f.write(f'| {" | ".join([str(r) for r in row])} |\n')
f.write("\n")
cursor.execute(f"PRAGMA foreign_key_list({table_name});")
foreign_keys = cursor.fetchall()
# Add dotted lines for foreign key relationships
fkeys = {}
for foreign_key in foreign_keys:
from_column = foreign_key[3]
to_table = foreign_key[2]
to_column = foreign_key[4]
fkeys[from_column] = f"{to_table}.{to_column}"
# Replace 'description' with the actual column name in the table that contains the description, if applicable
try:
cursor.execute(f"SELECT description FROM {table_name} LIMIT 1;")
description = cursor.fetchone()
if description:
f.write(f"### Description\n\n{description[0]}\n\n")
except:
...
# Write the table columns
f.write("### Columns\n\n")
f.write("| Column Name | Type | Foreign Key | Example Value |\n")
f.write("|-------------|------|-------------|---------------|\n")
for column in columns:
column_name = column[1]
column_type = column[2]
fkey = ""
if column_name in fkeys:
fkey = fkeys[column_name]
f.write(f"| {column_name} | {column_type} | {fkey} | | |\n")
f.write("\n")
# Get the count of records
cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
records_count = cursor.fetchone()[0]
f.write(
f"### Records Count\n\nThe table {table_name} contains {records_count} records.\n\n---\n\n"
)
# Close the database connection
cursor.close()
conn.close()
if __name__ == "__main__":
# Usage example
database_path = "database.db"
md_output_path = "database.md"
er_output_path = "er_diagram.png"
generate_er_diagram(database_path, er_output_path)
generate_markdown(database_path, md_output_path, er_output_path)

View file

@ -2,10 +2,9 @@ from typing import Dict, Optional
import httpx
from pydantic import BaseModel
from sqlmodel import Field, Relationship, SQLModel
from sqlmodel import Field, SQLModel
from learn_sql_model.config import config
from learn_sql_model.models.pet import Pet
class HeroBase(SQLModel, table=False):
@ -13,12 +12,12 @@ class HeroBase(SQLModel, table=False):
secret_name: str
x: int
y: int
size: int
age: Optional[int] = None
shoe_size: Optional[int] = None
# size: int
# age: Optional[int] = None
# shoe_size: Optional[int] = None
pet_id: Optional[int] = Field(default=None, foreign_key="pet.id")
pet: Optional[Pet] = Relationship(back_populates="hero")
# pet_id: Optional[int] = Field(default=None, foreign_key="pet.id")
# pet: Optional[Pet] = Relationship(back_populates="hero")
class Hero(HeroBase, table=True):
@ -73,13 +72,13 @@ class HeroUpdate(SQLModel):
# all other fields, must match the model, but with Optional default None
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
shoe_size: Optional[int] = None
x: Optional[int]
y: Optional[int]
# age: Optional[int] = None
# shoe_size: Optional[int] = None
# x: Optional[int]
# y: Optional[int]
pet_id: Optional[int] = Field(default=None, foreign_key="pet.id")
pet: Optional[Pet] = Relationship(back_populates="hero")
# pet_id: Optional[int] = Field(default=None, foreign_key="pet.id")
# pet: Optional[Pet] = Relationship(back_populates="hero")
def update(self) -> Hero:
r = httpx.patch(