File size: 10,581 Bytes
a4b70d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
from __future__ import annotations
import asyncio
from typing import Optional
from aiohttp import ClientSession, ClientTimeout
from urllib.parse import quote, quote_plus
from aiohttp import ClientSession
try:
import nodriver
from nodriver.core.connection import ProtocolException
except:
pass
from ...typing import Messages, AsyncResult
from ...providers.response import VideoResponse, Reasoning, ContinueResponse, ProviderInfo
from ...requests import get_nodriver
from ...errors import MissingRequirementsError
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..helper import format_media_prompt
from ... import debug
# Base address of the hosted service used for searching and for the
# browser-less generation fallback.
PUBLIC_URL = "https://home.g4f.dev"
# Prefix of the video search endpoint; the URL-quoted prompt is appended to it.
SEARCH_URL = PUBLIC_URL + "/search/video+"
class RequestConfig:
    """Shared store of captured video URLs and request headers.

    Both attributes live on the class, so every caller in the process reads
    and writes the same data.
    """

    # prompt -> video URLs recorded for that prompt.
    urls: dict[str, list[str]] = {}
    # Captured request headers; only the "authorization" entry is read here.
    headers: dict = {}

    @staticmethod
    def _unique_urls(urls: list[str], limit: int = 10) -> list[str]:
        """Return at most *limit* distinct URLs, keeping first-seen order.

        A dict is used instead of a set so that the order of the result, and
        therefore which URLs survive the cut-off, is the same on every run.
        """
        return list(dict.fromkeys(urls))[:limit]

    @classmethod
    async def get_response(cls, prompt: str, search: bool = False) -> Optional[VideoResponse]:
        """Build a ``VideoResponse`` for *prompt* from the cache or the search endpoint.

        Args:
            prompt: Prompt text; used as the cache key and as the search query.
            search: When true and nothing is cached, query ``SEARCH_URL``.

        Returns:
            A ``VideoResponse`` holding up to ten cached URLs (together with the
            captured ``authorization`` header, if any), or the final URLs of
            the successful search requests, or ``None`` when nothing was found.
        """
        cached = cls.urls.get(prompt)
        if cached:
            authorization = cls.headers.get("authorization")
            return VideoResponse(cls._unique_urls(cached), prompt, {
                "headers": {"authorization": authorization} if authorization else {},
            })
        if not search:
            return None
        found_urls = []
        async with ClientSession() as session:
            # Ask for up to nine consecutive results; the first failing
            # request ends the scan.
            for skip in range(9):
                async with session.get(SEARCH_URL + quote_plus(prompt) + f"?skip={skip}", timeout=ClientTimeout(total=10)) as response:
                    if not response.ok:
                        break
                    # response.url is the address the request finally resolved to.
                    found_urls.append(str(response.url))
        if found_urls:
            return VideoResponse(found_urls, prompt)
        return None
