| """JWT security and authentication utilities.""" |
| from datetime import datetime, timedelta |
| from typing import Optional |
| from jose import JWTError, jwt |
| from fastapi import HTTPException, status, Depends |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from config import settings |
|
|
|
|
| |
# Module-level HTTPBearer scheme instance. get_current_user declares it as a
# dependency via Depends(security) to obtain the request's bearer credentials.
| security = HTTPBearer() |
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a signed JWT access token.

    Args:
        data: Claims to encode in the token. The mapping is shallow-copied,
            so the caller's dictionary is never modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime
            falls back to ``settings.jwt_expiration_minutes`` minutes. Any
            explicit value is honoured, including ``timedelta(0)``.

    Returns:
        Encoded JWT token string
    """
    # Imported here because the module-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    to_encode = data.copy()

    # Compare against None rather than relying on truthiness: timedelta(0)
    # is falsy and would otherwise be silently replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_expiration_minutes)

    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode.update({"exp": expire})
    return jwt.encode(
        to_encode,
        settings.jwt_secret_key,
        algorithm=settings.jwt_algorithm,
    )
|
|
|
|
def verify_token(token: str) -> dict:
    """
    Verify and decode a JWT token.

    The token is decoded with ``settings.jwt_secret_key`` and only the
    algorithm named by ``settings.jwt_algorithm`` is accepted.

    Args:
        token: JWT token string

    Returns:
        Decoded token payload

    Raises:
        HTTPException: 401 response carrying a ``WWW-Authenticate: Bearer``
            header, raised whenever decoding fails with ``JWTError``.
    """
    try:
        return jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError as err:
        # Chain explicitly so the underlying decode failure stays attached
        # as __cause__ for logging and debugging.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from err
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """
    Resolve the authenticated user for a request (FastAPI dependency).

    Args:
        credentials: Authorization credentials injected through
            ``Depends(security)``; the ``credentials`` attribute holds
            the token string.

    Returns:
        The payload returned by ``verify_token`` for that token.

    Raises:
        HTTPException: Propagated from ``verify_token`` when the token
            cannot be validated.
    """
    # Hand the raw token string straight to the shared verification routine.
    return verify_token(credentials.credentials)
|
|
|
|