import asyncio
import logging
import os
import sys
import time

from elevenlabs import ElevenLabs
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    StartFrame,
    TextFrame,
    TTSStartedFrame,
    TTSStoppedFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

from azure_openai import AzureOpenAILLMService
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Process-wide logging setup, applied when this module is imported.
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Seconds without conversation activity before the bot checks in with the caller
# (override via the SILENCE_TIMEOUT_SECONDS environment variable).
SILENCE_TIMEOUT_SECONDS = float(os.environ.get("SILENCE_TIMEOUT_SECONDS", "10"))

# Number of consecutive unanswered check-ins after which the call is ended
# (override via the MAX_SILENCE_PROMPTS environment variable).
MAX_SILENCE_PROMPTS = int(os.environ.get("MAX_SILENCE_PROMPTS", "3"))

# What the bot says when checking in, and when giving up on the call.
SILENCE_PROMPT_TEXT = "Are you still there?"
GOODBYE_PROMPT_TEXT = "It seems you're no longer there. I'm hanging up now. Goodbye."
class SilenceAndCallLogicProcessor(FrameProcessor):
    """Pass frames through unchanged while watching for caller silence.

    A watchdog task, started when the processor sees a ``StartFrame``, checks once
    per second how long it has been since the last conversation activity. After
    ``SILENCE_TIMEOUT_SECONDS`` of silence it pushes ``SILENCE_PROMPT_TEXT``
    downstream as a ``TextFrame``. Once ``MAX_SILENCE_PROMPTS`` consecutive prompts
    have gone unanswered, it pushes ``GOODBYE_PROMPT_TEXT``, waits two seconds and
    calls ``pipeline.stop_when_done()``.

    Because prompts are emitted as ``TextFrame``s, this processor has to be placed
    before the TTS service for them to be synthesized.

    Args:
        tts_service: The pipeline's TTS service. It is stored on the instance but
            not driven by this class.
        pipeline: Object whose ``stop_when_done()`` coroutine ends the call. It may
            be ``None`` at construction and assigned later.
        app: Owner exposing a mutable ``current_call_stats`` dict. This processor
            updates its ``"silence_events"`` and ``"ended_by_silence"`` entries.
    """

    def __init__(self, tts_service, pipeline, app):
        super().__init__()
        self.tts_service = tts_service
        self.pipeline = pipeline
        self.app = app
        # time.monotonic() reading of the most recent activity. A monotonic clock
        # keeps the elapsed-time check immune to wall-clock adjustments.
        self.last_activity_ts = time.monotonic()
        # Prompts sent since the last activity that reset the counter.
        self.silence_prompts_count = 0
        self._bot_is_speaking = False
        self._user_is_speaking = False
        self._silence_check_task = None
        self.app.current_call_stats["silence_events"] = 0

    async def start(self):
        """(Re)start the silence watchdog with fresh state.

        Any running watchdog is cancelled and awaited first, so two loops never
        run concurrently.
        """
        await self._cancel_silence_task()
        self.last_activity_ts = time.monotonic()
        self.silence_prompts_count = 0
        self._bot_is_speaking = False
        self._user_is_speaking = False
        self._silence_check_task = asyncio.create_task(self._check_silence_loop())

    async def stop(self):
        """Stop the silence watchdog and wait for its task to exit.

        The TTS service is intentionally not stopped here. The pipeline owns its
        lifecycle, and pipecat's service ``stop()`` expects the ``EndFrame`` that
        ended the pipeline (NOTE(review): confirm for the installed version).
        """
        await self._cancel_silence_task()

    async def _cancel_silence_task(self):
        """Cancel the watchdog task, if any, and wait for it to finish."""
        task, self._silence_check_task = self._silence_check_task, None
        if task is None:
            return
        task.cancel()
        if task is asyncio.current_task():
            # A task cannot await itself; the cancellation lands at its next await.
            return
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            # Best-effort shutdown: report a failed watchdog instead of breaking
            # the frame flow or the caller's cleanup.
            logger.exception("Silence watchdog task failed")

    def _reset_activity_timer(self):
        """Record activity now and clear the unanswered-prompt counter."""
        self.last_activity_ts = time.monotonic()
        self.silence_prompts_count = 0

    async def process_frame(self, frame, direction):
        """Update silence-tracking state from ``frame``, then forward it unchanged."""
        # Pipecat requires subclasses to let the base class see every frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, StartFrame):
            await self.start()
        elif isinstance(frame, (EndFrame, CancelFrame)):
            await self._cancel_silence_task()
        elif isinstance(frame, UserStartedSpeakingFrame):
            # User-speech frames flow downstream from the input transport, so they
            # are matched regardless of direction.
            self._user_is_speaking = True
            # With interruptions enabled, bot speech may be cut off without a
            # TTSStoppedFrame reaching us. Don't let that wedge the flag.
            self._bot_is_speaking = False
            self._reset_activity_timer()
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._user_is_speaking = False
            self._reset_activity_timer()
        elif isinstance(frame, TextFrame):
            # Any text (e.g. transcriptions or LLM output) counts as activity.
            self._reset_activity_timer()
        elif isinstance(frame, TTSStartedFrame) and direction == FrameDirection.DOWNSTREAM:
            # Only observed when a TTS service sits upstream of this processor.
            self._bot_is_speaking = True
        elif isinstance(frame, TTSStoppedFrame) and direction == FrameDirection.DOWNSTREAM:
            self._bot_is_speaking = False
            self.last_activity_ts = time.monotonic()

        await self.push_frame(frame, direction)

    async def _check_silence_loop(self):
        """Prompt the caller after prolonged silence; end the call after too many prompts."""
        while True:
            await asyncio.sleep(1)
            # Never interrupt an utterance, whether it comes from the bot or the user.
            if self._bot_is_speaking or self._user_is_speaking:
                continue
            if time.monotonic() - self.last_activity_ts <= SILENCE_TIMEOUT_SECONDS:
                continue

            self.app.current_call_stats["silence_events"] += 1
            self.silence_prompts_count += 1

            if self.silence_prompts_count >= MAX_SILENCE_PROMPTS:
                logger.info(
                    "No response after %d silence prompts; ending call",
                    self.silence_prompts_count,
                )
                self.app.current_call_stats["ended_by_silence"] = True
                await self.push_frame(TextFrame(GOODBYE_PROMPT_TEXT), FrameDirection.DOWNSTREAM)
                # Short fixed pause before ending the call. A long goodbye may
                # still be cut off.
                await asyncio.sleep(2)
                if self.pipeline is None:
                    logger.warning("No pipeline attached; cannot end the call")
                else:
                    await self.pipeline.stop_when_done()
                return

            await self.push_frame(TextFrame(SILENCE_PROMPT_TEXT), FrameDirection.DOWNSTREAM)
            # Restart the countdown so the next prompt waits a full timeout.
            self.last_activity_ts = time.monotonic()
