tuliodisanto's picture
Upload 2 files
f3ebbc7 verified
raw
history blame
16.7 kB
# app.py (Versão Híbrida Final com Bi-Encoder e Cross-Encoder)
import pandas as pd
from flask import Flask, render_template, request, jsonify
import os
import sys
import traceback
import subprocess
# Importa as duas classes de modelos necessárias
from sentence_transformers import SentenceTransformer, CrossEncoder
import csv
from collections import defaultdict
import datetime
import re
from huggingface_hub import InferenceClient, HfApi
from huggingface_hub.utils import HfHubHTTPError
import atexit
import json
from hashlib import sha1
# --- Variáveis e Constantes de Feedback ---
USER_FEEDBACK_FILE = 'user_feedback.csv'
USER_BEST_MATCHES_COUNTS = {}
USER_FEEDBACK_THRESHOLD = 10
FEEDBACK_CSV_COLUMNS = ['timestamp', 'query_original', 'query_normalized', 'tuss_code_submitted', 'tuss_code_raw_input', 'tuss_description_associated', 'rol_names_associated', 'feedback_type']
# --- Configuração do Cliente da IA ---
api_key = os.environ.get("USUARIO_KEY")
if not api_key:
print("--- [AVISO CRÍTICO] Secret 'USUARIO_KEY' não encontrado. As chamadas para a IA irão falhar. ---")
client_ia = None
else:
client_ia = InferenceClient(
provider="novita",
api_key=api_key,
)
print("--- [SUCESSO] Cliente de Inferência da IA configurado com a chave correta. ---")
# --- SEÇÃO DE PERSISTÊNCIA ---
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "tuliodisanto/Buscador_Rol_vs.2_IA"
if not HF_TOKEN:
print("--- [AVISO CRÍTICO] Secret 'HF_TOKEN' não encontrado. Os arquivos não serão salvos no repositório. ---")
hf_api = None
else:
hf_api = HfApi(token=HF_TOKEN)
print(f"--- [SUCESSO] Cliente da API do Hugging Face configurado para o repositório: {REPO_ID}. ---")
DATA_HAS_CHANGED = False
# --- Funções de Feedback ---
def normalize_text_for_feedback(text):
if pd.isna(text): return ""
try:
from enhanced_search_v2 import normalize_text as es_normalize_text
return es_normalize_text(str(text).strip())
except ImportError:
import unidecode
return unidecode.unidecode(str(text).lower().strip())
def load_user_feedback():
global USER_BEST_MATCHES_COUNTS
USER_BEST_MATCHES_COUNTS = {}
feedback_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), USER_FEEDBACK_FILE)
if not os.path.exists(feedback_file_path):
with open(feedback_file_path, 'w', newline='', encoding='utf-8') as f: csv.writer(f).writerow(FEEDBACK_CSV_COLUMNS)
return
try:
with open(feedback_file_path, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
try:
header = next(reader)
if [col.strip() for col in header] != FEEDBACK_CSV_COLUMNS:
print(f"--- [AVISO] Cabeçalho do {USER_FEEDBACK_FILE} incorreto.")
return
except StopIteration:
return
for row in reader:
if len(row) == len(FEEDBACK_CSV_COLUMNS):
row_dict = dict(zip(FEEDBACK_CSV_COLUMNS, row))
query_norm, tuss_code = row_dict.get('query_normalized', ''), row_dict.get('tuss_code_submitted', '')
if query_norm and tuss_code:
if query_norm not in USER_BEST_MATCHES_COUNTS: USER_BEST_MATCHES_COUNTS[query_norm] = {}
USER_BEST_MATCHES_COUNTS[query_norm][tuss_code] = USER_BEST_MATCHES_COUNTS[query_norm].get(tuss_code, 0) + 1
print(f"--- [SUCESSO] Feedback de usuário carregado/sincronizado. ---")
except Exception as e: print(f"--- [ERRO] Falha ao carregar feedback: {e} ---"); traceback.print_exc()
# --- Execução de Scripts e Importações ---
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from enhanced_search_v2 import load_and_prepare_database, load_correction_corpus, load_general_dictionary, search_procedure_with_log
print("--- [SUCESSO] Módulo 'enhanced_search_v2.py' importado. ---")
except Exception as e: print(f"--- [ERRO CRÍTICO] Não foi possível importar 'enhanced_search_v2.py': {e} ---"); traceback.print_exc(); sys.exit(1)
app = Flask(__name__)
# --- Carregamento dos Dados e Modelos ---
# CORREÇÃO 1: Preparar as variáveis para receber todos os 7 valores de load_and_prepare_database
DF_ORIGINAL, DF_NORMALIZED, FUZZY_CORPUS, BM25_MODEL, DB_WORD_SET, doc_freq, tuss_map = (None, None, None, None, set(), {}, {})
CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS = [], []
PORTUGUESE_WORD_SET = set()
SEMANTIC_MODEL = None # Bi-Encoder
CROSS_ENCODER_MODEL = None # Cross-Encoder
try:
# Carregamento dos dados
db_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'rol_procedures_database.csv')
# CORREÇÃO 1 (continuação): Desempacotar os 7 valores retornados pela função
DF_ORIGINAL, DF_NORMALIZED, FUZZY_CORPUS, BM25_MODEL, DB_WORD_SET, doc_freq, tuss_map = load_and_prepare_database(db_path)
dict_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Dic.csv')
CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS = load_correction_corpus(dict_path, column_name='Termo_Correto')
general_dict_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dicionario_ptbr.txt')
PORTUGUESE_WORD_SET = load_general_dictionary(general_dict_path)
load_user_feedback()
# Carregamento dos dois modelos semânticos
print("\n--- [SETUP] Carregando modelo Bi-Encoder (Etapa 1 de reordenação)... ---")
bi_encoder_model_name = 'sentence-transformers/all-MiniLM-L6-v2'
SEMANTIC_MODEL = SentenceTransformer(bi_encoder_model_name, device='cpu')
print(f"--- [SUCESSO] Modelo Bi-Encoder '{bi_encoder_model_name}' carregado. ---")
print("\n--- [SETUP] Carregando modelo Cross-Encoder (Etapa 2 de reordenação)... ---")
cross_encoder_model_name = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
CROSS_ENCODER_MODEL = CrossEncoder(cross_encoder_model_name, device='cpu')
print(f"--- [SUCESSO] Modelo Cross-Encoder '{cross_encoder_model_name}' carregado. ---")
except Exception as e:
print(f"--- [ERRO CRÍTICO] Falha fatal durante o setup: {e} ---"); traceback.print_exc(); sys.exit(1)
# --- Rotas da Aplicação ---
@app.route('/')
def index(): return render_template('index.html')
@app.route('/favicon.ico')
def favicon(): return '', 204
@app.route('/search', methods=['POST'])
def search():
try:
data = request.get_json()
query = data.get('query', '').strip()
# CORREÇÃO 2: Reordenar os argumentos da função. Todos os posicionais primeiro, depois os de palavra-chave.
results = search_procedure_with_log(
query,
DF_ORIGINAL,
DF_NORMALIZED,
FUZZY_CORPUS,
(CORRECTION_CORPUS, NORMALIZED_CORRECTION_CORPUS),
PORTUGUESE_WORD_SET,
BM25_MODEL,
DB_WORD_SET,
doc_freq, # <-- Novo argumento posicional
tuss_map, # <-- Novo argumento posicional
limit_per_layer=10,
semantic_model=SEMANTIC_MODEL,
cross_encoder_model=CROSS_ENCODER_MODEL,
user_best_matches_counts=USER_BEST_MATCHES_COUNTS,
user_feedback_threshold=USER_FEEDBACK_THRESHOLD
)
return jsonify(results)
except Exception as e:
print("--- [ERRO FATAL DURANTE A BUSCA] ---"); traceback.print_exc()
return jsonify({"error": "Ocorreu um erro interno no motor de busca."}), 500
@app.route('/submit_feedback', methods=['POST'])
def submit_feedback_route():
global DATA_HAS_CHANGED
try:
data = request.get_json()
query, tuss_code_submitted = data.get('query'), data.get('tuss_code')
if not query or not tuss_code_submitted:
return jsonify({"status": "error", "message": "Dados incompletos."}), 400
file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), USER_FEEDBACK_FILE)
with open(file_path, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
query_normalized = normalize_text_for_feedback(query)
tuss_descriptions, rol_names = [], []
if DF_ORIGINAL is not None:
matching_rows = DF_ORIGINAL[DF_ORIGINAL['Codigo_TUSS'].astype(str) == tuss_code_submitted]
if not matching_rows.empty:
tuss_descriptions = matching_rows['Descricao_TUSS'].unique().tolist()
rol_names = matching_rows['Procedimento_Rol'].unique().tolist()
tuss_desc_assoc = " | ".join(filter(None, tuss_descriptions)) or 'Não encontrado'
rol_names_assoc = " | ".join(filter(None, rol_names)) or 'Não encontrado'
writer.writerow([datetime.datetime.now().isoformat(), query, query_normalized, tuss_code_submitted, '', tuss_desc_assoc, rol_names_assoc, 'confirm_result'])
DATA_HAS_CHANGED = True
print(f"--- [DADOS] '{USER_FEEDBACK_FILE}' foi modificado. Commit agendado para o desligamento. ---")
load_user_feedback()
return jsonify({"status": "success", "message": "Feedback recebido!"}), 200
except Exception as e:
print("--- [ERRO NO SUBMIT_FEEDBACK] ---"); traceback.print_exc();
return jsonify({"status": "error", "message": "Erro interno."}), 500
@app.route('/get_tuss_info', methods=['GET'])
def get_tuss_info():
tuss_code_prefix = request.args.get('tuss_prefix', '').strip()
if not tuss_code_prefix: return jsonify([])
suggestions = []
if DF_ORIGINAL is not None and not DF_ORIGINAL.empty:
filtered_df = DF_ORIGINAL[DF_ORIGINAL['Codigo_TUSS'].astype(str).str.startswith(tuss_code_prefix)]
tuss_grouped = filtered_df.groupby('Codigo_TUSS').agg(tuss_descriptions=('Descricao_TUSS', lambda x: list(x.unique())), rol_names=('Procedimento_Rol', lambda x: list(x.unique()))).reset_index()
for index, row in tuss_grouped.head(10).iterrows():
tuss_desc = " | ".join(filter(None, row['tuss_descriptions'])) or 'Sem descrição TUSS'
rol_name = " | ".join(filter(None, row['rol_names'])) or 'Sem procedimento Rol'
suggestions.append({'tuss_code': str(row['Codigo_TUSS']), 'tuss_description': tuss_desc, 'rol_name': rol_name})
return jsonify(suggestions)
@app.route('/get_ai_suggestion', methods=['POST'])
def get_ai_suggestion():
if not client_ia:
return jsonify({"error": "O serviço de IA não está configurado no servidor."}), 503
try:
data = request.get_json()
query = data.get('query')
results = data.get('results', [])
if not query or not results:
return jsonify({"error": "A consulta e os resultados são necessários."}), 400
RELEVANT_KEYS_FOR_AI = [
'Codigo_TUSS', 'Descricao_TUSS', 'Procedimento_Rol',
'CAPITULO', 'GRUPO', 'SUBGRUPO', 'Semantico',
'Sinonimo_1', 'Sinonimo_2'
]
simplified_results = []
for r in results:
procedimento_rol_str = str(r.get('Procedimento_Rol', ''))
unique_id = f"{r.get('Codigo_TUSS')}_{sha1(procedimento_rol_str.encode('utf-8')).hexdigest()[:8]}"
pruned_result = {
'unique_id': unique_id,
**{key: r.get(key) for key in RELEVANT_KEYS_FOR_AI
if r.get(key) and pd.notna(r.get(key)) and str(r.get(key)).strip() not in ['---', '']}
}
if 'Codigo_TUSS' in pruned_result:
simplified_results.append(pruned_result)
formatted_results_str = json.dumps(simplified_results, indent=2, ensure_ascii=False)
system_prompt = (
"Você é um especialista em terminologia de procedimentos médicos do Brasil (Tabela TUSS e Rol da ANS). "
"Sua tarefa é analisar uma lista de procedimentos e escolher os 3 que melhor correspondem à consulta do usuário, em ordem de relevância."
)
user_prompt = (
f"Consulta do usuário: \"{query}\"\n\n"
f"### Resultados da Busca para Análise (JSON):\n{formatted_results_str}\n\n"
"### Sua Tarefa:\n"
"1. **Pense em voz alta:** Dentro de uma tag `<thought>`, explique seu processo de raciocínio passo a passo. Analise a consulta e compare os resultados, justificando por que um é mais relevante que o outro com base em seu `Procedimento_Rol` e outros campos.\n"
"2. **Forneça a resposta final:** Após a tag `<thought>`, seu único resultado deve ser um bloco de código JSON. Este JSON **DEVE** conter uma chave `suggested_ids` com uma lista de **EXATAMENTE 3 strings** do campo `unique_id` que você selecionou, ordenadas da mais para a menos relevante.\n\n"
"**EXEMPLO DE RESPOSTA OBRIGATÓRIA:**\n"
"<thought>\n"
"O Raciocínio da IA fica aqui...\n"
"</thought>\n"
"```json\n"
"{\n"
' "suggested_ids": ["30602122_abc12345", "30602360_def67890", "30602033_ghi11223"]\n'
"}\n"
"```"
)
completion = client_ia.chat.completions.create(
model="baidu/ERNIE-4.5-21B-A3B-PT",
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
max_tokens=1500, temperature=0.1
)
raw_response = completion.choices[0].message.content.strip()
thought_process = "Não foi possível extrair o raciocínio da resposta da IA."
json_part = None
if "<thought>" in raw_response and "</thought>" in raw_response:
start = raw_response.find("<thought>") + len("<thought>")
end = raw_response.find("</thought>")
thought_process = raw_response[start:end].strip()
if "```json" in raw_response:
start = raw_response.find("```json") + len("```json")
end = raw_response.rfind("```")
json_str = raw_response[start:end].strip()
try:
json_part = json.loads(json_str)
except json.JSONDecodeError: pass
if not json_part or "suggested_ids" not in json_part or not isinstance(json_part.get("suggested_ids"), list) or len(json_part["suggested_ids"]) == 0:
return jsonify({
"error": "A IA não retornou a lista de 'suggested_ids' no formato JSON esperado.",
"details": raw_response
}), 422
response_data = {
"suggested_ids": json_part["suggested_ids"][:3],
"thought_process": thought_process,
"query": query,
"results": simplified_results
}
return jsonify(response_data)
except Exception as e:
print("--- [ERRO FATAL NA SUGESTÃO DA IA] ---"); traceback.print_exc()
return jsonify({"error": f"Ocorreu um erro interno na IA: {str(e)}"}), 500
# --- SEÇÃO DE PERSISTÊNCIA ---
def commit_file_to_repo(local_file_name, commit_message):
if not hf_api:
print(f"--- [AVISO] API do HF não configurada. Pular o commit de '{local_file_name}'. ---")
return
local_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), local_file_name)
if not os.path.exists(local_file_path) or os.path.getsize(local_file_path) == 0:
print(f"--- [AVISO] Arquivo '{local_file_name}' não existe ou está vazio. Pular commit. ---")
return
path_in_repo = local_file_name
try:
print(f"--- [API HF] Tentando fazer o commit de '{local_file_name}' para o repositório... ---")
hf_api.upload_file(
path_or_fileobj=local_file_path, path_in_repo=path_in_repo,
repo_id=REPO_ID, repo_type="space", commit_message=commit_message)
print(f"--- [API HF] Sucesso no commit de '{local_file_name}'. ---")
except Exception as e:
print(f"--- [ERRO API HF] Falha no commit de '{local_file_name}': {e} ---")
def save_data_on_exit():
print("--- [SHUTDOWN] O aplicativo está desligando. Verificando dados para salvar... ---")
if DATA_HAS_CHANGED:
print("--- [SHUTDOWN] Mudanças detectadas. Fazendo o commit dos arquivos para o repositório. ---")
commit_file_to_repo(USER_FEEDBACK_FILE, "Commit automático: Atualiza feedbacks de usuários.")
else:
print("--- [SHUTDOWN] Nenhuma mudança nos dados detectada. Nenhum commit necessário. ---")
atexit.register(save_data_on_exit)
if __name__ == '__main__':
port = int(os.environ.get("PORT", 7860))
app.run(host='0.0.0.0', port=port, debug=False)