import os
import time
import pdfplumber
import docx
import nltk
import gradio as gr
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.embeddings import CohereEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS, Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter, TokenTextSplitter
from typing import List, Dict, Any
import pandas as pd
import numpy as np
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import SnowballStemmer
import jellyfish
from gensim.models import Word2Vec
from gensim.models.fasttext import FastText
from collections import Counter
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score
from scipy.stats import spearmanr
from functools import lru_cache
# NLTK Resource Download
def download_nltk_resources():
    # 'punkt_tab' is required by newer NLTK releases; older ones only need 'punkt'.
    resources = ['punkt', 'punkt_tab', 'stopwords', 'snowball_data']
    for resource in resources:
        try:
            nltk.download(resource, quiet=True)
        except Exception as e:
            print(f"Failed to download {resource}: {str(e)}")

download_nltk_resources()
FILES_DIR = './files'
os.makedirs(FILES_DIR, exist_ok=True)  # avoid FileNotFoundError when no file is uploaded
# Model Management
class ModelManager:
    def __init__(self):
        # Bare model names would not resolve on the Hub, so full repo ids are used.
        self.models = {
            'HuggingFace': {
                'e5-base-de': "danielheinz/e5-base-sts-en-de",
                'paraphrase-miniLM': "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
                'paraphrase-mpnet': "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
                'gte-large': "thenlper/gte-large",
                'gbert-base': "deepset/gbert-base"
            },
            'OpenAI': {
                'text-embedding-ada-002': "text-embedding-ada-002"
            },
            'Cohere': {
                'embed-multilingual-v2.0': "embed-multilingual-v2.0"
            }
        }

    def add_model(self, provider, name, model_path):
        if provider not in self.models:
            self.models[provider] = {}
        self.models[provider][name] = model_path

    def remove_model(self, provider, name):
        if provider in self.models and name in self.models[provider]:
            del self.models[provider][name]

    def get_model(self, provider, name):
        return self.models.get(provider, {}).get(name)

    def list_models(self):
        return {provider: list(models.keys()) for provider, models in self.models.items()}

model_manager = ModelManager()
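# Example (illustrative): registering and looking up an extra model at runtime.
# The repo id below is an assumption, not part of the original configuration.
# model_manager.add_model('HuggingFace', 'minilm-l6', "sentence-transformers/all-MiniLM-L6-v2")
# print(model_manager.list_models())
# print(model_manager.get_model('HuggingFace', 'minilm-l6'))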
# File Handling
class FileHandler:
    @staticmethod
    def extract_text(file_path):
        ext = os.path.splitext(file_path)[-1].lower()
        if ext == '.pdf':
            return FileHandler._extract_from_pdf(file_path)
        elif ext == '.docx':
            return FileHandler._extract_from_docx(file_path)
        elif ext == '.txt':
            return FileHandler._extract_from_txt(file_path)
        else:
            raise ValueError(f"Unsupported file type: {ext}")

    @staticmethod
    def _extract_from_pdf(file_path):
        with pdfplumber.open(file_path) as pdf:
            # extract_text() returns None for pages without extractable text
            return ' '.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _extract_from_docx(file_path):
        doc = docx.Document(file_path)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _extract_from_txt(file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
# Text Processing
def simple_tokenize(text):
    return text.split()

def preprocess_text(text, lang='german'):
    text = text.lower()
    # Keep umlauts and ß; the original [^a-zA-Z\s] stripped German characters.
    text = re.sub(r'[^a-zäöüß\s]', '', text)
    try:
        tokens = word_tokenize(text, language=lang)
    except LookupError:
        print(f"Warning: NLTK punkt tokenizer for {lang} not found. Using simple tokenization.")
        tokens = simple_tokenize(text)
    try:
        stop_words = set(stopwords.words(lang))
    except LookupError:
        print(f"Warning: Stopwords for {lang} not found. Skipping stopword removal.")
        stop_words = set()
    tokens = [token for token in tokens if token not in stop_words]
    try:
        stemmer = SnowballStemmer(lang)
        tokens = [stemmer.stem(token) for token in tokens]
    except ValueError:
        print(f"Warning: SnowballStemmer for {lang} not available. Skipping stemming.")
    return ' '.join(tokens)
def phonetic_match(text, query, method='levenshtein_distance'):
    # Returns an edit distance between Soundex codes: lower means phonetically closer.
    # (jellyfish has no cologne_phonetic, so Soundex is used for both sides.)
    if method == 'levenshtein_distance':
        text_phonetic = jellyfish.soundex(text)
        query_phonetic = jellyfish.soundex(query)
        return jellyfish.levenshtein_distance(text_phonetic, query_phonetic)
    return 0
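# Example (illustrative): Soundex maps similar-sounding words to the same code,
# so the distance below is 0 even though the spellings differ.
# phonetic_match("Meyer", "Maier")  # -> 0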
# Custom Tokenizer
def create_custom_tokenizer(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()
    tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"])
    tokenizer.train_from_iterator([text], trainer)
    return tokenizer

def custom_tokenize(text, tokenizer):
    return tokenizer.encode(text).tokens
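# --- Custom Embedding Helpers ---
# create_custom_embedding, CustomEmbeddings and optimize_vocabulary are called in
# compare_embeddings() but were missing from this file. The sketches below are
# reconstructions inferred from the gensim and tokenizers imports above, not the
# original implementations.
def create_custom_embedding(texts, vector_size=100, window=5, min_count=1):
    # Train a Word2Vec model on whitespace-tokenized chunks (assumed tokenization).
    sentences = [text.split() for text in texts]
    return Word2Vec(sentences, vector_size=vector_size, window=window, min_count=min_count)

class CustomEmbeddings:
    # Minimal adapter exposing the embed_documents/embed_query interface that
    # LangChain vector stores expect; each text is the mean of its word vectors.
    def __init__(self, model):
        self.model = model

    def _embed(self, text):
        vectors = [self.model.wv[w] for w in text.split() if w in self.model.wv]
        if not vectors:
            return [0.0] * self.model.vector_size
        return np.mean(vectors, axis=0).tolist()

    def embed_documents(self, texts):
        return [self._embed(t) for t in texts]

    def embed_query(self, text):
        return self._embed(text)

def optimize_vocabulary(chunks, vocab_size=10000, min_frequency=2):
    # Assumed behavior: map rare tokens to [UNK] via a WordLevel tokenizer trained
    # on the chunks, returning the tokenizer and the re-joined chunks.
    tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    trainer = WordLevelTrainer(vocab_size=vocab_size, min_frequency=min_frequency, special_tokens=["[UNK]"])
    tokenizer.train_from_iterator(chunks, trainer)
    optimized_chunks = [' '.join(tokenizer.encode(chunk).tokens) for chunk in chunks]
    return tokenizer, optimized_chunks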
# Embedding and Vector Store
def get_embedding_model(model_type, model_name):
    model_path = model_manager.get_model(model_type, model_name)
    if model_type == 'HuggingFace':
        return HuggingFaceEmbeddings(model_name=model_path)
    elif model_type == 'OpenAI':
        return OpenAIEmbeddings(model=model_path)
    elif model_type == 'Cohere':
        return CohereEmbeddings(model=model_path)
    else:
        raise ValueError(f"Unsupported model type: {model_type}")
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
    if split_strategy == 'token':
        return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)
    elif split_strategy == 'recursive':
        return RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap_size,
            separators=custom_separators or ["\n\n", "\n", " ", ""]
        )
    else:
        raise ValueError(f"Unsupported split strategy: {split_strategy}")
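# Example (illustrative): 500-character recursive chunks with 50-character overlap.
# splitter = get_text_splitter('recursive', 500, 50)
# chunks = splitter.split_text(long_text)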
def get_vector_store(vector_store_type, chunks, embedding_model):
    # chunks pass through a tuple so _create_vector_store could be wrapped in
    # functools.lru_cache; caching is left off because embedding objects are not hashable.
    chunks_tuple = tuple(chunks)
    return _create_vector_store(vector_store_type, chunks_tuple, embedding_model)

def _create_vector_store(vector_store_type, chunks_tuple, embedding_model):
    chunks = list(chunks_tuple)
    if vector_store_type == 'FAISS':
        return FAISS.from_texts(chunks, embedding_model)
    elif vector_store_type == 'Chroma':
        return Chroma.from_texts(chunks, embedding_model)
    else:
        raise ValueError(f"Unsupported vector store type: {vector_store_type}")
def get_retriever(vector_store, search_type, search_kwargs):
    if search_type == 'similarity':
        return vector_store.as_retriever(search_type="similarity", search_kwargs=search_kwargs)
    elif search_type == 'mmr':
        return vector_store.as_retriever(search_type="mmr", search_kwargs=search_kwargs)
    elif search_type == 'custom':
        # Returning None here would crash downstream; fail loudly until implemented.
        raise NotImplementedError("Custom retriever logic is not implemented yet.")
    else:
        raise ValueError(f"Unsupported search type: {search_type}")
# Main Processing Functions
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators, lang='german', custom_tokenizer_file=None):
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        text = ""
        for file in os.listdir(FILES_DIR):
            file_path = os.path.join(FILES_DIR, file)
            try:
                text += FileHandler.extract_text(file_path)
            except ValueError:
                print(f"Skipping unsupported file: {file_path}")

    if custom_tokenizer_file:
        tokenizer = create_custom_tokenizer(custom_tokenizer_file)
        text = ' '.join(custom_tokenize(text, tokenizer))
    else:
        text = preprocess_text(text, lang)

    text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
    chunks = text_splitter.split_text(text)
    embedding_model = get_embedding_model(model_type, model_name)
    return chunks, embedding_model, len(text.split())
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k, lang='german', phonetic_weight=0.3):
    preprocessed_query = preprocess_text(query, lang)
    vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
    retriever = get_retriever(vector_store, search_type, {"k": top_k})

    start_time = time.time()
    results = retriever.invoke(preprocessed_query)

    def score_result(doc):
        # Both components are distances (the vector store score and the Soundex
        # edit distance), so a smaller combined score means a better match.
        similarity_score = vector_store.similarity_search_with_score(doc.page_content, k=1)[0][1]
        phonetic_score = phonetic_match(doc.page_content, query)
        return (1 - phonetic_weight) * similarity_score + phonetic_weight * phonetic_score

    # Sort ascending: the original reverse=True put the worst matches first.
    results = sorted(results, key=score_result)
    end_time = time.time()

    # Embed each result once so downstream statistics and plots can reuse the vectors.
    embeddings = [embedding_model.embed_query(doc.page_content) for doc in results]
    results_df = pd.DataFrame({
        'content': [doc.page_content for doc in results],
        'embedding': embeddings
    })
    return results_df, end_time - start_time, vector_store
# Evaluation Metrics
def calculate_statistics(results_df, search_time, vector_store, num_tokens, embedding_model, query, top_k):
    # search_embeddings returns a DataFrame; the original code iterated it as if it
    # held Document objects, which walked the column names instead of the results.
    contents = results_df['content'].tolist()
    embeddings = [np.asarray(e) for e in results_df['embedding']]
    stats = {
        "num_results": len(contents),
        "avg_content_length": np.mean([len(c) for c in contents]) if contents else 0,
        "search_time": search_time,
        # LangChain's FAISS wrapper exposes the raw index as .index (not ._index).
        "vector_store_size": vector_store.index.ntotal if hasattr(vector_store, 'index') else "N/A",
        "num_documents": len(vector_store.docstore._dict) if hasattr(vector_store, 'docstore') else "N/A",
        "num_tokens": num_tokens,
        "embedding_vocab_size": embedding_model.client.get_vocab_size() if hasattr(embedding_model, 'client') and hasattr(embedding_model.client, 'get_vocab_size') else "N/A",
        "embedding_dimension": len(embedding_model.embed_query(query)),
        "top_k": top_k,
    }

    if len(embeddings) > 1:
        pairwise_similarities = np.inner(embeddings, embeddings)
        stats["result_diversity"] = 1 - np.mean(pairwise_similarities[np.triu_indices(len(embeddings), k=1)])
        # A silhouette score needs real cluster labels; one label per point is invalid.
        stats["silhouette_score"] = "N/A"
        query_embedding = embedding_model.embed_query(query)
        similarities = [np.inner(query_embedding, emb) for emb in embeddings]
        # Spearman correlation between the retrieval order and query similarity.
        rank_correlation, _ = spearmanr(similarities, range(len(similarities)))
        stats["rank_correlation"] = rank_correlation
    else:
        stats["result_diversity"] = "N/A"
        stats["silhouette_score"] = "N/A"
        stats["rank_correlation"] = "N/A"

    return stats
# Visualization
def visualize_results(results_df, stats_df):
    fig, axs = plt.subplots(2, 2, figsize=(20, 20))

    sns.barplot(x='model', y='search_time', data=stats_df, ax=axs[0, 0])
    axs[0, 0].set_title('Search Time by Model')
    axs[0, 0].set_xticklabels(axs[0, 0].get_xticklabels(), rotation=45, ha='right')

    sns.scatterplot(x='result_diversity', y='rank_correlation', hue='model', data=stats_df, ax=axs[0, 1])
    axs[0, 1].set_title('Result Diversity vs. Rank Correlation')

    sns.boxplot(x='model', y='avg_content_length', data=stats_df, ax=axs[1, 0])
    axs[1, 0].set_title('Distribution of Result Content Lengths')
    axs[1, 0].set_xticklabels(axs[1, 0].get_xticklabels(), rotation=45, ha='right')

    # format_results() emits capitalized column names, and embed_query returns
    # plain lists, so convert rather than filtering on np.ndarray.
    embeddings = np.array([np.asarray(e) for e in results_df['Embedding'] if e is not None])
    if len(embeddings) > 1:
        # t-SNE requires perplexity < n_samples.
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(embeddings) - 1))
        embeddings_2d = tsne.fit_transform(embeddings)
        sns.scatterplot(x=embeddings_2d[:, 0], y=embeddings_2d[:, 1], hue=results_df['Model'][:len(embeddings)], ax=axs[1, 1])
        axs[1, 1].set_title('t-SNE Visualization of Result Embeddings')
    else:
        axs[1, 1].text(0.5, 0.5, "Not enough data for t-SNE visualization", ha='center', va='center')

    plt.tight_layout()
    return fig
# Main Comparison Function
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k, lang='german', use_custom_embedding=False, optimize_vocab=False, phonetic_weight=0.3, custom_tokenizer_file=None):
    all_results = []
    all_stats = []
    settings = {
        "split_strategy": split_strategy,
        "chunk_size": chunk_size,
        "overlap_size": overlap_size,
        "custom_separators": custom_separators,
        "vector_store_type": vector_store_type,
        "search_type": search_type,
        "top_k": top_k,
        "lang": lang,
        "use_custom_embedding": use_custom_embedding,
        "optimize_vocab": optimize_vocab,
        "phonetic_weight": phonetic_weight
    }

    for model_type, model_name in zip(model_types, model_names):
        chunks, embedding_model, num_tokens = process_files(
            file.name if file else None,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            custom_separators.split(',') if custom_separators else None,
            lang,
            custom_tokenizer_file
        )

        if use_custom_embedding:
            custom_model = create_custom_embedding(chunks)
            embedding_model = CustomEmbeddings(custom_model)

        if optimize_vocab:
            tokenizer, optimized_chunks = optimize_vocabulary(chunks)
            chunks = optimized_chunks

        results, search_time, vector_store = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            query,
            top_k,
            lang,
            phonetic_weight
        )

        stats = calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k)
        stats["model"] = f"{model_type} - {model_name}"
        stats.update(settings)

        formatted_results = format_results(results, stats)
        all_results.extend(formatted_results)
        all_stats.append(stats)

    results_df = pd.DataFrame(all_results)
    stats_df = pd.DataFrame(all_stats)

    # Generate visualizations
    fig = visualize_results(results_df, stats_df)
    return results_df, stats_df, fig
def format_results(results_df, stats):
    # search_embeddings returns a DataFrame, so iterate rows rather than Document
    # objects (document metadata is not carried through the DataFrame).
    formatted_results = []
    for _, row in results_df.iterrows():
        result = {
            "Model": stats["model"],
            "Content": row['content'],
            "Embedding": row['embedding'],
            **{k: v for k, v in stats.items() if k != "model"}
        }
        formatted_results.append(result)
    return formatted_results
# Gradio Interface
def launch_interface(share=True):
    iface = gr.Interface(
        fn=compare_embeddings,
        inputs=[
            gr.File(label="Upload File (Optional)"),
            gr.Textbox(label="Search Query"),
            gr.CheckboxGroup(choices=list(model_manager.list_models().keys()) + ["Custom"], label="Embedding Model Types"),
            gr.CheckboxGroup(choices=[model for models in model_manager.list_models().values() for model in models] + ["custom_model"], label="Embedding Models"),
            gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
            gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
            gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
            gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
            gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
            gr.Radio(choices=["similarity", "mmr", "custom"], label="Search Type", value="similarity"),
            gr.Slider(1, 10, step=1, value=5, label="Top K"),
            gr.Dropdown(choices=["german", "english", "french"], label="Language", value="german"),
            gr.Checkbox(label="Use Custom Embedding", value=False),
            gr.Checkbox(label="Optimize Vocabulary", value=False),
            gr.Slider(0, 1, step=0.1, value=0.3, label="Phonetic Matching Weight"),
            gr.File(label="Custom Tokenizer File (Optional)")
        ],
        outputs=[
            gr.Dataframe(label="Results", interactive=False),
            gr.Dataframe(label="Statistics", interactive=False),
            gr.Plot(label="Visualizations")
        ],
        title="Advanced Embedding Comparison Tool",
        description="Compare different embedding models and retrieval strategies with advanced preprocessing and phonetic matching"
    )
    tutorial_md = """
# Advanced Embedding Comparison Tool Tutorial

This tool allows you to compare different embedding models and retrieval strategies for document search and similarity matching.

## How to use:

1. Upload a file (optional) or use the default files in the system.
2. Enter a search query.
3. Select one or more embedding model types and specific models.
4. Choose a text splitting strategy and set chunk size and overlap.
5. Select a vector store type and search type.
6. Set the number of top results to retrieve.
7. Choose the language of your documents.
8. Optionally, use custom embeddings, optimize vocabulary, or adjust the phonetic matching weight.
9. If you have a custom tokenizer, upload the file.

The tool will process your query and display results, statistics, and visualizations to help you compare the performance of different models and strategies.
"""
    # gr.TabbedInterface expects Interface/Blocks instances, not bare components,
    # so the tutorial markdown is wrapped in a Blocks context.
    with gr.Blocks() as tutorial_tab:
        gr.Markdown(tutorial_md)
    app = gr.TabbedInterface(
        [iface, tutorial_tab],
        ["Embedding Comparison", "Tutorial"]
    )
    app.launch(share=share)
if __name__ == "__main__":
    launch_interface()