Spaces:
Paused
Paused
import asyncio
import base64
import os
import random
import re

import nopecha
import requests
from hcaptcha_challenger.agents import Malenia
from playwright.async_api import BrowserContext as ASyncContext, async_playwright
# NopeCHA API key. Supply it through the NOPECHA_API_KEY environment variable;
# the literal is kept only as a fallback so existing deployments keep working.
# NOTE(review): a key committed to source should be treated as leaked — rotate
# it and drop the fallback once the environment variable is in place.
nopecha.api_key = os.environ.get("NOPECHA_API_KEY", '5nogeisu16i5tr5r')
async def route_continuation(route, request, host, sitekey):
    """Serve a local hCaptcha demo page for the root URL of ``host``.

    A request whose URL is exactly ``https://{host}/`` is answered with a
    synthetic HTML page that embeds the hCaptcha widget for ``sitekey``;
    every other request is passed through unchanged.

    Args:
        route: Route object exposing ``fulfill`` and ``continue_``.
        request: Request object exposing ``url``.
        host: Host name whose root URL is intercepted.
        sitekey: hCaptcha site key substituted into the page.
    """
    # Requests for anything other than the target root URL are left untouched.
    if request.url != f"https://{host}/":
        await route.continue_()
        return

    print("start to solve")
    # The row of percent signs is a placeholder replaced by the site key below.
    demo_page = """
<!DOCTYPE html>
<html lang="en">
<head>
<title>hCAPTCHA 演示</title>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, user-scalable=yes">
<script src="https://js.hcaptcha.com/1/api.js" type="text/javascript" async defer></script>
</head>
<body>
<br><br>
<div class="sample-form">
<form id="hcaptcha-demo-form" method="POST">
<div id="hcaptcha-demo" class="h-captcha" data-sitekey="%%%%%%%%%%%" data-callback="onSuccess" data-expired-callback="onExpire"></div>
<script>
// success callback
var onSuccess = function(response) {
var errorDivs = document.getElementsByClassName("hcaptcha-error");
if (errorDivs.length) {
errorDivs[0].className = "";
}
var errorMsgs = document.getElementsByClassName("hcaptcha-error-message");
if (errorMsgs.length) {
errorMsgs[0].parentNode.removeChild(errorMsgs[0]);
}
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "挑战成功!"
};
var onExpire = function(response) {
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "令牌已过期。"
};
</script>
<div class="hcaptcha-success smsg" aria-live="polite"></div>
</body>
<script type="text/javascript">
// beacon example
function addEventHandler(object,szEvent,cbCallback){
if(!!object.addEventListener){ // for modern browsers or IE9+
return object.addEventListener(szEvent,cbCallback);
}
if(!!object.attachEvent){ // for IE <=8
return object.attachEvent(szEvent,cbCallback);
}
};
// Ex: triggers pageview beacon
addEventHandler(window,'load',function(){b();});
// Ex: triggers event beacon without pageview
addEventHandler(window,'load',function(){b({"vt": "e", "ec": "test_cat", "ea": "test_action"});});
</script>
</html>
"""
    rendered = demo_page.replace("%%%%%%%%%%%", sitekey)
    await route.fulfill(status=200, body=rendered)
