Cyril Dupland commited on
Commit
9f8934d
·
1 Parent(s): b3630e8

feat voice: implement real-time response segmentation for agent replies, enhancing transcript delivery via WebRTC data channels. Update LangGraphProcessor to split responses into segments and modify frontend to display these segments in real-time. Update documentation to reflect new segment handling features.

Browse files
docs/VOICE_CLIENT_INTEGRATION.md CHANGED
@@ -519,6 +519,65 @@ Le polling HTTP sur `GET /voice/transcript/{conversation_id}` reste recommandé
519
  - recharger une conversation plus tard,
520
  - exporter les transcriptions.
521
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
522
  ---
523
 
524
  ## 4. Paramètres VAD (Voice Activity Detection)
 
519
  - recharger une conversation plus tard,
520
  - exporter les transcriptions.
521
 
522
+ ### 3.5. Segments de réponse agent en temps réel
523
+
524
+ Les réponses de l'agent sont également découpées en plusieurs segments texte côté serveur (dans `LangGraphProcessor`) et envoyées au client au fur et à mesure de leur synthèse vocale.
525
+
526
+ Le format des messages est :
527
+
528
+ ```json
529
+ {
530
+ "type": "assistant_segment",
531
+ "text": "phrase de la reponse agent",
532
+ "segment_index": 2,
533
+ "total_segments": 5,
534
+ "conversation_id": "uuid-de-la-conversation"
535
+ }
536
+ ```
537
+
538
+ #### SmallWebRTC – écouter les segments de réponse
539
+
540
+ ```javascript
541
+ const dc = pc.createDataChannel("pipecat-app");
542
+
543
+ dc.onmessage = (event) => {
544
+ try {
545
+ const msg = JSON.parse(event.data);
546
+ if (!msg || typeof msg !== "object") return;
547
+
548
+ if (msg.type === "buffered_transcript") {
549
+ displayUserSegment(msg.text, msg.segment_index);
550
+ } else if (msg.type === "assistant_segment") {
551
+ displayAssistantSegment(msg.text, msg.segment_index, msg.total_segments);
552
+ }
553
+ } catch (e) {
554
+ console.warn("Message datachannel non JSON:", event.data);
555
+ }
556
+ };
557
+ ```
558
+
559
+ #### Daily – écouter les segments de réponse
560
+
561
+ ```javascript
562
+ const callObject = window.Daily.createCallObject();
563
+
564
+ callObject.on("app-message", (ev) => {
565
+ const msg = ev.data;
566
+ if (!msg || typeof msg !== "object") return;
567
+
568
+ if (msg.type === "buffered_transcript") {
569
+ displayUserSegment(msg.text, msg.segment_index);
570
+ } else if (msg.type === "assistant_segment") {
571
+ displayAssistantSegment(msg.text, msg.segment_index, msg.total_segments);
572
+ }
573
+ });
574
+ ```
575
+
576
+ Comme pour les segments STT, ces messages sont complémentaires au polling HTTP :
577
+
578
+ - les segments `assistant_segment` offrent une vue temps réel, synchronisée grossièrement avec la voix,
579
+ - l'endpoint `GET /voice/transcript/{conversation_id}` fournit l'historique complet structuré `user` / `assistant`.
580
+
581
  ---
582
 
583
  ## 4. Paramètres VAD (Voice Activity Detection)
