"""JWT security and authentication utilities.""" from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from config import settings # Security scheme for JWT Bearer token security = HTTPBearer() def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """ Create a JWT access token. Args: data: Dictionary of data to encode in the token expires_delta: Optional custom expiration time Returns: Encoded JWT token string """ to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.jwt_expiration_minutes) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.jwt_secret_key, algorithm=settings.jwt_algorithm ) return encoded_jwt def verify_token(token: str) -> dict: """ Verify and decode a JWT token. Args: token: JWT token string Returns: Decoded token payload Raises: HTTPException: If token is invalid or expired """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode( token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm] ) return payload except JWTError: raise credentials_exception async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> dict: """ FastAPI dependency to get current authenticated user from JWT token. Args: credentials: HTTP Authorization credentials with Bearer token Returns: User data from token payload Raises: HTTPException: If token is invalid """ token = credentials.credentials payload = verify_token(token) return payload