Spaces:
Running
Running
| # SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: BSD 2-Clause License | |
| """Proactivity processor that manages automated bot responses during conversation lulls. | |
| Monitors conversation activity and triggers automated responses after periods of silence | |
| to maintain user engagement. | |
| """ | |
| import asyncio | |
| from loguru import logger | |
| from pipecat.frames.frames import ( | |
| BotStartedSpeakingFrame, | |
| BotStoppedSpeakingFrame, | |
| EndFrame, | |
| Frame, | |
| TTSSpeakFrame, | |
| UserStartedSpeakingFrame, | |
| UserStoppedSpeakingFrame, | |
| ) | |
| from pipecat.processors.frame_processor import FrameDirection, FrameProcessor | |
| from nvidia_pipecat.frames.action import FinishedPresenceUserActionFrame, StartedPresenceUserActionFrame | |
| class ProactivityProcessor(FrameProcessor): | |
| """Manages automated bot responses during conversation pauses. | |
| Monitors user presence and conversation activity, automatically generating | |
| proactive messages during extended periods of silence. | |
| Attributes: | |
| default_message (str): Message sent when the inactivity timer expires. | |
| timer_duration (float): Seconds to wait before triggering proactive message. | |
| Input Frames: | |
| StartedPresenceUserActionFrame: User becomes present | |
| FinishedPresenceUserActionFrame: User leaves | |
| BotStartedSpeakingFrame: Bot starts speaking | |
| BotStoppedSpeakingFrame: Bot stops speaking | |
| UserStartedSpeakingFrame: User starts speaking | |
| UserStoppedSpeakingFrame: User stops speaking | |
| EndFrame: Pipeline ends | |
| """ | |
| def __init__(self, default_message: str = "I'm here if you need me!", timer_duration: float = 100, **kwargs): | |
| """Initializes the processor with specified message and timer settings. | |
| Args: | |
| default_message (str): Message sent when inactivity timer expires. | |
| timer_duration (float): Seconds to wait before sending message. | |
| **kwargs: Additional arguments passed to parent FrameProcessor. | |
| """ | |
| super().__init__(**kwargs) | |
| self.default_message = default_message | |
| self.timer_duration = timer_duration | |
| self._timer_task = None | |
| self._user_present = False | |
| async def _start_timer(self): | |
| """Start the proactivity timer. | |
| Internal method that manages the timer countdown and sends the default message | |
| when the timer expires. | |
| """ | |
| try: | |
| logger.debug("Timer started") | |
| await asyncio.sleep(self.timer_duration) | |
| logger.info(f"Timer expired, sending default message: {self.default_message}") | |
| await self.push_frame(TTSSpeakFrame(self.default_message), FrameDirection.DOWNSTREAM) | |
| except asyncio.CancelledError: | |
| # Timer cancelled | |
| logger.debug("Timer cancelled") | |
| raise | |
| async def _reset_timer(self): | |
| """Reset the proactivity timer. | |
| Internal method that cancels any existing timer and starts a new countdown. | |
| """ | |
| if self._timer_task: | |
| await self.cancel_task(self._timer_task) | |
| logger.debug("Resetting Timer") | |
| self._timer_task = self.create_task(self._start_timer()) | |
| async def _stop_timer(self): | |
| """Stop the proactivity timer. | |
| Internal method that cancels the current timer without starting a new one. | |
| """ | |
| logger.debug("Stopping timer") | |
| if self._timer_task: | |
| await self.cancel_task(self._timer_task) | |
| self._timer_task = None | |
| async def cleanup(self): | |
| """Clean up processor resources. | |
| Ensures the proactivity timer is properly stopped when the processor shuts down. | |
| """ | |
| await super().cleanup() | |
| await self._stop_timer() | |
| async def process_frame(self, frame: Frame, direction: FrameDirection): | |
| """Process incoming frames and manage proactivity timer state. | |
| Handles various conversation events to start, stop, or reset the proactivity timer | |
| based on user presence and speaking states. | |
| Args: | |
| frame (Frame): Incoming frame to process. | |
| direction (FrameDirection): Frame flow direction. | |
| """ | |
| await super().process_frame(frame, direction) | |
| await super().push_frame(frame, direction) | |
| if isinstance(frame, StartedPresenceUserActionFrame): | |
| self._user_present = True | |
| await self._reset_timer() | |
| elif isinstance(frame, BotStoppedSpeakingFrame | UserStoppedSpeakingFrame): | |
| # Whenever the user or bot is done talking we want to reset the timer | |
| if self._user_present: | |
| await self._reset_timer() | |
| elif isinstance(frame, BotStartedSpeakingFrame | UserStartedSpeakingFrame): | |
| # When either the user or the bot starts speaking we don't want to interrupt | |
| if self._user_present: | |
| await self._stop_timer() | |
| elif isinstance(frame, EndFrame): | |
| # Stop the timer when the pipeline ends | |
| await self._stop_timer() | |
| elif isinstance(frame, FinishedPresenceUserActionFrame): | |
| self._user_present = False | |
| await self._stop_timer() | |