File size: 2,931 Bytes
2d93720 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
import requests
import random
import time
import sys
import threading
from flask import Flask, request, jsonify
PROXY_LIST_URL = "https://proxies.typegpt.net/ips.txt"
proxies_cache = []
last_refresh = 0
app = Flask(__name__)
def fetch_proxies():
"""Fetch proxy list from remote URL."""
try:
resp = requests.get(PROXY_LIST_URL, timeout=10)
resp.raise_for_status()
proxies = [line.strip() for line in resp.text.splitlines() if line.strip()]
return proxies
except Exception as e:
print(f"[ERROR] Failed to fetch proxies: {e}")
return []
def get_random_proxy(proxies):
"""Pick a random proxy from the list."""
if not proxies:
return None
return random.choice(proxies)
def make_request(proxy, target_url):
"""Send a GET request through the given proxy to target URL."""
proxies = {"http": proxy, "https": proxy}
try:
print(f"[INFO] Using proxy: {proxy}")
resp = requests.get(target_url, proxies=proxies, timeout=15)
if resp.status_code == 200:
print(f"[SUCCESS] Proxy working! Status: {resp.status_code}")
return True, resp.text[:200]
else:
print(f"[WARN] Proxy responded with status {resp.status_code}")
return False, f"Status {resp.status_code}"
except Exception as e:
print(f"[ERROR] Proxy failed: {proxy} | {e}")
return False, str(e)
def proxy_loop(target_url):
"""Background loop to test proxies continuously."""
global proxies_cache, last_refresh
while True:
if time.time() - last_refresh > 300 or not proxies_cache:
proxies_cache = fetch_proxies()
last_refresh = time.time()
print(f"[INFO] Refreshed {len(proxies_cache)} proxies.")
proxy = get_random_proxy(proxies_cache)
if proxy:
make_request(proxy, target_url)
time.sleep(10)
# ---- Flask Endpoints ----
@app.route("/health")
def health():
return "Healthy", 200
@app.route("/test")
def test_proxy():
target_url = request.args.get("url")
if not target_url:
return jsonify({"error": "Missing ?url=<target_url>"}), 400
proxy = get_random_proxy(proxies_cache)
if not proxy:
return jsonify({"error": "No proxies available"}), 500
ok, msg = make_request(proxy, target_url)
if ok:
return jsonify({"status": "Proxy working!", "proxy": proxy, "response_snippet": msg}), 200
else:
return jsonify({"status": "Proxy failed", "proxy": proxy, "error": msg}), 502
def main():
if len(sys.argv) < 2:
print("Usage: python main.py <target_url>")
sys.exit(1)
target_url = sys.argv[1]
# Start background thread
t = threading.Thread(target=proxy_loop, args=(target_url,), daemon=True)
t.start()
# Start Flask server
app.run(host="0.0.0.0", port=5000)
if __name__ == "__main__":
main()
|