Cyril Dupland commited on
Commit
5227b54
·
1 Parent(s): a60bcb7

Add post-processing pipeline for carbon impact, pricing, and equivalences. Introduce orchestrator and context management for enhanced metadata handling in agent service.

Browse files
config/settings.py CHANGED
@@ -32,6 +32,39 @@ class Settings(BaseSettings):
32
  supabase_match_fn: str = "match_documents"
33
  rag_top_k: int = 5
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  model_config = SettingsConfigDict(
36
  env_file=".env",
37
  env_file_encoding="utf-8",
 
32
  supabase_match_fn: str = "match_documents"
33
  rag_top_k: int = 5
34
 
35
+ # Post-processing pipeline configuration
36
+ postprocessors_enabled: list[str] = [
37
+ "carbon_impact",
38
+ "pricing",
39
+ "equivalences",
40
+ ]
41
+
42
+ currency: str = "USD"
43
+
44
+ # Pricing per 1,000,000 tokens (input/output)
45
+ pricing: dict = {
46
+ # Complete or override in .env via nested JSON if needed
47
+ "mistral-medium-latest": {"input_per_1m": 0.40, "output_per_1m": 2.00},
48
+ "mistral-small-latest": {"input_per_1m": 0.10, "output_per_1m": 0.30},
49
+ "mistral-large-latest": {"input_per_1m": 2.00, "output_per_1m": 6.00},
50
+ "magistral-small-latest": {"input_per_1m": 0.50, "output_per_1m": 1.50},
51
+ "magistral-medium-latest": {"input_per_1m": 2.00, "output_per_1m": 5.00},
52
+ }
53
+
54
+ # Equivalence ratios using kgCO2eq as input
55
+ equivalence_ratios: dict = {
56
+ # https://impactco2.fr/outils/comparateur
57
+ # Ratios expressed as UNITS per kgCO2eq (invert of kgCO2 per unit)
58
+ # smartphone: 85.9 kgCO2 per unit → 1 / 85.9 ≈ 0.011643 smartphones per kgCO2
59
+ "smartphone_per_kgCO2eq": 0.011643,
60
+ # car (km): 219 g CO2 per km → 0.219 kg per km → 1 / 0.219 ≈ 4.566210 km per kgCO2
61
+ "car_km_per_kgCO2eq": 4.566210,
62
+ # tgv (km): 2.93 g CO2 per km → 0.00293 kg per km → 1 / 0.00293 ≈ 341.296928 km per kgCO2
63
+ "tgv_km_per_kgCO2eq": 341.296928,
64
+ # water (l): 321 g CO2 per liter → 0.321 kg per liter → 1 / 0.321 ≈ 3.115265 liters per kgCO2
65
+ "water_l_per_kgCO2eq": 3.115265,
66
+ }
67
+
68
  model_config = SettingsConfigDict(
69
  env_file=".env",
70
  env_file_encoding="utf-8",
services/agent_service.py CHANGED
@@ -6,7 +6,8 @@ from langchain_core.language_models.chat_models import BaseChatModel
6
  from domain.enums import ModelName, AgentType
7
  from .llm_service import llm_service
8
  from .agent_registry import agent_registry
9
- from .impact_service import trace_llm_impact
 
10
 
11
 
12
  class AgentService:
@@ -62,21 +63,40 @@ class AgentService:
62
  # Prepare messages
63
  messages = self._prepare_messages(message, conversation_history)
64
 
65
- # Execute graph
 
66
  result = await graph.ainvoke({"messages": messages})
 
67
 
68
  # Extract response
69
  response_message = result["messages"][-1]
70
  response_content = response_message.content
71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  return {
73
  "response": response_content,
74
  "model": model_name.value,
75
  "agent_type": agent_type.value,
76
- "usage": getattr(response_message, "usage_metadata", None),
77
- "metadata": {
78
- "message_count": len(result["messages"])
79
- }
80
  }
81
 
82
  async def stream(
@@ -205,18 +225,16 @@ class AgentService:
205
  "documents": documents
206
  }
207
 
208
- # Compute latency and emissions for the final chunk
209
  latency_s = time.time() - start_time
210
- emissions_kg = None
211
- try:
212
- emissions_kg = trace_llm_impact(
213
- provider=model_name.provider.value,
214
- model=model_name.value,
215
- usage=usage_totals or {},
216
- latency=latency_s,
217
- )
218
- except Exception:
219
- emissions_kg = None
220
 
221
  # Send final chunk
222
  yield {
@@ -228,7 +246,7 @@ class AgentService:
228
  "usage": usage_totals,
229
  "usage_by_model": usage_by_model,
230
  "latency_s": latency_s,
231
- **({"emissions_kgCO2eq": emissions_kg, "emissions_gCO2eq": emissions_kg * 1000.0} if emissions_kg is not None else {})
232
  },
233
  "documents": documents
234
  }
 
6
  from domain.enums import ModelName, AgentType
7
  from .llm_service import llm_service
8
  from .agent_registry import agent_registry
9
+ from services.postprocessing.registry import build_orchestrator
10
+ from services.postprocessing.context import RunContext
11
 
12
 
13
  class AgentService:
 
63
  # Prepare messages
64
  messages = self._prepare_messages(message, conversation_history)
65
 
66
+ # Execute graph with latency
67
+ start_time = time.time()
68
  result = await graph.ainvoke({"messages": messages})
69
+ latency_s = time.time() - start_time
70
 
71
  # Extract response
72
  response_message = result["messages"][-1]
73
  response_content = response_message.content
74
 
75
+ # Prepare metadata and run post-processing pipeline
76
+ usage = getattr(response_message, "usage_metadata", None) or {}
77
+ usage_totals = self._normalize_usage(usage)
78
+ usage_by_model = {model_name.value: usage_totals}
79
+
80
+ ctx = RunContext(
81
+ provider=model_name.provider.value,
82
+ model=model_name.value,
83
+ usage_totals=usage_totals,
84
+ usage_by_model=usage_by_model,
85
+ latency_s=latency_s,
86
+ )
87
+ build_orchestrator().run(ctx)
88
+
89
+ base_metadata: Dict[str, Any] = {
90
+ "message_count": len(result["messages"]),
91
+ }
92
+ base_metadata.update(ctx.metadata_out)
93
+
94
  return {
95
  "response": response_content,
96
  "model": model_name.value,
97
  "agent_type": agent_type.value,
98
+ "usage": usage,
99
+ "metadata": base_metadata,
 
 
100
  }
101
 
102
  async def stream(
 
225
  "documents": documents
226
  }
227
 
228
+ # Compute latency and run post-processing pipeline for the final chunk
229
  latency_s = time.time() - start_time
230
+ ctx = RunContext(
231
+ provider=model_name.provider.value,
232
+ model=model_name.value,
233
+ usage_totals=usage_totals,
234
+ usage_by_model=usage_by_model,
235
+ latency_s=latency_s,
236
+ )
237
+ build_orchestrator().run(ctx)
 
 
238
 
239
  # Send final chunk
240
  yield {
 
246
  "usage": usage_totals,
247
  "usage_by_model": usage_by_model,
248
  "latency_s": latency_s,
249
+ **ctx.metadata_out
250
  },
251
  "documents": documents
252
  }
