routeur_ia_api / tools /storage.py
Cyril Dupland
Enhance conversation workflows by introducing summarization and classification agents. Implement retrieval nodes for document fetching, and update settings for Supabase integration. Add Markdown to PDF conversion utilities and improve agent service to handle document metadata. Refactor agent registry to support orchestrated workflows.
0ef1224
raw
history blame
2.39 kB
"""Storage utilities for uploading files to Supabase Storage."""
from typing import Optional
import os
import logging
from supabase import create_client, Client
from config.settings import settings
SUPABASE_URL: Optional[str] = settings.supabase_url or os.environ.get("SUPABASE_URL")
SUPABASE_KEY: Optional[str] = settings.supabase_key or os.environ.get("SUPABASE_KEY")
SUPABASE_KEY: Optional[str] = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # 🔒 pas la clé anon
SUPABASE_BUCKET: str = os.environ.get("SUPABASE_BUCKET", "conversations-document")
def _get_client() -> Client:
if not SUPABASE_URL or not SUPABASE_KEY:
raise RuntimeError("SUPABASE_URL or SUPABASE_KEY is not configured in environment")
return create_client(SUPABASE_URL, SUPABASE_KEY)
def upload_pdf_to_supabase(file_path: str, filename: str) -> str:
"""Upload a local PDF file to Supabase Storage and return a public URL.
Args:
file_path: Local path to the PDF file to upload
filename: Destination filename in the bucket (can include folders)
Returns:
Public URL string to access the uploaded file
"""
client = _get_client()
# Allow nested folders in the provided filename if caller includes them
destination_path = filename.lstrip("/\\")
# Upload with explicit content type and upsert enabled to avoid 409/400
# Use storage3 expected keys: content_type, cache_control, upsert
options = {"content-type": "application/pdf", "cache_control": "public, max-age=3600", "upsert": "true"}
try:
with open(file_path, "rb") as file_obj:
client.storage.from_(SUPABASE_BUCKET).upload(
destination_path,
file_obj,
file_options=options,
)
except Exception as exc:
raise RuntimeError(f"Supabase upload failed: {exc}")
# Resolve a public URL using SDK helper
public = client.storage.from_(SUPABASE_BUCKET).get_public_url(destination_path)
if isinstance(public, dict):
data = public.get("data") or {}
url = data.get("publicUrl") or data.get("public_url")
if url:
return url
if isinstance(public, str) and public.startswith("http"):
return public
# Fallback to deterministic URL construction
return f"{SUPABASE_URL}/storage/v1/object/public/{SUPABASE_BUCKET}/{destination_path}"