import io from typing import Any from fastapi.testclient import TestClient from app import app from services.task_service import task_registry client = TestClient(app) def _get_auth_headers() -> dict[str, str]: # The API currently allows generating a token without real auth checks. from domain.models import TokenRequest from fastapi.testclient import TestClient as _TC from app import app as _app _client = _TC(_app) resp = _client.post("/auth/token", json=TokenRequest(username="test", password="test").model_dump()) resp.raise_for_status() token = resp.json()["access_token"] return {"Authorization": f"Bearer {token}"} def test_txt_ingestion_creates_job_and_uses_text_pipeline() -> None: headers = _get_auth_headers() content = b"Hello world\nThis is a simple text file used for ingestion tests." files: dict[str, Any] = { "file": ("test.txt", io.BytesIO(content), "text/plain"), } response = client.post( "/projects/test-project/documents", headers=headers, files=files, ) assert response.status_code in (200, 202) data = response.json() job_id = data["job_id"] status_resp = client.get( f"/projects/test-project/documents/jobs/{job_id}", headers=headers, ) assert status_resp.status_code == 200 status_data = status_resp.json() assert status_data["job_id"] == job_id assert status_data["status"] in {"running", "succeeded", "queued", "failed"} assert status_data["chunks_total"] is not None