class Video(AsyncGeneratorProvider, ProviderModelMixin):
    """Video provider that drives sora.chatgpt.com through a nodriver browser.

    Cached or searched results are returned first. Otherwise a browser is
    opened and the video URLs are taken from the network requests the page
    makes. If no browser can be started, the hosted API at ``api_url`` is
    used instead.
    """

    # Page opened in the browser for each model; "{0}" receives the quoted prompt.
    urls = {
        "search": "https://sora.chatgpt.com/explore?query={0}",
        "sora": "https://sora.chatgpt.com/explore",
        #"veo": "https://aistudio.google.com/generate-video"
    }
    # Browser-less fallback endpoint; the quoted prompt is appended.
    api_url = f"{PUBLIC_URL}/backend-api/v2/create?provider=Video&cache=true&prompt="
    # Requests whose URL starts with this prefix are recorded as video URLs.
    drive_url = "https://www.googleapis.com/drive/v3/"

    active_by_default = True
    default_model = "search"
    models = list(urls.keys())
    video_models = models

    needs_auth = True
    working = True

    # Not referenced by the code in this class body.
    browser = None
    stop_browser = None

    @staticmethod
    async def _click_text(page, text: str) -> bool:
        """Click the element that ``page.find(text)`` returns; report whether one was found."""
        button = await page.find(text)
        if button:
            await button.click()
            return True
        return False

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: Optional[str] = None,
        prompt: Optional[str] = None,
        aspect_ratio: Optional[str] = None,
        **kwargs
    ) -> AsyncResult:
        """Search for or generate a video and stream progress plus the result.

        Args:
            model: Key of ``urls``; an empty value selects ``default_model``.
            messages: Conversation passed to ``format_media_prompt``.
            proxy: Proxy forwarded to ``get_nodriver``.
            prompt: Explicit prompt passed to ``format_media_prompt``.
            aspect_ratio: Text of the aspect-ratio option to click in the
                page; the step is skipped when empty.
            **kwargs: Accepted and ignored.

        Yields:
            ``ProviderInfo``, ``Reasoning`` progress updates, a
            ``ContinueResponse`` and finally a ``VideoResponse``.

        Raises:
            ValueError: Unsupported model or empty prompt.
            MissingRequirementsError: No browser could be started and the
                fallback API request failed.
            RuntimeError: The page could not be opened, an expected control
                never appeared, or no video URL was captured in time.
        """
        if not model:
            model = cls.default_model
        if model not in cls.video_models:
            raise ValueError(f"Model '{model}' is not supported by {cls.__name__}. Supported models: {cls.models}")
        # The reported model is always "sora", whichever model was requested.
        yield ProviderInfo(**cls.get_dict(), model="sora")
        # Cap the prompt at 100 UTF-8 bytes; "ignore" drops a multi-byte
        # character that the slice cut in half.
        prompt = format_media_prompt(messages, prompt).encode()[:100].decode("utf-8", "ignore").strip()
        if not prompt:
            raise ValueError("Prompt cannot be empty.")
        response = await RequestConfig.get_response(prompt, model == "search")
        if response:
            yield Reasoning(label=f"Found {len(response.urls)} Video(s)", status="")
            yield response
            return
        try:
            yield Reasoning(label="Open browser")
            browser, stop_browser = await get_nodriver(proxy=proxy)
        except Exception as e:
            # No browser available: fall back to the hosted API.
            debug.error("Error getting nodriver:", e)
            async with ClientSession() as session:
                yield Reasoning(label="Generating")
                async with session.get(cls.api_url + quote(prompt)) as response:
                    if not response.ok:
                        debug.error(f"Failed to generate Video: {response.status}")
                    else:
                        yield Reasoning(label="Finished", status="")
                        if response.headers.get("content-type", "text/plain").startswith("text/plain"):
                            # Plain-text body: one URL per line, site-relative
                            # paths are made absolute.
                            data = (await response.text()).split("\n")
                            yield VideoResponse([f"{PUBLIC_URL}{url}" if url.startswith("/") else url for url in data], prompt)
                            return
                        # Any other content type: the resolved URL is the video.
                        yield VideoResponse(str(response.url), prompt)
                        return
            raise MissingRequirementsError("Video provider requires a browser to be installed.") from e

        browser_stopped = False

        def close_browser() -> None:
            # Several exit paths request shutdown; only the first one is forwarded.
            nonlocal browser_stopped
            if not browser_stopped:
                browser_stopped = True
                stop_browser()

        # From here on the browser is running, so every exit path, including
        # errors and the consumer closing the generator, goes through `finally`.
        try:
            # NOTE(review): presumably the message shown when the caller stops
            # waiting; confirm against ContinueResponse.
            yield ContinueResponse("Timeout waiting for Video URL")
            RequestConfig.urls.setdefault(prompt, [])
            try:
                page = await browser.get(cls.urls[model].format(quote(prompt)))
            except Exception as e:
                debug.error("Error opening page:", e)
                raise RuntimeError("Failed to open page.") from e
            if not page:
                raise RuntimeError("Failed to open page.")

            def on_request(event: nodriver.cdp.network.RequestWillBeSent, page=None):
                # Record Drive and .mp4 requests as video URLs for this prompt.
                url = event.request.url
                if not (url.startswith(cls.drive_url) or ".mp4" in url):
                    return
                # Keep the headers of the latest matching request, keys lower-cased.
                RequestConfig.headers = {key.lower(): value for key, value in event.request.headers.items()}
                # Skip URLs already recorded for any prompt.
                if any(url in known for known in RequestConfig.urls.values()):
                    return
                debug.log(f"Adding URL: {url}")
                RequestConfig.urls[prompt].append(url)

            # Up to 300 lookups for the "User menu" element before giving up.
            for _ in range(300):
                if await page.find("User menu"):
                    break
            else:
                raise RuntimeError("Failed to wait for user menu.")

            if model == "search":
                await page.send(nodriver.cdp.network.enable())
                page.add_handler(nodriver.cdp.network.RequestWillBeSent, on_request)
                # Scroll so the page requests its videos, which on_request records.
                for _ in range(5):
                    await page.scroll_down(5)
                    await asyncio.sleep(1)
                response = await RequestConfig.get_response(prompt, True)
                if response:
                    close_browser()
                    yield Reasoning(label="Found", status="")
                    yield response
                    return

            # Generation flow; each click below is best effort and only logged
            # on failure.
            await asyncio.sleep(3)
            await page.select("textarea", 240)
            try:
                if not await cls._click_text(page, "Image"):
                    debug.error("No 'Image' button found.")
                if await cls._click_text(page, "Video"):
                    yield Reasoning(label="Clicked 'Video' button")
                else:
                    debug.error("No 'Video' button found.")
            except Exception as e:
                debug.error("Error clicking button:", e)
            try:
                if aspect_ratio:
                    # The ratio picker is located by the ":" in its label.
                    if not await cls._click_text(page, ":"):
                        debug.error("No 'x:x' button found.")
                    await asyncio.sleep(1)
                    if await cls._click_text(page, aspect_ratio):
                        yield Reasoning(label=f"Clicked '{aspect_ratio}' button")
                    else:
                        debug.error(f"No '{aspect_ratio}' button found.")
            except Exception as e:
                debug.error("Error clicking button:", e)
            debug.log(f"Using prompt: {prompt}")
            textarea = await page.select("textarea", 180)
            await textarea.click()
            await textarea.clear_input()
            await textarea.send_keys(prompt)
            await asyncio.sleep(1)
            yield Reasoning(label="Sending prompt", token=prompt)
            try:
                button = await page.select('button[type="submit"]', 5)
                if button:
                    await button.click()
            except Exception as e:
                debug.error("Error clicking submit button:", e)
            try:
                if await cls._click_text(page, "Create video"):
                    yield Reasoning(label="Clicked 'Create video' button")
            except Exception as e:
                debug.error("Error clicking 'Create video' button:", e)
            try:
                if await cls._click_text(page, "Activity"):
                    yield Reasoning(label="Clicked 'Activity' button")
            except Exception as e:
                debug.error("Error clicking 'Activity' button:", e)
            # Up to 60 attempts to find and click the "New Video" entry.
            for idx in range(60):
                await asyncio.sleep(1)
                try:
                    if await cls._click_text(page, "New Video"):
                        yield Reasoning(label="Clicked 'New Video' button")
                        break
                except ProtocolException as e:
                    # Protocol errors are retried; only the last one is logged.
                    if idx == 59:
                        debug.error(e)
            else:
                raise RuntimeError("Failed to click 'New Video' button")
            await asyncio.sleep(3)
            if model != "search":
                # The "search" model registered the handler earlier.
                await page.send(nodriver.cdp.network.enable())
                page.add_handler(nodriver.cdp.network.RequestWillBeSent, on_request)
            for idx in range(300):
                yield Reasoning(label="Waiting for Video...", status=f"{idx+1}/300")
                await asyncio.sleep(1)
                if RequestConfig.urls[prompt]:
                    # Short pause before reading the recorded URLs.
                    await asyncio.sleep(2)
                    response = await RequestConfig.get_response(prompt, model == "search")
                    if response:
                        # Stop the browser before handing the result over.
                        close_browser()
                        yield Reasoning(label="Finished", status="")
                        yield response
                        return
            raise RuntimeError("Failed to get Video URL")
        finally:
            close_browser()