import os
import requests
import uuid
import datetime
import tempfile
import secrets
import json
from typing import List, Tuple, Any, Dict
# Third-party libraries
import gradio as gr
from huggingface_hub import InferenceClient, HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError
from pypdf import PdfReader
from bs4 import BeautifulSoup
import nltk
# --- CONFIGURATION ---
class Config:
    """Centralized configuration for the Maestro application."""
    HF_MODEL = os.getenv("HF_MODEL", "mistralai/Mixtral-8x7B-Instruct-v0.1")
    HF_TOKEN = os.getenv("HF_TOKEN")
    HF_DATASET_REPO = "Omnibus/tmp"  # Hugging Face dataset repo used as the persistent memory store
    MEMORY_MAIN_PATH = "mem-test2/main.json"
    MEMORY_INDEX_PATH = "mem-test2/index.json"
    MEMORY_DATA_PATH = "mem-test2"
    VERBOSE = os.getenv("VERBOSE", "True").lower() == "true"
    MAX_TOKENS_SYNTHESIS = 4096
    MAX_TOKENS_REPORT = 8192
    MAX_TOKENS_CHAT = 2048
    MAX_DATA_CHUNK = 20000  # For processing large text bodies
    REQUESTS_TIMEOUT = 20
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
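# A minimal environment setup before launch (values are illustrative, not real
# credentials; HF_TOKEN must have write access to HF_DATASET_REPO):
#   export HF_TOKEN="hf_xxxxxxxxxxxxxxxx"
#   export HF_MODEL="mistralai/Mixtral-8x7B-Instruct-v0.1"
#   export VERBOSE="True"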
# --- PROMPT LIBRARY (Integrated for simplicity) ---
class PromptLibrary:
    """A centralized library of meticulously crafted prompt templates."""
    AGENT_PREFIX = """
You are Maestro, an Expert Information Retrieval and Synthesis Agent. Your operation is governed by these directives:
1. Ethical Safeguard [v2.4]: Refuse to process harmful, illegal, or unethical requests.
2. Temporal Awareness: Use the timestamp {dynamic_timestamp_utc} to evaluate data relevance.
3. Contextual Prioritization: Analyze the user's purpose '{user_purpose}' to weigh data relevance.
"""
    COMPRESS_JSON = """
Task: {task}
Based on the AGENT_PREFIX context and the following data, generate a structured and concise JSON summary.
Input Data Chunk:
---
{history}
---
Existing Knowledge (for context):
---
{knowledge}
---
Instructions:
Compile and categorize the data above into a JSON dictionary string. Extract key information, group related entities, and ensure the output is a single, valid JSON object.
"""
    COMPRESS_REPORT = """
Task: {task}
Based on the AGENT_PREFIX context and the summarized knowledge you have, compile a detailed, exhaustive report (~8000 words).
Summarized Knowledge:
---
{knowledge}
---
Last Chunk of Raw Data (for final context):
---
{history}
---
Instructions:
Synthesize all provided information into a single, comprehensive narrative. Be thorough, detailed, and structure the report with clear headings and sections.
"""
    SAVE_MEMORY = """
Task: {task}
Data:
---
{history}
---
Instructions:
Compile and categorize the data above into a JSON dictionary string. Include ALL text, datapoints, titles, descriptions, and source URLs, indexed in an easy-to-search JSON format.
Required keys: "keywords", "title", "description", "content", "url". The "keywords" list should be comprehensive.
"""
    RECALL_MEMORY = """
The user will give you a query and a list of keywords from a database index.
Your duty is to choose the words from the list that are most closely related to the search query.
If no keywords are relevant, return an empty list: [].
Respond only with a single, valid JSON list of strings.
USER QUERY: {prompt}
KEYWORD LIST: {keywords}
"""
# --- UTILITIES ---
def log(message: str) -> None:
    if Config.VERBOSE: print(f"[{datetime.datetime.now(datetime.timezone.utc).isoformat()}] {message}")

# --- CORE APPLICATION ENGINE ---
class MaestroEngine:
    """Handles all data processing, memory management, and LLM interaction."""
    def __init__(self):
        if not Config.HF_TOKEN: raise ValueError("HF_TOKEN environment variable not set!")
        self.client = InferenceClient(model=Config.HF_MODEL, token=Config.HF_TOKEN)
        self.api = HfApi(token=Config.HF_TOKEN)
        try:
            nltk.data.find("tokenizers/punkt")
        except LookupError:
            log("Downloading NLTK 'punkt' tokenizer...")
            nltk.download('punkt', quiet=True)
        log("MaestroEngine initialized.")
    # --- Data Ingestion ---
    def _read_pdf_from_path(self, path: str) -> str:
        try:
            return "\n".join(page.extract_text() or "" for page in PdfReader(path).pages)
        except Exception as e: return f"Error reading PDF {os.path.basename(path)}: {e}"
    def _read_pdf_from_url(self, url: str) -> str:
        tmp_path = None
        try:
            response = requests.get(url, stream=True, timeout=Config.REQUESTS_TIMEOUT)
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(response.content)
                tmp_path = tmp_file.name
            # Read after the `with` block so the file is closed before parsing
            return self._read_pdf_from_path(tmp_path)
        except Exception as e: return f"Failed to download or read PDF from {url}: {e}"
        finally:
            if tmp_path and os.path.exists(tmp_path): os.remove(tmp_path)
    def _get_web_text(self, url: str) -> str:
        try:
            response = requests.get(url, headers={'User-Agent': Config.USER_AGENT}, timeout=Config.REQUESTS_TIMEOUT)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'lxml').get_text(separator="\n", strip=True)
        except Exception as e: return f"Failed to fetch URL {url}: {e}"
    def process_data_sources(self, text: str, files: List[str], url: str, pdf_url: str, pdf_batch: str) -> Tuple[str, List[str]]:
        """Orchestrates data ingestion from all provided sources."""
        all_content, errors = [], []
        if text: all_content.append(text)
        if url: all_content.append(self._get_web_text(url))
        if pdf_url: all_content.append(self._read_pdf_from_url(pdf_url))
        if pdf_batch:
            urls = [u.strip() for u in pdf_batch.split(',') if u.strip()]
            for u in urls:
                content = self._read_pdf_from_url(u)
                # Error strings begin with "Error" (local read) or "Failed" (download)
                if content.startswith(("Error", "Failed")): errors.append(content)
                else: all_content.append(content)
        if files:
            for path in files:
                if not path: continue
                filename, ext = os.path.basename(path), os.path.splitext(path)[1].lower()
                if ext == '.pdf': all_content.append(self._read_pdf_from_path(path))
                elif ext == '.txt':
                    with open(path, 'r', encoding='utf-8', errors='ignore') as f: all_content.append(f.read())
                else: errors.append(f"Unsupported file type: {filename}")
        return "\n\n---\n\n".join(all_content), errors
    # --- LLM Interaction ---
    def _run_gpt(self, prompt_template: str, max_tokens: int, **kwargs) -> str:
        """Core LLM call function."""
        system_prompt = PromptLibrary.AGENT_PREFIX.format(
            dynamic_timestamp_utc=datetime.datetime.now(datetime.timezone.utc).isoformat(),
            user_purpose=kwargs.get('task', 'completing a system task.')
        )
        full_prompt = f"<s>[INST] {system_prompt}\n\n{prompt_template.format(**kwargs)} [/INST]"
        log(f"Running GPT. Template: {prompt_template[:50]}...")
        try:
            return self.client.text_generation(full_prompt, max_new_tokens=max_tokens, temperature=0.8, top_p=0.95).strip()
        except Exception as e:
            log(f"LLM Error: {e}")
            return f'{{"error": "LLM call failed", "details": "{e}"}}'
    def _chunk_and_process(self, text: str, prompt_template: str, task: str, max_tokens: int) -> List[str]:
        """Chunks large text and processes each chunk with an LLM."""
        text_len = len(text)
        if text_len == 0: return []
        num_chunks = (text_len + Config.MAX_DATA_CHUNK - 1) // Config.MAX_DATA_CHUNK
        chunk_size = (text_len + num_chunks - 1) // num_chunks
        results, knowledge = [], ""
        for i in range(num_chunks):
            chunk = text[i*chunk_size : (i+1)*chunk_size]
            log(f"Processing chunk {i+1}/{num_chunks}...")
            resp = self._run_gpt(prompt_template, max_tokens, task=task, knowledge=knowledge, history=chunk)
            knowledge = resp[:2000]  # Use (truncated) response as context for the next chunk
            results.append(resp)
        return results
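    # Worked example of the ceiling-division chunk math above, assuming
    # MAX_DATA_CHUNK = 20000 and a 45,000-char input:
    #   num_chunks = ceil(45000 / 20000) = 3, chunk_size = ceil(45000 / 3) = 15000,
    # i.e. three balanced 15,000-char chunks rather than 20k + 20k + a 5k tail.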
    # --- Synthesis & Reporting Workflow ---
    def synthesis_workflow(self, text: str, task: str, do_summarize: bool, do_report: bool) -> Tuple[str, List[Dict]]:
        """Handles the multi-stage summarization and reporting process."""
        if not text: return "No data to process.", []
        json_summary_objects, final_report = [], ""
        if do_summarize or do_report:  # Summarization is a prerequisite for reporting
            log("Starting summarization stage...")
            summaries = self._chunk_and_process(text, PromptLibrary.COMPRESS_JSON, task, Config.MAX_TOKENS_SYNTHESIS)
            for s in summaries:
                try: json_summary_objects.append(json.loads(s))
                except json.JSONDecodeError: json_summary_objects.append({"error": "Failed to parse summary JSON", "raw": s})
            log("Summarization stage complete.")
        if do_report:
            log("Starting report generation stage...")
            # Use the collected JSON summaries as knowledge for the final report
            knowledge_for_report = json.dumps(json_summary_objects, indent=2)
            final_report = self._run_gpt(PromptLibrary.COMPRESS_REPORT, Config.MAX_TOKENS_REPORT, task=task, knowledge=knowledge_for_report, history="All data chunks have been summarized.")
            log("Report generation complete.")
            return final_report, json_summary_objects
        return "Summarization complete.", json_summary_objects
    # --- Persistent Memory System ---
    def _hf_download_json(self, repo_path: str, default: Any = None) -> Any:
        default = [] if default is None else default  # avoid a shared mutable default argument
        try:
            path = hf_hub_download(repo_id=Config.HF_DATASET_REPO, filename=repo_path, repo_type="dataset", token=Config.HF_TOKEN)
            with open(path, 'r') as f: return json.load(f)
        except HfHubHTTPError: return default  # File doesn't exist yet; fall back to the default
        except (json.JSONDecodeError, IOError): return default
    def _hf_upload_json(self, data: Any, repo_path: str):
        with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix=".json") as tmp_file:
            json.dump(data, tmp_file, indent=4)
            tmp_path = tmp_file.name
        self.api.upload_file(path_or_fileobj=tmp_path, path_in_repo=repo_path, repo_id=Config.HF_DATASET_REPO, repo_type="dataset")
        os.remove(tmp_path)
    def save_to_memory(self, text: str, task: str) -> List[Dict]:
        """Saves processed text to the Hugging Face Dataset repo."""
        log("Starting memory save process...")
        json_chunks = self._chunk_and_process(text, PromptLibrary.SAVE_MEMORY, task, Config.MAX_TOKENS_SYNTHESIS)
        parsed_chunks, main_file = [], self._hf_download_json(Config.MEMORY_MAIN_PATH)
        for i, chunk_str in enumerate(json_chunks):
            try:
                data = json.loads(chunk_str)
                ts = datetime.datetime.now(datetime.timezone.utc)
                filename = f"{ts.strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4().hex[:8]}.json"
                self._hf_upload_json(data, f"{Config.MEMORY_DATA_PATH}/{filename}")
                main_file.append({"file_name": filename, "keywords": data.get("keywords", []), "description": data.get("description", "")})
                parsed_chunks.append(data)
            except json.JSONDecodeError: log(f"Could not parse memory chunk {i} into JSON.")
        self._hf_upload_json(main_file, Config.MEMORY_MAIN_PATH)
        self.update_keyword_index(main_file)
        log("Memory save complete.")
        return parsed_chunks
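    # Shape of a main.json entry written above (values are illustrative):
    #   {"file_name": "2024-06-01-12-00-00-ab12cd34.json",
    #    "keywords": ["solar", "policy"], "description": "..."}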
    def update_keyword_index(self, main_file_content: List[Dict]):
        log("Updating keyword index...")
        keyword_index = {}
        for entry in main_file_content:
            for keyword in entry.get("keywords", []):
                k = keyword.strip().lower()
                keyword_index.setdefault(k, [])
                if entry["file_name"] not in keyword_index[k]: keyword_index[k].append(entry["file_name"])
        self._hf_upload_json(keyword_index, Config.MEMORY_INDEX_PATH)
        log("Keyword index updated.")
    def recall_from_memory(self, query: str) -> str:
        log("Recalling from memory...")
        index = self._hf_download_json(Config.MEMORY_INDEX_PATH, default={})
        if not index: return "Memory index is empty or could not be loaded."
        relevant_keywords_str = self._run_gpt(PromptLibrary.RECALL_MEMORY, 256, prompt=query, keywords=list(index.keys()))
        try:
            relevant_keywords = json.loads(relevant_keywords_str)
        except json.JSONDecodeError: return "Could not determine relevant keywords from memory."
        if not relevant_keywords: return "Found no relevant information in memory for that query."
        # Fetch data from relevant files
        matched_files, fetched_data = set(), []
        for k in relevant_keywords:
            for fname in index.get(k.lower().strip(), []): matched_files.add(fname)
        for fname in list(matched_files)[:5]:  # Limit fetches
            data = self._hf_download_json(f"{Config.MEMORY_DATA_PATH}/{fname}", default={})
            fetched_data.append(data)
        return f"Recalled {len(fetched_data)} entries from memory:\n\n{json.dumps(fetched_data, indent=2)}"
# --- GRADIO APPLICATION ---
class GradioApp:
    def __init__(self, engine: MaestroEngine):
        self.engine = engine
        self.app = self._build_ui()
    def _build_ui(self) -> gr.Blocks:
        with gr.Blocks(theme=gr.themes.Soft(primary_hue="sky"), title="Maestro AI Engine") as app:
            session_id = gr.State(lambda: secrets.token_hex(16))
            gr.Markdown("# 🧠 Maestro: AI Data Engine & Synthesis Platform")
            with gr.Tabs():
                with gr.TabItem("⚗️ Ingestion & Synthesis"):
                    with gr.Row():
                        with gr.Column(scale=3):
                            task_instructions = gr.Textbox(label="Primary Task / Instructions", placeholder="e.g., 'Summarize the key findings regarding renewable energy adoption'")
                            with gr.Tabs():
                                with gr.TabItem("Text Input"): text_input = gr.Textbox(lines=10)
                                with gr.TabItem("File Upload"): file_upload = gr.File(label="Upload Files (.pdf, .txt)", file_count="multiple", type="filepath")
                                with gr.TabItem("Web URL"): url_input = gr.Textbox(label="URL")
                                with gr.TabItem("PDF URL"): pdf_url_input = gr.Textbox(label="Single PDF URL")
                                with gr.TabItem("Batch PDF URLs"): pdf_batch_input = gr.Textbox(label="Comma-separated PDF URLs", lines=3)
                        with gr.Column(scale=1):
                            gr.Markdown("### Processing Options")
                            summarize_check = gr.Checkbox(label="Create JSON Summary", value=True)
                            report_check = gr.Checkbox(label="Generate Full Report (requires summary)", value=False)
                            memory_check = gr.Checkbox(label="Save to Persistent Memory", value=False)
                            process_button = gr.Button("🚀 Process & Synthesize", variant="primary", scale=2)
                    gr.Markdown("### Results")
                    with gr.Row():
                        final_report_output = gr.Markdown(label="Final Report")
                        json_summary_output = gr.JSON(label="JSON Summaries")
                with gr.TabItem("📚 Memory Recall"):
                    memory_query = gr.Textbox(label="Query Persistent Memory", placeholder="e.g., 'What do we know about market trends in 2024?'")
                    recall_button = gr.Button("Recall", variant="primary")
                    memory_output = gr.Textbox(label="Recalled Information", lines=20, interactive=False)
            # The click handler streams updates to four outputs, in this order:
            # session_id (state), final_report_output, json_summary_output, process_button.
            process_button.click(
                self._synthesis_workflow,
                [session_id, task_instructions, text_input, file_upload, url_input, pdf_url_input, pdf_batch_input, summarize_check, report_check, memory_check],
                [session_id, final_report_output, json_summary_output, process_button]
            )
            recall_button.click(self.engine.recall_from_memory, [memory_query], [memory_output])
        return app
    # The handler is a generator: each yield supplies one value per output
    # component, positionally matching the `outputs` list of process_button.click
    # (session_id, final_report_output, json_summary_output, process_button).
    # Dict-style yields keyed by component objects would fail here, because the
    # components are locals of _build_ui and are not in scope in this method.
    def _synthesis_workflow(self, session, task, text, files, url, pdf_url, pdf_batch, do_sum, do_rep, do_mem):
        log(f"Starting synthesis workflow for session: {session}")
        # 1. First yield: immediately switch the UI into a "processing" state.
        yield session, "⚙️ Processing... Please wait.", None, gr.update(value="Processing...", interactive=False)
        # 2. Perform the actual work
        ingested_text, errors = self.engine.process_data_sources(text, files, url, pdf_url, pdf_batch)
        if errors:
            log(f"Ingestion errors: {errors}")
        if not ingested_text:
            # Final yield in case of error: report the failure and re-enable the button.
            yield session, "## Error\nNo data was successfully ingested. Please check your inputs.", {"errors": errors}, gr.update(value="🚀 Process & Synthesize", interactive=True)
            return  # Stop execution here
        if do_mem:
            self.engine.save_to_memory(ingested_text, task)
        report_result, summaries_result = "Processing was not requested.", None
        if do_sum or do_rep:
            report_result, summaries_result = self.engine.synthesis_workflow(ingested_text, task, do_sum, do_rep)
        # 3. Final yield: publish the results and re-enable the button.
        yield session, report_result, summaries_result, gr.update(value="🚀 Process & Synthesize", interactive=True)
    def launch(self): self.app.launch(debug=Config.VERBOSE, share=False)

if __name__ == "__main__":
    if not Config.HF_TOKEN:
        print("FATAL: HF_TOKEN environment variable not set.")
    else:
        log("Instantiating Maestro Engine...")
        engine = MaestroEngine()
        app = GradioApp(engine)
        log("Launching Gradio App...")
        app.launch()
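# To run locally (the app.py filename is illustrative, not prescribed by this script):
#   HF_TOKEN="hf_..." python app.py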