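"""Command-line chat client for g4f with persistent conversation history.

Prompts are taken from positional arguments (plain text, file paths, image
paths, or URLs) or from stdin; the assistant reply is streamed to stdout and
the dialogue is stored in a JSON file so follow-up invocations keep context.

Usage sketch (assuming this file is saved as client.py; the model name is
only an example):

    python client.py "Explain asyncio in one paragraph"
    python client.py -m gpt-4o-mini -O answer.md notes.txt "Summarize this file"
    python client.py -e script.py "Add type hints to this file"
"""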
import os
import sys
import asyncio
import json
import argparse
import traceback
import requests

from pathlib import Path
from typing import Optional, List, Dict
from g4f.client import AsyncClient
from g4f.providers.response import JsonConversation, MediaResponse, is_content
from g4f.cookies import set_cookies_dir, read_cookie_files
from g4f.Provider import ProviderUtils
from g4f.image import extract_data_uri, is_accepted_format
from g4f.image.copy_images import get_media_dir
from g4f.client.helper import filter_markdown
from g4f.errors import MissingRequirementsError

try:
    from g4f.integration.markitdown import MarkItDown
    has_markitdown = True
except ImportError:
    has_markitdown = False

from g4f.config import CONFIG_DIR, COOKIES_DIR
from g4f import debug

CONVERSATION_FILE = CONFIG_DIR / "conversation.json"


class ConversationManager:
    """Manages conversation history and state."""

    def __init__(
        self,
        file_path: Optional[Path] = None,
        model: Optional[str] = None,
        provider: Optional[str] = None,
        max_messages: int = 5
    ) -> None:
        self.file_path = file_path
        self.model = model
        self.provider = provider
        self.max_messages = max_messages
        self.conversation: Optional[JsonConversation] = None
        self.history: List[Dict[str, str]] = []
        self.data: Dict = {}
        self._load()

    def _load(self) -> None:
        """Load model, provider, provider state and message history from disk."""
        if not self.file_path or not self.file_path.is_file():
            return
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if self.model is None:
                self.model = data.get("model")
            if self.provider is None:
                self.provider = data.get("provider")
            self.data = data.get("data", {})
            if self.provider and self.data.get(self.provider):
                self.conversation = JsonConversation(**self.data[self.provider])
            elif not self.provider and self.data:
                self.conversation = JsonConversation(**self.data)
            self.history = data.get("items", [])
        except Exception as e:
            print(f"Error loading conversation: {e}", file=sys.stderr)

    def save(self) -> None:
        """Persist model, provider, provider state and history to the JSON file."""
        if not self.file_path:
            return
        try:
            if self.conversation and self.provider:
                self.data[self.provider] = self.conversation.get_dict()
            elif self.conversation:
                self.data.update(self.conversation.get_dict())
            payload = {
                "model": self.model,
                "provider": self.provider,
                "data": self.data,
                "items": self.history
            }
            with open(self.file_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"Error saving conversation: {e}", file=sys.stderr)

    def add_message(self, role: str, content: str) -> None:
        self.history.append({"role": role, "content": content})

    def get_messages(self) -> List[Dict[str, str]]:
        """Return the last `max_messages` items, starting with a user or system message."""
        result = []
        for item in self.history[-self.max_messages:]:
            # Skip leading assistant messages so the window never starts mid-turn.
            if item.get("role") in ["user", "system"] or result:
                result.append(item)
        return result
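
# Minimal standalone usage sketch (values are only examples; the CLI below
# normally constructs the manager from command-line arguments):
#   conv = ConversationManager(Path("conversation.json"), model="gpt-4o-mini")
#   conv.add_message("user", "Hello")
#   messages = conv.get_messages()  # trimmed context window
#   conv.save()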


async def stream_response(
    client: AsyncClient,
    input_text,
    conversation: ConversationManager,
    output_file: Optional[Path] = None,
    instructions: Optional[str] = None
) -> None:
    """Send the input to the model, stream the reply to stdout and record it."""
    media = None
    if isinstance(input_text, tuple):
        # run_client_args passes (media, text) when images were supplied.
        media, input_text = input_text

    if instructions:
        conversation.add_message("system", instructions)

    conversation.add_message("user", input_text)

    create_args = {
        "model": conversation.model,
        "messages": conversation.get_messages(),
        "stream": True,
        "media": media,
        "conversation": conversation.conversation,
    }

    response_tokens = []
    last_chunk = None
    async for chunk in client.chat.completions.create(**create_args):
        last_chunk = chunk
        delta = chunk.choices[0].delta.content
        if not delta:
            continue
        if is_content(delta):
            response_tokens.append(delta)
        try:
            print(delta, end="", flush=True)
        except UnicodeEncodeError as e:
            debug.error(e)
    print()

    if last_chunk and hasattr(last_chunk, "conversation"):
        conversation.conversation = last_chunk.conversation

    media_chunk = next((t for t in response_tokens if isinstance(t, MediaResponse)), None)
    if media_chunk and len(response_tokens) == 1:
        text_response = response_tokens[0]
    else:
        text_response = "".join(str(t) for t in response_tokens)

    if output_file:
        if save_content(text_response, media_chunk, str(output_file)):
            print(f"\n→ Response saved to '{output_file}'")

    if text_response:
        conversation.add_message("assistant", text_response)
    else:
        raise RuntimeError("No response received")
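
# Minimal usage sketch (assumed values; run_args below does this wiring):
#   client = AsyncClient(provider=None)
#   conv = ConversationManager(model="gpt-4o-mini")
#   asyncio.run(stream_response(client, "Hello", conv))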


def save_content(content, media: Optional[MediaResponse], filepath: str, allowed_types=None) -> bool:
    """Write the response to `filepath`.

    Tries, in order: downloading a remote media URL, decoding a data URI,
    moving a locally generated /media file, and finally writing filtered
    markdown text. Returns True if something was written.
    """
    if media:
        for url in media.urls:
            if url.startswith(("http://", "https://")):
                try:
                    resp = requests.get(url, cookies=media.get("cookies"), headers=media.get("headers"))
                    if resp.status_code == 200:
                        with open(filepath, "wb") as f:
                            f.write(resp.content)
                        return True
                except Exception as e:
                    print(f"Error fetching media '{url}': {e}", file=sys.stderr)
                    return False
            else:
                # A non-HTTP entry is treated as local content (data URI or /media path).
                content = url
                break
    if hasattr(content, "data"):
        content = content.data
    if not content:
        print("\nNo content to save.", file=sys.stderr)
        return False
    if content.startswith("data:"):
        with open(filepath, "wb") as f:
            f.write(extract_data_uri(content))
        return True
    if content.startswith("/media/"):
        src = content.replace("/media", get_media_dir()).split("?")[0]
        os.rename(src, filepath)
        return True
    filtered = filter_markdown(content, allowed_types)
    if filtered:
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(filtered)
        return True
    print("\nUnable to save content.", file=sys.stderr)
    return False


def get_parser(exit_on_error=True):
    parser = argparse.ArgumentParser(
        description="G4F CLI client with conversation history",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        exit_on_error=exit_on_error
    )
    parser.add_argument('-d', '--debug', action='store_true', help="Verbose debug")
    parser.add_argument('-p', '--provider', default=None,
                        help=f"Provider to use: {', '.join(k for k, v in ProviderUtils.convert.items() if v.working)}")
    parser.add_argument('-m', '--model', help="Model name")
    parser.add_argument('-O', '--output', type=Path,
                        help="Save assistant output to FILE (text or media)")
    parser.add_argument('-i', '--instructions', help="System instructions")
    parser.add_argument('-c', '--cookies-dir', type=Path, default=COOKIES_DIR,
                        help="Cookies/HAR directory")
    parser.add_argument('--conversation-file', type=Path, default=CONVERSATION_FILE,
                        help="Conversation JSON")
    parser.add_argument('-C', '--clear-history', action='store_true', help="Wipe history")
    parser.add_argument('-N', '--no-config', action='store_true', help="Skip loading history")
    parser.add_argument(
        '-e', '--edit',
        type=Path,
        metavar='FILE',
        help="If FILE given: send its contents and overwrite it with AI's reply."
    )
    parser.add_argument('--max-messages', type=int, default=5,
                        help="Max user+assistant turns in context")
    parser.add_argument('input', nargs='*',
                        help="URLs, image paths or plain text")
    return parser
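
# Programmatic usage sketch (argument values are only examples):
#   args = get_parser().parse_args(["-m", "gpt-4o-mini", "Hello"])
#   run_client_args(args)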


async def run_args(input_val, args):
    """Build the client and conversation from parsed args and stream one reply."""
    try:
        if args.output:
            args.output.parent.mkdir(parents=True, exist_ok=True)
        if args.conversation_file:
            args.conversation_file.parent.mkdir(parents=True, exist_ok=True)
        args.cookies_dir.mkdir(parents=True, exist_ok=True)

        if args.debug:
            debug.logging = True

        conv = ConversationManager(
            None if args.no_config else args.conversation_file,
            model=args.model,
            provider=args.provider,
            max_messages=args.max_messages
        )
        if args.clear_history:
            conv.history = []
            conv.conversation = None

        set_cookies_dir(str(args.cookies_dir))
        read_cookie_files()

        client = AsyncClient(provider=conv.provider)

        if isinstance(args.edit, Path):
            file_to_edit = args.edit
            if not file_to_edit.exists():
                print(f"ERROR: file not found: {file_to_edit}", file=sys.stderr)
                sys.exit(1)
            text = file_to_edit.read_text(encoding="utf-8")
            # In edit mode the file contents are prepended to the prompt and
            # the reply is written back to the same file.
            input_val = f"```file: {file_to_edit}\n{text}\n```\n" + (input_val[1] if isinstance(input_val, tuple) else input_val)
            output_target = file_to_edit
        else:
            output_target = args.output

        await stream_response(client, input_val, conv, output_target, args.instructions)
        conv.save()

    except Exception:
        print(traceback.format_exc(), file=sys.stderr)
        sys.exit(1)


def run_client_args(args, exit_on_error=True):
    """Classify positional inputs into media and text, then run the request."""
    input_txt = ""
    media = []
    rest = 0

    for idx, tok in enumerate(args.input):
        if tok.startswith(("http://", "https://")):
            # Image URLs are attached as media; other URLs are converted to markdown.
            resp = requests.head(tok, allow_redirects=True)
            if resp.ok and resp.headers.get("Content-Type", "").startswith("image"):
                media.append(tok)
            else:
                if not has_markitdown:
                    raise MissingRequirementsError("Install markitdown")
                md = MarkItDown()
                txt = md.convert_url(tok).text_content
                input_txt += f"\n```source: {tok}\n{txt}\n```\n"
        elif os.path.isfile(tok):
            head = Path(tok).read_bytes()[:12]
            try:
                if is_accepted_format(head):
                    media.append(Path(tok))
                    is_img = True
                else:
                    is_img = False
            except ValueError:
                is_img = False
            if not is_img:
                txt = Path(tok).read_text(encoding="utf-8")
                input_txt += f"\n```file: {tok}\n{txt}\n```\n"
        else:
            # First plain-text token: everything from here on is the prompt.
            rest = idx
            break
        rest = idx + 1

    tail = args.input[rest:]
    if tail:
        input_txt = " ".join(tail) + "\n" + input_txt

    if not sys.stdin.isatty() and not input_txt:
        input_txt = sys.stdin.read()

    if media:
        val = (media, input_txt)
    else:
        val = input_txt.strip()

    if exit_on_error and not val:
        print("No input provided. Use -h.", file=sys.stderr)
        sys.exit(1)
    elif not val:
        raise argparse.ArgumentError(None, "No input provided. Use -h for help.")

    asyncio.run(run_args(val, args))


if __name__ == "__main__":
    run_client_args(get_parser().parse_args())