Spaces:
Running
Running
| import os | |
| import gradio as gr | |
| from typing import List | |
| import logging | |
| import logging.handlers | |
| import json | |
| from datetime import datetime | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import HumanMessage, AIMessage, ToolMessage | |
| from langchain_tavily import TavilySearch | |
# Configuration - set to False to disable detailed logging
ENABLE_DETAILED_LOGGING = True

# Setup logging with rotation (7 days max).
# NOTE: this runs at import time and creates/opens 'agent.log' in the CWD.
if ENABLE_DETAILED_LOGGING:
    # Create formatter shared by both handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # Setup console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    # Setup rotating file handler (7 days, daily rotation at midnight)
    file_handler = logging.handlers.TimedRotatingFileHandler(
        'agent.log',
        when='midnight',
        interval=1,
        backupCount=7,  # Keep 7 days of logs
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    # Configure root logger with both handlers; INFO level enables the
    # detailed trace logging used throughout the agent below.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[console_handler, file_handler]
    )
else:
    # Quiet mode: warnings and errors only, default stderr handler.
    logging.basicConfig(level=logging.WARNING)

logger = logging.getLogger(__name__)
def get_current_date() -> str:
    """Return today's local date formatted as ``YYYY-MM-DD``."""
    today = datetime.now()
    return today.strftime("%Y-%m-%d")
def create_system_prompt() -> str:
    """Build the system prompt, embedding today's date so the model can
    judge whether a question falls after its knowledge cutoff."""
    today = get_current_date()
    return f"""You are a helpful AI assistant with web search capabilities.
TODAY'S DATE: {today}
IMPORTANT: You have access to a web_search tool. Consider your knowledge cutoff date and today's date to decide when to search:
USE WEB SEARCH when users ask about:
- Events after your knowledge cutoff date
- Current events, breaking news, or recent developments
- Today's date, current time, or "what's happening now"
- Real-time data (weather, stock prices, sports scores)
- Recent updates to ongoing situations
- Information that changes frequently
- When users explicitly ask you to search the web
DO NOT use web search for:
- Historical facts before your cutoff date
- General knowledge that doesn't change (capitals, basic science, etc.)
- Established facts and concepts
- Personal advice or opinions
When in doubt about whether information might be outdated, use web search to get the most current information."""
# Configuration from environment variables.
# NOTE(review): any of these may be None if the variable is unset; the
# agent constructor below receives them as-is.
llm_ip = os.environ.get('public_ip')   # LLM endpoint host/IP
llm_port = os.environ.get('port')      # LLM endpoint port (string)
llm_key = os.environ.get('api_key')    # API key for the LLM endpoint
llm_model = os.environ.get('model')    # model name to request

# Tavily API configuration (empty string disables the web-search tool)
tavily_key = os.environ.get('tavily_key', '')

if ENABLE_DETAILED_LOGGING:
    logger.info(f"Tavily API key present: {bool(tavily_key)}")
    if tavily_key:
        # Log only the length, never the key itself
        logger.info(f"Tavily API key length: {len(tavily_key)}")
    else:
        logger.warning("No Tavily API key found in environment variables")
