# NOTE: recovered from a Hugging Face Spaces page dump; the page banner
# text ("Spaces" / "Runtime error") was removed from this header.
import hashlib
import json
import os
from datetime import datetime
from typing import List, Dict, Any, Optional

import chromadb
import gradio as gr
import numpy as np
# PDF processing library
import pymupdf  # PyMuPDF
from chromadb.utils import embedding_functions
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
| # Custom CSS (κΈ°μ‘΄ CSS + μΆκ° μ€νμΌ) | |
| custom_css = """ | |
| .gradio-container { | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%); | |
| background-size: 400% 400%; | |
| animation: gradient-animation 15s ease infinite; | |
| min-height: 100vh; | |
| } | |
| @keyframes gradient-animation { | |
| 0% { background-position: 0% 50%; } | |
| 50% { background-position: 100% 50%; } | |
| 100% { background-position: 0% 50%; } | |
| } | |
| .dark .gradio-container { | |
| background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%); | |
| background-size: 400% 400%; | |
| animation: gradient-animation 15s ease infinite; | |
| } | |
| .main-container { | |
| background-color: rgba(255, 255, 255, 0.95); | |
| backdrop-filter: blur(10px); | |
| border-radius: 20px; | |
| padding: 20px; | |
| box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37); | |
| border: 1px solid rgba(255, 255, 255, 0.18); | |
| margin: 10px; | |
| } | |
| .dark .main-container { | |
| background-color: rgba(30, 30, 30, 0.95); | |
| border: 1px solid rgba(255, 255, 255, 0.1); | |
| } | |
| .pdf-status { | |
| padding: 10px; | |
| border-radius: 10px; | |
| margin: 10px 0; | |
| font-size: 0.9em; | |
| } | |
| .pdf-success { | |
| background-color: rgba(52, 211, 153, 0.2); | |
| border: 1px solid rgba(52, 211, 153, 0.5); | |
| color: #10b981; | |
| } | |
| .pdf-error { | |
| background-color: rgba(248, 113, 113, 0.2); | |
| border: 1px solid rgba(248, 113, 113, 0.5); | |
| color: #ef4444; | |
| } | |
| .pdf-processing { | |
| background-color: rgba(251, 191, 36, 0.2); | |
| border: 1px solid rgba(251, 191, 36, 0.5); | |
| color: #f59e0b; | |
| } | |
| .document-card { | |
| padding: 12px; | |
| margin: 8px 0; | |
| border-radius: 8px; | |
| background: rgba(255, 255, 255, 0.1); | |
| border: 1px solid rgba(255, 255, 255, 0.2); | |
| cursor: pointer; | |
| transition: all 0.3s ease; | |
| } | |
| .document-card:hover { | |
| background: rgba(255, 255, 255, 0.2); | |
| transform: translateX(5px); | |
| } | |
| """ | |
class PDFRAGSystem:
    """PDF-based Retrieval-Augmented Generation (RAG) helper.

    Extracts text from PDFs with PyMuPDF, splits it into overlapping
    chunks, embeds the chunks with a SentenceTransformer model, and
    stores them in a ChromaDB collection for cosine-similarity search.
    """

    def __init__(self):
        # doc_id -> {"metadata", "chunk_count", "upload_time"} for every
        # indexed document; used by the UI to build the document list.
        self.documents = {}
        self.embedder = None
        self.vector_store = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        self.initialize_vector_store()

    def initialize_vector_store(self):
        """Load the embedding model and attach the ChromaDB collection.

        Best-effort: on failure the error is printed and the instance is
        left without an embedder/collection, so indexing and search will
        fail later rather than preventing the UI from loading.
        """
        try:
            # Sentence Transformer model used to embed chunks and queries.
            self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
            # In-memory ChromaDB client.
            self.chroma_client = chromadb.Client()
            # Fix: get_or_create_collection instead of create_collection —
            # create_collection raises if "pdf_documents" already exists
            # (e.g. when the script is reloaded in the same process).
            self.collection = self.chroma_client.get_or_create_collection(
                name="pdf_documents",
                metadata={"hnsw:space": "cosine"}
            )
        except Exception as e:
            print(f"Vector store initialization error: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract per-page text and document metadata from a PDF.

        Returns a dict with keys "metadata" (title/author/pages/...),
        "pages" (list of {"page", "content"} for non-empty pages) and
        "full_text" (all page text joined with blank lines).

        Raises RuntimeError if the PDF cannot be opened or read.
        """
        try:
            doc = pymupdf.open(pdf_path)
            text_content = []
            metadata = {
                "title": doc.metadata.get("title", "Untitled"),
                "author": doc.metadata.get("author", "Unknown"),
                "pages": len(doc),
                "creation_date": doc.metadata.get("creationDate", ""),
                "file_name": os.path.basename(pdf_path)
            }
            for page_num, page in enumerate(doc):
                text = page.get_text()
                # Skip blank pages so empty strings don't pollute chunks.
                if text.strip():
                    text_content.append({
                        "page": page_num + 1,
                        "content": text
                    })
            doc.close()
            return {
                "metadata": metadata,
                "pages": text_content,
                "full_text": "\n\n".join([p["content"] for p in text_content])
            }
        except Exception as e:
            # RuntimeError (an Exception subclass) keeps existing
            # `except Exception` callers working while chaining the cause.
            raise RuntimeError(f"PDF μ²λ¦¬ μ€λ₯: {str(e)}") from e

    def process_and_index_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, embed and index a PDF under *doc_id*.

        Returns {"success": True, ...stats} or {"success": False, "error"}.
        Never raises: all failures are reported through the return value.
        """
        try:
            # Extract the PDF text.
            pdf_data = self.extract_text_from_pdf(pdf_path)
            # Split the text into overlapping chunks.
            chunks = self.text_splitter.split_text(pdf_data["full_text"])
            # Fix: guard against image-only/empty PDFs — collection.add
            # with empty lists errors out and the failure message was opaque.
            if not chunks:
                return {
                    "success": False,
                    "error": "No extractable text found in PDF"
                }
            # Embed every chunk.
            embeddings = self.embedder.encode(chunks)
            # Store in ChromaDB with per-chunk metadata.
            ids = [f"{doc_id}_{i}" for i in range(len(chunks))]
            metadatas = [
                {
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "source": pdf_data["metadata"]["file_name"],
                    "page_count": pdf_data["metadata"]["pages"]
                }
                for i in range(len(chunks))
            ]
            self.collection.add(
                ids=ids,
                embeddings=embeddings.tolist(),
                documents=chunks,
                metadatas=metadatas
            )
            # Record document info for the UI.
            self.documents[doc_id] = {
                "metadata": pdf_data["metadata"],
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat()
            }
            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": pdf_data["metadata"]["pages"],
                "title": pdf_data["metadata"]["title"]
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def search_relevant_chunks(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to *top_k* chunks most similar to *query*.

        Each result dict has "content", "metadata" and "distance" (or
        None when ChromaDB returns no distances). Returns [] on any
        failure so the chat flow can degrade gracefully.
        """
        try:
            # Embed the query.
            query_embedding = self.embedder.encode([query])
            # Similarity search.
            results = self.collection.query(
                query_embeddings=query_embedding.tolist(),
                n_results=top_k
            )
            if results and results['documents']:
                chunks = []
                # ChromaDB nests results per query; we sent one query,
                # so index [0] everywhere.
                for i in range(len(results['documents'][0])):
                    chunks.append({
                        "content": results['documents'][0][i],
                        "metadata": results['metadatas'][0][i],
                        "distance": results['distances'][0][i] if 'distances' in results else None
                    })
                return chunks
            return []
        except Exception as e:
            print(f"Search error: {e}")
            return []

    def generate_rag_prompt(self, query: str, context_chunks: List[Dict]) -> str:
        """Build the final LLM prompt from *query* and retrieved chunks.

        Each chunk is labeled with its source file and chunk index so the
        model can cite where an answer came from.
        """
        context = "\n\n---\n\n".join([
            f"[μΆμ²: {chunk['metadata']['source']}, μ²ν¬ {chunk['metadata']['chunk_index']+1}]\n{chunk['content']}"
            for chunk in context_chunks
        ])
        prompt = f"""λ€μ λ¬Έμ λ΄μ©μ μ°Έκ³ νμ¬ μ§λ¬Έμ λ΅λ³ν΄μ£ΌμΈμ.
λ΅λ³μ μ 곡λ λ¬Έμ λ΄μ©μ λ°νμΌλ‘ μμ±νλ, νμμ μΆκ° μ€λͺ μ ν¬ν¨ν μ μμ΅λλ€.
λ¬Έμμμ κ΄λ ¨ μ 보λ₯Ό μ°Ύμ μ μλ κ²½μ°, κ·Έ μ¬μ€μ λͺ μν΄μ£ΌμΈμ.
π μ°Έκ³ λ¬Έμ:
{context}
β μ§λ¬Έ: {query}
π‘ λ΅λ³:"""
        return prompt
# Shared RAG system instance used by every event handler below.
rag_system = PDFRAGSystem()

# Session state holders.
# NOTE(review): these gr.State components are created OUTSIDE the
# `with gr.Blocks(...)` context further down; depending on the Gradio
# version this may leave them unregistered with the app. `current_model`
# is wired as an output of the reload button, while `uploaded_documents`
# and `rag_enabled` appear unused in this file — confirm whether they
# should be moved inside the Blocks context or removed.
current_model = gr.State("openai/gpt-oss-120b")
uploaded_documents = gr.State({})
rag_enabled = gr.State(False)
def upload_pdf(file):
    """Handle a PDF upload: index the file and refresh the UI.

    Returns a 3-tuple of updates for (upload_status HTML, document_list
    CheckboxGroup, enable_rag Checkbox). On success every uploaded
    document is listed and pre-selected and RAG is switched on.
    """
    if file is None:
        return gr.update(value="νμΌμ μ νν΄μ£ΌμΈμ"), gr.update(choices=[]), gr.update(value=False)
    try:
        # Fix: gr.File(type="filepath") passes a plain string path, so the
        # original `file.name` raised AttributeError on every upload.
        # Accept both the string path and the legacy tempfile-wrapper form.
        pdf_path = file if isinstance(file, str) else file.name
        # Use a content hash as the document ID so re-uploading the same
        # file re-indexes under the same ID instead of duplicating it.
        with open(pdf_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()[:8]
        doc_id = f"doc_{file_hash}"
        # Process and index the PDF.
        result = rag_system.process_and_index_pdf(pdf_path, doc_id)
        if result["success"]:
            status_html = f"""
            <div class="pdf-status pdf-success">
                β PDF μ λ‘λ μ±κ³΅!<br>
                π μ λͺ©: {result.get('title', 'Unknown')}<br>
                π νμ΄μ§: {result['pages']}νμ΄μ§<br>
                π μμ±λ μ²ν¬: {result['chunks']}κ°<br>
                π λ¬Έμ ID: {doc_id}
            </div>
            """
            # Rebuild the document list ("id: filename" labels) and
            # pre-select everything, including the new document.
            doc_list = list(rag_system.documents.keys())
            doc_choices = [f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
                           for doc_id in doc_list]
            return status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True)
        else:
            status_html = f"""
            <div class="pdf-status pdf-error">
                β PDF μ λ‘λ μ€ν¨<br>
                μ€λ₯: {result['error']}
            </div>
            """
            return status_html, gr.update(choices=[]), gr.update(value=False)
    except Exception as e:
        status_html = f"""
        <div class="pdf-status pdf-error">
            β μ€λ₯ λ°μ: {str(e)}
        </div>
        """
        return status_html, gr.update(choices=[]), gr.update(value=False)
def clear_documents():
    """Drop every indexed document and rebuild an empty vector collection.

    Returns updates for (upload_status HTML, document_list CheckboxGroup,
    enable_rag Checkbox); on failure only the status message changes.
    """
    try:
        # Recreate the ChromaDB collection from scratch.
        client = rag_system.chroma_client
        client.delete_collection("pdf_documents")
        rag_system.collection = client.create_collection(
            name="pdf_documents",
            metadata={"hnsw:space": "cosine"},
        )
        rag_system.documents = {}
        cleared = gr.update(value="<div class='pdf-status pdf-success'>β λͺ¨λ λ¬Έμκ° μμ λμμ΅λλ€</div>")
        return cleared, gr.update(choices=[], value=[]), gr.update(value=False)
    except Exception as e:
        failed = gr.update(value=f"<div class='pdf-status pdf-error'>β μμ μ€ν¨: {str(e)}</div>")
        return failed, gr.update(), gr.update()
def process_with_rag(message: str, enable_rag: bool, selected_docs: List[str], top_k: int = 5):
    """Augment *message* with retrieved document context when RAG is on.

    Returns the original message unchanged when RAG is disabled, no
    document is selected, retrieval fails, or no relevant chunk from a
    selected document is found; otherwise returns the full RAG prompt.
    """
    if not enable_rag or not selected_docs:
        return message  # RAG disabled: pass the message through untouched
    try:
        # Fix: retrieve a candidate pool larger than top_k. Results are
        # filtered down to the selected documents afterwards, so fetching
        # exactly top_k could leave zero chunks from those documents even
        # when relevant ones exist in the index.
        relevant_chunks = rag_system.search_relevant_chunks(message, top_k=top_k * 4)
        if relevant_chunks:
            # Keep only chunks that belong to a selected document.
            # Labels have the form "doc_id: filename".
            selected_doc_ids = [doc.split(":")[0] for doc in selected_docs]
            filtered_chunks = [
                chunk for chunk in relevant_chunks
                if chunk['metadata']['doc_id'] in selected_doc_ids
            ]
            if filtered_chunks:
                # Build the RAG prompt from the best top_k surviving chunks.
                return rag_system.generate_rag_prompt(message, filtered_chunks[:top_k])
        return message
    except Exception as e:
        print(f"RAG processing error: {e}")
        return message
def switch_model(model_choice):
    """Toggle visibility of the two per-model chat containers.

    Returns updates for (120b container, 20b container) plus the chosen
    model name, which is stored in the `current_model` state.
    """
    # Fix: the original unconditionally returned (hide 120b, show 20b),
    # so selecting the 120b model actually switched the UI to 20b.
    show_120b = model_choice == "openai/gpt-oss-120b"
    return gr.update(visible=show_120b), gr.update(visible=not show_120b), model_choice
# ---------------------------------------------------------------------------
# Gradio interface: sidebar (model selection + PDF RAG controls) on the left,
# chat area on the right, followed by all event wiring.
# NOTE(review): the nesting below is reconstructed from component order; the
# original indentation was lost in the dump this file was recovered from —
# confirm the two advanced Accordions sit at the intended levels.
# ---------------------------------------------------------------------------
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
    with gr.Row():
        # Sidebar
        with gr.Column(scale=1):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("# π AI Chat with RAG")
                gr.Markdown(
                    "PDF λ¬Έμλ₯Ό μ λ‘λνμ¬ AIκ° λ¬Έμ λ΄μ©μ μ°Έκ³ ν΄ λ΅λ³νλλ‘ ν μ μμ΅λλ€."
                )
                # Model selection
                model_dropdown = gr.Dropdown(
                    choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
                    value="openai/gpt-oss-120b",
                    label="π λͺ¨λΈ μ ν"
                )
                login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
                reload_btn = gr.Button("π λͺ¨λΈ λ³κ²½ μ μ©", variant="primary", size="lg")
                # RAG settings
                with gr.Accordion("π PDF RAG μ€μ ", open=True):
                    pdf_upload = gr.File(
                        label="PDF μ λ‘λ",
                        file_types=[".pdf"],
                        type="filepath"
                    )
                    upload_status = gr.HTML(
                        value="<div class='pdf-status'>PDFλ₯Ό μ λ‘λνμ¬ RAGλ₯Ό νμ±ννμΈμ</div>"
                    )
                    document_list = gr.CheckboxGroup(
                        choices=[],
                        label="π μ λ‘λλ λ¬Έμ",
                        info="μ§λ¬Έμ μ°Έκ³ ν λ¬Έμλ₯Ό μ ννμΈμ"
                    )
                    with gr.Row():
                        clear_btn = gr.Button("ποΈ λͺ¨λ λ¬Έμ μμ ", size="sm")
                        refresh_btn = gr.Button("π λͺ©λ‘ μλ‘κ³ μΉ¨", size="sm")
                    enable_rag = gr.Checkbox(
                        label="RAG νμ±ν",
                        value=False,
                        info="λ¬Έμ κΈ°λ° λ΅λ³ μμ± νμ±ν"
                    )
                    with gr.Accordion("βοΈ RAG κ³ κΈ μ€μ ", open=False):
                        top_k_chunks = gr.Slider(
                            minimum=1,
                            maximum=10,
                            value=5,
                            step=1,
                            label="μ°Έμ‘°ν μ²ν¬ μ",
                            info="λ΅λ³ μμ±μ μ°Έκ³ ν λ¬Έμ μ²ν¬μ κ°μ"
                        )
                        # NOTE(review): this slider is never wired to the
                        # text splitter (chunk_size is fixed at 1000 in
                        # PDFRAGSystem) — confirm intended behavior.
                        chunk_size = gr.Slider(
                            minimum=500,
                            maximum=2000,
                            value=1000,
                            step=100,
                            label="μ²ν¬ ν¬κΈ°",
                            info="λ¬Έμλ₯Ό λΆν νλ μ²ν¬μ ν¬κΈ° (λ¬Έμ μ)"
                        )
                # Advanced model options
                # NOTE(review): temperature/max_tokens are defined but not
                # used by any handler in this file.
                with gr.Accordion("βοΈ λͺ¨λΈ μ€μ ", open=False):
                    temperature = gr.Slider(
                        minimum=0,
                        maximum=2,
                        value=0.7,
                        step=0.1,
                        label="Temperature"
                    )
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=4096,
                        value=512,
                        step=1,
                        label="Max Tokens"
                    )
        # Main chat area
        with gr.Column(scale=3):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("## π¬ Chat Interface")
                # RAG status banner
                with gr.Row():
                    rag_status = gr.HTML(
                        value="<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>π RAG: <strong>λΉνμ±ν</strong></div>"
                    )
                # Per-model chat containers; only one is visible at a time.
                with gr.Column(visible=True) as model_120b_container:
                    gr.Markdown("### Model: openai/gpt-oss-120b")
                    # Actual model loading would be handled via gr.load().
                    chatbot_120b = gr.Chatbot(height=400)
                    msg_box_120b = gr.Textbox(
                        label="λ©μμ§ μ λ ₯",
                        placeholder="PDF λ΄μ©μ λν΄ μ§λ¬Έν΄λ³΄μΈμ...",
                        lines=2
                    )
                    with gr.Row():
                        send_btn_120b = gr.Button("π€ μ μ‘", variant="primary")
                        clear_btn_120b = gr.Button("ποΈ λν μ΄κΈ°ν")
                with gr.Column(visible=False) as model_20b_container:
                    gr.Markdown("### Model: openai/gpt-oss-20b")
                    chatbot_20b = gr.Chatbot(height=400)
                    msg_box_20b = gr.Textbox(
                        label="λ©μμ§ μ λ ₯",
                        placeholder="PDF λ΄μ©μ λν΄ μ§λ¬Έν΄λ³΄μΈμ...",
                        lines=2
                    )
                    with gr.Row():
                        send_btn_20b = gr.Button("π€ μ μ‘", variant="primary")
                        clear_btn_20b = gr.Button("ποΈ λν μ΄κΈ°ν")

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------
    # PDF upload -> index the file and refresh status/list/RAG toggle.
    pdf_upload.upload(
        fn=upload_pdf,
        inputs=[pdf_upload],
        outputs=[upload_status, document_list, enable_rag]
    )
    # Delete all indexed documents.
    # NOTE(review): refresh_btn has no handler wired in this file.
    clear_btn.click(
        fn=clear_documents,
        outputs=[upload_status, document_list, enable_rag]
    )
    # Keep the RAG status banner in sync with the checkbox.
    enable_rag.change(
        fn=lambda x: gr.update(
            value=f"<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>π RAG: <strong>{'νμ±ν' if x else 'λΉνμ±ν'}</strong></div>"
        ),
        inputs=[enable_rag],
        outputs=[rag_status]
    )
    # Model switch: toggles container visibility and stores the choice.
    reload_btn.click(
        fn=switch_model,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container, current_model]
    ).then(
        fn=lambda: gr.Info("λͺ¨λΈμ΄ μ±κ³΅μ μΌλ‘ μ νλμμ΅λλ€!"),
        inputs=[],
        outputs=[]
    )

    # Chat callback with RAG integration.
    def chat_with_rag(message, history, enable_rag, selected_docs, top_k):
        """Chat handler: optionally augment the message via RAG, then reply.

        Returns ("", updated history) so the textbox is cleared after send.
        """
        # RAG preprocessing (no-op when disabled).
        processed_message = process_with_rag(message, enable_rag, selected_docs, top_k)
        # The real model API call belongs here; a placeholder reply is
        # produced for now.
        if enable_rag and selected_docs:
            response = f"[RAG νμ±ν] μ νλ {len(selected_docs)}κ° λ¬Έμλ₯Ό μ°Έκ³ νμ¬ λ΅λ³ν©λλ€:\n\n{processed_message[:200]}..."
        else:
            response = f"[μΌλ° λͺ¨λ] {message}μ λν λ΅λ³μ λλ€."
        history.append((message, response))
        return "", history

    # 120b model chat wiring (Enter key and send button do the same thing).
    msg_box_120b.submit(
        fn=chat_with_rag,
        inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
        outputs=[msg_box_120b, chatbot_120b]
    )
    send_btn_120b.click(
        fn=chat_with_rag,
        inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
        outputs=[msg_box_120b, chatbot_120b]
    )
    clear_btn_120b.click(
        lambda: ([], ""),
        outputs=[chatbot_120b, msg_box_120b]
    )
    # 20b model chat wiring.
    msg_box_20b.submit(
        fn=chat_with_rag,
        inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
        outputs=[msg_box_20b, chatbot_20b]
    )
    send_btn_20b.click(
        fn=chat_with_rag,
        inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
        outputs=[msg_box_20b, chatbot_20b]
    )
    clear_btn_20b.click(
        lambda: ([], ""),
        outputs=[chatbot_20b, msg_box_20b]
    )
if __name__ == "__main__":
    # Launch the Gradio app (blocking call).
    demo.launch()