# Page header captured together with this file -- Spaces: Runtime error
| import re | |
| import time | |
| import asyncio | |
| from io import BytesIO | |
| from typing import List, Optional | |
| import httpx | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import torch | |
| import PIL | |
| import imagehash | |
| from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, CLIPModel, CLIPProcessor | |
| from PIL import Image | |
class Translator:
    """Wrapper around a Hugging Face seq2seq model used for translation.

    Languages are passed as the short keys ``"en"``, ``"es"`` and ``"pt"``; they
    are mapped to the code format selected by the model family found in
    ``model_id`` (``"nllb"``, ``"m2m"`` or anything else).
    """

    def __init__(self, model_id: str, device: Optional[str] = None):
        """Load the tokenizer and model for ``model_id`` and move the model to ``device``.

        Args:
            model_id: Identifier passed to ``from_pretrained``.
            device: Target device; defaults to ``"cuda"`` when available, else ``"cpu"``.
        """
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model_id = model_id
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_id).to(self.device)

    def _bos_token_attr(self):
        """Return the tokenizer's language-id lookup, or ``None`` if it has none.

        ``get_lang_id`` is preferred when the tokenizer has it, otherwise
        ``lang_code_to_id`` is used. The former is called, the latter is
        subscripted; ``_lang_token_id`` hides that difference.
        """
        if hasattr(self.tokenizer, "get_lang_id"):
            return self.tokenizer.get_lang_id
        if hasattr(self.tokenizer, "lang_code_to_id"):
            return self.tokenizer.lang_code_to_id
        return None

    def _language_code_mapper(self):
        """Return the mapping from short language keys to this model's language codes."""
        model_name = self.model_id.lower()
        if "nllb" in model_name:
            return {"en": "eng_Latn",
                    "es": "spa_Latn",
                    "pt": "por_Latn"}
        if "m2m" in model_name:
            return {"en": "en",
                    "es": "es",
                    "pt": "pt"}
        return {"en": "eng",
                "es": "spa",
                "pt": "por"}

    def _lang_token_id(self, code: str):
        """Resolve a model-specific language code to its token id.

        Raises:
            ValueError: If the tokenizer exposes neither ``get_lang_id`` nor
                ``lang_code_to_id``.
        """
        lookup = self._bos_token_attr()
        if lookup is None:
            raise ValueError(
                f"Tokenizer for {self.model_id!r} exposes no language-id lookup; "
                "cannot force the target language."
            )
        # One lookup is a method, the other a dict: support both.
        return lookup(code) if callable(lookup) else lookup[code]

    def translate(self, texts: List[str], src_lang: str, dest_lang: str = "en", max_length: int = 100):
        """Translate ``texts`` from ``src_lang`` into ``dest_lang``.

        Args:
            texts: Texts to translate (a single string is also accepted by the tokenizer).
            src_lang: Source language key (``"en"``, ``"es"`` or ``"pt"``).
            dest_lang: Target language key; ignored for model ids containing
                ``"opus"``, for which no target token is forced.
            max_length: ``max_length`` forwarded to ``generate``.

        Returns:
            The decoded translations, one string per input text.

        Raises:
            KeyError: If ``src_lang`` or ``dest_lang`` is not a supported key.
            ValueError: If a target token is required but the tokenizer has no
                language-id lookup.
        """
        codes = self._language_code_mapper()
        self.tokenizer.src_lang = codes[src_lang]
        # Padding is required to batch texts that tokenize to different lengths.
        inputs = self.tokenizer(texts, return_tensors="pt", padding=True).to(self.device)
        if "opus" in self.model_id.lower():
            forced_bos_token_id = None
        else:
            forced_bos_token_id = self._lang_token_id(codes[dest_lang])
        translated_tokens = self.model.generate(
            **inputs, forced_bos_token_id=forced_bos_token_id, max_length=max_length
        )
        return self.tokenizer.batch_decode(translated_tokens, skip_special_tokens=True)
