# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause
"""Transcript synchronization processor."""
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
InterimTranscriptionFrame,
StartInterruptionFrame,
TranscriptionFrame,
TTSStartedFrame,
TTSTextFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from nvidia_pipecat.frames.transcripts import (
BotUpdatedSpeakingTranscriptFrame,
UserStoppedSpeakingTranscriptFrame,
UserUpdatedSpeakingTranscriptFrame,
)


class UserTranscriptSynchronization(FrameProcessor):
    """Synchronizes user speech transcription events across the pipeline.

    This class synchronizes and exposes user speech transcripts by generating
    UserUpdatedSpeakingTranscriptFrame and UserStoppedSpeakingTranscriptFrame events.
    It filters high-frequency ASR frames to forward only transcripts that contain
    actual changes. The final transcript is sent with a UserStoppedSpeakingTranscriptFrame.

    Input Frames:
        InterimTranscriptionFrame: ASR partial transcript.
        TranscriptionFrame: ASR final transcript.
        UserStartedSpeakingFrame: User starts speaking.
        UserStoppedSpeakingFrame: User stops speaking.
        StartInterruptionFrame: Resets processor state.

    Attributes:
_partial_transcript (str | None): Current partial ASR transcript.
_final_transcript (str): Final ASR transcript.
_stopped_speaking (bool | None): Whether user has stopped speaking.
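
    Example:
        A minimal placement sketch; the surrounding pipeline elements (transport, stt)
        are illustrative assumptions and not part of this module:

            from pipecat.pipeline.pipeline import Pipeline

            pipeline = Pipeline(
                [
                    transport.input(),  # assumed transport supplying user audio
                    stt,  # assumed ASR service emitting (Interim)TranscriptionFrames
                    UserTranscriptSynchronization(
                        user_started_speaking_message="Listening..."
                    ),
                    # ... downstream consumers of the transcript frames
                ]
            )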
"""
def __init__(self, user_started_speaking_message: str | None = None):
"""Initializes the processor with default state values.
Args:
user_started_speaking_message (str | None): Optional message to send when user starts speaking.
"""
super().__init__()
self.user_started_speaking_message: str | None = user_started_speaking_message
self._partial_transcript: str | None = None
self._final_transcript: str = ""
self._stopped_speaking: bool | None = None

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and updates transcript state.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
await super().process_frame(frame, direction)
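        # Forward every incoming frame unmodified; transcript frames are pushed in addition.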
await self.push_frame(frame, direction)
if isinstance(frame, InterimTranscriptionFrame):
            # Check if the partial transcript changed.
            # TODO: We need to filter out more of the duplicated or very similar transcripts with Riva ASR.
            if not self._partial_transcript or self._partial_transcript != frame.text:
self._partial_transcript = frame.text
updated_transcript = (
(self._final_transcript.rstrip() + " " + frame.text) if self._final_transcript else frame.text
)
await self.push_frame(UserUpdatedSpeakingTranscriptFrame(transcript=updated_transcript), direction)
elif isinstance(frame, TranscriptionFrame):
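            # Accumulate final transcripts; one utterance may produce several TranscriptionFrames.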
self._final_transcript += f"{frame.text} "
elif isinstance(frame, UserStartedSpeakingFrame):
self._stopped_speaking = False
if self.user_started_speaking_message:
await self.push_frame(
UserUpdatedSpeakingTranscriptFrame(transcript=self.user_started_speaking_message), direction
)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._stopped_speaking = True
elif isinstance(frame, StartInterruptionFrame):
self._partial_transcript = None
self._final_transcript = ""
self._stopped_speaking = None

        # Wait until both a TranscriptionFrame and a UserStoppedSpeakingFrame have been received.
if self._final_transcript and self._stopped_speaking:
await self.push_frame(UserStoppedSpeakingTranscriptFrame(self._final_transcript.strip()), direction)
self._partial_transcript = None
self._final_transcript = ""
self._stopped_speaking = None


class BotTranscriptSynchronization(FrameProcessor):
    """Synchronizes bot speech transcripts with audio playback.

    Synchronizes TTSTextFrames with the BotStartedSpeakingFrame events that indicate
    the start of speech audio playback. Creates BotUpdatedSpeakingTranscriptFrame frames
    containing partial and final transcripts.

    The bot transcript synchronization assumes that there is a pair of BotStartedSpeakingFrame
    and BotStoppedSpeakingFrame for each TTSStartedFrame; this processor will not work
    correctly if that assumption is not met.

    Each BotStartedSpeakingFrame triggers a BotUpdatedSpeakingTranscriptFrame if a
    transcript is available. It is acceptable for multiple [TTSStartedFrame, TTSTextFrame, ...]
    sequences to arrive before the corresponding BotStoppedSpeakingFrame.

    Input Frames:
        TTSStartedFrame: Speech audio playback starts.
        TTSTextFrame: Text to be spoken.
        BotStartedSpeakingFrame: Bot starts speaking.
        BotStoppedSpeakingFrame: Bot stops speaking.
        StartInterruptionFrame: Resets processor state.

    Attributes:
_bot_started_speaking (bool): Indicates if the bot has started speaking.
_bot_transcripts_buffer (list[str]): Buffer for bot transcripts to synchronize with audio playback.
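
    Example:
        A minimal placement sketch; the neighboring processors (tts, transport) are
        illustrative assumptions and not part of this module:

            from pipecat.pipeline.pipeline import Pipeline

            pipeline = Pipeline(
                [
                    # ... upstream processors producing text for speech synthesis
                    tts,  # assumed TTS service emitting TTSStartedFrame / TTSTextFrames
                    transport.output(),  # assumed transport emitting Bot*SpeakingFrames
                    BotTranscriptSynchronization(),
                ]
            )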
"""

    def __init__(self):
"""Initialize the BotTranscriptSynchronization processor."""
super().__init__()
self._bot_started_speaking = False
self._bot_transcripts_buffer: list[str] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and manages transcript synchronization.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
await super().process_frame(frame, direction)
if isinstance(frame, StartInterruptionFrame):
            # Reset the transcript buffer and speaking state.
self._bot_transcripts_buffer = []
self._bot_started_speaking = False
await self.push_frame(frame, direction)
elif isinstance(frame, TTSStartedFrame):
            # TODO: Hacky solution; a proper fix is needed.
if self._bot_transcripts_buffer:
self._bot_transcripts_buffer.pop(0)
# Start buffering the next transcript
self._bot_transcripts_buffer.append("")
await self.push_frame(frame, direction)
elif isinstance(frame, TTSTextFrame):
# Aggregate partial transcripts
            if not self._bot_transcripts_buffer:
                # It looks like some TTS processors keep sending TTSTextFrames even after
                # a StartInterruptionFrame, so the buffer can legitimately be empty here.
                logger.warning("TTSTextFrame received before TTSStartedFrame!")
else:
if frame.text:
if self._bot_transcripts_buffer[-1]:
self._bot_transcripts_buffer[-1] += f" {frame.text}"
else:
self._bot_transcripts_buffer[-1] = frame.text
                # TODO: We need to figure out how to align the partial transcripts
                # with the audio; currently they are shown as soon as they come in.
                # Pushing them immediately is needed to get the full transcript from
                # ElevenLabs TTS, since it creates the TTSStartedFrame as soon as the
                # first word arrives and does not stop until all the text is generated.
if self._bot_started_speaking and len(self._bot_transcripts_buffer) == 1:
# Only create BotUpdatedSpeakingTranscriptFrame if
# BotStartedSpeakingFrame is related to the first active transcript
await self.push_frame(
BotUpdatedSpeakingTranscriptFrame(transcript=self._bot_transcripts_buffer[-1].strip()),
direction,
)
await self.push_frame(frame, direction)
elif isinstance(frame, BotStartedSpeakingFrame):
# Start synchronizing transcript from buffer with BotStartedSpeakingFrame
if self._bot_started_speaking:
logger.warning("BotStartedSpeakingFrame received before BotStoppedSpeakingFrame!")
self._bot_started_speaking = True
await self.push_frame(frame, direction)
if self._bot_transcripts_buffer and self._bot_transcripts_buffer[0]:
# If already available push the transcript
await self.push_frame(
BotUpdatedSpeakingTranscriptFrame(self._bot_transcripts_buffer[0]), FrameDirection.DOWNSTREAM
)
elif isinstance(frame, BotStoppedSpeakingFrame):
# Remove the shown transcript from the buffer
if not self._bot_started_speaking:
logger.warning("BotStoppedSpeakingFrame received before BotStartedSpeakingFrame!")
self._bot_started_speaking = False
if self._bot_transcripts_buffer:
self._bot_transcripts_buffer.pop(0)
await self.push_frame(frame, direction)
else:
await self.push_frame(frame, direction)