class PhoneChatbotApp:
    """Phone chatbot wiring Deepgram STT, an Azure OpenAI LLM and ElevenLabs TTS over Daily.

    Tracks simple per-call statistics (duration, silence events, whether the call
    was ended for silence) and logs them when ``run()`` finishes.
    """

    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        self.daily_transport = None
        self.pipeline = None
        self.stt_service = None
        self.tts_service = None
        self.llm_service = None
        self.silence_processor = None
        # time.time() at the start of the current call; None before any call starts.
        self.call_start_time = None
        self.current_call_stats = self._new_call_stats(start_time=None)

    @staticmethod
    def _new_call_stats(start_time):
        """Return a fresh statistics dict for one call.

        Args:
            start_time: Formatted start timestamp, or None if no call has started.
        """
        return {
            "duration_seconds": 0,
            "silence_events": 0,
            "start_time": start_time,
            "end_time": None,
            "ended_by_silence": False,
        }

    @classmethod
    def _format_timestamp(cls, ts):
        """Format a ``time.time()`` value as a local ``YYYY-MM-DD HH:MM:SS`` string."""
        return time.strftime(cls._TIMESTAMP_FORMAT, time.localtime(ts))

    def _reset_call_stats(self):
        """Mark the start of a new call and clear its statistics."""
        self.call_start_time = time.time()
        self.current_call_stats = self._new_call_stats(
            start_time=self._format_timestamp(self.call_start_time)
        )

    async def _log_call_summary(self):
        """Fill in the end time and duration, then log the current call's statistics."""
        if self.call_start_time:
            call_end_time = time.time()
            self.current_call_stats["duration_seconds"] = round(call_end_time - self.call_start_time, 2)
            self.current_call_stats["end_time"] = self._format_timestamp(call_end_time)
        if self.silence_processor and self.silence_processor.silence_prompts_count >= MAX_SILENCE_PROMPTS:
            self.current_call_stats["ended_by_silence"] = True
        logger.info("--- Post-Call Summary ---")
        for key, value in self.current_call_stats.items():
            logger.info(" %s: %s", key.replace("_", " ").title(), value)
        logger.info("-------------------------")

    def setup_pipeline_hook(self, pipeline_params: PipelineParams, room_url: str, token: str):
        """Build the per-call pipeline and store it on ``pipeline_params``.

        Resets the call statistics, creates the silence processor and builds
        STT -> LLM -> silence processor -> TTS.

        Returns:
            The same ``pipeline_params`` object, with ``pipeline`` and ``params`` set.
        """
        self._reset_call_stats()
        self.silence_processor = SilenceAndCallLogicProcessor(
            tts_service=self.tts_service,
            pipeline=None,  # assigned below, once the pipeline exists
            app=self,
        )
        # Processors must be handed to the Pipeline constructor, which links them.
        # Appending to the pipeline afterwards leaves the processor disconnected.
        # The silence processor sits before TTS so its TextFrame prompts are spoken.
        self.pipeline = Pipeline([
            self.stt_service,
            self.llm_service,
            self.silence_processor,
            self.tts_service,
        ])
        # NOTE(review): in pipecat, stop_when_done() is provided by PipelineTask
        # rather than Pipeline. Confirm what the silence processor should receive.
        self.silence_processor.pipeline = self.pipeline
        pipeline_params.pipeline = self.pipeline
        pipeline_params.params = PipelineParams(allow_interruptions=True, enable_metrics=True)
        return pipeline_params

    def validate_voice_id(self, voice_id: str) -> bool:
        """Return True if ElevenLabs accepts ``voice_id``; log and return False otherwise.

        This makes a synchronous ElevenLabs API call using the ``elevenlabs``
        environment variable as the API key.
        """
        try:
            client = ElevenLabs(api_key=os.environ.get("elevenlabs"))
            client.voices.get(voice_id=voice_id)
            return True
        except Exception as e:
            logger.error(f"Failed to validate ElevenLabs voice ID {voice_id}: {e}")
            return False

    @staticmethod
    def _require_env_keys(keys):
        """Exit with status 1 if any of ``keys`` is unset or empty in the environment."""
        missing_keys = [key for key in keys if not os.environ.get(key)]
        if missing_keys:
            logger.error("Missing environment variables: %s", ", ".join(missing_keys))
            sys.exit(1)

    def _create_services(self, voice_id):
        """Instantiate the STT, TTS and LLM services from environment configuration."""
        self.stt_service = DeepgramSTTService(
            api_key=os.environ.get("deepgram"),
            input_audio_format="linear16",
        )
        self.tts_service = ElevenLabsTTSService(
            api_key=os.environ.get("elevenlabs"),
            voice_id=voice_id,
        )
        self.llm_service = AzureOpenAILLMService(
            preprompt="You are a friendly and helpful phone assistant."
        )

    def _create_transport(self):
        """Create the Daily transport and register ``setup_pipeline_hook`` on it."""
        # NOTE(review): these arguments do not obviously match pipecat's
        # DailyTransport(room_url, token, bot_name, params=DailyParams(...)) signature
        # (the VAD analyzer is normally a DailyParams field). Confirm against the
        # installed pipecat version.
        self.daily_transport = DailyTransport(
            os.environ.get("DAILY_DOMAIN", "your-username.daily.co"),
            os.environ.get("dailyco"),
            None,
            None,
            "Pipecat Phone Demo",
            vad_analyzer=SileroVADAnalyzer(),
            daily_params=DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                transcription_enabled=False,
            ),
        )
        # NOTE(review): confirm the transport actually invokes this hook. It is not
        # a documented pipecat DailyTransport attribute.
        self.daily_transport.pipeline_params_hook = self.setup_pipeline_hook

    async def run(self):
        """Validate configuration, build services and transport, and run until the call ends.

        Exits the process with status 1 if required environment variables are
        missing or the ElevenLabs voice ID fails validation. On the way out, silence
        monitoring is stopped first and the call summary is always logged.
        """
        self._require_env_keys(["deepgram", "elevenlabs", "dailyco", "azure_openai"])
        voice_id = os.environ.get("ELEVENLABS_VOICE_ID", "cgSgspJ2msm6clMCkdW9")
        if not self.validate_voice_id(voice_id):
            logger.error("Invalid ElevenLabs voice ID: %s", voice_id)
            sys.exit(1)
        self._create_services(voice_id)
        self._create_transport()

        runner = PipelineRunner()
        try:
            # NOTE(review): pipecat's PipelineRunner.run() normally takes a
            # PipelineTask. Confirm that passing the transport is intended.
            await runner.run(self.daily_transport)
        except KeyboardInterrupt:
            logger.info("Ctrl+C pressed, shutting down")
        except Exception as e:
            logger.exception("An error occurred: %s", e)
        finally:
            # The runner has returned, so the pipeline is no longer running. Stop
            # the silence watchdog before summarizing so no prompt or stats update
            # races with the summary, and log the summary even if stopping fails.
            try:
                if self.silence_processor:
                    await self.silence_processor.stop()
            finally:
                await self._log_call_summary()