fciannella's picture
Working with service run on 7860
53ea588
raw
history blame
5.26 kB
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License
"""Proactivity processor that manages automated bot responses during conversation lulls.
Monitors conversation activity and triggers automated responses after periods of silence
to maintain user engagement.
"""
import asyncio
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
Frame,
TTSSpeakFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from nvidia_pipecat.frames.action import FinishedPresenceUserActionFrame, StartedPresenceUserActionFrame
class ProactivityProcessor(FrameProcessor):
    """Frame processor that speaks a fallback line after a stretch of silence.

    A countdown task is kept running while the user is marked as present and
    nobody is talking. If the countdown finishes without being cancelled, a
    ``TTSSpeakFrame`` carrying ``default_message`` is pushed downstream.

    Attributes:
        default_message (str): Text placed in the ``TTSSpeakFrame`` when the
            countdown runs out.
        timer_duration (float): Length of the countdown, in seconds.

    Input Frames:
        StartedPresenceUserActionFrame: Marks the user present, restarts the countdown.
        FinishedPresenceUserActionFrame: Marks the user absent, stops the countdown.
        BotStartedSpeakingFrame: Stops the countdown (only while the user is present).
        BotStoppedSpeakingFrame: Restarts the countdown (only while the user is present).
        UserStartedSpeakingFrame: Stops the countdown (only while the user is present).
        UserStoppedSpeakingFrame: Restarts the countdown (only while the user is present).
        EndFrame: Stops the countdown.
    """

    def __init__(self, default_message: str = "I'm here if you need me!", timer_duration: float = 100, **kwargs):
        """Set up the processor with its message and countdown length.

        Args:
            default_message (str): Text spoken once the countdown runs out.
            timer_duration (float): Countdown length in seconds.
            **kwargs: Forwarded unchanged to ``FrameProcessor.__init__``.
        """
        super().__init__(**kwargs)
        # No countdown is running and no user is considered present initially.
        self._user_present = False
        self._timer_task = None
        self.timer_duration = timer_duration
        self.default_message = default_message

    async def _start_timer(self):
        """Run one countdown and emit the default message when it completes.

        Sleeps for ``timer_duration`` seconds, then pushes a ``TTSSpeakFrame``
        with ``default_message`` downstream. A cancellation is logged and then
        re-raised so the surrounding task still ends up cancelled.
        """
        try:
            logger.debug("Timer started")
            await asyncio.sleep(self.timer_duration)
            logger.info(f"Timer expired, sending default message: {self.default_message}")
            proactive_frame = TTSSpeakFrame(self.default_message)
            await self.push_frame(proactive_frame, FrameDirection.DOWNSTREAM)
        except asyncio.CancelledError:
            logger.debug("Timer cancelled")
            # Propagate the cancellation instead of swallowing it.
            raise

    async def _reset_timer(self):
        """Restart the countdown from zero.

        Cancels the countdown task currently stored (if any) and schedules a
        fresh one through ``create_task``.
        """
        previous = self._timer_task
        if previous:
            await self.cancel_task(previous)

        logger.debug("Resetting Timer")
        self._timer_task = self.create_task(self._start_timer())

    async def _stop_timer(self):
        """Cancel the countdown without scheduling a replacement."""
        logger.debug("Stopping timer")
        running = self._timer_task
        if not running:
            # Nothing is scheduled, so there is nothing to cancel.
            return

        await self.cancel_task(running)
        self._timer_task = None

    async def cleanup(self):
        """Release processor resources.

        Runs the parent cleanup first and then cancels any countdown that is
        still stored on this processor.
        """
        await super().cleanup()
        await self._stop_timer()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward the frame and update the countdown according to its type.

        The frame is handed to the parent ``process_frame`` and pushed on in
        its original direction before any timer bookkeeping happens. Presence
        frames toggle the user-present flag; speaking frames only affect the
        countdown while the user is present.

        Args:
            frame (Frame): Frame received by this processor.
            direction (FrameDirection): Direction in which the frame travels.
        """
        await super().process_frame(frame, direction)
        await super().push_frame(frame, direction)

        # User arrived: start counting silence from now.
        if isinstance(frame, StartedPresenceUserActionFrame):
            self._user_present = True
            await self._reset_timer()
            return

        # Someone finished talking: silence begins, so restart the countdown.
        if isinstance(frame, (BotStoppedSpeakingFrame, UserStoppedSpeakingFrame)):
            if self._user_present:
                await self._reset_timer()
            return

        # Someone started talking: pause proactivity so we do not interrupt.
        if isinstance(frame, (BotStartedSpeakingFrame, UserStartedSpeakingFrame)):
            if self._user_present:
                await self._stop_timer()
            return

        # Pipeline is ending: no further proactive messages.
        if isinstance(frame, EndFrame):
            await self._stop_timer()
            return

        # User left: clear the presence flag and halt the countdown.
        if isinstance(frame, FinishedPresenceUserActionFrame):
            self._user_present = False
            await self._stop_timer()