import os
import random

import gradio as gr
import tensorflow as tf
import sentencepiece as spm
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_text as tf_text

# English / Telugu sentence pairs bundled with the script as sample data.
text_pairs = [
    (
        "Farmers fear that the elephant will destroy the crops",
        "వర్షాలకు చేతికి వచ్చిన పంట దెబ్బతిన్నదని రైతులు వాపోతున్నారు",
    ),
    (
        "The death toll in the state stands at 9,863",
        "దీంతో రాష్ట్రంలో ఇప్పటి వరకు మొత్తం డిశ్చార్జ్‌ల సంఖ్య 9,15,626కి చేరింది",
    ),
    (
        "Koo is available in Hindi, Kannada, Telugu, Tamil, Bengali, Gujarati and Marathi",
        "ప్రశ్నలతో రూపొందించిన వీడియోలు మాత్రం ఆంగ్లం, హిందీ, మరాఠీ, కన్నడ, గుజరాతీ, బెంగాల్ భాషల్లో చూడోచ్చు",
    ),
    (
        "How can the court direct the government to do this?",
        "ప్రభుత్వం ఎలా వ్యవహరించి ఉండాల్సింది?",
    ),
    (
        "America is safer today",
        "అమెరికాలో పరిస్థితి రోజురోజుకూ దారుణంగా మారుతోంది",
    ),
    (
        "I don't look into that, to be president",
        "నేను ముఖ్యమంత్రిని కావాలని అనుకోలేదన్నారు",
    ),
    (
        "He had tested positive for coronavirus",
        "కరోనా లక్షణాలు కనిపించడంతో టెస్ట్ చేసుకున్న ఆయనకు పాజిటివ్ గా నిర్దారణ అయ్యింది",
    ),
    (
        "New Delhi: Amid the novel coronavirus situation in the country, locals in Delhi are taking precautionary measures in Delhi",
        "న్యూడిల్లీ: దేశవ్యాప్తంగా కరోనా మహమ్మారి విజృంభిస్తున్న నేపథ్యంలో కేంద్ర ప్రభుత్వం మరింత అప్రమత్తమైంది",
    ),
    (
        "She was rescued yesterday and admitted to a hospital",
        "శనివారం నాడు ఆమె ఆసుపత్రి నుండి డిశ్చార్జ్ అయ్యారు",
    ),
]

# -----------------------
# 3. Load SentencePiece models in TensorFlow
# -----------------------


def load_spm(path):
    """Read the file at ``path`` in binary mode and return its bytes."""
    with open(path, mode="rb") as model_file:
        serialized_model = model_file.read()
    return serialized_model


# Serialized SentencePiece models, handed to the in-graph tokenizers below.
spm_model_en = load_spm("spm_en.model")
spm_model_te = load_spm("spm_te.model")
tokenizer_en = tf_text.SentencepieceTokenizer(model=spm_model_en)
tokenizer_te = tf_text.SentencepieceTokenizer(model=spm_model_te)

# -----------------------
# 4.
# Encode text pairs
# -----------------------
sequence_length = 50


def encode_source(texts):
    """Tokenize English ``texts`` into a dense id tensor.

    The ragged tokenizer output is converted with
    ``to_tensor(shape=(None, sequence_length))``.
    """
    return tokenizer_en.tokenize(texts).to_tensor(shape=(None, sequence_length))


def encode_target(texts):
    """Tokenize Telugu ``texts`` into a dense id tensor.

    The ragged tokenizer output is converted with
    ``to_tensor(shape=(None, sequence_length + 1))``.
    """
    return tokenizer_te.tokenize(texts).to_tensor(shape=(None, sequence_length + 1))


# Example: build a small dataset from the bundled sentence pairs.
english_texts = [english for english, _ in text_pairs]
telugu_texts = [telugu for _, telugu in text_pairs]
X = encode_source(tf.constant(english_texts))
Y = encode_target(tf.constant(telugu_texts))

# Print a few random pairs as a quick sanity check.
for _ in range(5):
    print(random.choice(text_pairs))

# Wrap every Telugu target in the "[start]" / "[end]" markers. The list is
# rewritten in place so it keeps exactly one entry per sentence pair.
text_pairs[:] = [
    (english, "[start] " + telugu + " [end]") for english, telugu in text_pairs
]


