Spaces:
Runtime error
Runtime error
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import gradio as gr | |
| from torch.utils.data import Dataset | |
| from transformers import PreTrainedModel, PretrainedConfig, Trainer, TrainingArguments | |
| from datasets import load_dataset | |
| import numpy as np | |
| # ===================== | |
| # 1. Load Dataset Subsets | |
| # ===================== | |
| dataset = load_dataset("bashyaldhiraj2067/500k_copy_error_dataset") | |
| train_subset = dataset["train"].select(range(int(len(dataset["train"]) * 0.1))) | |
| test_subset = dataset["test"].select(range(int(len(dataset["test"]) * 0.1))) | |
| print(f"Subset train size: {len(train_subset)}") | |
| print(f"Subset test size: {len(test_subset)}") | |
| # ===================== | |
| # 2. Tokenizer | |
| # ===================== | |
| special_tokens = ["<pad>", "<s>", "</s>", "<unk>"] | |
| nepali_chars = list("अआइईउऊऋॠऌॡऎएऐओऔकखगघङचछजझञटठडढणतथदधनपफबभमयरलवशषसह्ािीुूृॄेैोौंंःँ।०१२३४५६७८९,.;?!़ॅंःॊॅऒऽॉड़ॐ॥ऑऱफ़ढ़") | |
| char_vocab = special_tokens + nepali_chars | |
| char2id = {char: idx for idx, char in enumerate(char_vocab)} | |
| id2char = {idx: char for char, idx in char2id.items()} | |
| vocab_size = len(char2id) | |
| class CharTokenizer: | |
| def __init__(self, char2id, id2char, vocab_size): | |
| self.char2id = char2id | |
| self.id2char = id2char | |
| self.pad_token_id = char2id["<pad>"] | |
| self.unk_token_id = char2id["<unk>"] | |
| self.bos_token_id = char2id["<s>"] | |
| self.eos_token_id = char2id["</s>"] | |
| self.vocab_size = vocab_size | |
| def encode(self, text, max_length=128): | |
| ids = [self.char2id.get(ch, self.unk_token_id) for ch in text] | |
| ids = ids[:max_length] | |
| return ids + [self.pad_token_id] * (max_length - len(ids)) | |
| def decode(self, ids): | |
| return ''.join([self.id2char.get(i, '') for i in ids if i != self.pad_token_id]) | |
| def __call__(self, text, text_target=None, max_length=128): | |
| input_ids = self.encode(text, max_length) | |
| input_ids = torch.clamp(torch.tensor(input_ids), max=self.vocab_size - 1).tolist() | |
| result = {"input_ids": input_ids, "attention_mask": [1 if i != self.pad_token_id else 0 for i in input_ids]} | |
| if text_target: | |
| labels = self.encode(text_target, max_length) | |
| result["labels"] = labels | |
| return result | |
| tokenizer = CharTokenizer(char2id, id2char, vocab_size=vocab_size) | |
| # ===================== | |
| # 3. Dataset | |
| # ===================== | |
| class CopyDataset(Dataset): | |
| def __init__(self, data, tokenizer, max_length=128): | |
| self.data = data | |
| self.tokenizer = tokenizer | |
| self.max_length = max_length | |
| def __len__(self): | |
| return len(self.data) | |
| def __getitem__(self, idx): | |
| noisy = self.data[idx]['incorrect'] | |
| clean = self.data[idx]['correct'] | |
| return self.tokenizer(noisy, text_target=clean, max_length=self.max_length) | |
| train_dataset = CopyDataset(train_subset, tokenizer) | |
| eval_dataset = CopyDataset(test_subset, tokenizer) | |
| # ===================== | |
| # 4. Transformer with Copy Mechanism | |
| # ===================== | |
| class TransformerCopyConfig(PretrainedConfig): | |
| def __init__(self, vocab_size=len(char2id), **kwargs): | |
| super().__init__(**kwargs) | |
| self.vocab_size = vocab_size | |
| # --- Model Components --- | |
| class PositionalEncoding(nn.Module): | |
| def __init__(self, d_model, max_len=512): | |
| super().__init__() | |
| pe = torch.zeros(max_len, d_model) | |
| position = torch.arange(0, max_len).unsqueeze(1) | |
| div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model)) | |
| pe[:, 0::2] = torch.sin(position * div_term) | |
| pe[:, 1::2] = torch.cos(position * div_term) | |
| self.register_buffer('pe', pe.unsqueeze(0)) | |
| def forward(self, x): | |
| return x + self.pe[:, :x.size(1)] | |
| class TransformerCopyModel(nn.Module): | |
| def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dim_ff=512, dropout=0.1): | |
| super().__init__() | |
| self.embedding = nn.Embedding(vocab_size, d_model) | |
| self.positional_encoding = PositionalEncoding(d_model) | |
| encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_ff, dropout) | |
| decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_ff, dropout) | |
| self.encoder = nn.TransformerEncoder(encoder_layer, num_layers) | |
| self.decoder = nn.TransformerDecoder(decoder_layer, num_layers) | |
| self.copy_attention = nn.MultiheadAttention(d_model, nhead, dropout=dropout) | |
| self.copy_gate = nn.Linear(d_model * 2, 1) | |
| self.output_layer = nn.Linear(d_model, vocab_size) | |
| def forward(self, input_ids, attention_mask=None, labels=None): | |
| src = input_ids | |
| tgt = labels[:, :-1] | |
| tgt_y = labels[:, 1:] | |
| src_embed = self.embedding(src) | |
| tgt_embed = self.embedding(tgt) | |
| src_embed = self.positional_encoding(src_embed) | |
| tgt_embed = self.positional_encoding(tgt_embed) | |
| src_mask = (src == tokenizer.pad_token_id) | |
| tgt_mask = (tgt == tokenizer.pad_token_id) | |
| memory = self.encoder(src_embed.transpose(0, 1), src_key_padding_mask=src_mask) | |
| output = self.decoder( | |
| tgt_embed.transpose(0, 1), | |
| memory, | |
| tgt_key_padding_mask=tgt_mask, | |
| memory_key_padding_mask=src_mask | |
| ) | |
| attn_output, attn_weights = self.copy_attention(output, memory, memory, key_padding_mask=src_mask) | |
| concat = torch.cat([output, attn_output], dim=-1) | |
| copy_prob = torch.sigmoid(self.copy_gate(concat)) | |
| gen_logits = self.output_layer(output) | |
| gen_probs = F.softmax(gen_logits, dim=-1) | |
| loss = F.cross_entropy( | |
| gen_logits.transpose(0, 1).reshape(-1, gen_logits.size(-1)), | |
| tgt_y.reshape(-1), | |
| ignore_index=tokenizer.pad_token_id | |
| ) if labels is not None else None | |
| return {"loss": loss, "logits": gen_logits.transpose(0, 1)} | |
| # --- HF Wrapper --- | |
| class TransformerCopyHF(PreTrainedModel): | |
| config_class = TransformerCopyConfig | |
| def __init__(self, config): | |
| super().__init__(config) | |
| self.model = TransformerCopyModel(config.vocab_size) | |
| def forward(self, input_ids, attention_mask=None, labels=None): | |
| return self.model(input_ids, attention_mask, labels) | |
| model = TransformerCopyHF.from_pretrained("bashyaldhiraj2067/remove1_copy_transformer") | |
| model.eval() | |
| # ===================== | |
| # 5. Inference Function | |
| # ===================== | |
| def generate_clean_text(input_text, max_length=128): | |
| model_input = tokenizer.encode(input_text, max_length=max_length) | |
| input_ids = torch.tensor([model_input]) | |
| # Create dummy target input (just start token) | |
| decoder_input = torch.tensor([[tokenizer.bos_token_id]]) | |
| output_tokens = [] | |
| for _ in range(max_length): | |
| with torch.no_grad(): | |
| out = model(input_ids=input_ids, labels=torch.cat([decoder_input, torch.zeros((1, 1), dtype=torch.long)], dim=1)) | |
| next_token_logits = out["logits"][:, -1, :] | |
| next_token = torch.argmax(next_token_logits, dim=-1) | |
| next_token_id = next_token.item() | |
| if next_token_id == tokenizer.pad_token_id: | |
| break | |
| output_tokens.append(next_token_id) | |
| decoder_input = torch.cat([decoder_input, next_token.unsqueeze(0)], dim=1) | |
| return tokenizer.decode(output_tokens) | |
| # Gradio Interface Setup | |
| iface = gr.Interface( | |
| fn=generate_clean_text, | |
| inputs=gr.Textbox(label="Noisy Text"), | |
| outputs=gr.Textbox(label="Cleaned Text"), | |
| live=True | |
| ) | |
| iface.launch(debug=True) | |