Spaces:
Runtime error
Runtime error
| from typing import Dict, List, Any | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from duckduckgo_search import ddg | |
| from transformers import pipeline | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| import time | |
| import json | |
| import os | |
| from urllib.parse import urlparse | |
| import asyncio | |
class ModelManager:
    """Manages AI models for text processing.

    Holds a Hugging Face summarization pipeline (``self.summarizer``) and a
    sentence-embedding model (``self.embeddings``), both configured in
    ``__init__``.
    """

    def __init__(self):
        # Initialize with smaller, CPU-friendly models
        self.summarizer = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=-1  # Use CPU
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

    def generate_summary(self, text: str, max_length: int = 150) -> str:
        """Generate a concise summary of the text.

        Args:
            text: The text to summarize.
            max_length: Upper bound passed to the summarizer as ``max_length``.

        Returns:
            ``text`` unchanged when it is empty or has fewer than 50
            whitespace-separated words; otherwise the summarizer's
            ``summary_text``. If the summarizer raises, the first 500
            characters of ``text`` are returned, followed by "..." only when
            something was actually cut off.
        """
        if not text or len(text.split()) < 50:
            return text

        # Keep min_length <= max_length so a small caller-supplied max_length
        # does not produce contradictory generation bounds.
        min_length = min(30, max_length)

        try:
            outputs = self.summarizer(
                text,
                max_length=max_length,
                min_length=min_length,
                do_sample=False,
                # Ask the pipeline to cut over-long inputs down to the
                # model's input limit instead of failing on them.
                truncation=True,
            )
            return outputs[0]['summary_text']
        except Exception as e:
            # Best-effort fallback: never raise to the caller.
            print(f"Error in summarization: {e}")
            if len(text) > 500:
                return text[:500] + "..."
            return text
class ContentProcessor:
    """Turns raw text content into a summary payload via a ModelManager."""

    def __init__(self):
        self.model_manager = ModelManager()

    def process_content(self, content: str) -> Dict[str, Any]:
        """Summarize ``content`` and wrap the result in a dict.

        Args:
            content: Text to process.

        Returns:
            A dict with keys ``"summary"`` and ``"insights"``. ``"insights"``
            is always an empty list. ``"summary"`` is the empty string for
            empty input, the model manager's summary on success, or the first
            500 characters of ``content`` plus "..." if summarization raises.
        """
        # "insights" stays empty: simplified for CPU deployment.
        payload: Dict[str, Any] = {"summary": "", "insights": []}

        if content:
            try:
                payload["summary"] = self.model_manager.generate_summary(content)
            except Exception as exc:
                print(f"Error processing content: {exc}")
                payload["summary"] = content[:500] + "..."

        return payload
