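"""Translation helpers built on Hugging Face transformers.

Provides a Translators class wrapping several translation strategies (Opus-MT pipelines,
mBART) plus standalone helpers for sentence-by-sentence Romanian<->English translation
and Gemma-based translation to Romanian.
"""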
import nltk
from nltk.tokenize import sent_tokenize
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM, pipeline
import torch
import src.exception.Exception as ExceptionCustom
import polars as pl

METHOD = "TRANSLATE"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = pl.read_parquet("isolanguages.parquet")
non_empty_isos = df.slice(1).filter(pl.col("ISO639-1") != "").rows()
# all_langs = languagecodes.iso_languages_byname
all_langs = {iso[0]: (iso[1], iso[2], iso[3]) for iso in non_empty_isos}  # {'Romanian': ('ro', 'rum', 'ron')}
iso1_to_name = {iso[1]: iso[0] for iso in non_empty_isos}  # {'ro': 'Romanian', 'de': 'German'}
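# Assumed layout of isolanguages.parquet (not shipped with this file): one row per language with
# columns like [name, ISO639-1, ISO639-2, ISO639-3], e.g. ("Romanian", "ro", "rum", "ron"),
# matching the tuple positions used in the dictionary comprehensions above.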

class Translators:
    def __init__(self, model_name: str, sl: str, tl: str, input_text: str):
        self.model_name = model_name
        self.sl, self.tl = sl, tl
        self.input_text = input_text
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.message = f'Translated from {iso1_to_name[self.sl]} to {iso1_to_name[self.tl]} with {self.model_name}.'
    def HelsinkiNLP_mulroa(self):
        try:
            pipe = pipeline("translation", model=self.model_name, device=self.device)
            iso1to3 = {iso[1]: iso[3] for iso in non_empty_isos}  # {'ro': 'ron'}
            iso3tl = iso1to3.get(self.tl)  # e.g. 'deu', 'ron', 'eng', 'fra'
            # Multilingual Opus-MT models select the target language via a ">>iso3<<" prefix
            translation = pipe(f'>>{iso3tl}<< {self.input_text}')
            return translation[0]['translation_text'], self.message
        except Exception as error:
            return f"Error translating with model: {self.model_name}! Try another available language combination.", error
    def text2textgenerationpipe(self):
        translation = pipeline('text2text-generation', model=self.model_name)
        return translation(self.input_text)[0]['generated_text'], self.message

    def translationpipe(self):
        translation = pipeline('translation', model=self.model_name)
        return translation(self.input_text)[0]['translation_text'], self.message
    def mbartlarge25(self):
        # NOTE: mBART language codes mix "_XX" and country suffixes (e.g. en_XX but ro_RO),
        # so this simple mapping only covers language pairs that follow these two patterns.
        src_lang = f"{self.sl}_XX"
        tgt_lang = f"{self.tl}_{self.tl.upper()}"
        # Load model and tokenizer with the source language set on the tokenizer
        tokenizer = AutoTokenizer.from_pretrained(self.model_name, src_lang=src_lang)
        model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)
        # Tokenize and translate, forcing the target language as the first generated token
        inputs = tokenizer(self.input_text, return_tensors="pt")
        translated_tokens = model.generate(
            **inputs,
            forced_bos_token_id=tokenizer.lang_code_to_id[tgt_lang],
            max_length=512,  # cap the output length
            num_beams=4)     # beam search for better quality
        translation = tokenizer.batch_decode(translated_tokens, skip_special_tokens=True)[0]
        return translation, self.message
    def mbartlarge(self):
        # Hard-coded English -> Romanian demo with facebook/mbart-large-cc25
        model_name = "facebook/mbart-large-cc25"
        # Load tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        # Tell the tokenizer the source and target languages
        tokenizer.src_lang = "en_XX"
        tokenizer.tgt_lang = "ro_RO"
        # Set the target language as the model's forced BOS token so the pipeline uses it implicitly
        model.config.forced_bos_token_id = tokenizer.lang_code_to_id["ro_RO"]
        # Supported mBART-cc25 codes: ar_AR, cs_CZ, de_DE, en_XX, es_XX, et_EE, fi_FI, fr_XX, gu_IN, hi_IN,
        # it_IT, ja_XX, kk_KZ, ko_KR, lt_LT, lv_LV, my_MM, ne_NP, nl_XX, ro_RO, ru_RU, si_LK, tr_TR, vi_VN, zh_CN
        # Create the pipeline, passing tokenizer and model explicitly; src_lang/tgt_lang select the
        # language pair for multilingual models and have no effect on single-pair models.
        pipe = pipeline("translation", model=model, tokenizer=tokenizer, src_lang="en_XX", tgt_lang="ro_RO")
        src_text = "Check general exterior conditions"
        # Generation kwargs such as num_beams and max_length are forwarded to model.generate
        result = pipe(src_text, num_beams=4, max_length=256)
        return result[0]["translation_text"], self.message
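
# Illustrative usage of the Translators class (a sketch, not executed at import time).
# The checkpoint name below is an assumption: any single-pair Opus-MT model fits translationpipe,
# while HelsinkiNLP_mulroa expects a multilingual model that accepts ">>iso3<<" target prefixes.
# It also assumes the parquet-derived mapping contains the 'en' and 'ro' codes.
#
#   translator = Translators("Helsinki-NLP/opus-mt-en-ro", sl="en", tl="ro",
#                            input_text="Check general exterior conditions")
#   text, note = translator.translationpipe()
#   print(text, note)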

def paraphraseTranslateMethod(requestValue: str, model: str):
    nltk.download('punkt')
    nltk.download('punkt_tab')

    exception = ExceptionCustom.checkForException(requestValue, METHOD)
    if exception:
        return "", exception

    # Load the Opus-MT checkpoint once (ro->en for 'roen', en->ro otherwise) instead of per sentence
    checkpoint = "BlackKakapo/opus-mt-ro-en" if model == 'roen' else "BlackKakapo/opus-mt-en-ro"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    seq2seq_model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint).to(device)

    result_value = []
    for sentence in sent_tokenize(requestValue):
        inputs = tokenizer(sentence, return_tensors='pt').to(device)
        output = seq2seq_model.generate(
            input_ids=inputs.input_ids,
            do_sample=True,
            max_length=512,
            top_k=90,
            top_p=0.97,
            early_stopping=False
        )
        result_value.append(tokenizer.batch_decode(output, skip_special_tokens=True)[0])

    return " ".join(result_value).strip(), model

def gemma(requestValue: str, model: str = 'Gargaz/gemma-2b-romanian-better'):
    requestValue = requestValue.replace('\n', ' ')
    messages = [{"role": "user", "content": f"Translate this text to Romanian: {requestValue}"}]
    if '/' not in model:
        model = 'Gargaz/gemma-2b-romanian-better'
    # Limit max_new_tokens to roughly 150% of the request length (character count as a rough proxy for tokens)
    max_new_tokens = int(len(requestValue) * 1.5)
    try:
        pipe = pipeline(
            "text-generation",
            model=model,
            device=-1,
            max_new_tokens=max_new_tokens,  # keep output short to reduce verbosity
            do_sample=False                 # greedy decoding for determinism
        )
        output = pipe(messages, num_return_sequences=1, return_full_text=False)
        generated_text = output[0]["generated_text"]
        # Keep only the first line of the reply
        result = generated_text.split('\n', 1)[0]
        return result.strip()
    except Exception as error:
        return error

def gemma_direct(requestValue: str, model: str = 'Gargaz/gemma-2b-romanian-better'):
    # Load the model directly instead of going through a pipeline
    model_name = model if '/' in model else 'Gargaz/gemma-2b-romanian-better'
    prompt = f"Translate this text to Romanian: {requestValue}"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
    # Estimate the output budget at roughly 150% of the input token count
    num_tokens = len(tokenizer.encode(requestValue, add_special_tokens=True))
    max_new_tokens = int(num_tokens * 1.5)
    max_new_tokens += max_new_tokens % 2  # round up to an even number
    messages = [{"role": "user", "content": prompt}]
    try:
        inputs = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        ).to(device)
        outputs = model.generate(**inputs, max_new_tokens=max_new_tokens)
        # Decode only the newly generated tokens, then keep the first line of the reply
        response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[-1]:], skip_special_tokens=True)
        return response.split('\n', 1)[0].strip()
    except Exception as error:
        return error
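
# Minimal smoke-test sketch, assuming isolanguages.parquet and the checkpoints referenced above are
# available (running it downloads several models). The sample sentence is hypothetical.
if __name__ == "__main__":
    sample = "Check general exterior conditions"
    translated, used_model = paraphraseTranslateMethod(sample, model="enro")
    print(used_model, "->", translated)
    print(gemma(sample))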