"""
Document processing utility module.
"""

import os
import re
import csv
import io
import logging
from typing import List, Dict, Any, Optional, Tuple, Union

import numpy as np

logger = logging.getLogger("DocProcessor")
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

class DocumentProcessor:
    """Document processing utility class."""

    @staticmethod
    def split_text(
        text: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        separator: str = "\n"
    ) -> List[str]:
        """
        Split text into smaller chunks.

        Args:
            text: Text to split
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters to overlap between chunks
            separator: Separator used for splitting

        Returns:
            List of text chunks
        """
        if not text or chunk_size <= 0:
            return []

        parts = text.split(separator)
        chunks = []
        current_chunk = []
        current_size = 0

        for part in parts:
            part_size = len(part)

            # Flush the current chunk when adding this part (plus separators)
            # would exceed chunk_size.
            if current_size + part_size + len(current_chunk) > chunk_size and current_chunk:
                chunks.append(separator.join(current_chunk))

                # Carry the tail of the finished chunk over as overlap.
                overlap_tokens = []
                overlap_size = 0
                for token in reversed(current_chunk):
                    if overlap_size + len(token) <= chunk_overlap:
                        overlap_tokens.insert(0, token)
                        overlap_size += len(token) + 1
                    else:
                        break

                current_chunk = overlap_tokens
                current_size = overlap_size - len(current_chunk)

            current_chunk.append(part)
            current_size += part_size

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

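    # A minimal, illustrative sketch of the chunking behaviour above (the input
    # string and sizes are made-up values, not part of this module):
    #
    #   DocumentProcessor.split_text("line1\nline2\nline3", chunk_size=12, chunk_overlap=6)
    #   # -> ["line1\nline2", "line2\nline3"]  (the second chunk re-uses "line2" as overlap)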

    @staticmethod
    def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str:
        """
        Clean text.

        Args:
            text: Text to clean
            remove_urls: Whether to remove URLs
            remove_extra_whitespace: Whether to collapse extra whitespace

        Returns:
            Cleaned text
        """
        if not text:
            return ""

        if remove_urls:
            text = re.sub(r'https?://\S+|www\.\S+', '', text)

        # Strip HTML tags.
        text = re.sub(r'<.*?>', '', text)

        if remove_extra_whitespace:
            text = re.sub(r'\s+', ' ', text).strip()

        return text

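    # Illustrative behaviour (the sample string is an assumption for this sketch):
    #
    #   DocumentProcessor.clean_text("See <b>docs</b> at https://example.com   now")
    #   # -> "See docs at now"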

    @staticmethod
    def text_to_documents(
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
        chunk_size: int = 512,
        chunk_overlap: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Convert text into a list of document objects.

        Args:
            text: Text to convert
            metadata: Metadata to attach to each document
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters to overlap between chunks

        Returns:
            List of document objects
        """
        if not text:
            return []

        clean = DocumentProcessor.clean_text(text)

        chunks = DocumentProcessor.split_text(
            clean,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

        documents = []
        for i, chunk in enumerate(chunks):
            doc = {
                "text": chunk,
                "index": i,
                "chunk_count": len(chunks)
            }

            if metadata:
                doc.update(metadata)

            documents.append(doc)

        return documents

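    # Shape of the returned entries, shown with assumed metadata (illustrative only):
    #
    #   DocumentProcessor.text_to_documents("...", metadata={"source": "a.txt"}, chunk_size=256)
    #   # -> [{"text": "...", "index": 0, "chunk_count": N, "source": "a.txt"}, ...]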

    @staticmethod
    def load_documents_from_directory(
        directory: str,
        extensions: List[str] = [".txt", ".md", ".csv"],
        recursive: bool = True,
        chunk_size: int = 512,
        chunk_overlap: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Load and process documents from a directory.

        Args:
            directory: Path of the directory to load from
            extensions: List of file extensions to process
            recursive: Whether to search subdirectories
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters to overlap between chunks

        Returns:
            List of document objects
        """
        if not os.path.isdir(directory):
            logger.error(f"Directory not found: {directory}")
            return []

        documents = []

        for root, dirs, files in os.walk(directory):
            if not recursive and root != directory:
                continue

            for file in files:
                _, ext = os.path.splitext(file)
                if ext.lower() not in extensions:
                    continue

                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, directory)

                try:
                    logger.info(f"Loading file: {rel_path}")

                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    except UnicodeDecodeError:
                        # Fall back to CP949 for legacy Korean encodings.
                        logger.info(f"UTF-8 decoding failed, retrying with CP949: {rel_path}")
                        with open(file_path, 'r', encoding='cp949') as f:
                            content = f.read()

                    metadata = {
                        "source": rel_path,
                        "filename": file,
                        "filetype": ext.lower()[1:],
                        "filepath": file_path
                    }

                    if ext.lower() == '.csv':
                        logger.info(f"CSV file detected, splitting by row: {rel_path}")
                        file_docs = DocumentProcessor.csv_to_documents(content, metadata)
                    else:
                        file_docs = DocumentProcessor.text_to_documents(
                            content,
                            metadata=metadata,
                            chunk_size=chunk_size,
                            chunk_overlap=chunk_overlap
                        )

                    documents.extend(file_docs)
                    logger.info(f"Extracted {len(file_docs)} chunks: {rel_path}")

                except Exception as e:
                    logger.error(f"Error while processing file '{rel_path}': {e}")
                    continue

        logger.info(f"Loaded {len(documents)} document chunks in total.")
        return documents

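    # Illustrative call (the path and extension filter are assumptions for this sketch):
    #
    #   docs = DocumentProcessor.load_documents_from_directory("./data", extensions=[".md", ".csv"])
    #   # docs is a flat list of chunk/row dictionaries carrying the metadata added above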

    @staticmethod
    def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]:
        """
        Extract context texts for RAG from search results.

        Args:
            results: List of search results
            field: Name of the field that holds the text content

        Returns:
            List of context texts
        """
        context = []

        for result in results:
            if field in result:
                context.append(result[field])

        return context

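    # Typical follow-up (the prompt wording here is an assumption, not part of this module):
    #
    #   context = DocumentProcessor.prepare_rag_context(search_results)
    #   prompt = "Context:\n" + "\n\n".join(context) + "\n\nQuestion: ..."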

    @staticmethod
    def csv_to_documents(content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Split CSV content by row and turn each row into a separate document.

        Args:
            content: Contents of the CSV file
            metadata: Base metadata

        Returns:
            List of document objects (one per row)
        """
        documents = []

        try:
            # Check whether the content parses as standard comma-separated CSV.
            try:
                csv_reader = csv.reader(io.StringIO(content))
                rows = list(csv_reader)
                if len(rows) > 0 and len(rows[0]) > 1:
                    logger.info(f"Processing CSV file with comma delimiter: {metadata.get('source', 'unknown')}")
                    has_valid_format = True
                else:
                    has_valid_format = False
            except Exception:
                has_valid_format = False

            if not has_valid_format:
                # Whitespace-delimited fallback: each data line is expected to start
                # with an 'IT' id followed by query type, question, answer, reference.
                logger.warning(f"CSV file is not standard comma-separated; falling back to whitespace delimiter: {metadata.get('source', 'unknown')}")
                lines = content.strip().split('\n')

                for i, line in enumerate(lines):
                    if not line.strip().startswith('IT'):
                        continue

                    parts = line.split(maxsplit=4)

                    if len(parts) < 5:
                        logger.warning(f"Row {i + 1} has too few fields: {line[:50]}...")
                        continue

                    doc_id = parts[0].strip()
                    query_type = parts[1].strip()
                    question = parts[2].strip()
                    answer = parts[3].strip()
                    reference = parts[4].strip() if len(parts) > 4 else ""

                    text = f"ID: {doc_id}\n"
                    text += f"Query type: {query_type}\n"
                    text += f"Question: {question}\n"
                    text += f"Answer: {answer}\n"
                    if reference:
                        text += f"Reference/Context: {reference}"

                    doc_metadata = metadata.copy()
                    doc_metadata.update({
                        "row": i,
                        "query_type": query_type,
                        "question": question,
                        "answer": answer,
                        "reference": reference
                    })

                    document = {
                        "text": text,
                        "id": doc_id,
                        **doc_metadata
                    }

                    documents.append(document)
                    logger.debug(f"Processed IT document: {doc_id} - {question[:30]}...")

                logger.info(f"Converted {len(documents)} rows from whitespace-delimited CSV file '{metadata.get('source', 'unknown')}' into documents.")
                return documents

            if not rows:
                logger.warning(f"CSV file contains no data: {metadata.get('source', 'unknown')}")
                return []

            headers = rows[0]
            logger.debug(f"CSV headers: {headers}")

            for i, row in enumerate(rows[1:], 1):
                # Pad short rows so every header has a value.
                while len(row) < len(headers):
                    row.append("")

                row_data = {headers[j]: value for j, value in enumerate(row) if j < len(headers)}

                row_id = row[0] if row and len(row) > 0 else f"row_{i}"

                # Render the row as "header: value" lines, skipping empty cells.
                text_parts = []
                for j, header in enumerate(headers):
                    if j < len(row) and row[j]:
                        text_parts.append(f"{header}: {row[j]}")

                text = "\n".join(text_parts)

                doc_metadata = metadata.copy()
                doc_metadata.update({
                    "row": i,
                    "row_id": row_id,
                    "total_rows": len(rows) - 1,
                    "csv_data": row_data
                })

                document = {
                    "text": text,
                    "id": row_id,
                    **doc_metadata
                }

                documents.append(document)

            logger.info(f"Converted {len(documents)} rows from CSV file '{metadata.get('source', 'unknown')}' into documents.")

        except Exception as e:
            logger.error(f"Error while processing CSV file: {e}")

        return documents
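
# A minimal usage sketch, assuming a local "./docs" directory with .txt/.md/.csv
# files (the path and the pretend retrieval step are illustrative assumptions,
# not part of this module):
if __name__ == "__main__":
    docs = DocumentProcessor.load_documents_from_directory(
        "./docs",
        chunk_size=512,
        chunk_overlap=50,
    )
    # Treat the first few chunks as if a retriever had returned them, then build
    # a RAG-style context block from their "text" fields.
    context = DocumentProcessor.prepare_rag_context(docs[:3])
    print(f"Loaded {len(docs)} chunks")
    print("\n\n".join(context))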