# telugu_translator / model.py
# hash-map's picture
# Update model.py
# f29098c verified
import gradio as gr
import tensorflow as tf
import sentencepiece as spm
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_text as tf_text
import os
import random
import tensorflow as tf
import numpy as np
# Small in-file sample of (English, Telugu) sentence pairs; element 0 of each
# tuple is the English text and element 1 the Telugu text.
# NOTE(review): the two sides do not look like exact translations of each
# other (e.g. the figures in the second pair are 9,863 vs 9,15,626) - confirm
# that this data is only meant as a smoke-test sample.
text_pairs=[
("Farmers fear that the elephant will destroy the crops","వర్షాలకు చేతికి వచ్చిన పంట దెబ్బతిన్నదని రైతులు వాపోతున్నారు"),
("The death toll in the state stands at 9,863","దీంతో రాష్ట్రంలో ఇప్పటి వరకు మొత్తం డిశ్చార్జ్‌ల సంఖ్య 9,15,626కి చేరింది"),
("Koo is available in Hindi, Kannada, Telugu, Tamil, Bengali, Gujarati and Marathi","ప్రశ్నలతో రూపొందించిన వీడియోలు మాత్రం ఆంగ్లం, హిందీ, మరాఠీ, కన్నడ, గుజరాతీ, బెంగాల్ భాషల్లో చూడోచ్చు" ) ,
("How can the court direct the government to do this?","ప్రభుత్వం ఎలా వ్యవహరించి ఉండాల్సింది?" ),
("America is safer today" ,"అమెరికాలో పరిస్థితి రోజురోజుకూ దారుణంగా మారుతోంది" ),
("I don't look into that, to be president" ,"నేను ముఖ్యమంత్రిని కావాలని అనుకోలేదన్నారు" ),
("He had tested positive for coronavirus" ,"కరోనా లక్షణాలు కనిపించడంతో టెస్ట్ చేసుకున్న ఆయనకు పాజిటివ్ గా నిర్దారణ అయ్యింది" ),
("New Delhi: Amid the novel coronavirus situation in the country, locals in Delhi are taking precautionary measures in Delhi","న్యూడిల్లీ: దేశవ్యాప్తంగా కరోనా మహమ్మారి విజృంభిస్తున్న నేపథ్యంలో కేంద్ర ప్రభుత్వం మరింత అప్రమత్తమైంది" ),
("She was rescued yesterday and admitted to a hospital" ,"శనివారం నాడు ఆమె ఆసుపత్రి నుండి డిశ్చార్జ్ అయ్యారు")
]
# -----------------------
# 3. Load SentencePiece models in TensorFlow
# -----------------------
def load_spm(path):
    """Read a serialized SentencePiece model file.

    Args:
        path: Filesystem path of the model file.

    Returns:
        The complete file contents as ``bytes``.
    """
    with open(path, mode="rb") as model_file:
        serialized_model = model_file.read()
    return serialized_model
# Raw serialized SentencePiece models, read from paths relative to the
# current working directory.
spm_model_en = load_spm("spm_en.model")
spm_model_te = load_spm("spm_te.model")
# TensorFlow Text tokenizers built from those serialized models; tokenizer_en
# is used by encode_source and tokenizer_te by encode_target.
tokenizer_en = tf_text.SentencepieceTokenizer(model=spm_model_en)
tokenizer_te = tf_text.SentencepieceTokenizer(model=spm_model_te)
# -----------------------
# 4. Encode text pairs
# -----------------------
# Width of the dense source batches; target batches use sequence_length + 1.
sequence_length = 50
def encode_source(texts):
    """Tokenize source sentences with ``tokenizer_en`` into a dense batch.

    Args:
        texts: Batch of sentences accepted by ``tokenizer_en.tokenize``.

    Returns:
        The token IDs converted with ``to_tensor`` to the requested shape
        ``(None, sequence_length)``.
    """
    token_ids = tokenizer_en.tokenize(texts)
    dense_shape = (None, sequence_length)
    return token_ids.to_tensor(shape=dense_shape)
