|
|
|
|
|
|
|
|
""" |
|
|
@Time : 2023/6/1 12:41 |
|
|
@Author : alexanderwu |
|
|
@File : logs.py |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import asyncio |
|
|
import inspect |
|
|
import sys |
|
|
from contextvars import ContextVar |
|
|
from datetime import datetime |
|
|
from functools import partial |
|
|
from typing import Any |
|
|
|
|
|
from loguru import logger as _logger |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
from metagpt.const import METAGPT_ROOT |
|
|
|
|
|
LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream") |
|
|
|
|
|
|
|
|
class ToolLogItem(BaseModel): |
|
|
type_: str = Field(alias="type", default="str", description="Data type of `value` field.") |
|
|
name: str |
|
|
value: Any |
|
|
|
|
|
|
|
|
TOOL_LOG_END_MARKER = ToolLogItem( |
|
|
type="str", name="end_marker", value="\x18\x19\x1B\x18" |
|
|
) |
|
|
|
|
|
_print_level = "INFO" |
|
|
|
|
|
|
|
|
def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None): |
|
|
"""Adjust the log level to above level""" |
|
|
global _print_level |
|
|
_print_level = print_level |
|
|
|
|
|
current_date = datetime.now() |
|
|
formatted_date = current_date.strftime("%Y%m%d") |
|
|
log_name = f"{name}_{formatted_date}" if name else formatted_date |
|
|
|
|
|
_logger.remove() |
|
|
_logger.add(sys.stderr, level=print_level) |
|
|
_logger.add(METAGPT_ROOT / f"logs/{log_name}.txt", level=logfile_level) |
|
|
return _logger |
|
|
|
|
|
|
|
|
logger = define_log_level() |
|
|
|
|
|
|
|
|
def log_llm_stream(msg): |
|
|
""" |
|
|
Logs a message to the LLM stream. |
|
|
|
|
|
Args: |
|
|
msg: The message to be logged. |
|
|
|
|
|
Notes: |
|
|
If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called), |
|
|
the message will not be added to the LLM stream queue. |
|
|
""" |
|
|
|
|
|
queue = get_llm_stream_queue() |
|
|
if queue: |
|
|
queue.put_nowait(msg) |
|
|
_llm_stream_log(msg) |
|
|
|
|
|
|
|
|
def log_tool_output(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""): |
|
|
"""interface for logging tool output, can be set to log tool output in different ways to different places with set_tool_output_logfunc""" |
|
|
_tool_output_log(output=output, tool_name=tool_name) |
|
|
|
|
|
|
|
|
async def log_tool_output_async(output: ToolLogItem | list[ToolLogItem], tool_name: str = ""): |
|
|
"""async interface for logging tool output, used when output contains async object""" |
|
|
await _tool_output_log_async(output=output, tool_name=tool_name) |
|
|
|
|
|
|
|
|
async def get_human_input(prompt: str = ""): |
|
|
"""interface for getting human input, can be set to get input from different sources with set_human_input_func""" |
|
|
if inspect.iscoroutinefunction(_get_human_input): |
|
|
return await _get_human_input(prompt) |
|
|
else: |
|
|
return _get_human_input(prompt) |
|
|
|
|
|
|
|
|
def set_llm_stream_logfunc(func): |
|
|
global _llm_stream_log |
|
|
_llm_stream_log = func |
|
|
|
|
|
|
|
|
def set_tool_output_logfunc(func): |
|
|
global _tool_output_log |
|
|
_tool_output_log = func |
|
|
|
|
|
|
|
|
async def set_tool_output_logfunc_async(func): |
|
|
|
|
|
global _tool_output_log_async |
|
|
_tool_output_log_async = func |
|
|
|
|
|
|
|
|
def set_human_input_func(func): |
|
|
global _get_human_input |
|
|
_get_human_input = func |
|
|
|
|
|
|
|
|
_llm_stream_log = partial(print, end="") |
|
|
|
|
|
|
|
|
_tool_output_log = ( |
|
|
lambda *args, **kwargs: None |
|
|
) |
|
|
|
|
|
|
|
|
async def _tool_output_log_async(*args, **kwargs): |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
def create_llm_stream_queue(): |
|
|
"""Creates a new LLM stream queue and sets it in the context variable. |
|
|
|
|
|
Returns: |
|
|
The newly created asyncio.Queue instance. |
|
|
""" |
|
|
queue = asyncio.Queue() |
|
|
LLM_STREAM_QUEUE.set(queue) |
|
|
return queue |
|
|
|
|
|
|
|
|
def get_llm_stream_queue(): |
|
|
"""Retrieves the current LLM stream queue from the context variable. |
|
|
|
|
|
Returns: |
|
|
The asyncio.Queue instance if set, otherwise None. |
|
|
""" |
|
|
return LLM_STREAM_QUEUE.get(None) |
|
|
|
|
|
|
|
|
_get_human_input = input |
|
|
|
|
|
|
|
|
def _llm_stream_log(msg): |
|
|
if _print_level in ["INFO"]: |
|
|
print(msg, end="") |
|
|
|