外观
BERT 模型微调做投满分
数据集介绍
./data 文件夹下有五十个 csv 文件。
$ ls
data_part_1.csv
data_part_10.csv
...
每个 csv 文件由两列组成,分别是 title 和 label。
title,label
NYMEX原油期貨7日收盤小幅走高,0
网游迎来新盟友 儿童娱乐社区日渐受重 ◎#※,8
女排對日本兩年不敗終結 30年5個0比3這次最尷尬,7
通州K2清水灣板樓40平起1-3居在售均價25000元,1
这里面包括繁体字、特殊符号、HTML标签、空格、重复文字、噪声等杂乱数据,所以我们要先清洗数据。
清洗数据
import re
import os
from opencc import OpenCC
from config import DATA_DIR
import pandas as pd
# Per-file frames are collected here and concatenated once after the loop.
frames = []
# The full corpus is too large for a quick experiment, so only a few shards are read.
read_time = 5
i = 0
# os.listdir() returns entries in arbitrary order; sorting makes the set of
# shards that gets loaded reproducible from run to run.
for file in sorted(os.listdir(DATA_DIR)):
    if file.endswith(".csv"):
        df = pd.read_csv(os.path.join(DATA_DIR, file))
        frames.append(df)
        # NOTE(review): the counter is tested before it is incremented, so
        # read_time + 1 files are loaded. Kept unchanged so the figures quoted
        # in the comments below stay comparable -- confirm whether exactly
        # read_time files was intended.
        if i >= read_time:
            break
        i += 1
# A single concat avoids re-copying every previously loaded row on each
# iteration; fall back to an empty frame when no CSV file was found.
datas = pd.concat(frames, axis=0) if frames else pd.DataFrame()
print(datas.shape)  # (244000, 2) in the author's run
print(f"{datas.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")  # 34.58 MB in the author's run
# Drop exact duplicate rows and renumber the index.
datas: pd.DataFrame = datas.drop_duplicates().reset_index(drop=True)
datas = datas.copy()
# Convert Traditional Chinese to Simplified FIRST: the prefix pattern below is
# written in Simplified characters, so a prefix that arrives in Traditional
# script would not match if the prefix were stripped before conversion.
# astype(str) turns missing titles into the literal string "nan".
cc = OpenCC("t2s")
datas["title"] = datas["title"].astype(str).apply(cc.convert)
# Strip the platform prefix at the start of a title.
datas["title"] = datas["title"].str.replace(r"^(新浪新闻|网易资讯|知乎热榜|B站热评|抖音头条)", "", regex=True)
print(datas[:3])  # titles are now in Simplified Chinese
# Remove HTML tags (non-greedy so each tag is matched separately).
regex = re.compile(r"<.*?>")
datas["title"] = datas["title"].str.replace(regex, "", regex=True)
# Remove everything that is neither a word character nor whitespace.
regex = re.compile(r"[^\w\s]")
datas["title"] = datas["title"].str.replace(regex, "", regex=True)
# The symbols in this class are already removed by the previous pattern;
# the only character it can still match is the plain space.
regex = re.compile(r"[★※◆→◎#¥% ]")
datas["title"] = datas["title"].str.replace(regex, "", regex=True)
# Remove the injected noise marker.
regex = re.compile(r"(噪声数据1)")
datas["title"] = datas["title"].str.replace(regex, "", regex=True)
# Collapse titles whose second half repeats the first half.
def remove_duplicate_half(text: str) -> str:
    """Return the first half of *text* when both halves are identical.

    Any other string, including every odd-length string, is returned
    unchanged.
    """
    midpoint, remainder = divmod(len(text), 2)
    # An odd-length string cannot be split into two equal halves.
    if remainder:
        return text
    front, back = text[:midpoint], text[midpoint:]
    return front if front == back else text
