Spaces:
Sleeping
Sleeping
| # app.py (Versão Híbrida Final com Bi-Encoder e Cross-Encoder) | |
| import pandas as pd | |
| from flask import Flask, render_template, request, jsonify | |
| import os | |
| import sys | |
| import traceback | |
| import subprocess | |
| # Importa as duas classes de modelos necessárias | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| import csv | |
| from collections import defaultdict | |
| import datetime | |
| import re | |
| from huggingface_hub import InferenceClient, HfApi | |
| from huggingface_hub.utils import HfHubHTTPError | |
| import atexit | |
| import json | |
| from hashlib import sha1 | |
| # --- Variáveis e Constantes de Feedback --- | |
| USER_FEEDBACK_FILE = 'user_feedback.csv' | |
| USER_BEST_MATCHES_COUNTS = {} | |
| USER_FEEDBACK_THRESHOLD = 10 | |
| FEEDBACK_CSV_COLUMNS = ['timestamp', 'query_original', 'query_normalized', 'tuss_code_submitted', 'tuss_code_raw_input', 'tuss_description_associated', 'rol_names_associated', 'feedback_type'] | |
| # --- Configuração do Cliente da IA --- | |
| api_key = os.environ.get("USUARIO_KEY") | |
| if not api_key: | |
| print("--- [AVISO CRÍTICO] Secret 'USUARIO_KEY' não encontrado. As chamadas para a IA irão falhar. ---") | |
| client_ia = None | |
| else: | |
| client_ia = InferenceClient( | |
| provider="novita", | |
| api_key=api_key, | |
| ) | |
| print("--- [SUCESSO] Cliente de Inferência da IA configurado com a chave correta. ---") | |
| # --- SEÇÃO DE PERSISTÊNCIA --- | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| REPO_ID = "tuliodisanto/Buscador_Rol_vs.2_IA" | |
| if not HF_TOKEN: | |
| print("--- [AVISO CRÍTICO] Secret 'HF_TOKEN' não encontrado. Os arquivos não serão salvos no repositório. ---") | |
| hf_api = None | |
| else: | |
| hf_api = HfApi(token=HF_TOKEN) | |
| print(f"--- [SUCESSO] Cliente da API do Hugging Face configurado para o repositório: {REPO_ID}. ---") | |
| DATA_HAS_CHANGED = False | |
| # --- Funções de Feedback --- | |
| def normalize_text_for_feedback(text): | |
| if pd.isna(text): return "" | |
| try: | |
| from enhanced_search_v2 import normalize_text as es_normalize_text | |
| return es_normalize_text(str(text).strip()) | |
| except ImportError: | |
| import unidecode | |
| return unidecode.unidecode(str(text).lower().strip()) | |
| def load_user_feedback(): | |
| global USER_BEST_MATCHES_COUNTS | |
| USER_BEST_MATCHES_COUNTS = {} | |
| feedback_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), USER_FEEDBACK_FILE) | |
| if not os.path.exists(feedback_file_path): | |
| with open(feedback_file_path, 'w', newline='', encoding='utf-8') as f: csv.writer(f).writerow(FEEDBACK_CSV_COLUMNS) | |
| return | |
| try: | |
| with open(feedback_file_path, 'r', encoding='utf-8') as f: | |
| reader = csv.reader(f) | |
| try: | |
| header = next(reader) | |
| if [col.strip() for col in header] != FEEDBACK_CSV_COLUMNS: | |
| print(f"--- [AVISO] Cabeçalho do {USER_FEEDBACK_FILE} incorreto.") | |
| return | |
| except StopIteration: | |
| return | |
| for row in reader: | |
| if len(row) == len(FEEDBACK_CSV_COLUMNS): | |
| row_dict = dict(zip(FEEDBACK_CSV_COLUMNS, row)) | |
| query_norm, tuss_code = row_dict.get('query_normalized', ''), row_dict.get('tuss_code_submitted', '') | |
| if query_norm and tuss_code: | |
| if query_norm not in USER_BEST_MATCHES_COUNTS: USER_BEST_MATCHES_COUNTS[query_norm] = {} | |
| USER_BEST_MATCHES_COUNTS[query_norm][tuss_code] = USER_BEST_MATCHES_COUNTS[query_norm].get(tuss_code, 0) + 1 | |
| print(f"--- [SUCESSO] Feedback de usuário carregado/sincronizado. ---") | |
| except Exception as e: print(f"--- [ERRO] Falha ao carregar feedback: {e} ---"); traceback.print_exc() | |
| # --- Execução de Scripts e Importações --- | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| try: | |
| from enhanced_search_v2 import load_and_prepare_database, load_correction_corpus, load_general_dictionary, search_procedure_with_log | |
| print("--- [SUCESSO] Módulo 'enhanced_search_v2.py' importado. ---") | |
| except Exception as e: print(f"--- [ERRO CRÍTICO] Não foi possível importar 'enhanced_search_v2.py': {e} ---"); traceback.print_exc(); sys.exit(1) | |
| app = Flask(__name__) | |
| # --- Carregamento dos Dados e Modelos --- | |
| # CORREÇÃO 1: Preparar as variáveis para receber todos os 7 valores de load_and_prepare_database | |
| DF_ORIGINAL, DF_NORMALIZED, FUZZY_CORPUS, BM25_MODEL, DB_WORD_SET, doc_freq, tuss_map = (None, None, None, None, set(), {}, {}) | |
| CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS = [], [] | |
| PORTUGUESE_WORD_SET = set() | |
| SEMANTIC_MODEL = None # Bi-Encoder | |
| CROSS_ENCODER_MODEL = None # Cross-Encoder | |
| try: | |
| # Carregamento dos dados | |
| db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'rol_procedures_database.csv') | |
| # CORREÇÃO 1 (continuação): Desempacotar os 7 valores retornados pela função | |
| DF_ORIGINAL, DF_NORMALIZED, FUZZY_CORPUS, BM25_MODEL, DB_WORD_SET, doc_freq, tuss_map = load_and_prepare_database(db_path) | |
| dict_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Dic.csv') | |
| CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS = load_correction_corpus(dict_path, column_name='Termo_Correto') | |
| general_dict_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dicionario_ptbr.txt') | |
| PORTUGUESE_WORD_SET = load_general_dictionary(general_dict_path) | |
| load_user_feedback() | |
| # Carregamento dos dois modelos semânticos | |
| print("\n--- [SETUP] Carregando modelo Bi-Encoder (Etapa 1 de reordenação)... ---") | |
| bi_encoder_model_name = 'sentence-transformers/all-MiniLM-L6-v2' | |
| SEMANTIC_MODEL = SentenceTransformer(bi_encoder_model_name, device='cpu') | |
| print(f"--- [SUCESSO] Modelo Bi-Encoder '{bi_encoder_model_name}' carregado. ---") | |
| print("\n--- [SETUP] Carregando modelo Cross-Encoder (Etapa 2 de reordenação)... ---") | |
| cross_encoder_model_name = 'cross-encoder/ms-marco-MiniLM-L-6-v2' | |
| CROSS_ENCODER_MODEL = CrossEncoder(cross_encoder_model_name, device='cpu') | |
| print(f"--- [SUCESSO] Modelo Cross-Encoder '{cross_encoder_model_name}' carregado. ---") | |
| except Exception as e: | |
| print(f"--- [ERRO CRÍTICO] Falha fatal durante o setup: {e} ---"); traceback.print_exc(); sys.exit(1) | |
| # --- Rotas da Aplicação --- | |
| def index(): return render_template('index.html') | |
| def favicon(): return '', 204 | |
| def search(): | |
| try: | |
| data = request.get_json() | |
| query = data.get('query', '').strip() | |
| # CORREÇÃO 2: Reordenar os argumentos da função. Todos os posicionais primeiro, depois os de palavra-chave. | |
| results = search_procedure_with_log( | |
| query, | |
| DF_ORIGINAL, | |
| DF_NORMALIZED, | |
| FUZZY_CORPUS, | |
| (CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS), | |
| PORTUGUESE_WORD_SET, | |
| BM25_MODEL, | |
| DB_WORD_SET, | |
| doc_freq, # <-- Novo argumento posicional | |
| tuss_map, # <-- Novo argumento posicional | |
| limit_per_layer=10, | |
| semantic_model=SEMANTIC_MODEL, | |
| cross_encoder_model=CROSS_ENCODER_MODEL, | |
| user_best_matches_counts=USER_BEST_MATCHES_COUNTS, | |
| user_feedback_threshold=USER_FEEDBACK_THRESHOLD | |
| ) | |
| return jsonify(results) | |
| except Exception as e: | |
| print("--- [ERRO FATAL DURANTE A BUSCA] ---"); traceback.print_exc() | |
| return jsonify({"error": "Ocorreu um erro interno no motor de busca."}), 500 | |
| def submit_feedback_route(): | |
| global DATA_HAS_CHANGED | |
| try: | |
| data = request.get_json() | |
| query, tuss_code_submitted = data.get('query'), data.get('tuss_code') | |
| if not query or not tuss_code_submitted: | |
| return jsonify({"status": "error", "message": "Dados incompletos."}), 400 | |
| file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), USER_FEEDBACK_FILE) | |
| with open(file_path, 'a', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| query_normalized = normalize_text_for_feedback(query) | |
| tuss_descriptions, rol_names = [], [] | |
| if DF_ORIGINAL is not None: | |
| matching_rows = DF_ORIGINAL[DF_ORIGINAL['Codigo_TUSS'].astype(str) == tuss_code_submitted] | |
| if not matching_rows.empty: | |
| tuss_descriptions = matching_rows['Descricao_TUSS'].unique().tolist() | |
| rol_names = matching_rows['Procedimento_Rol'].unique().tolist() | |
| tuss_desc_assoc = " | ".join(filter(None, tuss_descriptions)) or 'Não encontrado' | |
| rol_names_assoc = " | ".join(filter(None, rol_names)) or 'Não encontrado' | |
| writer.writerow([datetime.datetime.now().isoformat(), query, query_normalized, tuss_code_submitted, '', tuss_desc_assoc, rol_names_assoc, 'confirm_result']) | |
| DATA_HAS_CHANGED = True | |
| print(f"--- [DADOS] '{USER_FEEDBACK_FILE}' foi modificado. Commit agendado para o desligamento. ---") | |
| load_user_feedback() | |
| return jsonify({"status": "success", "message": "Feedback recebido!"}), 200 | |
| except Exception as e: | |
| print("--- [ERRO NO SUBMIT_FEEDBACK] ---"); traceback.print_exc(); | |
| return jsonify({"status": "error", "message": "Erro interno."}), 500 | |
| def get_tuss_info(): | |
| tuss_code_prefix = request.args.get('tuss_prefix', '').strip() | |
| if not tuss_code_prefix: return jsonify([]) | |
| suggestions = [] | |
| if DF_ORIGINAL is not None and not DF_ORIGINAL.empty: | |
| filtered_df = DF_ORIGINAL[DF_ORIGINAL['Codigo_TUSS'].astype(str).str.startswith(tuss_code_prefix)] | |
| tuss_grouped = filtered_df.groupby('Codigo_TUSS').agg(tuss_descriptions=('Descricao_TUSS', lambda x: list(x.unique())), rol_names=('Procedimento_Rol', lambda x: list(x.unique()))).reset_index() | |
| for index, row in tuss_grouped.head(10).iterrows(): | |
| tuss_desc = " | ".join(filter(None, row['tuss_descriptions'])) or 'Sem descrição TUSS' | |
| rol_name = " | ".join(filter(None, row['rol_names'])) or 'Sem procedimento Rol' | |
| suggestions.append({'tuss_code': str(row['Codigo_TUSS']), 'tuss_description': tuss_desc, 'rol_name': rol_name}) | |
| return jsonify(suggestions) | |
| def get_ai_suggestion(): | |
| if not client_ia: | |
| return jsonify({"error": "O serviço de IA não está configurado no servidor."}), 503 | |
| try: | |
| data = request.get_json() | |
| query = data.get('query') | |
| results = data.get('results', []) | |
| if not query or not results: | |
| return jsonify({"error": "A consulta e os resultados são necessários."}), 400 | |
| RELEVANT_KEYS_FOR_AI = [ | |
| 'Codigo_TUSS', 'Descricao_TUSS', 'Procedimento_Rol', | |
| 'CAPITULO', 'GRUPO', 'SUBGRUPO', 'Semantico', | |
| 'Sinonimo_1', 'Sinonimo_2' | |
| ] | |
| simplified_results = [] | |
| for r in results: | |
| procedimento_rol_str = str(r.get('Procedimento_Rol', '')) | |
| unique_id = f"{r.get('Codigo_TUSS')}_{sha1(procedimento_rol_str.encode('utf-8')).hexdigest()[:8]}" | |
| pruned_result = { | |
| 'unique_id': unique_id, | |
| **{key: r.get(key) for key in RELEVANT_KEYS_FOR_AI | |
| if r.get(key) and pd.notna(r.get(key)) and str(r.get(key)).strip() not in ['---', '']} | |
| } | |
| if 'Codigo_TUSS' in pruned_result: | |
| simplified_results.append(pruned_result) | |
| formatted_results_str = json.dumps(simplified_results, indent=2, ensure_ascii=False) | |
| system_prompt = ( | |
| "Você é um especialista em terminologia de procedimentos médicos do Brasil (Tabela TUSS e Rol da ANS). " | |
| "Sua tarefa é analisar uma lista de procedimentos e escolher os 3 que melhor correspondem à consulta do usuário, em ordem de relevância." | |
| ) | |
| user_prompt = ( | |
| f"Consulta do usuário: \"{query}\"\n\n" | |
| f"### Resultados da Busca para Análise (JSON):\n{formatted_results_str}\n\n" | |
| "### Sua Tarefa:\n" | |
| "1. **Pense em voz alta:** Dentro de uma tag `<thought>`, explique seu processo de raciocínio passo a passo. Analise a consulta e compare os resultados, justificando por que um é mais relevante que o outro com base em seu `Procedimento_Rol` e outros campos.\n" | |
| "2. **Forneça a resposta final:** Após a tag `<thought>`, seu único resultado deve ser um bloco de código JSON. Este JSON **DEVE** conter uma chave `suggested_ids` com uma lista de **EXATAMENTE 3 strings** do campo `unique_id` que você selecionou, ordenadas da mais para a menos relevante.\n\n" | |
| "**EXEMPLO DE RESPOSTA OBRIGATÓRIA:**\n" | |
| "<thought>\n" | |
| "O Raciocínio da IA fica aqui...\n" | |
| "</thought>\n" | |
| "```json\n" | |
| "{\n" | |
| ' "suggested_ids": ["30602122_abc12345", "30602360_def67890", "30602033_ghi11223"]\n' | |
| "}\n" | |
| "```" | |
| ) | |
| completion = client_ia.chat.completions.create( | |
| model="baidu/ERNIE-4.5-21B-A3B-PT", | |
| messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], | |
| max_tokens=1500, temperature=0.1 | |
| ) | |
| raw_response = completion.choices[0].message.content.strip() | |
| thought_process = "Não foi possível extrair o raciocínio da resposta da IA." | |
| json_part = None | |
| if "<thought>" in raw_response and "</thought>" in raw_response: | |
| start = raw_response.find("<thought>") + len("<thought>") | |
| end = raw_response.find("</thought>") | |
| thought_process = raw_response[start:end].strip() | |
| if "```json" in raw_response: | |
| start = raw_response.find("```json") + len("```json") | |
| end = raw_response.rfind("```") | |
| json_str = raw_response[start:end].strip() | |
| try: | |
| json_part = json.loads(json_str) | |
| except json.JSONDecodeError: pass | |
| if not json_part or "suggested_ids" not in json_part or not isinstance(json_part.get("suggested_ids"), list) or len(json_part["suggested_ids"]) == 0: | |
| return jsonify({ | |
| "error": "A IA não retornou a lista de 'suggested_ids' no formato JSON esperado.", | |
| "details": raw_response | |
| }), 422 | |
| response_data = { | |
| "suggested_ids": json_part["suggested_ids"][:3], | |
| "thought_process": thought_process, | |
| "query": query, | |
| "results": simplified_results | |
| } | |
| return jsonify(response_data) | |
| except Exception as e: | |
| print("--- [ERRO FATAL NA SUGESTÃO DA IA] ---"); traceback.print_exc() | |
| return jsonify({"error": f"Ocorreu um erro interno na IA: {str(e)}"}), 500 | |
| # --- SEÇÃO DE PERSISTÊNCIA --- | |
| def commit_file_to_repo(local_file_name, commit_message): | |
| if not hf_api: | |
| print(f"--- [AVISO] API do HF não configurada. Pular o commit de '{local_file_name}'. ---") | |
| return | |
| local_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), local_file_name) | |
| if not os.path.exists(local_file_path) or os.path.getsize(local_file_path) == 0: | |
| print(f"--- [AVISO] Arquivo '{local_file_name}' não existe ou está vazio. Pular commit. ---") | |
| return | |
| path_in_repo = local_file_name | |
| try: | |
| print(f"--- [API HF] Tentando fazer o commit de '{local_file_name}' para o repositório... ---") | |
| hf_api.upload_file( | |
| path_or_fileobj=local_file_path, path_in_repo=path_in_repo, | |
| repo_id=REPO_ID, repo_type="space", commit_message=commit_message) | |
| print(f"--- [API HF] Sucesso no commit de '{local_file_name}'. ---") | |
| except Exception as e: | |
| print(f"--- [ERRO API HF] Falha no commit de '{local_file_name}': {e} ---") | |
| def save_data_on_exit(): | |
| print("--- [SHUTDOWN] O aplicativo está desligando. Verificando dados para salvar... ---") | |
| if DATA_HAS_CHANGED: | |
| print("--- [SHUTDOWN] Mudanças detectadas. Fazendo o commit dos arquivos para o repositório. ---") | |
| commit_file_to_repo(USER_FEEDBACK_FILE, "Commit automático: Atualiza feedbacks de usuários.") | |
| else: | |
| print("--- [SHUTDOWN] Nenhuma mudança nos dados detectada. Nenhum commit necessário. ---") | |
| atexit.register(save_data_on_exit) | |
| if __name__ == '__main__': | |
| port = int(os.environ.get("PORT", 7860)) | |
| app.run(host='0.0.0.0', port=port, debug=False) |