def encode_target(texts):
    """Tokenize target sentences with ``tokenizer_te`` into a dense batch.

    Args:
        texts: Batch of sentences accepted by ``tokenizer_te.tokenize``.

    Returns:
        The token IDs converted with ``to_tensor`` to the requested shape
        ``(None, sequence_length + 1)``, i.e. one column wider than the
        source batches.
    """
    token_ids = tokenizer_te.tokenize(texts)
    dense_shape = (None, sequence_length + 1)
    return token_ids.to_tensor(shape=dense_shape)
# Example: build dataset
# Split the pairs into parallel English / Telugu lists, then encode each side
# into a dense token-ID batch.
english_texts = [source for source, _ in text_pairs]
telugu_texts = [target for _, target in text_pairs]
X = encode_source(tf.constant(english_texts))
Y = encode_target(tf.constant(telugu_texts))
import random

# Print a few randomly chosen pairs as a quick look at the data.
for _ in range(5):
    print(random.choice(text_pairs))

# Wrap every Telugu target in "[start]" / "[end]" markers (the evaluation code
# later strips these markers again). The list is updated in place so it keeps
# one entry per original pair: the loop variable is used for every pair, the
# wrapped string is the one that is stored, and nothing is appended while the
# list is being processed.
text_pairs[:] = [
    (english, "[start] " + telugu + " [end]")
    for english, telugu in text_pairs
]
class TransformerDecoder(layers.Layer):
    """Decoder block: causally masked self-attention, attention over the
    encoder outputs, and a two-layer dense projection, each followed by a
    residual connection and layer normalization.

    Attribute names and the order in which sub-layers are created are kept
    stable on purpose; saved models are restored through this class.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: ``key_dim`` of both attention layers and the output
                width of the dense projection.
            dense_dim: Width of the hidden (ReLU) dense layer.
            num_heads: Number of attention heads.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        # Kept on the instance so get_config can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor args."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask tiled over the batch.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise; the result
        has shape ``(batch, steps, steps)`` taken from the first two
        dimensions of ``inputs``.
        """
        dims = tf.shape(inputs)
        n_batch, n_steps = dims[0], dims[1]
        rows = tf.range(n_steps)[:, tf.newaxis]
        cols = tf.range(n_steps)
        lower_triangle = tf.cast(rows >= cols, dtype="int32")
        lower_triangle = tf.reshape(lower_triangle, (1, n_steps, n_steps))
        # Repeat the single (1, steps, steps) mask once per batch element.
        repeats = tf.concat(
            [tf.expand_dims(n_batch, -1),
             tf.constant([1, 1], dtype=tf.int32)],
            axis=0)
        return tf.tile(lower_triangle, repeats)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: Decoder-side input sequence.
            encoder_outputs: Encoder output used as key and value of the
                second attention layer.
            mask: Optional mask; when given it is combined (element-wise
                minimum) with the causal mask and applied to the second
                attention layer.

        Returns:
            Output of the final layer normalization.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        cross_mask = None
        if mask is not None:
            cross_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            cross_mask = tf.minimum(cross_mask, causal_mask)

        # Self-attention always uses the causal mask only.
        self_attended = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        self_attended = self.layernorm_1(inputs + self_attended)

        cross_attended = self.attention_2(
            query=self_attended,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=cross_mask,
        )
        cross_attended = self.layernorm_2(self_attended + cross_attended)

        projected = self.dense_proj(cross_attended)
        return self.layernorm_3(cross_attended + projected)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Define the PositionalEmbedding layer
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        """Create the two embedding tables.

        Args:
            sequence_length: Number of rows in the position table.
            vocab_size: Number of rows in the token table.
            embed_dim: Output width of both embeddings.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim)
        # Kept on the instance so get_config can report them.
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of positions 0..length-1,
        where length is the size of the last dimension of ``inputs``.
        """
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        return (self.token_embeddings(inputs)
                + self.position_embeddings(position_ids))

    def compute_mask(self, inputs, mask=None):
        """Pass an incoming mask through unchanged (``None`` stays ``None``)."""
        return mask

    def get_config(self):
        """Return the base layer config extended with the constructor args."""
        return {
            **super().get_config(),
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        }
# Define the TransformerEncoder layer (example implementation)
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention and a two-layer dense projection, each
    followed by a residual connection and layer normalization.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: ``key_dim`` of the attention layer and the output
                width of the dense projection.
            dense_dim: Width of the hidden (ReLU) dense layer.
            num_heads: Number of attention heads.
            **kwargs: Forwarded to ``layers.Layer``.
        """
        super().__init__(**kwargs)
        # Kept on the instance so get_config can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the encoder block.

        Args:
            inputs: Input sequence, attended to by itself.
            mask: Optional mask; a new axis is inserted at position 1 before
                it is handed to the attention layer.

        Returns:
            Output of the second layer normalization.
        """
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(
            inputs, inputs, attention_mask=attention_mask)
        normalized = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normalized)
        return self.layernorm_2(normalized + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor args."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        }
import sentencepiece as spm
# SentencePiece processor loaded from "spm_te.model"; used by decode_ids.
sp_te = spm.SentencePieceProcessor(model_file="spm_te.model")


def decode_ids(ids):
    """Decode token IDs with the ``sp_te`` SentencePiece processor.

    Args:
        ids: Token IDs in a form accepted by ``sp_te.decode``.

    Returns:
        Whatever ``sp_te.decode`` returns for ``ids``.
    """
    decoded = sp_te.decode(ids)
    return decoded
import tensorflow as tf
from tensorflow import keras
# Unreduced sparse categorical cross-entropy on logits; masked_loss does the
# reduction itself so that padding can be excluded.
loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction="none"
)


