Spaces:
Running
Running
| from collections.abc import Callable | |
| from typing import Any | |
| from any_agent.callbacks import Callback, Context | |
| from any_agent.tracing.attributes import GenAI | |
class StreamlitStatusCallback(Callback):
    """Callback that reports agent progress through a status function.

    After each LLM call and each tool execution, a short text summary of the
    step (span name, input, output) is built and handed to ``status_callback``.
    The class itself only calls that function; what the function does with the
    text (e.g. updating a Streamlit status widget) is up to the caller.
    """

    # Maximum number of trailing characters of an input/output shown per message.
    _MAX_VALUE_LENGTH = 800

    def __init__(self, status_callback: Callable[[str], None]):
        """Store the function that receives each formatted status message."""
        self.status_callback = status_callback

    def after_llm_call(self, context: Context, *args, **kwargs) -> Context:
        """Report the current span's input messages and output, then return *context*."""
        span = context.current_span
        self._update_status(
            span.name,
            span.attributes.get(GenAI.INPUT_MESSAGES, ""),
            span.attributes.get(GenAI.OUTPUT, ""),
        )
        return context

    def after_tool_execution(self, context: Context, *args, **kwargs) -> Context:
        """Report the current span's tool arguments and output, then return *context*."""
        span = context.current_span
        self._update_status(
            span.name,
            span.attributes.get(GenAI.TOOL_ARGS, ""),
            span.attributes.get(GenAI.OUTPUT, ""),
        )
        return context

    @staticmethod
    def _last_item_or_raw(value: Any) -> str:
        """Return display text for a span attribute value.

        If *value* is a string holding a JSON-encoded, non-empty list, only the
        last element is shown (as ``str``). Any other string is returned
        unchanged. ``None`` becomes ``""`` and other non-string values are
        converted with ``str`` so that later length checks and slicing cannot
        fail.
        """
        # Function-scope import: the module's top-level imports do not include json.
        import json

        if value is None:
            return ""
        if not isinstance(value, str):
            return str(value)
        if not value:
            return ""
        try:
            parsed = json.loads(value)
        except (ValueError, RecursionError):
            # Not JSON (or too deeply nested): show the raw text as-is.
            return value
        if isinstance(parsed, list) and parsed:
            return str(parsed[-1])
        return value

    @classmethod
    def _keep_tail(cls, text: str) -> str:
        """Keep only the last ``_MAX_VALUE_LENGTH`` characters of overly long text."""
        if len(text) > cls._MAX_VALUE_LENGTH:
            return f"[Truncated]...{text[-cls._MAX_VALUE_LENGTH:]}"
        return text

    def _update_status(self, step_name: str, input_value: str, output_value: str) -> None:
        """Format one step as text and pass it to ``status_callback``.

        Args:
            step_name: Name of the step (the span name in the hooks above).
            input_value: Raw input attribute; a JSON list is reduced to its
                last element.
            output_value: Raw output attribute, treated the same way.

        The message always starts with ``Step: <name>``; ``Input:`` and
        ``Output:`` lines are added only for non-empty values.
        """
        input_text = self._keep_tail(self._last_item_or_raw(input_value))
        output_text = self._keep_tail(self._last_item_or_raw(output_value))

        if not (input_text or output_text):
            self.status_callback(f"Step: {step_name}")
            return

        message = f"Step: {step_name}\n"
        if input_text:
            message += f"Input: {input_text}\n"
        if output_text:
            message += f"Output: {output_text}"
        self.status_callback(message)
def export_logs(agent: Any, callback: Callable[[str], None]) -> None:
    """Register a status-reporting callback on *agent*.

    Wraps *callback* in a ``StreamlitStatusCallback`` and appends it to
    ``agent.config.callbacks``, creating that list first when it is ``None``.
    """
    handler = StreamlitStatusCallback(callback)
    config = agent.config
    if config.callbacks is None:
        # No callbacks configured yet: start from an empty list.
        config.callbacks = []
    config.callbacks.append(handler)