import hashlib
import secrets
from datetime import datetime, timezone
from typing import Optional

from fastapi import Request, HTTPException, status
from fastapi.responses import RedirectResponse
from sqlalchemy import select

from app.models.base import async_session
from app.models.user import User
from app.models.auth import Session


def hash_password(password: str) -> str:
    """Return a salted SHA-256 digest of *password*.

    The result has the form ``"<salt>:<hexdigest>"`` where ``salt`` is 16
    random bytes rendered as hex and ``hexdigest`` is
    ``sha256(salt + password)``.
    """
    # NOTE(review): a single round of SHA-256 is cheap to brute-force. Moving
    # to a dedicated KDF (e.g. hashlib.scrypt / hashlib.pbkdf2_hmac) would
    # change the stored format and needs a migration, so it is only flagged.
    salt = secrets.token_hex(16)
    digest = hashlib.sha256((salt + password).encode()).hexdigest()
    return f"{salt}:{digest}"


def verify_password(password: str, hashed: str) -> bool:
    """Check *password* against a value produced by :func:`hash_password`.

    Args:
        password: The candidate plaintext password.
        hashed: The stored ``"<salt>:<hexdigest>"`` string.

    Returns:
        ``True`` if the password matches; ``False`` if it does not or if
        *hashed* has no ``":"`` separator.
    """
    salt, separator, stored_digest = hashed.partition(":")
    if not separator:
        # A stored value without a separator cannot match anything; treat it
        # as a failed verification rather than raising on the unpack.
        return False
    expected_digest = hashlib.sha256((salt + password).encode()).hexdigest()
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of the digest matched. Compare as bytes because
    # compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(
        expected_digest.encode(), stored_digest.encode()
    )


async def create_session(user_id: int) -> str:
    """Persist a new session row for *user_id* and return its token.

    The token is 32 random bytes rendered as hex. ``created_at`` is stored
    as an ISO-8601 string of the current UTC time.
    """
    # NOTE(review): created_at is recorded but nothing in this module reads
    # it back, so as far as this file shows sessions do not expire.
    token = secrets.token_hex(32)
    async with async_session() as session:
        db_session = Session(
            user_id=user_id,
            token=token,
            created_at=datetime.now(timezone.utc).isoformat(),
        )
        session.add(db_session)
        await session.commit()
    return token


def _login_redirect() -> HTTPException:
    """Build the exception that sends the client to ``/login`` via a 303."""
    return HTTPException(
        status_code=status.HTTP_303_SEE_OTHER,
        headers={"Location": "/login"},
    )


async def _user_for_token(token: Optional[str]) -> Optional[User]:
    """Return the user that owns session *token*, or ``None``.

    ``None`` is returned when the token is missing/empty, when no session
    row has that token, or when the session's user row no longer exists.
    """
    if not token:
        return None
    async with async_session() as session:
        result = await session.execute(
            select(Session).where(Session.token == token)
        )
        db_session = result.scalar_one_or_none()
        if db_session is None:
            return None
        result = await session.execute(
            select(User).where(User.id == db_session.user_id)
        )
        return result.scalar_one_or_none()


async def get_current_user(request: Request) -> User:
    """Return the user identified by the ``session_token`` cookie.

    Raises:
        HTTPException: 303 with ``Location: /login`` when the cookie is
            absent, the session is unknown, or its user cannot be found.
    """
    user = await _user_for_token(request.cookies.get("session_token"))
    if user is None:
        raise _login_redirect()
    return user


async def optional_user(request: Request) -> Optional[User]:
    """Return the user for the ``session_token`` cookie, or ``None``.

    Unlike :func:`get_current_user`, a missing or invalid session is not an
    error; the caller simply receives ``None``.
    """
    return await _user_for_token(request.cookies.get("session_token"))