try:
    import os
    from typing import Any, Dict, List, Optional
    import json

    import gradio as gr
    import torch
    from sentence_transformers import SentenceTransformer
    import chromadb

    from config import Config
except ImportError as e:
    print(f"❌ Error: Required packages not installed: {e}")
    print("🔧 Make sure you're in the gemmaembeddings conda environment")
    print("📦 Required packages: torch, sentence-transformers, chromadb")
    raise

# Global model, configuration, and collection handles (created when the module is imported;
# Config is assumed to provide MODEL_PATH, COLLECTION_NAME, MAX_NUM_RESULTS,
# MAX_CONTEXT_SIZE, VALID_SOURCES, and get_device(), all of which are used below)
config = Config()
device = Config.get_device()
model = SentenceTransformer(config.MODEL_PATH)
collection = None

print("🔄 Connecting to ChromaDB Cloud...")
database = os.environ.get("chromadb_db")
api_key = os.environ.get("chromadb_api_key")
tenant = os.environ.get("chromadb_tenant")

client = chromadb.CloudClient(
    api_key=api_key,
    tenant=tenant,
    database=database
)
print("Connection to ChromaDB successful...")

# === COLLECTION VALIDATION ===
# Ensure the required collection exists and has data
try:
    collection = client.get_collection(config.COLLECTION_NAME)
    doc_count = collection.count()
    if doc_count == 0:
        print(f"Collection '{config.COLLECTION_NAME}' exists but is empty. Run ingest_studies.py to populate it.")
    print(f"✅ Connected to collection '{config.COLLECTION_NAME}' with {doc_count} documents")
except Exception as e:
    print(f"Collection '{config.COLLECTION_NAME}' not found. Run ingest_studies.py first. Error: {str(e)}")
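
# Minimal sketch of ensure_initialized(), which search_by_source() below relies on:
# it assumes the module-level `model` and `collection` globals above are the only
# state that needs to be ready before a request can be served.
def ensure_initialized() -> bool:
    """Return True when the embedding model and the ChromaDB collection are available."""
    return model is not None and collection is not None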


class EmbeddingGemmaPrompts:
    """
    Optimized prompt templates for Google's EmbeddingGemma model.

    This class implements the official EmbeddingGemma prompt instructions as
    specified in the HuggingFace model documentation. It provides task-specific
    formatting to achieve optimal embedding quality and search relevance.

    Reference: https://huggingface.co/google/embeddinggemma-300m#prompt-instructions

    The prompt format follows these official patterns:
    - Query:    'task: {task description} | query: {content}'
    - Document: 'title: {title | "none"} | text: {content}'

    Performance Impact:
    - task: fact checking       → +136% similarity improvement
    - task: semantic similarity → +112% similarity improvement
    - task: question answering  → +98% similarity improvement
    - task: classification      → +73% similarity improvement

    Usage:
        # Format a search query
        formatted = EmbeddingGemmaPrompts.encode_query("How does RS work?", "question_answering")
        # Result: "task: question answering | query: How does RS work?"

        # Format a document for embedding
        formatted = EmbeddingGemmaPrompts.encode_document("Content here", "Document Title")
        # Result: "title: Document Title | text: Content here"

    Attributes:
        TASKS (Dict[str, str]): Mapping of task types to official task descriptions
    """

    @staticmethod
    def format_query_prompt(content: str, task: str = "search result") -> str:
        """
        Format a query using the official EmbeddingGemma query prompt template.

        Applies the official query format: 'task: {task description} | query: {content}'
        This format is critical for achieving optimal embedding quality with EmbeddingGemma.

        Args:
            content (str): The raw query text to be embedded
            task (str): Official EmbeddingGemma task description. Defaults to "search result"

        Returns:
            str: Formatted query string ready for embedding

        Example:
            >>> EmbeddingGemmaPrompts.format_query_prompt("RS trading system", "question answering")
            'task: question answering | query: RS trading system'
        """
        return f"task: {task} | query: {content}"

    @staticmethod
    def format_document_prompt(content: str, title: str = "none") -> str:
        """
        Format a document using the official EmbeddingGemma document prompt template.

        Applies the official document format: 'title: {title | "none"} | text: {content}'
        Including meaningful titles significantly improves embedding quality and search relevance.

        Args:
            content (str): The document text content to be embedded
            title (str): Document title, or "none" if no title is available. Defaults to "none"

        Returns:
            str: Formatted document string ready for embedding

        Example:
            >>> EmbeddingGemmaPrompts.format_document_prompt("Content here", "Risk Management")
            'title: Risk Management | text: Content here'
            >>> EmbeddingGemmaPrompts.format_document_prompt("Content without title")
            'title: none | text: Content without title'
        """
        return f'title: {title} | text: {content}'

    # Official EmbeddingGemma task descriptions with performance rankings
    # Based on testing results showing similarity score improvements
    TASKS = {
        # === RETRIEVAL TASKS ===
        # General-purpose retrieval (baseline performance)
        "retrieval_query": "search result",    # Standard retrieval query format
        "retrieval_document": "document",      # Document embedding format

        # === HIGH-PERFORMANCE SPECIALIZED TASKS ===
        # Best for verifying claims and finding evidence (+136% performance)
        "fact_checking": "fact checking",
        # Excellent for concept comparison and relationship analysis (+112% performance)
        "semantic_similarity": "sentence similarity",
        # Optimized for Q&A scenarios with contextual responses (+98% performance)
        "question_answering": "question answering",
        # Effective for content categorization and topic analysis (+73% performance)
        "classification": "classification",

        # === MODERATE PERFORMANCE TASKS ===
        # Good for document grouping and clustering (+59% performance)
        "clustering": "clustering",
        # Specialized for finding code examples and implementations (+39% performance)
        "code_retrieval": "code retrieval",

        # === LEGACY COMPATIBILITY ===
        # Shorter aliases for backward compatibility
        "search": "search result",          # Default baseline task
        "question": "question answering",   # Alias for question_answering
        "fact": "fact checking"             # Alias for fact_checking
    }

    @classmethod
    def get_task_description(cls, task_type: str) -> str:
        """
        Get the official EmbeddingGemma task description for a given task type.

        Validates the task type and returns the corresponding official task
        description used in EmbeddingGemma prompt formatting. Falls back to
        "search result" for unknown task types to ensure compatibility.

        Args:
            task_type (str): The task type key (e.g., "question_answering", "fact_checking")

        Returns:
            str: Official EmbeddingGemma task description (e.g., "question answering", "fact checking")

        Example:
            >>> EmbeddingGemmaPrompts.get_task_description("fact_checking")
            'fact checking'
            >>> EmbeddingGemmaPrompts.get_task_description("unknown_task")
            'search result'  # Fallback for unknown tasks
        """
        return cls.TASKS.get(task_type, "search result")

    @classmethod
    def encode_query(cls, content: str, task_type: str = "search") -> str:
        """
        Encode a query with task-specific EmbeddingGemma prompt optimization.

        This is the primary method for formatting search queries. It combines
        the user's query with the appropriate task-specific prompt template to
        achieve optimal embedding quality and search relevance.

        Args:
            content (str): The raw query text from the user
            task_type (str): Task type for optimization.
                Defaults to "search". Valid options: "search", "question_answering",
                "fact_checking", "semantic_similarity", "classification",
                "clustering", "code_retrieval"

        Returns:
            str: Optimized query string formatted for EmbeddingGemma

        Performance Impact:
            Using appropriate task types can improve similarity scores by 39-136%
            compared to the baseline "search" task type.

        Example:
            >>> cls.encode_query("How does risk management work?", "question_answering")
            'task: question answering | query: How does risk management work?'
            >>> cls.encode_query("RS system reduces risk by 30%", "fact_checking")
            'task: fact checking | query: RS system reduces risk by 30%'
        """
        task_desc = cls.get_task_description(task_type)
        return cls.format_query_prompt(content, task_desc)

    @classmethod
    def encode_document(cls, content: str, title: str = "none") -> str:
        """
        Encode a document with proper EmbeddingGemma document formatting.

        Formats documents for embedding using the official EmbeddingGemma document
        template. Including meaningful titles significantly improves search relevance
        and helps the model understand document structure.

        Args:
            content (str): The document text content to embed
            title (str): Document title extracted from metadata, filename, or content.
                Use "none" if no meaningful title is available

        Returns:
            str: Formatted document string ready for embedding

        Best Practices:
            - Extract titles from filenames, headers, or metadata when possible
            - Use "none" rather than an empty string when no title is available
            - Keep titles concise and descriptive (< 100 characters)

        Example:
            >>> cls.encode_document("Trading strategy content...", "Momentum Strategy Guide")
            'title: Momentum Strategy Guide | text: Trading strategy content...'
            >>> cls.encode_document("Untitled content here")
            'title: none | text: Untitled content here'
        """
        return cls.format_document_prompt(content, title)


def search_knowledge_base(
    query: str,
    num_results: int = 5,
    source_filter: Optional[str] = None,
    task_type: str = "search"
) -> Dict[str, Any]:
    """
    Search the RS Studies knowledge base using semantic similarity.

    Args:
        query: The search query
        num_results: Number of results to return
        source_filter: Optional source folder filter
        task_type: Type of task for query formatting

    Returns:
        Dictionary with search results and metadata
    """
    try:
        # Create a query embedding with task-specific formatting via EmbeddingGemmaPrompts
        query_formatted = EmbeddingGemmaPrompts.encode_query(query, task_type)
        query_embedding = model.encode([query_formatted], device=device)

        # Prepare search parameters
        search_params = {
            "query_embeddings": query_embedding.tolist(),
            "n_results": min(num_results, config.MAX_NUM_RESULTS),
            "include": ["documents", "metadatas", "distances"]
        }

        # Add source filter if specified
        if source_filter and source_filter in config.VALID_SOURCES:
            search_params["where"] = {"source_folder": {"$eq": source_filter}}

        # Perform search
        results = collection.query(**search_params)

        # Format results
        formatted_results = []
        if results["documents"] and len(results["documents"]) > 0:
            for i in range(len(results["documents"][0])):
                result = {
                    "rank": i + 1,
                    "content": results["documents"][0][i],
                    "source_folder": results["metadatas"][0][i].get("source_folder", "unknown"),
                    "chunk_file": results["metadatas"][0][i].get("chunk_file", "unknown"),
                    "chunk_number": results["metadatas"][0][i].get("chunk_number", "unknown"),
                    "similarity_score": float(1 - results["distances"][0][i]),
                    "distance": float(results["distances"][0][i]),
                    "chunk_length": results["metadatas"][0][i].get("chunk_length", 0),
                    "metadata": results["metadatas"][0][i]
                }
                formatted_results.append(result)

        return {
            "query": query,
            "task_type": task_type,
            "num_results": len(formatted_results),
            "source_filter": source_filter,
            "results": formatted_results,
            "success": True
        }

    except Exception as e:
        return {"error": f"Search failed: {str(e)}", "results": [], "success": False}


def get_available_sources() -> Dict[str, Any]:
    """Get the list of available source folders in the knowledge base."""
    try:
        # Get all metadata to find unique source folders
        all_results = collection.get(include=["metadatas"])

        sources = set()
        for metadata in all_results["metadatas"]:
            source = metadata.get("source_folder")
            if source:
                sources.add(source)

        # Get statistics for each source
        source_stats = {}
        for source in sources:
            source_results = collection.get(
                where={"source_folder": {"$eq": source}},
                include=["metadatas"]
            )
            source_stats[source] = len(source_results["metadatas"])

        return {
            "sources": sorted(list(sources)),
            "source_stats": source_stats,
            "total_sources": len(sources),
            "total_chunks": collection.count(),
            "success": True
        }

    except Exception as e:
        return {"error": f"Failed to get sources: {str(e)}", "sources": [], "success": False}


# MCP Tool Definitions

def search_rs_studies(
    query: str,
    num_results: int = 5,
    source_filter: Optional[str] = None,
    task_type: str = "search"
) -> str:
    """
    Search the RS Studies knowledge base for relevant information.

    This tool provides semantic search across RS trading system documentation,
    Chennai meetup transcripts, and Q&A content with optimized EmbeddingGemma prompts.

    Args:
        query: Your search question or topic (required)
        num_results: Number of results to return (1-50, default: 5)
        source_filter: Limit search to a specific source:
            - 'rs_stkege_01': RS trading system documentation
            - 'cheenai_meet_full': Chennai meetup transcripts
            - 'QnAYoutubeChannel': Q&A discussions
            - None: Search all sources (default)
        task_type: Search optimization using EmbeddingGemma task-specific prompts:
            - 'search'/'retrieval_query': General search (default)
            - 'question'/'question_answering': Question answering format
            - 'fact'/'fact_checking': Fact checking format
            - 'classification': Text classification tasks
            - 'clustering': Document clustering and grouping
            - 'semantic_similarity': Semantic similarity assessment
            - 'code_retrieval': Code search and retrieval

    Returns:
        JSON string with search results including content, sources, and similarity scores
    """
    # Validate parameters
    if not query or not query.strip():
        return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    if source_filter and source_filter not in config.VALID_SOURCES:
        return json.dumps({
            "error": f"Invalid source_filter. Must be one of: {config.VALID_SOURCES}",
            "results": [],
            "success": False
        })

    valid_task_types = list(EmbeddingGemmaPrompts.TASKS.keys())
    if task_type not in valid_task_types:
        return json.dumps({
            "error": f"Invalid task_type. Must be one of: {valid_task_types}",
            "results": [],
            "success": False
        })

    # Perform search
    results = search_knowledge_base(query, num_results, source_filter, task_type)
    return json.dumps(results, indent=2)
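
# Illustrative local check (hypothetical, not part of the MCP surface): each tool in
# this module is a plain function returning a JSON string, so it can be exercised
# directly from a REPL. The query text below is only an example.
#
#   hits = json.loads(search_rs_studies("How does relative strength ranking work?",
#                                       num_results=3, task_type="question_answering"))
#   for hit in hits.get("results", []):
#       print(hit["rank"], hit["source_folder"], f"{hit['similarity_score']:.3f}")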


def get_rs_sources() -> str:
    """
    Get information about available data sources in the RS Studies knowledge base.

    Returns:
        JSON string with the list of available sources, their statistics, and collection info
    """
    sources_info = get_available_sources()
    return json.dumps(sources_info, indent=2)


def ask_rs_question(question: str, context_size: int = 3) -> str:
    """
    Ask a specific question about RS trading systems and get contextual answers.

    This is a higher-level tool that searches for relevant information and
    provides it in a question-answering format with ranked context.

    Args:
        question: Your question about RS systems, trading, or related topics
        context_size: Number of relevant chunks to include in context (1-10, default: 3)

    Returns:
        JSON string with the question, relevant context chunks, and analysis
    """
    if not question or not question.strip():
        return json.dumps({
            "error": "Question cannot be empty",
            "context": [],
            "success": False
        })

    context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))

    # Search for relevant information using the question-answering task type
    search_results = search_knowledge_base(question, context_size, task_type="question_answering")

    if not search_results.get("success", False):
        return json.dumps(search_results)

    # Format as a Q&A response
    response = {
        "question": question,
        "context_chunks": len(search_results.get("results", [])),
        "relevant_context": [],
        "sources_used": set(),
        "success": True
    }

    for i, result in enumerate(search_results.get("results", [])[:context_size]):
        context_item = {
            "rank": i + 1,
            "content": result["content"],
            "source": f"{result['source_folder']} (chunk {result['chunk_number']})",
            "relevance_score": f"{result['similarity_score']:.3f}",
            "chunk_file": result["chunk_file"]
        }
        response["relevant_context"].append(context_item)
        response["sources_used"].add(result["source_folder"])

    # Convert the set to a list for JSON serialization
    response["sources_used"] = sorted(list(response["sources_used"]))

    return json.dumps(response, indent=2)


def get_collection_info() -> str:
    """
    Get detailed information about the RS Studies knowledge base collection.

    Returns:
        JSON string with collection statistics, configuration, and metadata structure
    """
    try:
        total_count = collection.count()

        # Get a sample of metadata to understand its structure
        sample_results = collection.get(limit=10, include=["metadatas"])

        # Analyze metadata structure
        metadata_keys = set()
        for metadata in sample_results["metadatas"]:
            metadata_keys.update(metadata.keys())

        info = {
            "collection_name": config.COLLECTION_NAME,
            "total_documents": total_count,
            "model_path": config.MODEL_PATH,
            "device": device,
            "metadata_structure": sorted(list(metadata_keys)),
            "config": {
                "max_results": config.MAX_NUM_RESULTS,
                "valid_sources": config.VALID_SOURCES
            },
            "success": True
        }

        return json.dumps(info, indent=2)

    except Exception as e:
        return json.dumps({"error": f"Failed to get collection info: {str(e)}", "success": False})


def search_by_source(source_name: str, query: str = "", num_results: int = 10) -> str:
    """
    Browse or search within a specific data source.

    Args:
        source_name: Name of the source to search within (use get_rs_sources to see available sources)
        query: Optional search query (if empty, returns recent chunks from the source)
        num_results: Number of results to return (1-50, default: 10)

    Returns:
        JSON string with results from the specified source
    """
    if source_name not in config.VALID_SOURCES:
        return json.dumps({
            "error": f"Invalid source_name. Must be one of: {config.VALID_SOURCES}",
            "results": [],
            "success": False
        })

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    if query.strip():
        # Search within the source
        results = search_knowledge_base(query, num_results, source_name)
    else:
        # Browse the source (get recent chunks)
        if not ensure_initialized():
            return json.dumps({
                "error": "Server not properly initialized",
                "results": [],
                "success": False
            })

        try:
            source_results = collection.get(
                where={"source_folder": {"$eq": source_name}},
                limit=num_results,
                include=["documents", "metadatas"]
            )

            formatted_results = []
            for i, (doc, metadata) in enumerate(zip(source_results["documents"], source_results["metadatas"])):
                result = {
                    "rank": i + 1,
                    "content": doc,
                    "source_folder": metadata.get("source_folder", "unknown"),
                    "chunk_file": metadata.get("chunk_file", "unknown"),
                    "chunk_number": metadata.get("chunk_number", "unknown"),
                    "chunk_length": metadata.get("chunk_length", 0),
                    "metadata": metadata
                }
                formatted_results.append(result)

            results = {
                "source_name": source_name,
                "query": query or "(browsing mode)",
                "num_results": len(formatted_results),
                "results": formatted_results,
                "success": True
            }

        except Exception as e:
            results = {
                "error": f"Failed to browse source: {str(e)}",
                "results": [],
                "success": False
            }

    return json.dumps(results, indent=2)


def verify_fact_rs(statement: str, num_evidence: int = 3) -> str:
    """
    Verify a fact or statement against the RS Studies knowledge base.

    This tool uses EmbeddingGemma's fact checking optimization to find evidence
    that supports or contradicts the given statement.

    Args:
        statement: The statement or claim to verify
        num_evidence: Number of evidence chunks to return (1-10, default: 3)

    Returns:
        JSON string with evidence chunks ranked by relevance to the fact claim
    """
    if not statement or not statement.strip():
        return json.dumps({
            "error": "Statement cannot be empty",
            "evidence": [],
            "success": False
        })

    num_evidence = max(1, min(num_evidence, config.MAX_CONTEXT_SIZE))

    # Search for evidence using fact checking optimization
    search_results = search_knowledge_base(statement, num_evidence, task_type="fact_checking")

    if not search_results.get("success", False):
        return json.dumps(search_results)

    # Format as a fact verification response
    response = {
        "statement": statement,
        "evidence_count": len(search_results.get("results", [])),
        "evidence": [],
        "sources_consulted": set(),
        "success": True
    }

    for i, result in enumerate(search_results.get("results", [])):
        evidence_item = {
            "rank": i + 1,
            "content": result["content"],
            "source": f"{result['source_folder']} (chunk {result['chunk_number']})",
            "relevance_score": f"{result['similarity_score']:.3f}",
            "chunk_file": result["chunk_file"]
        }
        response["evidence"].append(evidence_item)
        response["sources_consulted"].add(result["source_folder"])

    # Convert the set to a list for JSON serialization
    response["sources_consulted"] = sorted(list(response["sources_consulted"]))

    return json.dumps(response, indent=2)


def compare_similarity_rs(text1: str, text2: str, context_size: int = 5) -> str:
    """
    Compare semantic similarity between two concepts in the RS Studies context.

    This tool finds content related to both concepts and assesses their
    relationship using EmbeddingGemma's semantic similarity optimization.

    Args:
        text1: First concept, topic, or text to compare
        text2: Second concept, topic, or text to compare
        context_size: Number of relevant chunks to analyze for each concept (1-10, default: 5)

    Returns:
        JSON string with related content for both concepts and a similarity analysis
    """
    if not text1 or not text1.strip() or not text2 or not text2.strip():
        return json.dumps({
            "error": "Both text1 and text2 must be provided",
            "analysis": {},
            "success": False
        })

    context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))

    # Search for content related to each concept using semantic similarity optimization
    results1 = search_knowledge_base(text1, context_size, task_type="semantic_similarity")
    results2 = search_knowledge_base(text2, context_size, task_type="semantic_similarity")

    if not results1.get("success", False) or not results2.get("success", False):
        return json.dumps({
            "error": "Failed to search for one or both concepts",
            "analysis": {},
            "success": False
        })

    # Analyze overlap and differences
    sources1 = set(r["source_folder"] for r in results1.get("results", []))
    sources2 = set(r["source_folder"] for r in results2.get("results", []))

    response = {
        "concept1": text1,
        "concept2": text2,
        "concept1_results": len(results1.get("results", [])),
        "concept2_results": len(results2.get("results", [])),
        "shared_sources": sorted(list(sources1.intersection(sources2))),
        "concept1_unique_sources": sorted(list(sources1 - sources2)),
        "concept2_unique_sources": sorted(list(sources2 - sources1)),
        "concept1_context": [
            {
                "rank": i + 1,
                "content": r["content"][:200] + "..." if len(r["content"]) > 200 else r["content"],
                "source": f"{r['source_folder']} (chunk {r['chunk_number']})",
                "relevance": f"{r['similarity_score']:.3f}"
            }
            for i, r in enumerate(results1.get("results", []))
        ],
        "concept2_context": [
            {
                "rank": i + 1,
                "content": r["content"][:200] + "..." if len(r["content"]) > 200 else r["content"],
                "source": f"{r['source_folder']} (chunk {r['chunk_number']})",
                "relevance": f"{r['similarity_score']:.3f}"
            }
            for i, r in enumerate(results2.get("results", []))
        ],
        "success": True
    }

    return json.dumps(response, indent=2)


def classify_content_rs(content: str, categories: List[str] = None) -> str:
    """
    Classify content against RS Studies knowledge categories.

    Uses EmbeddingGemma's classification optimization to categorize content
    based on the RS Studies knowledge base.

    Args:
        content: Text content to classify
        categories: Optional list of specific categories to check against
            (defaults to major RS topics)

    Returns:
        JSON string with classification results and supporting evidence
    """
    if not content or not content.strip():
        return json.dumps({
            "error": "Content cannot be empty",
            "classification": {},
            "success": False
        })

    # Default categories based on RS Studies sources
    if categories is None:
        categories = [
            "trading systems",
            "market analysis",
            "Chennai meetup discussions",
            "Q&A topics",
            "technical strategies"
        ]

    # Search for similar content using classification optimization
    search_results = search_knowledge_base(content, 8, task_type="classification")

    if not search_results.get("success", False):
        return json.dumps(search_results)

    # Analyze which categories the content best fits
    source_distribution = {}
    for result in search_results.get("results", []):
        source = result["source_folder"]
        if source not in source_distribution:
            source_distribution[source] = []
        source_distribution[source].append({
            "content": result["content"][:150] + "..." if len(result["content"]) > 150 else result["content"],
            "similarity": result["similarity_score"]
        })

    response = {
        "content": content[:200] + "..." if len(content) > 200 else content,
        "available_categories": categories,
        "source_distribution": source_distribution,
        "top_matches": [
            {
                "rank": i + 1,
                "content": r["content"][:150] + "..." if len(r["content"]) > 150 else r["content"],
                "source_category": r["source_folder"],
                "similarity_score": f"{r['similarity_score']:.3f}"
            }
            for i, r in enumerate(search_results.get("results", [])[:5])
        ],
        "success": True
    }

    return json.dumps(response, indent=2)


# ==================================================
# QnA-ENHANCED EMBEDDING TOOLS
# ==================================================

def search_by_embedding_type(
    query: str,
    embedding_type: str = "content",
    num_results: int = 5,
    source_filter: Optional[str] = None
) -> str:
    """
    Search the knowledge base using specific embedding types for optimized retrieval.

    This tool leverages the QnA-enhanced embeddings to provide targeted search
    based on different content representations of the same chunks.

    Args:
        query: Your search question or topic (required)
        embedding_type: Type of embedding to search:
            - 'content': Original chunk content (default)
            - 'enhanced_content': Content enhanced with QnA context
            - 'questions': Questions-only embeddings for question matching
            - 'answers': Answers-only embeddings for factual retrieval
        num_results: Number of results to return (1-50, default: 5)
        source_filter: Limit to a specific source folder (optional)

    Returns:
        JSON string with search results optimized for the specified embedding type
    """
    # Validate parameters
    if not query or not query.strip():
        return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})

    valid_embedding_types = ["content", "enhanced_content", "questions", "answers"]
    if embedding_type not in valid_embedding_types:
        return json.dumps({
            "error": f"Invalid embedding_type. Must be one of: {valid_embedding_types}",
            "results": [],
            "success": False
        })

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    try:
        # Format the query appropriately based on the embedding type
        if embedding_type == "questions":
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "question_answering")
        elif embedding_type == "answers":
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "fact_checking")
        else:
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "search")

        # Create the query embedding
        query_embedding = model.encode([formatted_query], device=device)

        # Build the where clause to filter by embedding type
        # (recent ChromaDB versions require an explicit $and to combine multiple conditions)
        where_clause = {"embedding_type": {"$eq": embedding_type}}
        if source_filter:
            where_clause = {
                "$and": [
                    {"embedding_type": {"$eq": embedding_type}},
                    {"source_folder": {"$eq": source_filter}},
                ]
            }

        # Query ChromaDB
        search_results = collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=num_results,
            where=where_clause
        )

        # Format results
        results = []
        for i, (doc, metadata, distance) in enumerate(zip(
            search_results['documents'][0],
            search_results['metadatas'][0],
            search_results['distances'][0]
        )):
            results.append({
                "rank": i + 1,
                "content": doc,
                "similarity_score": 1 - distance,
                "embedding_type": metadata.get("embedding_type", "unknown"),
                "enhanced": metadata.get("enhanced", False),
                "qna_count": metadata.get("qna_count", 0),
                "source_folder": metadata.get("source_folder", "unknown"),
                "chunk_number": metadata.get("chunk_number", "unknown"),
                "chunk_file": metadata.get("chunk_file", "unknown")
            })

        return json.dumps({
            "query": query,
            "embedding_type": embedding_type,
            "results_found": len(results),
            "source_filter": source_filter,
            "results": results,
            "success": True
        }, indent=2)

    except Exception as e:
        return json.dumps({
            "error": f"Search failed: {str(e)}",
            "query": query,
            "embedding_type": embedding_type,
            "results": [],
            "success": False
        })


def smart_multi_search(
    query: str,
    num_results_per_type: int = 3,
    source_filter: Optional[str] = None,
    combine_strategy: str = "best_of_each"
) -> str:
    """
    Perform intelligent multi-type search across different embedding types.

    This tool searches across multiple embedding types and combines results to
    provide comprehensive coverage of relevant information.

    Args:
        query: Your search question or topic (required)
        num_results_per_type: Results per embedding type (1-10, default: 3)
        source_filter: Limit to a specific source folder (optional)
        combine_strategy: How to combine results:
            - 'best_of_each': Top results from each type
            - 'relevance_ranked': All results ranked by similarity
            - 'type_weighted': Weighted by embedding type appropriateness

    Returns:
        JSON string with combined search results and analysis
    """
    if not query or not query.strip():
        return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})

    num_results_per_type = max(1, min(num_results_per_type, 10))

    try:
        all_results = {}
        embedding_types = ["content", "enhanced_content", "questions", "answers"]

        # Search each embedding type
        for emb_type in embedding_types:
            search_result = search_by_embedding_type(
                query, emb_type, num_results_per_type, source_filter
            )
            result_data = json.loads(search_result)

            if result_data.get("success", False):
                all_results[emb_type] = result_data["results"]
            else:
                all_results[emb_type] = []

        # Combine results based on the chosen strategy
        combined_results = []

        if combine_strategy == "best_of_each":
            # Keep the top results from each embedding type
            for emb_type, results in all_results.items():
                for result in results:
                    result["search_type"] = emb_type
                    combined_results.append(result)

        elif combine_strategy == "relevance_ranked":
            # Combine all results and sort by similarity
            for emb_type, results in all_results.items():
                for result in results:
                    result["search_type"] = emb_type
                    combined_results.append(result)
            combined_results.sort(key=lambda x: x["similarity_score"], reverse=True)

        elif combine_strategy == "type_weighted":
            # Apply weights based on a simple analysis of the query type
            query_lower = query.lower()

            # Simple heuristics for weighting
            weights = {
                "content": 1.0,
                "enhanced_content": 1.2,  # Slightly favor enhanced content
                "questions": 1.5 if any(word in query_lower for word in ["what", "how", "why", "when", "where", "?"]) else 0.8,
                "answers": 1.3 if any(word in query_lower for word in ["define", "explain", "meaning", "is"]) else 0.9
            }

            for emb_type, results in all_results.items():
                for result in results:
                    result["search_type"] = emb_type
                    result["weighted_score"] = result["similarity_score"] * weights[emb_type]
                    combined_results.append(result)
            combined_results.sort(key=lambda x: x["weighted_score"], reverse=True)

        # Deduplicate by chunk (keep the best-scoring version of each chunk)
        seen_chunks = {}
        for result in combined_results:
            chunk_key = f"{result['source_folder']}_chunk_{result['chunk_number']}"
            if chunk_key not in seen_chunks or result["similarity_score"] > seen_chunks[chunk_key]["similarity_score"]:
                seen_chunks[chunk_key] = result

        final_results = list(seen_chunks.values())
        final_results.sort(key=lambda x: x.get("weighted_score", x["similarity_score"]), reverse=True)

        # Add final ranking
        for i, result in enumerate(final_results):
            result["final_rank"] = i + 1

        return json.dumps({
            "query": query,
            "combine_strategy": combine_strategy,
            "total_results": len(final_results),
            "embedding_types_searched": embedding_types,
            "results_per_type": {emb_type: len(results) for emb_type, results in all_results.items()},
            "source_filter": source_filter,
            "results": final_results[:num_results_per_type * 2],  # Limit final output
            "success": True
        }, indent=2)

    except Exception as e:
        return json.dumps({
            "error": f"Multi-search failed: {str(e)}",
            "query": query,
            "results": [],
            "success": False
        })


def analyze_embedding_coverage(source_filter: Optional[str] = None) -> str:
    """
    Analyze the distribution and coverage of different embedding types in the
    knowledge base.

    Args:
        source_filter: Limit analysis to a specific source folder (optional)

    Returns:
        JSON string with embedding type statistics and coverage analysis
    """
    try:
        # Build the where clause
        where_clause = {}
        if source_filter:
            where_clause["source_folder"] = source_filter

        # Get all documents with metadata
        if where_clause:
            all_docs = collection.get(where=where_clause)
        else:
            all_docs = collection.get()

        # Analyze embedding types
        type_counts = {}
        enhanced_counts = {"enhanced": 0, "original": 0}
        source_breakdown = {}
        qna_stats = {"with_qna": 0, "without_qna": 0}

        for metadata in all_docs['metadatas']:
            emb_type = metadata.get('embedding_type', 'unknown')
            type_counts[emb_type] = type_counts.get(emb_type, 0) + 1

            # Enhanced vs original
            if metadata.get('enhanced', False):
                enhanced_counts["enhanced"] += 1
            else:
                enhanced_counts["original"] += 1

            # Source breakdown
            source = metadata.get('source_folder', 'unknown')
            if source not in source_breakdown:
                source_breakdown[source] = {}
            source_breakdown[source][emb_type] = source_breakdown[source].get(emb_type, 0) + 1

            # QnA statistics
            if metadata.get('qna_count', 0) > 0:
                qna_stats["with_qna"] += 1
            else:
                qna_stats["without_qna"] += 1

        total_embeddings = len(all_docs['metadatas'])

        return json.dumps({
            "total_embeddings": total_embeddings,
            "source_filter": source_filter,
            "embedding_type_distribution": type_counts,
            "enhancement_status": enhanced_counts,
            "qna_coverage": qna_stats,
            "source_breakdown": source_breakdown,
            "coverage_percentage": {
                emb_type: round((count / total_embeddings) * 100, 2)
                for emb_type, count in type_counts.items()
            },
            "success": True
        }, indent=2)

    except Exception as e:
        return json.dumps({
            "error": f"Analysis failed: {str(e)}",
            "analysis": {},
            "success": False
        })


def find_related_questions(
    topic: str,
    num_questions: int = 5,
    source_filter: Optional[str] = None
) -> str:
    """
    Find questions related to a specific topic using the questions-only embeddings.

    This tool is optimized for discovering what questions are available about
    a topic, useful for exploration and understanding coverage.

    Args:
        topic: Topic or concept to find questions about (required)
        num_questions: Number of related questions to return (1-20, default: 5)
        source_filter: Limit to a specific source folder (optional)

    Returns:
        JSON string with related questions and their context
    """
    if not topic or not topic.strip():
        return json.dumps({"error": "Topic cannot be empty", "questions": [], "success": False})

    num_questions = max(1, min(num_questions, 20))

    try:
        # Search using the questions-only embeddings
        question_search = search_by_embedding_type(
            topic, "questions", num_questions, source_filter
        )
        search_data = json.loads(question_search)

        if not search_data.get("success", False):
            return json.dumps({
                "error": "Failed to search questions",
                "topic": topic,
                "questions": [],
                "success": False
            })

        # Extract questions and add context
        questions = []
        for result in search_data["results"]:
            # Parse the questions from the content (format: "Q1 | Q2 | Q3")
            question_list = [q.strip() for q in result["content"].split("|")]

            for question in question_list:
                if question:  # Skip empty questions
                    questions.append({
                        "question": question,
                        "relevance_score": result["similarity_score"],
                        "source": f"{result['source_folder']} (chunk {result['chunk_number']})",
                        "chunk_file": result["chunk_file"],
                        "qna_count": result.get("qna_count", 0)
                    })

        # Sort by relevance and limit
        questions.sort(key=lambda x: x["relevance_score"], reverse=True)
        questions = questions[:num_questions]

        # Add ranking
        for i, q in enumerate(questions):
            q["rank"] = i + 1

        return json.dumps({
            "topic": topic,
            "total_questions_found": len(questions),
            "source_filter": source_filter,
            "questions": questions,
            "success": True
        }, indent=2)

    except Exception as e:
        return json.dumps({
            "error": f"Question search failed: {str(e)}",
            "topic": topic,
            "questions": [],
            "success": False
        })


with gr.Blocks() as demo:
    gr.Markdown(
        """
        This is an MCP-only tool for RS Studies. It connects to a remote ChromaDB
        instance and exposes its search functions as MCP tools rather than a UI.
        """
    )
    gr.api(search_rs_studies)
    gr.api(get_rs_sources)
    gr.api(ask_rs_question)
    gr.api(search_by_source)
    gr.api(verify_fact_rs)
    gr.api(compare_similarity_rs)
    gr.api(classify_content_rs)
    gr.api(search_by_embedding_type)
    gr.api(smart_multi_search)
    gr.api(analyze_embedding_coverage)
    gr.api(find_related_questions)

_, url, _ = demo.launch(mcp_server=True)
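
# Note: with mcp_server=True, recent Gradio releases expose the functions registered
# via gr.api() as MCP tools over an SSE endpoint (typically <url>/gradio_api/mcp/sse).
# The exact path can vary by Gradio version, so confirm it from the launch output above.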