datas["title"] = datas["title"].astype(str).apply(remove_duplicate_half)
# Drop missing titles. After astype(str) a missing value is the literal
# string "nan", so the string comparison is the check that actually removes
# them; dropna is kept as a harmless safeguard.
datas = datas.dropna(subset=["title"])
datas = datas[datas["title"].str.lower() != "nan"]
# Drop titles that cleaning reduced to nothing. An empty title would be
# written as an empty CSV field and come back as NaN when the file is read.
datas = datas[datas["title"].str.strip() != ""]
# Cleaning can turn previously distinct rows into identical ones; remove them
# so the same sample cannot land in more than one split later.
datas = datas.drop_duplicates()
datas.to_csv("washed_data.csv", index=False, encoding="utf-8")
# 26.38 MB in the author's run, recorded before the empty-title and
# duplicate filters above were added; the figure may now be smaller.
print(f"{datas.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
print(datas.shape) # (219510, 2)
清洗数据后,得到干净的数据,并将其保存到 washed_data.csv 文件中。
数据集划分
import pandas as pd
from config import test_size, val_size
from sklearn.model_selection import train_test_split
# Load the cleaned titles.
datas = pd.read_csv("washed_data.csv")

# Step 1: carve off the combined test + validation share, stratified by label.
holdout_fraction = test_size + val_size
train_df, temp_df = train_test_split(
    datas,
    test_size=holdout_fraction,
    random_state=0,
    stratify=datas["label"],
)
# Step 2: split the held-out part. The second frame returned by this call
# receives the val_size share of the whole, so it becomes the validation set.
test_df, val_df = train_test_split(
    temp_df,
    test_size=val_size / holdout_fraction,
    random_state=0,
    stratify=temp_df["label"],
)

# Author's run printed (175608, 2), (21951, 2) and (21951, 2).
for split in (train_df, test_df, val_df):
    print(split.shape)

# Check that the labels are evenly distributed in each split.
for split in (train_df, test_df, val_df):
    print(split["label"].value_counts())

train_df.to_csv("train.csv", index=False)
test_df.to_csv("test.csv", index=False)
val_df.to_csv("val.csv", index=False)
训练模型
import os
import pandas as pd
import torch
from sklearn.metrics import f1_score
from torch import nn
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers.models.bert import BertTokenizer, BertModel
from transformers.models.bert.tokenization_bert import BertTokenizer
from config import pretrained_model_path, labels_count, epoch, max_len
# Load the tokenizer and the pretrained encoder from the local checkpoint
# directory; local_files_only=True restricts loading to files already on disk.
model_path = os.path.abspath(pretrained_model_path)
bert_tokenizer: BertTokenizer = BertTokenizer.from_pretrained(model_path, local_files_only=True)
bert_model = BertModel.from_pretrained(model_path, local_files_only=True)
# The class count comes from config. The script previously read the whole
# washed_data.csv here although nothing used the resulting frame (its only
# consumer was commented out), so that load has been dropped.
print(f'数据的类别个数:{labels_count}')
# Classification model.
class Model(nn.Module):
    """BERT encoder followed by dropout and a linear classification head."""

    def __init__(self):
        super().__init__()
        # The encoder is the module-level pretrained BERT instance.
        encoder = bert_model
        self.bert = encoder
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(
            in_features=encoder.config.hidden_size,
            out_features=labels_count,
        )

    def forward(self, input_ids, attention_mask):
        """Return the class logits for a batch of tokenized titles."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # last_hidden_state is (batch_size, seq_len, hidden_size): one vector
        # per token. Position 0 is the [CLS] token, whose vector is used as
        # the input of the classifier.
        cls_vector = encoded.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(cls_vector))
# Dataset wrapper around a title/label DataFrame.
class MyDataSet(Dataset):
    """Serve tokenized titles and integer labels from a DataFrame."""

    def __init__(self, datas, max_len):
        self.max_len = max_len
        # Renumber the rows from 0 so the label-based .loc lookups in
        # __getitem__ work with plain integer positions.
        self.data = datas.reset_index(drop=True)

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        frame = self.data
        title = str(frame.loc[idx, 'title'])
        target = int(frame.loc[idx, 'label'])
        # Every sample is truncated/padded to max_len tokens; with
        # return_tensors=None the tokenizer returns plain Python lists.
        tokens = bert_tokenizer(
            title,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors=None,
        )
        sample = {field: tokens[field] for field in ('input_ids', 'attention_mask')}
        sample['label'] = target
        return sample
def collate_fn(batch):
    """Stack a list of dataset samples into a dict of int64 tensors.

    Each sample provides 'input_ids', 'attention_mask' and 'label'; the
    returned dict has the keys 'input_ids', 'attention_mask' and 'labels'.
    """
    def as_long_tensor(field):
        # Gather one field across the batch and convert it in a single call.
        return torch.tensor([sample[field] for sample in batch], dtype=torch.long)

    return {
        'input_ids': as_long_tensor('input_ids'),
        'attention_mask': as_long_tensor('attention_mask'),
        'labels': as_long_tensor('label'),
    }
# Build the training and validation loaders from the saved splits.
train_df = pd.read_csv('train.csv')
val_df = pd.read_csv('val.csv')
train_dataset = MyDataSet(train_df, max_len)
val_dataset = MyDataSet(val_df, max_len)
# Options shared by both loaders; only the training loader shuffles.
loader_options = {'batch_size': 32, 'collate_fn': collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

# Training setup: use the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
model = Model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=2e-5)
print(device)
def train_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over *loader* and return the mean batch loss.

    Batches are moved to the module-level ``device``. The running mean loss
    is printed every 100 batches.
    """
    model.train()
    running_loss = 0
    for step, batch in enumerate(tqdm(loader, desc='Training'), start=1):
        input_ids, attention_mask, labels = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        )
        optimizer.zero_grad()
        batch_loss = criterion(model(input_ids, attention_mask), labels)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()
        if step % 100 == 0:
            print(f' [batch {step}/{len(loader)}] loss: {running_loss / step:.4f}')
    return running_loss / len(loader)
