Compare commits

..

6 commits
0.0.1 ... main

Author SHA1 Message Date
9d6d509618 wip 2025-11-22 21:59:30 -06:00
cd33982985
remove the database 2024-03-01 10:26:10 -06:00
841723103c
format 2024-03-01 10:25:48 -06:00
f42e8e4807
create fastapi 2024-03-01 10:23:13 -06:00
a21dbb08d4
add cli 2024-03-01 07:10:57 -06:00
85554e2169
wip 2024-02-29 15:05:57 -06:00
9 changed files with 379 additions and 80 deletions

1
.gitignore vendored
View file

@ -962,3 +962,4 @@ FodyWeavers.xsd
# Additional files built by Visual Studio # Additional files built by Visual Studio
# End of https://www.toptal.com/developers/gitignore/api/vim,node,data,emacs,python,pycharm,executable,sublimetext,visualstudio,visualstudiocode # End of https://www.toptal.com/developers/gitignore/api/vim,node,data,emacs,python,pycharm,executable,sublimetext,visualstudio,visualstudiocode
database.db

Binary file not shown.

View file

@ -24,12 +24,15 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: Implementation :: PyPy",
] ]
dependencies = [ 'rich', 'sqlmodel', 'typer' ] dependencies = [ 'rich', 'sqlmodel', 'typer', 'iterfzf', 'fastapi', 'uvicorn', 'httpx']
[project.urls] [project.urls]
Documentation = "https://github.com/unknown/sqlmodel-base#readme" Documentation = "https://github.com/waylonwalker/sqlmodel-base#readme"
Issues = "https://github.com/unknown/sqlmodel-base/issues" Issues = "https://github.com/waylonwalker/sqlmodel-base/issues"
Source = "https://github.com/unknown/sqlmodel-base" Source = "https://github.com/waylonwalker/sqlmodel-base"
[project.scripts]
sqlmodel-base = "sqlmodel_base.cli:app"
[tool.hatch.version] [tool.hatch.version]
path = "sqlmodel_base/__about__.py" path = "sqlmodel_base/__about__.py"

View file

