# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

"""Transcript synchronization processor."""

from loguru import logger
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    InterimTranscriptionFrame,
    StartInterruptionFrame,
    TranscriptionFrame,
    TTSStartedFrame,
    TTSTextFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from nvidia_pipecat.frames.transcripts import (
    BotUpdatedSpeakingTranscriptFrame,
    UserStoppedSpeakingTranscriptFrame,
    UserUpdatedSpeakingTranscriptFrame,
)


class UserTranscriptSynchronization(FrameProcessor):
    """Synchronizes user speech transcription events across the pipeline.

    This class synchronizes and exposes user speech transcripts by generating
    UserUpdatedSpeakingTranscriptFrame and UserStoppedSpeakingTranscriptFrame events. It filters
    high-frequency ASR frames to only forward transcripts that contain actual changes. The final
    transcript is sent with a UserStoppedSpeakingTranscriptFrame.

    Input Frames:
        InterimTranscriptionFrame: ASR partial transcript
        TranscriptionFrame: ASR final transcript
        UserStartedSpeakingFrame: User starts speaking
        UserStoppedSpeakingFrame: User stops speaking
        StartInterruptionFrame: Resets processor state

    Attributes:
        _partial_transcript (str | None): Current partial ASR transcript.
        _final_transcript (str): Accumulated final ASR transcript.
        _stopped_speaking (bool | None): Whether the user has stopped speaking.
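
    Example:
        A minimal usage sketch. Pipeline, transport, and stt are illustrative
        placeholders for a Pipecat pipeline with transport and ASR processors;
        they are not part of this module::

            pipeline = Pipeline(
                [
                    transport.input(),  # emits UserStarted/StoppedSpeakingFrame
                    stt,  # emits InterimTranscriptionFrame and TranscriptionFrame
                    UserTranscriptSynchronization(user_started_speaking_message="Listening..."),
                    # ... downstream processors consume the synchronized transcript frames
                ]
            )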
    """

    def __init__(self, user_started_speaking_message: str | None = None):
        """Initializes the processor with default state values.

        Args:
            user_started_speaking_message (str | None): Optional message to send when the user starts speaking.
        """
        super().__init__()
        self.user_started_speaking_message: str | None = user_started_speaking_message
        self._partial_transcript: str | None = None
        self._final_transcript: str = ""
        self._stopped_speaking: bool | None = None

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and updates transcript state.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
        await super().process_frame(frame, direction)
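        # Forward every incoming frame unchanged; the transcript frames below are emitted in addition.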
        await self.push_frame(frame, direction)

        if isinstance(frame, InterimTranscriptionFrame):
            # Check if the partial transcript changed
            # TODO: We need to filter out more of the duplicated or very similar transcripts from Riva ASR
            if not self._partial_transcript or self._partial_transcript != frame.text:
                self._partial_transcript = frame.text
                updated_transcript = (
                    (self._final_transcript.rstrip() + " " + frame.text) if self._final_transcript else frame.text
                )
                await self.push_frame(UserUpdatedSpeakingTranscriptFrame(transcript=updated_transcript), direction)
        elif isinstance(frame, TranscriptionFrame):
            self._final_transcript += f"{frame.text} "
        elif isinstance(frame, UserStartedSpeakingFrame):
            self._stopped_speaking = False
            if self.user_started_speaking_message:
                await self.push_frame(
                    UserUpdatedSpeakingTranscriptFrame(transcript=self.user_started_speaking_message), direction
                )
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._stopped_speaking = True
        elif isinstance(frame, StartInterruptionFrame):
            self._partial_transcript = None
            self._final_transcript = ""
            self._stopped_speaking = None

        # Wait until both a TranscriptionFrame and a UserStoppedSpeakingFrame have been received
        if self._final_transcript and self._stopped_speaking:
            await self.push_frame(UserStoppedSpeakingTranscriptFrame(self._final_transcript.strip()), direction)
            self._partial_transcript = None
            self._final_transcript = ""
            self._stopped_speaking = None


class BotTranscriptSynchronization(FrameProcessor):
    """Synchronizes bot speech transcripts with audio playback.

    Synchronizes TTSTextFrames with BotStartedSpeakingFrame events that indicate the start
    of speech audio playback. Creates BotUpdatedSpeakingTranscriptFrame frames containing
    partial and final transcripts.

    The bot transcript synchronization is based on the assumption that there is a
    BotStartedSpeakingFrame/BotStoppedSpeakingFrame pair for each TTSStartedFrame.
    This processor will not work correctly if this assumption is not met.
    Each BotStartedSpeakingFrame triggers a BotUpdatedSpeakingTranscriptFrame if a
    transcript is available. It is acceptable for multiple [TTSStartedFrame, TTSTextFrame, ...]
    sequences to arrive before the corresponding BotStoppedSpeakingFrame.

    Input Frames:
        TTSStartedFrame: Speech audio playback starts
        TTSTextFrame: Text to be spoken
        BotStartedSpeakingFrame: Bot starts speaking
        BotStoppedSpeakingFrame: Bot stops speaking
        StartInterruptionFrame: Resets processor state

    Attributes:
        _bot_started_speaking (bool): Indicates if the bot has started speaking.
        _bot_transcripts_buffer (list[str]): Buffer for bot transcripts to synchronize with audio playback.
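
    Example:
        An illustrative frame sequence showing the ordering this processor assumes
        and the transcript frame it emits (the spoken text is hypothetical)::

            TTSStartedFrame                 # start buffering a new transcript
            TTSTextFrame("Hello")
            TTSTextFrame("there!")
            BotStartedSpeakingFrame         # audio playback starts
            -> BotUpdatedSpeakingTranscriptFrame("Hello there!")
            BotStoppedSpeakingFrame         # drop the shown transcript from the buffer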
    """

    def __init__(self):
        """Initialize the BotTranscriptSynchronization processor."""
        super().__init__()
        self._bot_started_speaking = False
        self._bot_transcripts_buffer: list[str] = []
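        # Each entry aggregates the TTSTextFrames of one TTSStartedFrame sequence;
        # index 0 is the transcript currently being synchronized with audio playback.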

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and manages transcript synchronization.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, StartInterruptionFrame):
            # Reset the transcript buffer and speaking state
            self._bot_transcripts_buffer = []
            self._bot_started_speaking = False
            await self.push_frame(frame, direction)
        elif isinstance(frame, TTSStartedFrame):
            # TODO: Hacky solution; a proper fix is needed.
            # Drop the oldest pending transcript before starting to buffer a new one.
            if self._bot_transcripts_buffer:
                self._bot_transcripts_buffer.pop(0)
            # Start buffering the next transcript
            self._bot_transcripts_buffer.append("")
            await self.push_frame(frame, direction)
        elif isinstance(frame, TTSTextFrame):
            # Aggregate partial transcripts
            if not self._bot_transcripts_buffer:
                # It looks like some TTS processors keep sending TTSTextFrame even after a StartInterruptionFrame.
                logger.warning("TTSTextFrame received before TTSStartedFrame!")
            else:
                if frame.text:
                    if self._bot_transcripts_buffer[-1]:
                        self._bot_transcripts_buffer[-1] += f" {frame.text}"
                    else:
                        self._bot_transcripts_buffer[-1] = frame.text
                # TODO: Figure out how to align the partial transcripts with the audio;
                # currently they are shown as soon as they come in. This is needed to get
                # the full transcript from ElevenLabs TTS, since it emits TTSStartedFrame
                # with the first incoming word and does not stop until all the text has
                # been generated.
                if self._bot_started_speaking and len(self._bot_transcripts_buffer) == 1:
                    # Only create a BotUpdatedSpeakingTranscriptFrame if the
                    # BotStartedSpeakingFrame belongs to the first active transcript
                    await self.push_frame(
                        BotUpdatedSpeakingTranscriptFrame(transcript=self._bot_transcripts_buffer[-1].strip()),
                        direction,
                    )
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStartedSpeakingFrame):
            # Start synchronizing transcript from buffer with BotStartedSpeakingFrame
            if self._bot_started_speaking:
                logger.warning("BotStartedSpeakingFrame received before BotStoppedSpeakingFrame!")
            self._bot_started_speaking = True
            await self.push_frame(frame, direction)
            if self._bot_transcripts_buffer and self._bot_transcripts_buffer[0]:
                # If a transcript is already available, push it immediately
                await self.push_frame(
                    BotUpdatedSpeakingTranscriptFrame(self._bot_transcripts_buffer[0]), FrameDirection.DOWNSTREAM
                )
        elif isinstance(frame, BotStoppedSpeakingFrame):
            # Remove the shown transcript from the buffer
            if not self._bot_started_speaking:
                logger.warning("BotStoppedSpeakingFrame received before BotStartedSpeakingFrame!")
            self._bot_started_speaking = False
            if self._bot_transcripts_buffer:
                self._bot_transcripts_buffer.pop(0)
            await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)