"""legal-eye Auth v0 (2026-05-22) — magic-link email auth.

Design:
  - User submits email → backend mints a single-use magic token (15-min TTL)
    and emails them a link `/auth?token=...` via reused SMTP config.
  - User clicks → /v1/auth/verify validates + consumes the token + mints a
    long-lived (30-day) session token. Returns the session token.
  - Client stores session token in localStorage. Every API call that wants
    to know who the user is sends `Authorization: Bearer <session_token>`.

Storage (all JSONL in runtime/):
  users.jsonl         — one record per user:
                        {id, email, created_ts, last_seen_ts}
  auth_tokens.jsonl   — pending magic tokens: {token, email, expires_ts}
  auth_sessions.jsonl — active sessions:
                        {session_token, user_id, email, created_ts,
                         expires_ts, last_used_ts}

Notes:
  - No password hashing (no passwords).
  - HF Spaces ephemeral storage means sessions die on container restart.
    Users re-login via magic link — mild friction, no security loss.
  - SMTP env vars reused: LE_LEAD_EMAIL_SMTP_{HOST,PORT,USER,PASS} +
    LE_LEAD_EMAIL_FROM. If unset → magic-link can't send; v0 returns the
    token in the response (dev-only fallback noted in logs).
  - Rate limiting: max 5 magic-link requests per email per 5 min.
"""

# NOTE: hashlib, os and re are not used below; they are retained so the
# module's import surface stays exactly as it was.
import hashlib
import json
import logging
import os
import re
import secrets
import threading
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

_log = logging.getLogger(__name__)

_RUNTIME_DIR = Path("tau_rag/runtime")
USERS_PATH = _RUNTIME_DIR / "users.jsonl"
TOKENS_PATH = _RUNTIME_DIR / "auth_tokens.jsonl"
SESSIONS_PATH = _RUNTIME_DIR / "auth_sessions.jsonl"

# Re-entrant: get_session calls _touch_user, and verify_magic_link calls
# _get_or_create_user, each of which takes the lock again.
_LOCK = threading.RLock()

TOKEN_TTL_SECONDS = 15 * 60  # magic link: 15 min
SESSION_TTL_SECONDS = 30 * 24 * 60 * 60  # session: 30 days

# In-memory rate limiter — email → list of (ts) of recent requests
_RATE: Dict[str, List[float]] = {}
RATE_WINDOW_S = 5 * 60
RATE_MAX = 5
# Once this many emails are tracked, idle buckets are swept so that a flood
# of distinct addresses cannot grow _RATE without bound.
_RATE_SWEEP_THRESHOLD = 10_000


def _iso_now() -> str:
    """Return the current local time as an ISO-8601-like string with offset."""
    return time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime())


def _norm_email(email: str) -> str:
    """Return *email* stripped of surrounding whitespace and lower-cased."""
    return (email or "").strip().lower()


def _email_valid(email: str) -> bool:
    """Cheap structural check of an (already normalised) email address.

    Rejects empty or over-long values, any whitespace or non-printable
    character, a missing local part, and a domain that is not at least two
    non-empty dot-separated labels.
    """
    if not email or len(email) > 254:
        return False
    # Embedded CR/LF or other control characters must never reach a mail
    # header, so refuse them here rather than trusting the caller.
    if any(ch.isspace() or not ch.isprintable() for ch in email):
        return False
    local, sep, domain = email.rpartition("@")
    if not sep or not local or not domain:
        return False
    labels = domain.split(".")
    return len(labels) >= 2 and all(labels)