@ -1,18 +1,21 @@
from typing import Optional from typing import List, Optional
from pydantic import BaseModel, validator import httpx
import typer
import uvicorn
from fastapi import APIRouter, Depends, FastAPI
from iterfzf import iterfzf
from pydantic import BaseModel
from pydantic_core._pydantic_core import PydanticUndefinedType
from rich.console import Console from rich.console import Console
from sqlalchemy import func from sqlalchemy import func
from sqlmodel import Field, Session, SQLModel, create_engine, select from sqlmodel import Session, SQLModel, select
from sqlmodel_base.database import get_engine, get_session
console = Console() console = Console()
def get_session():
with Session(engine) as session:
yield session
class PagedResult(BaseModel): class PagedResult(BaseModel):
items: list items: list
total: int total: int
@ -22,51 +25,134 @@ class PagedResult(BaseModel):
class Base(SQLModel): class Base(SQLModel):
def create(self): @classmethod
with Session(engine) as session: @property
def engine(self):
engine = get_engine()
return engine
def create(self, session: Optional[Session] = Depends(get_session)):
if isinstance(session, Session):
validated = self.model_validate(self) validated = self.model_validate(self)
session.add(self.sqlmodel_update(validated)) session.add(self.sqlmodel_update(validated))
session.commit() session.commit()
session.refresh(self) session.refresh(self)
return self return self
else:
response = httpx.post(
"http://localhost:8000/create/", json=self.model_dump_json()
)
breakpoint()
return response
@classmethod @classmethod
def get(cls, id): def interactive_create(cls, id: Optional[int] = None):
with Session(engine) as session: data = {}
return session.get(cls, id) for name, field in cls.__fields__.items():
default = field.default
if (
default is None or isinstance(default, PydanticUndefinedType)
) and not field.is_required():
default = "None"
if (isinstance(default, PydanticUndefinedType)) and field.is_required():
default = None
value = typer.prompt(f"{name}: ", default=default)
if value and value != "" and value != "None":
data[name] = value
item = cls(**data).create()
console.print(item)
@classmethod @classmethod
def get_all(cls): def pick(cls):
with Session(engine) as session: all = cls.all()
return session.exec(select(cls)).all() item = iterfzf([item.model_dump_json() for item in all])
if not item:
console.print("No item selected")
return
return cls.get(cls.parse_raw(item).id)
@classmethod @classmethod
def get_count(cls): def get(cls, id: int):
with Session(engine) as session: with Session(cls.engine) as session:
return session.exec(func.count(Hero.id)).scalar() if hasattr(cls, "__table_class__"):
return session.get(cls.__table_class__, id)
return cls.model_validate(session.get(cls, id))
@classmethod @classmethod
def get_first(cls): def get_or_pick(cls, id: Optional[int] = None):
with Session(engine) as session: if id is None:
return session.exec(select(cls).limit(1)).first() return cls.pick()
return cls.get(id=id)
@classmethod @classmethod
def get_last(cls): def all(cls) -> List:
with Session(engine) as session: with Session(cls.engine) as session:
return session.exec(select(cls).order_by(cls.id.desc()).limit(1)).first() if hasattr(cls, "__table_class__"):
return session.exec(select(cls.__table_class__)).all()
return [cls.model_validate(i) for i in session.exec(select(cls)).all()]
@classmethod @classmethod
def get_random(cls): def count(cls) -> int:
with Session(engine) as session: with Session(cls.engine) as session:
return session.exec(select(cls).order_by(cls.id).limit(1)).first() if hasattr(cls, "__table_class__"):
return session.exec(func.count(cls.__table_class__.id)).scalar()
return session.exec(func.count(cls.id)).scalar()
@classmethod @classmethod
def get_page(cls, page: int = 1, page_size: int = 20): def first(cls):
with Session(engine) as session: with Session(cls.engine) as session:
items = session.exec( if hasattr(cls, "__table_class__"):
select(cls).offset((page - 1) * page_size).limit(page_size) table = cls.__table_class__
).all() else:
total = cls.get_count() table = cls
return cls.model_validate(
session.exec(select(table).order_by(table.id.asc()).limit(1)).first()
)
@classmethod
def last(cls):
with Session(cls.engine) as session:
if hasattr(cls, "__table_class__"):
table = cls.__table_class__
else:
table = cls
return session.exec(
select(table).order_by(table.id.desc()).limit(1)
).first()
@classmethod
def get_page(
cls,
page: int = 1,
page_size: int = 20,
all: bool = False,
reverse: bool = False,
):
with Session(cls.engine) as session:
if hasattr(cls, "__table_class__"):
table = cls.__table_class__
else:
table = cls
if all:
items = session.exec(select(table)).all()
page_size = len(items)
else:
if reverse:
items = session.exec(
select(table)
.offset((page - 1) * page_size)
.limit(page_size)
.order_by(table.id.desc())
).all()
else:
items = session.exec(
select(table)
.offset((page - 1) * page_size)
.limit(page_size)
.order_by(table.id)
).all()
total = table.count()
# determine if there is a next page # determine if there is a next page
if page * page_size < total: if page * page_size < total:
next_page = page + 1 next_page = page + 1
@ -82,72 +168,126 @@ class Base(SQLModel):
) )
def delete(self): def delete(self):
with Session(engine) as session: with Session(self.engine) as session:
session.delete(self) session.delete(self)
session.commit() session.commit()
return self return self
def update(self): def update(self):
with Session(engine) as session: with Session(self.engine) as session:
validated = self.model_validate(self) validated = self.model_validate(self)
session.add(self.sqlmodel_update(validated)) session.add(self.sqlmodel_update(validated))
session.commit() session.commit()
session.refresh(self) session.refresh(self)
return self return self
@classmethod
def interactive_update(cls, id: Optional[int] = None):
item = cls.get_or_pick(id=id)
if not item:
console.print("No item selected")
return
for field in item.__fields__.keys():
if field == "id":
continue
value = typer.prompt(f"{field}: ", default=getattr(item, field) or "None")
if (
value
and value != ""
and value != "None"
and value != getattr(item, field)
):
setattr(item, field, value)
item.update()
console.print(item)
class Hero(Base, table=True): @classmethod
id: Optional[int] = Field(default=None, primary_key=True) def api(cls):
name: str api = FastAPI(
secret_name: str title="FastAPI",
age: Optional[int] = None version="0.1.0",
# docs_url=None,
# redoc_url=None,
# openapi_url=None,
# openapi_tags=tags_metadata,
# dependencies=[Depends(set_user), Depends(set_prefers)],
)
@validator("age") api.include_router(cls.router())
def validate_age(cls, v):
if v is None:
return v
if v > 0:
return v
return abs(v)
return api
sqlite_file_name = "database.db" @classmethod
sqlite_url = f"sqlite:///{sqlite_file_name}" def router(cls):
router = APIRouter()
# router.add_api_route("/get/", cls.get, methods=["GET"])
# router.add_api_route("/list/", cls.all, methods=["GET"])
# router.add_api_route("/create/", cls.create, methods=["POST"])
# router.add_api_route("/update/", cls.interactive_update, methods=["PUT"])
engine = create_engine(sqlite_url) # , echo=True) @router.get("/")
def get(id: int) -> cls:
return cls.get(id=id)
@router.get("/list", include_in_schema=False)
@router.get("/list/")
def get_page(
page: int = 1,
page_size: int = 20,
all: bool = False,
reverse: bool = False,
) -> PagedResult:
return cls.get_page()
# replace with alembic commands @router.post("/create")
def create_db_and_tables(): def create(cls: cls) -> cls:
SQLModel.metadata.create_all(engine) return cls.create()
@router.put("/update")
def update() -> cls:
return cls.update()
def create_heroes(): return router
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson").create()
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador").create()
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48).create()
# with Session(engine) as session: @classmethod
# session.add(hero_1) @property
# session.add(hero_2) def cli(cls):
# session.add(hero_3) app = typer.Typer()
#
# session.commit()
@app.command()
def get(id: int = typer.Option(None, help="Hero ID")):
console.print(cls.get_or_pick(id=id))
def page_heroes(): @app.command()
next_page = 1 def create():
while next_page: console.print(cls.interactive_create())
page = Hero.get_page(page=next_page, page_size=2)
console.print(page)
next_page = page.next_page
@app.command()
def list(
page: int = typer.Option(1, help="Page number"),
page_size: int = typer.Option(20, help="Page size"),
all: bool = typer.Option(False, help="Show all heroes"),
reverse: bool = typer.Option(False, help="Reverse order"),
):
console.print(
cls.get_page(
page=page,
page_size=page_size,
all=all,
reverse=reverse,
)
)
def main(): @app.command()
create_db_and_tables() def api():
create_heroes() cls.run_api()
page_heroes()
@app.command()
def update():
console.print(cls.interactive_update())
if __name__ == "__main__": return app
main()
@classmethod
def run_api(cls):
uvicorn.run(cls.api(), host="127.0.0.1", port=8000)

