Spaces:
Running
Running
File size: 2,472 Bytes
53ea588 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License
"""Logging utilities when working with pipecat pipelines connected to ACE."""
import asyncio
import functools
import sys

from loguru import logger
async def logger_context(coro, **kwargs):
    """Await `coro` while the given keyword arguments are bound to the logger context.

    Args:
        coro: Awaitable that is awaited inside the contextualized logger scope.
        **kwargs: Values forwarded unchanged to `logger.contextualize`.

    Returns:
        The result of awaiting `coro`.
    """
    scoped_logger = logger.contextualize(**kwargs)
    with scoped_logger:
        outcome = await coro
    return outcome
def setup_default_ace_logging(
    level: str = "INFO",
    stream_id: str | None = None,
):
    """Configure the logger with the default ACE output format.

    The log format includes a `streamId=` field that is filled from the logger's
    extra data. All handlers are removed and a single stderr handler is added.

    Args:
        level (str): Logging level passed to the stderr handler.
        stream_id (str, optional): Stream id stored globally as the default for the
            `streamId=` field; "n/a" is used when it is missing or empty. If you want
            different stream ids for different pipelines running in the same process,
            use `logger_context` instead.
    """
    # Register the global default first so the format below can always resolve
    # `extra[stream_id]`.
    default_extra = {"stream_id": stream_id or "n/a"}
    logger.configure(extra=default_extra)  # Default values

    ace_format = (
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
        "streamId={extra[stream_id]} - <level>{message}</level>"
    )

    # Drop the handlers registered so far, then install the stderr handler.
    logger.remove()
    logger.add(sys.stderr, level=level, format=ace_format)
def log_execution(func):
    """Decorator to log the start and end of an async function execution.

    The decorated coroutine is awaited with a fixed timeout of 1 second. When the
    timeout expires an error is logged and `None` is returned instead of raising.
    Positional and keyword argument values are truncated to 20 characters in the
    log output.

    Args:
        func (coroutine function): The async function to be decorated.

    Returns:
        coroutine function: The wrapped async function. It keeps the name and
        docstring of `func` and returns the result of `func`, or `None` on timeout.
    """

    @functools.wraps(func)  # preserve __name__/__doc__ and expose __wrapped__
    async def wrapper(*args, **kwargs):
        # Every object has `__class__`, so the first positional argument is always
        # treated as the owning instance when building the display name.
        class_name = args[0].__class__.__name__ if args else None
        name = f"{class_name}.{func.__name__}" if class_name else func.__name__

        # Build one list so a keyword-only call does not get a leading ", ".
        rendered = [str(arg)[:20] for arg in args]
        rendered.extend(f"{k}={str(v)[:20]}" for k, v in kwargs.items())
        parameters = ", ".join(rendered)

        logger.debug(f"Starting execution of {name}({parameters})")
        try:
            result = await asyncio.wait_for(func(*args, **kwargs), timeout=1.0)
        except asyncio.TimeoutError:
            # `asyncio.TimeoutError` is only an alias of the builtin `TimeoutError`
            # since Python 3.11; naming it explicitly also catches it on 3.10.
            logger.error(f"Finished execution of {name} with timeout after 1 second")
            return None
        logger.debug(f"Finished execution of {name}({parameters})")
        return result

    return wrapper
|