File size: 3,945 Bytes
9df97a2
 
 
 
 
 
 
 
 
 
 
 
 
64ba296
 
 
 
 
 
 
 
 
 
 
 
 
9df97a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Security utilities for password hashing and JWT token generation.
"""

from datetime import datetime, timedelta
from typing import Optional
import os
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.schemas.user import TokenData


# Configuration
# The JWT signing key is read from the SECRET_KEY environment variable.
# A missing/empty value, or the well-known placeholder below, is rejected at
# import time unless ALLOW_INSECURE_SECRET=true is set explicitly (an opt-in
# meant for local development only).
_INSECURE_DEFAULT = "your-secret-key-change-in-production-to-something-very-secure-and-random"
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY or SECRET_KEY == _INSECURE_DEFAULT:
    if os.getenv("ALLOW_INSECURE_SECRET", "false").lower() != "true":
        raise RuntimeError(
            "SECRET_KEY manquante ou non securisee. "
            "Definis une vraie cle via la variable d'environnement SECRET_KEY "
            "(genere-la avec: python -c \"import secrets; print(secrets.token_hex(32))\")."
        )
    # Explicit opt-in was given: fall back to the placeholder key.
    SECRET_KEY = _INSECURE_DEFAULT
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 * 24 * 60  # 30 days in minutes

# Password hashing
# Shared passlib context used by the hashing/verification helpers below.
# Two schemes are registered: argon2 (listed first, the preferred one) and
# bcrypt.
pwd_context = CryptContext(schemes=["argon2", "bcrypt"], deprecated="auto")


def get_password_hash(password: str) -> str:
    """
    Hash a password with the module-level ``pwd_context``.

    The password is first cut to at most 72 UTF-8 bytes (the bcrypt input
    limit); a multi-byte character split by the cut is dropped rather than
    raising a decode error.

    Args:
        password: Plain text password

    Returns:
        Hashed password
    """
    encoded = password.encode('utf-8')
    # Keep only the first 72 bytes, then discard any dangling partial character.
    clipped = encoded[:72].decode('utf-8', errors='ignore')
    return pwd_context.hash(clipped)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a password against its stored hash using ``pwd_context``.

    The candidate password is truncated to 72 UTF-8 bytes, exactly as
    ``get_password_hash`` does, before being checked.

    If verification raises (for example because the stored value is not a
    recognizable hash), the result is False. A plain-text comparison against
    the stored value is performed ONLY when the environment variable
    ALLOW_PLAINTEXT_PASSWORDS is set to "true"; this opt-in exists for
    development/test data and must never be enabled in production.

    Args:
        plain_password: Plain text password to verify
        hashed_password: Stored hash to compare against

    Returns:
        True if password matches, False otherwise
    """
    import hmac

    try:
        # Truncate password to 72 bytes to comply with bcrypt limitation
        truncated_password = plain_password.encode('utf-8')[:72].decode('utf-8', errors='ignore')
        return pwd_context.verify(truncated_password, hashed_password)
    except Exception:
        # Deliberately broad: a login check should fail closed, not crash,
        # whatever went wrong while verifying.
        if os.getenv("ALLOW_PLAINTEXT_PASSWORDS", "false").lower() != "true":
            return False
        if not isinstance(plain_password, str) or not isinstance(hashed_password, str):
            return False
        # Development-only fallback; compare in constant time so the check
        # does not leak how many leading characters matched.
        return hmac.compare_digest(
            plain_password.encode('utf-8', errors='surrogatepass'),
            hashed_password.encode('utf-8', errors='surrogatepass'),
        )


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT access token.

    Args:
        data: Dictionary containing token claims (e.g., {"sub": "user@example.com", "user_id": 1}).
            It is copied, so the caller's dictionary is not modified.
        expires_delta: Custom lifetime of the token. If None, defaults to
            ACCESS_TOKEN_EXPIRE_MINUTES. A zero timedelta is honored as
            "expires now" rather than being replaced by the default.

    Returns:
        JWT token string signed with SECRET_KEY using ALGORITHM
    """
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently turn into the (much longer) default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and naive.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = dict(data)
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> TokenData:
    """
    Decode a JWT token and extract the claims this application relies on.

    Args:
        token: JWT token string

    Returns:
        TokenData built from the "sub" and "user_id" claims

    Raises:
        JWTError: Propagated from jwt.decode when the token cannot be
            decoded/validated, or raised here when "sub" or "user_id" is
            missing from the payload
    """
    claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    subject = claims.get("sub")
    uid = claims.get("user_id")

    # Both claims are mandatory; reject tokens lacking either one.
    if subject is None or uid is None:
        raise JWTError("Invalid token claims")

    return TokenData(sub=subject, user_id=uid)