|
|
import logging |
|
|
import json |
|
|
|
|
|
import torch |
|
|
from pathlib import Path |
|
|
from unicodedata import category, normalize |
|
|
from tokenizers import Tokenizer |
|
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
|
|
|
|
|
|
SOT = "[START]" |
|
|
EOT = "[STOP]" |
|
|
UNK = "[UNK]" |
|
|
SPACE = "[SPACE]" |
|
|
SPECIAL_TOKENS = [SOT, EOT, UNK, SPACE, "[PAD]", "[SEP]", "[CLS]", "[MASK]"] |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class EnTokenizer: |
|
|
def __init__(self, vocab_file_path): |
|
|
self.tokenizer: Tokenizer = Tokenizer.from_file(vocab_file_path) |
|
|
self.check_vocabset_sot_eot() |
|
|
|
|
|
def check_vocabset_sot_eot(self): |
|
|
voc = self.tokenizer.get_vocab() |
|
|
assert SOT in voc |
|
|
assert EOT in voc |
|
|
|
|
|
def text_to_tokens(self, text: str): |
|
|
text_tokens = self.encode(text) |
|
|
text_tokens = torch.IntTensor(text_tokens).unsqueeze(0) |
|
|
return text_tokens |
|
|
|
|
|
def encode(self, txt: str): |
|
|
""" |
|
|
clean_text > (append `lang_id`) > replace SPACE > encode text using Tokenizer |
|
|
""" |
|
|
txt = txt.replace(' ', SPACE) |
|
|
code = self.tokenizer.encode(txt) |
|
|
ids = code.ids |
|
|
return ids |
|
|
|
|
|
def decode(self, seq): |
|
|
if isinstance(seq, torch.Tensor): |
|
|
seq = seq.cpu().numpy() |
|
|
|
|
|
txt: str = self.tokenizer.decode(seq, skip_special_tokens=False) |
|
|
txt = txt.replace(' ', '') |
|
|
txt = txt.replace(SPACE, ' ') |
|
|
txt = txt.replace(EOT, '') |
|
|
txt = txt.replace(UNK, '') |
|
|
return txt |
|
|
|
|
|
|
|
|
|
|
|
REPO_ID = "ResembleAI/chatterbox" |
|
|
|
|
|
|
|
|
_kakasi = None |
|
|
_dicta = None |
|
|
_russian_stresser = None |
|
|
|
|
|
|
|
|
def is_kanji(c: str) -> bool: |
|
|
"""Check if character is kanji.""" |
|
|
return 19968 <= ord(c) <= 40959 |
|
|
|
|
|
|
|
|
def is_katakana(c: str) -> bool: |
|
|
"""Check if character is katakana.""" |
|
|
return 12449 <= ord(c) <= 12538 |
|
|
|
|
|
|
|
|
def hiragana_normalize(text: str) -> str: |
|
|
"""Japanese text normalization: converts kanji to hiragana; katakana remains the same.""" |
|
|
global _kakasi |
|
|
|
|
|
try: |
|
|
if _kakasi is None: |
|
|
import pykakasi |
|
|
_kakasi = pykakasi.kakasi() |
|
|
|
|
|
result = _kakasi.convert(text) |
|
|
out = [] |
|
|
|
|
|
for r in result: |
|
|
inp = r['orig'] |
|
|
hira = r["hira"] |
|
|
|
|
|
|
|
|
if any([is_kanji(c) for c in inp]): |
|
|
if hira and hira[0] in ["は", "へ"]: |
|
|
hira = " " + hira |
|
|
out.append(hira) |
|
|
|
|
|
|
|
|
elif all([is_katakana(c) for c in inp]) if inp else False: |
|
|
out.append(r['orig']) |
|
|
|
|
|
else: |
|
|
out.append(inp) |
|
|
|
|
|
normalized_text = "".join(out) |
|
|
|
|
|
|
|
|
import unicodedata |
|
|
normalized_text = unicodedata.normalize('NFKD', normalized_text) |
|
|
|
|
|
return normalized_text |
|
|
|
|
|
except ImportError: |
|
|
logger.warning("pykakasi not available - Japanese text processing skipped") |
|
|
return text |
|
|
|
|
|
|
|
|
def add_hebrew_diacritics(text: str) -> str: |
|
|
"""Hebrew text normalization: adds diacritics to Hebrew text.""" |
|
|
global _dicta |
|
|
|
|
|
try: |
|
|
if _dicta is None: |
|
|
from dicta_onnx import Dicta |
|
|
_dicta = Dicta() |
|
|
|
|
|
return _dicta.add_diacritics(text) |
|
|
|
|
|
except ImportError: |
|
|
logger.warning("dicta_onnx not available - Hebrew text processing skipped") |
|
|
return text |
|
|
except Exception as e: |
|
|
logger.warning(f"Hebrew diacritization failed: {e}") |
|
|
return text |
|
|
|
|
|
|
|
|
def korean_normalize(text: str) -> str: |
|
|
"""Korean text normalization: decompose syllables into Jamo for tokenization.""" |
|
|
|
|
|
def decompose_hangul(char): |
|
|
"""Decompose Korean syllable into Jamo components.""" |
|
|
if not ('\uac00' <= char <= '\ud7af'): |
|
|
return char |
|
|
|
|
|
|
|
|
base = ord(char) - 0xAC00 |
|
|
initial = chr(0x1100 + base // (21 * 28)) |
|
|
medial = chr(0x1161 + (base % (21 * 28)) // 28) |
|
|
final = chr(0x11A7 + base % 28) if base % 28 > 0 else '' |
|
|
|
|
|
return initial + medial + final |
|
|
|
|
|
|
|
|
result = ''.join(decompose_hangul(char) for char in text) |
|
|
return result.strip() |
|
|
|
|
|
|
|
|
class ChineseCangjieConverter: |
|
|
"""Converts Chinese characters to Cangjie codes for tokenization.""" |
|
|
|
|
|
def __init__(self, model_dir=None): |
|
|
self.word2cj = {} |
|
|
self.cj2word = {} |
|
|
self.segmenter = None |
|
|
self._load_cangjie_mapping(model_dir) |
|
|
self._init_segmenter() |
|
|
|
|
|
def _load_cangjie_mapping(self, model_dir=None): |
|
|
"""Load Cangjie mapping from HuggingFace model repository.""" |
|
|
try: |
|
|
cangjie_file = hf_hub_download( |
|
|
repo_id=REPO_ID, |
|
|
filename="Cangjie5_TC.json", |
|
|
cache_dir=model_dir |
|
|
) |
|
|
|
|
|
with open(cangjie_file, "r", encoding="utf-8") as fp: |
|
|
data = json.load(fp) |
|
|
|
|
|
for entry in data: |
|
|
word, code = entry.split("\t")[:2] |
|
|
self.word2cj[word] = code |
|
|
if code not in self.cj2word: |
|
|
self.cj2word[code] = [word] |
|
|
else: |
|
|
self.cj2word[code].append(word) |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Could not load Cangjie mapping: {e}") |
|
|
|
|
|
def _init_segmenter(self): |
|
|
"""Initialize pkuseg segmenter.""" |
|
|
try: |
|
|
from spacy_pkuseg import pkuseg |
|
|
self.segmenter = pkuseg() |
|
|
except ImportError: |
|
|
logger.warning("pkuseg not available - Chinese segmentation will be skipped") |
|
|
self.segmenter = None |
|
|
|
|
|
def _cangjie_encode(self, glyph: str): |
|
|
"""Encode a single Chinese glyph to Cangjie code.""" |
|
|
normed_glyph = glyph |
|
|
code = self.word2cj.get(normed_glyph, None) |
|
|
if code is None: |
|
|
return None |
|
|
index = self.cj2word[code].index(normed_glyph) |
|
|
index = str(index) if index > 0 else "" |
|
|
return code + str(index) |
|
|
|
|
|
|
|
|
def __call__(self, text): |
|
|
"""Convert Chinese characters in text to Cangjie tokens.""" |
|
|
output = [] |
|
|
if self.segmenter is not None: |
|
|
segmented_words = self.segmenter.cut(text) |
|
|
full_text = " ".join(segmented_words) |
|
|
else: |
|
|
full_text = text |
|
|
|
|
|
for t in full_text: |
|
|
if category(t) == "Lo": |
|
|
cangjie = self._cangjie_encode(t) |
|
|
if cangjie is None: |
|
|
output.append(t) |
|
|
continue |
|
|
code = [] |
|
|
for c in cangjie: |
|
|
code.append(f"[cj_{c}]") |
|
|
code.append("[cj_.]") |
|
|
code = "".join(code) |
|
|
output.append(code) |
|
|
else: |
|
|
output.append(t) |
|
|
return "".join(output) |
|
|
|
|
|
|
|
|
def add_russian_stress(text: str) -> str: |
|
|
"""Russian text normalization: adds stress marks to Russian text.""" |
|
|
global _russian_stresser |
|
|
|
|
|
try: |
|
|
if _russian_stresser is None: |
|
|
from russian_text_stresser.text_stresser import RussianTextStresser |
|
|
_russian_stresser = RussianTextStresser() |
|
|
|
|
|
return _russian_stresser.stress_text(text) |
|
|
|
|
|
except ImportError: |
|
|
logger.warning("russian_text_stresser not available - Russian stress labeling skipped") |
|
|
return text |
|
|
except Exception as e: |
|
|
logger.warning(f"Russian stress labeling failed: {e}") |
|
|
return text |
|
|
|
|
|
|
|
|
class MTLTokenizer: |
|
|
def __init__(self, vocab_file_path): |
|
|
self.tokenizer: Tokenizer = Tokenizer.from_file(vocab_file_path) |
|
|
model_dir = Path(vocab_file_path).parent |
|
|
self.cangjie_converter = ChineseCangjieConverter(model_dir) |
|
|
self.check_vocabset_sot_eot() |
|
|
|
|
|
def check_vocabset_sot_eot(self): |
|
|
voc = self.tokenizer.get_vocab() |
|
|
assert SOT in voc |
|
|
assert EOT in voc |
|
|
|
|
|
def preprocess_text(self, raw_text: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True): |
|
|
""" |
|
|
Text preprocessor that handles lowercase conversion and NFKD normalization. |
|
|
""" |
|
|
preprocessed_text = raw_text |
|
|
if lowercase: |
|
|
preprocessed_text = preprocessed_text.lower() |
|
|
if nfkd_normalize: |
|
|
preprocessed_text = normalize("NFKD", preprocessed_text) |
|
|
|
|
|
return preprocessed_text |
|
|
|
|
|
def text_to_tokens(self, text: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True): |
|
|
text_tokens = self.encode(text, language_id=language_id, lowercase=lowercase, nfkd_normalize=nfkd_normalize) |
|
|
text_tokens = torch.IntTensor(text_tokens).unsqueeze(0) |
|
|
return text_tokens |
|
|
|
|
|
def encode(self, txt: str, language_id: str = None, lowercase: bool = True, nfkd_normalize: bool = True): |
|
|
txt = self.preprocess_text(txt, language_id=language_id, lowercase=lowercase, nfkd_normalize=nfkd_normalize) |
|
|
|
|
|
|
|
|
if language_id == 'zh': |
|
|
txt = self.cangjie_converter(txt) |
|
|
elif language_id == 'ja': |
|
|
txt = hiragana_normalize(txt) |
|
|
elif language_id == 'he': |
|
|
txt = add_hebrew_diacritics(txt) |
|
|
elif language_id == 'ko': |
|
|
txt = korean_normalize(txt) |
|
|
elif language_id == 'ru': |
|
|
txt = add_russian_stress(txt) |
|
|
|
|
|
|
|
|
if language_id: |
|
|
txt = f"[{language_id.lower()}]{txt}" |
|
|
|
|
|
txt = txt.replace(' ', SPACE) |
|
|
return self.tokenizer.encode(txt).ids |
|
|
|
|
|
def decode(self, seq): |
|
|
if isinstance(seq, torch.Tensor): |
|
|
seq = seq.cpu().numpy() |
|
|
|
|
|
txt = self.tokenizer.decode(seq, skip_special_tokens=False) |
|
|
txt = txt.replace(' ', '').replace(SPACE, ' ').replace(EOT, '').replace(UNK, '') |
|
|
return txt |
|
|
|