def _read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Read a JSONL file and return its object records.

    Best-effort: a missing file yields ``[]``; blank lines, unparsable lines
    and JSON values that are not objects are skipped; a read error part-way
    through returns whatever was parsed up to that point.
    """
    records: List[Dict[str, Any]] = []
    try:
        with open(path, "r", encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    item = json.loads(raw)
                except (ValueError, RecursionError):
                    continue
                # Callers use .get() on every record, so anything that is
                # not a JSON object is treated as corrupt and dropped.
                if isinstance(item, dict):
                    records.append(item)
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as exc:
        _log.warning("could not fully read %s: %s", path, exc)
    return records


def _write_jsonl_atomic(path: Path, items: List[Dict[str, Any]]) -> None:
    """Replace *path* with *items*, one JSON object per line.

    The data is written to a sibling ``.tmp`` file which is then renamed
    over *path*, so readers never observe a half-written file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with open(tmp, "w", encoding="utf-8") as fh:
        fh.writelines(
            json.dumps(item, ensure_ascii=False) + "\n" for item in items
        )
    tmp.replace(path)


def _append_jsonl(path: Path, record: Dict[str, Any]) -> None:
    """Append *record* to *path* as one JSON line, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")


def _rate_check(email: str) -> bool:
    """Returns True if we should allow; False if rate-limited.

    An allowed request is recorded in the bucket for *email*; a rejected
    one is not.
    """
    now = time.time()
    cutoff = now - RATE_WINDOW_S
    with _LOCK:
        if len(_RATE) >= _RATE_SWEEP_THRESHOLD:
            idle = [
                key
                for key, stamps in _RATE.items()
                if not stamps or stamps[-1] < cutoff
            ]
            for key in idle:
                del _RATE[key]
        bucket = _RATE.setdefault(email, [])
        # Drop entries that have aged out of the window (in place, so the
        # list object stored in _RATE stays the same).
        bucket[:] = [ts for ts in bucket if ts >= cutoff]
        if len(bucket) >= RATE_MAX:
            return False
        bucket.append(now)
        return True


def _get_or_create_user(email: str) -> Dict[str, Any]:
    """Find user by email or create a new record. Returns the user dict."""
    email = _norm_email(email)
    # The lookup and the append share one critical section so that two
    # concurrent first logins for the same address cannot both create a user.
    with _LOCK:
        for existing in _read_jsonl(USERS_PATH):
            if existing.get("email") == email:
                return existing
        now = time.time()
        user = {
            "id": str(uuid.uuid4()),
            "email": email,
            "created_ts": now,
            "created_iso": _iso_now(),
            "last_seen_ts": now,
        }
        _append_jsonl(USERS_PATH, user)
        return user


def _touch_user(user_id: str) -> None:
    """Update last_seen_ts for a user (best-effort).

    Never raises: any failure is logged and otherwise ignored.
    """
    try:
        with _LOCK:
            users = _read_jsonl(USERS_PATH)
            for user in users:
                if user.get("id") == user_id:
                    user["last_seen_ts"] = time.time()
                    user["last_seen_iso"] = _iso_now()
                    _write_jsonl_atomic(USERS_PATH, users)
                    break
    except Exception as exc:  # deliberate: this must not break a request
        _log.warning("could not update last_seen for %s: %s", user_id, exc)


def request_magic_link(email: str) -> Dict[str, Any]:
    """Generate a magic token, store it with a TTL, and return it.

    The caller is responsible for emailing the link.

    Returns ``{"ok": True, "token", "email", "expires_ts"}`` on success, or
    ``{"ok": False, "reason": ...}`` where reason is ``"invalid_email"`` or
    ``"rate_limited"``.
    """
    email = _norm_email(email)
    if not _email_valid(email):
        return {"ok": False, "reason": "invalid_email"}
    if not _rate_check(email):
        return {"ok": False, "reason": "rate_limited"}
    token = secrets.token_urlsafe(32)
    now = time.time()
    record = {
        "token": token,
        "email": email,
        "created_ts": now,
        "expires_ts": now + TOKEN_TTL_SECONDS,
    }
    with _LOCK:
        _append_jsonl(TOKENS_PATH, record)
    return {
        "ok": True,
        "token": token,
        "email": email,
        "expires_ts": record["expires_ts"],
    }


def verify_magic_link(token: str) -> Dict[str, Any]:
    """Consume a magic token and mint a session for its email address.

    Validates the token, removes it from the token store, creates the user
    if needed and appends a new session record.

    Returns ``{"ok": True, "session_token", "user": {"id", "email"},
    "expires_ts"}`` on success, or ``{"ok": False, "reason": ...}`` where
    reason is ``"missing_token"``, ``"token_expired"`` or
    ``"token_not_found"``.
    """
    token = (token or "").strip()
    if not token:
        return {"ok": False, "reason": "missing_token"}
    now = time.time()
    # One critical section from lookup to session creation, so a token can
    # be redeemed by at most one caller.
    with _LOCK:
        found = None
        remaining = []
        for rec in _read_jsonl(TOKENS_PATH):
            if found is None and rec.get("token") == token:
                if rec.get("expires_ts", 0) < now:
                    return {"ok": False, "reason": "token_expired"}
                found = rec  # consume — don't re-add
            elif rec.get("expires_ts", 0) >= now:
                # Housekeeping: expired tokens are dropped on this rewrite.
                remaining.append(rec)
        if found is None:
            return {"ok": False, "reason": "token_not_found"}
        # Rewrite without the consumed token
        _write_jsonl_atomic(TOKENS_PATH, remaining)

        user = _get_or_create_user(found["email"])

        session_token = secrets.token_urlsafe(48)
        session = {
            "session_token": session_token,
            "user_id": user["id"],
            "email": user["email"],
            "created_ts": now,
            "created_iso": _iso_now(),
            "expires_ts": now + SESSION_TTL_SECONDS,
            "last_used_ts": now,
        }
        _append_jsonl(SESSIONS_PATH, session)
    return {
        "ok": True,
        "session_token": session_token,
        "user": {
            "id": user["id"],
            "email": user["email"],
        },
        "expires_ts": session["expires_ts"],
    }


def get_session(session_token: str) -> Optional[Dict[str, Any]]:
    """Look up an active session by token. Updates last_used_ts on hit.

    Returns None if not found, expired, or invalid. On a hit the session
    file is rewritten with the new ``last_used_ts``; sessions that have
    already expired are dropped in that same rewrite.
    """
    if not session_token:
        return None
    now = time.time()
    with _LOCK:
        sessions = _read_jsonl(SESSIONS_PATH)
        match = next(
            (s for s in sessions if s.get("session_token") == session_token),
            None,
        )
        if match is None or match.get("expires_ts", 0) < now:
            return None
        match["last_used_ts"] = now
        live = [s for s in sessions if s.get("expires_ts", 0) >= now]
        # Best-effort update: a failed write must not fail the lookup.
        try:
            _write_jsonl_atomic(SESSIONS_PATH, live)
        except (OSError, TypeError, ValueError) as exc:
            _log.warning("could not persist session last_used_ts: %s", exc)
        user_id = match.get("user_id")
        if user_id:
            _touch_user(user_id)
        return match


def logout(session_token: str) -> bool:
    """Invalidate a session.

    Returns True if a session with that token was removed, False if the
    token was empty or no such session existed.
    """
    if not session_token:
        return False
    with _LOCK:
        sessions = _read_jsonl(SESSIONS_PATH)
        remaining = [
            s for s in sessions if s.get("session_token") != session_token
        ]
        if len(remaining) == len(sessions):
            return False
        _write_jsonl_atomic(SESSIONS_PATH, remaining)
        return True


def list_users() -> List[Dict[str, Any]]:
    """Admin only — list all users."""
    return _read_jsonl(USERS_PATH)


def session_from_header(
    authorization_header: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Extract session from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing, is not a two-part value whose
    scheme is ``bearer`` (case-insensitive), or names no active session.
    """
    if not authorization_header:
        return None
    parts = authorization_header.strip().split(None, 1)
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return get_session(parts[1].strip())