services/voice/README.md CHANGED
@@ -329,6 +329,43 @@ dc.onmessage = (event) => {
329
 
330
  Le polling HTTP sur `/voice/transcript/{conversation_id}` reste utile pour reconstruire l'historique complet (user/assistant) ou pour des usages offline/export.
331
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
332
  ---
333
 
334
  ## 4. Intégration dans d’autres types de clients
 
329
 
330
  Le polling HTTP sur `/voice/transcript/{conversation_id}` reste utile pour reconstruire l'historique complet (user/assistant) ou pour des usages offline/export.
331
 
332
+ ### 3.5. Segments de réponse agent en temps réel
333
+
334
+ En plus des segments STT utilisateur, les réponses de l'agent sont découpées en plusieurs segments texte dans `LangGraphProcessor` et renvoyées au client au fil de l'eau.
335
+
336
+ - Chaque segment de réponse agent génère un message JSON de la forme :
337
+
338
+ ```json
339
+ {
340
+ "type": "assistant_segment",
341
+ "text": "phrase de la reponse agent",
342
+ "segment_index": 2,
343
+ "total_segments": 5,
344
+ "conversation_id": "uuid-de-la-conversation"
345
+ }
346
+ ```
347
+
348
+ - Côté SmallWebRTC, ces messages sont reçus sur le même data channel `pipecat-app` :
349
+
350
+ ```javascript
351
+ dc.onmessage = (event) => {
352
+ try {
353
+ const msg = JSON.parse(event.data);
354
+ if (msg.type === "buffered_transcript") {
355
+ // segments STT utilisateur
356
+ displayUserSegment(msg.text, msg.segment_index);
357
+ } else if (msg.type === "assistant_segment") {
358
+ // segments de reponse agent
359
+ displayAssistantSegment(msg.text, msg.segment_index, msg.total_segments);
360
+ }
361
+ } catch (e) {
362
+ console.warn("Message datachannel non JSON:", event.data);
363
+ }
364
+ };
365
+ ```
366
+
367
+ Sur la page de test `voice.html`, ces segments sont déjà affichés dans la colonne de droite (fil de discussion en temps réel).
368
+
369
  ---
370
 
371
  ## 4. Intégration dans d’autres types de clients
services/voice/langgraph_processor.py CHANGED
@@ -1,7 +1,7 @@
1
  """Pipecat FrameProcessor that routes transcribed text through the LangGraph agent."""
2
  import re
3
  import logging
4
- from typing import Optional
5
 
6
  from pipecat.processors.frame_processor import FrameProcessor
7
  from pipecat.frames.frames import (
@@ -9,6 +9,7 @@ from pipecat.frames.frames import (
9
  TextFrame,
10
  LLMFullResponseStartFrame,
11
  LLMFullResponseEndFrame,
 
12
  )
13
 
14
  from services.voice.transcript_store import TranscriptStore
@@ -53,10 +54,25 @@ class LangGraphProcessor(FrameProcessor):
53
  TranscriptStore.append(self.conversation_id, "assistant", response)
54
 
55
  clean = self._clean_response_for_tts(response)
56
- logger.info("Sending to TTS: %s", clean)
 
 
 
57
 
58
  await self.push_frame(LLMFullResponseStartFrame())
59
- await self.push_frame(TextFrame(clean))
 
 
 
 
 
 
 
 
 
 
 
 
60
  await self.push_frame(LLMFullResponseEndFrame())
61
  except Exception:
62
  logger.exception("Error in LangGraphProcessor")
@@ -104,3 +120,57 @@ class LangGraphProcessor(FrameProcessor):
104
  clean = re.sub(r"\s+", " ", clean)
105
 
106
  return clean.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """Pipecat FrameProcessor that routes transcribed text through the LangGraph agent."""
2
  import re
3
  import logging
4
+ from typing import Optional, List
5
 
6
  from pipecat.processors.frame_processor import FrameProcessor
7
  from pipecat.frames.frames import (
 
9
  TextFrame,
10
  LLMFullResponseStartFrame,
11
  LLMFullResponseEndFrame,
12
+ OutputTransportMessageFrame,
13
  )
14
 
15
  from services.voice.transcript_store import TranscriptStore
 
54
  TranscriptStore.append(self.conversation_id, "assistant", response)
55
 
56
  clean = self._clean_response_for_tts(response)
57
+ logger.info("Sending to TTS (cleaned): %s", clean)
58
+
59
+ segments = self._split_into_segments(clean)
60
+ logger.info("Split response into %d segment(s)", len(segments))
61
 
62
  await self.push_frame(LLMFullResponseStartFrame())
63
+ for idx, segment in enumerate(segments, start=1):
64
+ await self.push_frame(TextFrame(segment))
65
+ await self.push_frame(
66
+ OutputTransportMessageFrame(
67
+ {
68
+ "type": "assistant_segment",
69
+ "text": segment,
70
+ "segment_index": idx,
71
+ "total_segments": len(segments),
72
+ "conversation_id": self.conversation_id,
73
+ }
74
+ )
75
+ )
76
  await self.push_frame(LLMFullResponseEndFrame())
77
  except Exception:
78
  logger.exception("Error in LangGraphProcessor")
 
120
  clean = re.sub(r"\s+", " ", clean)
121
 
122
  return clean.strip()
123
+
124
+ # ------------------------------------------------------------------
125
+ # Segmentation helpers
126
+ # ------------------------------------------------------------------
127
+
128
+ @staticmethod
129
+ def _split_into_segments(text: str) -> List[str]:
130
+ """Split cleaned assistant text into sentence-like segments.
131
+
132
+ Uses simple punctuation-based splitting on `.`, `?`, `!` and
133
+ falls back to a single-segment list if no punctuation is found.
134
+ Very short segments are merged back into neighbours.
135
+ """
136
+ if not text:
137
+ return []
138
+
139
+ # Split on end-of-sentence punctuation followed by whitespace.
140
+ parts = re.split(r"(?<=[.?!])\s+", text)
141
+ parts = [p.strip() for p in parts if p and p.strip()]
142
+
143
+ if not parts:
144
+ return []
145
+ if len(parts) == 1:
146
+ return parts
147
+
148
+ # Merge very short trailing segments into the previous one to avoid noise.
149
+ merged: List[str] = []
150
+ buffer = ""
151
+ for idx, part in enumerate(parts):
152
+ if buffer:
153
+ candidate = buffer + " " + part
154
+ else:
155
+ candidate = part
156
+
157
+ # Heuristic: keep segments with at least ~10 characters,
158
+ # otherwise merge with the next piece.
159
+ if len(candidate) < 10 and idx < len(parts) - 1:
160
+ buffer = candidate
161
+ continue
162
+
163
+ if buffer and candidate is not buffer:
164
+ merged.append(candidate)
165
+ buffer = ""
166
+ else:
167
+ merged.append(part)
168
+ buffer = ""
169
+
170
+ if buffer:
171
+ if merged:
172
+ merged[-1] = merged[-1] + " " + buffer
173
+ else:
174
+ merged.append(buffer)
175
+
176
+ return merged
static/voice.html CHANGED
@@ -414,6 +414,29 @@
414
  bufferThreadEl.scrollTop = bufferThreadEl.scrollHeight;
415
  }
416
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
417
  async function fetchTranscript() {
418
  if (!conversationId) return;
419
  try {
@@ -484,8 +507,11 @@
484
  if (!event || typeof event.data !== "string") return;
485
  try {
486
  const msg = JSON.parse(event.data);
487
- if (msg && msg.type === "buffered_transcript") {
 
488
  appendBufferedSegment(msg);
 
 
489
  }
490
  } catch (e) {
491
  // Ignore non-JSON messages (ou logs si besoin)
 
414
  bufferThreadEl.scrollTop = bufferThreadEl.scrollHeight;
415
  }
416
 
417
+ function appendAssistantSegment(msg) {
418
+ if (!bufferThreadEl || !msg || !msg.text) return;
419
+
420
+ const wrapper = document.createElement("div");
421
+
422
+ const meta = document.createElement("div");
423
+ meta.className = "buffer-segment-meta";
424
+ const idx = typeof msg.segment_index === "number" ? msg.segment_index : null;
425
+ meta.textContent = idx !== null
426
+ ? `Agent · segment #${idx}${msg.total_segments ? " / " + msg.total_segments : ""}`
427
+ : "Agent";
428
+
429
+ const bubble = document.createElement("div");
430
+ bubble.className = "buffer-segment-bubble";
431
+ bubble.style.background = "#e0f2fe"; // legerement different des segments user
432
+ bubble.textContent = msg.text;
433
+
434
+ wrapper.appendChild(meta);
435
+ wrapper.appendChild(bubble);
436
+ bufferThreadEl.appendChild(wrapper);
437
+ bufferThreadEl.scrollTop = bufferThreadEl.scrollHeight;
438
+ }
439
+
440
  async function fetchTranscript() {
441
  if (!conversationId) return;
442
  try {
 
507
  if (!event || typeof event.data !== "string") return;
508
  try {
509
  const msg = JSON.parse(event.data);
510
+ if (!msg || typeof msg !== "object") return;
511
+ if (msg.type === "buffered_transcript") {
512
  appendBufferedSegment(msg);
513
+ } else if (msg.type === "assistant_segment") {
514
+ appendAssistantSegment(msg);
515
  }
516
  } catch (e) {
517
  // Ignore non-JSON messages (ou logs si besoin)