def url_to_base64(url, timeout=10):
    """Download ``url`` and return its body as a base64 string.

    Args:
        url: Address of the resource to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The base64-encoded response body as ``str``, or ``None`` when the
        request fails (network error, timeout, or HTTP error status).
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Treat 4xx/5xx as failures instead of encoding an error page.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL: {e}")
        return None
    return base64.b64encode(response.content).decode('utf-8')
_CSS_URL_RE = re.compile(r'url\("(.+?)"\)')


def _css_background_url(style):
    """Return the first ``url("...")`` target in an inline style, or None."""
    if not style:
        return None
    found = _CSS_URL_RE.search(style)
    return found.group(1) if found else None


async def handle_response(page):
    """Read the visible hCaptcha challenge and hand it to ``classify_click``.

    Waits for the challenge iframe, then reads the prompt text, the optional
    example image URL and the task image URLs from inline ``style``
    attributes. When the prompt has no ``div.prompt-padding`` element the
    challenge is refreshed instead.

    On any error the page is closed and the module-level ``tasks`` future
    (if one has been created) is cancelled.

    Args:
        page: Playwright page showing the challenge.
    """
    await asyncio.sleep(2)
    try:
        await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]", timeout=15000)
        frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]")
        question_text = frame_challenge.locator("//div[@class='challenge-prompt']")
        element_handle = await question_text.element_handle()
        prompt_padding_element = await element_handle.query_selector("div.prompt-padding")
        if prompt_padding_element is None:
            # Prompt layout not handled here: request a different challenge.
            await asyncio.sleep(random.uniform(0.1, 0.3))
            await frame_challenge.locator("//div[@class='refresh button']").click()
            print("refresh")
            return

        span_element = await element_handle.query_selector("h2 > span")
        span_text = await span_element.inner_text()

        examples0 = frame_challenge.locator('//div[@class="challenge-example"]')
        example_box = examples0.locator('div.image[aria-hidden]')
        aria_hidden_value = await example_box.get_attribute("aria-hidden")
        # Stays None unless a usable example URL is found, so a locator
        # object is never forwarded to the solver.
        examples = None
        if aria_hidden_value == "true":
            print("no examples")
        else:
            examples1 = example_box.locator('//div[@class="image"]')
            await examples1.wait_for()
            examples = _css_background_url(await examples1.get_attribute("style"))
            if examples:
                print(examples)

        task0 = frame_challenge.locator("//div[@class='task-image']")
        await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
        count = await task0.count()
        print(span_text)
        print("done tasks for getting examples")

        data = []
        for i in range(count):
            img = task0.nth(i).locator('//div[@class="image"]')
            # Images without a background URL are skipped.
            url = _css_background_url(await img.get_attribute("style"))
            if url:
                data.append(url)
        print("done tasks for getting url")
        await classify_click(page, data, 0, examples, span_text)
    except Exception as e:
        print(e)
        try:
            await page.close()
        finally:
            # Cancel even if closing the page fails; tasks is None until
            # main() has created the future.
            if tasks is not None:
                tasks.cancel()
# Strong references to in-flight handle_response tasks; the event loop only
# keeps weak references, so an unreferenced task may be collected mid-run.
_response_handler_tasks = set()


async def on_response(response, page):
    """React to hCaptcha API responses observed on ``page``.

    * ``getcaptcha``: schedules ``handle_response(page)`` as a background task.
    * ``checkcaptcha``: prints the JSON payload; when it reports ``pass``,
      stores ``generated_pass_UUID`` in the module-level ``token``, closes the
      page and cancels the module-level ``tasks`` future.

    Args:
        response: Response object exposing ``url`` and an awaitable ``json()``.
        page: Page that produced the response.
    """
    global tasks, token
    if response.url.startswith("https://api.hcaptcha.com/getcaptcha"):
        handler = asyncio.create_task(handle_response(page))
        _response_handler_tasks.add(handler)
        handler.add_done_callback(_response_handler_tasks.discard)
    elif response.url.startswith("https://api.hcaptcha.com/checkcaptcha"):
        data0 = await response.json()
        if data0.get("pass"):
            # Record the token first so it survives a failing page.close().
            token = data0.get("generated_pass_UUID")
            try:
                await page.close()
            finally:
                if tasks is not None:
                    tasks.cancel()
        print(data0)
async def _collect_task_image_urls(frame_challenge):
    """Return the background-image URLs of the task tiles in the challenge frame."""
    task_images = frame_challenge.locator("//div[@class='task-image']")
    await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
    urls = []
    for i in range(await task_images.count()):
        img = task_images.nth(i).locator('//div[@class="image"]')
        style_attribute = await img.get_attribute("style")
        # A tile without a style attribute is skipped rather than crashing.
        url_match = re.search(r'url\("(.+?)"\)', style_attribute) if style_attribute else None
        if url_match:
            urls.append(url_match.group(1))
    return urls


async def classify_click(page, data, round0, examples, quetsion0):
    """Solve one image-classification round and submit it.

    Sends the task image URLs (plus ``examples`` when truthy) to
    ``nopecha.Recognition.solve``, clicks every tile whose result is truthy
    and presses the submit button. When ``round0`` is 0 the tile URLs are
    read again after a short pause and a second round is solved with
    ``round0`` set to 1.

    On any error the module-level ``tasks`` future (if one has been created)
    is cancelled.

    Args:
        page: Playwright page showing the challenge.
        data: List of task image URLs, in tile order.
        round0: 0 for the first round, 1 for the follow-up round.
        examples: Example image reference forwarded as ``image_examples``,
            or a falsy value when there is none.
        quetsion0: Challenge prompt text forwarded as ``task``.
    """
    try:
        await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]")
        frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]")
        samples = frame_challenge.locator("//div[@class='task-image']")
        await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
        count = await samples.count()
        print(count)

        solve_kwargs = {"type": 'hcaptcha', "task": quetsion0, "image_urls": data}
        if examples:
            solve_kwargs["image_examples"] = examples
        clicks = nopecha.Recognition.solve(**solve_kwargs)

        for i in range(count):
            sample = samples.nth(i)
            await sample.wait_for()
            if clicks[i]:
                print("try to click")
                await sample.click(delay=200)
        print(clicks)

        await asyncio.sleep(random.uniform(0.1, 0.3))
        fl = frame_challenge.locator("//div[@class='button-submit button']")
        await fl.click()

        if round0 == 0:
            await asyncio.sleep(2)
            await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]")
            frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]")
            data = await _collect_task_image_urls(frame_challenge)
            print("done tasks for getting url")
            await classify_click(page, data, 1, examples, quetsion0)
    except Exception as e:
        print(e)
        # tasks is None until main() has created the future.
        if tasks is not None:
            tasks.cancel()
async def area_click(page, data, round0, examples, retries_left=3):
    """Solve one area-select round by clicking a point on the challenge canvas.

    Downloads the first task image named in ``data``, asks
    ``nopecha.Recognition.solve`` for a point, clicks the canvas at that
    point (the ``x``/``y`` results are applied as percentages of the canvas
    bounding box) and presses submit. When ``round0`` is 0 a second round is
    attempted with ``round0`` set to 1.

    Args:
        page: Playwright page showing the challenge.
        data: Challenge payload; ``requester_question["en"]`` and
            ``tasklist[0]["datapoint_uri"]`` are read from it.
        round0: 0 for the first round, 1 for the follow-up round.
        examples: Example images forwarded as ``image_examples``, or a falsy
            value when there are none.
        retries_left: How many more times a failing attempt is repeated.
            Bounds the retry so a persistent error (for example a closed
            page) cannot recurse without end.
    """
    try:
        await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]")
        frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]")
        locator = frame_challenge.locator("//div[@class='challenge-view']//canvas")
        await locator.wait_for(state="visible")

        image_b64 = url_to_base64(data.get("tasklist")[0]["datapoint_uri"])
        if image_b64 is None:
            # Do not send a null image to the solver; fall into the retry path.
            raise RuntimeError("could not download the challenge image")

        solve_kwargs = {
            "type": 'hcaptcha_area_select',
            "task": data.get("requester_question").get("en"),
            "image_data": [image_b64],
        }
        if examples:
            solve_kwargs["image_examples"] = examples
        clicks = nopecha.Recognition.solve(**solve_kwargs)
        print(clicks)
        print(clicks["x"], clicks["y"])

        print("try to click")
        bounds = await locator.bounding_box()
        print(bounds)
        await locator.click(delay=200, position={"x": int(bounds["width"] * clicks["x"] / 100),
                                                 "y": int(bounds["height"] * clicks["y"] / 100)})
        print("done")

        fl = frame_challenge.locator("//div[@class='button-submit button']")
        await fl.click()
        await asyncio.sleep(random.uniform(0.1, 0.3))
        if round0 == 0:
            await area_click(page, data, 1, examples, retries_left)
    except Exception as e:
        print(e)
        if retries_left > 0:
            await area_click(page, data, round0, examples, retries_left - 1)
async def hit_challenge(context: ASyncContext, host, sitekey, times: int = 8):
    """Open the intercepted demo page and click the hCaptcha checkbox.

    Routes every request of ``context`` through ``route_continuation``,
    forwards each page response to ``on_response``, navigates to
    ``https://{host}``, clicks the checkbox and then sleeps for 3000 seconds.

    Args:
        context: Browser context to drive.
        host: Host name to navigate to.
        sitekey: hCaptcha site key passed to ``route_continuation``.
        times: Not used by this function.
    """
    def _intercept(route, request):
        return route_continuation(route, request, host, sitekey)

    await context.route('**/*', _intercept)
    page = await context.new_page()

    def _forward_response(response):
        return on_response(response, page)

    page.on('response', _forward_response)
    await page.goto(f"https://{host}")

    checkbox_frame = page.frame_locator("//iframe[contains(@title,'checkbox')]")
    await checkbox_frame.locator("#checkbox").click()
    # Long sleep; the callbacks in this file call tasks.cancel() on pass or error.
    await asyncio.sleep(3000)
async def bytedance(host, sitekey):
    """Launch headless Firefox and run ``hit_challenge`` in a fresh context.

    The context uses the ``en-US`` locale and has ``Malenia.apply_stealth``
    applied before the challenge is started.

    Args:
        host: Host name passed to ``hit_challenge``.
        sitekey: hCaptcha site key passed to ``hit_challenge``.
    """
    async with async_playwright() as playwright:
        firefox = await playwright.firefox.launch(headless=True)
        stealth_context = await firefox.new_context(locale="en-US")
        await Malenia.apply_stealth(stealth_context)
        await hit_challenge(stealth_context, host, sitekey)
# Not referenced by any function in the visible part of this file.
question = {}
# Future created by main(); the response/click callbacks call tasks.cancel()
# to end the run. None until main() has been called.
tasks = None
# Value of "generated_pass_UUID" stored by on_response when a check passes.
token = None
async def main(host, key):
    """Run one solve attempt and return the resulting pass token.

    Starts ``bytedance(host, key)`` inside a gathered future stored in the
    module-level ``tasks``. The callbacks in this file cancel that future on
    pass or error, so ``asyncio.CancelledError`` here is the normal way a run
    ends.

    Args:
        host: Host name passed to ``bytedance``.
        key: hCaptcha site key passed to ``bytedance``.

    Returns:
        The module-level ``token`` set during this run, or ``None`` when no
        token was obtained.
    """
    global tasks, token
    # Clear any token left over from an earlier call so a failed run cannot
    # return a stale value.
    token = None
    try:
        tasks = asyncio.gather(bytedance(host, key),
                               return_exceptions=True)
        results = await tasks
        # return_exceptions=True hands errors back as values; report them
        # instead of dropping them silently.
        for result in results:
            if isinstance(result, BaseException):
                print(result)
    except asyncio.CancelledError:
        print("task done")
    except Exception as e:
        print(e)
    return token
| # asyncio.run(main("free.vps.vc", "3bae0a5b-f2b8-43ef-98b7-76865a8a3997")) | |