11
sqlmodel_base/cli.py Normal file
View file

@ -0,0 +1,11 @@
import typer
from sqlmodel_base.hero.cli import hero_app
app = typer.Typer()
app.add_typer(hero_app, name="hero")
if __name__ == "__main__":
app()

18
sqlmodel_base/database.py Normal file
View file

@ -0,0 +1,18 @@
from functools import lru_cache
from sqlmodel import Session, SQLModel, create_engine
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
@lru_cache
def get_engine():
engine = create_engine(sqlite_url)
SQLModel.metadata.create_all(engine)
return engine
def get_session():
with Session(get_engine()) as session:
yield session

65
sqlmodel_base/hero/cli.py Normal file
View file

@ -0,0 +1,65 @@
from rich.console import Console
from sqlmodel_base.database import get_engine
from sqlmodel_base.hero.models import Hero
engine = get_engine()
hero_app = Hero.cli
console = Console()
# @hero_app.callback()
# def hero():
# "model cli"
# @hero_app.command()
# def get(id: int = typer.Option(None, help="Hero ID")):
# console.print(Hero.get_or_pick(id=id))
# @hero_app.command()
# def list(
# page: int = typer.Option(1, help="Page number"),
# page_size: int = typer.Option(20, help="Page size"),
# all: bool = typer.Option(False, help="Show all heroes"),
# reverse: bool = typer.Option(False, help="Reverse order"),
# ):
# console.print(
# Hero.get_page(page=page, page_size=page_size, all=all, reverse=reverse)
# )
# @hero_app.command()
# def create(
# name: str = typer.Option(..., help="Hero name", prompt=True),
# secret_name: str = typer.Option(..., help="Hero secret name", prompt=True),
# age: int = typer.Option(None, help="Hero age", prompt=True),
# ):
# hero = Hero(
# name=name,
# secret_name=secret_name,
# age=age,
# ).create()
# console.print(hero)
# @hero_app.command()
# def update(
# id: int = typer.Option(None, help="Hero ID"),
# name: str = typer.Option(None, help="Hero name"),
# secret_name: str = typer.Option(None, help="Hero secret name"),
# age: int = typer.Option(None, help="Hero age"),
# ):
# hero = Hero.interactive_update(id=id)
# console.print(hero)
# @hero_app.command()
# def create_heroes():
# team_1 = Team.get(id=1)
# if not team_1:
# team_1 = Team(name="Team 1", headquarters="Headquarters 1").create()
# for _ in range(50):
# Hero(name="Deadpond", secret_name="Dive Wilson", team_id=team_1.id).create()
# Hero(name="Spider-Boy", secret_name="Pedro Parqueador").create()
# Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48).create()

View file

@ -0,0 +1,50 @@
from typing import Optional
from pydantic import validator
from rich.console import Console
from sqlmodel import Field
from sqlmodel_base.base import Base
console = Console()
class HeroBase(Base):
name: str
secret_name: str
age: Optional[int] = None
team_id: Optional[int] = Field(default=None, foreign_key="team.id")
@validator("age")
def validate_age(cls, v):
if v is None:
return v
if v > 0:
return v
return abs(v)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
__table_class__ = Hero
pass
class HeroRead(HeroBase):
__table_class__ = Hero
id: int
class HeroUpdate(Base, table=False):
__table_class__ = Hero
name: Optional[str]
secret_name: Optional[str]
age: Optional[int]
team_id: Optional[int]
if __name__ == "__main__":
Hero.cli()

View file

@ -0,0 +1,11 @@
from typing import Optional
from sqlmodel import Field
from sqlmodel_base.base import Base
class Team(Base, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str