Spaces:
Running
Running
| from transformers import AutoTokenizer, AutoModel | |
| import torch | |
| import faiss | |
| import gradio as gr | |
| import json | |
class FaissTextRetrieval:
    """Embed a text query with a Hugging Face encoder and search prebuilt FAISS indexes.

    Three index/label pairs are read from the ``data/`` directory when the
    object is constructed: ``all``, ``general`` and ``character``. Each pair is
    a FAISS index file (``data/<name>.index``) plus a JSON object mapping the
    index row id (as a string) to its label (``data/<name>.json``).
    """

    # Accepted values for ``search_type``. Each name maps to the attributes
    # ``<name>_index`` and ``<name>_id2label`` that ``__init__`` populates.
    _SEARCH_TYPES = ("all", "general", "character")

    def __init__(self, model_name):
        """Load the tokenizer/model called ``model_name`` and the three indexes.

        Errors from ``from_pretrained``, ``faiss.read_index``, ``open`` and
        ``json.load`` are not caught here and propagate to the caller.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).eval()
        self.device = "cpu"
        # Set up front so the attribute exists even if ``to()`` is never called.
        self.dtype = torch.float32
        self.all_index, self.all_id2label = self._load_store("all")
        self.general_index, self.general_id2label = self._load_store("general")
        self.character_index, self.character_id2label = self._load_store("character")

    @staticmethod
    def _load_store(name):
        """Read ``data/<name>.index`` and ``data/<name>.json``; return ``(index, id2label)``."""
        index = faiss.read_index(f"data/{name}.index")
        # Explicit encoding: labels may be non-ASCII and the platform default
        # encoding is not guaranteed to be UTF-8.
        with open(f"data/{name}.json", "r", encoding="utf-8") as f:
            # JSON object keys are always strings; the index reports integer ids.
            id2label = {int(k): v for k, v in json.load(f).items()}
        return index, id2label

    def to(self, device, dtype=torch.float32):
        """Move the model to ``device``.

        ``dtype`` is honoured only when ``device`` names a CUDA device; for any
        other device the model is kept in ``torch.float32``. ``device`` may be
        a string or a ``torch.device``.
        """
        self.device = device
        # ``str(device)`` so that a ``torch.device`` works as well as a string.
        self.dtype = dtype if "cuda" in str(device) else torch.float32
        # Use the resolved dtype so ``self.dtype`` and the model always agree.
        self.model.to(device, dtype=self.dtype)

    def average_pool(self, last_hidden_states, attention_mask):
        """Return the attention-masked mean of ``last_hidden_states`` over dim 1.

        Positions where ``attention_mask`` is 0 are zeroed before summing, and
        the sum is divided by the number of unmasked positions per row.
        Assumes ``last_hidden_states`` is (batch, seq, hidden) and
        ``attention_mask`` is (batch, seq) -- TODO confirm against the model.
        """
        last_hidden = last_hidden_states.masked_fill(~attention_mask[..., None].bool(), 0.0)
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

    def get_embeddings(self, input_texts: list):
        """Return L2-normalised, mean-pooled embeddings for ``input_texts``.

        Texts are tokenised with padding and truncated to 512 tokens. The
        result is a tensor on ``self.device`` with one row per input text and
        carries no autograd history.
        """
        batch_dict = self.tokenizer(input_texts, max_length=512, padding=True, truncation=True, return_tensors='pt')
        input_ids = batch_dict["input_ids"].to(self.device)
        attention_mask = batch_dict["attention_mask"].to(self.device)
        # Inference only: skip autograd bookkeeping. Without this the output
        # requires grad and cannot be converted with ``.numpy()`` in ``search``.
        with torch.no_grad():
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
            embeddings = self.average_pool(outputs.last_hidden_state, attention_mask)
            embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        return embeddings

    def search(self, query, top_k: int = 5, search_type="all") -> dict:
        """Search one of the loaded indexes for ``query``.

        Args:
            query: Free-text query string.
            top_k: Maximum number of neighbours to request from the index.
            search_type: One of ``"all"``, ``"general"`` or ``"character"``.

        Returns:
            A dict mapping label -> score (the value the index reports for
            that hit) as a plain ``float``. It may hold fewer than ``top_k``
            entries when the index has fewer hits or when labels repeat.

        Raises:
            ValueError: If ``search_type`` is not a known index name.
        """
        # Validate before doing any model work so bad input fails fast and
        # with a clear message instead of an UnboundLocalError later on.
        if search_type not in self._SEARCH_TYPES:
            raise ValueError(
                f"unknown search_type {search_type!r}; expected one of {self._SEARCH_TYPES}"
            )
        index = getattr(self, f"{search_type}_index")
        id2label = getattr(self, f"{search_type}_id2label")

        # NOTE(review): the E5 model card uses the prefix "query: " (with a
        # space). Left unchanged here because the convention used when the
        # indexes were built is not visible -- confirm before changing.
        query = "query:" + query
        query_embeddings = self.get_embeddings([query]).float().cpu().numpy()
        distances, indices = index.search(query_embeddings, top_k)
        # FAISS pads missing neighbours with id -1; skip those instead of
        # failing the label lookup. Scores are converted to built-in floats.
        results = {
            id2label[int(idx)]: float(score)
            for idx, score in zip(indices[0], distances[0])
            if idx >= 0
        }
        return results

    def reset(self):
        """Clear ``passage_texts`` and ``index``.

        NOTE(review): no other method in this class reads these two
        attributes; this looks like a leftover from an earlier design.
        """
        self.passage_texts = []
        self.index = None
def main():
    """Create the retriever and serve it through a Gradio web interface."""
    retriever = FaissTextRetrieval("intfloat/multilingual-e5-large")

    def search(query, search_type):
        # The UI always asks for the 50 best matches.
        hits = retriever.search(query, search_type=search_type, top_k=50)
        return hits

    description = """Model:[intfloat/multilingual-e5-large](https://huggingface.co/intfloat/multilingual-e5-large)
Tag:[SmilingWolf/wd-eva02-large-tagger-v3](https://huggingface.co/SmilingWolf/wd-eva02-large-tagger-v3)
"""
    # Radio selector for which index to query; "all" is preselected.
    type_selector = gr.Radio(["all", "general", "character"], value="all")
    demo = gr.Interface(
        search,
        inputs=("textarea", type_selector),
        outputs="label",
        title="Tag Search",
        description=description,
    )
    demo.launch()
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()