import os
import shutil
import subprocess
from typing import List, Tuple

from langchain_core.documents import Document
from odf.opendocument import load
from odf.text import P

from setup.easy_imports import (
    PyPDFLoader,
    RecursiveCharacterTextSplitter,
)
class SplitterUtils:
    """Utilities for detecting file types and extracting raw text from documents."""

    def get_file_type(self, file_path):
        """Map a file extension to a short type label: "pdf", "word", "doc", "odt", "txt", or "unknown"."""
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()  # Normalize to lowercase
        if ext == ".pdf":
            return "pdf"
        elif ext == ".docx":
            return "word"
        elif ext == ".doc":
            return "doc"
        elif ext == ".odt":
            return "odt"
        elif ext == ".txt":
            return "txt"
        else:
            print("\nUnrecognized extension:", ext)
            return "unknown"
    def load_odt_file(self, file_path: str):
        """Extract plain text from an ODT file by concatenating its paragraphs."""
        textdoc = load(file_path)
        all_paragraphs = textdoc.getElementsByType(P)
        text = []
        for p in all_paragraphs:
            for node in p.childNodes:
                if node.nodeType == node.TEXT_NODE:
                    text.append(node.data)
        return "\n".join(text)
    def getTextFromDotDoc(self, file_path: str):
        """Extract text from a legacy .doc file using the external `antiword` tool."""
        antiword_path = shutil.which("antiword")
        if antiword_path is None:
            raise FileNotFoundError(
                "antiword executable not found on PATH; it is required to read .doc files"
            )
        command = [antiword_path, "-m", "UTF-8", file_path]
        # Run antiword and capture its UTF-8 output; raise on a non-zero exit code.
        result = subprocess.run(
            command,
            capture_output=True,  # Capture stdout and stderr
            text=True,  # Decode stdout/stderr as text
            check=True,  # Raise CalledProcessError on non-zero exit code
            encoding="utf-8",  # Explicitly specify decoding
        )
        # The extracted text is written to stdout.
        return result.stdout
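
# Illustrative usage sketch (the "example.odt" path is hypothetical):
#   utils = SplitterUtils()
#   if utils.get_file_type("example.odt") == "odt":
#       text = utils.load_odt_file("example.odt")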
class Splitter_Simple:
    """Thin wrapper around RecursiveCharacterTextSplitter for PDFs and raw text."""

    def __init__(
        self,
        chunk_size=1000,
        chunk_overlap=400,
    ):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
    async def load_and_split_document(self, pdf_path: str):
        """Load a PDF and split it into chunks with metadata."""
        print("\nSTARTING PDF READ")
        pages = PyPDFLoader(pdf_path).load_and_split(self.text_splitter)
        print("\nFINISHED PDF READ")
        return pages
    def load_and_split_text(self, text: str) -> List[Document]:
        chunks = self.text_splitter.split_text(text)
        return [Document(page_content=chunk) for chunk in chunks]
    def get_chunks_of_string_only_from_list_of_documents(
        self, lista_de_documentos: List[Document]
    ):
        full_text_as_string = "".join(
            page.page_content for page in lista_de_documentos
        )
        full_text_as_array = self.text_splitter.split_text(full_text_as_string)
        return full_text_as_array
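
# Illustrative usage sketch (the sample text is a placeholder):
#   splitter = Splitter_Simple(chunk_size=500, chunk_overlap=100)
#   docs = splitter.load_and_split_text("some long text ...")
#   chunks = splitter.get_chunks_of_string_only_from_list_of_documents(docs)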
def combine_documents_without_losing_pagination(documents: List[Document]):
    """Concatenate page contents while recording a (start_idx, end_idx, page_number) boundary per page."""
    combined_text = ""
    page_boundaries: List[Tuple[int, int, int]] = []
    current_position = 0
    for document in documents:
        start = current_position
        combined_text += document.page_content
        end = current_position + len(document.page_content)
        page_number = document.metadata.get("page", len(page_boundaries) + 1)
        page_boundaries.append((start, end, page_number))
        current_position = end
    return page_boundaries, combined_text
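
# Minimal sketch (not part of the original module) showing how the
# (start_idx, end_idx, page_number) boundaries returned above can map a
# character offset in combined_text back to its source page. The helper name
# `find_page_for_index` is an assumption for illustration, not an existing API.
def find_page_for_index(
    page_boundaries: List[Tuple[int, int, int]], index: int
) -> int:
    """Return the page number whose [start, end) span contains `index`, or -1 if none does."""
    for start, end, page_number in page_boundaries:
        if start <= index < end:
            return page_number
    return -1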