import os
from _utils.langchain_utils.LLM_class import LLM
from typing import Any, List, Dict, Tuple, Optional, Union, cast
from anthropic import Anthropic, AsyncAnthropic
import logging
from langchain.schema import Document
from llama_index import Document as Llama_Index_Document
import asyncio
from dataclasses import dataclass
from _utils.gerar_documento_utils.llm_calls import (
    aclaude_answer,
    agemini_answer,
    agpt_answer,
)
from _utils.gerar_documento_utils.prompts import contextual_prompt
from _utils.models.gerar_documento import (
    ContextualizedChunk,
    DocumentChunk,
    RetrievalConfig,
)
from langchain_core.messages import HumanMessage
from gerar_documento.serializer import (
    GerarDocumentoComPDFProprioSerializerData,
    GerarDocumentoSerializerData,
)
from setup.logging import Axiom
import re


class ContextualRetriever:
    def __init__(
        self,
        serializer: Union[
            GerarDocumentoSerializerData, GerarDocumentoComPDFProprioSerializerData, Any
        ],
    ):
        self.lista_contador = []
        self.contextual_retriever_utils = ContextualRetrieverUtils()
        self.config = RetrievalConfig(
            num_chunks=serializer.num_chunks_retrieval,
            embedding_weight=serializer.embedding_weight,
            bm25_weight=serializer.bm25_weight,
            context_window=serializer.context_window,
            chunk_overlap=serializer.chunk_overlap,
        )
        self.logger = logging.getLogger(__name__)
        self.bm25 = None
        self.claude_context_model = serializer.claude_context_model
        self.claude_api_key = os.environ.get("CLAUDE_API_KEY", "")
        self.claude_client = AsyncAnthropic(api_key=self.claude_api_key)

    async def llm_call_uma_lista_de_20_chunks(
        self,
        lista_com_20_chunks: List[DocumentChunk],
        resumo_auxiliar,
        axiom_instance: Axiom,
    ) -> List[List[str]]:
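        """Contextualize up to 20 chunks in a single LLM call.

        Builds the contextual prompt from the auxiliary summary and the
        concatenated chunk contents, calls Gemini ("gemini-2.0-flash-lite")
        up to 4 times and validates the response format on each attempt.
        Returns a list of [id_do_processo, context, contextual_summary]
        triples, or [[""]] if every attempt (or an exception) fails.
        """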
        all_chunks_contents, all_document_ids = (
            self.contextual_retriever_utils.get_all_document_ids_and_contents(
                lista_com_20_chunks
            )
        )
        send_axiom = axiom_instance.send_axiom
        utils = self.contextual_retriever_utils
        try:
            prompt = contextual_prompt(
                resumo_auxiliar, all_chunks_contents, len(lista_com_20_chunks)
            )
            result = None
            for attempt in range(4):
                if attempt != 0:
                    send_axiom(
                        f"------------- FORMATAÇÃO DO CONTEXTUAL INCORRETA - TENTANDO NOVAMENTE (TENTATIVA: {attempt + 1}) -------------"
                    )
                send_axiom(
                    f"COMEÇANDO UMA REQUISIÇÃO DO CONTEXTUAL - TENTATIVA {attempt + 1}"
                )
                raw_response = await agemini_answer(prompt, "gemini-2.0-flash-lite")
                response = cast(str, raw_response)
                send_axiom(
                    f"TERMINOU UMA REQUISIÇÃO DO CONTEXTUAL - TENTATIVA {attempt + 1}"
                )
                matches = utils.validate_many_chunks_in_one_request(
                    response, all_document_ids
                )
                if matches:
                    send_axiom(
                        f"VALIDAÇÃO DO CONTEXTUAL FUNCIONOU NA TENTATIVA {attempt + 1} (ou seja, a função validate_many_chunks_in_one_request)"
                    )
                    result = utils.get_info_from_validated_chunks(matches)
                    break
            if result is None:
                axiom_instance.send_axiom(
                    f"-------------- UMA LISTA DE 20 CHUNKS FALHOU AS 4x NA FORMATAÇÃO ------------- ÚLTIMO RETORNO ERRADO: {response}"
                )
                result = [[""]]  # default value if no iteration succeeded
            return result
        except Exception as e:
            self.logger.error(f"Context generation failed for chunks .... : {str(e)}")
            return [[""]]

    async def contextualize_uma_lista_de_20_chunks(
        self,
        lista_com_20_chunks: List[DocumentChunk],
        response_auxiliar_summary,
        axiom_instance: Axiom,
    ):
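        """Turn one batch of up to 20 DocumentChunks into ContextualizedChunks.

        Delegates the LLM call to llm_call_uma_lista_de_20_chunks and pairs
        each chunk with the [id_do_processo, context, contextual_summary]
        triple returned for its position in the batch.
        """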
        self.lista_contador.append(0)
        print("contador: ", len(self.lista_contador))
        result = await self.llm_call_uma_lista_de_20_chunks(
            lista_com_20_chunks, response_auxiliar_summary, axiom_instance
        )
        lista_chunks: List[ContextualizedChunk] = []
        try:
            for index, chunk in enumerate(lista_com_20_chunks):
                lista_chunks.append(
                    ContextualizedChunk(
                        contextual_summary=result[index][2],
                        content=chunk.content,
                        page_number=chunk.page_number,
                        id_do_processo=int(result[index][0]),
                        chunk_id=chunk.chunk_id,
                        start_char=chunk.start_char,
                        end_char=chunk.end_char,
                        context=result[index][1],
                    )
                )
        except BaseException as e:
            axiom_instance.send_axiom(
                f"ERRO EM UMA LISTA COM 20 CHUNKS CONTEXTUALS --------- lista: {lista_com_20_chunks} ------------ ERRO: {e}"
            )
        return lista_chunks

    async def contextualize_all_chunks(
        self,
        all_PDFs_chunks: List[DocumentChunk],
        response_auxiliar_summary,
        axiom_instance: Axiom,
    ) -> List[ContextualizedChunk]:
        """Add context to all chunks, processing them in parallel batches of 20."""
        lista_de_listas_cada_com_20_chunks = (
            self.contextual_retriever_utils.get_lista_de_listas_cada_com_20_chunks(
                all_PDFs_chunks
            )
        )

        async with asyncio.TaskGroup() as tg:

            def processa_uma_lista_de_20_chunks(
                lista_com_20_chunks: List[DocumentChunk],
            ):
                coroutine = self.contextualize_uma_lista_de_20_chunks(
                    lista_com_20_chunks, response_auxiliar_summary, axiom_instance
                )
                return tg.create_task(coroutine)

            tasks = [
                processa_uma_lista_de_20_chunks(lista_com_20_chunks)
                for lista_com_20_chunks in lista_de_listas_cada_com_20_chunks
            ]

        contextualized_chunks: List[ContextualizedChunk] = []
        for task in tasks:
            contextualized_chunks = contextualized_chunks + task.result()
        axiom_instance.send_axiom(
            "TERMINOU COM SUCESSO DE PROCESSAR TUDO DOS CONTEXTUALS"
        )
        return contextualized_chunks


class ContextualRetrieverUtils:
    def get_all_document_ids_and_contents(
        self, lista_com_20_chunks: List[DocumentChunk]
    ):
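        """Concatenate the batch into one prompt string and collect document ids.

        Each chunk is appended under a "CHUNK n:" header, and its document id
        is extracted from the first "Num. <digits>" occurrence in the chunk
        content (0 when no match is found).
        """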
        contador = 1
        all_chunks_contents = ""
        all_document_ids = []
        for chunk in lista_com_20_chunks:
            all_chunks_contents += f"\n\nCHUNK {contador}:\n"
            all_chunks_contents += chunk.content
            pattern = r"Num\. (\d+)"
            match = re.search(pattern, chunk.content)
            if match:
                number = match.group(1)  # Extract the number
            else:
                number = 0
            all_document_ids.append(int(number))
            contador += 1
        return all_chunks_contents, all_document_ids

    def get_info_from_validated_chunks(self, matches):
        result = [
            [int(doc_id), title.strip(), content.strip()]
            for doc_id, title, content in matches
        ]
        return result

    def get_lista_de_listas_cada_com_20_chunks(
        self, all_PDFs_chunks: List[DocumentChunk]
    ):
        return [all_PDFs_chunks[i : i + 20] for i in range(0, len(all_PDFs_chunks), 20)]

    def validate_many_chunks_in_one_request(
        self, response: str, lista_de_document_ids: List[int]
    ):
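        """Validate the LLM response format for one batch of chunks.

        Strips "document_id" labels, parses the response with
        check_regex_patterns and, on success, returns up to 20
        (document_id, context, contextual_summary) tuples, with the document
        id taken from lista_de_document_ids by position. Returns False when
        the response could not be parsed.
        """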
        context = (
            response.replace("document_id: ", "")
            .replace("document_id:", "")
            .replace("DOCUMENT_ID: ", "")
            .replace("DOCUMENT_ID:", "")
        )
        # pattern = r"\[(\d+|[-.]+)\] --- (.+?) --- (.+?)</chunk_context>"  # Works when the LLM response does not spell out "document_id"
        matches = self.check_regex_patterns(context, lista_de_document_ids)
        if not matches or len(matches) == 0:
            print(
                "----------- ERROU NA TENTATIVA ATUAL DE FORMATAR O CONTEXTUAL -----------"
            )
            return False
        matches_as_list = []
        for index, match in enumerate(list(matches)):
            if index >= 20:
                break
            # The document id extracted by the regex is ignored in favour of the
            # id already known for this position in the batch.
            resultado = lista_de_document_ids[index]
            matches_as_list.append((resultado, match[1], match[2]))
        return matches_as_list

    def check_regex_patterns(self, context: str, lista_de_document_ids: List[int]):
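        """Try each known response layout until one yields a triple per chunk.

        The patterns expect blocks shaped roughly like
        "<chunk_context>[doc_id] --- [context] --- [summary]</chunk_context>".
        When no pattern matches every chunk, a last-resort fallback splits the
        response on "</chunk_context>" and "---". Returns a list of
        (page_id, context, summary) tuples, or None when nothing parsed.
        """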
        patterns = [
            r"\[(.*?)\] --- \[(.*?)\] --- \[(.*?)\](?=\n|\s*$)",
            r"\[([^\[\]]+?)\]\s*---\s*\[([^\[\]]+?)\]\s*---\s*(.*?)</chunk_context>",
            r"<chunk_context>\s*(\d+)(?:\s*-\s*Pág\.\s*\d+)?\s*---\s*([^-\n]+)\s*---\s*([^<]+)</chunk_context>",
            r"<chunk_context>\s*(?:\[*([\d]+)\]*\s*[-–]*\s*(?:Pág\.\s*\d+\s*[-–]*)?)?\s*\[*([^\]]+)\]*\s*[-–]*\s*\[*([^\]]+)\]*\s*[-–]*\s*\[*([^\]]+)\]*\s*</chunk_context>",
            r"<chunk_context>\s*(.*?)\s*---\s*(.*?)\s*---\s*(.*?)\s*</chunk_context>",
            # -------------- Older patterns kept for reference
            # r"\[*([\d.\-]+)\]*\s*---\s*\[*([^]]+)\]*\s*---\s*\[*([^]]+)\]*\s*</chunk_context>",  # the very first one
            # r"<chunk_context>\s*([\d.\-]+)\s*---\s*([^<]+)\s*---\s*([^<]+)\s*</chunk_context>",
            # r"\[([\d.\-]+)\]\s*---\s*\[([^]]+)\]\s*---\s*\[([^]]+)\]\s*</chunk_context>",
            # r"<chunk_context>\s*\[?([\d.\-]+)\]?\s*---\s*\[?([^\]\[]+?)\]?\s*---\s*\[?([^<]+?)\]?\s*</chunk_context>",
            # r"<chunk_context>\s*\[([\d.\-]+)\]\s*---\s*\[([^\]]+)\]\s*---\s*\[([^\]]+)\]\s*</chunk_context>",
            # r"<chunk_context>\s*\[?([\d.\-\s]+)\]?\s*---\s*\[?([^\]\[]+?)\]?\s*---\s*\[?([\s\S]+?)\]?\s*</chunk_context>",
        ]
        resultado = None
        for pattern in patterns:
            matches: List[Tuple[str, ...]] = re.findall(pattern, context, re.DOTALL)
            condition_tuples_3_items = all(len(m) == 3 for m in matches)
            if len(matches) == len(lista_de_document_ids) and condition_tuples_3_items:
                print("\n--------------- REGEX DO CONTEXTUAL FUNCIONOU")
                resultado = []
                for m in matches:
                    regex = r"Num\.\s*(\d+)\s*-"
                    page_id = re.search(regex, m[0])
                    if page_id:
                        first_item = page_id.group(1)
                    else:
                        first_item = "0"
                    resultado.append((first_item, m[1], m[2]))
                break
        if not resultado:
            context = (
                context.replace("</final_output>", "")
                .replace("<final_output>", "")
                .strip()
            )
            raw_chunks = context.split("</chunk_context>")[0:20]
            resultado_temporario = []
            for r in raw_chunks:
                lista_3_itens = r.split("---")
                if len(lista_3_itens) < 3:
                    # Skip fragments that do not contain the three expected parts
                    # (e.g. trailing text after the last </chunk_context>).
                    continue
                page_id = re.search(r"Num\.\s*(\d+)\s*-", lista_3_itens[0].strip())
                page_id_tentativa_2 = re.search(
                    r"\d+\.\s+(\d+)\s+-\s+Pág\.", lista_3_itens[0].strip()
                )
                if page_id:
                    first_item = page_id.group(1)
                elif page_id_tentativa_2:
                    first_item = page_id_tentativa_2.group(1)
                else:
                    first_item = "0"
                resultado_temporario.append(
                    (first_item, lista_3_itens[1], lista_3_itens[2])
                )
            condition_tuples_3_items = all(len(m) == 3 for m in resultado_temporario)
            if (
                len(resultado_temporario) == len(lista_de_document_ids)
                and condition_tuples_3_items
            ):
                resultado = resultado_temporario
        return resultado

# The commented-out code below was meant to read the pages surrounding the chunk's current page
# page_content = ""
# for i in range(
#     max(0, chunk.page_number - 1),
#     min(len(single_page_text), chunk.page_number + 2),
# ):
#     page_content += single_page_text[i].page_content if single_page_text[i] else ""
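
# --- Minimal usage sketch (illustrative only, kept commented out on purpose) ---
# It assumes DocumentChunk accepts the keyword fields this module reads
# (content, page_number, chunk_id, start_char, end_char) and that an Axiom
# instance with a send_axiom method is available; both are assumptions about
# those classes, not guarantees about their real constructors.
#
# from types import SimpleNamespace
#
# async def _demo():
#     serializer = SimpleNamespace(
#         num_chunks_retrieval=5,
#         embedding_weight=0.5,
#         bm25_weight=0.5,
#         context_window=3,
#         chunk_overlap=200,
#         claude_context_model="<claude-model-name>",
#     )
#     retriever = ContextualRetriever(serializer)
#     chunks = [
#         DocumentChunk(
#             content="Num. 12345 - Pág. 1 ...",
#             page_number=1,
#             chunk_id="chunk-1",
#             start_char=0,
#             end_char=100,
#         )
#     ]
#     axiom = ...  # an Axiom instance from setup.logging
#     contextualized = await retriever.contextualize_all_chunks(
#         chunks, "resumo auxiliar do processo", axiom
#     )
#     print(contextualized)
#
# asyncio.run(_demo())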