import glob
import os
import re
import shutil
import string
import time
import unicodedata
from itertools import groupby
from subprocess import Popen, PIPE

import spacy
import tqdm
from gradio_client.exceptions import AppError
from iso639 import languages
from spacy.tokens import Doc

from src.aligner import Aligner
from src.utils import file_to_moses, moses_to_file

# Load the multilingual spaCy model to use as a sentence tokenizer
spacy_nlp = spacy.load("xx_ent_wiki_sm")
# Add the rule-based sentencizer so that sentence boundaries are available
if "sentencizer" not in spacy_nlp.pipe_names:
    spacy_nlp.add_pipe("sentencizer")

class SalamandraTA7bTranslatorHF:
    def __init__(self, hf_token):
        from gradio_client import Client
        self.client = Client("BSC-LT/SalamandraTA-7B-Demo", hf_token=hf_token)

    def translate(self, text, source_lang, target_lang):
        if not text:
            return ""
        # languages are given as ISO 639-1 codes, but the demo expects full language names
        lang1 = languages.get(alpha2=source_lang).name
        lang2 = languages.get(alpha2=target_lang).name
        result = self.client.predict(
            task="Translation",
            source=lang1,
            target=lang2,
            input_text=text,
            mt_text=None,
            api_name="/generate_output"
        )
        return result[0]
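
    # Minimal usage sketch (assumes a valid Hugging Face token with access to the
    # BSC-LT/SalamandraTA-7B-Demo Space; the token value below is a placeholder):
    #
    #     translator = SalamandraTA7bTranslatorHF(hf_token="hf_...")
    #     translator.translate("Hola món", "ca", "en")   # -> roughly "Hello world"
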
    def translate_document(self, input_file: str, source_lang: str, target_lang: str,
                           aligner: Aligner,
                           temp_folder: str = "tmp",
                           tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0") -> (str, str):
        input_filename = input_file.split("/")[-1]
        os.makedirs(temp_folder, exist_ok=True)

        # copy the original file to the temporary folder to avoid common issues with tikal
        temp_input_file = os.path.join(temp_folder, input_filename)
        shutil.copy(input_file, temp_input_file)

        original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
        plain_text_file = file_to_moses(temp_input_file, source_lang, target_lang, tikal_folder,
                                        original_xliff_file)

        # get paragraphs with runs
        paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in
                                enumerate(open(plain_text_file).readlines())]

        # translate using the plain text file
        original_tokenized_sentences_with_style = []
        original_spacing = []
        for run in paragraphs_with_runs:
            tokens, spaces = tokenize_with_runs(run)
            original_tokenized_sentences_with_style += tokens
            original_spacing += spaces

        translated_sentences = []
        yield "Translating 0%...", None
        total = len(original_tokenized_sentences_with_style)
        pbar = tqdm.tqdm(desc="Translating paragraphs...", total=total)
        for i, (sentence, spacing) in enumerate(zip(original_tokenized_sentences_with_style, original_spacing)):
            text = Doc(spacy_nlp.vocab, words=[token["text"] for token in sentence], spaces=spacing).text
            while True:
                try:
                    translated_sentences.append(self.translate(text, source_lang, target_lang))
                    break
                except AppError as e:
                    # the Space may be busy or rate-limited; wait briefly and retry
                    print(e)
                    time.sleep(1)
            pbar.update(1)
            percent_complete = int(((i + 1) / total) * 100)
            yield f"Translating {percent_complete}%...", None
        pbar.close()

        # time to align the translation with the original
        print("Generating alignments...")
        yield "Aligning...", None
        start_time = time.time()
        translated_sentences_with_style, translated_sentences_spacing = generate_alignments(
            original_tokenized_sentences_with_style,
            translated_sentences, aligner,
            temp_folder)
        print(f"Finished alignments in {time.time() - start_time} seconds")

        # since the sentences were tokenized independently, the spacing information is missing the space after the
        # punctuation that ends a sentence (a sentence ending in "." has no trailing space unless another sentence
        # follows it), so restore it here
        for sentence, sentence_spaces in zip(translated_sentences_with_style, translated_sentences_spacing):
            if sentence[-1]["text"] in string.punctuation:
                sentence_spaces[-1] = True

        # flatten the sentences into a list of tokens
        translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
        tokens_spaces = [item for sublist in translated_sentences_spacing for item in sublist]

        # group the tokens by style/run
        translated_runs_with_style = group_by_style(translated_tokens_with_style, tokens_spaces)

        # group the runs by original paragraph
        translated_paragraphs_with_style = {key: [{'id': None, 'paragraph_index': key, 'text': ""}] for key in
                                            range(len(paragraphs_with_runs))}
        for item in translated_runs_with_style:
            # first item in the paragraph: remove the leading blank space introduced in group_by_style(), where we
            # didn't know where paragraphs started and ended
            if not translated_paragraphs_with_style[item['paragraph_index']][0]["text"]:
                first_item_in_paragraph = item.copy()
                first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
                translated_paragraphs_with_style[item['paragraph_index']] = []
                translated_paragraphs_with_style[item['paragraph_index']].append(first_item_in_paragraph)
            else:
                translated_paragraphs_with_style[item['paragraph_index']].append(item)

        # save to a new plain text file
        translated_moses_file = original_xliff_file + f".{target_lang}"
        runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)

        translated_file_path = moses_to_file(translated_moses_file, source_lang, target_lang, tikal_folder,
                                             original_xliff_file)
        print(f"Saved file in {translated_file_path}")
        yield "", translated_file_path
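
# Usage sketch for the generator above (paths are hypothetical; how Aligner is constructed
# depends on src.aligner, which is not shown here):
#
#     translator = SalamandraTA7bTranslatorHF(hf_token="hf_...")
#     for status, path in translator.translate_document("docs/report.docx", "ca", "en", aligner):
#         if path is not None:
#             print("translated file written to", path)
#         else:
#             print(status)
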
def remove_invisible(text):
    return ''.join(
        c for c in text
        if unicodedata.category(c) not in ('Zs', 'Cc', 'Cf')
    )


def get_leading_invisible(text):
    i = 0
    while i < len(text):
        c = text[i]
        if unicodedata.category(c) in ('Zs', 'Cc', 'Cf'):
            i += 1
        else:
            break
    return text[:i]
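
# Illustration (Unicode categories: 'Zs' spaces, 'Cc' control, 'Cf' format characters
# such as zero-width spaces):
#
#     remove_invisible("a\u200bb c")        # -> "abc"
#     get_leading_invisible("\u200b hi")    # -> "\u200b "
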
def get_runs_from_paragraph(paragraph: str, paragraph_index: int) -> list[dict]:
    """
    Given some text that may or may not contain chunks tagged with something like <g id="1"> </g>, extract each
    run of text and convert it into a dictionary that keeps this information

    Parameters:
        paragraph: Text to process
        paragraph_index: Index of the paragraph in the file

    Returns:
        list[dict]: Where each element is a run with its text, the ids of the tags enclosing it (an empty list if
        there are none) and the paragraph_index
    """
    tag_stack = []
    runs = []
    pos = 0

    # Match any tag: <tag id="123"/>, </tag>, or <tag id="123">
    tag_pattern = re.compile(r'<(/?)(\w+)(?:\s+id="(\d+)")?\s*(/?)>')

    for match in tag_pattern.finditer(paragraph):
        start, end = match.span()
        is_closing = match.group(1) == "/"
        tag_name = match.group(2)
        tag_id = match.group(3)
        is_self_closing = match.group(4) == "/"

        # Text before this tag
        if start > pos:
            text = paragraph[pos:start]
            if text:
                runs.append({
                    "text": text,
                    "id": tag_stack.copy(),
                    "paragraph_index": paragraph_index
                })

        if is_closing:
            # Closing tag </tag>
            expected_prefix = f"{tag_name}_"
            if tag_stack and tag_stack[-1].startswith(expected_prefix):
                tag_stack.pop()
            else:
                raise ValueError(f"Mismatched closing tag </{tag_name}>")
        elif is_self_closing:
            # Self-closing tag like <x id="1"/>
            if tag_id is None:
                tag_id = -1
                # raise ValueError(f"Self-closing tag <{tag_name}/> missing id")
            runs.append({
                "text": "",
                "id": [f"{tag_name}_{tag_id}"],
                "paragraph_index": paragraph_index
            })
        else:
            # Opening tag <tag id="...">
            if tag_id is None:
                tag_id = -1
                # raise ValueError(f"Opening tag <{tag_name}> missing id")
            tag_stack.append(f"{tag_name}_{tag_id}")

        pos = end

    # Final trailing text
    if pos < len(paragraph):
        text = paragraph[pos:]
        if text:
            runs.append({
                "text": text,
                "id": tag_stack.copy(),
                "paragraph_index": paragraph_index
            })

    return runs
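
# Illustration of the run structure (tag ids made up for the example):
#
#     get_runs_from_paragraph('Hello <g id="1">world</g>!', 0)
#     # [{'text': 'Hello ', 'id': [], 'paragraph_index': 0},
#     #  {'text': 'world', 'id': ['g_1'], 'paragraph_index': 0},
#     #  {'text': '!', 'id': [], 'paragraph_index': 0}]
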
def tokenize_text(text, tokenizer):
    # To avoid the tokenizer destroying URLs
    def preserve_urls(text):
        url_pattern = r'https?://[^\s\)\]\}\>]+|www\.[^\s\)\]\}\>]+'
        # Find URLs using the regex and replace them with placeholders
        urls = re.findall(url_pattern, text)
        for idx, url in enumerate(urls):
            placeholder = f"URL{idx}"
            text = text.replace(url, placeholder)
        return text, urls

    # Replace URLs with placeholders
    text, urls = preserve_urls(text)

    # Tokenize using Sacremoses
    tokens = tokenizer.tokenize(text)

    # Revert placeholders back to the original URLs, in reverse order so that e.g. "URL10" is restored before "URL1"
    for idx, url in reversed(list(enumerate(urls))):
        placeholder = f"URL{idx}"
        tokens = [token.replace(placeholder, url) for token in tokens]

    return tokens
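
# Sketch of the intended behaviour with a Sacremoses tokenizer (sacremoses is assumed to be
# installed; it is not imported at module level here):
#
#     from sacremoses import MosesTokenizer
#     tokenize_text("See https://example.com/a_b?x=1 now", MosesTokenizer(lang="en"))
#     # -> ['See', 'https://example.com/a_b?x=1', 'now']   (the URL survives as a single token)
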
def tokenize_with_runs(runs: list[dict[str, str]]) -> tuple[list[list[dict[str, str]]], list[list[bool]]]:
    """
    Given a list of runs, tokenize them by sentence and token while keeping, for each token, the style of its
    original run

    Parameters:
        runs: List of runs, where each item is a chunk of text (possibly several tokens) plus some style/formatting
              information

    Returns:
        tuple[list[list[dict]], list[list[bool]]]: A list of tokenized sentences where each token carries the style
        of its original run, and the matching per-token spacing information
    """
    # it's a bit of a mess, but first we get the tokenized sentences:
    # join the runs and send them through spaCy to split them into clean tokens
    doc_from_runs = spacy_nlp("".join([run["text"] for run in runs]).strip())

    # extract sentences and tokenize each one into words
    tokenized_sentences = [[token.text.strip() for token in sent if token.text.strip()] for sent in doc_from_runs.sents]
    tokenized_sentences_spaces = [[token.whitespace_ != '' for token in sent if token.text.strip()] for sent in
                                  doc_from_runs.sents]

    flat_tokens = [token for sentence in tokenized_sentences for token in sentence]
    flat_spaces = [token for sentence in tokenized_sentences_spaces for token in sentence]

    # walk through the runs and give each flat token the style of the run it came from
    flat_tokens_with_style = []
    flat_spaces_with_style = []
    token_idx = 0
    for run in runs:
        run["text"] = run["text"].strip()
        while run["text"]:
            if run["text"].startswith(flat_tokens[token_idx]):
                # the run starts with the current token: consume it
                run["text"] = run["text"][len(flat_tokens[token_idx]):]
                if flat_spaces[token_idx]:
                    run["text"] = run["text"].lstrip()
                item = run.copy()
                item["text"] = flat_tokens[token_idx]
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(flat_spaces[token_idx])
                token_idx += 1
            elif remove_invisible(run["text"]).startswith(flat_tokens[token_idx]):
                # same as above, but the run starts with invisible characters that spaCy dropped
                leading_invisible = get_leading_invisible(run["text"])
                run["text"] = run["text"][len(leading_invisible + flat_tokens[token_idx]):]
                if flat_spaces[token_idx]:
                    run["text"] = run["text"].lstrip()
                item = run.copy()
                item["text"] = leading_invisible + flat_tokens[token_idx]
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(flat_spaces[token_idx])
                token_idx += 1
            elif flat_tokens[token_idx].startswith(run["text"]):
                # the token spans more than one run: keep only the part covered by this run
                subtoken = flat_tokens[token_idx][:len(run["text"])]
                item = run.copy()
                item["text"] = subtoken
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(False)
                flat_tokens[token_idx] = flat_tokens[token_idx][len(run["text"]):]
                run["text"] = run["text"][len(subtoken):]
            elif flat_tokens[token_idx].startswith(remove_invisible(run["text"])):
                # same as above, but ignoring invisible characters
                flat_tokens[token_idx] = flat_tokens[token_idx][len(remove_invisible(run["text"])):]
                item = run.copy()
                item["text"] = run["text"]
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(flat_spaces[token_idx])
                run["text"] = ""
            else:
                raise Exception("Could not match the run text against the tokenized text")

    # reconstruct the sentences
    token_idx = 0
    tokenized_sentences_with_style, tokenized_sentences_spaces_with_style = [], []
    for sentence, sentence_spaces in zip(tokenized_sentences, tokenized_sentences_spaces):
        sentence_with_style, sentence_spaces_with_style = [], []
        for token in sentence:
            if token == flat_tokens_with_style[token_idx]["text"]:
                sentence_with_style.append(flat_tokens_with_style[token_idx])
                sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                token_idx += 1
            elif token.startswith(flat_tokens_with_style[token_idx]["text"]):
                # the token was split across runs: consume the styled pieces until the token is covered
                while token:
                    token = token[len(flat_tokens_with_style[token_idx]["text"]):]
                    sentence_with_style.append(flat_tokens_with_style[token_idx])
                    sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                    token_idx += 1
            elif token == remove_invisible(flat_tokens_with_style[token_idx]["text"]):
                sentence_with_style.append(flat_tokens_with_style[token_idx])
                sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                token_idx += 1
            elif token.startswith(remove_invisible(flat_tokens_with_style[token_idx]["text"])):
                while token:
                    token = token[len(remove_invisible(flat_tokens_with_style[token_idx]["text"])):]
                    # append the styled token dict (not just its text) so downstream code can keep using the style
                    sentence_with_style.append(flat_tokens_with_style[token_idx])
                    sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                    token_idx += 1
            else:
                print(token)
                print(sentence)
                print(token_idx)
                print(flat_tokens_with_style)
                raise Exception("Could not match a sentence token against the styled tokens")
        tokenized_sentences_with_style.append(sentence_with_style)
        tokenized_sentences_spaces_with_style.append(sentence_spaces_with_style)

    return tokenized_sentences_with_style, tokenized_sentences_spaces_with_style
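
# Rough illustration (the exact token boundaries depend on the xx_ent_wiki_sm tokenizer):
#
#     runs = [{'text': 'Hello ', 'id': [], 'paragraph_index': 0},
#             {'text': 'world!', 'id': ['g_1'], 'paragraph_index': 0}]
#     tokenize_with_runs(runs)
#     # -> ([[{'text': 'Hello', 'id': [], ...}, {'text': 'world', 'id': ['g_1'], ...},
#     #       {'text': '!', 'id': ['g_1'], ...}]],
#     #     [[True, False, False]])
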
def generate_alignments(original_tokenized_sentences_with_style: list[list[dict[str, str]]],
                        translated_sentences: list[str], aligner, temp_folder: str):
    """
    Given the original sentences with style and formatting and their translation without formatting, try to match
    the formatting of the translated text with the original. Since we only want to run fastalign once, we temporarily
    forget about paragraphs and work only with sentences, so the output is a list of sentences where each one keeps
    the index of the paragraph it came from

    Parameters:
        original_tokenized_sentences_with_style: Original text split into sentences with style information
        translated_sentences: Translated text, split into sentences
        aligner: Instance of the Aligner class, uses fastalign
        temp_folder: Path to the folder where all the intermediate files are written

    Returns:
        tuple: A list of tokenized sentences where each translated token carries the style of the associated
        original token, plus the spacing information of the translated tokens
    """
    # clean the temp folder
    for f in glob.glob(os.path.join(temp_folder, "*align*")):
        os.remove(f)

    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = []
    # keep spacing information to detokenize properly later
    translated_tokenized_sentences_spaces = []
    for sentence in translated_sentences:
        tokens = spacy_nlp(sentence)
        translated_tokenized_sentences_spaces.append([token.whitespace_ != '' for token in tokens])
        translated_tokenized_sentences.append([token.text for token in tokens])

    assert len(translated_tokenized_sentences) == len(
        original_tokenized_sentences_with_style), "The original and translated texts contain a different number of sentences, likely due to a translation error"

    original_sentences = []
    translated_sentences = []
    for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
        original_sentences.append(' '.join(item['text'] for item in original))
        translated_sentences.append(' '.join(translated))

    alignments = aligner.align(original_sentences, translated_sentences)

    # using the alignments generated by fastalign, copy the style of each original token to its translated counterpart
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        # invert the alignment pairs into a target -> source dict
        sentence_alignments = {target: source for source, target in sentence_alignments}

        translated_sentence_with_style: list[dict[str, str]] = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in sentence_alignments:
                # fastalign found an original token aligned with this translated one: copy its style
                original_idx = sentence_alignments[token_idx]
                new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)
            else:
                # WARNING: heuristic fallback. Since fastalign doesn't know which original token this one
                # corresponds to, copy the style of the previous translated word
                try:
                    new_entry = translated_sentence_with_style[-1].copy()
                # no previous word? make up a style-less entry
                except IndexError:
                    current_paragraph = original_tokenized_sentences_with_style[sentence_idx][0]["paragraph_index"]
                    new_entry = {'id': [], 'paragraph_index': current_paragraph, 'text': translated_token}
                # keep the translated token's own text, only the style is borrowed
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)

        translated_sentences_with_style.append(translated_sentence_with_style)

    return translated_sentences_with_style, translated_tokenized_sentences_spaces
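
# Illustration of the alignment format this function expects from Aligner.align(): one list of
# (source_index, target_index) pairs per sentence pair. For example, for
#     original: "la casa roja"  ->  translated: "the red house"
# fastalign might return [(0, 0), (1, 2), (2, 1)], meaning "la"->"the", "casa"->"house", "roja"->"red",
# which is inverted above into {0: 0, 2: 1, 1: 2} to look up the original token for each translated one.
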
def group_by_style(tokens: list[dict[str, str]], spaces: list[bool]) -> list[dict[str, str]]:
    """
    To avoid issues later on, group contiguous tokens that share the same style. Basically, we reconstruct the runs.

    Parameters:
        tokens: Tokens with style information
        spaces: Spacing information for each token

    Returns:
        list[dict]: A list of translated runs with format and style
    """
    groups = []
    zipped = zip(tokens, spaces)
    for key, group in groupby(zipped, key=lambda x: (x[0]["id"], x[0]["paragraph_index"])):
        group = list(group)
        group_tokens = [item[0]['text'] for item in group]
        group_spaces = [item[1] for item in group]
        text = Doc(spacy_nlp.vocab, words=group_tokens, spaces=group_spaces).text
        groups.append({"text": text,
                       "id": key[0],
                       "paragraph_index": key[1]})
    return groups
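
# Illustration (contiguous tokens with the same (id, paragraph_index) are merged back into one run):
#
#     group_by_style([{'text': 'Hello', 'id': [], 'paragraph_index': 0},
#                     {'text': 'world', 'id': ['g_1'], 'paragraph_index': 0},
#                     {'text': '!', 'id': ['g_1'], 'paragraph_index': 0}],
#                    [True, False, False])
#     # -> [{'text': 'Hello ', 'id': [], 'paragraph_index': 0},
#     #     {'text': 'world!', 'id': ['g_1'], 'paragraph_index': 0}]
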
def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict]], out_file_path: str):
    """
    Generate a plain text file restoring the original tag structure, like <g id="1"> </g>

    Parameters:
        paragraphs_with_style: Dictionary where each key is a paragraph_index and its value is a list of runs
        out_file_path: Path to the file where the plain text will be saved
    """
    with open(out_file_path, "w") as out_file:
        def close_tags(ids):
            tag = ""
            # close in reverse order so that nested tags stay balanced
            for gid in reversed(ids):
                tag_type = gid.split("_")[0]
                tag += f'</{tag_type}>'
            return tag

        def open_tags(ids):
            tag = ""
            for gid in ids:
                tag_type, tag_id = gid.split("_")
                tag += f'<{tag_type}'
                if int(tag_id) > 0:
                    tag += f' id="{tag_id}"'
                # close the opening tag even when the id was missing (stored as -1)
                tag += '>'
            return tag

        for key, paragraph in paragraphs_with_style.items():
            for run in paragraph:
                ids = list(run["id"]) if run["id"] else []
                if ids:
                    out_file.write(open_tags(ids) + run["text"] + close_tags(ids))
                else:
                    out_file.write(run["text"])
            out_file.write("\n")
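
# Illustration of the restored tag structure (paragraph index, ids and path are made up):
#
#     runs_to_plain_text({0: [{'text': 'Hello ', 'id': [], 'paragraph_index': 0},
#                             {'text': 'world', 'id': ['g_1'], 'paragraph_index': 0}]},
#                        "tmp/translated.txt")
#     # writes the line: Hello <g id="1">world</g>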