accounts (admin && user)

This commit is contained in:
grey-cat-1908 2024-08-10 17:01:34 +03:00
parent d9cf862040
commit 12ee0d4e7b
11 changed files with 146 additions and 32 deletions

View file

@ -2,11 +2,12 @@ from models import settings
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from .user import *
engine = create_async_engine(settings.database)
sessions = async_sessionmaker(engine)
class Base(DeclarativeBase):
pass
from .user import User

View file

@ -18,6 +18,7 @@ async def lifespan(app: FastAPI):
await connection.run_sync(database.Base.metadata.create_all)
yield
app = FastAPI(lifespan=lifespan)
app.include_router(routes.router)

View file

@ -3,7 +3,7 @@ import pydantic
class BaseModel(pydantic.BaseModel):
class Config:
orm_mode = True
from_attributes = True
from .settings import settings

View file

@ -17,3 +17,11 @@ class Token(BaseModel):
id: int
username: str
token: str
class DeleteUser(BaseModel):
username: str
class UpdatePassword(BaseModel):
password: str

19
poetry.lock generated
View file

@ -533,6 +533,23 @@ azure-key-vault = ["azure-identity (>=1.16.0)", "azure-keyvault-secrets (>=4.8.0
toml = ["tomli (>=2.0.1)"]
yaml = ["pyyaml (>=6.0.1)"]
[[package]]
name = "pyjwt"
version = "2.9.0"
description = "JSON Web Token implementation in Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "PyJWT-2.9.0-py3-none-any.whl", hash = "sha256:3b02fb0f44517787776cf48f2ae25d8e14f300e6d7545a4315cee571a415e850"},
{file = "pyjwt-2.9.0.tar.gz", hash = "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c"},
]
[package.extras]
crypto = ["cryptography (>=3.4.0)"]
dev = ["coverage[toml] (==5.0.4)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=6.0.0,<7.0.0)", "sphinx", "sphinx-rtd-theme", "zope.interface"]
docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"]
tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"]
[[package]]
name = "python-dotenv"
version = "1.0.1"
@ -694,4 +711,4 @@ standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)",
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "c63b5753bf55ec6d3ae3cf225ad9d388fa1705cc2ce49718822221a8c19b2d66"
content-hash = "1d617553a03b53b743a89007a49b7167e21a07b5c19277b56b8b436af92c1f00"

View file

@ -14,6 +14,7 @@ pydantic = "^2.8.2"
SQLAlchemy = "^2.0.32"
pydantic-settings = "^2.4.0"
asyncpg = "^0.29.0"
pyjwt = "^2.9.0"
[tool.poetry.group.dev.dependencies]
isort = "^5.13.2"

Binary file not shown.

View file

@ -1,7 +1,9 @@
from fastapi import APIRouter
from . import admin
from . import user
router = APIRouter()
router.include_router(admin.router)
router.include_router(user.router)

View file

@ -1,29 +1,15 @@
import secrets
from typing import Annotated
import hashlib
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from fastapi import APIRouter, HTTPException
from sqlalchemy import select, delete
import database
from models import settings, user
from models import settings, user, DeleteUser
from .utils import Admin, hash_password
router = APIRouter(prefix="/admin")
def hash_password(password: str, salt: str) -> str:
return hashlib.sha512((password + salt).encode('utf-8')).hexdigest()
def verify_admin(token: str):
if token != settings.admin_password:
raise HTTPException(401, "Unauthorized")
return True
Admin = Annotated[bool, Depends(verify_admin, use_cache=False)]
@router.post("/user")
async def create_user(auth: user.Auth, admin_token: Admin):
if len(auth.username.strip()) == 0:
@ -36,17 +22,26 @@ async def create_user(auth: user.Auth, admin_token: Admin):
salt = secrets.token_hex(8)
async with database.sessions.begin() as session:
stmt = select(database.User).where(database.User.username == auth.username)
db_user = session.execute(stmt).scalar_one_or_none()
if db_user is not None:
stmt = select(database.User).where(
database.User.username == auth.username.strip()
)
db_request = await session.execute(stmt)
user = db_request.scalar_one_or_none()
if user is not None:
raise HTTPException(400, "User with this username already exists")
new_user = database.User(
username=auth.username,
password=hash_password(auth.password, salt),
username=auth.username.strip(),
password=hash_password(auth.password.strip(), salt),
salt=salt,
)
session.add(new_user)
await session.flush()
return {'status': 'Success'}
@router.delete("/user")
async def delete_user(user: DeleteUser, admin_token: Admin):
async with database.sessions.begin() as session:
stmt = delete(database.User).where(
database.User.username == user.username.strip()
)
await session.execute(stmt)

View file

@ -1,8 +1,47 @@
import hashlib
import secrets
from fastapi import APIRouter
from fastapi.security import OAuth2PasswordBearer
import jwt
from fastapi import APIRouter, HTTPException
from sqlalchemy import select
import database
import models
from models import settings
from .utils import hash_password, User
router = APIRouter(prefix="/user")
@router.post("/login")
async def login(auth: models.Auth):
async with database.sessions.begin() as session:
stmt = select(database.User).where(
database.User.username == auth.username.strip()
)
request = await session.execute(stmt)
user = request.scalar_one_or_none()
if (
user is None
or hash_password(auth.password.strip(), user.salt) != user.password
):
raise HTTPException(403, "Forbidden")
return models.Token(
id=user.id,
username=user.username,
token=jwt.encode(
{"sub": user.id}, settings.secret + user.password, "HS256"
),
)
@router.put("/update/password")
async def update_password(user: User, new: models.UpdatePassword):
if len(new.password.strip()) == 0:
raise HTTPException(400, "Password must not be empty")
async with database.sessions.begin() as session:
session.add(user)
user.salt = secrets.token_hex(8)
user.password = hash_password(new.password.strip(), user.salt)

50
routes/utils.py Normal file
View file

@ -0,0 +1,50 @@
from typing import Annotated
import hashlib
import jwt
from fastapi import Depends, HTTPException, Header
from sqlalchemy import select
from models import settings
import database
def hash_password(password: str, salt: str) -> str:
return hashlib.sha512((password + salt).encode("utf-8")).hexdigest()
def verify_admin(password: Annotated[str, Header(alias="x-token")]):
if password.strip() != settings.admin_password:
raise HTTPException(401, "Unauthorized")
return True
async def verify_user(token: Annotated[str, Header(alias="x-token")]) -> database.User:
try:
data = jwt.decode(
token, algorithms=["HS256"], options={"verify_signature": False}
)
except jwt.exceptions.DecodeError:
raise HTTPException(401, "Invalid token")
if "sub" not in data and not isinstance(data["sub"], int):
raise HTTPException(401, "Invalid token")
async with database.sessions.begin() as session:
stmt = select(database.User).where(database.User.id == data.get("sub"))
db_request = await session.execute(stmt)
user = db_request.scalar_one_or_none()
if user is None:
raise HTTPException(401, "Invalid token")
try:
jwt.decode(token, settings.secret + user.password, algorithms=["HS256"])
except jwt.exceptions.InvalidSignatureError:
raise HTTPException(401, "Invalid token")
return user
User = Annotated[database.User, Depends(verify_user, use_cache=False)]
Admin = Annotated[bool, Depends(verify_admin, use_cache=False)]