# gpt-oss-RAG / app.py
# (Hugging Face Space by "openfree", commit ee0cb34, ~20.5 kB —
#  page-header residue converted to comments so the module parses.)
import gradio as gr
import os
from typing import List, Dict, Any, Optional
import hashlib
import json
from datetime import datetime
# PDF processing libraries
import pymupdf # PyMuPDF
import chromadb
from chromadb.utils import embedding_functions
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import numpy as np
# Custom CSS (base styles plus additions): animated gradient page background
# (light and dark variants), glassmorphism ".main-container" cards, and the
# ".pdf-status"/".document-card" classes referenced by the upload status HTML.
custom_css = """
.gradio-container {
background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
min-height: 100vh;
}
@keyframes gradient-animation {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
.dark .gradio-container {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%);
background-size: 400% 400%;
animation: gradient-animation 15s ease infinite;
}
.main-container {
background-color: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 20px;
padding: 20px;
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
border: 1px solid rgba(255, 255, 255, 0.18);
margin: 10px;
}
.dark .main-container {
background-color: rgba(30, 30, 30, 0.95);
border: 1px solid rgba(255, 255, 255, 0.1);
}
.pdf-status {
padding: 10px;
border-radius: 10px;
margin: 10px 0;
font-size: 0.9em;
}
.pdf-success {
background-color: rgba(52, 211, 153, 0.2);
border: 1px solid rgba(52, 211, 153, 0.5);
color: #10b981;
}
.pdf-error {
background-color: rgba(248, 113, 113, 0.2);
border: 1px solid rgba(248, 113, 113, 0.5);
color: #ef4444;
}
.pdf-processing {
background-color: rgba(251, 191, 36, 0.2);
border: 1px solid rgba(251, 191, 36, 0.5);
color: #f59e0b;
}
.document-card {
padding: 12px;
margin: 8px 0;
border-radius: 8px;
background: rgba(255, 255, 255, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
cursor: pointer;
transition: all 0.3s ease;
}
.document-card:hover {
background: rgba(255, 255, 255, 0.2);
transform: translateX(5px);
}
"""
class PDFRAGSystem:
    """PDF-based RAG (Retrieval-Augmented Generation) system.

    Extracts text from PDFs, splits it into overlapping chunks, embeds the
    chunks with a SentenceTransformer model, and stores them in a ChromaDB
    collection for cosine-similarity search.
    """

    def __init__(self):
        # doc_id -> {"metadata": ..., "chunk_count": ..., "upload_time": ...}
        self.documents = {}
        # Pre-initialize so attribute access is safe even if
        # initialize_vector_store() fails part-way through.
        self.embedder = None
        self.vector_store = None  # kept for backward compatibility; unused
        self.collection = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
        )
        self.initialize_vector_store()

    def initialize_vector_store(self):
        """Load the embedding model and connect the ChromaDB collection.

        On failure the error is printed and ``self.collection`` stays None;
        indexing/search methods then fail inside their own error handling.
        """
        try:
            # Sentence-transformer model used for both documents and queries.
            self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
            # In-memory ChromaDB client.
            self.chroma_client = chromadb.Client()
            # get_or_create avoids a crash when the collection already exists
            # (e.g. when this module is re-imported in the same process);
            # the original create_collection raised in that case.
            self.collection = self.chroma_client.get_or_create_collection(
                name="pdf_documents",
                metadata={"hnsw:space": "cosine"},  # cosine distance for the HNSW index
            )
        except Exception as e:
            print(f"Vector store initialization error: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract per-page text and document metadata from a PDF.

        Returns a dict with keys "metadata", "pages" (list of
        ``{"page": n, "content": text}`` for non-empty pages) and
        "full_text" (all page texts joined with blank lines).
        Raises Exception with a descriptive message on any failure.
        """
        try:
            # Context manager guarantees the document handle is closed even if
            # a page raises mid-extraction (the original leaked it on error).
            with pymupdf.open(pdf_path) as doc:
                metadata = {
                    "title": doc.metadata.get("title", "Untitled"),
                    "author": doc.metadata.get("author", "Unknown"),
                    "pages": len(doc),
                    "creation_date": doc.metadata.get("creationDate", ""),
                    "file_name": os.path.basename(pdf_path),
                }
                text_content = []
                for page_num, page in enumerate(doc):
                    text = page.get_text()
                    if text.strip():  # skip blank / image-only pages
                        text_content.append({
                            "page": page_num + 1,
                            "content": text,
                        })
            return {
                "metadata": metadata,
                "pages": text_content,
                "full_text": "\n\n".join(p["content"] for p in text_content),
            }
        except Exception as e:
            # Message text kept verbatim ("PDF processing error: ...").
            raise Exception(f"PDF 처리 였λ₯˜: {str(e)}") from e

    def process_and_index_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, embed and index a PDF under ``doc_id``.

        Returns ``{"success": True, "doc_id", "chunks", "pages", "title"}``
        on success, or ``{"success": False, "error": msg}`` on failure.
        """
        try:
            pdf_data = self.extract_text_from_pdf(pdf_path)
            chunks = self.text_splitter.split_text(pdf_data["full_text"])
            if not chunks:
                # Scanned/image-only PDFs yield no text; fail explicitly
                # instead of letting collection.add() choke on empty inputs.
                return {"success": False, "error": "No extractable text found in the PDF"}
            # One embedding per chunk.
            embeddings = self.embedder.encode(chunks)
            ids = [f"{doc_id}_{i}" for i in range(len(chunks))]
            metadatas = [
                {
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "source": pdf_data["metadata"]["file_name"],
                    "page_count": pdf_data["metadata"]["pages"],
                }
                for i in range(len(chunks))
            ]
            self.collection.add(
                ids=ids,
                embeddings=embeddings.tolist(),
                documents=chunks,
                metadatas=metadatas,
            )
            # Bookkeeping used by the UI's document list.
            self.documents[doc_id] = {
                "metadata": pdf_data["metadata"],
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat(),
            }
            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": pdf_data["metadata"]["pages"],
                "title": pdf_data["metadata"]["title"],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def search_relevant_chunks(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to ``top_k`` indexed chunks most similar to ``query``.

        Each result dict has "content", "metadata" and "distance" (None when
        the backend returns no distances). Returns [] on any failure.
        """
        try:
            query_embedding = self.embedder.encode([query])
            results = self.collection.query(
                query_embeddings=query_embedding.tolist(),
                n_results=top_k,
            )
            if results and results['documents']:
                chunks = []
                # Results are nested one level per query; we sent one query.
                for i in range(len(results['documents'][0])):
                    chunks.append({
                        "content": results['documents'][0][i],
                        "metadata": results['metadatas'][0][i],
                        "distance": results['distances'][0][i] if 'distances' in results else None,
                    })
                return chunks
            return []
        except Exception as e:
            print(f"Search error: {e}")
            return []

    def generate_rag_prompt(self, query: str, context_chunks: List[Dict]) -> str:
        """Build the final LLM prompt: retrieved context followed by the question.

        The prompt text itself is intentionally Korean (user-facing).
        """
        context = "\n\n---\n\n".join(
            f"[좜처: {chunk['metadata']['source']}, 청크 {chunk['metadata']['chunk_index']+1}]\n{chunk['content']}"
            for chunk in context_chunks
        )
        prompt = f"""λ‹€μŒ λ¬Έμ„œ λ‚΄μš©μ„ μ°Έκ³ ν•˜μ—¬ μ§ˆλ¬Έμ— λ‹΅λ³€ν•΄μ£Όμ„Έμš”.
닡변은 제곡된 λ¬Έμ„œ λ‚΄μš©μ„ λ°”νƒ•μœΌλ‘œ μž‘μ„±ν•˜λ˜, ν•„μš”μ‹œ μΆ”κ°€ μ„€λͺ…을 포함할 수 μžˆμŠ΅λ‹ˆλ‹€.
λ¬Έμ„œμ—μ„œ κ΄€λ ¨ 정보λ₯Ό 찾을 수 μ—†λŠ” 경우, κ·Έ 사싀을 λͺ…μ‹œν•΄μ£Όμ„Έμš”.
πŸ“š μ°Έκ³  λ¬Έμ„œ:
{context}
❓ 질문: {query}
πŸ’‘ λ‹΅λ³€:"""
        return prompt
# Global RAG system instance shared by every event handler below.
rag_system = PDFRAGSystem()
# State variables
# NOTE(review): these gr.State objects are instantiated OUTSIDE the
# gr.Blocks context defined below — confirm Gradio actually tracks them;
# State components normally need to be created inside a Blocks scope to
# participate in event wiring. Only `current_model` is referenced later.
current_model = gr.State("openai/gpt-oss-120b")
uploaded_documents = gr.State({})
rag_enabled = gr.State(False)
def upload_pdf(file):
    """Handle a PDF upload: index it and refresh the document checklist.

    Args:
        file: Gradio file object (has a ``.name`` path) or None.

    Returns:
        (status HTML, CheckboxGroup update with all documents selected,
         enable_rag checkbox update).
    """
    if file is None:
        return gr.update(value="νŒŒμΌμ„ μ„ νƒν•΄μ£Όμ„Έμš”"), gr.update(choices=[]), gr.update(value=False)
    try:
        # Content hash as a stable document id: re-uploading the same file
        # yields the same id. Hash in fixed-size blocks so large PDFs are
        # not read fully into memory (the original slurped the whole file).
        hasher = hashlib.md5()
        with open(file.name, 'rb') as f:
            for block in iter(lambda: f.read(1 << 20), b''):
                hasher.update(block)
        doc_id = f"doc_{hasher.hexdigest()[:8]}"
        # PDF processing and vector indexing.
        result = rag_system.process_and_index_pdf(file.name, doc_id)
        if result["success"]:
            status_html = f"""
            <div class="pdf-status pdf-success">
                βœ… PDF μ—…λ‘œλ“œ 성곡!<br>
                πŸ“„ 제λͺ©: {result.get('title', 'Unknown')}<br>
                πŸ“‘ νŽ˜μ΄μ§€: {result['pages']}νŽ˜μ΄μ§€<br>
                πŸ” μƒμ„±λœ 청크: {result['chunks']}개<br>
                πŸ†” λ¬Έμ„œ ID: {doc_id}
            </div>
            """
            # Rebuild the document list; use a distinct loop variable so the
            # freshly computed doc_id above is not shadowed (confusing even
            # though comprehension scope is separate in Python 3).
            doc_choices = [
                f"{d}: {rag_system.documents[d]['metadata']['file_name']}"
                for d in rag_system.documents
            ]
            # Select every document and switch RAG on by default.
            return status_html, gr.update(choices=doc_choices, value=doc_choices), gr.update(value=True)
        else:
            status_html = f"""
            <div class="pdf-status pdf-error">
                ❌ PDF μ—…λ‘œλ“œ μ‹€νŒ¨<br>
                였λ₯˜: {result['error']}
            </div>
            """
            return status_html, gr.update(choices=[]), gr.update(value=False)
    except Exception as e:
        status_html = f"""
        <div class="pdf-status pdf-error">
            ❌ 였λ₯˜ λ°œμƒ: {str(e)}
        </div>
        """
        return status_html, gr.update(choices=[]), gr.update(value=False)
def clear_documents():
    """Delete every indexed document and reset the vector store.

    Returns updates for (status HTML, document checklist, RAG checkbox).
    """
    try:
        # Drop and recreate the ChromaDB collection, then forget the
        # per-document bookkeeping on the shared RAG system.
        client = rag_system.chroma_client
        client.delete_collection("pdf_documents")
        rag_system.collection = client.create_collection(
            name="pdf_documents",
            metadata={"hnsw:space": "cosine"},
        )
        rag_system.documents = {}
        success_msg = "<div class='pdf-status pdf-success'>βœ… λͺ¨λ“  λ¬Έμ„œκ°€ μ‚­μ œλ˜μ—ˆμŠ΅λ‹ˆλ‹€</div>"
        return (
            gr.update(value=success_msg),
            gr.update(choices=[], value=[]),
            gr.update(value=False),
        )
    except Exception as e:
        failure_msg = f"<div class='pdf-status pdf-error'>❌ μ‚­μ œ μ‹€νŒ¨: {str(e)}</div>"
        return gr.update(value=failure_msg), gr.update(), gr.update()
def process_with_rag(message: str, enable_rag: bool, selected_docs: List[str], top_k: int = 5):
    """Augment ``message`` with retrieved document context when RAG is on.

    Args:
        message: The user's raw chat message.
        enable_rag: Whether RAG is enabled in the UI.
        selected_docs: Checklist entries of the form "doc_xxxx: file.pdf".
        top_k: Maximum number of context chunks to include.

    Returns:
        The RAG prompt string, or the original message when RAG is disabled,
        nothing is selected, no chunk matches, or retrieval fails.
    """
    if not enable_rag or not selected_docs:
        return message  # RAG disabled: pass the message through unchanged
    try:
        # Over-fetch: the vector search is global across ALL indexed
        # documents, so fetching only top_k and then filtering by the
        # selected documents could leave fewer (even zero) usable chunks.
        # Fetch extra, filter, then truncate back to top_k.
        relevant_chunks = rag_system.search_relevant_chunks(message, top_k=top_k * 3)
        if relevant_chunks:
            # "doc_xxxx: file.pdf" -> "doc_xxxx"; set for O(1) membership.
            selected_doc_ids = {doc.split(":")[0] for doc in selected_docs}
            filtered_chunks = [
                chunk for chunk in relevant_chunks
                if chunk['metadata']['doc_id'] in selected_doc_ids
            ]
            if filtered_chunks:
                return rag_system.generate_rag_prompt(message, filtered_chunks[:top_k])
        return message
    except Exception as e:
        print(f"RAG processing error: {e}")
        return message
def switch_model(model_choice):
    """Toggle chat-container visibility to match the chosen model.

    Returns updates for (120b container, 20b container) and the new
    ``current_model`` state value.

    Bug fix: the original unconditionally returned
    ``visible=False, visible=True``, so selecting "openai/gpt-oss-120b"
    hid the 120b UI and showed the 20b one regardless of the choice.
    """
    show_120b = model_choice == "openai/gpt-oss-120b"
    return gr.update(visible=show_120b), gr.update(visible=not show_120b), model_choice
# Gradio μΈν„°νŽ˜μ΄μŠ€
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
with gr.Row():
# μ‚¬μ΄λ“œλ°”
with gr.Column(scale=1):
with gr.Group(elem_classes="main-container"):
gr.Markdown("# πŸš€ AI Chat with RAG")
gr.Markdown(
"PDF λ¬Έμ„œλ₯Ό μ—…λ‘œλ“œν•˜μ—¬ AIκ°€ λ¬Έμ„œ λ‚΄μš©μ„ μ°Έκ³ ν•΄ λ‹΅λ³€ν•˜λ„λ‘ ν•  수 μžˆμŠ΅λ‹ˆλ‹€."
)
# λͺ¨λΈ 선택
model_dropdown = gr.Dropdown(
choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
value="openai/gpt-oss-120b",
label="πŸ“Š λͺ¨λΈ 선택"
)
login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")
reload_btn = gr.Button("πŸ”„ λͺ¨λΈ λ³€κ²½ 적용", variant="primary", size="lg")
# RAG μ„€μ •
with gr.Accordion("πŸ“š PDF RAG μ„€μ •", open=True):
pdf_upload = gr.File(
label="PDF μ—…λ‘œλ“œ",
file_types=[".pdf"],
type="filepath"
)
upload_status = gr.HTML(
value="<div class='pdf-status'>PDFλ₯Ό μ—…λ‘œλ“œν•˜μ—¬ RAGλ₯Ό ν™œμ„±ν™”ν•˜μ„Έμš”</div>"
)
document_list = gr.CheckboxGroup(
choices=[],
label="πŸ“„ μ—…λ‘œλ“œλœ λ¬Έμ„œ",
info="μ§ˆλ¬Έμ— μ°Έκ³ ν•  λ¬Έμ„œλ₯Ό μ„ νƒν•˜μ„Έμš”"
)
with gr.Row():
clear_btn = gr.Button("πŸ—‘οΈ λͺ¨λ“  λ¬Έμ„œ μ‚­μ œ", size="sm")
refresh_btn = gr.Button("πŸ”„ λͺ©λ‘ μƒˆλ‘œκ³ μΉ¨", size="sm")
enable_rag = gr.Checkbox(
label="RAG ν™œμ„±ν™”",
value=False,
info="λ¬Έμ„œ 기반 λ‹΅λ³€ 생성 ν™œμ„±ν™”"
)
with gr.Accordion("βš™οΈ RAG κ³ κΈ‰ μ„€μ •", open=False):
top_k_chunks = gr.Slider(
minimum=1,
maximum=10,
value=5,
step=1,
label="μ°Έμ‘°ν•  청크 수",
info="λ‹΅λ³€ μƒμ„±μ‹œ μ°Έκ³ ν•  λ¬Έμ„œ 청크의 개수"
)
chunk_size = gr.Slider(
minimum=500,
maximum=2000,
value=1000,
step=100,
label="청크 크기",
info="λ¬Έμ„œλ₯Ό λΆ„ν• ν•˜λŠ” 청크의 크기 (문자 수)"
)
# κ³ κΈ‰ μ˜΅μ…˜
with gr.Accordion("βš™οΈ λͺ¨λΈ μ„€μ •", open=False):
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=512,
step=1,
label="Max Tokens"
)
# 메인 μ±„νŒ… μ˜μ—­
with gr.Column(scale=3):
with gr.Group(elem_classes="main-container"):
gr.Markdown("## πŸ’¬ Chat Interface")
# RAG μƒνƒœ ν‘œμ‹œ
with gr.Row():
rag_status = gr.HTML(
value="<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>πŸ” RAG: <strong>λΉ„ν™œμ„±ν™”</strong></div>"
)
# λͺ¨λΈ μΈν„°νŽ˜μ΄μŠ€ μ»¨ν…Œμ΄λ„ˆ
with gr.Column(visible=True) as model_120b_container:
gr.Markdown("### Model: openai/gpt-oss-120b")
# μ‹€μ œ λͺ¨λΈ λ‘œλ“œλŠ” gr.load()둜 처리
chatbot_120b = gr.Chatbot(height=400)
msg_box_120b = gr.Textbox(
label="λ©”μ‹œμ§€ μž…λ ₯",
placeholder="PDF λ‚΄μš©μ— λŒ€ν•΄ μ§ˆλ¬Έν•΄λ³΄μ„Έμš”...",
lines=2
)
with gr.Row():
send_btn_120b = gr.Button("πŸ“€ 전솑", variant="primary")
clear_btn_120b = gr.Button("πŸ—‘οΈ λŒ€ν™” μ΄ˆκΈ°ν™”")
with gr.Column(visible=False) as model_20b_container:
gr.Markdown("### Model: openai/gpt-oss-20b")
chatbot_20b = gr.Chatbot(height=400)
msg_box_20b = gr.Textbox(
label="λ©”μ‹œμ§€ μž…λ ₯",
placeholder="PDF λ‚΄μš©μ— λŒ€ν•΄ μ§ˆλ¬Έν•΄λ³΄μ„Έμš”...",
lines=2
)
with gr.Row():
send_btn_20b = gr.Button("πŸ“€ 전솑", variant="primary")
clear_btn_20b = gr.Button("πŸ—‘οΈ λŒ€ν™” μ΄ˆκΈ°ν™”")
# 이벀트 ν•Έλ“€λŸ¬
# PDF μ—…λ‘œλ“œ 처리
pdf_upload.upload(
fn=upload_pdf,
inputs=[pdf_upload],
outputs=[upload_status, document_list, enable_rag]
)
# λ¬Έμ„œ μ΄ˆκΈ°ν™”
clear_btn.click(
fn=clear_documents,
outputs=[upload_status, document_list, enable_rag]
)
# RAG μƒνƒœ μ—…λ°μ΄νŠΈ
enable_rag.change(
fn=lambda x: gr.update(
value=f"<div style='padding: 10px; background: rgba(59, 130, 246, 0.1); border-radius: 8px; margin-bottom: 10px;'>πŸ” RAG: <strong>{'ν™œμ„±ν™”' if x else 'λΉ„ν™œμ„±ν™”'}</strong></div>"
),
inputs=[enable_rag],
outputs=[rag_status]
)
# λͺ¨λΈ μ „ν™˜
reload_btn.click(
fn=switch_model,
inputs=[model_dropdown],
outputs=[model_120b_container, model_20b_container, current_model]
).then(
fn=lambda: gr.Info("λͺ¨λΈμ΄ μ„±κ³΅μ μœΌλ‘œ μ „ν™˜λ˜μ—ˆμŠ΅λ‹ˆλ‹€!"),
inputs=[],
outputs=[]
)
# μ±„νŒ… κΈ°λŠ₯ (RAG 톡합)
def chat_with_rag(message, history, enable_rag, selected_docs, top_k):
"""RAGλ₯Ό ν™œμš©ν•œ μ±„νŒ…"""
# RAG 처리
processed_message = process_with_rag(message, enable_rag, selected_docs, top_k)
# 여기에 μ‹€μ œ λͺ¨λΈ API 호좜 μ½”λ“œκ°€ λ“€μ–΄κ°€μ•Ό 함
# ν˜„μž¬λŠ” μ˜ˆμ‹œ 응닡
if enable_rag and selected_docs:
response = f"[RAG ν™œμ„±ν™”] μ„ νƒλœ {len(selected_docs)}개 λ¬Έμ„œλ₯Ό μ°Έκ³ ν•˜μ—¬ λ‹΅λ³€ν•©λ‹ˆλ‹€:\n\n{processed_message[:200]}..."
else:
response = f"[일반 λͺ¨λ“œ] {message}에 λŒ€ν•œ λ‹΅λ³€μž…λ‹ˆλ‹€."
history.append((message, response))
return "", history
# 120b λͺ¨λΈ μ±„νŒ…
msg_box_120b.submit(
fn=chat_with_rag,
inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_120b, chatbot_120b]
)
send_btn_120b.click(
fn=chat_with_rag,
inputs=[msg_box_120b, chatbot_120b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_120b, chatbot_120b]
)
clear_btn_120b.click(
lambda: ([], ""),
outputs=[chatbot_120b, msg_box_120b]
)
# 20b λͺ¨λΈ μ±„νŒ…
msg_box_20b.submit(
fn=chat_with_rag,
inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_20b, chatbot_20b]
)
send_btn_20b.click(
fn=chat_with_rag,
inputs=[msg_box_20b, chatbot_20b, enable_rag, document_list, top_k_chunks],
outputs=[msg_box_20b, chatbot_20b]
)
clear_btn_20b.click(
lambda: ([], ""),
outputs=[chatbot_20b, msg_box_20b]
)
if __name__ == "__main__":
demo.launch()