fciannella's picture
Working with service run on 7860
53ea588
raw
history blame
2.77 kB
"""Utility processors for pipecat."""
from loguru import logger
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameBlockingProcessor(FrameProcessor):
"""A frame processor that blocks forwarding frames after a specified frame number of a specific type.
This processor counts frames of a specific type and stops forwarding them after reaching a specified threshold.
It can be used to limit the number of specific frame types processed in a pipeline.
Args:
block_after_frame (int): The frame number after which to block forwarding frames.
frame_type (Type[Frame]): The type of frame to count and block.
reset_frame_type (Optional[Type[Frame]]): If provided, frames of this type will reset the counter.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
def __init__(
self, block_after_frame: int, frame_type: type[Frame], reset_frame_type: type[Frame] | None = None, **kwargs
):
"""Initialize the frame blocking processor.
Args:
block_after_frame: The frame number after which to block forwarding frames.
frame_type: The type of frame to count and block.
reset_frame_type: If provided, frames of this type will reset the counter.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
super().__init__(**kwargs)
self.block_after_frame = block_after_frame
self.frame_type = frame_type
self.reset_frame_type = reset_frame_type
self.frame_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process a frame.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
# Check for reset frame type first
if self.reset_frame_type and isinstance(frame, self.reset_frame_type):
logger.debug(f"Resetting counter on {self.reset_frame_type.__name__} frame")
self.frame_count = 0
await self.push_frame(frame, direction)
elif isinstance(frame, self.frame_type):
self.frame_count += 1
if self.frame_count <= self.block_after_frame:
await self.push_frame(frame, direction)
else:
logger.debug(
f"Blocking {self.frame_type.__name__} frame {self.frame_count}"
f" (threshold: {self.block_after_frame})"
)
else:
# Forward non-matching frame types without counting
await self.push_frame(frame, direction)