# enhanced_search_v2.py (version with rare-term boost and gender grouping)
###################################################################################################
#
# This file contains the main search engine. It implements a multi-layer search strategy,
# starting with the highest-confidence (exact) matches and progressing to broader methods
# (term relevance, semantics).
#
# KEY IMPROVEMENT:
# To handle cases such as the query "Natalizumabe", where an extremely rare and important term
# is present but the semantic score is low, a "rare-term boost" was implemented. If a query word
# is rare (appears in 7 or fewer documents), any result containing it gets its `text_score`
# increased, guaranteeing its priority in the candidate list.
#
# The rarity count was also improved to group masculine and feminine word forms
# (e.g. 'esquerdo' and 'esquerda' have their frequencies summed).
#
###################################################################################################

import pandas as pd
import re
from thefuzz import process, fuzz
from unidecode import unidecode
import time
from sentence_transformers import util
import torch
import math
from collections import defaultdict
from rank_bm25 import BM25Okapi


# --- NORMALIZATION HELPER FUNCTIONS --- #

def literal_normalize_text(text):
    if pd.isna(text):
        return ""
    normalized = unidecode(str(text).lower())
    normalized = re.sub(r'[^\w\s]', ' ', normalized)
    return re.sub(r'\s+', ' ', normalized).strip()


def normalize_text(text):
    if pd.isna(text):
        return ""
    return unidecode(str(text).lower().strip())


def get_longest_word(query_text):
    words = re.findall(r'\b\w{4,}\b', query_text)
    if not words:
        return ""
    return max(words, key=len)


# --- FORMATTING AND HIGHLIGHTING FUNCTIONS --- #

def format_result(row_data, row_index, match_type="", score=0):
    data = row_data.copy()
    is_rol = data.get('Correlacao_Rol', '').strip().lower() == 'sim'
    if not is_rol:
        # Non-Rol rows: blank out the Rol-specific output columns (keys match standard_columns below).
        data['GRUPO'], data['SUBGRUPO'], data['Vigencia'], data['Resolucao_Normativa'] = '', '', '', ''
        data['PAC'], data['DUT'] = '---', '---'
    else:
        data['PAC'] = 'Sim' if data.get('PAC', '').strip().lower() == 'pac' else 'Não'
        original_dut_value = data.get('DUT', '').strip()
        if original_dut_value and original_dut_value.replace('.', '', 1).isdigit():
            data['DUT'] = f'Sim, DUT nº {original_dut_value}'
        else:
            data['DUT'] = 'Não'
    standard_columns = [
        'Codigo_TUSS', 'Descricao_TUSS', 'Correlacao_Rol', 'Procedimento_Rol', 'Resolucao_Normativa',
        'Vigencia', 'OD', 'AMB', 'HCO', 'HSO', 'PAC', 'DUT', 'SUBGRUPO', 'GRUPO', 'CAPITULO',
        'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3', 'Sinonimo_4', 'Semantico'
    ]
    formatted_data = {col: data.get(col, '') for col in standard_columns}
    result = {
        "row_index": row_index,
        "score": round(score),
        "text_score": round(score),
        "semantic_score": 0,
        "match_type": match_type,
        "is_rol_procedure": is_rol
    }
    result.update(formatted_data)
    return result
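
# Illustrative sketch (not part of the search flow): shows the shape of the dict produced by
# format_result() for a hypothetical row. All field values below are invented placeholders.
def _example_format_result():
    sample_row = {
        'Codigo_TUSS': '12345678',
        'Descricao_TUSS': 'PROCEDIMENTO DE EXEMPLO',
        'Correlacao_Rol': 'Sim',
        'Procedimento_Rol': 'PROCEDIMENTO DE EXEMPLO NO ROL',
        'PAC': 'PAC',
        'DUT': '65',
    }
    result = format_result(sample_row, row_index=0, match_type="Texto Exato", score=100)
    # Expected: result['PAC'] == 'Sim', result['DUT'] == 'Sim, DUT nº 65',
    # result['is_rol_procedure'] is True, and absent columns default to ''.
    return result
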
def _highlight_matches(results, query):
    if not query or not results:
        return results
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_words = {word for word in normalize_text(query).split() if len(word) > 2 and word not in stopwords}
    cols_to_highlight = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2',
                         'Sinonimo_3', 'Sinonimo_4', 'Semantico']
    for result in results:
        for col in cols_to_highlight:
            original_text = result.get(col, '')
            if original_text and query_words:
                highlighted_text = original_text
                for word in sorted(list(query_words), key=len, reverse=True):
                    pattern = r'\b(' + re.escape(word) + r')\b'
                    # NOTE: the wrapping markup is an assumption ('<mark>'); adjust it to whatever
                    # the front end expects when rendering highlighted fields.
                    highlighted_text = re.sub(pattern, r'<mark>\1</mark>', highlighted_text, flags=re.IGNORECASE)
                result[f"{col}_highlighted"] = highlighted_text
            else:
                result[f"{col}_highlighted"] = original_text
    return results


# --- DATA LOADING FUNCTIONS --- #

def load_and_prepare_database(db_path):
    """
    Loads the main spreadsheet and pre-processes everything the search needs to be fast.
    This function runs only once, when the application starts.
    """
    try:
        print(f"Carregando e preparando a base de dados de: {db_path}...")
        df_original = pd.read_csv(db_path, dtype=str).fillna('')
        search_cols = ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2', 'Sinonimo_3',
                       'Sinonimo_4', 'Semantico', 'SUBGRUPO', 'GRUPO', 'CAPITULO']
        df_normalized = df_original.copy()
        df_normalized['Codigo_TUSS_literal'] = df_normalized['Codigo_TUSS'].apply(literal_normalize_text)
        df_normalized['Codigo_TUSS_norm'] = df_normalized['Codigo_TUSS'].apply(normalize_text)
        df_normalized['full_text_norm'] = ""
        for col in search_cols:
            if col in df_normalized.columns:
                df_normalized[f'{col}_literal'] = df_normalized[col].apply(literal_normalize_text)
                df_normalized[f'{col}_norm'] = df_normalized[col].apply(normalize_text)
                df_normalized['full_text_norm'] += ' ' + df_normalized[f'{col}_norm']

        print("Criando dicionário da base, modelo BM25 e frequência de palavras...")
        db_word_set = set()
        tokenized_corpus = []
        for text in df_normalized['full_text_norm']:
            words = text.split()
            tokenized_corpus.append(words)
            db_word_set.update(words)
        db_word_set.discard('')
        bm25_model = BM25Okapi(tokenized_corpus)

        # Compute the initial document frequency of each word
        doc_freq = defaultdict(int)
        for doc_words in tokenized_corpus:
            for word in set(doc_words):
                doc_freq[word] += 1

        ### IMPROVEMENT: COMBINE MASCULINE/FEMININE WORD FREQUENCIES ###
        print("Combinando frequências de palavras (masculino/feminino) para cálculo de raridade...")
        combined_doc_freq = {}
        processed_words = set()
        for word, freq in doc_freq.items():
            if word in processed_words:
                continue
            pair_word = None
            if word.endswith('o'):
                pair_word = word[:-1] + 'a'
            elif word.endswith('a'):
                pair_word = word[:-1] + 'o'
            if pair_word and pair_word in doc_freq:
                combined_freq = freq + doc_freq[pair_word]
                combined_doc_freq[word] = combined_freq
                combined_doc_freq[pair_word] = combined_freq
                processed_words.add(word)
                processed_words.add(pair_word)
            else:
                combined_doc_freq[word] = freq

        tuss_to_full_text_map = df_normalized.set_index('Codigo_TUSS')['full_text_norm'].to_dict()

        print("Criando corpus para busca fuzzy...")
        fuzzy_search_corpus = []
        for index, row in df_normalized.iterrows():
            for col in ['Descricao_TUSS', 'Procedimento_Rol', 'Sinonimo_1', 'Sinonimo_2',
                        'Sinonimo_3', 'Sinonimo_4', 'Semantico']:
                if col in df_original.columns and f'{col}_norm' in row and pd.notna(row[f'{col}_norm']):
                    val = row[f'{col}_norm']
                    if val:
                        fuzzy_search_corpus.append((val, index, f'{col}_norm'))

        print(f"Base de dados pronta com {len(df_original)} procedimentos.")
        # Return the combined frequency map
        return (df_original, df_normalized, fuzzy_search_corpus, bm25_model,
                db_word_set, combined_doc_freq, tuss_to_full_text_map)
    except Exception as e:
        print(f"Erro crítico ao carregar/preparar a base de dados: {e}")
        raise
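
# Illustrative sketch (not used by the engine): the same masculine/feminine pairing rule applied
# to a toy document-frequency dict, showing why 'esquerdo' and 'esquerda' end up sharing one count.
def _example_combine_gender_frequencies():
    toy_doc_freq = {'esquerdo': 3, 'esquerda': 5, 'joelho': 40}
    combined = {}
    done = set()
    for word, freq in toy_doc_freq.items():
        if word in done:
            continue
        if word.endswith('o'):
            pair = word[:-1] + 'a'
        elif word.endswith('a'):
            pair = word[:-1] + 'o'
        else:
            pair = None
        if pair and pair in toy_doc_freq:
            combined[word] = combined[pair] = freq + toy_doc_freq[pair]
            done.update({word, pair})
        else:
            combined[word] = freq
    # combined == {'esquerdo': 8, 'esquerda': 8, 'joelho': 40}
    return combined
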
def load_general_dictionary(path):
    try:
        with open(path, 'r', encoding='utf-8') as f:
            words = {normalize_text(line.strip()) for line in f if line.strip()}
        return words
    except Exception:
        return set()


def load_correction_corpus(dict_path, column_name='Termo_Correto'):
    try:
        df_dict = pd.read_csv(dict_path, dtype=str).fillna('')
        if column_name not in df_dict.columns:
            return [], []
        original_corpus = df_dict[column_name].dropna().astype(str).tolist()
        normalized_corpus = [normalize_text(term) for term in original_corpus]
        return original_corpus, normalized_corpus
    except Exception:
        return [], []


# --- SEMANTIC RERANKING FUNCTIONS (AI) --- #

def create_unified_document_text(result_dict):
    text_parts = {
        result_dict.get('Descricao_TUSS', ''),
        result_dict.get('Procedimento_Rol', ''),
        result_dict.get('Semantico', ''),
        result_dict.get('SUBGRUPO', ''),
        result_dict.get('GRUPO', ''),
        result_dict.get('CAPITULO', '')
    }
    for i in range(1, 5):
        text_parts.add(result_dict.get(f'Sinonimo_{i}', ''))
    return ". ".join(sorted([part for part in text_parts if part and str(part).strip()]))


def rerank_with_cross_encoder(query, results_list, model):
    if not model or not results_list or not query:
        return results_list, "Cross-Encoder não fornecido ou lista de candidatos vazia."
    sentence_pairs = [[query, create_unified_document_text(result)] for result in results_list]
    if not sentence_pairs:
        return results_list, "Não foram encontrados pares para reordenar."
    try:
        raw_scores = model.predict(sentence_pairs, show_progress_bar=False)
        semantic_scores_normalized = torch.sigmoid(torch.tensor(raw_scores)).numpy() * 100
        for i, result in enumerate(results_list):
            result['semantic_score'] = round(semantic_scores_normalized[i])
            result['match_type'] = "Relevância Semântica (IA)"

        def hybrid_sort_key(result):
            sem_score = result.get('semantic_score', 0)
            txt_score = result.get('text_score', 0)
            is_rol = result.get('is_rol_procedure', False)
            if sem_score >= 85:
                return (1, sem_score, txt_score, is_rol)
            else:
                return (0, sem_score + txt_score, sem_score, is_rol)

        reranked_results = sorted(results_list, key=hybrid_sort_key, reverse=True)
        log_message = "Reordenação final por: 1º Semântica (>=85), 2º Híbrido (Semântica+Texto), 3º Cobertura do Rol."
        return reranked_results, log_message
    except Exception as e:
        log_message = f"Erro no Cross-Encoder: {e}"
        print(log_message)
        key_function = lambda x: (x.get('text_score', 0), x.get('is_rol_procedure', False))
        reranked_results = sorted(results_list, key=key_function, reverse=True)
        return reranked_results, log_message
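
# Illustrative sketch (not used by the engine): the hybrid_sort_key tiers above, reproduced on
# invented scores. Any candidate with semantic_score >= 85 outranks the lower tier even when the
# lower tier has a much larger text_score.
def _example_hybrid_sort():
    candidates = [
        {'id': 'A', 'semantic_score': 90, 'text_score': 40, 'is_rol_procedure': False},
        {'id': 'B', 'semantic_score': 70, 'text_score': 95, 'is_rol_procedure': True},
        {'id': 'C', 'semantic_score': 86, 'text_score': 10, 'is_rol_procedure': True},
    ]

    def key(r):
        if r['semantic_score'] >= 85:
            return (1, r['semantic_score'], r['text_score'], r['is_rol_procedure'])
        return (0, r['semantic_score'] + r['text_score'], r['semantic_score'], r['is_rol_procedure'])

    # Resulting order: A (90), C (86), then B despite its text_score of 95.
    return [c['id'] for c in sorted(candidates, key=key, reverse=True)]
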
### NEW FUNCTION: RARE-TERM BOOST ###
def _boost_rare_term_matches(results, query_words, doc_freq, df_normalized, boost_factor=1.2, rarity_threshold_count=7):
    """
    Increases the 'text_score' of results that contain very rare query terms.
    This prevents the semantic reranking from demoting a result that carries a critical keyword.
    """
    if not results or not query_words:
        return results, None
    # Identify rare words in the query: a word is rare if it appears in 'rarity_threshold_count'
    # documents or fewer. A length filter avoids boosting short words such as "do" or "a" that
    # may be rare by accident.
    rare_words = {word for word in query_words
                  if doc_freq.get(word, 0) <= rarity_threshold_count and len(word) > 4}
    if not rare_words:
        return results, None
    boosted_indices = []
    for result in results:
        row_index = result['row_index']
        full_text = df_normalized.loc[row_index, 'full_text_norm']
        # If any rare query word appears in the document text...
        if any(re.search(r'\b' + re.escape(word) + r'\b', full_text) for word in rare_words):
            original_score = result['text_score']
            # Apply the boost, capped at 99 so it never interferes with scores of 100 (exact matches).
            boosted_score = min(original_score * boost_factor, 99)
            result['text_score'] = round(boosted_score)
            result['score'] = result['text_score']
            result['match_type'] += " + Rare-Term Boost"
            boosted_indices.append(row_index)
    log_message = f"Aplicado boost de termo raro para as palavras: {list(rare_words)}. Resultados afetados: {len(boosted_indices)}."
    return results, log_message


# --- INTERNAL LAYERED SEARCH FUNCTION --- #

def _run_search_layers(literal_query, normalized_query, response, df_original, df_normalized,
                       fuzzy_search_corpus, bm25_model, limit_per_layer):
    matched_indices = set()
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_words = [word for word in normalized_query.split() if word not in stopwords and len(word) > 1]

    def sort_key(x):
        return (x.get('text_score', 0), x.get('is_rol_procedure', False))

    # The new "phrase_matches" layer was added to the layer list
    for layer in ["literal_matches", "exact_matches", "phrase_matches", "logical_matches",
                  "almost_exact_matches", "term_matches", "keyword_matches"]:
        response["results_by_layer"][layer] = []

    bm25_scores = None
    max_bm25_score = 1.0
    if query_words and bm25_model:
        bm25_scores = bm25_model.get_scores(normalized_query.split())
        positive_scores = [s for s in bm25_scores if s > 0]
        if positive_scores:
            max_bm25_score = max(positive_scores)

    def get_normalized_bm25_score(index):
        # NOTE: assumes df_normalized keeps its default RangeIndex, so row labels double as
        # positions in the bm25_scores array.
        if bm25_scores is None or index >= len(bm25_scores):
            return 0
        return (bm25_scores[index] / max_bm25_score) * 95 if max_bm25_score > 0 else 0

    # LAYERS 0 and 1: exact matches (score 100)
    temp_results = []
    if literal_query:
        for col in ['Codigo_TUSS_literal', 'Descricao_TUSS_literal', 'Procedimento_Rol_literal']:
            if col in df_normalized.columns:
                for index, _ in df_normalized[df_normalized[col] == literal_query].iterrows():
                    if index not in matched_indices:
                        temp_results.append(format_result(df_original.loc[index], index, "Texto Exato", 100))
                        matched_indices.add(index)
    response["results_by_layer"]["literal_matches"] = sorted(temp_results, key=lambda x: x['score'], reverse=True)[:limit_per_layer]

    temp_results = []
    if normalized_query:
        for index, _ in df_normalized[df_normalized['Codigo_TUSS_norm'] == normalized_query].iterrows():
            if index not in matched_indices:
                temp_results.append(format_result(df_original.loc[index], index, "Código Exato", 100))
                matched_indices.add(index)
        for col in ['Descricao_TUSS_norm', 'Procedimento_Rol_norm']:
            if col in df_normalized.columns:
                for index, _ in df_normalized[df_normalized[col] == normalized_query].iterrows():
                    if index not in matched_indices:
                        temp_results.append(format_result(df_original.loc[index], index, "Exato (Normalizado)", 100))
                        matched_indices.add(index)
    response["results_by_layer"]["exact_matches"] = sorted(temp_results, key=lambda x: x['score'], reverse=True)[:limit_per_layer]

    # LAYER 2: exact phrase search (fixed score 99)
    # Looks for the exact word sequence of the query. Highly relevant.
    temp_results = []
    if normalized_query and len(normalized_query.split()) > 1:  # only makes sense for more than one word
        phrase_pattern = r'\b' + re.escape(normalized_query) + r'\b'
        mask = df_normalized['full_text_norm'].str.contains(phrase_pattern, na=False, regex=True)
        for index in df_normalized.index[mask & ~df_normalized.index.isin(matched_indices)]:
            temp_results.append(format_result(df_original.loc[index], index, "Frase Exata (Normalizada)", 99))
            matched_indices.add(index)
    response["results_by_layer"]["phrase_matches"] = sorted(temp_results, key=sort_key, reverse=True)[:limit_per_layer]

    # LAYER 3: logical (AND) search - scored by BM25
    temp_results = []
    if query_words:
        mask = pd.Series(True, index=df_normalized.index)
        for word in query_words:
            mask &= df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(word) + r'\b', na=False)
        for index in df_normalized.index[mask & ~df_normalized.index.isin(matched_indices)]:
            score = get_normalized_bm25_score(index)
            if score > 0:
                temp_results.append(format_result(df_original.loc[index], index, "Busca Lógica (BM25)", score))
                matched_indices.add(index)
    response["results_by_layer"]["logical_matches"] = sorted(temp_results, key=sort_key, reverse=True)[:limit_per_layer]

    # LAYER 4: fuzzy approximation search - scored by BM25
    temp_results, processed_indices = [], set()
    if fuzzy_search_corpus:
        fuzzy_choices = [item[0] for item in fuzzy_search_corpus]
        for match_text, score in process.extractBests(normalized_query, fuzzy_choices,
                                                      scorer=fuzz.token_set_ratio,
                                                      limit=limit_per_layer * 3, score_cutoff=90):
            if score == 100 and match_text == normalized_query:
                continue
            for _, original_index, _ in [item for item in fuzzy_search_corpus if item[0] == match_text]:
                if original_index not in matched_indices and original_index not in processed_indices:
                    bm25_score = get_normalized_bm25_score(original_index)
                    if bm25_score > 0:
                        temp_results.append(format_result(df_original.loc[original_index], original_index, "Aproximação (BM25)", bm25_score))
                        matched_indices.add(original_index)
                        processed_indices.add(original_index)
    response["results_by_layer"]["almost_exact_matches"] = sorted(temp_results, key=sort_key, reverse=True)[:limit_per_layer]

    # LAYER 5: term-relevance search (BM25)
    temp_results = []
    if bm25_scores is not None:
        for i, score in enumerate(bm25_scores):
            original_index = df_normalized.index[i]
            if score > 0 and original_index not in matched_indices:
                normalized_score = get_normalized_bm25_score(original_index)
                temp_results.append(format_result(df_original.loc[original_index], original_index, "Relevância de Termos (BM25)", normalized_score))
                matched_indices.add(original_index)
    response["results_by_layer"]["term_matches"] = sorted(temp_results, key=sort_key, reverse=True)[:limit_per_layer * 4]

    # LAYER 6: keyword fallback - scored by BM25
    if sum(len(v) for v in response["results_by_layer"].values()) == 0 and normalized_query:
        if longest_word := get_longest_word(normalized_query):
            temp_results = []
            mask = df_normalized['full_text_norm'].str.contains(r'\b' + re.escape(longest_word) + r'\b', na=False)
            for index in df_normalized.index[mask & ~df_normalized.index.isin(matched_indices)]:
                score = get_normalized_bm25_score(index)
                if score > 0:
                    temp_results.append(format_result(df_original.loc[index], index, "Palavra-Chave (BM25)", score))
            response["results_by_layer"]["keyword_matches"] = sorted(temp_results, key=sort_key, reverse=True)[:limit_per_layer]
    return None
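
# Illustrative sketch (not used by the engine): BM25Okapi on a toy corpus, plus the same 0-95
# rescaling used by get_normalized_bm25_score, which keeps BM25 layers below exact matches (100).
# The corpus, query and resulting numbers are invented for demonstration only.
def _example_bm25_normalization():
    toy_corpus = [
        "infusao de natalizumabe".split(),
        "consulta medica ambulatorial".split(),
        "ressonancia magnetica de cranio".split(),
        "tomografia de torax".split(),
        "sessao de fisioterapia motora".split(),
    ]
    toy_bm25 = BM25Okapi(toy_corpus)
    scores = toy_bm25.get_scores("natalizumabe".split())
    positive_scores = [s for s in scores if s > 0]
    max_score = max(positive_scores) if positive_scores else 1.0
    # The top BM25 hit becomes 95; documents without the term stay at 0.
    return [(s / max_score) * 95 if s > 0 else 0 for s in scores]
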
# --- MAIN FUNCTION THAT ORCHESTRATES THE SEARCH --- #

def search_procedure_with_log(query, df_original, df_normalized, fuzzy_search_corpus, correction_corpus,
                              portuguese_word_set, bm25_model, db_word_set, doc_freq, tuss_to_full_text_map,
                              limit_per_layer=10, semantic_model=None, cross_encoder_model=None,
                              user_best_matches_counts=None, user_feedback_threshold=10):
    RERANK_LIMIT = 50
    start_time = time.time()
    original_query = str(query).strip()
    response = {"search_log": [], "results_by_layer": {}, "final_semantic_results": [],
                "was_corrected": False, "original_query": original_query, "corrected_query": ""}
    if not original_query:
        response["search_log"].append("Query vazia.")
        return response
    response["search_log"].append(f"Buscando por: '{original_query}'")

    # STEP 1: SPELL CORRECTION
    stopwords = {'de', 'do', 'da', 'dos', 'das', 'a', 'o', 'e', 'em', 'um', 'uma', 'para', 'com'}
    query_after_correction = original_query
    original_correction_corpus, normalized_correction_corpus = correction_corpus
    valid_words = portuguese_word_set.union(db_word_set)
    ### FIXED: spell-correction logic restored ###
    if valid_words and original_correction_corpus:
        corrected_words = []
        words_were_corrected = False
        for word in original_query.split():
            norm_word = normalize_text(word)
            if len(norm_word) > 2 and norm_word not in valid_words and not norm_word.isdigit():
                # Uses thefuzz's 'process.extractOne' to find the best correction candidate
                best_match, score = process.extractOne(norm_word, normalized_correction_corpus, scorer=fuzz.ratio)
                if score > 85:  # confidence threshold for accepting a correction
                    # Find the original term corresponding to the normalized match
                    original_term_index = normalized_correction_corpus.index(best_match)
                    corrected_word = original_correction_corpus[original_term_index]
                    corrected_words.append(corrected_word)
                    words_were_corrected = True
                else:
                    corrected_words.append(word)  # keep the original word if the correction is not good enough
            else:
                corrected_words.append(word)  # keep the word if it is valid, short, or a number
        if words_were_corrected:
            query_after_correction = " ".join(corrected_words)
            response["was_corrected"] = True
            response["corrected_query"] = query_after_correction
            response["search_log"].append(f"Query corrigida de '{original_query}' para '{query_after_correction}'")

    # STEP 2: QUERY PREPARATION
    cleaned_query = " ".join([word for word in query_after_correction.split() if normalize_text(word) not in stopwords])
    normalized_query = normalize_text(cleaned_query)
    if not cleaned_query.strip():
        response["search_log"].append("Query resultante vazia.")
        return response
    if cleaned_query != query_after_correction:
        response["search_log"].append(f"Query limpa (sem stop words): '{cleaned_query}'")

    # STEP 3: SEARCH EXECUTION
    _run_search_layers(literal_normalize_text(query_after_correction), normalized_query, response,
                       df_original, df_normalized, fuzzy_search_corpus, bm25_model, limit_per_layer)

    # STEP 4: AGGREGATION AND RERANKING
    # A display name was added for the new layer
    layer_names_pt = {
        "literal_matches": "Busca Literal Exata",
        "exact_matches": "Busca Normalizada Exata",
        "phrase_matches": "Busca por Frase Exata",
        "logical_matches": "Busca Lógica (BM25)",
        "almost_exact_matches": "Busca por Aproximação (BM25)",
        "term_matches": "Busca por Relevância de Termos (BM25)",
        "keyword_matches": "Busca por Palavra-Chave (BM25)"
    }
    response["search_log"].append("\n--- Detalhamento por Camada ---")
    for key, name in layer_names_pt.items():
        response["search_log"].append(f"Camada '{name}': {len(response['results_by_layer'].get(key, []))} candidatos.")

    all_candidates = []
    # The new "phrase_matches" layer is included in high-confidence mode
"phrase_matches" ao modo de alta confiança high_confidence_results = response["results_by_layer"].get("literal_matches", []) + response["results_by_layer"].get("exact_matches", []) + response["results_by_layer"].get("phrase_matches", []) if high_confidence_results: response["search_log"].append("\n--- [MODO DE ALTA CONFIANÇA] ---"); all_candidates = high_confidence_results else: response["search_log"].append("\n--- [MODO DE BUSCA AMPLA] ---") # Garante que a nova camada não seja adicionada duas vezes na busca ampla for key in layer_names_pt.keys(): if key not in ["literal_matches", "exact_matches", "phrase_matches"]: all_candidates.extend(response["results_by_layer"].get(key, [])) unique_candidates = list({r['row_index']: r for r in all_candidates}.values()) response["search_log"].append(f"Total de candidatos encontrados: {len(all_candidates)}. Candidatos únicos (após desduplicação): {len(unique_candidates)}.") ### APLICA BOOST PARA TERMOS RAROS ANTES DA REORDENAÇÃO ### query_words_for_boost = [word for word in normalized_query.split() if word not in stopwords] if unique_candidates: unique_candidates, boost_log_msg = _boost_rare_term_matches( unique_candidates, query_words_for_boost, doc_freq, # Usa o doc_freq já combinado df_normalized ) if boost_log_msg: response["search_log"].append(boost_log_msg) if user_best_matches_counts: query_norm_fb = normalize_text(response.get("corrected_query") or original_query) for r in unique_candidates: votes = user_best_matches_counts.get(query_norm_fb, {}).get(r['Codigo_TUSS'], 0) if votes >= user_feedback_threshold: r.update({'is_user_best_match': True, 'feedback_votes': votes}) response["search_log"].append(f"\n--- Análise e Reordenação ---") final_list = [] if unique_candidates: query_for_semantic = response.get("corrected_query") or cleaned_query prioritized_by_feedback = sorted([r for r in unique_candidates if r.get('is_user_best_match')], key=lambda x: (x.get('feedback_votes', 0), x.get('semantic_score', 0), x.get('text_score', 0)), reverse=True) to_rerank = [r for r in unique_candidates if not r.get('is_user_best_match')] final_list.extend(prioritized_by_feedback) if prioritized_by_feedback: response["search_log"].append(f"{len(prioritized_by_feedback)} resultado(s) priorizado(s) por feedback.") if to_rerank: to_rerank_sorted = sorted(to_rerank, key=lambda x: x.get('text_score', 0), reverse=True) reranked_by_ia, log_msg = rerank_with_cross_encoder(query_for_semantic, to_rerank_sorted[:RERANK_LIMIT], cross_encoder_model) response["search_log"].append(log_msg) final_list.extend(reranked_by_ia) response["final_semantic_results"] = _highlight_matches(final_list[:15], query_for_semantic) end_time = time.time(); response["search_duration_seconds"] = round(end_time - start_time, 4) response["search_log"].append(f"Busca completa em {response['search_duration_seconds']} segundos.") print(f"\n\n==================== LOG DE DEPURAÇÃO (QUERY: '{original_query}') ====================") for log_item in response["search_log"]: print(log_item) return response