class OSINTEngine:
    """Main OSINT engine class.

    Every search method here is currently an unimplemented placeholder: each
    coroutine does no work and resolves to ``None``.
    """

    def __init__(self):
        # No state to set up yet.
        return None

    async def search_username(self, query: str) -> Dict[str, Any]:
        """Search for usernames (placeholder; currently returns ``None``)."""
        # Implement username search logic here
        return None

    async def search_image(self, query: str) -> Dict[str, Any]:
        """Search for images (placeholder; currently returns ``None``)."""
        # Implement image search logic here
        return None

    async def search_social_media(self, query: str, platform: str) -> Dict[str, Any]:
        """Search for social media profiles (placeholder; currently returns ``None``)."""
        # Implement social media search logic here
        return None

    async def gather_personal_info(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Gather personal information (placeholder; currently returns ``None``).

        Note that ``kwargs`` is an ordinary positional parameter holding a
        dict, not a ``**kwargs`` catch-all.
        """
        # Implement personal info gathering logic here
        return None

    async def search_historical_data(self, query: str) -> Dict[str, Any]:
        """Search for historical data (placeholder; currently returns ``None``)."""
        # Implement historical data search logic here
        return None
class WebSearchEngine:
    """Main search engine class.

    Runs a DuckDuckGo query, fetches each hit over HTTP, extracts basic
    metadata and paragraph text, and summarizes that text.

    Attributes set in ``__init__``:
        processor: ContentProcessor used to summarize page text.
        session: ``requests.Session`` shared by all page fetches.
        request_delay: Minimum number of seconds between two fetches.
        last_request_time: ``time.time()`` of the latest fetch (0 = none yet).
        osint_engine: OSINTEngine handling the non-web search types.
    """

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 1.0
        self.last_request_time = 0
        self.osint_engine = OSINTEngine()  # Add OSINT engine

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling.

        Returns True only for URLs with a network location and an ``http`` or
        ``https`` scheme; anything unparsable yields False.
        """
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # urlparse raises ValueError on malformed input (e.g. an
            # unbalanced IPv6 bracket); the other two cover non-string input.
            return False
        return bool(parsed.netloc and parsed.scheme in ('http', 'https'))

    def get_metadata(self, soup: "BeautifulSoup") -> Dict[str, str]:
        """Extract metadata from page.

        Returns a dict that may contain ``title``, ``description`` and
        ``published_date``; a key is present only when the matching tag was
        found in ``soup``.
        """
        metadata = {}

        # Get title
        title = soup.find('title')
        if title:
            metadata['title'] = title.text.strip()

        # Get meta description
        desc = soup.find('meta', attrs={'name': 'description'})
        if desc:
            metadata['description'] = desc.get('content', '')

        # Get publication date (keep only the part before the first 'T')
        date = soup.find('meta', attrs={'property': 'article:published_time'})
        if date:
            metadata['published_date'] = date.get('content', '').split('T')[0]

        return metadata

    def _wait_for_rate_limit(self) -> None:
        """Sleep just long enough to keep ``request_delay`` between fetches."""
        remaining = self.request_delay - (time.time() - self.last_request_time)
        if remaining > 0:
            time.sleep(remaining)

    @staticmethod
    def _extract_result_url(result: Any) -> str:
        """Return the target URL of one search hit, or '' if it has none.

        ``href`` is tried first; ``link`` and ``url`` are accepted as
        fallbacks so differently shaped result dicts still work.
        """
        if not isinstance(result, dict):
            return ''
        for key in ('href', 'link', 'url'):
            value = result.get(key)
            if value:
                return value
        return ''

    def process_url(self, url: str) -> Dict[str, Any]:
        """Process a single URL.

        Returns:
            A dict with ``url``, ``title`` (falls back to the URL),
            ``summary`` and ``published_date`` (falls back to ''), or ``None``
            when the URL is invalid, the response status is not 200, or any
            error occurs while fetching or processing.
        """
        if not self.is_valid_url(url):
            return None

        try:
            # Rate limiting
            self._wait_for_rate_limit()
            try:
                response = self.session.get(url, timeout=10)
            finally:
                # Record the attempt even when it fails, so a burst of
                # failing requests is still throttled.
                self.last_request_time = time.time()

            if response.status_code != 200:
                return None

            soup = BeautifulSoup(response.text, 'lxml')
            metadata = self.get_metadata(soup)

            # Extract main content (simplified): text of every <p> element
            content = ' '.join(p.text for p in soup.find_all('p'))
            processed = self.processor.process_content(content)

            return {
                'url': url,
                'title': metadata.get('title', url),
                'summary': processed['summary'],
                'published_date': metadata.get('published_date', '')
            }
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            return None

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Perform search and process results.

        Returns:
            Up to ``max_results`` dicts as produced by ``process_url``; hits
            that have no URL or cannot be processed are skipped. Any error
            during the search yields an empty list.
        """
        try:
            # Perform DuckDuckGo search; treat "no results" (None) as empty
            search_results = ddg(query, max_results=max_results) or []

            results = []
            for result in search_results:
                url = self._extract_result_url(result)
                if not url:
                    continue
                processed = self.process_url(url)
                if processed:
                    results.append(processed)

            return results[:max_results]
        except Exception as e:
            print(f"Error in search: {e}")
            return []

    async def advanced_search(self, query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
        """Perform advanced search based on type.

        Args:
            query: The search query.
            search_type: One of "web", "username", "image", "social",
                "personal" or "historical". Any other value leaves the
                result dict empty.
            **kwargs: ``max_results`` is used for "web", ``platform`` for
                "social"; for "personal" the whole kwargs dict is forwarded.

        Returns:
            A dict holding the results under a key that depends on the search
            type, plus an ``"error"`` key with the message if an exception
            was raised.
        """
        results = {}

        try:
            if search_type == "web":
                # NOTE: this is a synchronous call made from a coroutine.
                results["web"] = self.search(query, kwargs.get("max_results", 5))
            elif search_type == "username":
                results["osint"] = await self.osint_engine.search_username(query)
            elif search_type == "image":
                results["image"] = await self.osint_engine.search_image(query)
            elif search_type == "social":
                results["social"] = await self.osint_engine.search_social_media(
                    query,
                    kwargs.get("platform")
                )
            elif search_type == "personal":
                results["personal"] = await self.osint_engine.gather_personal_info(kwargs)
            elif search_type == "historical":
                results["historical"] = await self.osint_engine.search_historical_data(query)
        except Exception as e:
            results["error"] = str(e)

        return results
# Module-level convenience wrapper around WebSearchEngine.search
def search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Run a web search with a freshly constructed WebSearchEngine.

    A new engine is built on every call; ``query`` and ``max_results`` are
    passed straight through to its ``search`` method and the result is
    returned unchanged.
    """
    return WebSearchEngine().search(query, max_results)
# Module-level convenience wrapper around WebSearchEngine.advanced_search
async def advanced_search(query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
    """Run an advanced search with a freshly constructed WebSearchEngine.

    A new engine is built on every call; ``query``, ``search_type`` and any
    extra keyword arguments are forwarded to its ``advanced_search``
    coroutine, whose result is awaited and returned unchanged.
    """
    outcome = await WebSearchEngine().advanced_search(query, search_type, **kwargs)
    return outcome