SQL / app.py
CHAN9IJI's picture
Update app.py
928aa38 verified
raw
history blame
21.8 kB
import gradio as gr
from huggingface_hub import InferenceClient
import json
import asyncio
import subprocess
import os
import psycopg2
from typing import Optional, Dict, Any, List
class MCPContextManager:
    """Model Context Protocol Manager for handling context and tools.

    State kept on the instance:
      * ``context``       - list of context entries appended by ``add_context``
      * ``tools``         - tool name -> tool record (including its handler)
      * ``resources``     - resource URI -> resource record
      * ``db_connection`` - connection stored by ``connect_postgres`` or None
    """

    def __init__(self):
        self.context = []
        self.tools = {}
        self.resources = {}
        self.db_connection = None

    def add_context(self, role: str, content: str, metadata: Optional[Dict] = None):
        """Append a context entry and return it.

        The entry's ``timestamp`` is always stored as ``None``; ``metadata``
        defaults to an empty dict when omitted or falsy.
        """
        entry = {
            "role": role,
            "content": content,
            "timestamp": None,
            "metadata": metadata or {}
        }
        self.context.append(entry)
        return entry

    def register_tool(self, name: str, description: str, parameters: Dict, handler):
        """Register (or overwrite) a tool under ``name``.

        ``handler`` is called with the arguments dict by ``call_tool``; it may
        be a coroutine function or a plain callable.
        """
        self.tools[name] = {
            "name": name,
            "description": description,
            "parameters": parameters,
            "handler": handler
        }

    def register_resource(self, uri: str, name: str, mime_type: str, content: Any):
        """Register (or overwrite) a resource keyed by ``uri``."""
        self.resources[uri] = {
            "uri": uri,
            "name": name,
            "mimeType": mime_type,
            "content": content
        }

    def connect_postgres(self, connection_string: str):
        """Connect to PostgreSQL and keep the connection on the instance.

        Returns a status dict instead of raising: ``{"success": True, ...}``
        on success, ``{"success": False, "error": ...}`` on any failure.
        """
        try:
            self.db_connection = psycopg2.connect(connection_string)
            return {"success": True, "message": "Connected to PostgreSQL"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def get_context_window(self, max_tokens: int = 4096) -> List[Dict]:
        """Return the most recent context entries.

        NOTE: ``max_tokens`` is accepted for interface compatibility but is
        not used; the window is simply the last 10 entries.
        """
        return self.context[-10:]

    async def call_tool(self, tool_name: str, arguments: Dict) -> Any:
        """Execute a registered tool and return its result.

        Both coroutine handlers and ordinary synchronous callables are
        supported: the handler's return value is awaited only when it is
        awaitable.

        Raises:
            ValueError: if ``tool_name`` has not been registered.
        """
        import inspect  # local import keeps this block self-contained

        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        result = self.tools[tool_name]["handler"](arguments)
        if inspect.isawaitable(result):
            result = await result
        return result

    def get_available_tools(self) -> List[Dict]:
        """Return the registered tools without their handlers.

        Handlers are left out so the result is JSON-serializable.
        """
        return [
            {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": tool["parameters"]
            }
            for tool in self.tools.values()
        ]

    def export_context(self) -> str:
        """Serialize context, tool descriptions and resource URIs to JSON.

        Only the resource URIs are exported, not the resource contents.
        """
        return json.dumps({
            "context": self.context,
            "tools": self.get_available_tools(),
            "resources": list(self.resources.keys())
        }, indent=2)
# Initialize MCP Manager: one module-level instance shared by the tool
# handlers, the tool registrations and the UI callbacks in this file.
mcp_manager = MCPContextManager()
# PostgreSQL connection string (DSN).
# SECURITY: credentials must never be committed to source control. The DSN is
# read from the POSTGRES_CONNECTION_STRING environment variable (falling back
# to DATABASE_URL), e.g. configured as a secret of the hosting environment.
# The password that was previously hard-coded here has been exposed and should
# be rotated. When neither variable is set the value is an empty string and
# connection attempts fail with an error reported by the handlers.
POSTGRES_CONNECTION_STRING = (
    os.environ.get("POSTGRES_CONNECTION_STRING")
    or os.environ.get("DATABASE_URL")
    or ""
)
# MCP Tool Handlers
async def calculator_handler(args):
    """Calculator tool handler.

    Reads ``operation``, ``a`` and ``b`` from ``args`` (``a``/``b`` default to
    0 and are converted with ``float``) and returns the arithmetic result.
    Division by zero yields the string "Error: Division by zero"; an unknown
    operation yields "Invalid operation".
    """
    requested = args.get("operation")
    left = float(args.get("a", 0))
    right = float(args.get("b", 0))

    def _divide():
        if right != 0:
            return left / right
        return "Error: Division by zero"

    dispatch = {
        "add": lambda: left + right,
        "subtract": lambda: left - right,
        "multiply": lambda: left * right,
        "divide": _divide,
    }
    compute = dispatch.get(requested)
    if compute is None:
        return "Invalid operation"
    return compute()
async def memory_handler(args):
    """Memory/context storage handler.

    ``action == "store"`` saves ``value`` as a text/plain resource under
    ``memory://<key>`` on the shared MCP manager; ``action == "retrieve"``
    returns the stored content or "Not found". Any other action returns
    "Invalid action".
    """
    action, key, value = (args.get(field) for field in ("action", "key", "value"))
    uri = f"memory://{key}"

    if action == "store":
        mcp_manager.register_resource(
            uri=uri,
            name=key,
            mime_type="text/plain",
            content=value,
        )
        return f"Stored {key}"

    if action == "retrieve":
        stored = mcp_manager.resources.get(uri)
        if not stored:
            return "Not found"
        return stored["content"]

    return "Invalid action"
async def web_search_handler(args):
    """Web search handler (placeholder).

    No search is performed; the reply only echoes the ``query`` argument.
    """
    search_terms = args.get("query")
    reply = f"Search results for: {search_terms} (Integrated search coming soon)"
    return reply
async def git_command_handler(args):
    """Git command execution handler.

    Args (keys of ``args``):
        command:   git sub-command and its arguments, without the ``git``
                   prefix, e.g. ``status`` or ``commit -am "my message"``.
        repo_path: directory to run git in (optional).

    Returns:
        On execution, a dict with ``stdout``, ``stderr``, ``returncode``,
        ``success``, ``command`` and ``repo``; otherwise ``{"error": ...}``.

    The command is tokenized with shell-like quoting rules so that quoted
    arguments (commit messages with spaces) stay a single argument. It is
    executed without a shell, and only when its first token is one of the
    allowed sub-commands.
    """
    import shlex  # local import keeps this block self-contained

    command = args.get("command", "")
    repo_path = args.get("repo_path", r"C:\Users\CHAN\Documents\PROJECTS\PROJECTS\MODEL\SQL")
    # Security: Only allow specific safe git commands
    allowed_commands = ["status", "log", "branch", "diff", "add", "commit", "push", "pull", "fetch"]
    try:
        if not isinstance(command, str):
            return {"error": "No command provided"}
        try:
            if os.name == "nt":
                # Non-POSIX mode keeps backslashes in Windows paths intact;
                # it also keeps the surrounding quotes, so strip them.
                cmd_parts = [
                    part[1:-1]
                    if len(part) >= 2 and part[0] == part[-1] and part[0] in "\"'"
                    else part
                    for part in shlex.split(command, posix=False)
                ]
            else:
                cmd_parts = shlex.split(command)
        except ValueError as e:
            # Raised by shlex for unbalanced quotes.
            return {"error": f"Could not parse command: {e}"}
        if not cmd_parts:
            return {"error": "No command provided"}
        # Check if first word is an allowed command
        base_cmd = cmd_parts[0]
        if base_cmd not in allowed_commands:
            return {"error": f"Command '{base_cmd}' not allowed. Allowed: {', '.join(allowed_commands)}"}
        # A missing working directory makes subprocess raise FileNotFoundError,
        # which would otherwise be misreported as "Git not found".
        if not os.path.isdir(repo_path):
            return {"error": f"Repository path not found: {repo_path}"}
        # Execute git command
        result = subprocess.run(
            ["git"] + cmd_parts,
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=30
        )
        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode,
            "success": result.returncode == 0,
            "command": f"git {command}",
            "repo": repo_path
        }
    except subprocess.TimeoutExpired:
        return {"error": "Command timed out (30s limit)"}
    except FileNotFoundError:
        return {"error": "Git not found. Make sure git is installed."}
    except Exception as e:
        return {"error": str(e)}
