| """Text-to-speech and LLM response generation.""" |
|
|
| import logging |
|
|
| from .config import PERSONALITIES, get_random_personality |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
# Groq chat model ids: the "fast" one is the default, the "smart" one is picked
# when a custom prompt is configured.
DEFAULT_MODEL_FAST = "llama-3.1-8b-instant"
DEFAULT_MODEL_SMART = "llama-3.3-70b-versatile"


# Tag names that may appear in square brackets at the start of generated text.
ORPHEUS_TAGS = (
    "cheerful",
    "happy",
    "excited",
    "angry",
    "frustrated",
    "sad",
    "whispered",
    "shouting",
    "neutral",
)


import re as _re

# Matches one "[tag]" marker (any letter case) together with trailing whitespace.
_ORPHEUS_TAG_RE = _re.compile(
    rf"\[({'|'.join(ORPHEUS_TAGS)})\]\s*",
    flags=_re.I,
)
|
|
|
|
def strip_orpheus_tags(text: str) -> str:
    """Return *text* with every recognised ``[tag]`` marker removed.

    Used when sending text to a non-Orpheus TTS so Edge/ElevenLabs don't speak
    'bracket cheerful bracket' verbatim. Leading and trailing whitespace is
    trimmed from the result, and a falsy *text* (``None`` or ``""``) yields ``""``.
    """
    source = text if text else ""
    without_tags = _ORPHEUS_TAG_RE.sub("", source)
    return without_tags.strip()
