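# Spaces: Runtime error
# (The line above appears to be a status banner captured from the hosting page
# along with the source; it is not part of the module.)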
import asyncio
import json
import os
import re
import time
from datetime import datetime
from io import BytesIO
from typing import Dict, List, Any, Union
from urllib.parse import quote

import aiohttp
import httpx
import requests
import whois
from geopy.geocoders import Nominatim
from googlesearch import search as google_search
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from waybackpy import WaybackMachineCDXServerAPI
from webdriver_manager.chrome import ChromeDriverManager
class OSINTEngine:
    """OSINT capabilities for advanced information gathering.

    The public coroutines wrap several synchronous third-party clients
    (``requests``, ``geopy``, ``whois``, ``googlesearch``, ``waybackpy``).
    Those calls are dispatched to the default thread-pool executor so they do
    not stall the event loop while waiting on the network.

    The instance owns an ``httpx.AsyncClient``; call :meth:`close` (or use the
    engine as an ``async with`` context manager) to release it.
    """

    # Seconds to wait on a single outbound HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Configure headless Chrome options and initialise the API clients."""
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.setup_apis()

    def setup_apis(self):
        """Initialize API clients (geocoder and shared async HTTP client)."""
        self.geolocator = Nominatim(user_agent="intelligent_search")
        self.http_client = httpx.AsyncClient()

    async def close(self) -> None:
        """Close the ``httpx.AsyncClient`` created by :meth:`setup_apis`."""
        client = getattr(self, 'http_client', None)
        if client is not None:
            await client.aclose()

    async def __aenter__(self):
        """Return the engine itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Release the HTTP client when the ``async with`` block exits."""
        await self.close()

    @staticmethod
    def _profile_urls(username: str) -> List[Dict[str, str]]:
        """Build the candidate profile URL for each supported platform.

        The username is percent-encoded so that spaces, slashes or other
        reserved characters cannot change the path being requested.
        """
        handle = quote(username, safe='')
        return [
            {'name': 'GitHub', 'url': f'https://github.com/{handle}'},
            {'name': 'Twitter', 'url': f'https://twitter.com/{handle}'},
            {'name': 'Instagram', 'url': f'https://instagram.com/{handle}'},
            {'name': 'LinkedIn', 'url': f'https://linkedin.com/in/{handle}'},
            {'name': 'Facebook', 'url': f'https://facebook.com/{handle}'},
            {'name': 'YouTube', 'url': f'https://youtube.com/@{handle}'},
        ]

    @staticmethod
    def _lens_url(image_url: str) -> str:
        """Return the Google Lens "upload by URL" link for ``image_url``.

        The image URL is percent-encoded because it is embedded as a query
        parameter; an unescaped ``&`` or ``?`` would truncate or corrupt it.
        """
        return f"https://lens.google.com/uploadbyurl?url={quote(image_url, safe='')}"

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Args:
            username: Handle to look up. Surrounding whitespace is ignored.

        Returns:
            A dict with keys ``'platforms'`` (profiles that answered HTTP
            200), ``'social_media'`` (always an empty dict here) and
            ``'websites'`` (web search hits, or a one-element list holding
            the error text if the search failed). A blank username yields the
            empty structure without any network traffic.
        """
        results = {
            'platforms': [],
            'social_media': {},
            'websites': []
        }
        username = (username or '').strip()
        if not username:
            # An empty handle would probe each site's root page, which answers
            # 200 and would be reported as an existing profile.
            return results

        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            probes = [
                self.check_profile(session, entry['url'], entry['name'])
                for entry in self._profile_urls(username)
            ]
            platform_results = await asyncio.gather(*probes)
        results['platforms'] = [r for r in platform_results if r is not None]

        # Google search for additional mentions
        search_query = f'"{username}" OR "@{username}" -site:twitter.com -site:facebook.com -site:instagram.com'
        loop = asyncio.get_running_loop()
        try:
            # googlesearch is synchronous; run it off the event loop.
            results['websites'] = await loop.run_in_executor(
                None, lambda: list(google_search(search_query, num_results=5))
            )
        except Exception as e:
            results['websites'] = [str(e)]
        return results

    async def check_profile(self, session, url: str, platform: str) -> Union[Dict[str, Any], None]:
        """Check if a profile exists on a platform.

        Args:
            session: Object whose ``get(url)`` returns an async context
                manager yielding a response with a ``status`` attribute
                (an ``aiohttp.ClientSession`` in :meth:`search_username`).
            url: Profile URL to request.
            platform: Human-readable platform name echoed in the result.

        Returns:
            ``{'platform', 'url', 'exists': True}`` when the response status
            is 200, otherwise ``None`` (including on any request failure).
        """
        # NOTE(review): a 200 status is the only signal used; sites that serve
        # a login or placeholder page with 200 would be reported as existing.
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return {
                        'platform': platform,
                        'url': url,
                        'exists': True
                    }
        except Exception:
            # Best-effort probe: a failed request just means "not found".
            # Catching Exception rather than using a bare except lets task
            # cancellation and KeyboardInterrupt propagate.
            return None
        return None

    def _inspect_image(self, image_url: str) -> Dict[str, Any]:
        """Download ``image_url`` and return its format, size and mode.

        Synchronous helper; raises on HTTP errors, timeouts or undecodable
        image data.
        """
        response = requests.get(image_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        with Image.open(BytesIO(response.content)) as img:
            return {
                'format': img.format,
                'size': img.size,
                'mode': img.mode
            }

    async def search_image(self, image_url: str) -> Dict[str, Any]:
        """Image analysis and reverse search.

        Args:
            image_url: Publicly reachable URL of the image.

        Returns:
            A dict with ``'analysis'`` (format/size/mode of the downloaded
            image), ``'similar_images'`` (a Google Lens link, added only when
            the download and analysis succeeded) and ``'error'`` (the error
            text, or ``None``).
        """
        results = {
            'analysis': {},
            'similar_images': [],
            'error': None
        }
        loop = asyncio.get_running_loop()
        try:
            # Download and decode in a worker thread; requests/PIL block.
            results['analysis'] = await loop.run_in_executor(
                None, self._inspect_image, image_url
            )
            # Perform reverse image search using Google Lens
            results['similar_images'].append({
                'source': 'Google Lens',
                'url': self._lens_url(image_url)
            })
        except Exception as e:
            results['error'] = str(e)
        return results

    async def gather_personal_info(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Gather personal information from various sources.

        Args:
            data: Optional ``'location'`` and/or ``'domain'`` entries.

        Returns:
            A dict containing a ``'location'`` and/or ``'domain'`` result for
            each key that was present in ``data``.
        """
        results = {}
        if 'location' in data:
            results['location'] = await self.analyze_location(data['location'])
        if 'domain' in data:
            # analyze_domain is synchronous; keep it off the event loop.
            loop = asyncio.get_running_loop()
            results['domain'] = await loop.run_in_executor(
                None, self.analyze_domain, data['domain']
            )
        return results

    async def analyze_location(self, location: str) -> Union[Dict[str, Any], None]:
        """Analyze location information.

        Returns:
            Address, latitude, longitude and raw payload of the geocoder hit;
            ``None`` when the geocoder returns nothing; ``{'error': ...}``
            when the lookup raises.
        """
        loop = asyncio.get_running_loop()
        try:
            location_data = await loop.run_in_executor(
                None, self.geolocator.geocode, location
            )
            if location_data:
                return {
                    'address': location_data.address,
                    'latitude': location_data.latitude,
                    'longitude': location_data.longitude,
                    'raw': location_data.raw
                }
        except Exception as e:
            return {'error': str(e)}
        return None

    def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Analyze domain information.

        Returns:
            Registrar, creation/expiration/update dates and status as reported
            by the WHOIS lookup, or ``{'error': ...}`` when the lookup raises.
        """
        try:
            domain_info = whois.whois(domain)
            return {
                'registrar': domain_info.registrar,
                'creation_date': domain_info.creation_date,
                'expiration_date': domain_info.expiration_date,
                'last_updated': domain_info.updated_date,
                'status': domain_info.status
            }
        except Exception as e:
            return {'error': str(e)}

    def _collect_snapshots(self, url: str) -> List[Dict[str, Any]]:
        """Synchronously list Wayback Machine snapshots for ``url``.

        Snapshots gathered before a failure are kept; the failure itself is
        appended as a final ``{'error': ...}`` entry.
        """
        results = []
        try:
            user_agent = "Mozilla/5.0"
            cdx = WaybackMachineCDXServerAPI(url, user_agent)
            for snapshot in cdx.snapshots():
                results.append({
                    'timestamp': snapshot.timestamp,
                    'url': snapshot.archive_url,
                    'status': snapshot.status_code,
                    'mime_type': snapshot.mime_type
                })
        except Exception as e:
            results.append({'error': str(e)})
        return results

    async def search_historical_data(self, url: str) -> List[Dict[str, Any]]:
        """Search for historical data using Wayback Machine.

        Returns:
            One dict per snapshot (``timestamp``, ``url``, ``status``,
            ``mime_type``); an ``{'error': ...}`` entry is appended if the
            lookup fails.
        """
        loop = asyncio.get_running_loop()
        # Iterating the CDX snapshots blocks; do it in a worker thread.
        return await loop.run_in_executor(None, self._collect_snapshots, url)
# Helper function to create document from gathered information
def create_report(data: Dict[str, Any], template: str = "default") -> str:
    """Create a formatted report from gathered information.

    Args:
        data: Mapping of section name to content. A dict is rendered as one
            ``* key: value`` bullet per entry; a list as one bullet per item
            (dict items are expanded into ``* key: value`` bullets); anything
            else is written verbatim on its own line.
        template: Report layout. Only ``"default"`` is available.

    Returns:
        The report as Markdown text, starting with a title and a
        "Generated on" line stamped with the current local time.

    Raises:
        ValueError: If ``template`` is not ``"default"``.
    """
    if template != "default":
        raise ValueError(f"Template '{template}' not found")

    # Collect fragments and join once instead of growing a string with +=.
    parts = [
        "# OSINT Investigation Report\n\n",
        f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
    ]
    for section, content in data.items():
        # str() so a non-string section key is rendered instead of raising.
        parts.append(f"## {str(section).title()}\n")
        if isinstance(content, dict):
            parts.extend(f"* {key}: {value}\n" for key, value in content.items())
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict):
                    parts.extend(f"* {k}: {v}\n" for k, v in item.items())
                else:
                    parts.append(f"* {item}\n")
        else:
            parts.append(f"{content}\n")
        parts.append("\n")
    return "".join(parts)