quick-space-610 / utils.py
Taf2023's picture
Update Gradio app with multiple files
1b9b049 verified
raw
history blame
10.1 kB
import json
import logging
import selectors
import socket
import struct
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
@dataclass
class ConnectionInfo:
    """Bookkeeping record for one client connection accepted by the proxy."""

    # Remote endpoint of the client socket.
    ip: str
    port: int
    # Timestamp taken when the connection was accepted.
    start_time: datetime
    # Payload bytes relayed from the client to the target.
    bytes_sent: int = 0
    # Payload bytes relayed from the target back to the client.
    bytes_received: int = 0
    # Set to False once the client's handler has finished.
    is_active: bool = True
class SOCKS5ProxyServer:
    """Threaded SOCKS5 proxy (RFC 1928) supporting unauthenticated CONNECT.

    One daemon thread accepts clients. Every client gets its own daemon
    thread which negotiates the authentication method, reads the request and,
    for CONNECT, relays traffic until either side closes. BIND and
    UDP ASSOCIATE are answered with "command not supported".

    Attributes:
        host, port: Address the listening socket is bound to.
        server_socket: The listening socket, or None before the first start.
        running: True between a successful start() and stop().
        connections: Per-client ConnectionInfo keyed by ``"ip:port"``.
            Entries are only marked inactive when a client goes away;
            nothing in this class removes them.
        total_connections: Number of clients accepted so far.
        start_time: datetime of the last successful start(), else None.

    NOTE(review): the default host listens on every interface and no
    authentication is required - confirm that is intended for deployment.
    """

    # Protocol constants (RFC 1928).
    _VERSION = 0x05
    _METHOD_NO_AUTH = 0x00
    _METHOD_NONE_ACCEPTABLE = 0xFF
    _CMD_CONNECT = 0x01
    _CMD_BIND = 0x02
    _CMD_UDP_ASSOCIATE = 0x03
    _ATYP_IPV4 = 0x01
    _ATYP_DOMAIN = 0x03
    _ATYP_IPV6 = 0x04
    _REP_SUCCESS = 0x00
    _REP_GENERAL_FAILURE = 0x01
    _REP_HOST_UNREACHABLE = 0x04
    _REP_CONNECTION_REFUSED = 0x05
    _REP_COMMAND_NOT_SUPPORTED = 0x07
    _REP_ADDRESS_NOT_SUPPORTED = 0x08

    # How often the accept loop wakes up to re-check ``running``.
    _ACCEPT_POLL_SECONDS = 1.0
    # Maximum number of bytes moved per relay read.
    _RELAY_CHUNK = 4096

    def __init__(self, host='0.0.0.0', port=1080):
        """Store the listen address; no socket is opened until start()."""
        self.host = host
        self.port = port
        self.server_socket = None
        self.running = False
        self.connections: Dict[str, ConnectionInfo] = {}
        self.total_connections = 0
        self.start_time = None
        self._lock = threading.Lock()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self):
        """Start SOCKS5 proxy server.

        Returns:
            True when the listener is bound and the accept thread is running,
            False when anything failed (the error is logged).
        """
        listener = None
        try:
            # Build the socket in a local first: a failed bind must neither
            # leak the new socket nor clobber an already running listener.
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.host, self.port))
            listener.listen(50)

            self.server_socket = listener
            self.running = True
            self.start_time = datetime.now()
            logging.info(f"SOCKS5 Proxy Server started at {self.host}:{self.port}")

            # Start thread for accepting connections
            threading.Thread(target=self._accept_connections, daemon=True).start()
            return True
        except Exception as e:
            logging.error(f"Failed to start proxy server: {e}")
            if listener is not None:
                if self.server_socket is listener:
                    # Failure happened after the listener was published.
                    self.running = False
                    self.server_socket = None
                self._close_quietly(listener)
            return False

    def stop(self):
        """Stop SOCKS5 proxy server by clearing ``running`` and closing the listener."""
        self.running = False
        if self.server_socket:
            self._close_quietly(self.server_socket)
        logging.info("SOCKS5 Proxy Server stopped")

    # ------------------------------------------------------------------
    # Accept loop and per-client handling
    # ------------------------------------------------------------------
    def _accept_connections(self):
        """Accept new connections until the server is stopped."""
        listener = self.server_socket
        if listener is None:
            return
        try:
            # Poll instead of blocking forever so stop() is noticed even when
            # no further client ever connects.
            listener.settimeout(self._ACCEPT_POLL_SECONDS)
        except OSError:
            return

        # Also stop when a later start() replaced the listener.
        while self.running and self.server_socket is listener:
            try:
                client_socket, client_address = listener.accept()
            except socket.timeout:
                continue
            except OSError:
                if self.running and self.server_socket is listener:
                    logging.error("Error accepting connection")
                break

            client_ip, client_port = client_address

            # Record connection
            with self._lock:
                self.connections[f"{client_ip}:{client_port}"] = ConnectionInfo(
                    ip=client_ip,
                    port=client_port,
                    start_time=datetime.now()
                )
                self.total_connections += 1

            # Create thread for handling connection
            threading.Thread(
                target=self._handle_client,
                args=(client_socket, client_ip, client_port),
                daemon=True
            ).start()

    def _handle_client(self, client_socket, client_ip, client_port):
        """Serve one client: negotiate, read the request, dispatch the command.

        The client socket is always closed and the connection record marked
        inactive before this method returns. For CONNECT that happens only
        after the relay has finished, because the relay runs in this thread.
        """
        try:
            # SOCKS5 handshake
            if not self._handle_socks5_handshake(client_socket):
                return

            request = self._read_request(client_socket)
            if request is None:
                return

            cmd = request[1]
            if cmd == self._CMD_CONNECT:
                self._handle_connect(client_socket, request, client_ip, client_port)
            elif cmd == self._CMD_BIND:
                self._handle_bind(client_socket, client_ip, client_port)
            elif cmd == self._CMD_UDP_ASSOCIATE:
                self._handle_udp_associate(client_socket, client_ip, client_port)
            else:
                # Unknown command: tell the client instead of silently closing.
                self._send_reply(client_socket, self._REP_COMMAND_NOT_SUPPORTED)
        except Exception as e:
            logging.error(f"Error handling client {client_ip}:{client_port}: {e}")
        finally:
            self._close_quietly(client_socket)
            # Update status
            with self._lock:
                conn = self.connections.get(f"{client_ip}:{client_port}")
                if conn is not None:
                    conn.is_active = False

    def _handle_socks5_handshake(self, client_socket):
        """Negotiate the authentication method.

        Reads exactly the greeting (version, method count, methods) so that
        request bytes arriving in the same segment stay in the socket buffer.

        Returns:
            True if "no authentication" was offered and accepted. False on a
            short read, wrong protocol version, or when the client did not
            offer "no authentication" (it is then sent 0x05 0xFF).
        """
        try:
            greeting = self._recv_exact(client_socket, 2)
            if greeting is None:
                return False
            version, nmethods = greeting
            if version != self._VERSION:
                return False

            methods = self._recv_exact(client_socket, nmethods)
            if methods is None:
                return False

            if self._METHOD_NO_AUTH not in methods:
                client_socket.sendall(
                    bytes([self._VERSION, self._METHOD_NONE_ACCEPTABLE]))
                return False

            # Respond with no authentication
            client_socket.sendall(bytes([self._VERSION, self._METHOD_NO_AUTH]))
            return True
        except Exception as e:
            logging.error(f"Handshake error: {e}")
            return False

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------
    def _handle_connect(self, client_socket, request, client_ip, client_port):
        """Handle SOCKS5 CONNECT command.

        Args:
            client_socket: Connected client socket.
            request: Raw request bytes (VER CMD RSV ATYP DST.ADDR DST.PORT).
            client_ip, client_port: Client endpoint, used for the statistics
                key and for logging.

        On success the reply is sent and traffic is relayed in the calling
        thread until either side closes. On failure a SOCKS5 error reply is
        sent and the method returns without raising.
        """
        try:
            destination = self._parse_destination(request)
        except ValueError as e:
            self._send_reply(client_socket, self._REP_GENERAL_FAILURE)
            logging.error(f"CONNECT error: {e}")
            return
        if destination is None:
            self._send_reply(client_socket, self._REP_ADDRESS_NOT_SUPPORTED)
            logging.error("CONNECT error: unsupported address type")
            return

        dest_addr, dest_port = destination
        try:
            # create_connection resolves names and picks the right address
            # family, so IPv6 destinations work too.
            target_socket = socket.create_connection((dest_addr, dest_port))
        except (OSError, ValueError) as e:
            self._send_reply(client_socket, self._reply_code_for(e))
            logging.error(f"CONNECT error: {e}")
            return

        if not self._send_reply(client_socket, self._REP_SUCCESS):
            self._close_quietly(target_socket)
            return

        logging.info(f"CONNECT: {client_ip}:{client_port} -> {dest_addr}:{dest_port}")
        # Relay in this thread: the caller closes the client socket as soon as
        # we return, so handing the sockets to another thread would cut the
        # tunnel immediately.
        self._relay_data(client_socket, target_socket, client_ip, client_port)

    def _handle_bind(self, client_socket, client_ip, client_port):
        """Handle SOCKS5 BIND command (not implemented: replies 0x07)."""
        self._send_reply(client_socket, self._REP_COMMAND_NOT_SUPPORTED)

    def _handle_udp_associate(self, client_socket, client_ip, client_port):
        """Handle SOCKS5 UDP ASSOCIATE command (not implemented: replies 0x07).

        No UDP relay exists in this class, so reporting success would mislead
        the client.
        """
        self._send_reply(client_socket, self._REP_COMMAND_NOT_SUPPORTED)

    def _relay_data(self, client_socket, target_socket, client_ip, client_port):
        """Relay data between client and target until either side closes.

        Both sockets are watched at once, so it does not matter which peer
        speaks first. Byte counters of the matching connection record are
        updated after each forwarded chunk. Both sockets are closed before
        returning.
        """
        key = f"{client_ip}:{client_port}"
        try:
            with selectors.DefaultSelector() as selector:
                # data = (socket to forward to, counter attribute to bump)
                selector.register(client_socket, selectors.EVENT_READ,
                                  (target_socket, "bytes_sent"))
                selector.register(target_socket, selectors.EVENT_READ,
                                  (client_socket, "bytes_received"))
                while True:
                    for ready, _ in selector.select():
                        destination, counter = ready.data
                        data = ready.fileobj.recv(self._RELAY_CHUNK)
                        if not data:
                            return
                        # sendall: a plain send() may write only part of it.
                        destination.sendall(data)
                        # Update statistics
                        with self._lock:
                            conn = self.connections.get(key)
                            if conn is not None:
                                setattr(conn, counter,
                                        getattr(conn, counter) + len(data))
        except OSError:
            # A socket error on either side simply ends the relay.
            pass
        except Exception as e:
            logging.error(f"Relay error: {e}")
        finally:
            self._close_quietly(client_socket)
            self._close_quietly(target_socket)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _read_request(self, client_socket):
        """Read one complete SOCKS5 request; return its bytes or None.

        None means the peer closed early or sent a wrong version. For an
        unknown address type only the 4-byte header is returned, since the
        length of the remainder cannot be known.
        """
        header = self._recv_exact(client_socket, 4)
        if header is None or header[0] != self._VERSION:
            return None

        addr_type = header[3]
        if addr_type == self._ATYP_IPV4:
            body = self._recv_exact(client_socket, 4 + 2)
        elif addr_type == self._ATYP_IPV6:
            body = self._recv_exact(client_socket, 16 + 2)
        elif addr_type == self._ATYP_DOMAIN:
            length = self._recv_exact(client_socket, 1)
            if length is None:
                return None
            rest = self._recv_exact(client_socket, length[0] + 2)
            body = None if rest is None else length + rest
        else:
            body = b''

        if body is None:
            return None
        return header + body

    @classmethod
    def _parse_destination(cls, request):
        """Extract ``(host, port)`` from raw request bytes.

        Returns None for an unsupported address type.

        Raises:
            ValueError: the request is truncated, has an empty domain name,
                or the domain is not valid UTF-8.
        """
        if len(request) < 4:
            raise ValueError("truncated SOCKS5 request")

        addr_type = request[3]
        if addr_type == cls._ATYP_IPV4:
            port_offset = 8
            if len(request) < port_offset + 2:
                raise ValueError("truncated IPv4 request")
            host = socket.inet_ntoa(request[4:8])
        elif addr_type == cls._ATYP_DOMAIN:
            if len(request) < 5:
                raise ValueError("truncated domain request")
            domain_length = request[4]
            port_offset = 5 + domain_length
            if domain_length == 0 or len(request) < port_offset + 2:
                raise ValueError("truncated domain request")
            host = request[5:port_offset].decode('utf-8')
        elif addr_type == cls._ATYP_IPV6:
            port_offset = 20
            if len(request) < port_offset + 2:
                raise ValueError("truncated IPv6 request")
            host = socket.inet_ntop(socket.AF_INET6, request[4:20])
        else:
            return None

        (port,) = struct.unpack('!H', request[port_offset:port_offset + 2])
        return host, port

    @classmethod
    def _reply_code_for(cls, error):
        """Map a connect failure to the closest SOCKS5 reply code."""
        if isinstance(error, ConnectionRefusedError):
            return cls._REP_CONNECTION_REFUSED
        if isinstance(error, (socket.gaierror, socket.timeout)):
            return cls._REP_HOST_UNREACHABLE
        return cls._REP_GENERAL_FAILURE

    @classmethod
    def _send_reply(cls, client_socket, code):
        """Send a reply with bound address 0.0.0.0:0; return True if sent."""
        reply = bytes([cls._VERSION, code, 0x00, cls._ATYP_IPV4]) + bytes(6)
        try:
            client_socket.sendall(reply)
            return True
        except OSError:
            return False

    @staticmethod
    def _recv_exact(sock, count):
        """Read exactly ``count`` bytes; return None if the peer closes first."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @staticmethod
    def _close_quietly(sock):
        """Close ``sock``, ignoring errors from an already broken socket."""
        try:
            sock.close()
        except OSError:
            pass
class ProxyManager:
def __init__(self, host='0.0.0.0', port=1080):
self.proxy_server = SOCKS5ProxyServer(host, port)
self.host = host
self.port = port
def start_server(self):
"""Start proxy server"""
return self.proxy_server.start()
def stop_server(self):
"""Stop proxy server"""
self.proxy_server.stop()
def is_running(self):
"""Check if server is running"""
return self.proxy_server.running
def get_server_info(self):
"""Get server information"""
info = {
"server_status": "Running" if self.proxy_server.running else "Stopped",
"host": self