class TransformerDecoder(layers.Layer):
    """Decoder block: causal self-attention, cross-attention, then feed-forward.

    Each of the three sub-layers is followed by a residual connection and a
    layer normalization.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise, where the
        batch and sequence sizes are read from the first two axes of
        ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block on ``inputs`` attending to ``encoder_outputs``.

        Self-attention always uses the causal mask. When ``mask`` is given it
        is combined with the causal mask and applied to the cross-attention;
        otherwise the cross-attention is unmasked.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)


# Define the PositionalEmbedding layer
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of each position index."""
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Pass any incoming mask through unchanged (``None`` stays ``None``)."""
        return mask

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config


# Define the TransformerEncoder layer (example implementation)
class TransformerEncoder(layers.Layer):
    """Encoder block: self-attention followed by a feed-forward projection.

    Both sub-layers are followed by a residual connection and a layer
    normalization.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the encoder block; ``mask`` (if any) gains a broadcast axis."""
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config


# Plain-Python SentencePiece processor for the Telugu model.
sp_te = spm.SentencePieceProcessor(model_file="spm_te.model")


def decode_ids(ids):
    """Decode token ``ids`` to text with the Telugu SentencePiece processor."""
    return sp_te.decode(ids)


loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction="none"
)


def masked_loss(y_true, y_pred):
    """Sparse cross-entropy averaged over the positions where ``y_true != 0``.

    Returns 0 instead of NaN when every position is padding (id 0).
    """
    # Per-position loss; reduction="none" keeps one value per token.
    loss_ = loss_object(y_true, y_pred)

    # 1.0 for real tokens, 0.0 for padding (id 0).
    mask = tf.cast(tf.not_equal(y_true, 0), loss_.dtype)
    loss_ = loss_ * mask

    # Mean over non-padding tokens only; safe when the mask is all zeros.
    return tf.math.divide_no_nan(tf.reduce_sum(loss_), tf.reduce_sum(mask))


def masked_accuracy(y_true, y_pred):
    """Fraction of positions with ``y_true != 0`` where the argmax matches.

    Returns 0 instead of NaN when every position is padding (id 0).
    """
    y_pred = tf.argmax(y_pred, axis=-1, output_type=y_true.dtype)
    matches = tf.cast(tf.equal(y_true, y_pred), tf.float32)
    mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
    return tf.math.divide_no_nan(
        tf.reduce_sum(matches * mask), tf.reduce_sum(mask))


# Custom layers, loss and metric that Keras needs to deserialize the models.
_CUSTOM_OBJECTS = {
    "TransformerEncoder": TransformerEncoder,
    "PositionalEmbedding": PositionalEmbedding,
    "TransformerDecoder": TransformerDecoder,
    "masked_loss": masked_loss,
    "masked_accuracy": masked_accuracy,
}


def _load_transformer(path):
    """Load the saved Keras model at ``path`` with this file's custom objects."""
    return keras.models.load_model(path, custom_objects=_CUSTOM_OBJECTS)


# Load the saved models.
transformer = _load_transformer("full_transformer (2).keras")
transformer2 = _load_transformer("full_transformer (1).keras")
transformer3 = _load_transformer("full_transformer.keras")


def decode_tokens(token_ids):
    """Detokenize a 1-D tensor of Telugu token ids into a Python string."""
    token_ids = tf.expand_dims(token_ids, 0)  # add batch dim
    decoded = tokenizer_te.detokenize(token_ids)  # one string per batch row
    return decoded[0].numpy().decode("utf-8")


def decode_sequence(input_sentence, t=transformer3, max_len=50, start_token_id=3):
    """Greedily decode a translation of ``input_sentence`` with model ``t``.

    Args:
        input_sentence: English sentence to translate.
        t: Model called as ``t([source_ids, target_ids])``; its output is
            indexed as ``[batch, position, vocab]``.
        max_len: Maximum number of tokens to generate after the start token.
        start_token_id: Id used to seed the target sequence. The default 3 is
            presumably the id of "[start]" in spm_te.model - TODO confirm.

    Returns:
        A ``(decoded_text, seq)`` tuple: the detokenized text with the
        "[start]" / "[end]" markers removed, and the list of generated token
        ids (including the seed id and, if produced, the end id).
    """
    tokenized_input = encode_source([input_sentence])
    end_id = int(tokenizer_te.string_to_id('[end]').numpy())

    seq = [start_token_id]
    for _ in range(max_len):
        if seq[-1] == end_id:
            break
        tgt = tf.expand_dims(seq, 0)
        predictions = t([tokenized_input, tgt])
        # Greedy choice at the last target position. Softmax is monotonic, so
        # the argmax of the raw scores selects the same token.
        next_id = int(np.argmax(predictions[0, len(seq) - 1, :].numpy()))
        seq.append(next_id)

    # Decode sequence to text
    decoded = tokenizer_te.detokenize(tf.constant([seq])).numpy()[0]
    decoded_text = (
        decoded.decode("utf-8").replace("[start]", "").replace("[end]", "").strip()
    )
    return decoded_text, seq


max_decoded_sentence_length = 50

# Evaluate some random samples
test_eng_texts = [english for english, _ in text_pairs]
final_pairs = [telugu for _, telugu in text_pairs]
for _ in range(5):
    idx = random.randint(0, len(test_eng_texts) - 1)
    input_sentence = test_eng_texts[idx]
    decoded_text, _seq = decode_sequence(input_sentence, transformer3)
    original = final_pairs[idx].replace("[start]", "").replace("[end]", "").strip()

    # Token ids of the reference and of the model output, for inspection.
    original_tokens = tokenizer_te.tokenize([original]).numpy()[0]
    decoded_tokens = tokenizer_te.tokenize([decoded_text]).numpy()[0]
    print("original tokens:", original_tokens)
    print("decoded_tokens:", decoded_tokens)
    print("original:", original)
    print("decoded:", decoded_text)

# Example decoding
decoded_text, decoded_seq = decode_sequence(
    "your response to the question is not good you need to improve and this is order not request",
    transformer3,
)
print("Example decoding:", decoded_text, decoded_seq)