Spaces:
Running
Running
| from typing import Any, Dict, cast | |
| from langchain.prompts import PromptTemplate | |
| from _utils.Utils_Class import UtilsClass | |
| from _utils.bubble_integrations.enviar_resposta_final import enviar_resposta_final | |
| from _utils.custom_exception_handler import custom_exception_handler_without_api_handler | |
| from _utils.gerar_documento_utils.GerarDocumento import GerarDocumento | |
| from _utils.langchain_utils.LLM_class import LLM | |
| from _utils.gerar_documento_utils.utils import ( | |
| generate_document_title, | |
| split_text_by_tokens, | |
| ) | |
| from _utils.langchain_utils.Prompt_class import Prompt | |
| from _utils.utils import convert_markdown_to_HTML, print_sentry, sentry_add_breadcrumb | |
| from setup.easy_imports import ( | |
| Response, | |
| AsyncAPIView, | |
| APIView, | |
| MultiPartParser, | |
| extend_schema, | |
| ) | |
| from datetime import datetime | |
| from _utils.handle_files import handle_pdf_files_from_serializer, remove_pdf_temp_files | |
| from _utils.gerar_documento import ( | |
| gerar_documento, | |
| ) | |
| from _utils.gerar_documento_utils.prompts import prompt_auxiliar_inicio | |
| from setup.logging import Axiom, send_axiom | |
| from .serializer import ( | |
| GerarDocumentoComPDFProprioSerializer, | |
| GerarDocumentoSerializer, | |
| ) | |
| import asyncio | |
| from _utils.langchain_utils.Splitter_class import Splitter | |
| import json | |
| class GerarDocumentoView(AsyncAPIView): | |
| # parser_classes = [MultiPartParser] | |
| serializer = {} | |
| axiom_instance = Axiom() | |
| async def post(self, request): | |
| self.axiom_instance.generate_new_uuid() | |
| print(f"\n\nDATA E HORA DA REQUISIÇÃO: {datetime.now()}") | |
| self.axiom_instance.send_axiom( | |
| f"COMEÇOU NOVA REQUISIÇÃO - request.data: {request.data}" | |
| ) | |
| serializer = GerarDocumentoSerializer(data=request.data) | |
| if serializer.is_valid(raise_exception=True): | |
| obj = serializer.get_obj() # type: ignore | |
| if not serializer.validated_data: | |
| raise ValueError("Erro no validated_data") | |
| async def proccess_data_after_response(): | |
| # await asyncio.sleep(0) | |
| data = cast(Dict[str, Any], serializer.validated_data) | |
| self.serializer = data | |
| listaPDFs = json.loads(obj.files) | |
| listaPDFs = [l["link_arquivo"] for l in listaPDFs] | |
| self.axiom_instance.send_axiom(f"listaPDFs: {listaPDFs}") | |
| resposta_llm = await gerar_documento( | |
| obj, listaPDFs, self.axiom_instance, isBubble=True | |
| ) | |
| self.axiom_instance.send_axiom(f"resposta_llm: {resposta_llm}") | |
| # remove_pdf_temp_files(listaPDFs) | |
| # asyncio.create_task(proccess_data_after_response()) | |
| loop = asyncio.get_running_loop() | |
| loop.run_in_executor( | |
| None, lambda: asyncio.run(proccess_data_after_response()) | |
| ) | |
| return Response( | |
| {"resposta": "Requisição está sendo processada em segundo plano"} | |
| ) | |
| class GerarDocumentoComPDFProprioView(AsyncAPIView): | |
| parser_classes = [MultiPartParser] | |
| serializer = {} | |
| axiom_instance = Axiom() | |
| async def post(self, request): | |
| self.axiom_instance.generate_new_uuid() | |
| print(f"\n\nDATA E HORA DA REQUISIÇÃO: {datetime.now()}") | |
| self.axiom_instance.send_axiom( | |
| f"COMEÇOU NOVA REQUISIÇÃO - request.data: {request.data}" | |
| ) | |
| serializer = GerarDocumentoComPDFProprioSerializer(data=request.data) | |
| if serializer.is_valid(raise_exception=True): | |
| data = cast(Dict[str, Any], serializer.validated_data) | |
| obj = serializer.get_obj() # type: ignore | |
| self.serializer = data | |
| listaPDFs = handle_pdf_files_from_serializer( | |
| data["files"], self.axiom_instance | |
| ) | |
| resposta_llm = await gerar_documento(obj, listaPDFs, self.axiom_instance) | |
| self.axiom_instance.send_axiom(f"resposta_llm: {resposta_llm}") | |
| remove_pdf_temp_files(listaPDFs) | |
| self.axiom_instance.send_axiom( | |
| "PRÓXIMA LINHA ENVIA A RESPOSTA A QUEM FEZ A REQUISIÇÃO" | |
| ) | |
| return Response({"resposta": resposta_llm}) | |
| class GerarEmentaView(AsyncAPIView): | |
| serializer = {} | |
| axiom_instance = Axiom() | |
| async def post(self, request): | |
| self.axiom_instance.generate_new_uuid() | |
| print(f"\n\nDATA E HORA DA REQUISIÇÃO: {datetime.now()}") | |
| self.axiom_instance.send_axiom( | |
| f"COMEÇOU NOVA REQUISIÇÃO - request.data: {request.data}" | |
| ) | |
| serializer = GerarDocumentoSerializer(data=request.data) | |
| if serializer.is_valid(raise_exception=True): | |
| obj = serializer.get_obj() # type: ignore | |
| if not serializer.validated_data: | |
| raise ValueError("Erro no validated_data") | |
| async def proccess_data_after_response(): | |
| try: | |
| util = UtilsClass() | |
| handle_files = util.handle_files | |
| data = cast(Dict[str, Any], serializer.validated_data) | |
| self.serializer = data | |
| gerar_documento_instance = GerarDocumento( | |
| obj, True, self.axiom_instance | |
| ) | |
| # listaPDFs = [l["link_arquivo"] for l in data["files"]] | |
| listaPDFs = json.loads(obj.files) | |
| listaPDFs = [l["link_arquivo"] for l in listaPDFs] | |
| self.axiom_instance.send_axiom(f"listaPDFs: {listaPDFs}") | |
| all_PDFs_chunks, full_text_as_array = ( | |
| await handle_files.get_full_text_and_all_PDFs_chunks( | |
| listaPDFs, | |
| Splitter(obj.chunk_size, obj.chunk_overlap), | |
| False, | |
| True, | |
| ) | |
| ) | |
| full_text = "".join(full_text_as_array) | |
| self.axiom_instance.send_axiom( | |
| f"full_text[0:100] gerado do PDF: {full_text[0:100]}" | |
| ) | |
| texto_completo = "" | |
| text_splitted_by_tokens = split_text_by_tokens(full_text) | |
| for text in text_splitted_by_tokens: | |
| prompt_template = PromptTemplate( | |
| input_variables=["context"], | |
| template=obj.prompt_gerar_documento, | |
| ) | |
| texto_da_parte = ( | |
| await gerar_documento_instance.gerar_ementa_final( | |
| obj.llm_ultimas_requests, | |
| prompt_template.format(context=text), | |
| text, | |
| ) | |
| ) | |
| texto_completo += texto_da_parte | |
| texto_completo_como_html = convert_markdown_to_HTML( | |
| texto_completo | |
| ).replace("resposta_segunda_etapa:", "<br><br>") | |
| self.axiom_instance.send_axiom( | |
| f"resposta_llm: {texto_completo_como_html}" | |
| ) | |
| titulo_do_documento = await generate_document_title( | |
| cast(str, texto_completo_como_html) | |
| ) | |
| self.axiom_instance.send_axiom( | |
| "PRÓXIMA LINHA ENVIA A RESPOSTA A QUEM FEZ A REQUISIÇÃO" | |
| ) | |
| self.axiom_instance.send_axiom( | |
| "COMEÇANDO A REQUISIÇÃO FINAL PARA O BUBBLE" | |
| ) | |
| enviar_resposta_final( | |
| obj.doc_id, # type: ignore | |
| obj.form_response_id, # type: ignore | |
| obj.version, # type: ignore | |
| texto_completo_como_html, | |
| False, | |
| cast(str, titulo_do_documento), | |
| ) | |
| self.axiom_instance.send_axiom( | |
| "TERMINOU A REQUISIÇÃO FINAL PARA O BUBBLE" | |
| ) | |
| except Exception as e: | |
| print(f"ERRO GERAR EMENTA: {e}") | |
| custom_exception_handler_without_api_handler( | |
| e, serializer, self.axiom_instance | |
| ) | |
| raise | |
| # asyncio.create_task(proccess_data_after_response()) | |
| loop = asyncio.get_running_loop() | |
| loop.run_in_executor( | |
| None, lambda: asyncio.run(proccess_data_after_response()) | |
| ) | |
| return Response( | |
| {"resposta": "Requisição está sendo processada em segundo plano"} | |
| ) | |
| class GerarEmentaComPDFProprioView(AsyncAPIView): | |
| parser_classes = [MultiPartParser] | |
| serializer = {} | |
| axiom_instance = Axiom() | |
| async def post(self, request): | |
| self.axiom_instance.generate_new_uuid() | |
| print(f"\n\nDATA E HORA DA REQUISIÇÃO: {datetime.now()}") | |
| self.axiom_instance.send_axiom( | |
| f"COMEÇOU NOVA REQUISIÇÃO - request.data: {request.data}" | |
| ) | |
| serializer = GerarDocumentoComPDFProprioSerializer(data=request.data) | |
| util = UtilsClass() | |
| handle_files = util.handle_files | |
| if serializer.is_valid(raise_exception=True): | |
| data = cast(Dict[str, Any], serializer.validated_data) | |
| self.axiom_instance.send_axiom(f"data: {data}") | |
| self.serializer = data | |
| serializer_obj = serializer.get_obj() | |
| gerar_documento_instance = GerarDocumento( | |
| serializer_obj, False, self.axiom_instance | |
| ) | |
| listaPDFs = handle_pdf_files_from_serializer( | |
| data["files"], self.axiom_instance | |
| ) | |
| self.axiom_instance.send_axiom(f"listaPDFs: {listaPDFs}") | |
| all_PDFs_chunks, full_text_as_array = ( | |
| await handle_files.get_full_text_and_all_PDFs_chunks( | |
| listaPDFs, | |
| Splitter(serializer_obj.chunk_size, serializer_obj.chunk_overlap), | |
| False, | |
| False, | |
| ) | |
| ) | |
| full_text = "".join(full_text_as_array) | |
| self.axiom_instance.send_axiom( | |
| f"full_text[0:100] gerado do PDF: {full_text[0:100]}" | |
| ) | |
| texto_completo = "" | |
| text_splitted_by_tokens = split_text_by_tokens(full_text) | |
| for text in text_splitted_by_tokens: | |
| prompt_template = PromptTemplate( | |
| input_variables=["context"], | |
| template=serializer_obj.prompt_gerar_documento, | |
| ) | |
| texto_da_parte = await gerar_documento_instance.gerar_ementa_final( | |
| serializer_obj.llm_ultimas_requests, | |
| prompt_template.format(context=text), | |
| text, | |
| ) | |
| texto_completo += texto_da_parte | |
| texto_completo_como_html = convert_markdown_to_HTML(texto_completo).replace( | |
| "resposta_segunda_etapa:", "<br><br>" | |
| ) | |
| self.axiom_instance.send_axiom(f"resposta_llm: {texto_completo_como_html}") | |
| titulo_do_documento = await generate_document_title( | |
| cast(str, texto_completo_como_html) | |
| ) | |
| remove_pdf_temp_files(listaPDFs) | |
| self.axiom_instance.send_axiom( | |
| "PRÓXIMA LINHA ENVIA A RESPOSTA A QUEM FEZ A REQUISIÇÃO" | |
| ) | |
| return Response( | |
| { | |
| "texto_completo": texto_completo_como_html, | |
| "titulo_do_documento": titulo_do_documento, | |
| } | |
| ) | |