import os
import random

import numpy as np
import gradio as gr
import sentencepiece as spm
import tensorflow as tf
import tensorflow_text as tf_text
from tensorflow import keras
from tensorflow.keras import layers
text_pairs = [
    ("Farmers fear that the elephant will destroy the crops",
     "వర్షాలకు చేతికి వచ్చిన పంట దెబ్బతిన్నదని రైతులు వాపోతున్నారు"),
    ("The death toll in the state stands at 9,863",
     "దీంతో రాష్ట్రంలో ఇప్పటి వరకు మొత్తం డిశ్చార్జ్ల సంఖ్య 9,15,626కి చేరింది"),
    ("Koo is available in Hindi, Kannada, Telugu, Tamil, Bengali, Gujarati and Marathi",
     "ప్రశ్నలతో రూపొందించిన వీడియోలు మాత్రం ఆంగ్లం, హిందీ, మరాఠీ, కన్నడ, గుజరాతీ, బెంగాల్ భాషల్లో చూడోచ్చు"),
    ("How can the court direct the government to do this?",
     "ప్రభుత్వం ఎలా వ్యవహరించి ఉండాల్సింది?"),
    ("America is safer today",
     "అమెరికాలో పరిస్థితి రోజురోజుకూ దారుణంగా మారుతోంది"),
    ("I don't look into that, to be president",
     "నేను ముఖ్యమంత్రిని కావాలని అనుకోలేదన్నారు"),
    ("He had tested positive for coronavirus",
     "కరోనా లక్షణాలు కనిపించడంతో టెస్ట్ చేసుకున్న ఆయనకు పాజిటివ్ గా నిర్దారణ అయ్యింది"),
    ("New Delhi: Amid the novel coronavirus situation in the country, locals in Delhi are taking precautionary measures in Delhi",
     "న్యూడిల్లీ: దేశవ్యాప్తంగా కరోనా మహమ్మారి విజృంభిస్తున్న నేపథ్యంలో కేంద్ర ప్రభుత్వం మరింత అప్రమత్తమైంది"),
    ("She was rescued yesterday and admitted to a hospital",
     "శనివారం నాడు ఆమె ఆసుపత్రి నుండి డిశ్చార్జ్ అయ్యారు"),
]
# -----------------------
# 3. Load SentencePiece models in TensorFlow
# -----------------------
def load_spm(path):
    with open(path, "rb") as f:
        return f.read()

spm_model_en = load_spm("spm_en.model")
spm_model_te = load_spm("spm_te.model")
tokenizer_en = tf_text.SentencepieceTokenizer(model=spm_model_en)
tokenizer_te = tf_text.SentencepieceTokenizer(model=spm_model_te)
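# Quick round-trip sanity check (illustrative only): tokenize one of the sample
# sentences and detokenize it back. The exact token IDs depend on the trained
# SentencePiece vocabulary, so the printed values are environment-specific.
_sample = tf.constant(["Farmers fear that the elephant will destroy the crops"])
_sample_ids = tokenizer_en.tokenize(_sample)            # RaggedTensor of token IDs
print("sample token IDs:", _sample_ids.to_list())
print("round trip:", tokenizer_en.detokenize(_sample_ids).numpy())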
# -----------------------
# 4. Encode text pairs
# -----------------------
sequence_length = 50

def encode_source(texts):
    # Pad/truncate English token IDs to a fixed (batch, sequence_length) matrix
    return tokenizer_en.tokenize(texts).to_tensor(shape=(None, sequence_length))

def encode_target(texts):
    # Target side keeps one extra step for the shifted decoder input
    return tokenizer_te.tokenize(texts).to_tensor(shape=(None, sequence_length + 1))

# Example: build dataset
english_texts = [pair[0] for pair in text_pairs]
telugu_texts = [pair[1] for pair in text_pairs]
X = encode_source(tf.constant(english_texts))
Y = encode_target(tf.constant(telugu_texts))
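# The padded ID matrices should come out as (num_pairs, 50) for the source and
# (num_pairs, 51) for the target; printing the shapes is a cheap way to confirm
# the padding behaved as expected.
print("X shape:", X.shape)  # (len(text_pairs), sequence_length)
print("Y shape:", Y.shape)  # (len(text_pairs), sequence_length + 1)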
# Inspect a few random pairs
for _ in range(5):
    print(random.choice(text_pairs))
print(len(text_pairs))

# Wrap every Telugu target with [start] / [end] markers so the decoder knows
# where a translation begins and ends
text_pairs = [
    (english, "[start] " + telugu + " [end]")
    for english, telugu in text_pairs
]
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
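# Minimal smoke test for the decoder block (illustrative; the dimensions below
# are assumptions, not the values used for the saved checkpoints). It pushes
# random embeddings through one decoder layer and checks the output shape.
_embed_dim, _dense_dim, _num_heads = 256, 2048, 8
_decoder = TransformerDecoder(_embed_dim, _dense_dim, _num_heads)
_dummy_targets = tf.random.uniform((2, sequence_length, _embed_dim))
_dummy_encoded = tf.random.uniform((2, sequence_length, _embed_dim))
print(_decoder(_dummy_targets, _dummy_encoded).shape)  # expected: (2, 50, 256)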
# Define the PositionalEmbedding layer
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        # Properly handle mask computation within Keras
        if mask is None:
            return None
        return mask

    def get_config(self):
        config = super().get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config
# Define the TransformerEncoder layer (example implementation)
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config
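# The saved .keras checkpoints loaded further down already bundle architecture
# and weights, so nothing depends on this sketch; it only illustrates how
# PositionalEmbedding, TransformerEncoder and TransformerDecoder would
# typically be wired into an encoder-decoder translation model. vocab_size,
# embed_dim, dense_dim and num_heads are placeholder assumptions, not the
# values used to train the checkpoints.
def build_translation_model(vocab_size=16000, embed_dim=256,
                            dense_dim=2048, num_heads=8):
    encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
    x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
    encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

    decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="telugu")
    x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
    x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
    # Logits (no softmax) so a from_logits=True loss such as masked_loss applies
    decoder_outputs = layers.Dense(vocab_size)(x)

    return keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)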
# Plain SentencePiece processor for decoding Telugu IDs outside the TF graph
sp_te = spm.SentencePieceProcessor(model_file="spm_te.model")

def decode_ids(ids):
    return sp_te.decode(ids)
loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction="none"
)

def masked_loss(y_true, y_pred):
    # Normal sparse CE, shape (batch, seq_len)
    loss_ = loss_object(y_true, y_pred)
    # Create mask (ignore pad = 0)
    mask = tf.cast(tf.not_equal(y_true, 0), loss_.dtype)
    # Apply mask
    loss_ = loss_ * mask
    # Return mean only over non-masked tokens
    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)

def masked_accuracy(y_true, y_pred):
    y_pred = tf.argmax(y_pred, axis=-1, output_type=y_true.dtype)
    matches = tf.cast(tf.equal(y_true, y_pred), tf.float32)
    mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
    return tf.reduce_sum(matches * mask) / tf.reduce_sum(mask)
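# Tiny worked example (illustrative): with one padded position, the mask keeps
# the padded step out of both the loss average and the accuracy denominator.
_toy_true = tf.constant([[5, 2, 0]], dtype=tf.int64)   # 0 = padding
_toy_pred = tf.random.uniform((1, 3, 10))              # logits over a 10-token vocab
print(float(masked_loss(_toy_true, _toy_pred)))        # averaged over the 2 real tokens
print(float(masked_accuracy(_toy_true, _toy_pred)))    # fraction of the 2 real tokens matched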
# Load the trained translation models (the custom layers and metrics defined
# above must be passed as custom_objects for deserialization)
transformer = keras.models.load_model(
    "full_transformer (2).keras",
    custom_objects={
        "TransformerEncoder": TransformerEncoder,
        "PositionalEmbedding": PositionalEmbedding,
        "TransformerDecoder": TransformerDecoder,
        "masked_loss": masked_loss,
        "masked_accuracy": masked_accuracy,
    },
)

transformer2 = keras.models.load_model(
    "full_transformer (1).keras",
    custom_objects={
        "TransformerEncoder": TransformerEncoder,
        "PositionalEmbedding": PositionalEmbedding,
        "TransformerDecoder": TransformerDecoder,
        "masked_loss": masked_loss,
        "masked_accuracy": masked_accuracy,
    },
)

transformer3 = keras.models.load_model(
    "full_transformer.keras",
    custom_objects={
        "TransformerEncoder": TransformerEncoder,
        "PositionalEmbedding": PositionalEmbedding,
        "TransformerDecoder": TransformerDecoder,
        "masked_loss": masked_loss,
        "masked_accuracy": masked_accuracy,
    },
)
def decode_tokens(token_ids):
    # token_ids: tf.Tensor of shape (seq_len,)
    token_ids = tf.expand_dims(token_ids, 0)        # add a batch dimension
    decoded = tokenizer_te.detokenize(token_ids)    # tf.Tensor of shape (1,)
    return decoded[0].numpy().decode("utf-8")
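# Example use of decode_tokens: tokenize one of the Telugu sentences above and
# map the resulting ID vector back to text (the IDs themselves depend on the
# Telugu SentencePiece vocabulary).
_sample_te_ids = tokenizer_te.tokenize(telugu_texts[0])
print(decode_tokens(_sample_te_ids))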
# decode_sequence returns both the decoded text and its token IDs
def decode_sequence(input_sentence, t=transformer3, max_len=50):
    tokenized_input = encode_source([input_sentence])
    # Look up the start/end marker IDs in the Telugu vocabulary
    start_id = int(tokenizer_te.string_to_id("[start]").numpy())
    end_id = int(tokenizer_te.string_to_id("[end]").numpy())
    seq = [start_id]
    for _ in range(max_len):
        if seq[-1] == end_id:
            break
        tgt = tf.expand_dims(seq, 0)
        predictions = t([tokenized_input, tgt])
        # Probabilities for the token following the last generated position
        probs = tf.nn.softmax(predictions[0, len(seq) - 1, :]).numpy()
        next_id = np.argmax(probs)  # greedy: select the most probable token
        seq.append(int(next_id))
    # Decode the ID sequence to text and strip the markers
    decoded = tokenizer_te.detokenize(tf.constant([seq])).numpy()[0]
    decoded_text = decoded.decode("utf-8").replace("[start]", "").replace("[end]", "").strip()
    return decoded_text, seq
max_decoded_sentence_length = 50

# Evaluate a few random samples with both loaded models
test_eng_texts = [pair[0] for pair in text_pairs]
final_pairs = [pair[1] for pair in text_pairs]

for _ in range(5):
    idx = random.randint(0, len(test_eng_texts) - 1)
    input_sentence = test_eng_texts[idx]
    original = final_pairs[idx].replace("[start]", "").replace("[end]", "").strip()

    decoded_text_1, _ = decode_sequence(input_sentence, transformer)
    decoded_text, _ = decode_sequence(input_sentence, transformer3)

    # BLEU expects tokenized sentences
    original_tokens = tokenizer_te.tokenize([original]).numpy()[0]
    decoded_tokens = tokenizer_te.tokenize([decoded_text]).numpy()[0]
    print("original tokens:", original_tokens)
    print("decoded tokens:", decoded_tokens)
    print("original:", original)
    print("decoded (transformer):", decoded_text_1)
    print("decoded (transformer3):", decoded_text)
# Example decoding
decoded_text, decoded_seq = decode_sequence(
    "your response to the question is not good you need to improve and this is order not request",
    transformer3,
)
print("Example decoding:", decoded_text, decoded_seq)