def eval_epoch(model, loader, criterion):
    """Evaluate *model* on *loader* without gradient tracking.

    Returns a ``(mean batch loss, macro-averaged F1)`` tuple. Batches are
    moved to the module-level ``device``; the running mean loss is printed
    every 100 batches.
    """
    model.eval()
    running_loss = 0
    predictions = []
    targets = []
    with torch.no_grad():
        for step, batch in enumerate(tqdm(loader, desc='Evaluating'), start=1):
            input_ids, attention_mask, labels = (
                batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            )
            logits = model(input_ids, attention_mask)
            running_loss += criterion(logits, labels).item()
            # The predicted class is the index of the largest logit.
            predicted = torch.argmax(logits, dim=1)
            predictions.extend(predicted.cpu().numpy())
            targets.extend(labels.cpu().numpy())
            if step % 100 == 0:
                print(f' [batch {step}/{len(loader)}] loss: {running_loss / step:.4f}')
    macro_f1 = f1_score(targets, predictions, average='macro')
    return running_loss / len(loader), macro_f1
# Best validation macro-F1 seen so far. It starts below any attainable score
# so the first epoch always writes a checkpoint, even if its F1 is exactly 0.
best_f1 = float('-inf')
# A dedicated loop variable is used so that the imported `epoch` setting
# (the number of epochs) is not rebound to the current epoch index.
for epoch_idx in range(epoch):
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    val_loss, val_f1 = eval_epoch(model, val_loader, criterion)
    print(f'Epoch {epoch_idx + 1}')
    print(f'train_loss: {train_loss:.4f}')
    print(f'val_loss: {val_loss:.4f}')
    print(f'val_f1: {val_f1:.4f}')
    # Keep only the weights of the best-scoring epoch.
    if val_f1 > best_f1:
        best_f1 = val_f1
        torch.save(model.state_dict(), 'best_model.pt')
