scam-detector / AI_Model_architecture.py
jerrynnms's picture
Update AI_Model_architecture.py
f3c720e verified
"""流程圖
讀取資料 → 分割資料 → 編碼 → 建立 Dataset / DataLoader
建立模型(BERT+LSTM+CNN)
BERT 輸出 [batch, seq_len, 768]
BiLSTM [batch, seq_len, hidden_dim*2]
CNN 模組 (Conv1D + Dropout + GlobalMaxPooling1D)
Linear 分類器(輸出詐騙機率)
訓練模型(Epochs)
評估模型(Accuracy / F1 / Precision / Recall)
儲存模型(.pth)
"""#引入重要套件Import Library
import torch # PyTorch 主模組
import torch.nn as nn # 神經網路相關的層(例如 LSTM、Linear)
import torch.nn.functional as F # 提供純函式版的操作方法,像是 F.relu()、F.cross_entropy(),通常不帶參數、不自動建立權重
import numpy as np
import pandas as pd
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:16"#讓 CUDA 使用「更小記憶體分配塊」的方法,能有效減少 OOM 錯誤。
import re
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset # 提供 Dataset、DataLoader 類別
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split
from transformers import BertModel
#BertTokenizer 把文字句子轉換成 BERT 格式的 token ID,例如 [CLS] 今天 天氣 不錯 [SEP] → [101, 1234, 5678, ...]
##BertForSequenceClassification 是 Hugging Face 提供的一個完整 BERT 模型,接了分類用的 Linear 層,讓你直接拿來做分類任務(例如詐騙 vs 正常)
data_file = "NorANDScamInfo_data1.csv"
#正常訊息資料集在這新增
normal_files = [data_file]
#詐騙訊息資料集在這新增
scam_files = [data_file]
#資料前處理
class BertPreprocessor:
def __init__(self, tokenizer_name="ckiplab/bert-base-chinese", max_len=128):
self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name)
self.max_len = max_len
def load_and_clean(self, filepath):
#載入 CSV 並清理 message 欄位。
df = pd.read_csv(filepath)
df = df.dropna().drop_duplicates().reset_index(drop=True)
# 文字清理:移除空白、保留中文英數與標點
df["message"] = df["message"].astype(str)
df["message"] = df["message"].apply(lambda text: re.sub(r"\s+", "", text))
df["message"] = df["message"].apply(lambda text: re.sub(r"[^\u4e00-\u9fffA-Za-z0-9。,!?]", "", text))
return df[["message", "label"]] # 保留必要欄位
def encode(self, messages):
#使用 HuggingFace BERT Tokenizer 將訊息編碼成模型輸入格式。
return self.tokenizer(
list(messages),
return_tensors="pt",
truncation=True,
padding="max_length",
max_length=self.max_len
)
#自動做資料前處理
def build_bert_inputs(normal_files, scam_files):
#將正常與詐騙資料分別指定 label,統一清理、編碼,回傳模型可用的 input tensors 與 labels。
processor = BertPreprocessor()
dfs = []
# 合併正常 + 詐騙檔案清單
all_files = normal_files + scam_files
for filepath in all_files:
df = processor.load_and_clean(filepath)
dfs.append(df)
# 合併所有資料。在資料清理過程中dropna():刪除有空值的列,drop_duplicates():刪除重複列,filter()或df[...]做條件過濾,concat():將多個 DataFrame合併
# 這些操作不會自動重排索引,造成索引亂掉。
# 合併後統一編號(常見於多筆資料合併)all_df = pd.concat(dfs, 關鍵-->ignore_index=True)
all_df = pd.concat(dfs, ignore_index=True)
#製作 train/val 資料集
train_texts, val_texts, train_labels, val_labels = train_test_split(
all_df["message"], all_df["label"],
stratify=all_df["label"],
test_size=0.2,
random_state=25,
shuffle=True
)
# 進行 BERT tokenizer 編碼
train_inputs = processor.encode(train_texts)
val_inputs = processor.encode(val_texts)
return train_inputs, train_labels, val_inputs, val_labels, processor
#AUTO YA~以for迴圈自動新增個別變數內,build_bert_inputs能自動擷取新增資料
normal_files_labels = [normal for normal in normal_files]
scam_files_labels = [scam for scam in scam_files]
#print(bert_inputs.keys())
#定義 PyTorch Dataset 類別
class ScamDataset(Dataset):
def __init__(self, inputs, labels):
self.input_ids = inputs["input_ids"] # input_ids:句子的 token ID; attention_mask:注意力遮罩(0 = padding)
self.attention_mask = inputs["attention_mask"] # token_type_ids:句子的 segment 區分
self.token_type_ids = inputs["token_type_ids"] # torch.tensor(x, dtype=...)將資料(x)轉為Tensor的標準做法。
self.labels = torch.tensor(labels.values, dtype=torch.float32) # x可以是 list、NumPy array、pandas series...
# dtypefloat32:浮點數(常用於 回歸 或 BCELoss 二分類);long:整數(常用於 多分類 搭配 CrossEntropyLoss)。labels.values → 轉為 NumPy array
def __len__(self): # 告訴 PyTorch 這個 Dataset 有幾筆資料
return len(self.labels) # 給 len(dataset) 或 for i in range(len(dataset)) 用的
def __getitem__(self, idx): #回傳第 idx 筆資料(會自動在訓練中一筆筆抓)
return { #DataLoader 每次會呼叫這個方法多次來抓一個 batch 的資料
"input_ids":self.input_ids[idx],
"attention_mask":self.attention_mask[idx],
"token_type_ids":self.token_type_ids[idx],
"labels":self.labels[idx]
}
# 這樣可以同時處理 scam 和 normal 資料,不用重複寫清理與 token 處理
train_inputs, train_labels, val_inputs, val_labels, processor = build_bert_inputs(normal_files, scam_files)
train_dataset = ScamDataset(train_inputs, train_labels)
val_dataset = ScamDataset(val_inputs, val_labels)
train_loader = DataLoader(train_dataset, batch_size=8)
val_loader = DataLoader(val_dataset, batch_size=8)
#模型
class BertLSTM_CNN_Classifier(nn.Module):
def __init__(self, hidden_dim=128, num_layers=1, dropout=0.3):
super(BertLSTM_CNN_Classifier, self).__init__()
self.bert = BertModel.from_pretrained("ckiplab/bert-base-chinese") #載入預訓練 BERT 模型(ckiplab 中文版)
# LSTM 接在 BERT 的 token 輸出後(輸入是768維)
self.LSTM = nn.LSTM(input_size=768, # 把 BERT 的 token 序列再交給雙向 LSTM 做時間序列建模
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
bidirectional=True)
# CNN 模組:接在 LSTM 後的輸出上
self.conv1 = nn.Conv1d(in_channels=hidden_dim*2,
out_channels=128,
kernel_size=3,
padding=1)
self.dropout = nn.Dropout(dropout)
self.global_maxpool = nn.AdaptiveAvgPool1d(1) # 等效於 GlobalMaxPooling1D
self.classifier = nn.Linear(128,1)
def forward(self, input_ids, attention_mask, token_type_ids):
outputs = self.bert(input_ids=input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids)
hidden_states = outputs.last_hidden_state # [batch, seq_len, 768]
LSTM_out, _ = self.LSTM(hidden_states) # [batch, seq_len, hidden_dim*2]
LSTM_out = LSTM_out.transpose(1, 2) # [batch, hidden_dim*2, seq_len]
x = self.conv1(LSTM_out) # [batch, 128, seq_len]
x = self.dropout(x)
x = self.global_maxpool(x).squeeze(2) # [batch, 128]
logits = self.classifier(x)
return torch.sigmoid(logits).view(-1) # 👈 修正這行
# 設定 GPU 裝置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 設定使用的最大執行緒數(視 CPU 而定)
torch.set_num_threads(8) # 建議設成你系統的實體核心數
# 初始化模型
model = BertLSTM_CNN_Classifier().to(device)
# 定義 optimizer 和損失函數
optimizer = torch.optim.Adam(model.parameters(),lr=2e-5)
criterion = nn.BCELoss()
# 訓練迴圈
if __name__ == "__main__":
if os.path.exists("model.pth"):
print("✅ 已找到 model.pth,載入模型跳過訓練")
model.load_state_dict(torch.load("model.pth", map_location=device))
else:
print("🚀 未找到 model.pth,開始訓練模型...")
num_epochs = 10
for epoch in range(num_epochs):
model.train()
total_loss = 0.0
for batch in train_loader:
optimizer.zero_grad()
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
token_type_ids = batch["token_type_ids"].to(device)
labels = batch["labels"].to(device)
outputs = model(input_ids, attention_mask, token_type_ids)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"[Epoch{epoch+1}]Training Loss:{total_loss:.4f}")
torch.save(model.state_dict(), "model.pth")# 儲存模型權重
print("✅ 模型訓練完成並儲存為 model.pth")