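"""Gradio app: ingest text from raw input, PDFs, TXT files, ZIP archives, and crawled URLs,
chunk it, and run it through a Hugging Face Inference endpoint (Mixtral-8x7B-Instruct) to
generate datasets and "flash train" a chatbot on the fly."""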
import gradio as gr
import os
import requests
import uuid
from huggingface_hub import InferenceClient, HfApi
from pypdf import PdfReader
from bs4 import BeautifulSoup
import json
import datetime
import zipfile
import nltk  # Sentence tokenization (Punkt) used by chunk_text()
nltk.download('punkt', quiet=True)  # Newer NLTK releases may also require nltk.download('punkt_tab')
# Enable verbose logging
VERBOSE = True
def log(message):
if VERBOSE:
print(f"[LOG] {datetime.datetime.now()} - {message}")
# Hugging Face API Initialization
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
HF_TOKEN = os.environ.get('HF_TOKEN')
if not HF_TOKEN:
raise EnvironmentError("HF_TOKEN is not set. Please export it as an environment variable.")
try:
    client = InferenceClient(HF_MODEL, token=HF_TOKEN)  # Pass the token so inference calls are authenticated
api = HfApi(token=HF_TOKEN)
log("Initialized Hugging Face client and API.")
except Exception as e:
log(f"Error initializing Hugging Face client: {e}")
    raise SystemExit(1)  # Abort startup if the Hugging Face client cannot be created
REPO_NAME = "acecalisto3/tmp"
DATASET_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
# Constants
MAX_HISTORY = 100  # Currently unused; reserved for limiting chat history
MAX_DATA = 20000   # Maximum characters per chunk fed to the model (see chunk_text)
MAX_TOKENS = 8192  # Maximum new tokens requested per generation call
# Utility Functions
def read_pdf(file_path):
log(f"Reading PDF: {file_path}")
try:
reader = PdfReader(file_path)
text = "\n".join(page.extract_text() for page in reader.pages)
log(f"Extracted text from {len(reader.pages)} pages.")
return text
except Exception as e:
log(f"Error reading PDF {file_path}: {e}") # Include filename in error message
return "" # Return empty string instead of exception string
def fetch_url(url, max_depth):
log(f"Fetching URL: {url} with depth: {max_depth}")
visited = set()
to_visit = [(url, 0)]
results = []
while to_visit:
current_url, depth = to_visit.pop(0)
if current_url in visited: # Check visited *before* fetching
continue
log(f"Visiting: {current_url} at depth {depth}")
if depth < max_depth:
try:
response = requests.get(current_url, timeout=10) # Add timeout
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
visited.add(current_url)
soup = BeautifulSoup(response.content, 'lxml')
results.append(soup.text)
for link in soup.find_all("a", href=True):
absolute_url = requests.compat.urljoin(current_url, link.get('href')) # Handle relative URLs
if absolute_url.startswith("http") and absolute_url not in visited:
to_visit.append((absolute_url, depth + 1))
except requests.exceptions.RequestException as e:
log(f"Error fetching {current_url}: {e}")
else:
log(f"Skipping {current_url} due to max depth")
return "\n".join(results) # Return a single string
def read_txt(txt_path):
log(f"Reading TXT file: {txt_path}")
try:
with open(txt_path, "r", encoding="utf-8") as f: # Specify encoding
content = f.read()
return content
except Exception as e:
log(f"Error reading TXT file {txt_path}: {e}")
return ""
def read_zip(zip_path):
log(f"Extracting ZIP file: {zip_path}")
try:
extracted_data = []
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file_info in zip_ref.infolist(): # Use infolist for file info
if file_info.filename.endswith((".txt", ".pdf")): # Check both extensions at once
with zip_ref.open(file_info) as file:
try:
content = file.read()
if file_info.filename.endswith(".txt"):
extracted_data.append(content.decode("utf-8"))
elif file_info.filename.endswith(".pdf"):
temp_path = f"/tmp/{uuid.uuid4()}"
with open(temp_path, "wb") as temp_file:
temp_file.write(content)
extracted_data.append(read_pdf(temp_path))
os.remove(temp_path)
except UnicodeDecodeError:
log(f"Skipping file {file_info.filename} due to decoding error")
return "\n".join(extracted_data)
except Exception as e:
log(f"Error reading ZIP file {zip_path}: {e}")
return ""
def chunk_text(text, max_chunk_size):
log(f"Chunking text into max size: {max_chunk_size}")
    # Use NLTK's pre-trained Punkt models (downloaded at startup) for sentence splitting
    sentences = nltk.tokenize.sent_tokenize(text)
chunks = []
current_chunk = ""
for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 > max_chunk_size:  # +1 accounts for the joining space
            if current_chunk:  # avoid emitting empty chunks when a single sentence exceeds the limit
                chunks.append(current_chunk.strip())  # remove trailing whitespace
            current_chunk = ""
current_chunk += sentence + " "
if current_chunk:
chunks.append(current_chunk.strip())
log(f"Chunked into {len(chunks)} parts.")
return chunks
# Enhanced Dataset Processing Functions
def extract_dataset(data, instructions="Extract {history}", max_tokens=MAX_TOKENS): # Use global MAX_TOKENS
log("Extracting dataset...")
try:
chunks = chunk_text(data, MAX_DATA)
extracted = []
for i, chunk in enumerate(chunks):
log(f"Processing chunk {i+1}/{len(chunks)}") # Log chunk number
try:
response = client.text_generation(
prompt=instructions.format(history=chunk),
max_new_tokens=max_tokens
)
extracted.append(response)
except Exception as e:
log(f"Error processing chunk {i+1}: {e}")
extracted.append(f"Error processing chunk: {e}") # Append error message instead of crashing
return "\n".join(extracted)
except Exception as e:
log(f"Error during dataset extraction: {e}")
return f"Dataset extraction error: {e}" # Return informative error
# Gradio App Interface
with gr.Blocks() as app:
gr.HTML("<center><h1>Dataset Generator and Flash Chatbot</h1><p>Generate datasets and train chatbots on the fly.</p></center>")
chatbot = gr.Chatbot(label="Flash Trained Chatbot", show_copy_button=True, type="messages")
command_selector = gr.Dropdown(
label="Select Command",
choices=["Scrape Data", "Extract Dataset", "Combine Datasets", "Train Chatbot"],
value="Scrape Data"
)
data = gr.Textbox(label="Input Text", lines=6, placeholder="Enter text or upload files.")
files = gr.Files(label="Upload Files (PDFs, TXTs, ZIPs)", file_types=[".pdf", ".txt", ".zip"])
url = gr.Textbox(label="URL")
depth_slider = gr.Slider(label="Crawl Depth", minimum=1, maximum=10, value=1, step=1)
pdf_url = gr.Textbox(label="PDF URL")
json_out = gr.JSON(label="Generated Datasets")
error_box = gr.Textbox(label="Error Log", interactive=False)
button = gr.Button("Process")
    def process_summarization(command, history, data, files, url, pdf_url, depth):
        """Gather text from every provided input, then apply the selected command."""
        datasets = []
        errors = []
try:
if data:
log("Processing input text.")
datasets.append(data)
if files:
for file in files:
if file.name.endswith(".pdf"):
datasets.append(read_pdf(file.name))
elif file.name.endswith(".txt"):
datasets.append(read_txt(file.name))
elif file.name.endswith(".zip"):
datasets.append(read_zip(file.name))
if url:
log(f"Processing URL: {url}")
datasets.append("\n".join(fetch_url(url, max_depth=depth)))
if pdf_url:
log(f"Processing PDF URL: {pdf_url}")
                response = requests.get(pdf_url, timeout=30)  # timeout to avoid hanging on slow hosts
if response.status_code == 200:
temp_path = f"temp_{uuid.uuid4()}.pdf"
with open(temp_path, "wb") as f:
f.write(response.content)
datasets.append(read_pdf(temp_path))
os.remove(temp_path)
else:
errors.append(f"Failed to fetch PDF: {response.status_code}")
if command == "Extract Dataset":
datasets = [extract_dataset("\n".join(datasets))]
elif command == "Combine Datasets":
datasets = [combine_datasets(datasets)]
elif command == "Train Chatbot":
chatbot_data = combine_datasets(datasets)
return chatbot_data, "Chatbot trained successfully!"
return {"datasets": datasets}, "\n".join(errors)
        except Exception as e:
            errors.append(f"Error: {e}")
            return {"error": f"Processing failed: {e}"}, "\n".join(errors)
button.click(
process_summarization,
inputs=[command_selector, chatbot, data, files, url, pdf_url, depth_slider],
outputs=[json_out, error_box]
)
app.launch()