"""Helpers for LangGraph stream chunks (custom writer payloads, tuple parsing).""" from __future__ import annotations from typing import Any, Dict, Optional, Tuple def parse_stream_graph_item(msg: Any) -> Tuple[Optional[str], Any]: """ Normalize items yielded by CompiledGraph.stream(stream_mode=[...]). LangGraph typically yields: - (mode, payload) when stream_mode is a list and subgraphs=False - (namespace, mode, payload) when subgraph streaming is enabled Some versions may emit dict-shaped stream parts; handle defensively. """ if isinstance(msg, dict): m = msg.get("type") or msg.get("mode") or msg.get("event") if m == "custom": return "custom", msg.get("data", msg) if m in ("messages", "updates", "values", "debug"): return str(m), msg.get("data", msg.get("chunk", msg)) return None, msg if isinstance(msg, (tuple, list)): if len(msg) == 2: mode, payload = msg[0], msg[1] return (str(mode) if mode is not None else None, payload) if len(msg) == 3: _ns, mode, payload = msg[0], msg[1], msg[2] return (str(mode) if mode is not None else None, payload) return None, msg def normalize_custom_writer_payload(raw: Any) -> Dict[str, Any]: """ Map get_stream_writer() payloads to a dict with a stable 'kind' for clients. - str -> tool status line (legacy tools passing plain strings) - dict with 'kind' -> returned as a shallow copy - dict without 'kind' -> kind 'custom', other keys preserved - other -> kind 'custom' with a string representation """ if isinstance(raw, str): return {"kind": "tool", "message": raw} if isinstance(raw, dict): kind = raw.get("kind") if isinstance(kind, str) and kind != "": return dict(raw) rest = {k: v for k, v in raw.items() if k != "kind"} return {"kind": "custom", **rest} return {"kind": "custom", "message": str(raw)}