class OffTopicDetector:
    """Zero-shot detector of off-topic listing images built on a CLIP model.

    Images are scored against a set of "valid" text classes (derived from the
    item domain and, optionally, its title) and a set of "invalid" text classes;
    the summed probability of each group is reported per image.
    """

    def __init__(self, model_id: str, device: Optional[str] = None, image_size: str = "E",
                 translator: Optional["Translator"] = None):
        """Load the CLIP processor and model.

        Args:
            model_id: Identifier passed to ``from_pretrained``.
            device: Target device; defaults to ``"cuda"`` when available, else ``"cpu"``.
            image_size: Size suffix substituted for ``O`` in picture URLs ending in ``-O.jpg``.
            translator: Object whose ``translate`` method is used to turn non-CBT
                titles into English; only needed when titles are used.
        """
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = CLIPProcessor.from_pretrained(model_id)
        self.model = CLIPModel.from_pretrained(model_id).to(self.device)
        self.image_size = image_size
        self.translator = translator

    def _device_type(self) -> str:
        """Return the device type without an index (``"cuda:0"`` -> ``"cuda"``)."""
        return torch.device(self.device).type

    def _title_in_english(self, title: str, site: str) -> str:
        """Return ``title`` unchanged for site ``"CBT"``, otherwise its translation.

        Titles from site ``"MLB"`` are translated from ``"pt"``; titles from any
        other site are translated from ``"es"``.
        """
        if site == "CBT":
            return title
        if self.translator is None:
            raise ValueError("A translator is required to use the title of a non-CBT item.")
        src_lang = "pt" if site == "MLB" else "es"
        return self.translator.translate([title], src_lang=src_lang, dest_lang="en", max_length=100)[0]

    def predict_probas(self, images: List["PIL.Image.Image"], domain: str, site: str,
                       title: Optional[str] = None,
                       valid_templates: Optional[List[str]] = None,
                       invalid_classes: Optional[List[str]] = None,
                       autocast: bool = True):
        """Score ``images`` against valid and invalid text classes.

        Args:
            images: Images to classify.
            domain: Domain name inserted (lower-cased) into the valid class templates.
            site: Site code; decides whether and from which language the title is translated.
            title: Optional item title appended (lower-cased, in English) to the valid classes.
            valid_templates: ``str.format`` templates receiving the domain; defaults are
                used when empty or ``None``.
            invalid_classes: Invalid class prompts; defaults are used when empty or ``None``.
            autocast: Whether to run the model under ``torch.autocast`` (always off on CPU).

        Returns:
            A tuple ``(probas, valid_probas, invalid_probas)``: the per-class softmax
            probabilities for each image, and the per-image sums over the valid and
            invalid classes (each with shape ``(n_images, 1)``).

        Raises:
            ValueError: If a title must be translated but no translator was provided.
        """
        domain = domain.lower()
        if valid_templates:
            valid_classes = [template.format(domain) for template in valid_templates]
        else:
            valid_classes = [f"a photo of {domain}", f"brochure with {domain} image", f"instructions for {domain}", f"{domain} diagram"]
        if title:
            valid_classes.append(self._title_in_english(title, site).lower())
        if not invalid_classes:
            invalid_classes = ["promotional ad with store information", "promotional text", "google maps screenshot", "business card", "qr code"]
        n_valid = len(valid_classes)
        classes = valid_classes + invalid_classes
        print(f"Valid classes: {valid_classes}", f"Invalid classes: {invalid_classes}", sep="\n")
        n_classes = len(classes)

        # Compare on the device *type* so indexed devices such as "cuda:0" also work.
        device_type = self._device_type()
        on_cuda = device_type == "cuda"
        if on_cuda:
            # CUDA kernels run asynchronously; synchronize so the timing is meaningful.
            torch.cuda.synchronize()
        start = time.time()
        inputs = self.processor(text=classes, images=images, return_tensors="pt", padding=True).to(self.device)
        use_autocast = bool(autocast) and device_type != "cpu"
        with torch.autocast(device_type, enabled=use_autocast), torch.no_grad():
            outputs = self.model(**inputs)
        # Softmax over the class axis gives per-image label probabilities; computed
        # in float32 so the result does not depend on the autocast dtype.
        probas = outputs.logits_per_image.float().softmax(dim=1).cpu().numpy()
        if on_cuda:
            torch.cuda.synchronize()
        duration = time.time() - start
        print(f"Model time: {round(duration, 2)} s",
              f"Model time per image: {round(duration/max(len(images), 1) * 1000, 0)} ms",
              sep="\n")
        valid_probas = probas[:, 0:n_valid].sum(axis=1, keepdims=True)
        invalid_probas = probas[:, n_valid:n_classes].sum(axis=1, keepdims=True)
        return probas, valid_probas, invalid_probas

    def predict_probas_url(self, img_urls: List[str], domain: str, site: str,
                           title: Optional[str] = None,
                           valid_templates: Optional[List[str]] = None,
                           invalid_classes: Optional[List[str]] = None,
                           autocast: bool = True):
        """Download and de-duplicate the images at ``img_urls``, then score them.

        Returns:
            A tuple ``(dedup_images, result)`` where ``result`` is the tuple
            returned by ``predict_probas``.
        """
        images = self.get_images(img_urls)
        dedup_images = self._filter_dups(images)
        return dedup_images, self.predict_probas(dedup_images, domain, site, title, valid_templates, invalid_classes, autocast)

    def predict_probas_item(self, url_or_id: str,
                            use_title: bool = False,
                            valid_templates: Optional[List[str]] = None,
                            invalid_classes: Optional[List[str]] = None):
        """Fetch an item through ``get_item_data`` and score its images.

        Args:
            url_or_id: Item URL or item id.
            use_title: Whether the item title is added to the valid classes.
            valid_templates: Forwarded to ``predict_probas``.
            invalid_classes: Forwarded to ``predict_probas``.

        Returns:
            A tuple ``(images, domain, probas, valid_probas, invalid_probas)``.
        """
        images, domain, site, title = self.get_item_data(url_or_id)
        title = title if use_title else None
        probas, valid_probas, invalid_probas = self.predict_probas(images, domain, site, title, valid_templates,
                                                                   invalid_classes)
        return images, domain, probas, valid_probas, invalid_probas

    def apply_threshold(self, valid_probas: np.ndarray, threshold: float = 0.4):
        """Return a boolean array that is ``True`` where ``valid_probas >= threshold``."""
        return valid_probas >= threshold

    @staticmethod
    def _parse_item_id(url_or_id: str) -> str:
        """Extract the hyphen-free item id from an item URL or a raw id.

        For a URL, the id is built from the first two hyphen-separated parts of
        the first path segment; for a raw id, hyphens are simply removed.
        """
        if url_or_id.startswith("http"):
            return "".join(url_or_id.split("/")[3].split("-")[:2])
        return url_or_id.replace("-", "")

    def get_item_data(self, url_or_id: str):
        """Fetch title, site, domain name and images of an item from the Mercado Libre API.

        Args:
            url_or_id: Item URL or item id.

        Returns:
            A tuple ``(dedup_images, domain_name, site, title)``.

        Raises:
            httpx.HTTPStatusError: If either API request returns an error status.
        """
        item_id = self._parse_item_id(url_or_id)
        start = time.time()
        item_response = httpx.get(f"https://api.mercadolibre.com/items/{item_id}")
        # Fail on HTTP errors here instead of with an opaque KeyError below.
        item_response.raise_for_status()
        item = item_response.json()
        title = item["title"]
        # Split only on the first hyphen so the domain part is kept whole.
        site, domain = item["domain_id"].split("-", 1)
        img_urls = [picture["url"].replace("-O.jpg", f"-{self.image_size}.jpg") for picture in item["pictures"]]
        domain_response = httpx.get(f"https://api.mercadolibre.com/catalog_domains/CBT-{domain}")
        domain_response.raise_for_status()
        domain_name = domain_response.json()["name"]
        duration = time.time() - start
        print(f"Items API time: {round(duration * 1000, 0)} ms")
        images = self.get_images(img_urls)
        dedup_images = self._filter_dups(images)
        return dedup_images, domain_name, site, title

    def _filter_dups(self, images: List):
        """Drop images whose average hash matches an earlier image.

        The first image seen for each hash is kept and input order is preserved.
        Lists with fewer than two images are returned unchanged.
        """
        if len(images) > 1:
            first_by_hash = {}
            for img in images:
                first_by_hash.setdefault(str(imagehash.average_hash(img)), img)
            dedup_images = list(first_by_hash.values())
        else:
            dedup_images = images
        if (diff := len(images) - len(dedup_images)) > 0:
            print(f"Filtered {diff} images out of {len(images)} due to matching hashes.")
        return dedup_images

    def get_images(self, urls: List[str]):
        """Download all ``urls`` concurrently and return the opened images in order.

        An empty ``urls`` yields an empty list without any network access.
        """
        if not urls:
            return []
        start = time.time()
        images = asyncio.run(self._gather_download_tasks(urls))
        duration = time.time() - start
        print(f"Download time: {round(duration, 2)} s",
              f"Download time per image: {round(duration/len(urls) * 1000, 0)} ms",
              sep="\n")
        # Return the images already downloaded rather than fetching them again.
        return images

    async def _gather_download_tasks(self, urls: List[str]):
        """Fetch every URL with one shared async client and open each body as an image.

        Raises:
            httpx.HTTPStatusError: If any download returns an error status.
        """
        async def _process_download(url: str, client: "httpx.AsyncClient"):
            response = await client.get(url)
            response.raise_for_status()
            return Image.open(BytesIO(response.content))

        async with httpx.AsyncClient() as client:
            tasks = [_process_download(url, client) for url in urls]
            return await asyncio.gather(*tasks)

    def show(self, images: List["PIL.Image.Image"], valid_probas: np.ndarray, n_cols: int = 3,
             title: Optional[str] = None, threshold: Optional[float] = None):
        """Plot ``images`` in a grid, each titled with its valid score or decision.

        Args:
            images: Images to display.
            valid_probas: Per-image valid probabilities, indexed as ``[:, 0]``.
            n_cols: Number of grid columns.
            title: Optional figure title.
            threshold: When given, titles show the thresholded boolean instead of
                the rounded probability.
        """
        if threshold is not None:
            prediction = self.apply_threshold(valid_probas, threshold)
            title_scores = [f"Valid: {pred.squeeze()}" for pred in prediction]
        else:
            prediction = np.round(valid_probas[:, 0], 2)
            title_scores = [f"Valid: {pred:.2f}" for pred in prediction]
        n_images = len(images)
        # At least one row so that an empty image list still yields a valid grid.
        n_rows = max(1, int(np.ceil(n_images / n_cols)))
        # squeeze=False always returns a 2-D array of axes, even for a 1x1 grid.
        fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 16), squeeze=False)
        for i, ax in enumerate(axes.ravel()):
            ax.axis("off")
            if i >= n_images:
                continue
            ax.imshow(images[i])
            if i < len(title_scores):
                ax.set_title(title_scores[i])
        if title:
            fig.suptitle(title)
        fig.tight_layout()
        return