| """流程圖 | |
| 讀取資料 → 分割資料 → 編碼 → 建立 Dataset / DataLoader | |
| ↓ | |
| 建立模型(BERT+LSTM+CNN) | |
| ↓ | |
| BERT 輸出 [batch, seq_len, 768] | |
| ↓ | |
| BiLSTM [batch, seq_len, hidden_dim*2] | |
| ↓ | |
| CNN 模組 (Conv1D + Dropout + GlobalMaxPooling1D) | |
| ↓ | |
| Linear 分類器(輸出詐騙機率) | |
| ↓ | |
| 訓練模型(Epochs) | |
| ↓ | |
| 評估模型(Accuracy / F1 / Precision / Recall) | |
| ↓ | |
| 儲存模型(.pth) | |
| """#引入重要套件Import Library | |
| import torch # PyTorch 主模組 | |
| import torch.nn as nn # 神經網路相關的層(例如 LSTM、Linear) | |
| import torch.nn.functional as F # 提供純函式版的操作方法,像是 F.relu()、F.cross_entropy(),通常不帶參數、不自動建立權重 | |
| import numpy as np | |
| import pandas as pd | |
| import os | |
| os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:16"#讓 CUDA 使用「更小記憶體分配塊」的方法,能有效減少 OOM 錯誤。 | |
| import re | |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score | |
| from tqdm import tqdm | |
| from sklearn.model_selection import train_test_split | |
| from torch.utils.data import DataLoader, Dataset # 提供 Dataset、DataLoader 類別 | |
| from transformers import BertTokenizer | |
| from sklearn.model_selection import train_test_split | |
| from transformers import BertModel | |
| #BertTokenizer 把文字句子轉換成 BERT 格式的 token ID,例如 [CLS] 今天 天氣 不錯 [SEP] → [101, 1234, 5678, ...] | |
| ##BertForSequenceClassification 是 Hugging Face 提供的一個完整 BERT 模型,接了分類用的 Linear 層,讓你直接拿來做分類任務(例如詐騙 vs 正常) | |
data_file = "NorANDScamInfo_data1.csv"
# Add normal-message dataset files here
normal_files = [data_file]
# Add scam-message dataset files here
scam_files = [data_file]
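# Expected CSV schema (inferred from load_and_clean() below, which keeps only these
# two columns): a "message" text column and a numeric "label" column. The labels are
# cast to 0/1 floats for BCELoss; which value means "scam" is not stated in this
# script and is left as an assumption about the data files.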
# Data preprocessing
class BertPreprocessor:
    def __init__(self, tokenizer_name="ckiplab/bert-base-chinese", max_len=128):
        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name)
        self.max_len = max_len

    def load_and_clean(self, filepath):
        # Load the CSV and clean the "message" column.
        df = pd.read_csv(filepath)
        df = df.dropna().drop_duplicates().reset_index(drop=True)
        # Text cleaning: strip whitespace, keep Chinese characters, alphanumerics and basic punctuation
        df["message"] = df["message"].astype(str)
        df["message"] = df["message"].apply(lambda text: re.sub(r"\s+", "", text))
        df["message"] = df["message"].apply(lambda text: re.sub(r"[^\u4e00-\u9fffA-Za-z0-9。,!?]", "", text))
        return df[["message", "label"]]  # keep only the columns we need

    def encode(self, messages):
        # Encode the messages into model inputs with the Hugging Face BERT tokenizer.
        return self.tokenizer(
            list(messages),
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=self.max_len
        )
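# Minimal usage sketch (assumes a CSV with "message" and "label" columns, e.g. data_file):
#   processor = BertPreprocessor(max_len=128)
#   df = processor.load_and_clean(data_file)   # cleaned DataFrame
#   enc = processor.encode(df["message"])      # dict of input_ids / attention_mask / token_type_ids
#   enc["input_ids"].shape                     # -> torch.Size([len(df), 128])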
# Run the preprocessing end to end
def build_bert_inputs(normal_files, scam_files):
    # Clean and encode every file (labels come from each CSV's "label" column)
    # and return model-ready input tensors plus labels.
    processor = BertPreprocessor()
    dfs = []
    # Merge the normal + scam file lists
    all_files = normal_files + scam_files
    for filepath in all_files:
        df = processor.load_and_clean(filepath)
        dfs.append(df)
    # Concatenate everything. Cleaning steps such as dropna() (drop rows with NaN),
    # drop_duplicates() (drop duplicate rows), boolean filtering with df[...] and
    # concat() do not renumber the index, so it ends up scrambled;
    # ignore_index=True re-indexes the combined DataFrame from 0.
    all_df = pd.concat(dfs, ignore_index=True)
    # Build the train/val split
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        all_df["message"], all_df["label"],
        stratify=all_df["label"],
        test_size=0.2,
        random_state=25,
        shuffle=True
    )
    # Encode with the BERT tokenizer
    train_inputs = processor.encode(train_texts)
    val_inputs = processor.encode(val_texts)
    return train_inputs, train_labels, val_inputs, val_labels, processor

# Copies of the file lists; build_bert_inputs() picks up any newly added files automatically
normal_files_labels = [normal for normal in normal_files]
scam_files_labels = [scam for scam in scam_files]
# print(bert_inputs.keys())
# PyTorch Dataset definition
class ScamDataset(Dataset):
    def __init__(self, inputs, labels):
        self.input_ids = inputs["input_ids"]            # input_ids: token IDs of each sentence
        self.attention_mask = inputs["attention_mask"]  # attention_mask: attention mask (0 = padding)
        self.token_type_ids = inputs["token_type_ids"]  # token_type_ids: segment IDs
        # torch.tensor(x, dtype=...) is the standard way to turn data (a list, NumPy array,
        # pandas Series, ...) into a tensor. float32 suits regression or BCELoss binary
        # classification; long would be used for multi-class CrossEntropyLoss.
        # labels.values converts the pandas Series to a NumPy array first.
        self.labels = torch.tensor(labels.values, dtype=torch.float32)

    def __len__(self):
        # Tells PyTorch how many samples the Dataset holds;
        # used by len(dataset) or for i in range(len(dataset)).
        return len(self.labels)

    def __getitem__(self, idx):
        # Returns sample idx; the DataLoader calls this repeatedly to assemble each batch.
        return {
            "input_ids": self.input_ids[idx],
            "attention_mask": self.attention_mask[idx],
            "token_type_ids": self.token_type_ids[idx],
            "labels": self.labels[idx]
        }
# The same pipeline handles both scam and normal data, so the cleaning and
# tokenization code is written only once.
train_inputs, train_labels, val_inputs, val_labels, processor = build_bert_inputs(normal_files, scam_files)
train_dataset = ScamDataset(train_inputs, train_labels)
val_dataset = ScamDataset(val_inputs, val_labels)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # reshuffle training batches each epoch
val_loader = DataLoader(val_dataset, batch_size=8)
# Model
class BertLSTM_CNN_Classifier(nn.Module):
    def __init__(self, hidden_dim=128, num_layers=1, dropout=0.3):
        super(BertLSTM_CNN_Classifier, self).__init__()
        self.bert = BertModel.from_pretrained("ckiplab/bert-base-chinese")  # pretrained Chinese BERT (ckiplab)
        # The LSTM sits on top of BERT's 768-dim token outputs and models the
        # token sequence bidirectionally.
        self.LSTM = nn.LSTM(input_size=768,
                            hidden_size=hidden_dim,
                            num_layers=num_layers,
                            batch_first=True,
                            bidirectional=True)
        # CNN block applied to the LSTM outputs
        self.conv1 = nn.Conv1d(in_channels=hidden_dim*2,
                               out_channels=128,
                               kernel_size=3,
                               padding=1)
        self.dropout = nn.Dropout(dropout)
        self.global_maxpool = nn.AdaptiveMaxPool1d(1)  # equivalent to GlobalMaxPooling1D
        self.classifier = nn.Linear(128, 1)

    def forward(self, input_ids, attention_mask, token_type_ids):
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask,
                            token_type_ids=token_type_ids)
        hidden_states = outputs.last_hidden_state   # [batch, seq_len, 768]
        LSTM_out, _ = self.LSTM(hidden_states)      # [batch, seq_len, hidden_dim*2]
        LSTM_out = LSTM_out.transpose(1, 2)         # [batch, hidden_dim*2, seq_len]
        x = self.conv1(LSTM_out)                    # [batch, 128, seq_len]
        x = self.dropout(x)
        x = self.global_maxpool(x).squeeze(2)       # [batch, 128]
        logits = self.classifier(x)
        return torch.sigmoid(logits).view(-1)       # flatten to [batch] probabilities
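# Sanity-check sketch (an assumption, not part of the training pipeline): pushes a
# dummy batch through the classifier to confirm the tensor shapes annotated above.
# The helper name and dummy sizes are illustrative only; call it manually if needed.
def _check_output_shapes(batch_size=2, seq_len=16):
    dummy_ids = torch.zeros(batch_size, seq_len, dtype=torch.long)    # dummy token IDs (all zeros = [PAD])
    dummy_mask = torch.ones(batch_size, seq_len, dtype=torch.long)    # attend to every position
    dummy_types = torch.zeros(batch_size, seq_len, dtype=torch.long)  # single-segment input
    m = BertLSTM_CNN_Classifier()
    with torch.no_grad():
        probs = m(dummy_ids, dummy_mask, dummy_types)
    assert probs.shape == (batch_size,)  # one sigmoid probability per example
    return probs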
# Select the GPU if one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Cap the number of CPU threads (tune to your machine's physical core count)
torch.set_num_threads(8)
# Initialise the model
model = BertLSTM_CNN_Classifier().to(device)
# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
criterion = nn.BCELoss()
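# Design note: forward() already applies sigmoid, so BCELoss is the matching loss here.
# An alternative sketch would return raw logits from forward() and switch to
# nn.BCEWithLogitsLoss, which is numerically more stable; that is an option,
# not what this script does.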
# Training loop
if __name__ == "__main__":
    if os.path.exists("model.pth"):
        print("✅ Found model.pth, loading weights and skipping training")
        model.load_state_dict(torch.load("model.pth", map_location=device))
    else:
        print("🚀 model.pth not found, training the model...")
        num_epochs = 10
        for epoch in range(num_epochs):
            model.train()
            total_loss = 0.0
            for batch in train_loader:
                optimizer.zero_grad()
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                token_type_ids = batch["token_type_ids"].to(device)
                labels = batch["labels"].to(device)
                outputs = model(input_ids, attention_mask, token_type_ids)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            print(f"[Epoch {epoch+1}] Training Loss: {total_loss:.4f}")
        torch.save(model.state_dict(), "model.pth")  # save the trained weights
        print("✅ Training finished; model saved as model.pth")