import gradio as gr
import os
import requests
import uuid
from huggingface_hub import InferenceClient, HfApi
from pypdf import PdfReader
from bs4 import BeautifulSoup
import json
import datetime
import zipfile
import nltk
from nltk.tokenize import sent_tokenize  # Sentence tokenization

nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)  # Needed by sent_tokenize on newer NLTK releases
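# Assumed Space dependencies (requirements.txt), inferred from the imports above; versions unpinned:
#   gradio, requests, huggingface_hub, pypdf, beautifulsoup4, lxml, nltk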
# Enable verbose logging
VERBOSE = True

def log(message):
    if VERBOSE:
        print(f"[LOG] {datetime.datetime.now()} - {message}")
# Hugging Face API Initialization
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
HF_TOKEN = os.environ.get('HF_TOKEN')
if not HF_TOKEN:
    raise EnvironmentError("HF_TOKEN is not set. Please export it as an environment variable.")

try:
    client = InferenceClient(HF_MODEL, token=HF_TOKEN)
    api = HfApi(token=HF_TOKEN)
    log("Initialized Hugging Face client and API.")
except Exception as e:
    log(f"Error initializing Hugging Face client: {e}")
    raise SystemExit(1)  # Abort if HF initialization fails

REPO_NAME = "acecalisto3/tmp"
DATASET_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
# Constants
MAX_HISTORY = 100
MAX_DATA = 20000
MAX_TOKENS = 8192  # Defined here for global access

# Utility Functions
def read_pdf(file_path):
    log(f"Reading PDF: {file_path}")
    try:
        reader = PdfReader(file_path)
        text = "\n".join(page.extract_text() or "" for page in reader.pages)  # extract_text() can return None
        log(f"Extracted text from {len(reader.pages)} pages.")
        return text
    except Exception as e:
        log(f"Error reading PDF {file_path}: {e}")  # Include filename in error message
        return ""  # Return an empty string rather than an exception message
def fetch_url(url, max_depth):
    log(f"Fetching URL: {url} with depth: {max_depth}")
    visited = set()
    to_visit = [(url, 0)]
    results = []
    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited:  # Check visited *before* fetching
            continue
        log(f"Visiting: {current_url} at depth {depth}")
        if depth < max_depth:
            try:
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
                visited.add(current_url)
                soup = BeautifulSoup(response.content, 'lxml')
                results.append(soup.get_text())
                for link in soup.find_all("a", href=True):
                    absolute_url = requests.compat.urljoin(current_url, link.get('href'))  # Resolve relative URLs
                    if absolute_url.startswith("http") and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            except requests.exceptions.RequestException as e:
                log(f"Error fetching {current_url}: {e}")
        else:
            log(f"Skipping {current_url} due to max depth")
    return "\n".join(results)  # Return a single string
def read_txt(txt_path):
    log(f"Reading TXT file: {txt_path}")
    try:
        with open(txt_path, "r", encoding="utf-8") as f:
            content = f.read()
        return content
    except Exception as e:
        log(f"Error reading TXT file {txt_path}: {e}")
        return ""
def read_zip(zip_path):
    log(f"Extracting ZIP file: {zip_path}")
    try:
        extracted_data = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith((".txt", ".pdf")):
                    with zip_ref.open(file_info) as file:
                        try:
                            content = file.read()
                            if file_info.filename.endswith(".txt"):
                                extracted_data.append(content.decode("utf-8"))
                            elif file_info.filename.endswith(".pdf"):
                                temp_path = f"/tmp/{uuid.uuid4()}.pdf"
                                with open(temp_path, "wb") as temp_file:
                                    temp_file.write(content)
                                extracted_data.append(read_pdf(temp_path))
                                os.remove(temp_path)
                        except UnicodeDecodeError:
                            log(f"Skipping file {file_info.filename} due to decoding error")
        return "\n".join(extracted_data)
    except Exception as e:
        log(f"Error reading ZIP file {zip_path}: {e}")
        return ""
def chunk_text(text, max_chunk_size):
    log(f"Chunking text into max size: {max_chunk_size}")
    sentences = sent_tokenize(text)  # nltk.data has no `punkt` attribute; use sent_tokenize instead
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        if current_chunk and len(current_chunk) + len(sentence) + 1 > max_chunk_size:  # account for space
            chunks.append(current_chunk.strip())  # remove trailing whitespace
            current_chunk = ""
        current_chunk += sentence + " "
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    log(f"Chunked into {len(chunks)} parts.")
    return chunks
# Enhanced Dataset Processing Functions
def extract_dataset(data, instructions="Extract {history}", max_tokens=MAX_TOKENS):
    log("Extracting dataset...")
    try:
        chunks = chunk_text(data, MAX_DATA)
        extracted = []
        for i, chunk in enumerate(chunks):
            log(f"Processing chunk {i+1}/{len(chunks)}")
            try:
                response = client.text_generation(
                    prompt=instructions.format(history=chunk),
                    max_new_tokens=max_tokens
                )
                extracted.append(response)
            except Exception as e:
                log(f"Error processing chunk {i+1}: {e}")
                extracted.append(f"Error processing chunk: {e}")  # Record the error instead of crashing
        return "\n".join(extracted)
    except Exception as e:
        log(f"Error during dataset extraction: {e}")
        return f"Dataset extraction error: {e}"  # Return informative error
# Gradio App Interface
with gr.Blocks() as app:
    gr.HTML("<center><h1>Dataset Generator and Flash Chatbot</h1><p>Generate datasets and train chatbots on the fly.</p></center>")
    chatbot = gr.Chatbot(label="Flash Trained Chatbot", show_copy_button=True, type="messages")
    command_selector = gr.Dropdown(
        label="Select Command",
        choices=["Scrape Data", "Extract Dataset", "Combine Datasets", "Train Chatbot"],
        value="Scrape Data"
    )
    data = gr.Textbox(label="Input Text", lines=6, placeholder="Enter text or upload files.")
    files = gr.Files(label="Upload Files (PDFs, TXTs, ZIPs)", file_types=[".pdf", ".txt", ".zip"])
    url = gr.Textbox(label="URL")
    depth_slider = gr.Slider(label="Crawl Depth", minimum=1, maximum=10, value=1, step=1)
    pdf_url = gr.Textbox(label="PDF URL")
    json_out = gr.JSON(label="Generated Datasets")
    error_box = gr.Textbox(label="Error Log", interactive=False)
    button = gr.Button("Process")
    def process_summarization(command, history, data, files, url, pdf_url, depth):
        datasets = []
        errors = []
        try:
            if data:
                log("Processing input text.")
                datasets.append(data)
            if files:
                for file in files:
                    # gr.Files may pass path strings or file objects depending on the Gradio version
                    file_path = file if isinstance(file, str) else file.name
                    if file_path.endswith(".pdf"):
                        datasets.append(read_pdf(file_path))
                    elif file_path.endswith(".txt"):
                        datasets.append(read_txt(file_path))
                    elif file_path.endswith(".zip"):
                        datasets.append(read_zip(file_path))
            if url:
                log(f"Processing URL: {url}")
                datasets.append(fetch_url(url, max_depth=depth))  # fetch_url already returns a single string
            if pdf_url:
                log(f"Processing PDF URL: {pdf_url}")
                response = requests.get(pdf_url, timeout=30)
                if response.status_code == 200:
                    temp_path = f"temp_{uuid.uuid4()}.pdf"
                    with open(temp_path, "wb") as f:
                        f.write(response.content)
                    datasets.append(read_pdf(temp_path))
                    os.remove(temp_path)
                else:
                    errors.append(f"Failed to fetch PDF: {response.status_code}")
            if command == "Extract Dataset":
                datasets = [extract_dataset("\n".join(datasets))]
            elif command == "Combine Datasets":
                datasets = [combine_datasets(datasets)]
            elif command == "Train Chatbot":
                chatbot_data = combine_datasets(datasets)
                return {"datasets": [chatbot_data]}, "Chatbot trained successfully!"
            return {"datasets": datasets}, "\n".join(errors)
        except Exception as e:
            errors.append(f"Error: {e}")
            return {"error": str(e)}, "\n".join(errors)
    button.click(
        process_summarization,
        inputs=[command_selector, chatbot, data, files, url, pdf_url, depth_slider],
        outputs=[json_out, error_box]
    )

app.launch()