"""Speech-to-speech client with latency measurement for performance testing.

This module provides a WebSocket client that sends audio files to a speech-to-speech
service and measures the latency between when user audio ends and bot response begins.
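
Example invocation (a sketch; the script filename and host/port are illustrative and
depend on your deployment):

    python s2s_latency_client.py --host 127.0.0.1 --port 8100 \
        --output-dir ./results --test-duration 100 --threshold 0.5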
"""

import argparse
import asyncio
import datetime
import io
import json
import os
import signal
import sys
import time
import uuid
import wave

import websockets
from pipecat.frames.protobufs import frames_pb2
from websockets.exceptions import ConnectionClosed


def log_error(msg):
    """Write error message to stderr with timestamp."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[ERROR] {timestamp} - {msg}", file=sys.stderr, flush=True)


# Global constants
SILENCE_TIMEOUT = 0.2  # Standard silence timeout in seconds
CHUNK_DURATION_MS = 32  # Standard chunk duration in milliseconds

# List to store latency values
latency_values = []

# List to store filtered latency values (above threshold)
filtered_latency_values = []

# Global variable to track timestamps
timestamps = {"input_audio_file_end": None, "first_response_after_input": None}

# Global glitch detection
glitch_detected = False

# Global flag and event for controlling silence sending
silence_control = {
    "running": False,
    "event": asyncio.Event(),
    "audio_params": None,  # Will store (frame_rate, n_channels, chunk_size)
}

# Global control for continuous operation
continuous_control = {
    "running": True,
    "collecting_metrics": False,
    "start_time": None,
    "test_duration": 100,  # Default 100 seconds
    "threshold": 0.5,  # Default threshold for filtered latency
}


# Signal handler for graceful shutdown
def signal_handler(signum, frame):
    """Handle system signals for graceful shutdown."""
    print(f"\nReceived signal {signum}, shutting down gracefully...")
    continuous_control["running"] = False
    sys.exit(0)


# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def write_audio_to_wav(data, wf, create_new_file=False, output_file="bot_response.wav"):
    """Write audio data to WAV file."""
    try:
        # Parse protobuf frame
        try:
            proto = frames_pb2.Frame.FromString(data)
            which = proto.WhichOneof("frame")
            if which is None:
                return wf, None, None, None
        except Exception as e:
            log_error(f"Failed to parse protobuf frame: {e}")
            return wf, None, None, None

        args = getattr(proto, which)
        sample_rate = getattr(args, "sample_rate", 16000)
        num_channels = getattr(args, "num_channels", 1)
        audio_data = getattr(args, "audio", None)
        if audio_data is None:
            return wf, None, None, None

        # Extract raw audio data from WAV format if needed
        try:
            with io.BytesIO(audio_data) as buffer, wave.open(buffer, "rb") as wav_file:
                audio_data = wav_file.readframes(wav_file.getnframes())
                sample_rate = wav_file.getframerate()
                num_channels = wav_file.getnchannels()
        except Exception:
            # If not WAV format, use audio_data as-is
            pass

        # Create WAV file if needed
        if create_new_file and wf is None:
            try:
                wf = wave.open(output_file, "wb")  # noqa: SIM115
                wf.setnchannels(num_channels)
                wf.setsampwidth(2)
                wf.setframerate(sample_rate)
            except Exception as e:
                log_error(f"Failed to create WAV file {output_file}: {e}")
                return None, None, None, None

        # Write audio data directly
        if wf is not None:
            try:
                wf.writeframes(audio_data)
            except Exception as e:
                log_error(f"Failed to write audio data: {e}")
                return None, None, None, None

        return wf, sample_rate, num_channels, audio_data
    except Exception as e:
        log_error(f"Unexpected error in write_audio_to_wav: {e}")
        return wf, None, None, None


async def send_audio_file(websocket, file_path):
    """Send audio file content with streaming simulation."""
    # Pause silence sending while we send the real audio
    silence_control["event"].set()

    try:
        if not os.path.exists(file_path):
            log_error(f"Input audio file not found: {file_path}")
            return

        try:
            with wave.open(file_path, "rb") as wav_file:
                n_channels = wav_file.getnchannels()
                frame_rate = wav_file.getframerate()
                sample_width = wav_file.getsampwidth()

                # Store audio parameters for silence generation
                chunk_size = int((frame_rate * n_channels * CHUNK_DURATION_MS) / 1000) * sample_width
                silence_control["audio_params"] = (
                    frame_rate,
                    n_channels,
                    chunk_size,
                )

                # Stream the audio file
                frames_sent = 0
                while True:
                    try:
                        chunk = wav_file.readframes(chunk_size // sample_width)
                        if not chunk:
                            break
                        audio_frame = frames_pb2.AudioRawFrame(
                            audio=chunk, sample_rate=frame_rate, num_channels=n_channels
                        )
                        frame = frames_pb2.Frame(audio=audio_frame)
                        await websocket.send(frame.SerializeToString())
                        frames_sent += 1
                        await asyncio.sleep(CHUNK_DURATION_MS / 1000)
                    except Exception as e:
                        log_error(f"Error sending audio frame {frames_sent}: {e}")
                        raise  # Re-raise to handle in outer try block
        except wave.Error as e:
            log_error(f"Failed to read WAV file {file_path}: {e}")
            return
    except Exception as e:
        log_error(f"Error in send_audio_file: {e}")
        return
    finally:
        # Always record when input audio ends and resume silence sending
        timestamps["input_audio_file_end"] = datetime.datetime.now()
        print(f"User stopped speaking at: {timestamps['input_audio_file_end'].strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        silence_control["event"].clear()


async def silence_sender_loop(websocket):
    """Background task to continuously send silence when no other audio is being sent."""
    silence_control["running"] = True
    print("Silence sender loop started")
    consecutive_errors = 0
    max_consecutive_errors = 5

    try:
        while silence_control["running"]:
            try:
                # Wait until we're allowed to send silence
                if silence_control["event"].is_set() or silence_control["audio_params"] is None:
                    await asyncio.sleep(0.1)  # Short sleep to avoid CPU spinning
                    continue

                # Extract audio parameters
                frame_rate, n_channels, chunk_size = silence_control["audio_params"]

                # Send a chunk of silence
                silent_chunk = b"\x00" * chunk_size
                audio_frame = frames_pb2.AudioRawFrame(
                    audio=silent_chunk, sample_rate=frame_rate, num_channels=n_channels
                )
                frame = frames_pb2.Frame(audio=audio_frame)
                await websocket.send(frame.SerializeToString())
                await asyncio.sleep(CHUNK_DURATION_MS / 1000)

                # Reset error counter on successful send
                consecutive_errors = 0

            except ConnectionClosed:
                print("WebSocket connection closed in silence sender loop")
                break
            except Exception as e:
                consecutive_errors += 1
                print(f"Error in silence sender loop (attempt {consecutive_errors}/{max_consecutive_errors}): {e}")

                # If too many consecutive errors, stop the loop
                if consecutive_errors >= max_consecutive_errors:
                    print(f"Too many consecutive errors ({consecutive_errors}), stopping silence sender")
                    break

                # Brief pause before retry to avoid overwhelming the system
                await asyncio.sleep(1.0)

    except Exception as e:
        print(f"Fatal error in silence sender loop: {e}")
    finally:
        print("Silence sender loop stopped")
        silence_control["running"] = False


async def receive_audio(
    websocket,
    wf=None,
    create_new_file=True,
    is_after_input=False,
    output_wav="bot_response.wav",
    is_initial=False,
    timeout=1.0,
):
    """Receive audio data and handle streaming playback simulation."""
    global glitch_detected

    if is_initial:
        print("Waiting up to 5 seconds for initial bot introduction audio if available...")
        try:
            # Wait for first data packet with 5 second timeout
            data = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        except asyncio.TimeoutError:  # timeout from asyncio.wait_for (alias of TimeoutError on Python 3.11+)
            print("No initial bot introduction received after 5 seconds, continuing...")
            return wf
    else:
        # For non-initial audio, receive normally
        data = await websocket.recv()

    try:
        # The first data packet was already received above; process it directly so the
        # first audio frame is not dropped and the latency timestamp is not skewed.

        # Record first response timestamp if after input
        if is_after_input:
            timestamps["first_response_after_input"] = datetime.datetime.now()
            formatted_time = timestamps["first_response_after_input"].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            print(f"Bot started speaking at {formatted_time}")

        # Process first audio packet
        wf, sample_rate, num_channels, audio_data = write_audio_to_wav(data, wf, create_new_file, output_wav)

        # Initialize timing for glitch detection
        audio_start_time = time.time()
        cumulative_audio_duration = 0.0  # Total duration of audio received (in seconds)

        # Calculate duration of first chunk if we have audio data
        if audio_data and sample_rate and num_channels:
            bytes_per_sample = 2  # Assuming 16-bit audio
            samples_in_chunk = len(audio_data) // (num_channels * bytes_per_sample)
            chunk_duration_seconds = samples_in_chunk / sample_rate
            cumulative_audio_duration += chunk_duration_seconds

        # Continue receiving audio data until silence threshold reached
        last_data_time = time.time()

        while True:
            try:
                data = await asyncio.wait_for(websocket.recv(), timeout=timeout)
                current_time = time.time()
                last_data_time = current_time

                # Process audio data
                wf, sample_rate, num_channels, audio_data = write_audio_to_wav(data, wf, False, output_wav)

                # Update cumulative audio duration
                if audio_data and sample_rate and num_channels:
                    bytes_per_sample = 2  # Assuming 16-bit audio
                    samples_in_chunk = len(audio_data) // (num_channels * bytes_per_sample)
                    chunk_duration_seconds = samples_in_chunk / sample_rate
                    cumulative_audio_duration += chunk_duration_seconds

                    # Check for glitch: real elapsed time vs cumulative audio duration
                    real_elapsed_time = current_time - audio_start_time
                    audio_deficit = real_elapsed_time - cumulative_audio_duration

                    if audio_deficit >= 0.032:  # 32ms threshold for glitch detection
                        print(f"Audio glitch detected: {audio_deficit * 1000:.1f}ms audio deficit")
                        glitch_detected = True

            except asyncio.TimeoutError:
                # Check if silence duration exceeds threshold
                if time.time() - last_data_time >= SILENCE_TIMEOUT:
                    return wf
            except Exception as e:
                log_error(f"Error receiving audio data: {e}")
                if wf is not None and create_new_file:
                    try:
                        wf.close()
                    except Exception as close_error:
                        log_error(f"Error closing WAV file: {close_error}")
                return None
    except Exception as e:
        log_error(f"Fatal error in receive_audio: {e}")
        if wf is not None and create_new_file:
            try:
                wf.close()
            except Exception as close_error:
                log_error(f"Error closing WAV file: {close_error}")
        return None


async def process_conversation_turn(websocket, audio_file_path, wf, turn_index, output_wav="bot_response.wav"):
    """Process a single conversation turn with the given audio file."""
    print(f"\n----- Processing conversation turn {turn_index + 1} -----")

    # Reset timestamps for this turn
    timestamps["input_audio_file_end"] = None
    timestamps["first_response_after_input"] = None

    # Start both sending and receiving in parallel for realistic latency measurement
    print(f"Sending user input audio from {audio_file_path}...")

    # Start sending audio file in background
    send_task = asyncio.create_task(send_audio_file(websocket, audio_file_path))

    # Start receiving bot response immediately (parallel to sending)
    receive_task = asyncio.create_task(
        receive_audio(websocket, wf=wf, create_new_file=(wf is None), is_after_input=True, output_wav=output_wav)
    )

    # Wait for both tasks to complete
    wf = await receive_task
    await send_task  # Ensure sending is also complete

    # Calculate and store latency only if we're collecting metrics
    if continuous_control["collecting_metrics"]:
        latency = None
        if timestamps["input_audio_file_end"] is not None and timestamps["first_response_after_input"] is not None:
            latency = (timestamps["first_response_after_input"] - timestamps["input_audio_file_end"]).total_seconds()
            print(f"Latency for Turn {turn_index + 1}: {latency:.3f} seconds")
            latency_values.append(latency)

            # Add to filtered latency if above threshold
            if latency > continuous_control["threshold"]:
                filtered_latency_values.append(latency)
            else:
                print("Reverse Barge-In Detected!")

    return wf


async def continuous_audio_loop(websocket, audio_files, wf, output_wav):
    """Continuously loop through audio files until stopped."""
    turn_index = 0

    while continuous_control["running"]:
        # Check if we should start collecting metrics
        if (
            continuous_control["start_time"]
            and time.time() >= continuous_control["start_time"]
            and not continuous_control["collecting_metrics"]
        ):
            continuous_control["collecting_metrics"] = True
            print(f"\n=== STARTING METRICS COLLECTION at {datetime.datetime.now().strftime('%H:%M:%S')} ===")

        # Check if we should stop collecting metrics
        if (
            continuous_control["start_time"]
            and continuous_control["collecting_metrics"]
            and time.time() >= continuous_control["start_time"] + continuous_control["test_duration"]
        ):
            print(f"\n=== STOPPING METRICS COLLECTION at {datetime.datetime.now().strftime('%H:%M:%S')} ===")
            continuous_control["collecting_metrics"] = False
            continuous_control["running"] = False
            break

        # Process current audio file
        audio_file = audio_files[turn_index % len(audio_files)]
        wf = await process_conversation_turn(websocket, audio_file, wf, turn_index, output_wav)
        turn_index += 1

        # Small delay between turns to prevent overwhelming the system
        await asyncio.sleep(0.1)

    return wf


async def main():
    """Main execution function."""
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Speech-to-speech client with latency measurement")
    parser.add_argument(
        "--stream-id", type=str, default=str(uuid.uuid4()), help="Unique stream ID (default: random UUID)"
    )
    parser.add_argument("--host", type=str, default="0.0.0.0", help="WebSocket server host (default: 0.0.0.0)")
    parser.add_argument("--port", type=int, default=8100, help="WebSocket server port (default: 8100)")
    parser.add_argument(
        "--output-dir", type=str, default="./results", help="Directory to store output files (default: ./results)"
    )
    parser.add_argument("--start-delay", type=float, default=0, help="Delay in seconds before starting (default: 0)")
    parser.add_argument(
        "--metrics-start-time",
        type=float,
        default=0,
        help="Unix timestamp when to start collecting metrics (default: 0)",
    )
    parser.add_argument(
        "--test-duration", type=float, default=100, help="Duration in seconds to collect metrics (default: 100)"
    )
    parser.add_argument(
        "--threshold", type=float, default=0.5, help="Threshold for filtered average latency calculation (default: 0.5)"
    )
    args = parser.parse_args()

    # Create output directory if it doesn't exist
    os.makedirs(args.output_dir, exist_ok=True)

    # Construct WebSocket URI with unique stream ID
    uri = f"ws://{args.host}:{args.port}/ws/{args.stream_id}"

    # Output file paths
    output_wav = os.path.join(args.output_dir, f"bot_response_{args.stream_id}.wav")
    output_results = os.path.join(args.output_dir, f"latency_results_{args.stream_id}.json")

    print(f"Starting client with stream ID: {args.stream_id}")
    print(f"WebSocket URI: {uri}")
    print(f"Start delay: {args.start_delay} seconds")
    print(f"Metrics start time: {args.metrics_start_time}")
    print(f"Test duration: {args.test_duration} seconds")
    print(f"Latency threshold: {args.threshold} seconds")

    # Set up timing controls
    if args.start_delay > 0:
        print(f"Waiting {args.start_delay} seconds before starting...")
        await asyncio.sleep(args.start_delay)

    if args.metrics_start_time > 0:
        continuous_control["start_time"] = args.metrics_start_time
        continuous_control["test_duration"] = args.test_duration
        print(f"Will start collecting metrics at timestamp {args.metrics_start_time}")

    # Set threshold for filtered latency calculation
    continuous_control["threshold"] = args.threshold

    # Define the array of input audio files
    # Get the directory where this script is located
    script_dir = os.path.dirname(os.path.abspath(__file__))
    audio_files_dir = os.path.join(script_dir, "audio_files")

    input_audio_files = [
        os.path.join(audio_files_dir, "output_file.wav"),
        # os.path.join(audio_files_dir, "query_1.wav"),
        # os.path.join(audio_files_dir, "query_2.wav"),
        # os.path.join(audio_files_dir, "query_3.wav"),
        # os.path.join(audio_files_dir, "query_4.wav"),
        # os.path.join(audio_files_dir, "query_5.wav"),
        # os.path.join(audio_files_dir, "query_6.wav"),
        # os.path.join(audio_files_dir, "query_7.wav"),
        # os.path.join(audio_files_dir, "query_8.wav"),
        # os.path.join(audio_files_dir, "query_9.wav"),
        # os.path.join(audio_files_dir, "query_10.wav"),
    ]

    # Clear any previous values
    latency_values.clear()
    filtered_latency_values.clear()

    # Initialize silence control
    silence_control["event"] = asyncio.Event()
    silence_control["event"].set()  # Start with silence sending paused

    try:
        async with websockets.connect(uri) as websocket:
            # First, try to receive any initial output audio
            wf = await receive_audio(
                websocket,
                wf=None,
                create_new_file=True,
                is_after_input=False,
                output_wav=output_wav,
                is_initial=True,
            )

            # Start the silence sender task, keeping a reference so it is not garbage collected
            silence_task = asyncio.create_task(silence_sender_loop(websocket))

            # Start continuous audio loop
            wf = await continuous_audio_loop(websocket, input_audio_files, wf, output_wav)

            # Clean up and stop the silence sender
            silence_control["running"] = False
            silence_control["event"].set()  # Make sure it's not waiting
            await asyncio.sleep(0.2)  # Give it time to exit cleanly
            silence_task.cancel()  # No-op if the loop has already exited

            if wf is not None:
                wf.close()
                print(f"All output saved to {output_wav}")

    except ConnectionClosed:
        # Normal WebSocket closure, not an error
        pass
    except Exception as e:
        print(f"Connection error: {e}")
    finally:
        # Always save results, regardless of how the connection ended
        if latency_values:
            avg_latency = sum(latency_values) / len(latency_values)

            # Calculate filtered average latency
            filtered_avg_latency = None
            if filtered_latency_values:
                filtered_avg_latency = sum(filtered_latency_values) / len(filtered_latency_values)

            print("\n----- Final Latency Summary -----")
            print(f"Average Latency across {len(latency_values)} turns: {avg_latency:.3f} seconds")

            if filtered_avg_latency is not None:
                print(
                    f"Filtered Average Latency (>{args.threshold}s) across {len(filtered_latency_values)} turns: "
                    f"{filtered_avg_latency:.3f} seconds"
                )
            else:
                print(f"Filtered Average Latency: No latencies above {args.threshold}s threshold")

            # Calculate reverse barge-ins (latencies below threshold)
            reverse_barge_ins_count = len(latency_values) - len(filtered_latency_values)
            print(f"Reverse Barge-Ins Detected: {reverse_barge_ins_count} latencies below {args.threshold}s threshold")

            # Report glitch detection results
            if glitch_detected:
                print("⚠️  AUDIO GLITCHES DETECTED: Audio chunks arrived with gaps larger than playback time")
            else:
                print("✅ No audio glitches detected: Audio streaming was smooth")

            print("----------------------------------------")

            # Save results to JSON file
            results = {
                "stream_id": args.stream_id,
                "individual_latencies": latency_values,
                "average_latency": avg_latency,
                "filtered_latencies": filtered_latency_values,
                "filtered_average_latency": filtered_avg_latency,
                "threshold": args.threshold,
                "num_turns": len(latency_values),
                "num_filtered_turns": len(filtered_latency_values),
                "reverse_barge_ins_count": len(latency_values) - len(filtered_latency_values),
                "glitch_detected": glitch_detected,
                "timestamp": datetime.datetime.now().isoformat(),
                "metrics_start_time": continuous_control["start_time"],
                "test_duration": continuous_control["test_duration"],
            }

            with open(output_results, "w") as f:
                json.dump(results, f, indent=2)


if __name__ == "__main__":
    asyncio.run(main())