| """ | |
| Advanced RAG-based search engine with multi-source intelligence. | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| import asyncio | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.docstore.document import Document | |
| from duckduckgo_search import DDGS | |
| from googlesearch import search as gsearch | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| import json | |
| import time | |
| from datetime import datetime, timedelta | |
| import hashlib | |
| from urllib.parse import urlparse | |
| import re | |
class SearchEngine:
    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        self.cache = {}
        self.cache_ttl = timedelta(hours=24)
        self.search_delay = 2  # seconds between searches
        self.last_search_time = datetime.min
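        # Note: the cache is an unbounded in-process dict; stale entries are
        # evicted lazily on lookup in _get_cached_result, not by a background task.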
    def _get_cache_key(self, query: str, **kwargs) -> str:
        """Generate cache key from query and kwargs."""
        cache_data = {
            "query": query,
            **kwargs
        }
        return hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()

    def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """Get result from cache if valid."""
        if cache_key in self.cache:
            result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.cache_ttl:
                return result
            del self.cache[cache_key]
        return None

    def _set_cached_result(self, cache_key: str, result: Dict[str, Any]):
        """Store result in cache."""
        self.cache[cache_key] = (result, datetime.now())
    async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
        """Perform web search using multiple search engines."""
        results = []

        # Respect rate limiting between consecutive searches
        time_since_last = datetime.now() - self.last_search_time
        if time_since_last.total_seconds() < self.search_delay:
            await asyncio.sleep(self.search_delay - time_since_last.total_seconds())

        # DuckDuckGo search; normalize results to the {"link", "title", "snippet"}
        # shape expected downstream (DDGS returns "href"/"body" keys)
        try:
            with DDGS() as ddgs:
                ddg_results = list(ddgs.text(query, max_results=max_results))
            results.extend(
                {"link": r.get("href"), "title": r.get("title"), "snippet": r.get("body")}
                for r in ddg_results
            )
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")

        # Google search (googlesearch-python yields bare URLs)
        try:
            google_results = gsearch(query, num_results=max_results)
            results.extend({"link": url, "title": url} for url in google_results)
        except Exception as e:
            print(f"Google search error: {e}")

        self.last_search_time = datetime.now()
        return results[:max_results]
    def _clean_html(self, html: str) -> str:
        """Clean HTML content."""
        # Remove script and style elements
        html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
        html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
        # Remove comments
        html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
        # Remove remaining tags
        html = re.sub(r'<[^>]+>', ' ', html)
        # Clean whitespace
        html = re.sub(r'\s+', ' ', html).strip()
        return html
    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch and extract content from a webpage."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            # requests is blocking, so run it in a worker thread to avoid
            # stalling the event loop
            response = await asyncio.to_thread(
                requests.get, url, headers=headers, timeout=10
            )
            response.raise_for_status()

            # Extract main content
            soup = BeautifulSoup(response.text, "html.parser")

            # Remove unwanted elements
            for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                element.decompose()

            # Prefer semantic containers, then divs with common content class
            # names, then fall back to the whole body
            main_content = (
                soup.find("article")
                or soup.find("main")
                or soup.find("div", class_=re.compile(r"content|article|post|entry"))
                or soup.body
            )

            if main_content:
                return self._clean_html(str(main_content))
            return self._clean_html(response.text)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Extract metadata from webpage."""
        metadata = {
            "url": url,
            "domain": urlparse(url).netloc,
            "title": None,
            "description": None,
            "published_date": None,
            "author": None,
            "keywords": None
        }

        # Extract title
        if soup.title:
            metadata["title"] = soup.title.string

        # Extract meta tags ("prop" avoids shadowing the built-in property)
        for meta in soup.find_all("meta"):
            name = meta.get("name", "").lower()
            prop = meta.get("property", "").lower()
            content = meta.get("content")

            if name == "description" or prop == "og:description":
                metadata["description"] = content
            elif name == "author":
                metadata["author"] = content
            elif name == "keywords":
                metadata["keywords"] = content
            elif name == "published_time" or prop == "article:published_time":
                metadata["published_date"] = content

        return metadata
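    # Note: _extract_metadata is not invoked by process_search_results; it is
    # a standalone helper for callers that parse pages with BeautifulSoup.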
    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Process search results and create a RAG-based answer."""
        cache_key = self._get_cache_key(query)
        cached_result = self._get_cached_result(cache_key)
        if cached_result:
            return cached_result

        # Perform web search
        search_results = await self.search_web(query)

        # Fetch content from search results
        documents = []
        metadata_list = []
        for result in search_results:
            url = result.get("link")
            if not url:
                continue

            content = await self.fetch_content(url)
            if content:
                # Split content into chunks
                chunks = self.text_splitter.split_text(content)

                # Store metadata
                metadata = {
                    "source": url,
                    "title": result.get("title", url),
                    **result
                }
                metadata_list.append(metadata)

                # Create documents
                for chunk in chunks:
                    doc = Document(
                        page_content=chunk,
                        metadata=metadata
                    )
                    documents.append(doc)

        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": [],
                "metadata": []
            }
        # Build a FAISS vector store over the chunks and retrieve the most
        # relevant ones directly; answer synthesis below is a simple
        # concatenation of the top chunks rather than an LLM call
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        retriever = vectorstore.as_retriever()
        relevant_docs = retriever.get_relevant_documents(query)
        # Extract unique sources and content
        sources = []
        content = []
        used_metadata = []
        for doc in relevant_docs[:5]:  # Limit to top 5 most relevant docs
            source = doc.metadata["source"]
            if source not in sources:
                sources.append(source)
                content.append(doc.page_content)
                # Find corresponding metadata
                for meta in metadata_list:
                    if meta["source"] == source:
                        used_metadata.append(meta)
                        break

        result = {
            "answer": "\n\n".join(content),
            "sources": sources,
            "metadata": used_metadata
        }

        # Cache the result
        self._set_cached_result(cache_key, result)
        return result
    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {e}",
                "sources": [],
                "metadata": []
            }
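

# Minimal usage sketch: run a single query end-to-end. Assumes network access
# and that the sentence-transformers model can be downloaded on first use;
# the query string is arbitrary.
if __name__ == "__main__":
    async def _demo():
        engine = SearchEngine()
        result = await engine.search("retrieval augmented generation")
        print(result["answer"][:500])
        print("Sources:", result["sources"])

    asyncio.run(_demo())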