Spaces:
Sleeping
Sleeping
File size: 9,829 Bytes
f3c720e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
"""流程圖
讀取資料 → 分割資料 → 編碼 → 建立 Dataset / DataLoader
↓
建立模型(BERT+LSTM+CNN)
↓
BERT 輸出 [batch, seq_len, 768]
↓
BiLSTM [batch, seq_len, hidden_dim*2]
↓
CNN 模組 (Conv1D + Dropout + GlobalMaxPooling1D)
↓
Linear 分類器(輸出詐騙機率)
↓
訓練模型(Epochs)
↓
評估模型(Accuracy / F1 / Precision / Recall)
↓
儲存模型(.pth)
"""#引入重要套件Import Library
import torch # PyTorch 主模組
import torch.nn as nn # 神經網路相關的層(例如 LSTM、Linear)
import torch.nn.functional as F # 提供純函式版的操作方法,像是 F.relu()、F.cross_entropy(),通常不帶參數、不自動建立權重
import numpy as np
import pandas as pd
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:16"#讓 CUDA 使用「更小記憶體分配塊」的方法,能有效減少 OOM 錯誤。
import re
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset # 提供 Dataset、DataLoader 類別
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split
from transformers import BertModel
#BertTokenizer 把文字句子轉換成 BERT 格式的 token ID,例如 [CLS] 今天 天氣 不錯 [SEP] → [101, 1234, 5678, ...]
##BertForSequenceClassification 是 Hugging Face 提供的一個完整 BERT 模型,接了分類用的 Linear 層,讓你直接拿來做分類任務(例如詐騙 vs 正常)
data_file = "NorANDScamInfo_data1.csv"
#正常訊息資料集在這新增
normal_files = [data_file]
#詐騙訊息資料集在這新增
scam_files = [data_file]
#資料前處理
class BertPreprocessor:
def __init__(self, tokenizer_name="ckiplab/bert-base-chinese", max_len=128):
self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name)
self.max_len = max_len
def load_and_clean(self, filepath):
#載入 CSV 並清理 message 欄位。
df = pd.read_csv(filepath)
df = df.dropna().drop_duplicates().reset_index(drop=True)
# 文字清理:移除空白、保留中文英數與標點
df["message"] = df["message"].astype(str)
df["message"] = df["message"].apply(lambda text: re.sub(r"\s+", "", text))
df["message"] = df["message"].apply(lambda text: re.sub(r"[^\u4e00-\u9fffA-Za-z0-9。,!?]", "", text))
return df[["message", "label"]] # 保留必要欄位
def encode(self, messages):
#使用 HuggingFace BERT Tokenizer 將訊息編碼成模型輸入格式。
return self.tokenizer(
list(messages),
return_tensors="pt",
truncation=True,
padding="max_length",
max_length=self.max_len
)
#自動做資料前處理
def build_bert_inputs(normal_files, scam_files):
#將正常與詐騙資料分別指定 label,統一清理、編碼,回傳模型可用的 input tensors 與 labels。
processor = BertPreprocessor()
dfs = []
# 合併正常 + 詐騙檔案清單
all_files = normal_files + scam_files
for filepath in all_files:
df = processor.load_and_clean(filepath)
dfs.append(df)
# 合併所有資料。在資料清理過程中dropna():刪除有空值的列,drop_duplicates():刪除重複列,filter()或df[...]做條件過濾,concat():將多個 DataFrame合併
# 這些操作不會自動重排索引,造成索引亂掉。
# 合併後統一編號(常見於多筆資料合併)all_df = pd.concat(dfs, 關鍵-->ignore_index=True)
all_df = pd.concat(dfs, ignore_index=True)
#製作 train/val 資料集
train_texts, val_texts, train_labels, val_labels = train_test_split(
all_df["message"], all_df["label"],
stratify=all_df["label"],
test_size=0.2,
random_state=25,
shuffle=True
)
# 進行 BERT tokenizer 編碼
train_inputs = processor.encode(train_texts)
val_inputs = processor.encode(val_texts)
return train_inputs, train_labels, val_inputs, val_labels, processor
#AUTO YA~以for迴圈自動新增個別變數內,build_bert_inputs能自動擷取新增資料
normal_files_labels = [normal for normal in normal_files]
scam_files_labels = [scam for scam in scam_files]
#print(bert_inputs.keys())
#定義 PyTorch Dataset 類別
class ScamDataset(Dataset):
def __init__(self, inputs, labels):
self.input_ids = inputs["input_ids"] # input_ids:句子的 token ID; attention_mask:注意力遮罩(0 = padding)
self.attention_mask = inputs["attention_mask"] # token_type_ids:句子的 segment 區分
self.token_type_ids = inputs["token_type_ids"] # torch.tensor(x, dtype=...)將資料(x)轉為Tensor的標準做法。
self.labels = torch.tensor(labels.values, dtype=torch.float32) # x可以是 list、NumPy array、pandas series...
# dtypefloat32:浮點數(常用於 回歸 或 BCELoss 二分類);long:整數(常用於 多分類 搭配 CrossEntropyLoss)。labels.values → 轉為 NumPy array
def __len__(self): # 告訴 PyTorch 這個 Dataset 有幾筆資料
return len(self.labels) # 給 len(dataset) 或 for i in range(len(dataset)) 用的
def __getitem__(self, idx): #回傳第 idx 筆資料(會自動在訓練中一筆筆抓)
return { #DataLoader 每次會呼叫這個方法多次來抓一個 batch 的資料
"input_ids":self.input_ids[idx],
"attention_mask":self.attention_mask[idx],
"token_type_ids":self.token_type_ids[idx],
"labels":self.labels[idx]
}
# 這樣可以同時處理 scam 和 normal 資料,不用重複寫清理與 token 處理
train_inputs, train_labels, val_inputs, val_labels, processor = build_bert_inputs(normal_files, scam_files)
train_dataset = ScamDataset(train_inputs, train_labels)
val_dataset = ScamDataset(val_inputs, val_labels)
train_loader = DataLoader(train_dataset, batch_size=8)
val_loader = DataLoader(val_dataset, batch_size=8)
#模型
class BertLSTM_CNN_Classifier(nn.Module):
def __init__(self, hidden_dim=128, num_layers=1, dropout=0.3):
super(BertLSTM_CNN_Classifier, self).__init__()
self.bert = BertModel.from_pretrained("ckiplab/bert-base-chinese") #載入預訓練 BERT 模型(ckiplab 中文版)
# LSTM 接在 BERT 的 token 輸出後(輸入是768維)
self.LSTM = nn.LSTM(input_size=768, # 把 BERT 的 token 序列再交給雙向 LSTM 做時間序列建模
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
bidirectional=True)
# CNN 模組:接在 LSTM 後的輸出上
self.conv1 = nn.Conv1d(in_channels=hidden_dim*2,
out_channels=128,
kernel_size=3,
padding=1)
self.dropout = nn.Dropout(dropout)
self.global_maxpool = nn.AdaptiveAvgPool1d(1) # 等效於 GlobalMaxPooling1D
self.classifier = nn.Linear(128,1)
def forward(self, input_ids, attention_mask, token_type_ids):
outputs = self.bert(input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids)
hidden_states = outputs.last_hidden_state # [batch, seq_len, 768]
LSTM_out, _ = self.LSTM(hidden_states) # [batch, seq_len, hidden_dim*2]
LSTM_out = LSTM_out.transpose(1, 2) # [batch, hidden_dim*2, seq_len]
x = self.conv1(LSTM_out) # [batch, 128, seq_len]
x = self.dropout(x)
x = self.global_maxpool(x).squeeze(2) # [batch, 128]
logits = self.classifier(x)
return torch.sigmoid(logits).view(-1) # 👈 修正這行
# 設定 GPU 裝置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 設定使用的最大執行緒數(視 CPU 而定)
torch.set_num_threads(8) # 建議設成你系統的實體核心數
# 初始化模型
model = BertLSTM_CNN_Classifier().to(device)
# 定義 optimizer 和損失函數
optimizer = torch.optim.Adam(model.parameters(),lr=2e-5)
criterion = nn.BCELoss()
# 訓練迴圈
if __name__ == "__main__":
if os.path.exists("model.pth"):
print("✅ 已找到 model.pth,載入模型跳過訓練")
model.load_state_dict(torch.load("model.pth", map_location=device))
else:
print("🚀 未找到 model.pth,開始訓練模型...")
num_epochs = 10
for epoch in range(num_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
optimizer.zero_grad()
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
token_type_ids = batch["token_type_ids"].to(device)
labels = batch["labels"].to(device)
outputs = model(input_ids, attention_mask, token_type_ids)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"[Epoch{epoch+1}]Training Loss:{total_loss:.4f}")
torch.save(model.state_dict(), "model.pth")# 儲存模型權重
print("✅ 模型訓練完成並儲存為 model.pth")
|