Spaces:
Runtime error
Runtime error
| """ | |
| OSINT engine for comprehensive information gathering. | |
| """ | |
| from typing import Dict, List, Any, Optional | |
| import asyncio | |
| import json | |
| from dataclasses import dataclass | |
| import holehe.core as holehe | |
| from sherlock import sherlock | |
| import face_recognition | |
| import numpy as np | |
| from PIL import Image | |
| import io | |
| import requests | |
| from geopy.geocoders import Nominatim | |
| from geopy.exc import GeocoderTimedOut | |
| import whois | |
| from datetime import datetime | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| class PersonInfo: | |
| name: str | |
| age: Optional[int] = None | |
| location: Optional[str] = None | |
| gender: Optional[str] = None | |
| social_profiles: List[Dict[str, str]] = None | |
| images: List[str] = None | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "name": self.name, | |
| "age": self.age, | |
| "location": self.location, | |
| "gender": self.gender, | |
| "social_profiles": self.social_profiles or [], | |
| "images": self.images or [] | |
| } | |
| class OSINTEngine: | |
| def __init__(self): | |
| self.geolocator = Nominatim(user_agent="intelligent_search_engine") | |
| self.known_platforms = [ | |
| "Twitter", "Instagram", "Facebook", "LinkedIn", "GitHub", | |
| "Reddit", "YouTube", "TikTok", "Pinterest", "Snapchat", | |
| "Twitch", "Medium", "Dev.to", "Stack Overflow" | |
| ] | |
| async def search_username(self, username: str) -> Dict[str, Any]: | |
| """Search for username across multiple platforms.""" | |
| results = [] | |
| # Use holehe for email-based search | |
| email = f"{username}@gmail.com" # Example email | |
| holehe_results = await holehe.check_email(email) | |
| # Use sherlock for username search | |
| sherlock_results = sherlock.sherlock(username, self.known_platforms, verbose=False) | |
| # Combine results | |
| for platform, data in {**holehe_results, **sherlock_results}.items(): | |
| if data.get("exists", False): | |
| results.append({ | |
| "platform": platform, | |
| "url": data.get("url", ""), | |
| "confidence": data.get("confidence", "high") | |
| }) | |
| return { | |
| "username": username, | |
| "found_on": results | |
| } | |
| async def search_person(self, name: str, location: Optional[str] = None, | |
| age: Optional[int] = None, gender: Optional[str] = None) -> PersonInfo: | |
| """Search for information about a person.""" | |
| person = PersonInfo( | |
| name=name, | |
| age=age, | |
| location=location, | |
| gender=gender | |
| ) | |
| # Initialize social profiles list | |
| person.social_profiles = [] | |
| # Search for social media profiles | |
| username_variants = [ | |
| name.replace(" ", ""), | |
| name.replace(" ", "_"), | |
| name.replace(" ", "."), | |
| name.lower().replace(" ", "") | |
| ] | |
| for username in username_variants: | |
| results = await self.search_username(username) | |
| person.social_profiles.extend(results.get("found_on", [])) | |
| return person | |
| async def analyze_image(self, image_data: bytes) -> Dict[str, Any]: | |
| """Analyze an image for faces and other identifiable information.""" | |
| try: | |
| # Load image | |
| image = face_recognition.load_image_file(io.BytesIO(image_data)) | |
| # Detect faces | |
| face_locations = face_recognition.face_locations(image) | |
| face_encodings = face_recognition.face_encodings(image, face_locations) | |
| results = { | |
| "faces_found": len(face_locations), | |
| "faces": [] | |
| } | |
| # Analyze each face | |
| for i, (face_encoding, face_location) in enumerate(zip(face_encodings, face_locations)): | |
| face_data = { | |
| "location": { | |
| "top": face_location[0], | |
| "right": face_location[1], | |
| "bottom": face_location[2], | |
| "left": face_location[3] | |
| } | |
| } | |
| results["faces"].append(face_data) | |
| return results | |
| except Exception as e: | |
| return {"error": str(e)} | |
| async def search_location(self, location: str) -> Dict[str, Any]: | |
| """Gather information about a location.""" | |
| try: | |
| # Geocode the location | |
| location_data = self.geolocator.geocode(location, timeout=10) | |
| if not location_data: | |
| return {"error": "Location not found"} | |
| return { | |
| "address": location_data.address, | |
| "latitude": location_data.latitude, | |
| "longitude": location_data.longitude, | |
| "raw": location_data.raw | |
| } | |
| except GeocoderTimedOut: | |
| return {"error": "Geocoding service timed out"} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| async def analyze_domain(self, domain: str) -> Dict[str, Any]: | |
| """Analyze a domain for WHOIS and other information.""" | |
| try: | |
| w = whois.whois(domain) | |
| return { | |
| "registrar": w.registrar, | |
| "creation_date": w.creation_date, | |
| "expiration_date": w.expiration_date, | |
| "last_updated": w.updated_date, | |
| "status": w.status, | |
| "name_servers": w.name_servers | |
| } | |
| except Exception as e: | |
| return {"error": str(e)} | |