dq_hosted1 / app.py
csabakecskemeti's picture
Upload app.py
549d679 verified
raw
history blame
17.9 kB
import os
import gradio as gr
from typing import List
import logging
import logging.handlers
import json
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_tavily import TavilySearch
# Configuration - set to False to disable detailed logging
ENABLE_DETAILED_LOGGING = True
# Setup logging with rotation (7 days max)
if ENABLE_DETAILED_LOGGING:
# Create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Setup console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# Setup rotating file handler (7 days, daily rotation)
file_handler = logging.handlers.TimedRotatingFileHandler(
'agent.log',
when='midnight',
interval=1,
backupCount=7, # Keep 7 days of logs
encoding='utf-8'
)
file_handler.setFormatter(formatter)
# Configure root logger
logging.basicConfig(
level=logging.INFO,
handlers=[console_handler, file_handler]
)
else:
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
# Configuration from environment variables
llm_ip = os.environ.get('public_ip')
llm_port = os.environ.get('port')
llm_key = os.environ.get('api_key')
llm_model = os.environ.get('model')
# Tavily API configuration
tavily_key = os.environ.get('tavily_key', '')
if ENABLE_DETAILED_LOGGING:
logger.info(f"Tavily API key present: {bool(tavily_key)}")
if tavily_key:
logger.info(f"Tavily API key length: {len(tavily_key)}")
else:
logger.warning("No Tavily API key found in environment variables")
# Tool calling agent implementation
class ToolCallingAgentChat:
def __init__(self, ip: str, port: str, api_key: str, model: str):
self.ip = ip
self.port = port
self.api_key = api_key
self.model = model
self.llm = None
self.tools = []
self.conversation_id = None
self._setup_agent()
def reset_conversation(self):
"""Reset conversation state"""
import uuid
self.conversation_id = str(uuid.uuid4())
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== CONVERSATION RESET ===")
logger.info(f"New conversation ID: {self.conversation_id}")
def _setup_agent(self):
"""Initialize the tool calling agent"""
try:
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== SETTING UP TOOL CALLING AGENT ===")
protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
logger.info(f"LLM URL: {protocol}://{self.ip}:{self.port}/v1")
logger.info(f"Model: {self.model}")
logger.info(f"Using protocol: {protocol}")
# Create OpenAI-compatible model with HTTPS support
protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
base_url = f"{protocol}://{self.ip}:{self.port}/v1"
# Add custom headers for Cloudflare bypass
cf_bypass_key = os.environ.get('cf_bypass_key', 'devquasar2025')
self.llm = ChatOpenAI(
base_url=base_url,
api_key=self.api_key or "ollama", # Use provided key or default
model=self.model,
temperature=0.7,
default_headers={
"x-cf-bypass": cf_bypass_key,
"User-Agent": "DevQuasar-Agent/1.0"
}
)
if ENABLE_DETAILED_LOGGING:
logger.info("LLM created successfully")
# Define web search tool
if tavily_key:
if ENABLE_DETAILED_LOGGING:
logger.info("Setting up Tavily search tool")
try:
@tool
def web_search(query: str) -> str:
"""Search the web for current information about any topic. Use this when you need up-to-date information, current events, or real-time data."""
try:
tavily_tool = TavilySearch(
tavily_api_key=tavily_key,
max_results=5,
topic="general",
include_answer=True,
search_depth="advanced"
)
result = tavily_tool.invoke({"query": query})
if ENABLE_DETAILED_LOGGING:
logger.info(f"Tavily search successful for query: {query}")
return result
except Exception as e:
error_str = str(e).lower()
if ENABLE_DETAILED_LOGGING:
logger.error(f"Tavily search failed for query '{query}': {e}")
logger.error(f"Exception type: {type(e).__name__}")
# Check for rate limit or quota issues
if any(keyword in error_str for keyword in ['rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing']):
if ENABLE_DETAILED_LOGGING:
logger.warning(f"Tavily rate limit/quota exceeded: {e}")
return "I can't search the web right now due to rate limits."
else:
if ENABLE_DETAILED_LOGGING:
logger.error(f"Tavily API error: {e}")
return f"I can't search the web right now. Error: {str(e)[:100]}"
self.tools = [web_search]
if ENABLE_DETAILED_LOGGING:
logger.info("Tavily search tool created successfully")
except Exception as e:
if ENABLE_DETAILED_LOGGING:
logger.error(f"Failed to create Tavily tool: {e}")
self.tools = []
else:
if ENABLE_DETAILED_LOGGING:
logger.warning("No Tavily API key found, no web search tool available")
self.tools = []
# Bind tools to the model
if self.tools:
self.llm_with_tools = self.llm.bind_tools(self.tools)
if ENABLE_DETAILED_LOGGING:
logger.info(f"Tools bound to model: {[tool.name for tool in self.tools]}")
else:
self.llm_with_tools = self.llm
if ENABLE_DETAILED_LOGGING:
logger.info("No tools available, using base model")
if ENABLE_DETAILED_LOGGING:
logger.info("Tool calling agent created successfully")
except Exception as e:
logger.error(f"=== AGENT SETUP ERROR ===")
logger.error(f"Failed to setup agent: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
raise e
def update_config(self, ip: str, port: str, api_key: str, model: str):
"""Update LLM configuration"""
if (ip != self.ip or port != self.port or
api_key != self.api_key or model != self.model):
self.ip = ip
self.port = port
self.api_key = api_key
self.model = model
self._setup_agent()
def chat(self, message: str, history: List[List[str]]) -> str:
"""Generate chat response using tool calling"""
try:
if not self.llm_with_tools:
return "Error: Agent not initialized"
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== USER INPUT ===")
logger.info(f"Message: {message}")
logger.info(f"History length: {len(history)}")
# Convert history to messages for context with system message
from langchain_core.messages import SystemMessage
messages = [SystemMessage(content="""You are a helpful AI assistant with web search capabilities.
IMPORTANT: You have access to a web_search tool. Use it when users ask about:
- Current events, news, or recent information
- Today's date or current time
- Real-time data or recent updates
- Specific facts that may have changed recently
- When users explicitly ask you to search the web
For general knowledge questions that don't require current information, answer directly without using tools.""")]
for user_msg, assistant_msg in history:
messages.append(HumanMessage(content=user_msg))
if assistant_msg: # Only add if assistant responded
messages.append(AIMessage(content=assistant_msg))
# Add current message
messages.append(HumanMessage(content=message))
# Get initial response from LLM
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== INVOKING LLM ===")
logger.info(f"Total messages in context: {len(messages)}")
response = self.llm_with_tools.invoke(messages)
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== LLM RESPONSE ===")
logger.info(f"Response type: {type(response)}")
logger.info(f"Response content: {response.content}")
logger.info(f"Has tool calls: {bool(response.tool_calls if hasattr(response, 'tool_calls') else False)}")
if hasattr(response, 'tool_calls') and response.tool_calls:
logger.info(f"Tool calls: {response.tool_calls}")
logger.info(f"Response additional_kwargs: {getattr(response, 'additional_kwargs', {})}")
# Check if LLM wants to call tools
tool_calls_to_execute = []
# Method 1: Proper tool calls
if hasattr(response, 'tool_calls') and response.tool_calls:
tool_calls_to_execute = response.tool_calls
# Method 2: Fallback - check if model mentioned search in content and user asked for current info
elif ("search" in message.lower() or "today" in message.lower() or "current" in message.lower() or "recent" in message.lower()) and self.tools:
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== FALLBACK TOOL CALLING ===")
logger.info(f"Detected need for search based on keywords")
# Manually trigger web search
import uuid
tool_calls_to_execute = [{
'name': 'web_search',
'args': {'query': message},
'id': str(uuid.uuid4())
}]
if tool_calls_to_execute:
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== TOOL CALLS DETECTED ===")
logger.info(f"Number of tool calls: {len(tool_calls_to_execute)}")
# Add the LLM response to messages
messages.append(response)
# Execute tool calls
for tool_call in tool_calls_to_execute:
if ENABLE_DETAILED_LOGGING:
logger.info(f"Executing tool: {tool_call['name']} with args: {tool_call['args']}")
# Find and execute the tool
tool_result = None
for tool in self.tools:
if tool.name == tool_call['name']:
try:
tool_result = tool.invoke(tool_call['args'])
if ENABLE_DETAILED_LOGGING:
logger.info(f"Tool executed successfully: {tool_call['name']}")
break
except Exception as e:
tool_result = f"Tool execution failed: {str(e)}"
if ENABLE_DETAILED_LOGGING:
logger.error(f"Tool execution failed: {e}")
if tool_result is None:
tool_result = f"Tool {tool_call['name']} not found"
# Add tool result to messages
messages.append(ToolMessage(
content=str(tool_result),
tool_call_id=tool_call['id']
))
# Get final response from LLM after tool execution
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== GETTING FINAL RESPONSE ===")
final_response = self.llm_with_tools.invoke(messages)
final_message = final_response.content
else:
# No tool calls, use the direct response
final_message = response.content
if ENABLE_DETAILED_LOGGING:
logger.info(f"=== FINAL MESSAGE ===")
logger.info(f"Final message: {final_message}")
return final_message
except Exception as e:
error_msg = f"Agent error: {str(e)}"
logger.error(f"=== AGENT ERROR ===")
logger.error(f"Error: {e}")
logger.error(f"Error type: {type(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return error_msg
# Global agent instance
tool_calling_agent = ToolCallingAgentChat(llm_ip, llm_port, llm_key, llm_model)
def generate_response(message: str, history: List[List[str]], system_prompt: str,
max_tokens: int, ip: str, port: str, api_key: str, model: str):
"""Generate response using tool calling agent"""
global tool_calling_agent
try:
# Update agent configuration if changed
tool_calling_agent.update_config(ip, port, api_key, model)
# Reset conversation if history is empty (indicates clear button was pressed)
if len(history) == 0:
tool_calling_agent.reset_conversation()
# Generate response
response = tool_calling_agent.chat(message, history)
# Stream the response word by word for better UX
words = response.split()
current_response = ""
for word in words:
current_response += word + " "
yield current_response.strip()
except Exception as e:
error_msg = f"Error: {str(e)}"
logger.error(error_msg)
yield error_msg
# CSS to fix avatar distortion and positioning issues
avatar_css = """
/* Fix avatar image distortion with multiple selectors for compatibility */
#chatbot .avatar-container img,
#chatbot .message-row img,
#chatbot .message img,
#chatbot img[src*="twemoji"],
#chatbot img[src*="huggingface"],
.gr-chatbot img {
width: 40px !important;
height: 40px !important;
min-width: 40px !important;
min-height: 40px !important;
max-width: 40px !important;
max-height: 40px !important;
border-radius: 50% !important;
object-fit: cover !important;
aspect-ratio: 1 / 1 !important;
margin: 0px !important;
padding: 0px !important;
display: block !important;
flex-shrink: 0 !important;
}
/* Force square containers */
#chatbot .avatar-container,
.gr-chatbot .avatar-container {
width: 40px !important;
height: 40px !important;
min-width: 40px !important;
min-height: 40px !important;
flex-shrink: 0 !important;
display: flex !important;
align-items: center !important;
justify-content: center !important;
}
"""
# Create Gradio ChatInterface
chatbot = gr.ChatInterface(
generate_response,
chatbot=gr.Chatbot(
avatar_images=[
"https://cdn.jsdelivr.net/gh/twitter/twemoji@latest/assets/72x72/1f464.png", # User avatar (person emoji)
"https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png" # Bot avatar
],
height="64vh",
elem_id="chatbot"
),
additional_inputs=[
gr.Textbox(
"You are a helpful AI assistant with web search capabilities. Use web search when you need current information, recent events, or real-time data.",
label="System Prompt",
lines=2
),
gr.Slider(50, 2048, label="Max Tokens", value=512,
info="Maximum number of tokens in the response"),
gr.Textbox(llm_ip, label="LLM IP Address",
info="IP address of the OpenAI-compatible LLM server"),
gr.Textbox(llm_port, label="LLM Port",
info="Port of the LLM server"),
gr.Textbox(llm_key, label="API Key", type="password",
info="API key for the LLM server"),
gr.Textbox(llm_model, label="Model Name",
info="Name of the model to use"),
],
title="🤖 DQ Micro Agent",
description="DevQuasar self hosted Micro Agent with websearch capabilities",
theme="finlaymacklon/smooth_slate",
css=avatar_css
)
if __name__ == "__main__":
chatbot.queue().launch()