async def postgres_query_handler(args):
    """PostgreSQL query handler - opens a fresh connection per call.

    Args (keys of ``args``):
        query: SQL text to execute.

    Returns:
        * ``{"error": "No query provided"}`` for a missing/blank query.
        * For statements that produce rows (detected through
          ``cursor.description``, so ``WITH ...``, ``SHOW`` and
          ``... RETURNING`` are covered as well as ``SELECT``): a dict with
          ``success``, ``columns``, ``rows``, ``row_count`` and ``data``.
        * For other statements: ``success``, ``message``, ``rows_affected``.
        * On failure: ``success: False`` with ``error`` and ``error_type``.

    NOTE(review): the SQL comes straight from the caller and is executed
    verbatim; this tool is an intentional arbitrary-SQL console and must only
    be exposed to trusted users.
    """
    query = args.get("query", "")
    if not isinstance(query, str) or not query.strip():
        return {"error": "No query provided"}
    conn = None
    try:
        # Connect to database
        conn = psycopg2.connect(POSTGRES_CONNECTION_STRING)
        with conn.cursor() as cursor:
            # Execute query
            cursor.execute(query)
            # description is None for statements that return no result set
            if cursor.description is not None:
                results = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
                # Commit so row-returning writes (INSERT ... RETURNING) persist.
                conn.commit()
                return {
                    "success": True,
                    "columns": columns,
                    "rows": results,
                    "row_count": len(results),
                    "data": [dict(zip(columns, row)) for row in results]
                }
            # For INSERT, UPDATE, DELETE, etc.
            conn.commit()
            return {
                "success": True,
                "message": "Query executed successfully",
                "rows_affected": cursor.rowcount
            }
    except psycopg2.Error as e:
        return {
            "success": False,
            "error": str(e),
            "error_type": "PostgreSQL Error"
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "error_type": "General Error"
        }
    finally:
        # Closing the connection discards any uncommitted transaction.
        if conn is not None:
            conn.close()