|
|
|
|
class LLMResponder:
    """Generate snarky responses using Groq (free) or fallback to pre-written.

    When no Groq client is available (no API key, ``groq`` package missing, or
    client construction failed) every request is answered from the selected
    personality's pre-written lines. The same fallback is used whenever a Groq
    call raises or returns an empty reply.
    """

    # Last-resort lines for personalities that ship no pre-written lines, so the
    # fallback path itself can never raise on an empty list.
    _DEFAULT_SHAME_LINE = "Put the phone down."
    _DEFAULT_PRAISE_LINE = "Good. Stay focused."

    def __init__(
        self,
        api_key: str = "",
        personality: str = "mixtape",
        user_name: str = "",
        custom_prompt: str = "",
        model: str = "",
        enable_orpheus_tags: bool = False,
    ):
        """Store the configuration and try to create a Groq client.

        Args:
            api_key: Groq API key; when empty no client is created.
            personality: Key into ``PERSONALITIES``; ``"mixtape"`` picks a
                random personality on every call.
            user_name: Optional name injected into the prompt overlay.
            custom_prompt: Optional user directive injected into the prompt.
            model: Optional model id override. Blank or whitespace-only values
                select the default (smart model when a custom prompt is set,
                fast model otherwise).
            enable_orpheus_tags: When true, the prompt asks the model to start
                its reply with a bracketed vocal-direction tag.
        """
        self.api_key = api_key
        self.client = None
        self.personality = personality
        self.user_name = (user_name or "").strip()
        self.custom_prompt = (custom_prompt or "").strip()
        self.enable_orpheus_tags = bool(enable_orpheus_tags)

        # Strip first, then fall back: a whitespace-only override must not end
        # up as an empty model id.
        self.model = (model or "").strip() or (
            DEFAULT_MODEL_SMART if self.custom_prompt else DEFAULT_MODEL_FAST
        )

        if api_key:
            try:
                from groq import Groq

                self.client = Groq(api_key=api_key)
                logger.info(
                    "Groq LLM initialized: personality=%s model=%s name=%s custom_prompt=%s",
                    PERSONALITIES.get(personality, {}).get("name", personality),
                    self.model,
                    "SET" if self.user_name else "UNSET",
                    "SET" if self.custom_prompt else "UNSET",
                )
            except ImportError:
                logger.warning("groq package not installed, using pre-written lines")
            except Exception as e:
                logger.warning(f"Groq init failed: {e}, using pre-written lines")

    def _user_overlay(self) -> str:
        """Build the 'name + custom user directive' overlay that rides on every Groq call.

        The custom prompt is given top billing — it can change tone, vocabulary, and
        even override personality defaults. Personality is still applied as
        secondary tone guidance. Returns ``""`` when neither a name nor a custom
        prompt is configured.
        """
        parts = []
        if self.user_name:
            parts.append(f"The user's name is {self.user_name}. Address them by name occasionally.")
        if self.custom_prompt:
            parts.append("USER DIRECTIVE (highest priority — follow this above all other tone rules):")
            parts.append(self.custom_prompt)
        return "\n".join(parts).strip()

    def _resolve_personality_data(self) -> dict:
        """Return the ``PERSONALITIES`` entry to use for the current call.

        ``"mixtape"`` draws a random personality each time; unknown keys fall
        back to the ``"angry_boss"`` entry.
        """
        if self.personality == "mixtape":
            actual_personality = get_random_personality()
        else:
            actual_personality = self.personality
        return PERSONALITIES.get(actual_personality, PERSONALITIES["angry_boss"])

    def _build_system_prompt(self, sections: list) -> str:
        """Join prompt *sections*, appending the user overlay last when there is one."""
        overlay = self._user_overlay()
        if overlay:
            sections = sections + [
                "==== USER DIRECTIVE (HIGHEST PRIORITY — this is the FINAL instruction, follow it) ====\n"
                + overlay
                + "\n==== END USER DIRECTIVE ===="
            ]
        return "\n\n".join(sections)

    def _complete(self, system_content: str, user_content: str, max_tokens: int, temperature: float) -> str:
        """Run one chat completion and return the stripped reply text.

        Raises:
            ValueError: if the reply is missing or blank, so callers fall back
                to a pre-written line instead of speaking nothing.
        """
        response = self.client.chat.completions.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=[
                {"role": "system", "content": system_content},
                {"role": "user", "content": user_content},
            ],
        )
        reply = (response.choices[0].message.content or "").strip()
        if not reply:
            raise ValueError("LLM returned an empty reply")
        return reply

    @staticmethod
    def _pick_line(lines, default: str) -> str:
        """Return a random entry of *lines*, or *default* when *lines* is empty/None."""
        import random

        return random.choice(lines) if lines else default

    def get_response(self, phone_count: int, context: str = "") -> str:
        """Get a snarky response about phone usage.

        Args:
            phone_count: Number of phone pickups so far; used for the context
                hint sent to the model.
            context: Accepted for interface compatibility; not used by the
                current implementation.

        Returns:
            The model's reply, or a pre-written shame line when there is no
            client or the request fails.
        """
        if not self.client:
            return self._get_prewritten_shame()

        try:
            personality_data = self._resolve_personality_data()
            shame_data = personality_data["shame"]
            voice_desc = personality_data["voice"]
            avoid = personality_data.get("avoid", "")

            examples = "\n".join("- " + ex for ex in shame_data["examples"])
            personality_prompt = (
                f"{voice_desc}\n"
                "\n"
                f"TONE: {shame_data['tone']}\n"
                f"STRUCTURE: {shame_data['structure']}\n"
                "\n"
                "EXAMPLES:\n"
                f"{examples}\n"
                "\n"
                f"AVOID: {avoid}"
            )

            if phone_count == 1:
                context_hint = "First time today."
            elif phone_count == 2:
                context_hint = "Second time."
            elif phone_count == 3:
                context_hint = "Third time."
            elif phone_count <= 5:
                context_hint = f"{phone_count} times now."
            else:
                context_hint = f"{phone_count} times today!"

            has_custom = bool(self.custom_prompt)

            # A custom directive may ask for longer output, so it gets a larger
            # token budget and a looser length rule.
            max_tokens = 80 if has_custom else 20
            length_rule = (
                "- Follow the USER DIRECTIVE for length and format. If the directive is silent on length, keep it under 20 words."
                if has_custom else
                "- Maximum 8 words. Prefer 3-5 words."
            )

            # With a custom directive the personality is reduced to a tone
            # reference (no structure/examples); otherwise the full prompt is used.
            if has_custom:
                personality_section = (
                    "PERSONALITY TONE REFERENCE (only used for vocal style, do NOT copy these lines):\n"
                    f"{voice_desc}\n"
                    f"TONE: {shame_data['tone']}\n"
                    f"AVOID: {avoid}\n"
                )
            else:
                personality_section = personality_prompt

            orpheus_rule = (
                "\n- VOCAL DIRECTION: Begin your reply with EXACTLY ONE bracketed tag chosen from "
                + "[angry], [frustrated], [sad], [whispered], [shouting], [cheerful], [neutral]. "
                + "Pick the tag that matches the personality + scolding context. Example: '[angry] Phone. Down. Now.'"
                if self.enable_orpheus_tags else ""
            )
            sections = [
                "TASK: Generate a NEGATIVE/SCOLDING response because someone just picked up their phone (BAD behavior).",
                personality_section,
                "RULES:\n"
                + length_rule + "\n"
                + "- Be CRITICAL/NEGATIVE about picking up the phone.\n"
                + ("- The USER DIRECTIVE below takes absolute priority over personality and these rules.\n" if has_custom else "- Match the personality's voice exactly.\n")
                + "- No emoji. No hashtags."
                + orpheus_rule,
            ]

            return self._complete(
                self._build_system_prompt(sections),
                f"Phone pickup #{phone_count} today. {context_hint}",
                max_tokens=max_tokens,
                temperature=1.1,
            )
        except Exception as e:
            logger.warning(f"Groq API error: {e}, using fallback")
            return self._get_prewritten_shame()

    def get_praise(self) -> str:
        """Get praise for putting phone down.

        Returns:
            The model's reply, or a pre-written praise line when there is no
            client or the request fails.
        """
        if not self.client:
            return self._get_prewritten_praise()

        try:
            personality_data = self._resolve_personality_data()
            praise_data = personality_data["praise"]

            examples = "\n".join("- " + ex for ex in praise_data["examples"])
            personality_prompt = (
                f"TONE: {praise_data['tone']}\n"
                "\n"
                "EXAMPLES:\n"
                f"{examples}"
            )

            has_custom = bool(self.custom_prompt)
            max_tokens = 60 if has_custom else 15
            length_rule = (
                "- Follow the USER DIRECTIVE for length and format. If silent on length, keep it under 15 words."
                if has_custom else
                "- Maximum 5 words. Prefer 2-3 words."
            )

            if has_custom:
                personality_section = (
                    "PERSONALITY TONE REFERENCE (vocal style only — do NOT copy these lines verbatim):\n"
                    f"{personality_prompt}"
                )
            else:
                personality_section = personality_prompt

            orpheus_rule = (
                "\n- VOCAL DIRECTION: Begin your reply with EXACTLY ONE bracketed tag chosen from "
                + "[cheerful], [happy], [excited], [whispered], [neutral]. "
                + "Pick the tag that matches the personality + praising context. Example: '[cheerful] Well done.'"
                if self.enable_orpheus_tags else ""
            )
            sections = [
                "TASK: Generate a POSITIVE/APPROVING response because someone just put their phone down (GOOD behavior).",
                personality_section,
                "RULES:\n"
                + length_rule + "\n"
                + "- Be POSITIVE/APPROVING about putting the phone down.\n"
                + ("- The USER DIRECTIVE below takes absolute priority.\n" if has_custom else "- Match the personality's voice exactly.\n")
                + "- No emoji."
                + orpheus_rule,
            ]

            return self._complete(
                self._build_system_prompt(sections),
                "Phone down.",
                max_tokens=max_tokens,
                temperature=0.8,
            )
        except Exception as e:
            # Log like get_response does, so praise failures are not invisible.
            logger.warning(f"Groq API error: {e}, using fallback")
            return self._get_prewritten_praise()

    def _get_prewritten_shame(self) -> str:
        """Get personality-specific pre-written shame line (generic line if none exist)."""
        personality_data = self._resolve_personality_data()
        return self._pick_line(
            personality_data.get("prewritten_shame", []),
            self._DEFAULT_SHAME_LINE,
        )

    def _get_prewritten_praise(self) -> str:
        """Get personality-specific pre-written praise line (generic line if none exist)."""
        personality_data = self._resolve_personality_data()
        return self._pick_line(
            personality_data.get("prewritten_praise", []),
            self._DEFAULT_PRAISE_LINE,
        )
