DebateT / metagpt /logs.py
hhhhdgfs's picture
Upload 918 files
18bc133 verified
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/6/1 12:41
@Author : alexanderwu
@File : logs.py
"""
from __future__ import annotations
import asyncio
import inspect
import sys
from contextvars import ContextVar
from datetime import datetime
from functools import partial
from typing import Any
from loguru import logger as _logger
from pydantic import BaseModel, Field
from metagpt.const import METAGPT_ROOT
LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream")
class ToolLogItem(BaseModel):
type_: str = Field(alias="type", default="str", description="Data type of `value` field.")
name: str
value: Any
TOOL_LOG_END_MARKER = ToolLogItem(
type="str", name="end_marker", value="\x18\x19\x1B\x18"
) # A special log item to suggest the end of a stream log
_print_level = "INFO"
def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
"""Adjust the log level to above level"""
global _print_level
_print_level = print_level
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
log_name = f"{name}_{formatted_date}" if name else formatted_date # name a log with prefix name
_logger.remove()
_logger.add(sys.stderr, level=print_level)
_logger.add(METAGPT_ROOT / f"logs/{log_name}.txt", level=logfile_level)
return _logger
logger = define_log_level()
def log_llm_stream(msg):
"""
Logs a message to the LLM stream.
Args:
msg: The message to be logged.
Notes:
If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called),
the message will not be added to the LLM stream queue.
"""
queue = get_llm_stream_queue()
if queue:
queue.put_nowait(msg)
_llm_stream_log(msg)
def log_tool_output(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""):
"""interface for logging tool output, can be set to log tool output in different ways to different places with set_tool_output_logfunc"""
_tool_output_log(output=output, tool_name=tool_name)
async def log_tool_output_async(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""):
"""async interface for logging tool output, used when output contains async object"""
await _tool_output_log_async(output=output, tool_name=tool_name)
async def get_human_input(prompt: str = ""):
"""interface for getting human input, can be set to get input from different sources with set_human_input_func"""
if inspect.iscoroutinefunction(_get_human_input):
return await _get_human_input(prompt)
else:
return _get_human_input(prompt)
def set_llm_stream_logfunc(func):
global _llm_stream_log
_llm_stream_log = func
def set_tool_output_logfunc(func):
global _tool_output_log
_tool_output_log = func
async def set_tool_output_logfunc_async(func):
# async version
global _tool_output_log_async
_tool_output_log_async = func
def set_human_input_func(func):
global _get_human_input
_get_human_input = func
_llm_stream_log = partial(print, end="")
_tool_output_log = (
lambda *args, **kwargs: None
) # a dummy function to avoid errors if set_tool_output_logfunc is not called
async def _tool_output_log_async(*args, **kwargs):
# async version
pass
def create_llm_stream_queue():
"""Creates a new LLM stream queue and sets it in the context variable.
Returns:
The newly created asyncio.Queue instance.
"""
queue = asyncio.Queue()
LLM_STREAM_QUEUE.set(queue)
return queue
def get_llm_stream_queue():
"""Retrieves the current LLM stream queue from the context variable.
Returns:
The asyncio.Queue instance if set, otherwise None.
"""
return LLM_STREAM_QUEUE.get(None)
_get_human_input = input # get human input from console by default
def _llm_stream_log(msg):
if _print_level in ["INFO"]:
print(msg, end="")