# Tool calling agent implementation
class ToolCallingAgentChat:
    """Chat agent backed by an OpenAI-compatible LLM endpoint, with an
    optional Tavily web-search tool exposed via LangChain tool calling.

    Public surface used by the Gradio layer: ``chat()``,
    ``reset_conversation()`` and ``update_config()``.
    """

    def __init__(self, ip: str, port: str, api_key: str, model: str):
        """Store endpoint configuration and build the agent.

        Raises:
            Exception: re-raised from ``_setup_agent`` if the LLM client
                cannot be constructed.
        """
        self.ip = ip
        self.port = port
        self.api_key = api_key
        self.model = model
        self.llm = None
        self.llm_with_tools = None  # set by _setup_agent(); chat() guards on it
        self.tools = []
        self.conversation_id = None
        self._setup_agent()

    def reset_conversation(self):
        """Reset conversation state by assigning a fresh conversation id."""
        import uuid
        self.conversation_id = str(uuid.uuid4())
        if ENABLE_DETAILED_LOGGING:
            logger.info("=== CONVERSATION RESET ===")
            logger.info(f"New conversation ID: {self.conversation_id}")

    def _setup_agent(self):
        """Initialize the LLM client, build the web-search tool (if a Tavily
        key is configured) and bind the tools to the model."""
        try:
            # Pick https for well-known TLS ports, or when the host string
            # already implies https.  Computed once (the original duplicated
            # this line).
            protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
            base_url = f"{protocol}://{self.ip}:{self.port}/v1"
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== SETTING UP TOOL CALLING AGENT ===")
                logger.info(f"LLM URL: {base_url}")
                logger.info(f"Model: {self.model}")
                logger.info(f"Using protocol: {protocol}")

            # Create OpenAI-compatible model with custom headers for
            # Cloudflare bypass.
            cf_bypass_key = os.environ.get('cf_bypass_key', 'devquasar2025')
            self.llm = ChatOpenAI(
                base_url=base_url,
                api_key=self.api_key or "ollama",  # Use provided key or default
                model=self.model,
                temperature=0.7,
                default_headers={
                    "x-cf-bypass": cf_bypass_key,
                    "User-Agent": "DevQuasar-Agent/1.0"
                }
            )
            if ENABLE_DETAILED_LOGGING:
                logger.info("LLM created successfully")

            # Define web search tool (only when a Tavily key is available).
            if tavily_key:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("Setting up Tavily search tool")
                try:
                    # BUGFIX: the @tool decorator is required here.  The rest
                    # of this class calls .name and .invoke() on the entries of
                    # self.tools, which plain functions do not provide.
                    @tool
                    def web_search(query: str) -> str:
                        """Search the web for current information about any topic. Use this when you need up-to-date information, current events, or real-time data."""
                        try:
                            tavily_tool = TavilySearch(
                                tavily_api_key=tavily_key,
                                max_results=5,
                                topic="general",
                                include_answer=True,
                                search_depth="advanced"
                            )
                            result = tavily_tool.invoke({"query": query})
                            if ENABLE_DETAILED_LOGGING:
                                logger.info(f"Tavily search successful for query: {query}")
                            return result
                        except Exception as e:
                            error_str = str(e).lower()
                            if ENABLE_DETAILED_LOGGING:
                                logger.error(f"Tavily search failed for query '{query}': {e}")
                                logger.error(f"Exception type: {type(e).__name__}")
                            # Check for rate limit or quota issues and return a
                            # user-facing message instead of raising.
                            if any(keyword in error_str for keyword in ['rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing']):
                                if ENABLE_DETAILED_LOGGING:
                                    logger.warning(f"Tavily rate limit/quota exceeded: {e}")
                                return "I can't search the web right now due to rate limits."
                            else:
                                if ENABLE_DETAILED_LOGGING:
                                    logger.error(f"Tavily API error: {e}")
                                return f"I can't search the web right now. Error: {str(e)[:100]}"

                    self.tools = [web_search]
                    if ENABLE_DETAILED_LOGGING:
                        logger.info("Tavily search tool created successfully")
                except Exception as e:
                    # Best-effort: fall back to no tools rather than failing setup.
                    if ENABLE_DETAILED_LOGGING:
                        logger.error(f"Failed to create Tavily tool: {e}")
                    self.tools = []
            else:
                if ENABLE_DETAILED_LOGGING:
                    logger.warning("No Tavily API key found, no web search tool available")
                self.tools = []

            # Bind tools to the model (renamed loop var: the original shadowed
            # the imported @tool decorator).
            if self.tools:
                self.llm_with_tools = self.llm.bind_tools(self.tools)
                if ENABLE_DETAILED_LOGGING:
                    logger.info(f"Tools bound to model: {[t.name for t in self.tools]}")
            else:
                self.llm_with_tools = self.llm
                if ENABLE_DETAILED_LOGGING:
                    logger.info("No tools available, using base model")

            if ENABLE_DETAILED_LOGGING:
                logger.info("Tool calling agent created successfully")
        except Exception as e:
            logger.error("=== AGENT SETUP ERROR ===")
            logger.error(f"Failed to setup agent: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise  # bare raise preserves the original traceback

    def update_config(self, ip: str, port: str, api_key: str, model: str):
        """Update LLM configuration and rebuild the agent if anything changed."""
        if (ip != self.ip or port != self.port or
                api_key != self.api_key or model != self.model):
            self.ip = ip
            self.port = port
            self.api_key = api_key
            self.model = model
            self._setup_agent()

    def chat(self, message: str, history: List[List[str]]) -> str:
        """Generate a chat response, executing tool calls when requested.

        Args:
            message: the latest user message.
            history: prior [user, assistant] turn pairs.

        Returns:
            The assistant's final textual answer (or an error string).
        """
        try:
            if not self.llm_with_tools:
                return "Error: Agent not initialized"
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== USER INPUT ===")
                logger.info(f"Message: {message}")
                logger.info(f"History length: {len(history)}")

            # Build the message list: dynamic system prompt (with today's
            # date), then the prior turns, then the new user message.
            from langchain_core.messages import SystemMessage
            system_prompt = create_system_prompt()
            if ENABLE_DETAILED_LOGGING:
                logger.info(f"System prompt includes today's date: {get_current_date()}")
            messages = [SystemMessage(content=system_prompt)]
            for user_msg, assistant_msg in history:
                messages.append(HumanMessage(content=user_msg))
                if assistant_msg:  # Only add if assistant responded
                    messages.append(AIMessage(content=assistant_msg))
            messages.append(HumanMessage(content=message))

            # Get initial response from LLM
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== INVOKING LLM ===")
                logger.info(f"Total messages in context: {len(messages)}")
            response = self.llm_with_tools.invoke(messages)
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== LLM RESPONSE ===")
                logger.info(f"Response type: {type(response)}")
                logger.info(f"Response content: {response.content}")
                logger.info(f"Has tool calls: {bool(response.tool_calls if hasattr(response, 'tool_calls') else False)}")
                if hasattr(response, 'tool_calls') and response.tool_calls:
                    logger.info(f"Tool calls: {response.tool_calls}")
                logger.info(f"Response additional_kwargs: {getattr(response, 'additional_kwargs', {})}")

            # Decide which tool calls (if any) to execute.
            tool_calls_to_execute = []
            # Method 1: proper tool calls emitted by the model.
            if hasattr(response, 'tool_calls') and response.tool_calls:
                tool_calls_to_execute = response.tool_calls
            # Method 2: keyword fallback — force a search when the user's
            # message clearly asks for current information but the model did
            # not call the tool.
            elif ("search" in message.lower() or "today" in message.lower() or "current" in message.lower() or "recent" in message.lower()) and self.tools:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== FALLBACK TOOL CALLING ===")
                    logger.info("Detected need for search based on keywords")
                import uuid
                tool_calls_to_execute = [{
                    'name': 'web_search',
                    'args': {'query': message},
                    'id': str(uuid.uuid4())
                }]

            if tool_calls_to_execute:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== TOOL CALLS DETECTED ===")
                    logger.info(f"Number of tool calls: {len(tool_calls_to_execute)}")
                # The assistant turn that requested the tools must precede the
                # ToolMessage results in the transcript.
                messages.append(response)
                for tool_call in tool_calls_to_execute:
                    if ENABLE_DETAILED_LOGGING:
                        logger.info(f"Executing tool: {tool_call['name']} with args: {tool_call['args']}")
                    # Find and execute the matching tool (renamed loop var to
                    # avoid shadowing the imported @tool decorator).
                    tool_result = None
                    for candidate in self.tools:
                        if candidate.name == tool_call['name']:
                            try:
                                tool_result = candidate.invoke(tool_call['args'])
                                if ENABLE_DETAILED_LOGGING:
                                    logger.info(f"Tool executed successfully: {tool_call['name']}")
                                break
                            except Exception as e:
                                tool_result = f"Tool execution failed: {str(e)}"
                                if ENABLE_DETAILED_LOGGING:
                                    logger.error(f"Tool execution failed: {e}")
                    if tool_result is None:
                        tool_result = f"Tool {tool_call['name']} not found"
                    messages.append(ToolMessage(
                        content=str(tool_result),
                        tool_call_id=tool_call['id']
                    ))
                # Get the final answer from the LLM after tool execution.
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== GETTING FINAL RESPONSE ===")
                final_response = self.llm_with_tools.invoke(messages)
                final_message = final_response.content
            else:
                # No tool calls, use the direct response
                final_message = response.content

            if ENABLE_DETAILED_LOGGING:
                logger.info("=== FINAL MESSAGE ===")
                logger.info(f"Final message: {final_message}")
            return final_message
        except Exception as e:
            error_msg = f"Agent error: {str(e)}"
            logger.error("=== AGENT ERROR ===")
            logger.error(f"Error: {e}")
            logger.error(f"Error type: {type(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return error_msg
# Global agent instance, configured once from environment variables at import
# time.  NOTE: construction raises (and the app fails to start) if the LLM
# client cannot be created.
tool_calling_agent = ToolCallingAgentChat(llm_ip, llm_port, llm_key, llm_model)
def generate_response(message: str, history: List[List[str]], max_tokens: int):
    """Gradio handler: stream a response from the tool-calling agent.

    Args:
        message: latest user message from the chat box.
        history: prior [user, assistant] turn pairs from the Chatbot widget.
        max_tokens: slider value (currently not forwarded to the LLM).

    Yields:
        Progressively longer prefixes of the final answer, for a typing
        effect in the UI.
    """
    global tool_calling_agent
    import re  # local import, matching the file's style for uuid/traceback
    try:
        # Configuration is pre-loaded from environment variables;
        # no runtime config changes allowed for security.
        # An empty history means the clear button was pressed -> new conversation.
        if len(history) == 0:
            tool_calling_agent.reset_conversation()
        response = tool_calling_agent.chat(message, history)
        # Stream word by word while PRESERVING the original whitespace.
        # BUGFIX: the previous str.split()/join-with-spaces approach collapsed
        # newlines, destroying markdown formatting in the final answer.
        current_response = ""
        for token in re.split(r'(\s+)', response):
            if not token:
                continue
            current_response += token
            if not token.isspace():
                yield current_response
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        yield error_msg
# CSS to fix avatar distortion and positioning issues.
# Injected into the Gradio app via ChatInterface(css=...); the !important
# rules are needed to override Gradio's built-in avatar styling.
avatar_css = """
/* Fix avatar image distortion with multiple selectors for compatibility */
#chatbot .avatar-container img,
#chatbot .message-row img,
#chatbot .message img,
#chatbot img[src*="twemoji"],
#chatbot img[src*="huggingface"],
.gr-chatbot img {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    max-width: 40px !important;
    max-height: 40px !important;
    border-radius: 50% !important;
    object-fit: cover !important;
    aspect-ratio: 1 / 1 !important;
    margin: 0px !important;
    padding: 0px !important;
    display: block !important;
    flex-shrink: 0 !important;
}
/* Force square containers */
#chatbot .avatar-container,
.gr-chatbot .avatar-container {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    flex-shrink: 0 !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
}
"""
# Create Gradio ChatInterface wired to the streaming handler above.
chatbot = gr.ChatInterface(
    generate_response,
    chatbot=gr.Chatbot(
        avatar_images=[
            "https://cdn.jsdelivr.net/gh/twitter/twemoji@latest/assets/72x72/1f464.png",  # User avatar (person emoji)
            "https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png"  # Bot avatar
        ],
        height="64vh",
        elem_id="chatbot"  # matched by the selectors in avatar_css
    ),
    additional_inputs=[
        # NOTE(review): this slider value is passed to generate_response but
        # not currently forwarded to the LLM.
        gr.Slider(50, 8192, label="Max Tokens", value=1024,
                  info="Maximum number of tokens in the response"),
    ],
    title="🤖 DQ Micro Agent",
    description="DevQuasar self hosted Micro Agent with websearch capabilities",
    theme="finlaymacklon/smooth_slate",
    css=avatar_css
)

if __name__ == "__main__":
    # queue() enables generator streaming / concurrency before launch.
    chatbot.queue().launch()