def masked_loss(y_true, y_pred):
    """Mean cross-entropy over the positions whose label is not 0.

    Args:
        y_true: Integer label IDs; ID 0 is treated as padding and ignored.
        y_pred: Logits passed to ``loss_object`` together with ``y_true``.

    Returns:
        Scalar: the summed loss of non-padding positions divided by their
        count, or 0 when every position is padding.
    """
    per_token_loss = loss_object(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, 0), per_token_loss.dtype)
    masked_total = tf.reduce_sum(per_token_loss * mask)
    # divide_no_nan yields 0 instead of NaN when there is no non-padding
    # token at all, so one empty batch cannot poison the running loss.
    return tf.math.divide_no_nan(masked_total, tf.reduce_sum(mask))
def masked_accuracy(y_true, y_pred):
    """Fraction of non-padding positions whose arg-max prediction is correct.

    Args:
        y_true: Label IDs; ID 0 is treated as padding and ignored.
        y_pred: Per-class scores; the predicted ID is the arg-max over the
            last axis.

    Returns:
        Scalar float32: matches on non-padding positions divided by the
        number of non-padding positions, or 0 when there are none.
    """
    # argmax can only emit integer types, so take its default output and cast
    # to the label dtype afterwards; this also works when the labels arrive
    # as floats.
    predicted_ids = tf.cast(tf.argmax(y_pred, axis=-1), y_true.dtype)
    matches = tf.cast(tf.equal(y_true, predicted_ids), tf.float32)
    mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
    # divide_no_nan returns 0 rather than NaN for an all-padding batch.
    return tf.math.divide_no_nan(
        tf.reduce_sum(matches * mask), tf.reduce_sum(mask))
# Custom layers and functions that Keras needs in order to deserialize the
# saved transformer models.
_CUSTOM_OBJECTS = {
    "TransformerEncoder": TransformerEncoder,
    "PositionalEmbedding": PositionalEmbedding,
    "TransformerDecoder": TransformerDecoder,
    "masked_loss": masked_loss,
    "masked_accuracy": masked_accuracy,
}


def _load_transformer(path):
    """Load the saved Keras model at *path* with the custom objects above."""
    # A fresh dict per call so no load can affect the shared mapping.
    return keras.models.load_model(path, custom_objects=dict(_CUSTOM_OBJECTS))


