import asyncio
import time
import webbrowser

from .qwenOAuth2 import generatePKCEPair, QwenOAuth2Client


# Qwen OAuth2 endpoint and client settings.
# NOTE(review): within the visible part of this module only SCOPES is read;
# the other constants are presumably consumed elsewhere or kept for
# reference -- confirm before removing any of them.

# Endpoint that issues the device code / verification URL.
AUTHORIZATION_URL = "https://chat.qwen.ai/api/v1/oauth2/device/code"

# Endpoint that is polled to exchange the device code for tokens.
TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"

# OAuth2 client identifier (presumably the public client id registered for
# this tool -- TODO confirm).
CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"

# Space-separated scopes requested during device authorization.
SCOPES = "openid profile email model.completion"

# NOTE(review): nothing visible here listens on this address and the
# device flow below does not send it -- verify it is still needed.
REDIRECT_URI = "http://localhost:8080/callback"


async def launch_browser_for_oauth() -> None:
    """Run the interactive OAuth2 device-authorization flow.

    Steps:
      1. Generate a PKCE verifier/challenge pair.
      2. Request a device code from ``QwenOAuth2Client``.
      3. Print the verification URL and try to open it in a browser.
      4. Poll for the token every two seconds until the user authorizes,
         the device code expires, or a non-pending response without an
         access token arrives.
      5. On success, hand the token response to the client's shared manager
         so it is written to the credentials file.

    Returns:
        None. Progress, success and failure are all reported on stdout.
    """
    poll_interval_seconds = 2

    pkce_pair = generatePKCEPair()
    code_verifier = pkce_pair["code_verifier"]
    code_challenge = pkce_pair["code_challenge"]

    client = QwenOAuth2Client()

    device_auth = await client.requestDeviceAuthorization(
        {
            "scope": SCOPES,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
    )

    if not isinstance(device_auth, dict) or "device_code" not in device_auth:
        print("Failed to receive device code")
        return

    # Prefer the URL with the user code already embedded; fall back to the
    # plain verification URL. Resolve it once and bail out cleanly if the
    # response carries neither, instead of raising KeyError.
    url_to_open = device_auth.get("verification_uri_complete") or device_auth.get(
        "verification_uri"
    )
    if not url_to_open:
        print("Failed to receive verification URL")
        return

    print("Please visit the following URL to authorize:")
    print(url_to_open)

    # webbrowser.open() signals "no browser could be launched" by returning
    # False, so the return value has to be checked; catching only concrete
    # error types keeps Ctrl+C and task cancellation from being swallowed.
    try:
        browser_opened = webbrowser.open(url_to_open)
    except (webbrowser.Error, OSError):
        browser_opened = False
    if not browser_opened:
        print(f"Open the URL manually in your browser: {url_to_open}")

    device_code = device_auth["device_code"]
    expires_in = device_auth.get("expires_in", 1800)
    # Monotonic clock: the timeout must not be affected by wall-clock
    # adjustments (NTP sync, manual changes) while we are waiting.
    deadline = time.monotonic() + expires_in

    print("Waiting for authorization... Press Ctrl+C to cancel.")

    while True:
        if time.monotonic() > deadline:
            print("Authorization timed out.")
            break

        token_response = await client.pollDeviceToken(
            {
                "device_code": device_code,
                "code_verifier": code_verifier,
            }
        )

        if not isinstance(token_response, dict):
            print(f"\nUnexpected response: {token_response}")
            break

        if token_response.get("status") == "pending":
            # Still waiting for the user: show progress and poll again.
            print(".", end="", flush=True)
            await asyncio.sleep(poll_interval_seconds)
            continue

        if "access_token" in token_response:
            print("\nAuthorization successful.")
            # The access token is a bearer secret, so it is deliberately not
            # echoed to the terminal (scrollback, logs, screen sharing); it
            # is available from the saved credentials file instead.
            await client.sharedManager.saveCredentialsToFile(token_response)
            print(
                "Credentials saved to: "
                f"{client.sharedManager.getCredentialFilePath()}"
            )
            return

        print(f"\nError during polling: {token_response}")
        break


async def main() -> None:
    """Script entry coroutine: run the OAuth device-authorization flow."""
    await launch_browser_for_oauth()


if __name__ == "__main__":
    # The flow tells the user to press Ctrl+C to cancel, so treat the
    # resulting KeyboardInterrupt as a normal exit rather than a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nAuthorization cancelled.")