Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import json | |
| import time | |
| import asyncio | |
| import aiohttp | |
| import requests | |
| import httpx | |
| from PIL import Image | |
| from io import BytesIO | |
| from typing import Dict, List, Any, Union, Optional | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.chrome.service import Service | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| from geopy.geocoders import Nominatim | |
| from waybackpy import WaybackMachineCDXServerAPI | |
| import whois | |
| from datetime import datetime | |
| from googlesearch import search as google_search | |
| import base64 | |
| import io | |
class OSINTEngine:
    """OSINT capabilities for advanced information gathering.

    The engine mixes real lookups (profile URL probing, WHOIS, geocoding,
    Wayback Machine, image inspection) with methods that only return
    hard-coded placeholder data (``search_person``, ``get_person_details``,
    ``find_similar_images``, ``get_location_info``, ``get_domain_info``).

    NOTE(review): ``search_username``, ``search_image``, ``analyze_image``,
    ``analyze_location`` and ``search_historical_data`` call synchronous
    libraries from inside coroutines, so they block the event loop while the
    underlying network call is in flight.
    """

    # Seconds to wait on a blocking ``requests`` call before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Configure headless Chrome options, API clients and URL templates."""
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.setup_apis()
        # Created lazily by initialize() and released by close().
        self.session = None
        # Profile URL templates; "{}" is replaced with the username.
        self.platforms = {
            "twitter": "https://twitter.com/{}",
            "instagram": "https://instagram.com/{}",
            "facebook": "https://facebook.com/{}",
            "linkedin": "https://linkedin.com/in/{}",
            "github": "https://github.com/{}",
            "reddit": "https://reddit.com/user/{}",
            "youtube": "https://youtube.com/@{}",
            "tiktok": "https://tiktok.com/@{}",
            "pinterest": "https://pinterest.com/{}",
            "snapchat": "https://snapchat.com/add/{}",
            "twitch": "https://twitch.tv/{}",
            "medium": "https://medium.com/@{}",
            "devto": "https://dev.to/{}",
            "stackoverflow": "https://stackoverflow.com/users/{}"
        }

    def setup_apis(self):
        """Initialize API clients."""
        self.geolocator = Nominatim(user_agent="intelligent_search")
        self.http_client = httpx.AsyncClient()

    async def initialize(self):
        """Create the shared aiohttp session if one does not exist yet."""
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def close(self):
        """Release the aiohttp session and the httpx client.

        Safe to call when nothing was opened.
        """
        if self.session:
            await self.session.close()
            self.session = None
        # getattr: tolerate instances on which setup_apis() never ran.
        http_client = getattr(self, "http_client", None)
        if http_client is not None:
            await http_client.aclose()

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for a username across several platforms and via Google.

        Returns a dict with keys ``platforms`` (profiles that answered
        HTTP 200), ``social_media`` (always an empty dict here) and
        ``websites`` (Google result URLs, or a one-element list holding the
        error text if the search failed).
        """
        results = {
            'platforms': [],
            'social_media': {},
            'websites': []
        }
        # Common social media platforms
        platforms = [
            {'name': 'GitHub', 'url': f'https://github.com/{username}'},
            {'name': 'Twitter', 'url': f'https://twitter.com/{username}'},
            {'name': 'Instagram', 'url': f'https://instagram.com/{username}'},
            {'name': 'LinkedIn', 'url': f'https://linkedin.com/in/{username}'},
            {'name': 'Facebook', 'url': f'https://facebook.com/{username}'},
            {'name': 'YouTube', 'url': f'https://youtube.com/@{username}'},
        ]
        async with aiohttp.ClientSession() as session:
            platform_results = await asyncio.gather(
                *(
                    self.check_profile(session, entry['url'], entry['name'])
                    for entry in platforms
                )
            )
        results['platforms'] = [hit for hit in platform_results if hit is not None]
        # Google search for additional mentions
        try:
            search_query = f'"{username}" OR "@{username}" -site:twitter.com -site:facebook.com -site:instagram.com'
            results['websites'] = list(google_search(search_query, num_results=5))
        except Exception as e:
            # Best effort: report the failure text instead of aborting.
            results['websites'] = [str(e)]
        return results

    async def check_profile(self, session, url: str, platform: str) -> Optional[Dict[str, Any]]:
        """Check if a profile exists on a platform.

        Returns a dict describing the profile when ``url`` answers HTTP 200,
        otherwise ``None`` (including when the request itself fails).
        """
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return {
                        'platform': platform,
                        'url': url,
                        'exists': True
                    }
        except Exception:
            # Not a bare except: task cancellation must still propagate.
            return None
        return None

    async def _probe_platform(self, platform_name: str, username: str) -> Dict[str, Any]:
        """Request one known platform's profile URL and describe the outcome."""
        url = self.platforms[platform_name].format(username)
        try:
            async with self.session.get(url) as response:
                return {
                    "platform": platform_name,
                    "url": url,
                    "exists": response.status == 200
                }
        except Exception:
            # Not a bare except: task cancellation must still propagate.
            return {
                "platform": platform_name,
                "url": url,
                "exists": False,
                "error": "Connection failed"
            }

    async def check_username(self, username: str, platform: str = "all") -> List[Dict]:
        """Probe profile URLs for ``username`` using ``self.platforms``.

        ``platform`` is either a key of ``self.platforms`` or ``"all"``.
        Unknown platform names are skipped, so the result may be empty.
        Results keep the order of ``self.platforms``.
        """
        await self.initialize()
        requested = list(self.platforms) if platform == "all" else [platform]
        known = [name for name in requested if name in self.platforms]
        # gather() preserves argument order, so output order is unchanged.
        outcomes = await asyncio.gather(
            *(self._probe_platform(name, username) for name in known)
        )
        return list(outcomes)

    async def search_image(self, image_url: str) -> Dict[str, Any]:
        """Image analysis and reverse search.

        Downloads ``image_url``, records format/size/mode under ``analysis``
        and appends a Google Lens lookup link under ``similar_images``. Any
        failure is reported as text under ``error``.
        """
        from urllib.parse import quote

        results = {
            'analysis': {},
            'similar_images': [],
            'error': None
        }
        try:
            # Download and analyze image
            response = requests.get(image_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
            # Basic image analysis
            results['analysis'] = {
                'format': img.format,
                'size': img.size,
                'mode': img.mode
            }
            # Reverse image search link; the URL is percent-encoded so that
            # '&', '#', '?' inside it do not corrupt the query string.
            search_url = f"https://lens.google.com/uploadbyurl?url={quote(image_url, safe='')}"
            results['similar_images'].append({
                'source': 'Google Lens',
                'url': search_url
            })
        except Exception as e:
            results['error'] = str(e)
        return results

    async def gather_personal_info(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Gather information for the ``location`` and ``domain`` keys of ``data``.

        Only keys present in ``data`` produce an entry in the result.
        """
        results = {}
        if 'location' in data:
            results['location'] = await self.analyze_location(data['location'])
        if 'domain' in data:
            results['domain'] = self.analyze_domain(data['domain'])
        return results

    async def analyze_location(self, location: str) -> Optional[Dict[str, Any]]:
        """Geocode ``location`` with the configured geolocator.

        Returns address/latitude/longitude/raw on a match, ``None`` when the
        geocoder finds nothing, or ``{'error': ...}`` if the lookup raises.
        """
        try:
            location_data = self.geolocator.geocode(location)
        except Exception as e:
            return {'error': str(e)}
        if not location_data:
            return None
        return {
            'address': location_data.address,
            'latitude': location_data.latitude,
            'longitude': location_data.longitude,
            'raw': location_data.raw
        }

    def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Look up WHOIS data for ``domain``.

        Returns selected WHOIS fields, or ``{'error': ...}`` if the lookup
        raises.
        """
        try:
            domain_info = whois.whois(domain)
            return {
                'registrar': domain_info.registrar,
                'creation_date': domain_info.creation_date,
                'expiration_date': domain_info.expiration_date,
                'last_updated': domain_info.updated_date,
                'status': domain_info.status
            }
        except Exception as e:
            return {'error': str(e)}

    async def search_historical_data(self, url: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Search for historical data using the Wayback Machine.

        Args:
            url: Page whose archived snapshots are listed.
            limit: Maximum number of snapshots to return; ``None`` (the
                default) returns every snapshot the CDX API yields.

        Returns:
            One dict per snapshot. If the lookup raises, the snapshots
            gathered so far are followed by a final ``{'error': ...}`` entry.
        """
        results = []
        try:
            user_agent = "Mozilla/5.0"
            cdx = WaybackMachineCDXServerAPI(url, user_agent)
            for snapshot in cdx.snapshots():
                if limit is not None and len(results) >= limit:
                    break
                # waybackpy names these attributes `statuscode`/`mimetype`;
                # the underscore spellings are kept as a fallback.
                status = getattr(snapshot, 'statuscode', getattr(snapshot, 'status_code', None))
                mime_type = getattr(snapshot, 'mimetype', getattr(snapshot, 'mime_type', None))
                results.append({
                    'timestamp': snapshot.timestamp,
                    'url': snapshot.archive_url,
                    'status': status,
                    'mime_type': mime_type
                })
        except Exception as e:
            results.append({'error': str(e)})
        return results

    async def search_person(self, name: str, location: Optional[str] = None) -> List[Dict]:
        """Return simulated person-search records for ``name``.

        No real source is queried: four placeholder records are produced
        (social media, news, public records, professional), in that order.
        """
        await self.initialize()
        lowered = name.lower()
        social_media = {
            "name": name,
            "location": location,
            "source": "Social Media",
            "profile_image": "https://example.com/profile.jpg",
            "social_links": [
                {"platform": "LinkedIn", "url": f"https://linkedin.com/in/{lowered.replace(' ', '-')}"},
                {"platform": "Twitter", "url": f"https://twitter.com/{lowered.replace(' ', '')}"}
            ],
            "occupation": "Professional",
            "last_seen": datetime.now().strftime("%Y-%m-%d")
        }
        news = {
            "name": name,
            "source": "News Articles",
            "mentions": [
                {
                    "title": f"Article about {name}",
                    "url": "https://example.com/news",
                    "date": "2023-01-01"
                }
            ]
        }
        public_records = {
            "name": name,
            "source": "Public Records",
            "location": location,
            "age_range": "25-35",
            "possible_relatives": ["Jane Doe", "John Doe Sr."],
            "previous_locations": ["New York, NY", "Los Angeles, CA"]
        }
        professional = {
            "name": name,
            "source": "Professional Records",
            "education": ["University Example"],
            "work_history": ["Company A", "Company B"],
            "skills": ["Leadership", "Management"]
        }
        return [social_media, news, public_records, professional]

    async def get_person_details(self, person_id: str) -> Dict:
        """Return simulated detailed information about a specific person.

        Everything except the echoed ``person_id`` is placeholder data.
        """
        await self.initialize()
        # Simulate gathering detailed information
        details = {
            "personal": {
                "name": person_id,
                "age_range": "25-35",
                "locations": ["Current City, Country", "Previous City, Country"],
                "education": ["University Name", "High School Name"],
                "occupation": "Current Occupation"
            },
            "social_media": {
                "profiles": [
                    {
                        "platform": "LinkedIn",
                        "url": f"https://linkedin.com/in/{person_id}",
                        "last_active": "2023-01-01"
                    },
                    {
                        "platform": "Twitter",
                        "url": f"https://twitter.com/{person_id}",
                        "last_active": "2023-01-01"
                    }
                ]
            },
            "contact": {
                "email_pattern": "j***@example.com",
                "phone_pattern": "+1 (***) ***-**89"
            },
            "images": [
                {
                    "url": "https://example.com/profile1.jpg",
                    "source": "LinkedIn",
                    "date": "2023-01-01"
                }
            ],
            "activities": {
                "recent_posts": [
                    {
                        "platform": "Twitter",
                        "content": "Example post content",
                        "date": "2023-01-01"
                    }
                ],
                "mentions": [
                    {
                        "source": "News Article",
                        "title": "Article Title",
                        "url": "https://example.com/article",
                        "date": "2023-01-01"
                    }
                ]
            }
        }
        return details

    async def analyze_image(self, image_path: str) -> Dict:
        """Analyze an image and return information about it.

        ``image_path`` is opened as a local file when it exists on disk,
        otherwise it is fetched as a URL. Returns format, ``WxH`` size, mode
        and selected EXIF fields, or ``{"error": ...}`` on any failure.
        """
        try:
            if os.path.exists(image_path):
                source = image_path
            else:
                response = requests.get(image_path, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                source = io.BytesIO(response.content)
            # Context manager so the underlying file handle is released.
            with Image.open(source) as img:
                analysis = {
                    "format": img.format,
                    "size": f"{img.size[0]}x{img.size[1]}",
                    "mode": img.mode,
                    "metadata": {},
                }
                # Extract EXIF data if available (read once, public API).
                exif = img.getexif()
                if exif:
                    analysis["metadata"] = {
                        "datetime": exif.get(306, "Unknown"),  # DateTime
                        "make": exif.get(271, "Unknown"),      # Make
                        "model": exif.get(272, "Unknown"),     # Model
                        "software": exif.get(305, "Unknown")   # Software
                    }
            return analysis
        except Exception as e:
            return {"error": str(e)}

    async def find_similar_images(self, image_url: str) -> List[Dict]:
        """Return simulated similar-image results (``image_url`` is unused)."""
        # Simulate finding similar images
        return [
            {
                "url": "https://example.com/similar1.jpg",
                "similarity": 0.95,
                "source": "Website A"
            },
            {
                "url": "https://example.com/similar2.jpg",
                "similarity": 0.85,
                "source": "Website B"
            }
        ]

    async def get_location_info(self, location: str) -> Dict:
        """Return simulated information about a location.

        Only ``name`` reflects the argument; the rest is placeholder data.
        """
        # Simulate location information retrieval
        return {
            "name": location,
            "coordinates": {"lat": 40.7128, "lng": -74.0060},
            "country": "United States",
            "timezone": "America/New_York",
            "population": "8.4 million",
            "weather": "Sunny, 72°F"
        }

    async def get_domain_info(self, domain: str) -> Dict:
        """Return simulated information about a domain.

        Only ``domain`` reflects the argument; the rest is placeholder data.
        """
        # Simulate domain information retrieval
        return {
            "domain": domain,
            "registrar": "Example Registrar",
            "creation_date": "2020-01-01",
            "expiration_date": "2024-01-01",
            "nameservers": ["ns1.example.com", "ns2.example.com"],
            "ip_address": "192.0.2.1",
            "location": "United States"
        }
| # Helper function to create document from gathered information | |
def create_report(data: Dict[str, Any], template: str = "default") -> str:
    """Create a formatted Markdown report from gathered information.

    Args:
        data: Mapping of section name to content. Dict content is rendered
            as ``* key: value`` bullets, list content as one bullet per item
            (dict items are expanded into ``* key: value`` bullets), and
            anything else is written as a plain line.
        template: Report layout; only ``"default"`` is supported.

    Returns:
        The report text, starting with a title and a generation timestamp.

    Raises:
        ValueError: If ``template`` is not ``"default"``.
    """
    if template != "default":
        raise ValueError(f"Template '{template}' not found")

    # Collect fragments and join once instead of repeated string +=.
    parts = [
        "# OSINT Investigation Report\n\n",
        f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
    ]
    for section, content in data.items():
        # str() so a non-string section key does not crash on .title().
        parts.append(f"## {str(section).title()}\n")
        if isinstance(content, dict):
            parts.extend(f"* {key}: {value}\n" for key, value in content.items())
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict):
                    parts.extend(f"* {k}: {v}\n" for k, v in item.items())
                else:
                    parts.append(f"* {item}\n")
        else:
            parts.append(f"{content}\n")
        parts.append("\n")
    return "".join(parts)
async def create_report_from_data(data: Dict) -> Dict:
    """Run the lookups selected by the keys of ``data`` and collect results.

    Recognised keys in ``data`` and the report entries they produce:
        ``username`` (with optional ``platform``) -> ``platforms``
        ``image_url`` -> ``analysis`` and ``similar_images``
        ``location`` -> ``location``
        ``domain`` -> ``domain``
        ``name`` (with optional ``location``) -> ``matches``
        ``person_id`` -> ``details``

    Returns:
        The assembled report dict, or ``{"error": ...}`` if any lookup
        raises.
    """
    engine = OSINTEngine()
    try:
        report = {}
        if "username" in data:
            report["platforms"] = await engine.check_username(data["username"], data.get("platform", "all"))
        if "image_url" in data:
            report["analysis"] = await engine.analyze_image(data["image_url"])
            report["similar_images"] = await engine.find_similar_images(data["image_url"])
        if "location" in data:
            report["location"] = await engine.get_location_info(data["location"])
        if "domain" in data:
            report["domain"] = await engine.get_domain_info(data["domain"])
        if "name" in data:
            report["matches"] = await engine.search_person(data["name"], data.get("location"))
        if "person_id" in data:
            report["details"] = await engine.get_person_details(data["person_id"])
        return report
    except Exception as e:
        return {"error": str(e)}
    finally:
        # Single cleanup point: also runs when the task is cancelled, which
        # an `except Exception` handler alone would not cover.
        await engine.close()