cors support

This commit is contained in:
grey-cat-1908 2024-09-22 08:34:48 +00:00
parent 1a8f3e2980
commit 3620ea07a3
8 changed files with 57 additions and 12 deletions

View file

@ -1,7 +1,12 @@
DATABASE_USER=admin
DATABASE_PASSWORD=password
DATABASE_NAME=formaptix
PORT=3000
SECRET=super_secret
FRONTEND_HOST="http://example.com"
BACKEND_CORS_ORIGINS="http://localhost,http://localhost:5173"
ADMIN_PASSWORD=admin_password
DISABLE_ADMIN=false

View file

@ -2,7 +2,7 @@ from models import settings
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
engine = create_async_engine(settings.database)
engine = create_async_engine(settings.DATABASE)
sessions = async_sessionmaker(engine)

View file

@ -11,6 +11,8 @@ services:
- PORT=${PORT}
- ADMIN_PASSWORD=${ADMIN_PASSWORD}
- DISABLE_ADMIN=${DISABLE_ADMIN}
- BACKEND_CORS_ORIGINS=${BACKEND_CORS_ORIGINS}
- FRONTEND_HOST=${FRONTEND_HOST}
ports:
- ${PORT}:${PORT}
depends_on:

12
main.py
View file

@ -3,6 +3,7 @@ from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import models
import database
@ -22,4 +23,13 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
app.include_router(routes.router)
uvicorn.run(app, host="0.0.0.0", port=models.settings.port)
if models.settings.all_cors_origins:
app.add_middleware(
CORSMiddleware,
allow_origins=models.settings.all_cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
uvicorn.run(app, host="0.0.0.0", port=models.settings.PORT)

View file

@ -1,12 +1,40 @@
from pydantic_settings import BaseSettings
from typing import Annotated, Any
from pydantic import AnyUrl, BeforeValidator, computed_field
from pydantic_settings import BaseSettings, SettingsConfigDict
def parse_cors(v: Any) -> list[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, list | str):
return v
raise ValueError(v)
class Settings(BaseSettings):
database: str
secret: str
port: int
admin_password: str
disable_admin: bool
model_config = SettingsConfigDict(
env_file=".env", env_ignore_empty=True, extra="ignore"
)
DATABASE: str
SECRET: str
PORT: int
FRONTEND_HOST: str = "http://localhost:5173"
BACKEND_CORS_ORIGINS: Annotated[list[AnyUrl] | str, BeforeValidator(parse_cors)] = (
[]
)
@computed_field # type: ignore[prop-decorator]
@property
def all_cors_origins(self) -> list[str]:
return [str(origin).rstrip("/") for origin in self.BACKEND_CORS_ORIGINS] + [
self.FRONTEND_HOST
]
ADMIN_PASSWORD: str
DISABLE_ADMIN: bool = False
settings = Settings()

View file

@ -16,7 +16,7 @@ async def create_user(auth: user.Auth, admin_token: Admin):
raise HTTPException(400, "Username must not be empty")
if len(auth.password.strip()) == 0:
raise HTTPException(400, "Password must not be empty")
if settings.disable_admin:
if settings.DISABLE_ADMIN:
raise HTTPException(403, "You are not admin")
salt = secrets.token_hex(8)

View file

@ -31,7 +31,7 @@ async def login(auth: models.Auth) -> models.Token:
id=user.id,
username=user.username,
token=jwt.encode(
{"sub": user.id}, settings.secret + user.password, "HS256"
{"sub": user.id}, settings.SECRET + user.password, "HS256"
),
)

View file

@ -14,7 +14,7 @@ def hash_password(password: str, salt: str) -> str:
def verify_admin(password: Annotated[str, Header(alias="x-token")]):
if password.strip() != settings.admin_password:
if password.strip() != settings.ADMIN_PASSWORD:
raise HTTPException(401, "Unauthorized")
return True
@ -39,7 +39,7 @@ async def verify_user(token: Annotated[str, Header(alias="x-token")]) -> databas
raise HTTPException(401, "Invalid token")
try:
jwt.decode(token, settings.secret + user.password, algorithms=["HS256"])
jwt.decode(token, settings.SECRET + user.password, algorithms=["HS256"])
except jwt.exceptions.InvalidSignatureError:
raise HTTPException(401, "Invalid token")