Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import hashlib | |
| from datetime import datetime | |
| import numpy as np | |
| # PDF ์ฒ๋ฆฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ | |
| try: | |
| import fitz # PyMuPDF | |
| PDF_AVAILABLE = True | |
| except ImportError: | |
| PDF_AVAILABLE = False | |
| print("โ ๏ธ PyMuPDF not installed. Install with: pip install pymupdf") | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| ST_AVAILABLE = True | |
| except ImportError: | |
| ST_AVAILABLE = False | |
| print("โ ๏ธ Sentence Transformers not installed. Install with: pip install sentence-transformers") | |
| # Soft and bright custom CSS | |
| custom_css = """ | |
| .gradio-container { | |
| background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); | |
| min-height: 100vh; | |
| font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; | |
| } | |
| .main-container { | |
| background: rgba(255, 255, 255, 0.98); | |
| border-radius: 16px; | |
| padding: 24px; | |
| box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); | |
| border: 1px solid rgba(0, 0, 0, 0.05); | |
| margin: 12px; | |
| } | |
| /* Status messages styling */ | |
| .pdf-status { | |
| padding: 12px 16px; | |
| border-radius: 12px; | |
| margin: 12px 0; | |
| font-size: 0.95rem; | |
| font-weight: 500; | |
| } | |
| .pdf-success { | |
| background: linear-gradient(135deg, #d4edda 0%, #c3e6cb 100%); | |
| border: 1px solid #b1dfbb; | |
| color: #155724; | |
| } | |
| .pdf-error { | |
| background: linear-gradient(135deg, #f8d7da 0%, #f5c6cb 100%); | |
| border: 1px solid #f1aeb5; | |
| color: #721c24; | |
| } | |
| .pdf-info { | |
| background: linear-gradient(135deg, #d1ecf1 0%, #bee5eb 100%); | |
| border: 1px solid #9ec5d8; | |
| color: #0c5460; | |
| } | |
| .rag-context { | |
| background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%); | |
| border-left: 4px solid #f59e0b; | |
| padding: 12px; | |
| margin: 12px 0; | |
| border-radius: 8px; | |
| font-size: 0.9rem; | |
| } | |
| """ | |
| class SimpleTextSplitter: | |
| """ํ ์คํธ ๋ถํ ๊ธฐ""" | |
| def __init__(self, chunk_size=800, chunk_overlap=100): | |
| self.chunk_size = chunk_size | |
| self.chunk_overlap = chunk_overlap | |
| def split_text(self, text: str) -> List[str]: | |
| """ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ """ | |
| chunks = [] | |
| sentences = text.split('. ') | |
| current_chunk = "" | |
| for sentence in sentences: | |
| if len(current_chunk) + len(sentence) < self.chunk_size: | |
| current_chunk += sentence + ". " | |
| else: | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| current_chunk = sentence + ". " | |
| if current_chunk: | |
| chunks.append(current_chunk.strip()) | |
| return chunks | |
| class PDFRAGSystem: | |
| """PDF ๊ธฐ๋ฐ RAG ์์คํ """ | |
| def __init__(self): | |
| self.documents = {} | |
| self.document_chunks = {} | |
| self.embeddings_store = {} | |
| self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100) | |
| # ์๋ฒ ๋ฉ ๋ชจ๋ธ ์ด๊ธฐํ | |
| self.embedder = None | |
| if ST_AVAILABLE: | |
| try: | |
| self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
| print("โ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์ฑ๊ณต") | |
| except Exception as e: | |
| print(f"โ ๏ธ ์๋ฒ ๋ฉ ๋ชจ๋ธ ๋ก๋ ์คํจ: {e}") | |
| def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]: | |
| """PDF์์ ํ ์คํธ ์ถ์ถ""" | |
| if not PDF_AVAILABLE: | |
| return { | |
| "metadata": { | |
| "title": "PDF Reader Not Available", | |
| "file_name": os.path.basename(pdf_path), | |
| "pages": 0 | |
| }, | |
| "full_text": "PDF ์ฒ๋ฆฌ๋ฅผ ์ํด 'pip install pymupdf'๋ฅผ ์คํํด์ฃผ์ธ์." | |
| } | |
| try: | |
| doc = fitz.open(pdf_path) | |
| text_content = [] | |
| metadata = { | |
| "title": doc.metadata.get("title", os.path.basename(pdf_path)), | |
| "pages": len(doc), | |
| "file_name": os.path.basename(pdf_path) | |
| } | |
| for page_num, page in enumerate(doc): | |
| text = page.get_text() | |
| if text.strip(): | |
| text_content.append(text) | |
| doc.close() | |
| return { | |
| "metadata": metadata, | |
| "full_text": "\n\n".join(text_content) | |
| } | |
| except Exception as e: | |
| raise Exception(f"PDF ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}") | |
| def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]: | |
| """PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ""" | |
| try: | |
| # PDF ํ ์คํธ ์ถ์ถ | |
| pdf_data = self.extract_text_from_pdf(pdf_path) | |
| # ํ ์คํธ๋ฅผ ์ฒญํฌ๋ก ๋ถํ | |
| chunks = self.text_splitter.split_text(pdf_data["full_text"]) | |
| # ์ฒญํฌ ์ ์ฅ | |
| self.document_chunks[doc_id] = chunks | |
| # ์๋ฒ ๋ฉ ์์ฑ | |
| if self.embedder: | |
| embeddings = self.embedder.encode(chunks) | |
| self.embeddings_store[doc_id] = embeddings | |
| # ๋ฌธ์ ์ ๋ณด ์ ์ฅ | |
| self.documents[doc_id] = { | |
| "metadata": pdf_data["metadata"], | |
| "chunk_count": len(chunks), | |
| "upload_time": datetime.now().isoformat() | |
| } | |
| return { | |
| "success": True, | |
| "doc_id": doc_id, | |
| "chunks": len(chunks), | |
| "pages": pdf_data["metadata"]["pages"], | |
| "title": pdf_data["metadata"]["title"] | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]: | |
| """๊ด๋ จ ์ฒญํฌ ๊ฒ์""" | |
| all_relevant_chunks = [] | |
| if self.embedder and self.embeddings_store: | |
| # ์๋ฒ ๋ฉ ๊ธฐ๋ฐ ๊ฒ์ | |
| query_embedding = self.embedder.encode([query])[0] | |
| for doc_id in doc_ids: | |
| if doc_id in self.embeddings_store and doc_id in self.document_chunks: | |
| doc_embeddings = self.embeddings_store[doc_id] | |
| chunks = self.document_chunks[doc_id] | |
| # ์ฝ์ฌ์ธ ์ ์ฌ๋ ๊ณ์ฐ | |
| similarities = [] | |
| for emb in doc_embeddings: | |
| sim = np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) | |
| similarities.append(sim) | |
| # ์์ ์ฒญํฌ ์ ํ | |
| top_indices = np.argsort(similarities)[-top_k:][::-1] | |
| for idx in top_indices: | |
| if similarities[idx] > 0.2: | |
| all_relevant_chunks.append({ | |
| "content": chunks[idx], | |
| "doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
| "similarity": similarities[idx] | |
| }) | |
| else: | |
| # ํค์๋ ๊ธฐ๋ฐ ๊ฒ์ | |
| query_keywords = set(query.lower().split()) | |
| for doc_id in doc_ids: | |
| if doc_id in self.document_chunks: | |
| chunks = self.document_chunks[doc_id] | |
| for i, chunk in enumerate(chunks[:5]): # ์ฒ์ 5๊ฐ๋ง | |
| chunk_lower = chunk.lower() | |
| score = sum(1 for keyword in query_keywords if keyword in chunk_lower) | |
| if score > 0: | |
| all_relevant_chunks.append({ | |
| "content": chunk[:500], | |
| "doc_name": self.documents[doc_id]["metadata"]["file_name"], | |
| "similarity": score / len(query_keywords) if query_keywords else 0 | |
| }) | |
| # ์ ๋ ฌ ๋ฐ ๋ฐํ | |
| all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True) | |
| return all_relevant_chunks[:top_k] | |
| def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str: | |
| """RAG ํ๋กฌํํธ ์์ฑ""" | |
| relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k) | |
| if not relevant_chunks: | |
| return query | |
| # ํ๋กฌํํธ ๊ตฌ์ฑ | |
| prompt_parts = [] | |
| prompt_parts.append("๋ค์ ๋ฌธ์ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ๋ต๋ณํด์ฃผ์ธ์:\n") | |
| prompt_parts.append("=" * 40) | |
| for i, chunk in enumerate(relevant_chunks, 1): | |
| prompt_parts.append(f"\n[์ฐธ๊ณ {i} - {chunk['doc_name']}]") | |
| content = chunk['content'][:300] if len(chunk['content']) > 300 else chunk['content'] | |
| prompt_parts.append(content) | |
| prompt_parts.append("\n" + "=" * 40) | |
| prompt_parts.append(f"\n์ง๋ฌธ: {query}") | |
| return "\n".join(prompt_parts) | |
| # RAG ์์คํ ์ธ์คํด์ค ์์ฑ | |
| rag_system = PDFRAGSystem() | |
| # State variable to track current model and RAG settings | |
| current_model = gr.State("openai/gpt-oss-120b") | |
| rag_enabled_state = gr.State(False) | |
| selected_docs_state = gr.State([]) | |
| top_k_state = gr.State(3) | |
| def upload_pdf(file): | |
| """PDF ํ์ผ ์ ๋ก๋ ์ฒ๋ฆฌ""" | |
| if file is None: | |
| return ( | |
| gr.update(value="<div class='pdf-status pdf-info'>๐ ํ์ผ์ ์ ํํด์ฃผ์ธ์</div>"), | |
| gr.update(choices=[]), | |
| gr.update(value=False) | |
| ) | |
| try: | |
| # ํ์ผ ํด์๋ฅผ ID๋ก ์ฌ์ฉ | |
| with open(file.name, 'rb') as f: | |
| file_hash = hashlib.md5(f.read()).hexdigest()[:8] | |
| doc_id = f"doc_{file_hash}" | |
| # PDF ์ฒ๋ฆฌ ๋ฐ ์ ์ฅ | |
| result = rag_system.process_and_store_pdf(file.name, doc_id) | |
| if result["success"]: | |
| status_html = f""" | |
| <div class="pdf-status pdf-success"> | |
| โ PDF ์ ๋ก๋ ์๋ฃ!<br> | |
| ๐ {result['title']}<br> | |
| ๐ {result['pages']} ํ์ด์ง | ๐ {result['chunks']} ์ฒญํฌ | |
| </div> | |
| """ | |
| # ๋ฌธ์ ๋ชฉ๋ก ์ ๋ฐ์ดํธ | |
| doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}" | |
| for doc_id in rag_system.documents.keys()] | |
| return ( | |
| status_html, | |
| gr.update(choices=doc_choices, value=doc_choices), | |
| gr.update(value=True) | |
| ) | |
| else: | |
| return ( | |
| f"<div class='pdf-status pdf-error'>โ ์ค๋ฅ: {result['error']}</div>", | |
| gr.update(), | |
| gr.update(value=False) | |
| ) | |
| except Exception as e: | |
| return ( | |
| f"<div class='pdf-status pdf-error'>โ ์ค๋ฅ: {str(e)}</div>", | |
| gr.update(), | |
| gr.update(value=False) | |
| ) | |
| def clear_documents(): | |
| """๋ฌธ์ ์ด๊ธฐํ""" | |
| rag_system.documents = {} | |
| rag_system.document_chunks = {} | |
| rag_system.embeddings_store = {} | |
| return ( | |
| gr.update(value="<div class='pdf-status pdf-info'>๐๏ธ ๋ชจ๋ ๋ฌธ์๊ฐ ์ญ์ ๋์์ต๋๋ค</div>"), | |
| gr.update(choices=[], value=[]), | |
| gr.update(value=False) | |
| ) | |
| def switch_model(model_choice): | |
| """Function to switch between models""" | |
| return gr.update(visible=False), gr.update(visible=True), model_choice | |
| def create_rag_wrapper(original_fn, model_name): | |
| """์๋ณธ ๋ชจ๋ธ ํจ์๋ฅผ RAG๋ก ๊ฐ์ธ๋ ๋ํผ ์์ฑ""" | |
| def wrapped_fn(message, history=None): | |
| # RAG ์ค์ ๊ฐ์ ธ์ค๊ธฐ | |
| if rag_enabled_state.value and selected_docs_state.value: | |
| doc_ids = [doc.split(":")[0] for doc in selected_docs_state.value] | |
| enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k_state.value) | |
| # RAG ์ ์ฉ ์๋ฆผ | |
| print(f"๐ RAG ์ ์ฉ: {len(message)}์ โ {len(enhanced_message)}์") | |
| # ์๋ณธ ๋ชจ๋ธ์ ๊ฐํ๋ ๋ฉ์์ง ์ ๋ฌ | |
| if history is not None: | |
| return original_fn(enhanced_message, history) | |
| else: | |
| return original_fn(enhanced_message) | |
| else: | |
| # RAG ๋ฏธ์ ์ฉ์ ์๋ณธ ๋ฉ์์ง ๊ทธ๋๋ก ์ ๋ฌ | |
| if history is not None: | |
| return original_fn(message, history) | |
| else: | |
| return original_fn(message) | |
| return wrapped_fn | |
| # Main interface with soft theme | |
| with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), css=custom_css) as demo: | |
| with gr.Row(): | |
| # Sidebar | |
| with gr.Column(scale=1): | |
| with gr.Group(elem_classes="main-container"): | |
| gr.Markdown("# ๐ Inference Provider + RAG") | |
| gr.Markdown( | |
| "OpenAI GPT-OSS models served by Cerebras API. " | |
| "Upload PDF documents for context-aware responses." | |
| ) | |
| # Model selection | |
| model_dropdown = gr.Dropdown( | |
| choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"], | |
| value="openai/gpt-oss-120b", | |
| label="๐ Select Model", | |
| info="Choose between different model sizes" | |
| ) | |
| # Login button | |
| login_button = gr.LoginButton("Sign in with Hugging Face", size="lg") | |
| # Reload button to apply model change | |
| reload_btn = gr.Button("๐ Apply Model Change", variant="primary", size="lg") | |
| # RAG Settings | |
| with gr.Accordion("๐ PDF RAG Settings", open=True): | |
| pdf_upload = gr.File( | |
| label="Upload PDF", | |
| file_types=[".pdf"], | |
| type="filepath" | |
| ) | |
| upload_status = gr.HTML( | |
| value="<div class='pdf-status pdf-info'>๐ค Upload a PDF to enable document-based answers</div>" | |
| ) | |
| document_list = gr.CheckboxGroup( | |
| choices=[], | |
| label="๐ Uploaded Documents", | |
| info="Select documents to use as context" | |
| ) | |
| clear_btn = gr.Button("๐๏ธ Clear All Documents", size="sm", variant="secondary") | |
| enable_rag = gr.Checkbox( | |
| label="โจ Enable RAG", | |
| value=False, | |
| info="Use documents for context-aware responses" | |
| ) | |
| top_k_chunks = gr.Slider( | |
| minimum=1, | |
| maximum=5, | |
| value=3, | |
| step=1, | |
| label="Context Chunks", | |
| info="Number of document chunks to use" | |
| ) | |
| # Additional options | |
| with gr.Accordion("โ๏ธ Advanced Options", open=False): | |
| gr.Markdown("*These options will be available after model implementation*") | |
| temperature = gr.Slider( | |
| minimum=0, | |
| maximum=2, | |
| value=0.7, | |
| step=0.1, | |
| label="Temperature" | |
| ) | |
| max_tokens = gr.Slider( | |
| minimum=1, | |
| maximum=4096, | |
| value=512, | |
| step=1, | |
| label="Max Tokens" | |
| ) | |
| # Main chat area | |
| with gr.Column(scale=3): | |
| with gr.Group(elem_classes="main-container"): | |
| gr.Markdown("## ๐ฌ Chat Interface") | |
| # RAG status | |
| rag_status = gr.HTML( | |
| value="<div class='pdf-status pdf-info'>๐ RAG: <strong>Disabled</strong></div>" | |
| ) | |
| # RAG context preview | |
| context_preview = gr.HTML(value="", visible=False) | |
| # Container for model interfaces | |
| with gr.Column(visible=True) as model_120b_container: | |
| gr.Markdown("### Model: openai/gpt-oss-120b") | |
| # Load the original model and wrap it with RAG | |
| original_interface_120b = gr.load( | |
| "models/openai/gpt-oss-120b", | |
| accept_token=login_button, | |
| provider="fireworks-ai" | |
| ) | |
| # Note: The loaded interface will have its own chat components | |
| # We'll intercept the messages through our wrapper function | |
| with gr.Column(visible=False) as model_20b_container: | |
| gr.Markdown("### Model: openai/gpt-oss-20b") | |
| # Load the original model | |
| original_interface_20b = gr.load( | |
| "models/openai/gpt-oss-20b", | |
| accept_token=login_button, | |
| provider="fireworks-ai" | |
| ) | |
| # Event Handlers | |
| # PDF upload | |
| pdf_upload.upload( | |
| fn=upload_pdf, | |
| inputs=[pdf_upload], | |
| outputs=[upload_status, document_list, enable_rag] | |
| ) | |
| # Clear documents | |
| clear_btn.click( | |
| fn=clear_documents, | |
| outputs=[upload_status, document_list, enable_rag] | |
| ) | |
| # Update RAG state when settings change | |
| def update_rag_state(enabled, docs, k): | |
| rag_enabled_state.value = enabled | |
| selected_docs_state.value = docs if docs else [] | |
| top_k_state.value = k | |
| status = "โ Enabled" if enabled and docs else "โญ Disabled" | |
| status_html = f"<div class='pdf-status pdf-info'>๐ RAG: <strong>{status}</strong></div>" | |
| # Show context preview if RAG is enabled | |
| if enabled and docs: | |
| preview = f"<div class='rag-context'>๐ Using {len(docs)} document(s) with {k} chunks per query</div>" | |
| return gr.update(value=status_html), gr.update(value=preview, visible=True) | |
| else: | |
| return gr.update(value=status_html), gr.update(value="", visible=False) | |
| # Connect RAG state updates | |
| enable_rag.change( | |
| fn=update_rag_state, | |
| inputs=[enable_rag, document_list, top_k_chunks], | |
| outputs=[rag_status, context_preview] | |
| ) | |
| document_list.change( | |
| fn=update_rag_state, | |
| inputs=[enable_rag, document_list, top_k_chunks], | |
| outputs=[rag_status, context_preview] | |
| ) | |
| top_k_chunks.change( | |
| fn=update_rag_state, | |
| inputs=[enable_rag, document_list, top_k_chunks], | |
| outputs=[rag_status, context_preview] | |
| ) | |
| # Handle model switching | |
| reload_btn.click( | |
| fn=switch_model, | |
| inputs=[model_dropdown], | |
| outputs=[model_120b_container, model_20b_container, current_model] | |
| ).then( | |
| fn=lambda: gr.Info("Model switched successfully!"), | |
| inputs=[], | |
| outputs=[] | |
| ) | |
| # Update visibility based on dropdown selection | |
| def update_visibility(model_choice): | |
| if model_choice == "openai/gpt-oss-120b": | |
| return gr.update(visible=True), gr.update(visible=False) | |
| else: | |
| return gr.update(visible=False), gr.update(visible=True) | |
| model_dropdown.change( | |
| fn=update_visibility, | |
| inputs=[model_dropdown], | |
| outputs=[model_120b_container, model_20b_container] | |
| ) | |
| # Monkey-patch the loaded interfaces to add RAG support | |
| # This is done after the interface is loaded | |
| demo.load = lambda: print("๐ RAG System Ready!") | |
| demo.launch() |