async def filesystem_handler(args):
    """Filesystem operations handler, sandboxed to a base directory.

    Args (keys of ``args``):
        operation: one of ``read``, ``write``, ``list``, ``search``.
        path:      path relative to the base directory ("" means the base).
        content:   text to write (``write`` only).
        pattern:   case-insensitive substring of the file name (``search``).

    The base directory defaults to ``C:\\Users\\CHAN\\Documents\\PROJECTS`` and
    can be overridden with the ``MCP_FS_BASE_PATH`` environment variable.

    Returns a dict that always carries ``success``; failures carry ``error``.
    ``search`` returns at most 50 matches.
    """
    operation = args.get("operation")
    path = args.get("path", "")
    content = args.get("content", "")
    base_path = os.path.normpath(
        os.environ.get("MCP_FS_BASE_PATH", r"C:\Users\CHAN\Documents\PROJECTS")
    )
    max_results = 50
    # Security: Ensure path is within base_path
    if path:
        full_path = os.path.normpath(os.path.join(base_path, path))
        # Compare whole path components: a plain prefix test would accept a
        # sibling such as "<base>_other".
        try:
            common = os.path.commonpath([base_path, full_path])
            inside = os.path.normcase(common) == os.path.normcase(base_path)
        except ValueError:
            # Different drives, or a mix of absolute and relative paths.
            inside = False
        if not inside:
            return {"error": "Access denied: Path outside allowed directory", "success": False}
    else:
        full_path = base_path
    try:
        if operation == "read":
            if os.path.exists(full_path) and os.path.isfile(full_path):
                with open(full_path, 'r', encoding='utf-8') as f:
                    return {"content": f.read(), "path": full_path, "success": True}
            return {"error": "File not found", "success": False}
        elif operation == "list":
            if os.path.exists(full_path) and os.path.isdir(full_path):
                items = []
                for item in os.listdir(full_path):
                    item_path = os.path.join(full_path, item)
                    items.append({
                        "name": item,
                        "type": "directory" if os.path.isdir(item_path) else "file",
                        "size": os.path.getsize(item_path) if os.path.isfile(item_path) else None
                    })
                return {"items": items, "count": len(items), "path": full_path, "success": True}
            return {"error": "Directory not found", "success": False}
        elif operation == "write":
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {"message": "File written successfully", "path": full_path, "success": True}
        elif operation == "search":
            needle = args.get("pattern", "").lower()
            results = []
            search_path = full_path if os.path.isdir(full_path) else base_path
            for root, _dirs, files in os.walk(search_path):
                for file_name in files:
                    if needle in file_name.lower():
                        results.append(os.path.join(root, file_name))
                        # Stop the whole walk, not just this directory.
                        if len(results) >= max_results:
                            return {"results": results, "count": len(results), "success": True}
            return {"results": results, "count": len(results), "success": True}
        return {"error": "Invalid operation. Use: read, write, list, or search", "success": False}
    except PermissionError:
        return {"error": "Permission denied", "success": False}
    except Exception as e:
        return {"error": str(e), "success": False}
