File size: 6,726 Bytes
beb8e32
 
c92d558
 
 
beb8e32
 
 
 
 
 
 
 
 
 
 
 
 
 
c92d558
 
 
 
 
beb8e32
 
 
c92d558
 
beb8e32
 
 
 
 
 
c92d558
 
beb8e32
 
 
 
 
c92d558
 
beb8e32
a33ba57
 
 
 
c92d558
a33ba57
beb8e32
a33ba57
2f0e868
a33ba57
 
 
 
beb8e32
 
a33ba57
 
beb8e32
2f0e868
beb8e32
a33ba57
 
2f0e868
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
beb8e32
2f0e868
a33ba57
 
2f0e868
a33ba57
beb8e32
a33ba57
 
beb8e32
a33ba57
 
 
 
 
beb8e32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a33ba57
beb8e32
 
 
 
 
 
 
a33ba57
beb8e32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a33ba57
beb8e32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""Lightweight head tracker using YOLO for face detection.

Ported from reachy_mini_conversation_app for voice assistant integration.
Model is loaded at initialization time (not lazy) to ensure face tracking
is ready immediately when the camera server starts.
"""

from __future__ import annotations
import logging
from typing import Tuple, Optional

import numpy as np
from numpy.typing import NDArray


# Module-level logger named after this module, so its output can be
# filtered or configured separately from the rest of the application.
logger = logging.getLogger(__name__)


class HeadTracker:
    """Lightweight head tracker using YOLO for face detection.

    The model is loaded eagerly in ``__init__`` (not lazily) so that face
    tracking is ready as soon as the tracker is constructed (matching
    conversation_app behavior).

    Loading never raises: if the optional dependencies are missing or the
    model cannot be downloaded/loaded, the failure is logged, the reason is
    stored in ``_model_load_error`` and ``is_available`` stays ``False``.
    In that state ``get_head_position`` simply returns ``(None, None)``.
    """

    def __init__(
        self,
        model_repo: str = "AdamCodd/YOLOv11n-face-detection",
        model_filename: str = "model.pt",
        confidence_threshold: float = 0.3,
        device: str = "cpu",
    ) -> None:
        """Initialize YOLO-based head tracker.

        Args:
            model_repo: HuggingFace model repository
            model_filename: Model file name
            confidence_threshold: Minimum confidence for face detection
            device: Device to run inference on ('cpu' or 'cuda')
        """
        self.confidence_threshold = confidence_threshold
        self.model = None
        self._model_repo = model_repo
        self._model_filename = model_filename
        self._device = device
        # Set to supervision's ``Detections`` class once imports succeed.
        self._detections_class = None
        # Guards against loading more than once per instance.
        self._model_load_attempted = False
        # Human-readable reason for the last load failure, if any.
        self._model_load_error: Optional[str] = None

        # Load model immediately at init (not lazy)
        self._load_model()

    def _load_model(self) -> None:
        """Download and load the YOLO model, retrying the download.

        The download is attempted up to three times with a fixed delay
        between attempts. Runs at most once per instance; subsequent calls
        are no-ops. Any failure (missing dependency, download error, model
        construction error) is logged and recorded in
        ``_model_load_error`` instead of being raised, leaving
        ``self.model`` as ``None``.
        """
        if self._model_load_attempted:
            return

        self._model_load_attempted = True

        try:
            # Imported here so that a missing optional dependency disables
            # face tracking instead of breaking import of this module.
            from ultralytics import YOLO
            from supervision import Detections
            from huggingface_hub import hf_hub_download
            import time

            self._detections_class = Detections

            # Download with retries
            max_retries = 3
            retry_delay = 5
            model_path = None
            last_error = None

            for attempt in range(max_retries):
                try:
                    model_path = hf_hub_download(
                        repo_id=self._model_repo,
                        filename=self._model_filename,
                    )
                    break
                except Exception as e:
                    last_error = e
                    # Only sleep/log when another attempt will follow.
                    if attempt < max_retries - 1:
                        logger.warning(
                            "Model download failed (attempt %d/%d): %s. Retrying in %ds...",
                            attempt + 1, max_retries, e, retry_delay
                        )
                        time.sleep(retry_delay)

            if model_path is None:
                # All attempts failed: surface the last download error to
                # the generic handler below.
                raise last_error

            self.model = YOLO(model_path).to(self._device)
            logger.info("YOLO face detection model loaded")
        except ImportError as e:
            self._model_load_error = f"Missing dependencies: {e}"
            logger.warning("Face tracking disabled - missing dependencies: %s", e)
            self.model = None
        except Exception as e:
            self._model_load_error = str(e)
            logger.error("Failed to load YOLO model: %s", e)
            self.model = None

    @property
    def is_available(self) -> bool:
        """Check if the head tracker is available and ready."""
        return self.model is not None and self._detections_class is not None

    def _select_best_face(self, detections) -> Optional[int]:
        """Select the best face based on confidence and area.

        Candidates below ``confidence_threshold`` are discarded. The
        remaining ones are scored as ``0.7 * confidence + 0.3 * relative
        area``, where relative area is the box area divided by the largest
        candidate area, so larger faces are preferred among similarly
        confident detections.

        Args:
            detections: Supervision detections object (``xyxy`` boxes and
                ``confidence`` array are read)

        Returns:
            Index of best face or None if no valid faces
        """
        if detections.xyxy.shape[0] == 0:
            return None

        if detections.confidence is None:
            return None

        # Filter by confidence threshold
        valid_mask = detections.confidence >= self.confidence_threshold
        if not np.any(valid_mask):
            return None

        valid_indices = np.where(valid_mask)[0]

        # Calculate areas for valid detections
        boxes = detections.xyxy[valid_indices]
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

        # Normalise areas by the largest one. If every candidate box is
        # degenerate (zero area) the division would be 0/0 and produce NaN
        # scores, making argmax pick the first candidate regardless of
        # confidence; in that case rank by confidence alone.
        largest_area = np.max(areas)
        if largest_area > 0:
            relative_areas = areas / largest_area
        else:
            relative_areas = np.zeros_like(areas)

        # Combine confidence and area (weighted towards larger faces)
        confidences = detections.confidence[valid_indices]
        scores = confidences * 0.7 + relative_areas * 0.3

        best_idx = valid_indices[np.argmax(scores)]
        return int(best_idx)

    def _bbox_to_normalized_coords(
        self, bbox: NDArray[np.float32], w: int, h: int
    ) -> NDArray[np.float32]:
        """Convert bounding box center to normalized coordinates [-1, 1].

        Args:
            bbox: Bounding box [x1, y1, x2, y2]
            w: Image width (must be non-zero)
            h: Image height (must be non-zero)

        Returns:
            Center point in [-1, 1] coordinates as a float32 array
            ``[x, y]``; the image center maps to ``[0, 0]``
        """
        center_x = (bbox[0] + bbox[2]) / 2.0
        center_y = (bbox[1] + bbox[3]) / 2.0

        # Normalize to [0, 1] then to [-1, 1]
        norm_x = (center_x / w) * 2.0 - 1.0
        norm_y = (center_y / h) * 2.0 - 1.0

        return np.array([norm_x, norm_y], dtype=np.float32)

    def get_head_position(
        self, img: NDArray[np.uint8]
    ) -> Tuple[Optional[NDArray[np.float32]], Optional[float]]:
        """Get head position from face detection.

        Never raises for a bad frame or an inference failure: those cases
        return ``(None, None)``, with inference errors logged at debug
        level.

        Args:
            img: Input image (BGR format)

        Returns:
            Tuple of (face_center [-1,1], confidence) or (None, None) if no
            face was found, the tracker is unavailable, or the frame is
            missing/empty
        """
        if not self.is_available:
            return None, None

        # A failed camera read may hand us None or an empty frame. Bail out
        # early instead of raising on ``img.shape`` or dividing by a zero
        # width/height during normalisation.
        if img is None or img.size == 0 or img.ndim < 2:
            return None, None

        h, w = img.shape[:2]

        try:
            # Run YOLO inference
            results = self.model(img, verbose=False)
            detections = self._detections_class.from_ultralytics(results[0])

            # Select best face
            face_idx = self._select_best_face(detections)
            if face_idx is None:
                return None, None

            bbox = detections.xyxy[face_idx]
            # _select_best_face only returns an index when confidences are
            # present, so this lookup is safe.
            confidence = float(detections.confidence[face_idx])

            # Get face center in [-1, 1] coordinates
            face_center = self._bbox_to_normalized_coords(bbox, w, h)

            return face_center, confidence

        except Exception as e:
            logger.debug("Error in head position detection: %s", e)
            return None, None