# Each checkpoint file is loaded exactly once. `transformer` is bound to the
# "(2)" checkpoint and `transformer3` to "full_transformer.keras".
transformer = _load_transformer("full_transformer (2).keras")
transformer2 = _load_transformer("full_transformer (1).keras")
transformer3 = _load_transformer("full_transformer.keras")
def decode_tokens(token_ids):
    """Detokenize one sequence of target token IDs into a Python string.

    Args:
        token_ids: Token IDs of a single sequence (no batch dimension).

    Returns:
        The detokenized text, decoded from UTF-8.
    """
    # detokenize is called on a batch, so wrap the sequence in a batch of one
    # and unwrap the single result afterwards.
    batched_ids = tf.expand_dims(token_ids, 0)
    detokenized = tokenizer_te.detokenize(batched_ids)
    text_bytes = detokenized[0].numpy()
    return text_bytes.decode("utf-8")
import tensorflow as tf
import numpy as np
def encode_source(texts):
    """Tokenize source sentences with ``tokenizer_en`` into a dense batch.

    This re-declares ``encode_source`` with the same body as the definition
    earlier in the file.

    Args:
        texts: Batch of sentences accepted by ``tokenizer_en.tokenize``.

    Returns:
        The token IDs converted with ``to_tensor`` to the requested shape
        ``(None, sequence_length)``.
    """
    token_ids = tokenizer_en.tokenize(texts)
    dense_shape = (None, sequence_length)
    return token_ids.to_tensor(shape=dense_shape)
# Modified decode_sequence to return tokens and text
def decode_sequence(input_sentence, t=transformer3, max_len=50,
                    start_token_id=3):
    """Greedily decode a translation of one sentence.

    Args:
        input_sentence: Source sentence as a single string.
        t: Model called as ``t([source_ids, target_ids])``; defaults to
            ``transformer3``.
        max_len: Maximum number of tokens generated after the start token.
        start_token_id: Target token ID that seeds the output sequence.
            Defaults to 3. NOTE(review): presumably the ID of "[start]" in
            spm_te.model - confirm against the vocabulary.

    Returns:
        Tuple ``(decoded_text, seq)``: the detokenized text with the
        "[start]" / "[end]" markers removed and surrounding whitespace
        stripped, and the list of token IDs including the seed token (and
        the end token when one was produced).
    """
    tokenized_input = encode_source([input_sentence])
    end_id = tokenizer_te.string_to_id('[end]').numpy()

    seq = [start_token_id]
    for _ in range(max_len):
        if seq[-1] == end_id:
            break
        tgt = tf.expand_dims(seq, 0)
        predictions = t([tokenized_input, tgt])
        # Scores at the last position fed in predict the next token.
        step_scores = predictions[0, len(seq) - 1, :]
        probs = tf.nn.softmax(step_scores).numpy()
        seq.append(int(np.argmax(probs)))  # greedy: most probable token

    # Decode sequence to text
    decoded = tokenizer_te.detokenize(tf.constant([seq])).numpy()[0]
    decoded_text = (
        decoded.decode("utf-8")
        .replace("[start]", "")
        .replace("[end]", "")
        .strip()
    )
    return decoded_text, seq
max_decoded_sentence_length = 50
# Evaluate some random samples: decode five randomly chosen English sentences
# with transformer3 and print them next to their paired Telugu text.
test_eng_texts = [pair[0] for pair in text_pairs]
final_pairs = [pair[1] for pair in text_pairs]
for _ in range(5):
    # One sample and one decode per iteration; every result is printed.
    idx = random.randint(0, len(test_eng_texts) - 1)
    input_sentence = test_eng_texts[idx]
    decoded_text, _ = decode_sequence(input_sentence, transformer3)
    original = final_pairs[idx].replace("[start]", "").replace("[end]", "").strip()
    # Token IDs of the reference and of the decoded text. No BLEU score is
    # computed here; the IDs are only printed.
    original_tokens = tokenizer_te.tokenize([original]).numpy()[0]
    decoded_tokens = tokenizer_te.tokenize([decoded_text]).numpy()[0]
    print("original tokens:", original_tokens)
    print("decoded_tokens:", decoded_tokens)
    print("original:", original)
    print("decoded:", decoded_text)
# Example decoding: run decode_sequence on one hard-coded English sentence
# with transformer3 and print the returned text and token-ID list.
decoded_text, decoded_seq = decode_sequence("your response to the question is not good you need to improve and this is order not request", transformer3)
print("Example decoding:", decoded_text, decoded_seq)