routeur_ia_api / api /routes /realtime.py
Cyril Dupland
FIrst Commit
d28f1ed
"""Real-time communication routes using WebRTC."""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from typing import Dict, Set
import json
import logging
from datetime import datetime
from core.security import get_current_user
# Router for every route in this module; all paths are served under the
# "/realtime" prefix and grouped under the "Real-time" tag.
router = APIRouter(prefix="/realtime", tags=["Real-time"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Currently open WebSocket connections. websocket_endpoint adds a socket
# after accepting it and discards it when its handler exits; the
# /connections and /broadcast routes read this set. It is a plain
# module-level set, so it only tracks connections of the current process.
active_connections: Set[WebSocket] = set()
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time bidirectional communication.

    This endpoint provides a WebSocket connection for:
    - WebRTC signaling (offer/answer/ICE candidates)
    - Real-time text messages
    - Keep-alive ping/pong

    **Message Format:**
    ```json
    {
        "type": "offer|answer|ice_candidate|message|ping",
        "payload": {...}
    }
    ```

    Every received frame gets exactly one JSON reply. Frames that are not
    valid JSON, are not a JSON object, carry an unknown ``type``, or carry
    a ``payload`` that is not a JSON object are answered with
    ``{"type": "error", "payload": {"message": ...}}`` and the connection
    stays open.

    **Authentication:**
    This handler accepts the connection without checking any credentials.
    For production, you should authenticate the WebSocket connection.
    You can pass the JWT token as a query parameter: ws://host/realtime/ws?token=<jwt>

    Args:
        websocket: The incoming WebSocket connection.
    """
    await websocket.accept()
    active_connections.add(websocket)
    connection_id = id(websocket)
    logger.info("WebSocket connection established: %s", connection_id)

    # Message types whose reply is produced by a payload -> dict coroutine.
    handlers = {
        "offer": handle_webrtc_offer,
        "answer": handle_webrtc_answer,
        "ice_candidate": handle_ice_candidate,
        "message": handle_text_message,
    }

    async def send_error(text: str) -> None:
        """Send an error frame to the client without closing the socket."""
        await websocket.send_json({
            "type": "error",
            "payload": {"message": text},
        })

    try:
        # Send welcome message
        await websocket.send_json({
            "type": "connected",
            "payload": {
                "connection_id": connection_id,
                "timestamp": datetime.utcnow().isoformat(),
                "message": "WebSocket connection established",
            },
        })

        # Listen for messages until the client disconnects.
        while True:
            raw_message = await websocket.receive_text()

            # Only the parse can raise JSONDecodeError, so keep the try tight.
            try:
                data = json.loads(raw_message)
            except json.JSONDecodeError:
                await send_error("Invalid JSON format")
                continue

            # Valid JSON may still be a list, string, number or null; without
            # this check `data.get` would raise and tear down the connection.
            if not isinstance(data, dict):
                await send_error("Message must be a JSON object")
                continue

            message_type = data.get("type", "unknown")
            payload = data.get("payload", {})
            if payload is None:
                # An explicit `"payload": null` is treated like a missing one.
                payload = {}

            logger.info("Received %s from %s", message_type, connection_id)

            if message_type == "ping":
                # Ping/pong for keep-alive; the payload is ignored.
                await websocket.send_json({
                    "type": "pong",
                    "payload": {
                        "timestamp": datetime.utcnow().isoformat(),
                    },
                })
                continue

            # `type` comes from the client and may be unhashable (list/dict),
            # so only look up strings in the dispatch table.
            handler = (
                handlers.get(message_type)
                if isinstance(message_type, str)
                else None
            )
            if handler is None:
                await send_error(f"Unknown message type: {message_type}")
                continue

            # Handlers call `payload.get(...)`, so reject non-object payloads
            # here instead of letting them crash the connection.
            if not isinstance(payload, dict):
                await send_error("Payload must be a JSON object")
                continue

            response = await handler(payload)
            await websocket.send_json(response)

    except WebSocketDisconnect:
        logger.info("WebSocket disconnected: %s", connection_id)
    except Exception as e:
        # Top-level boundary for this connection: log with traceback and let
        # the handler end so the socket is cleaned up below.
        logger.error("WebSocket error: %s", e, exc_info=True)
    finally:
        active_connections.discard(websocket)
        logger.info("WebSocket connection closed: %s", connection_id)
async def handle_webrtc_offer(payload: dict) -> dict:
    """
    Build the reply to a WebRTC offer (placeholder).

    No peer connection is created here: the reply simply carries back the
    ``sdp`` value found in the offer (empty string when absent) together
    with explanatory text. A full implementation would create a peer
    connection, apply the offer as remote description and produce a real
    answer.

    Args:
        payload: Offer data; its optional ``sdp`` entry is echoed back.

    Returns:
        A message of type ``answer`` wrapping the echoed SDP and notes.
    """
    # TODO: Implement full WebRTC signaling with aiortc
    offered_sdp = payload.get("sdp", "")
    answer_body = {
        "message": "WebRTC offer received. Full implementation pending.",
        "sdp": offered_sdp,
        "note": "This is a placeholder. Implement with aiortc for production.",
    }
    return {"type": "answer", "payload": answer_body}
async def handle_webrtc_answer(payload: dict) -> dict:
    """
    Acknowledge a WebRTC answer.

    The payload is accepted but not inspected.

    Args:
        payload: Answer data sent by the client (unused).

    Returns:
        A fixed message of type ``ack``.
    """
    acknowledgment = {"message": "WebRTC answer received"}
    return {"type": "ack", "payload": acknowledgment}
async def handle_ice_candidate(payload: dict) -> dict:
    """
    Acknowledge an ICE candidate.

    The payload is accepted but not inspected.

    Args:
        payload: Candidate data sent by the client (unused).

    Returns:
        A fixed message of type ``ack``.
    """
    reply = {"type": "ack"}
    reply["payload"] = {"message": "ICE candidate received"}
    return reply
async def handle_text_message(payload: dict) -> dict:
    """
    Echo a text message back to the sender (placeholder).

    The reply repeats the ``text`` entry of the payload (empty string when
    absent) prefixed with ``Received: `` and stamps it with the current
    UTC time. Possible extensions: forward the text to an AI agent,
    broadcast it to other connections, or persist it.

    Args:
        payload: Message data; its optional ``text`` entry is echoed.

    Returns:
        A message of type ``message`` with the echo, a timestamp and a note.
    """
    # TODO: Integrate with agent service for AI responses
    incoming_text = payload.get("text", "")
    echo = {
        "text": f"Received: {incoming_text}",
        "timestamp": datetime.utcnow().isoformat(),
        "note": "This is an echo. Integrate with agent_service for AI responses.",
    }
    return {"type": "message", "payload": echo}
@router.get("/connections")
async def get_active_connections(
    current_user: dict = Depends(get_current_user)
) -> dict:
    """
    Report how many WebSocket connections are currently tracked.

    Args:
        current_user: Authenticated user, resolved by the
            ``get_current_user`` dependency; not otherwise used here.

    Returns:
        The size of the tracked connection set and the current UTC time.
    """
    stats = {"active_connections": len(active_connections)}
    stats["timestamp"] = datetime.utcnow().isoformat()
    return stats
@router.post("/broadcast")
async def broadcast_message(
    message: dict,
    current_user: dict = Depends(get_current_user)
) -> dict:
    """
    Broadcast a message to all active WebSocket connections.

    Delivery is best-effort: a failure on one connection is logged and
    does not stop delivery to the remaining ones.

    Args:
        message: Message to broadcast; sent as the ``payload`` of a
            ``broadcast`` frame.
        current_user: Authenticated user, resolved by the
            ``get_current_user`` dependency; not otherwise used here.

    Returns:
        Broadcast status with the number of connections the frame was
        successfully sent to and the current UTC time.
    """
    broadcast_count = 0
    frame = {"type": "broadcast", "payload": message}

    # Iterate over a snapshot: each `await` below yields to the event loop,
    # during which websocket_endpoint may add or discard connections.
    # Iterating the live set would then raise
    # "RuntimeError: Set changed size during iteration".
    for connection in tuple(active_connections):
        try:
            await connection.send_json(frame)
        except Exception as e:
            # Best-effort delivery: log and continue with the next recipient.
            logger.error("Failed to broadcast to connection: %s", e)
        else:
            broadcast_count += 1

    return {
        "message": "Broadcast sent",
        "recipients": broadcast_count,
        "timestamp": datetime.utcnow().isoformat(),
    }