# Register all tools with MCP manager.
# Each row is (name, description, properties, required, handler); the loop
# below wraps properties/required into an object schema and registers the
# tools in this order.
_TOOL_REGISTRATIONS = (
    (
        "calculator",
        "Perform basic arithmetic operations (add, subtract, multiply, divide)",
        {
            "operation": {"type": "string", "enum": ["add", "subtract", "multiply", "divide"]},
            "a": {"type": "number", "description": "First number"},
            "b": {"type": "number", "description": "Second number"},
        },
        ["operation", "a", "b"],
        calculator_handler,
    ),
    (
        "memory",
        "Store and retrieve information in session memory",
        {
            "action": {"type": "string", "enum": ["store", "retrieve"]},
            "key": {"type": "string", "description": "Memory key"},
            "value": {"type": "string", "description": "Value to store (required for 'store')"},
        },
        ["action", "key"],
        memory_handler,
    ),
    (
        "web_search",
        "Search the web for information",
        {
            "query": {"type": "string", "description": "Search query"},
        },
        ["query"],
        web_search_handler,
    ),
    (
        "git",
        "Execute git commands (status, log, commit, push, pull, add, branch, diff, fetch). Example: 'status' or 'commit -am \"message\"'",
        {
            "command": {"type": "string", "description": "Git command (without 'git' prefix)"},
            "repo_path": {"type": "string", "description": "Repository path (optional, defaults to MODEL/SQL)"},
        },
        ["command"],
        git_command_handler,
    ),
    (
        "postgres",
        "Execute PostgreSQL queries on Neon database. Supports SELECT, INSERT, UPDATE, DELETE, CREATE, ALTER, etc.",
        {
            "query": {"type": "string", "description": "SQL query to execute"},
        },
        ["query"],
        postgres_query_handler,
    ),
    (
        "filesystem",
        "Perform filesystem operations within C:\\Users\\CHAN\\Documents\\PROJECTS",
        {
            "operation": {"type": "string", "enum": ["read", "write", "list", "search"], "description": "Operation to perform"},
            "path": {"type": "string", "description": "Relative path from PROJECTS folder"},
            "content": {"type": "string", "description": "Content for write operation"},
            "pattern": {"type": "string", "description": "Search pattern (for search operation)"},
        },
        ["operation", "path"],
        filesystem_handler,
    ),
)

for _name, _description, _properties, _required, _handler in _TOOL_REGISTRATIONS:
    mcp_manager.register_tool(
        name=_name,
        description=_description,
        parameters={
            "type": "object",
            "properties": _properties,
            "required": _required,
        },
        handler=_handler,
    )
