Spaces:
Running
Running
File size: 2,768 Bytes
53ea588 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
"""Utility processors for pipecat."""
from loguru import logger
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameBlockingProcessor(FrameProcessor):
"""A frame processor that blocks forwarding frames after a specified frame number of a specific type.
This processor counts frames of a specific type and stops forwarding them after reaching a specified threshold.
It can be used to limit the number of specific frame types processed in a pipeline.
Args:
block_after_frame (int): The frame number after which to block forwarding frames.
frame_type (Type[Frame]): The type of frame to count and block.
reset_frame_type (Optional[Type[Frame]]): If provided, frames of this type will reset the counter.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
def __init__(
self, block_after_frame: int, frame_type: type[Frame], reset_frame_type: type[Frame] | None = None, **kwargs
):
"""Initialize the frame blocking processor.
Args:
block_after_frame: The frame number after which to block forwarding frames.
frame_type: The type of frame to count and block.
reset_frame_type: If provided, frames of this type will reset the counter.
**kwargs: Additional arguments passed to parent FrameProcessor.
"""
super().__init__(**kwargs)
self.block_after_frame = block_after_frame
self.frame_type = frame_type
self.reset_frame_type = reset_frame_type
self.frame_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
"""Process a frame.
Args:
frame: The frame to process.
direction: The direction of frame processing.
"""
await super().process_frame(frame, direction)
# Check for reset frame type first
if self.reset_frame_type and isinstance(frame, self.reset_frame_type):
logger.debug(f"Resetting counter on {self.reset_frame_type.__name__} frame")
self.frame_count = 0
await self.push_frame(frame, direction)
elif isinstance(frame, self.frame_type):
self.frame_count += 1
if self.frame_count <= self.block_after_frame:
await self.push_frame(frame, direction)
else:
logger.debug(
f"Blocking {self.frame_type.__name__} frame {self.frame_count}"
f" (threshold: {self.block_after_frame})"
)
else:
# Forward non-matching frame types without counting
await self.push_frame(frame, direction)
|