import socket import threading import time import json import logging from datetime import datetime, timedelta from typing import Dict, List, Optional from dataclasses import dataclass import struct @dataclass class ConnectionInfo: ip: str port: int start_time: datetime bytes_sent: int = 0 bytes_received: int = 0 is_active: bool = True class SOCKS5ProxyServer: def __init__(self, host='0.0.0.0', port=1080): self.host = host self.port = port self.server_socket = None self.running = False self.connections: Dict[str, ConnectionInfo] = {} self.total_connections = 0 self.start_time = None self._lock = threading.Lock() def start(self): """Start SOCKS5 proxy server""" try: self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(50) self.running = True self.start_time = datetime.now() logging.info(f"SOCKS5 Proxy Server started at {self.host}:{self.port}") # Start thread for accepting connections threading.Thread(target=self._accept_connections, daemon=True).start() return True except Exception as e: logging.error(f"Failed to start proxy server: {e}") return False def stop(self): """Stop SOCKS5 proxy server""" self.running = False if self.server_socket: try: self.server_socket.close() except: pass logging.info("SOCKS5 Proxy Server stopped") def _accept_connections(self): """Accept new connections""" while self.running: try: client_socket, client_address = self.server_socket.accept() client_ip, client_port = client_address # Record connection with self._lock: self.connections[f"{client_ip}:{client_port}"] = ConnectionInfo( ip=client_ip, port=client_port, start_time=datetime.now() ) self.total_connections += 1 # Create thread for handling connection threading.Thread( target=self._handle_client, args=(client_socket, client_ip, client_port), daemon=True ).start() except socket.error: if self.running: logging.error("Error accepting connection") break def _handle_client(self, client_socket, client_ip, client_port): """Handle client connection""" try: # SOCKS5 handshake if not self._handle_socks5_handshake(client_socket): return # Receive request from client request = client_socket.recv(1024) if len(request) < 10: return # Parse SOCKS5 request if request[0] != 0x05: # Version 5 return cmd = request[1] # Command addr_type = request[3] # Address type if cmd == 0x01: # CONNECT self._handle_connect(client_socket, request, client_ip, client_port) elif cmd == 0x02: # BIND self._handle_bind(client_socket, client_ip, client_port) elif cmd == 0x03: # UDP ASSOCIATE self._handle_udp_associate(client_socket, client_ip, client_port) except Exception as e: logging.error(f"Error handling client {client_ip}:{client_port}: {e}") finally: # Close connection try: client_socket.close() except: pass # Update status with self._lock: if f"{client_ip}:{client_port}" in self.connections: self.connections[f"{client_ip}:{client_port}"].is_active = False def _handle_socks5_handshake(self, client_socket): """Handle SOCKS5 handshake""" try: # Receive client hello data = client_socket.recv(1024) if len(data) < 2: return False version = data[0] nmethods = data[1] if version != 0x05: # Version 5 return False # Respond with no authentication response = b'\x05\x00' client_socket.send(response) return True except Exception as e: logging.error(f"Handshake error: {e}") return False def _handle_connect(self, client_socket, request, client_ip, client_port): """Handle SOCKS5 CONNECT command""" try: addr_type = request[3] dest_addr = "" dest_port = 0 if addr_type == 0x01: # IPv4 dest_ip = socket.inet_ntoa(request[4:8]) dest_port = struct.unpack('!H', request[8:10])[0] dest_addr = dest_ip elif addr_type == 0x03: # Domain name domain_length = request[4] dest_addr = request[5:5+domain_length].decode('utf-8') dest_port = struct.unpack('!H', request[5+domain_length:7+domain_length])[0] elif addr_type == 0x04: # IPv6 dest_ip = socket.inet_ntop(socket.AF_INET6, request[4:20]) dest_port = struct.unpack('!H', request[20:22])[0] dest_addr = dest_ip # Connect to target target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) target_socket.connect((dest_addr, dest_port)) # Send success response reply = b'\x05\x00\x00\x01' + socket.inet_aton('0.0.0.0') + b'\x00\x00' client_socket.send(reply) # Start data relay threading.Thread( target=self._relay_data, args=(client_socket, target_socket, client_ip, client_port), daemon=True ).start() logging.info(f"CONNECT: {client_ip}:{client_port} -> {dest_addr}:{dest_port}") except Exception as e: # Send error response reply = b'\x05\x05\x00\x01\x00\x00\x00\x00\x00\x00' try: client_socket.send(reply) except: pass logging.error(f"CONNECT error: {e}") def _handle_bind(self, client_socket, client_ip, client_port): """Handle SOCKS5 BIND command""" # BIND command implementation (for incoming connections) reply = b'\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00' # Command not supported try: client_socket.send(reply) except: pass def _handle_udp_associate(self, client_socket, client_ip, client_port): """Handle SOCKS5 UDP ASSOCIATE command""" # UDP ASSOCIATE implementation reply = b'\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00' try: client_socket.send(reply) except: pass def _relay_data(self, client_socket, target_socket, client_ip, client_port): """Relay data between client and target""" try: while True: try: data = client_socket.recv(4096) if not data: break # Forward to target target_socket.send(data) # Update statistics with self._lock: if f"{client_ip}:{client_port}" in self.connections: conn = self.connections[f"{client_ip}:{client_port}"] conn.bytes_sent += len(data) except socket.error: break try: data = target_socket.recv(4096) if not data: break # Forward back to client client_socket.send(data) # Update statistics with self._lock: if f"{client_ip}:{client_port}" in self.connections: conn = self.connections[f"{client_ip}:{client_port}"] conn.bytes_received += len(data) except socket.error: break except Exception as e: logging.error(f"Relay error: {e}") finally: try: client_socket.close() target_socket.close() except: pass class ProxyManager: def __init__(self, host='0.0.0.0', port=1080): self.proxy_server = SOCKS5ProxyServer(host, port) self.host = host self.port = port def start_server(self): """Start proxy server""" return self.proxy_server.start() def stop_server(self): """Stop proxy server""" self.proxy_server.stop() def is_running(self): """Check if server is running""" return self.proxy_server.running def get_server_info(self): """Get server information""" info = { "server_status": "Running" if self.proxy_server.running else "Stopped", "host": self