print('模型已保存')
测试模型
import os
import pandas as pd
import torch
from sklearn.metrics import accuracy_score, f1_score, classification_report
from torch import nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers.models.bert import BertTokenizer, BertModel
from config import pretrained_model_path, labels_count, max_len
# Load the tokenizer and the pretrained encoder from the local checkpoint
# directory; local_files_only=True restricts loading to files already on disk.
model_path = os.path.abspath(pretrained_model_path)
bert_tokenizer: BertTokenizer = BertTokenizer.from_pretrained(
    model_path,
    local_files_only=True,
)
bert_model = BertModel.from_pretrained(
    model_path,
    local_files_only=True,
)
class Model(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    The attribute names (bert, dropout, classifier) determine the keys of the
    state dict, so they must match the names used when the checkpoint was
    saved.
    """

    def __init__(self):
        super().__init__()
        # The encoder is the module-level pretrained BERT instance.
        encoder = bert_model
        self.bert = encoder
        self.dropout = nn.Dropout(p=0.1)
        self.classifier = nn.Linear(
            in_features=encoder.config.hidden_size,
            out_features=labels_count,
        )

    def forward(self, input_ids, attention_mask):
        """Return the class logits for a batch of tokenized titles."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Take the vector at sequence position 0 of every sample as the
        # sentence representation fed to the classifier.
        cls_vector = encoded.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(cls_vector))
class MyDataSet(Dataset):
    """Serve tokenized titles and integer labels from a DataFrame."""

    def __init__(self, datas, max_len):
        self.max_len = max_len
        # Renumber the rows from 0 so the label-based .loc lookups in
        # __getitem__ work with plain integer positions.
        self.data = datas.reset_index(drop=True)

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        frame = self.data
        title = str(frame.loc[idx, 'title'])
        target = int(frame.loc[idx, 'label'])
        # Every sample is truncated/padded to max_len tokens; with
        # return_tensors=None the tokenizer returns plain Python lists.
        tokens = bert_tokenizer(
            title,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors=None,
        )
        sample = {field: tokens[field] for field in ('input_ids', 'attention_mask')}
        sample['label'] = target
        return sample
def collate_fn(batch):
    """Stack a list of dataset samples into a dict of int64 tensors.

    Each sample provides 'input_ids', 'attention_mask' and 'label'; the
    returned dict has the keys 'input_ids', 'attention_mask' and 'labels'.
    """
    def as_long_tensor(field):
        # Gather one field across the batch and convert it in a single call.
        return torch.tensor([sample[field] for sample in batch], dtype=torch.long)

    return {
        'input_ids': as_long_tensor('input_ids'),
        'attention_mask': as_long_tensor('attention_mask'),
        'labels': as_long_tensor('label'),
    }
# Build the test loader; order is kept fixed (shuffle=False).
test_df = pd.read_csv('test.csv')
test_dataset = MyDataSet(test_df, max_len)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Model().to(device)
# torch.load unpickles the file, which can execute arbitrary code from an
# untrusted checkpoint. The file holds only a state dict, so restrict
# loading to tensors and plain containers with weights_only=True.
state_dict = torch.load('best_model.pt', map_location=device, weights_only=True)
model.load_state_dict(state_dict)
# Switch to inference mode (disables dropout).
model.eval()
print(device)
# Collect predictions and gold labels over the whole test set.
all_preds = []
all_labels = []
with torch.no_grad():
    for step, batch in enumerate(tqdm(test_loader, desc='Testing'), start=1):
        input_ids, attention_mask, labels = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        )
        # The predicted class is the index of the largest logit.
        preds = torch.argmax(model(input_ids, attention_mask), dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        if step % 100 == 0:
            print(f' [batch {step}/{len(test_loader)}]')

# Summary metrics followed by the per-class report.
acc = accuracy_score(all_labels, all_preds)
f1_macro = f1_score(all_labels, all_preds, average='macro')
f1_micro = f1_score(all_labels, all_preds, average='micro')
print(f'test accuracy : {acc:.4f}')
print(f'test f1 macro : {f1_macro:.4f}')
print(f'test f1 micro : {f1_micro:.4f}')
print('分类报告:')
print(classification_report(all_labels, all_preds))