services/postprocessing/base.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Post-processing pipeline base interfaces and orchestrator."""
2
+ from typing import Protocol, List
3
+ from .context import RunContext
4
+
5
+
6
+ class PostProcessor(Protocol):
7
+ name: str
8
+
9
+ def process(self, ctx: RunContext) -> None:
10
+ ...
11
+
12
+
13
+ class PostProcessingOrchestrator:
14
+ def __init__(self, processors: List[PostProcessor]):
15
+ self._processors = processors
16
+
17
+ def run(self, ctx: RunContext) -> None:
18
+ for processor in self._processors:
19
+ try:
20
+ processor.process(ctx)
21
+ except Exception:
22
+ # Best effort: don't break the response if a processor fails
23
+ continue
24
+
25
+
services/postprocessing/context.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Shared context for post-processing pipeline."""
2
+ from dataclasses import dataclass, field
3
+ from typing import Dict, Any
4
+
5
+
6
+ @dataclass
7
+ class RunContext:
8
+ provider: str
9
+ model: str
10
+ usage_totals: Dict[str, int]
11
+ usage_by_model: Dict[str, Dict[str, int]]
12
+ latency_s: float
13
+ metadata_out: Dict[str, Any] = field(default_factory=dict)
14
+
15
+
services/postprocessing/processors/carbon_impact.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Carbon impact post-processor using ecologits."""
2
+ from services.impact_service import trace_llm_impact
3
+ from ..context import RunContext
4
+
5
+
6
+ class CarbonImpactProcessor:
7
+ name = "carbon_impact"
8
+
9
+ def process(self, ctx: RunContext) -> None:
10
+ kg = trace_llm_impact(ctx.provider, ctx.model, ctx.usage_totals, ctx.latency_s)
11
+ if kg is not None:
12
+ ctx.metadata_out["emissions_kgCO2eq"] = kg
13
+ ctx.metadata_out["emissions_gCO2eq"] = kg * 1000.0
14
+
15
+
services/postprocessing/processors/equivalences.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Equivalences post-processor mapping kgCO2eq to concrete equivalents."""
2
+ from config import settings
3
+ from ..context import RunContext
4
+
5
+
6
+ class EquivalencesProcessor:
7
+ name = "equivalences"
8
+
9
+ def process(self, ctx: RunContext) -> None:
10
+ kg = ctx.metadata_out.get("emissions_kgCO2eq")
11
+ if kg is None:
12
+ return
13
+ ratios = getattr(settings, "equivalence_ratios", {})
14
+ try:
15
+ eq = {
16
+ "water_liters": round(kg * float(ratios.get("water_l_per_kgCO2eq", 0)), 4),
17
+ "car_km": round(kg * float(ratios.get("car_km_per_kgCO2eq", 0)), 4),
18
+ "tgv_km": round(kg * float(ratios.get("tgv_km_per_kgCO2eq", 0)), 4),
19
+ "smartphone": round(kg * float(ratios.get("smartphone_per_kgCO2eq", 0)), 4),
20
+ }
21
+ except Exception:
22
+ return
23
+ ctx.metadata_out["equivalences"] = eq
24
+
25
+
services/postprocessing/processors/pricing.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Pricing post-processor computing costs per model and totals."""
2
+ from config import settings
3
+ from ..context import RunContext
4
+
5
+
6
+ class PricingProcessor:
7
+ name = "pricing"
8
+
9
+ def process(self, ctx: RunContext) -> None:
10
+ currency = getattr(settings, "currency", "EUR")
11
+ pricing_cfg = getattr(settings, "pricing", {})
12
+
13
+ by_model = {}
14
+ total_cost = 0.0
15
+
16
+ for model_id, usage in ctx.usage_by_model.items():
17
+ cfg = pricing_cfg.get(model_id) or {}
18
+ # Prefer per-million token pricing; fallback to per-1k if provided
19
+ in_rate_1m = cfg.get("input_per_1m")
20
+ out_rate_1m = cfg.get("output_per_1m")
21
+ if in_rate_1m is None and cfg.get("input_per_1k") is not None:
22
+ in_rate_1m = float(cfg.get("input_per_1k")) * 1000.0
23
+ if out_rate_1m is None and cfg.get("output_per_1k") is not None:
24
+ out_rate_1m = float(cfg.get("output_per_1k")) * 1000.0
25
+
26
+ in_rate_1m = float(in_rate_1m or 0)
27
+ out_rate_1m = float(out_rate_1m or 0)
28
+
29
+ cost_in = (usage.get("input_tokens", 0) / 1_000_000.0) * in_rate_1m
30
+ cost_out = (usage.get("output_tokens", 0) / 1_000_000.0) * out_rate_1m
31
+ model_total = cost_in + cost_out
32
+ by_model[model_id] = {
33
+ "input": round(cost_in, 6),
34
+ "output": round(cost_out, 6),
35
+ "total": round(model_total, 6),
36
+ }
37
+ total_cost += model_total
38
+
39
+ ctx.metadata_out["pricing"] = {
40
+ "currency": currency,
41
+ "total_cost": round(total_cost, 6),
42
+ "by_model": by_model,
43
+ }
44
+
45
+
services/postprocessing/registry.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Registry for building the post-processing orchestrator."""
2
+ from config import settings
3
+ from .base import PostProcessingOrchestrator
4
+ from .processors.carbon_impact import CarbonImpactProcessor
5
+ from .processors.pricing import PricingProcessor
6
+ from .processors.equivalences import EquivalencesProcessor
7
+
8
+
9
+ ALL = {
10
+ "carbon_impact": CarbonImpactProcessor(),
11
+ "pricing": PricingProcessor(),
12
+ "equivalences": EquivalencesProcessor(),
13
+ }
14
+
15
+
16
+ def build_orchestrator() -> PostProcessingOrchestrator:
17
+ enabled = getattr(settings, "postprocessors_enabled", [
18
+ "carbon_impact",
19
+ "pricing",
20
+ "equivalences",
21
+ ])
22
+ processors = [ALL[name] for name in enabled if name in ALL]
23
+ return PostProcessingOrchestrator(processors)
24
+
25
+