File size: 3,139 Bytes
a4b70d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
import webbrowser
import time
from .qwenOAuth2 import generatePKCEPair, QwenOAuth2Client

# Configuration
AUTHORIZATION_URL = "https://chat.qwen.ai/api/v1/oauth2/device/code"
TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
SCOPES = "openid profile email model.completion"

# Local redirect URL for redirect-based flow (if used)
REDIRECT_URI = "http://localhost:8080/callback"

async def launch_browser_for_oauth():
    # Generate PKCE parameters
    pkce_pair = generatePKCEPair()
    code_verifier = pkce_pair['code_verifier']
    code_challenge = pkce_pair['code_challenge']

    # Initialize OAuth client
    client = QwenOAuth2Client()

    # Request device code
    device_auth = await client.requestDeviceAuthorization({
        "scope": SCOPES,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    })

    # Check device auth success
    if not isinstance(device_auth, dict) or "device_code" not in device_auth:
        print("Failed to receive device code")
        return

    # Show user instructions
    print("Please visit the following URL to authorize:")
    print(device_auth.get("verification_uri_complete") or device_auth["verification_uri"])
    # Attempt to automatically open the URL
    url_to_open = device_auth.get("verification_uri_complete") or device_auth["verification_uri"]
    try:
        webbrowser.open(url_to_open)
    except:
        print(f"Open the URL manually in your browser: {url_to_open}")

    # Start polling for token
    device_code = device_auth["device_code"]
    expires_in = device_auth.get("expires_in", 1800)  # default 30 min
    start_time = time.time()

    print("Waiting for authorization... Press Ctrl+C to cancel.")

    while True:
        if time.time() - start_time > expires_in:
            print("Authorization timed out.")
            break

        # Poll for token
        token_response = await client.pollDeviceToken({
            "device_code": device_code,
            "code_verifier": code_verifier,
        })

        if isinstance(token_response, dict):
            if "status" in token_response and token_response["status"] == "pending":
                print(".", end="", flush=True)
                await asyncio.sleep(2)  # polling interval
                continue
            elif "access_token" in token_response:
                # Success
                print("\nAuthorization successful.")
                print("Access Token:", token_response["access_token"])
                # Save token_response to a file or config
                await client.sharedManager.saveCredentialsToFile(token_response)
                print(f"Credentials saved to: {client.sharedManager.getCredentialFilePath()}")
                return
            else:
                print(f"\nError during polling: {token_response}")
                break
        else:
            print(f"\nUnexpected response: {token_response}")
            break

# Run the entire process
async def main():
    await launch_browser_for_oauth()

if __name__ == "__main__":
    asyncio.run(main())