def respond(
    message,
    history: list[dict[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    enable_mcp,
    hf_token: gr.OAuthToken,
):
    """Stream a chat completion, optionally recording the turn in MCP context.

    Yields the accumulated response text after every streamed chunk. When
    ``enable_mcp`` is set, the user message and the final response are added
    to the shared MCP context and the JSON list of available tools is
    appended to the system prompt.
    """
    if enable_mcp:
        mcp_manager.add_context("user", message)

    client = InferenceClient(token=hf_token.token, model="openai/gpt-oss-20b")

    system_prompt = system_message
    if enable_mcp:
        system_prompt += "\n\nAvailable MCP Tools:\n" + json.dumps(mcp_manager.get_available_tools(), indent=2)

    messages = [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": message},
    ]

    response = ""
    stream = client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    )
    for chunk in stream:
        choices = chunk.choices
        # Chunks without choices or without text contribute nothing, but the
        # current text is still yielded for every chunk.
        piece = (choices[0].delta.content or "") if len(choices) else ""
        response += piece
        yield response

    if enable_mcp:
        mcp_manager.add_context("assistant", response)
def call_mcp_tool(tool_name: str, arguments_json: str):
    """Interface for calling MCP tools from UI.

    Args:
        tool_name:      name of a tool registered on the MCP manager.
        arguments_json: JSON object text with the tool arguments; empty or
                        blank input means "no arguments".

    Returns:
        A JSON string: ``{"success": true, "result": ...}`` on success,
        ``{"success": false, "error": ...}`` otherwise. Never raises.
    """
    try:
        arguments = json.loads(arguments_json) if arguments_json and arguments_json.strip() else {}
        # Handlers call ``args.get``; reject lists/scalars with a clear message.
        if not isinstance(arguments, dict):
            return json.dumps({"success": False, "error": "Arguments must be a JSON object"}, indent=2)
        result = asyncio.run(mcp_manager.call_tool(tool_name, arguments))
        # default=str: tool results (e.g. database rows) may hold values such
        # as datetimes or decimals that json cannot encode natively; show them
        # as text instead of discarding the whole result.
        return json.dumps({"success": True, "result": result}, indent=2, default=str)
    except json.JSONDecodeError as e:
        return json.dumps({"success": False, "error": f"Invalid JSON: {str(e)}"}, indent=2)
    except Exception as e:
        return json.dumps({"success": False, "error": str(e)}, indent=2)
def export_mcp_context():
    """Return the shared MCP manager's context export (a JSON string)."""
    exported = mcp_manager.export_context()
    return exported
def get_mcp_tools_list():
    """Return the available MCP tools as an indented JSON string."""
    return json.dumps(mcp_manager.get_available_tools(), indent=2)
def test_postgres_connection():
    """Test PostgreSQL connection.

    Connects with the module's connection string, runs ``SELECT version()``
    and returns a JSON string: ``success``/``message``/``version`` when it
    works, ``success: false`` with ``error`` otherwise. Never raises.
    """
    conn = None
    try:
        conn = psycopg2.connect(POSTGRES_CONNECTION_STRING)
        with conn.cursor() as cursor:
            cursor.execute("SELECT version();")
            version = cursor.fetchone()[0]
        return json.dumps({"success": True, "message": "Connected!", "version": version}, indent=2)
    except Exception as e:
        return json.dumps({"success": False, "error": str(e)}, indent=2)
    finally:
        # Close the connection even when the query fails.
        if conn is not None:
            conn.close()
# Gradio Interface
# Extra controls for the chat; they are passed to ``respond`` positionally
# after (message, history), so their order must match its signature.
_system_message_box = gr.Textbox(
    value="You are a powerful AI assistant with MCP tools: Git, PostgreSQL (Neon), Filesystem, Calculator, Memory, Web Search.",
    label="System message",
    lines=3,
)
_max_tokens_slider = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens")
_temperature_slider = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature")
_top_p_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")
_enable_mcp_checkbox = gr.Checkbox(value=True, label="Enable MCP")

chatbot = gr.ChatInterface(
    respond,
    type="messages",
    additional_inputs=[
        _system_message_box,
        _max_tokens_slider,
        _temperature_slider,
        _top_p_slider,
        _enable_mcp_checkbox,
    ],
)
# Main application layout: a chat tab, a tab with quick actions and a generic
# tool caller, a context-export tab and an about page.
# The user-facing labels previously contained mis-decoded (mojibake) emoji;
# they are restored to the intended characters here.
with gr.Blocks(title="AI Chat with Full MCP") as demo:
    gr.Markdown("# 🤖 AI Chatbot with Full MCP Support")
    gr.Markdown("✨ PostgreSQL (Neon) • Git • Filesystem • Calculator • Memory • Web Search")
    with gr.Tabs():
        with gr.Tab("💬 Chat"):
            with gr.Sidebar():
                gr.LoginButton()
            chatbot.render()
        with gr.Tab("🛠️ MCP Tools"):
            gr.Markdown("### 🎯 Quick Actions")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**🔧 Git Commands**")
                    git_cmd = gr.Textbox(label="Command", placeholder="status", value="status")
                    git_btn = gr.Button("Execute Git", variant="primary")
                with gr.Column():
                    gr.Markdown("**📁 Filesystem**")
                    fs_operation = gr.Dropdown(["list", "read", "search"], label="Operation", value="list")
                    fs_path = gr.Textbox(label="Path", placeholder="PROJECTS", value="")
                    fs_btn = gr.Button("Execute FS", variant="primary")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**🗄️ PostgreSQL**")
                    pg_query = gr.Textbox(label="SQL Query", placeholder="SELECT * FROM users LIMIT 5", lines=2)
                    pg_btn = gr.Button("Execute SQL", variant="primary")
                with gr.Column():
                    gr.Markdown("**🔌 Test Connection**")
                    test_pg_btn = gr.Button("Test PostgreSQL Connection")
            # All quick actions and the custom tool call write into this box.
            tool_result = gr.Code(language="json", label="📊 Result", lines=15)
            gr.Markdown("---")
            gr.Markdown("### 🔧 Custom Tool Call")
            gr.Markdown("**Available Tools:** calculator, memory, web_search, git, postgres, filesystem")
            with gr.Row():
                tool_name_input = gr.Textbox(label="Tool Name", placeholder="calculator", value="calculator")
                tool_args_input = gr.Code(
                    label="Arguments (JSON)",
                    value='{"operation": "add", "a": 10, "b": 5}',
                    language="json",
                    lines=5
                )
            call_tool_btn = gr.Button("▶️ Call Tool", variant="secondary")
            # Event handlers: the quick-action buttons build the JSON
            # arguments for the matching tool and go through call_mcp_tool.
            git_btn.click(
                fn=lambda cmd: call_mcp_tool("git", json.dumps({"command": cmd})),
                inputs=[git_cmd],
                outputs=tool_result
            )
            fs_btn.click(
                fn=lambda op, path: call_mcp_tool("filesystem", json.dumps({"operation": op, "path": path})),
                inputs=[fs_operation, fs_path],
                outputs=tool_result
            )
            pg_btn.click(
                fn=lambda query: call_mcp_tool("postgres", json.dumps({"query": query})),
                inputs=[pg_query],
                outputs=tool_result
            )
            test_pg_btn.click(
                fn=test_postgres_connection,
                outputs=tool_result
            )
            call_tool_btn.click(
                fn=call_mcp_tool,
                inputs=[tool_name_input, tool_args_input],
                outputs=tool_result
            )
        with gr.Tab("📋 MCP Context"):
            gr.Markdown("### Export MCP Context")
            export_btn = gr.Button("📥 Export Context")
            context_display = gr.Code(language="json", label="MCP Context", lines=15)
            export_btn.click(fn=export_mcp_context, outputs=context_display)
        with gr.Tab("ℹ️ About"):
            # The markdown text is kept flush-left so that no line is
            # rendered as an indented code block.
            gr.Markdown("""
### Model Context Protocol (MCP) - Full Integration
#### 🗄️ Database: PostgreSQL (Neon)
- **Host:** ep-summer-art-a1jpcb05-pooler.ap-southeast-1.aws.neon.tech
- **Database:** neondb
- **Direct Connection:** ✅ Active
#### 📁 Filesystem
- **Root:** C:\\Users\\CHAN\\Documents\\PROJECTS
- **Operations:** read, write, list, search
#### 🔧 Tools (6):
1. **Git** - Version control operations
2. **PostgreSQL** - Database queries
3. **Filesystem** - File operations
4. **Calculator** - Math operations
5. **Memory** - Session storage
6. **Web Search** - Information retrieval
#### 📖 Examples:
**Git Status:**
```json
{"command": "status"}
```
**Git Commit & Push:**
```json
{"command": "commit -am 'Update'"}
```
Then:
```json
{"command": "push"}
```
**PostgreSQL Query:**
```json
{"query": "SELECT * FROM information_schema.tables LIMIT 5"}
```
**List Files:**
```json
{"operation": "list", "path": "PROJECTS"}
```
""")
# Launch the Gradio app only when this file is run as a script, so importing
# the module does not start a server.
if __name__ == "__main__":
    demo.launch()