|
|
|
|
# Orpheus model ids for Groq TTS (English, plus an Arabic variant).
GROQ_TTS_MODEL_EN = "canopylabs/orpheus-v1-english"
GROQ_TTS_MODEL_AR = "canopylabs/orpheus-arabic-saudi"

# Voice names accepted for the English model; anything else falls back to the default.
GROQ_TTS_VOICES_EN = {
    "austin",
    "autumn",
    "daniel",
    "diana",
    "hannah",
    "troy",
}
GROQ_TTS_DEFAULT_VOICE = "autumn"
|
|
|
|
class TextToSpeech:
    """Convert text to speech using Edge TTS (free), Groq Orpheus, or ElevenLabs.

    Provider priority is controlled by `tts_provider`:
      - "auto"       : ElevenLabs (client available and under the character
                       budget), else Groq (if groq_key), else Edge
      - "groq"       : Groq Orpheus only (falls back to Edge on failure)
      - "elevenlabs" : currently takes the same ElevenLabs -> Groq -> Edge
                       chain as "auto"
      - "edge"       : Edge TTS only

    Unrecognised provider values are treated as "auto". Bracketed Orpheus tags
    are stripped from the text for every provider except Groq.
    """

    def __init__(
        self,
        elevenlabs_key: str = "",
        voice: str = "",
        eleven_voice_id: str = "",
        personality: str = "mixtape",
        groq_key: str = "",
        groq_tts_voice: str = "",
        groq_tts_model: str = "",
        tts_provider: str = "auto",
    ):
        """Store provider settings and try to create an ElevenLabs client.

        Args:
            elevenlabs_key: ElevenLabs API key; when empty no client is created.
            voice: Edge TTS voice override (personality default when empty).
            eleven_voice_id: ElevenLabs voice override (personality defaults
                when empty).
            personality: Key into ``PERSONALITIES`` used to pick default voices.
            groq_key: Groq API key enabling the Orpheus provider.
            groq_tts_voice: Orpheus voice name; values outside
                ``GROQ_TTS_VOICES_EN`` fall back to ``GROQ_TTS_DEFAULT_VOICE``.
            groq_tts_model: Orpheus model id (``GROQ_TTS_MODEL_EN`` when blank).
            tts_provider: One of "auto", "groq", "elevenlabs", "edge".
        """
        self.elevenlabs_key = elevenlabs_key
        self.user_edge_voice = voice
        self.user_eleven_voice = eleven_voice_id
        self.personality = personality
        self.eleven_client = None
        # In-memory count of characters sent to ElevenLabs by this instance;
        # ElevenLabs is skipped once the budget below would be reached.
        self.chars_used = 0
        self.MONTHLY_LIMIT = 9000
        # personality -> ElevenLabs voice id that last synthesized successfully.
        self.working_voice_cache = {}

        self.groq_key = groq_key
        gv = (groq_tts_voice or "").strip().lower()
        self.groq_tts_voice = gv if gv in GROQ_TTS_VOICES_EN else GROQ_TTS_DEFAULT_VOICE
        self.groq_tts_model = (groq_tts_model or "").strip() or GROQ_TTS_MODEL_EN
        provider = (tts_provider or "auto").strip().lower()
        self.tts_provider = provider if provider in {"auto", "groq", "elevenlabs", "edge"} else "auto"

        if elevenlabs_key:
            try:
                from elevenlabs import ElevenLabs

                self.eleven_client = ElevenLabs(api_key=elevenlabs_key)
                logger.info("ElevenLabs TTS initialized (voices will be validated on first use)")
            except ImportError:
                logger.warning("elevenlabs package not installed, using Edge TTS")
            except Exception as e:
                logger.warning(f"ElevenLabs init failed: {e}, using Edge TTS")

        if self.groq_key:
            logger.info(
                f"Groq TTS available: model={self.groq_tts_model} voice={self.groq_tts_voice}"
            )
        logger.info(f"TTS provider selection: {self.tts_provider}")

    @staticmethod
    def _wav_path_for(output_path: str) -> str:
        """Return *output_path* with its file extension replaced by ``.wav``.

        Only the final path component's extension is touched, so a dot inside
        a directory name never truncates the path.
        """
        import os

        return os.path.splitext(output_path)[0] + ".wav"

    def _get_voice_for_personality(self):
        """Get the appropriate voice based on personality and user override.

        Returns:
            ``(edge_voice, eleven_voices)`` — an Edge voice name and a list of
            ElevenLabs voice ids to try in order.
        """
        personality_data = PERSONALITIES.get(self.personality, PERSONALITIES["mixtape"])

        edge_voice = self.user_edge_voice or personality_data.get("default_voice", "en-US-AnaNeural")

        if self.user_eleven_voice:
            return edge_voice, [self.user_eleven_voice]

        # Personalities may declare a list of candidates or a single voice id.
        eleven_voice_data = personality_data.get(
            "default_eleven_voices",
            personality_data.get("default_eleven_voice", "21m00Tcm4TlvDq8ikWAM"),
        )
        if isinstance(eleven_voice_data, list):
            eleven_voices = eleven_voice_data
        else:
            eleven_voices = [eleven_voice_data]
        return edge_voice, eleven_voices

    async def synthesize(self, text: str, output_path: str = "/tmp/judgy_reachy_tts.mp3") -> str:
        """Convert text to speech, return path to audio file.

        The returned path is *output_path* for Edge and ElevenLabs, and the
        same path with a ``.wav`` extension when Groq Orpheus produced the audio.
        """
        edge_voice, eleven_voices = self._get_voice_for_personality()
        provider = self.tts_provider

        if provider == "edge":
            spoken = strip_orpheus_tags(text)
            logger.info(f"TTS provider=edge: using Edge TTS voice={edge_voice}")
            return await self._synthesize_edge(spoken, output_path, edge_voice)

        if provider == "groq":
            if self.groq_key:
                try:
                    logger.info(f"TTS provider=groq: Orpheus voice={self.groq_tts_voice}")
                    # Tags are kept: Groq receives the text as generated.
                    return await self._synthesize_groq(text, self._wav_path_for(output_path))
                except Exception as e:
                    logger.warning(f"Groq TTS failed: {e}, falling back to Edge TTS")
            else:
                logger.warning("TTS provider=groq but no groq_key; falling back to Edge TTS")
            return await self._synthesize_edge(strip_orpheus_tags(text), output_path, edge_voice)

        # "auto" and "elevenlabs": ElevenLabs first, then Groq, then Edge.
        text_no_tags = strip_orpheus_tags(text)

        if self.eleven_client and (self.chars_used + len(text_no_tags)) < self.MONTHLY_LIMIT:
            if self.personality in self.working_voice_cache:
                try:
                    cached_voice = self.working_voice_cache[self.personality]
                    logger.info(f"Using cached ElevenLabs voice: {cached_voice}")
                    return await self._synthesize_elevenlabs(text_no_tags, output_path, cached_voice)
                except Exception as e:
                    logger.warning(f"Cached voice failed: {e}, trying other voices")
                    del self.working_voice_cache[self.personality]

            for voice_id in eleven_voices:
                try:
                    logger.info(f"Trying ElevenLabs voice: {voice_id}")
                    result = await self._synthesize_elevenlabs(text_no_tags, output_path, voice_id)
                except Exception as e:
                    logger.warning(f"Voice {voice_id} failed: {e}, trying next...")
                    continue
                # Remember the first voice that works for this personality.
                self.working_voice_cache[self.personality] = voice_id
                logger.info(f"✓ Voice {voice_id} works! Cached for {self.personality}")
                return result

            logger.warning(f"All ElevenLabs voices failed for {self.personality}, falling back to Groq/Edge")

        if self.groq_key:
            try:
                logger.info(f"Auto-path -> Groq Orpheus voice={self.groq_tts_voice}")
                return await self._synthesize_groq(text, self._wav_path_for(output_path))
            except Exception as e:
                logger.warning(f"Groq TTS failed in auto-path: {e}, falling back to Edge TTS")

        logger.info(f"Using Edge TTS with voice: {edge_voice}")
        return await self._synthesize_edge(text_no_tags, output_path, edge_voice)

    async def _synthesize_elevenlabs(self, text: str, output_path: str, voice_id: str) -> str:
        """Use ElevenLabs for high-quality voice.

        The blocking request and file write run in a worker thread so the
        asyncio loop is not stalled. ``chars_used`` is only increased after
        the audio was written successfully.
        """
        import asyncio

        def _call():
            audio = self.eleven_client.text_to_speech.convert(
                text=text,
                voice_id=voice_id,
                model_id="eleven_multilingual_v2",
            )
            with open(output_path, "wb") as f:
                for chunk in audio:
                    f.write(chunk)

        await asyncio.to_thread(_call)

        self.chars_used += len(text)
        logger.debug(f"ElevenLabs TTS: {len(text)} chars, total: {self.chars_used}")
        return output_path

    async def _synthesize_edge(self, text: str, output_path: str, voice: str) -> str:
        """Use Edge TTS (free, unlimited)."""
        import edge_tts

        communicate = edge_tts.Communicate(text, voice)
        await communicate.save(output_path)

        logger.debug(f"Edge TTS: {len(text)} chars with voice {voice}")
        return output_path

    async def _synthesize_groq(self, text: str, output_path: str) -> str:
        """Use Groq Orpheus TTS. Returns path to a WAV file.

        Runs the blocking HTTP call in a thread so we don't stall the asyncio loop.
        Raises whatever ``requests`` raises on timeout or a non-2xx response.
        """
        import asyncio
        import requests

        url = "https://api.groq.com/openai/v1/audio/speech"
        payload = {
            "model": self.groq_tts_model,
            "input": text,
            "voice": self.groq_tts_voice,
            "response_format": "wav",
        }
        headers = {
            "Authorization": f"Bearer {self.groq_key}",
            "Content-Type": "application/json",
        }

        def _call():
            r = requests.post(url, json=payload, headers=headers, timeout=20)
            r.raise_for_status()
            with open(output_path, "wb") as f:
                f.write(r.content)
            return len(r.content)

        n = await asyncio.to_thread(_call)
        logger.info(f"Groq Orpheus TTS: {len(text)} chars -> {n} bytes WAV ({self.groq_tts_voice})")
        return output_path
|
|