File size: 2,472 Bytes
53ea588
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License

"""Logging utilities when working with pipecat pipelines connected to ACE."""

import asyncio
import functools
import sys

from loguru import logger


async def logger_context(coro, **kwargs):
    """Await `coro` while the logger is contextualized with `kwargs`.

    Args:
        coro: The awaitable to run inside the logger context.
        **kwargs: Key/value pairs forwarded to `logger.contextualize`.

    Returns:
        Whatever awaiting `coro` returns.
    """
    scoped_context = logger.contextualize(**kwargs)
    with scoped_context:
        outcome = await coro
    return outcome


def setup_default_ace_logging(
    level: str = "INFO",
    stream_id: str | None = None,
):
    """Configure the loguru logger with the default ACE log format.

    Every log line carries a ``streamId=...`` field. All previously
    registered handlers are removed and a single handler writing to
    ``sys.stderr`` is installed.

    Args:
        level (str): Logging level passed to the new stderr handler.
        stream_id (str, optional): Stream id set globally as the default
            ``stream_id`` extra value; "n/a" is used when it is not given.
            If you want different stream ids for different pipelines running
            in the same process use `logger_context` instead.
    """
    # The individual columns of a log line, joined by " | " below.
    timestamp_part = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green>"
    level_part = "<level>{level: <8}</level>"
    location_part = "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan>"
    message_part = "streamId={extra[stream_id]} - <level>{message}</level>"
    line_format = " | ".join([timestamp_part, level_part, location_part, message_part])

    # Default value for the `stream_id` extra used by the format above.
    default_extra = {"stream_id": stream_id if stream_id else "n/a"}
    logger.configure(extra=default_extra)

    # Replace any existing handlers with one stderr handler.
    logger.remove()
    logger.add(sys.stderr, format=line_format, level=level)


def log_execution(func):
    """Decorator to log the start and end of an async function execution.

    The decorated coroutine is awaited through `asyncio.wait_for` with a
    fixed timeout of one second. When the timeout expires an error is logged
    and ``None`` is returned instead of propagating the timeout.

    Args:
        func (coroutine function): The async function to be decorated.

    Returns:
        coroutine function: The wrapped async function. Its metadata
        (``__name__``, ``__doc__``, ...) is copied from `func`.
    """

    # Preserve the metadata of `func` so that decorated coroutines keep their
    # own name and docstring instead of all being reported as "wrapper".
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Every object has a `__class__`, so the class of the first positional
        # argument (`self` for methods) is always used as the name prefix.
        class_name = args[0].__class__.__name__ if args else None
        name = f"{class_name}.{func.__name__}" if class_name else func.__name__

        # Truncate each value to 20 characters to keep log lines short.
        args_str = ", ".join(str(arg)[:20] for arg in args)
        kwargs_str = ", ".join(f"{k}={str(v)[:20]}" for k, v in kwargs.items())
        parameters = f"{args_str}, {kwargs_str}" if kwargs else args_str

        logger.debug(f"Starting execution of {name}({parameters})")
        try:
            result = await asyncio.wait_for(func(*args, **kwargs), timeout=1.0)
        except asyncio.TimeoutError:
            # `asyncio.TimeoutError` is only an alias of the builtin
            # `TimeoutError` from Python 3.11 on; catching the asyncio name
            # works on older interpreters as well.
            logger.error(f"Finished execution of {name} with timeout after 1 second")
            return None

        logger.debug(f"Finished execution of {name}({parameters})")
        return result

    return wrapper