本次作業(yè)需要學(xué)習(xí)完transformer后完成!
Task
做語者辨識(shí)任務(wù),一共有600個(gè)語者,給了每一個(gè)語者的語音feature進(jìn)行訓(xùn)練,然后通過test_feature進(jìn)行語者辨識(shí)。(本質(zhì)上還是分類任務(wù)Classification)
Simple(0.60824):run sample code and know how to use transformer
Medium(0.70375):know how to adjust parameters of transformer
Strong(0.77750):construct conformer
Boss(0.86500):implement self-attention pooling and additive margin softmax
使用kaggle訓(xùn)練作業(yè)模型
助教樣例code解讀
數(shù)據(jù)集分析
-
mapping.json文件
將speakers的id映射到編號(hào)0~599,因?yàn)橐还灿?00個(gè)不同的speaker需要對(duì)語音進(jìn)行分類 -
metadata.json文件
存放的是training data,本次實(shí)驗(yàn)沒有專門設(shè)置validation data,需要從training data中劃分validation data
n_mels:在對(duì)語音數(shù)據(jù)進(jìn)行處理時(shí),從每一個(gè)時(shí)間維度上選取n_mels個(gè)維度來表示這個(gè)feature
speakers:以key-value形式存放speakers的id和所有feature(每個(gè)speaker都有多個(gè)feature)
feature_path:這個(gè)feature的文件名
mel_len:每一個(gè)feature的長(zhǎng)度(每一個(gè)可能都不一樣,后期需要處理) -
testdata.json文件
與metadata形式類似,需要我們進(jìn)行語者辨識(shí)。utterance:話語; 言論
Dataset
本次實(shí)驗(yàn)的數(shù)據(jù)來源于 Voxceleb2語音數(shù)據(jù)集,是真實(shí)世界中語者的語音,作業(yè)中選取了600個(gè)語者,和他們的語音進(jìn)行訓(xùn)練
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
class myDataset(Dataset):
def __init__(self, data_dir, segment_len=128):
self.data_dir = data_dir
self.segment_len = segment_len
# Load the mapping from speaker neme to their corresponding id.
mapping_path = Path(data_dir) / "mapping.json" #mapping_path: Dataset\mapping.json
mapping = json.load(mapping_path.open())
#mapping: {'speaker2id': {'id00464': 0, 'id00559': 1,
self.speaker2id = mapping["speaker2id"]
#self.speaker2id: {'id00464': 0, 'id00559': 1, 'id00578': 2, 'id00905': 3,...
# Load metadata of training data.
metadata_path = Path(data_dir) / "metadata.json"
metadata = json.load(open(metadata_path))["speakers"] #metadata中存放的key是speaker_id,value是每個(gè)speaker的feature和對(duì)應(yīng)長(zhǎng)度
# Get the total number of speaker.
self.speaker_num = len(metadata.keys())
self.data = []
for speaker in metadata.keys(): #遍歷每一個(gè)spearker_id
for utterances in metadata[speaker]: #通過speaker_id取出speaker的所有feature和len
"""
utterances格式:
{'feature_path': 'uttr-18e375195dc146fd8d14b8a322c29b90.pt', 'mel_len': 435}
{'feature_path': 'uttr-da9917d5853049178487c065c9e8b718.pt', 'mel_len': 490}...
"""
self.data.append([utterances["feature_path"], self.speaker2id[speaker]])
#self.data:[['uttr-18e375195dc146fd8d14b8a322c29b90.pt', 436],
# ['uttr-da9917d5853049178487c065c9e8b718.pt', 436],...
#一共600個(gè)speaker,436表示第436個(gè)speaker
def __len__(self):
return len(self.data)
def __getitem__(self, index):
feat_path, speaker = self.data[index] #feature和speaker編號(hào)[0,599]
# Load preprocessed mel-spectrogram.
mel = torch.load(os.path.join(self.data_dir, feat_path)) #加載feature
#mel.size():torch.Size([490, 40])
# Segmemt mel-spectrogram into "segment_len" frames.
if len(mel) > self.segment_len: #將feature切片成固定長(zhǎng)度
# Randomly get the starting point of the segment.
start = random.randint(0, len(mel) - self.segment_len) #隨機(jī)選取切片起始點(diǎn)
# Get a segment with "segment_len" frames.
mel = torch.FloatTensor(mel[start:start+self.segment_len])#截取長(zhǎng)度為segment_len的片段 mel.size():torch.Size([128, 40])
else:
mel = torch.FloatTensor(mel) #為什么小于segment_len不填充? 填充在dataloader中完成
# Turn the speaker id into long for computing loss later.
speaker = torch.FloatTensor([speaker]).long() #將speaker的編號(hào)轉(zhuǎn)為long類型
return mel, speaker
def get_speaker_number(self):
return self.speaker_num #600
Dataloader
主要任務(wù):1.劃分驗(yàn)證集 2.將長(zhǎng)度小于segment_len的mel進(jìn)行padding 3.生成dataloader
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
def collate_batch(batch): #用于整理數(shù)據(jù)的函數(shù),參數(shù)為dataloader中的一個(gè)batch
# Process features within a batch.
"""Collate a batch of data."""
mel, speaker = zip(*batch) #zip拆包,將一個(gè)batch中的mel和speaker分開,各自單獨(dú)形成一個(gè)數(shù)組
# Because we train the model batch by batch, we need to pad the features in the same batch to make their lengths the same.
#mel中元素長(zhǎng)度不相同時(shí),將所有的mel元素填充到最長(zhǎng)的元素的長(zhǎng)度,填充的值由padding_value決定
mel = pad_sequence(mel, batch_first=True, padding_value=-20) # pad log 10^(-20) which is very small value.
# mel: (batch size, length, 40)
return mel, torch.FloatTensor(speaker).long()
def get_dataloader(data_dir, batch_size, n_workers):
"""Generate dataloader"""
dataset = myDataset(data_dir)
speaker_num = dataset.get_speaker_number()
# Split dataset into training dataset and validation dataset
trainlen = int(0.9 * len(dataset))
lengths = [trainlen, len(dataset) - trainlen]
trainset, validset = random_split(dataset, lengths) #無覆蓋的隨機(jī)劃分訓(xùn)練集和驗(yàn)證集
train_loader = DataLoader(
trainset,
batch_size=batch_size,
shuffle=True,
drop_last=True,
num_workers=n_workers,
pin_memory=True,
collate_fn=collate_batch,
)
valid_loader = DataLoader(
validset,
batch_size=batch_size,
num_workers=n_workers,
drop_last=True,
pin_memory=True,
collate_fn=collate_batch,
)
return train_loader, valid_loader, speaker_num
Model
最關(guān)鍵部分,transformer運(yùn)用
transformer基礎(chǔ)架構(gòu)來自于論文: Attention Is All You Need
論文解讀: 李沐大神的論文帶讀,用了都說好
這里是分類任務(wù),僅需要使用Encoder部分
pytorch官方文檔: torch.nn.TransformerEncoderLayer
import torch
import torch.nn as nn
import torch.nn.functional as F
class Classifier(nn.Module):
def __init__(self, d_model=80, n_spks=600, dropout=0.1):
super().__init__()
# Project the dimension of features from that of input into d_model.
self.prenet = nn.Linear(40, d_model)
# TODO:
# Change Transformer to Conformer.
# https://arxiv.org/abs/2005.08100
#對(duì)于文本分類等下游任務(wù),只需要用到Encoder部分即可
#nhead:multi_head_attention中head個(gè)數(shù)
#d_model:輸入的feature的個(gè)數(shù)
#dim_feedforward:feedforward network的維度
#dropout默認(rèn)0.1
self.encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, dim_feedforward=256, nhead=2
)
# self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
# Project the the dimension of features from d_model into speaker nums.
self.pred_layer = nn.Sequential(
nn.Linear(d_model, d_model),
nn.ReLU(),
nn.Linear(d_model, n_spks),
)
def forward(self, mels):
"""
args:
mels: (batch size, length, 40)
return:
out: (batch size, n_spks)
"""
# out: (batch size, length, d_model) length=segment_len
out = self.prenet(mels)
# out: (length, batch size, d_model)
out = out.permute(1, 0, 2) #交換dim=0和dim=1
# The encoder layer expect features in the shape of (length, batch size, d_model).
out = self.encoder_layer(out)
# out: (batch size, length, d_model)
out = out.transpose(0, 1) #轉(zhuǎn)置dim=0和dim=1
# mean pooling
stats = out.mean(dim=1) #可以理解為求平均并去除維度1 stats.size():(batch_size,d_model)
# out: (batch, n_spks)
out = self.pred_layer(stats)
return out
Learning rate schedule
當(dāng)batch設(shè)置的比較大的時(shí)候通常需要比較大的學(xué)習(xí)率(通常batch_size和學(xué)習(xí)率成正比),但在剛開始訓(xùn)練時(shí),參數(shù)是隨機(jī)初始化的,梯度也比較大,這時(shí)學(xué)習(xí)率也比較大,會(huì)使得訓(xùn)練不穩(wěn)定。
warm up 方法就是在最初幾輪迭代采用比較小的學(xué)習(xí)率,等梯度下降到一定程度再恢復(fù)初始學(xué)習(xí)率
------《神經(jīng)網(wǎng)絡(luò)與深度學(xué)習(xí)》
import math
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(
optimizer: Optimizer,
num_warmup_steps: int,
num_training_steps: int,
num_cycles: float = 0.5,
last_epoch: int = -1,
):
"""
Create a schedule with a learning rate that decreases following the values of the cosine function between the
initial lr set in the optimizer to 0, after a warmup period during which it increases linearly between 0 and the
initial lr set in the optimizer.
Args:
optimizer (:class:`~torch.optim.Optimizer`):
The optimizer for which to schedule the learning rate.
num_warmup_steps (:obj:`int`):
The number of steps for the warmup phase.
num_training_steps (:obj:`int`):
The total number of training steps.
num_cycles (:obj:`float`, `optional`, defaults to 0.5):
The number of waves in the cosine schedule (the defaults is to just decrease from the max value to 0
following a half-cosine).
last_epoch (:obj:`int`, `optional`, defaults to -1):
The index of the last epoch when resuming training.
Return:
:obj:`torch.optim.lr_scheduler.LambdaLR` with the appropriate schedule.
"""
def lr_lambda(current_step):
# Warmup
if current_step < num_warmup_steps:
return float(current_step) / float(max(1, num_warmup_steps))
# decadence
progress = float(current_step - num_warmup_steps) / float(
max(1, num_training_steps - num_warmup_steps)
)
return max(
0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
)
return LambdaLR(optimizer, lr_lambda, last_epoch)
Model Function
調(diào)用自定義model的forward部分,每遍歷一個(gè)batch都要調(diào)用一次model_fn
import torch
def model_fn(batch, model, criterion, device):
"""Forward a batch through the model."""
mels, labels = batch
#print("model_fn_mels.size():",mels.size())
# out:torch.Size([16, 128, 40]) [batch_size,segment_len,40]
mels = mels.to(device)
labels = labels.to(device)
outs = model(mels)
loss = criterion(outs, labels)
# Get the speaker id with highest probability.
preds = outs.argmax(1)
# Compute accuracy.
accuracy = torch.mean((preds == labels).float())
return loss, accuracy
Validate
計(jì)算驗(yàn)證集上的準(zhǔn)確率
from tqdm import tqdm
import torch
def valid(dataloader, model, criterion, device):
"""Validate on validation set."""
model.eval()
running_loss = 0.0
running_accuracy = 0.0
#驗(yàn)證集5667個(gè)
pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")
for i, batch in enumerate(dataloader):
with torch.no_grad():
loss, accuracy = model_fn(batch, model, criterion, device)
running_loss += loss.item()
running_accuracy += accuracy.item()
pbar.update(dataloader.batch_size)
pbar.set_postfix(
loss=f"{running_loss / (i+1):.2f}",
accuracy=f"{running_accuracy / (i+1):.2f}",
)
pbar.close()
model.train()
return running_accuracy / len(dataloader)
Main function
開始跑模型,這里與之前的作業(yè)有不同的地方。前幾個(gè)作業(yè)是跑完一個(gè)epoch也就是完整訓(xùn)練集,再開始跑驗(yàn)證集。這里是跑valid_steps個(gè)batch,跑一遍驗(yàn)證集。
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split
def parse_args():
"""arguments"""
config = {
"data_dir": "./Dataset",
"save_path": "model.ckpt",
"batch_size": 16,
"n_workers": 0,
"valid_steps": 2000,
"warmup_steps": 1000,
"save_steps": 10000,
"total_steps": 70000,
}
return config
def main(
data_dir,
save_path,
batch_size,
n_workers,
valid_steps,
warmup_steps,
total_steps,
save_steps,
):
"""Main function."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[Info]: Use {device} now!")
train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
train_iterator = iter(train_loader) #iter()生成迭代器,以batch為單位
#print("train_iterator:",train_iterator) #<torch.utils.data.dataloader._SingleProcessDataLoaderIter object at 0x000001FD07C558D0>
print(f"[Info]: Finish loading data!",flush = True)
model = Classifier(n_spks=speaker_num).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=1e-3)
scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps) #上面定義的warm up函數(shù)
print(f"[Info]: Finish creating model!",flush = True)
best_accuracy = -1.0
best_state_dict = None
pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")
#train valid_steps個(gè)batch再跑驗(yàn)證集
for step in range(total_steps): #一共運(yùn)行total_Steps輪,這里沒有epoch的概念
# Get data
try:
batch = next(train_iterator) #next()返回迭代器的下一個(gè)項(xiàng)目,即下一個(gè)batch
#print("batch[0].size():",batch[0].size())
#out:torch.Size([16, 128, 40]) [batch_size,segment_len,40]
except StopIteration: # 不指定 default 且迭代器元素耗盡, 將引發(fā) StopIteration 異常
train_iterator = iter(train_loader)
batch = next(train_iterator)
loss, accuracy = model_fn(batch, model, criterion, device) #計(jì)算當(dāng)前batch的loss和acc
#print("loss:",loss) #tensor(6.3915, device='cuda:0', grad_fn=<NllLossBackward0>)
batch_loss = loss.item() # loss是張量,item()可以取出張量中的值
#print("batch_loss:",batch_loss) #batch_loss: 6.391468048095703
batch_accuracy = accuracy.item()
# Updata model 反向傳播更新參數(shù),每跑一個(gè)batch都會(huì)更新
loss.backward()
optimizer.step()
scheduler.step()
optimizer.zero_grad()
# Log
pbar.update() #打印當(dāng)前l(fā)oss和acc
pbar.set_postfix(
loss=f"{batch_loss:.2f}",
accuracy=f"{batch_accuracy:.2f}",
step=step + 1,
)
# Do validation
if (step + 1) % valid_steps == 0: #經(jīng)過valid_steps開始跑驗(yàn)證集
pbar.close()
valid_accuracy = valid(valid_loader, model, criterion, device) #計(jì)算valid_acc
# keep the best model
if valid_accuracy > best_accuracy:
best_accuracy = valid_accuracy
best_state_dict = model.state_dict() #保存模型參數(shù)
pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")
# Save the best model so far.
if (step + 1) % save_steps == 0 and best_state_dict is not None: #每save_steps輪會(huì)保存一次當(dāng)前最好模型
torch.save(best_state_dict, save_path)
pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")
pbar.close()
if __name__ == "__main__":
main(**parse_args())
Inference
inference:推理,就是跑testing data
類比training即可
Main function of inference
類似Main function
樣例code得分
Medium
調(diào)整參數(shù)過medium
d_model=160
n_head=8
num_layers=2
linear layer:1層
total_steps=100000
這一輪train上準(zhǔn)確率100%,只雖然只進(jìn)行了13步,但從loss上可以看出是有過擬合的
Strong
Transformer->Conformer
先上結(jié)果,未過strong
嚴(yán)重過擬合,在訓(xùn)練集和驗(yàn)證集上均有過擬合現(xiàn)象,驗(yàn)證集上的準(zhǔn)確率遠(yuǎn)高于測(cè)試集上結(jié)果
論文地址: Conformer
conformer的思路很簡(jiǎn)單,就是將Transformer和CNN進(jìn)行結(jié)合。原因:
1.Transformer中由于attention機(jī)制,擁有很好的全局性。
2.CNN擁有較好的局部性,可以對(duì)細(xì)粒度的信息進(jìn)行提取。
兩者結(jié)合在語音上有較好的效果。論文中闡述了具體的model架構(gòu)。
- 首先 pip conformer包
!pip install conformer
- 導(dǎo)入conformer包
from conformer import ConformerBlock
- 修改module
import torch
import torch.nn as nn
import torch.nn.functional as F
class Classifier(nn.Module):
def __init__(self, d_model=512, n_spks=600, dropout=0.1):
super().__init__()
# Project the dimension of features from that of input into d_model.
self.prenet = nn.Linear(40, d_model)
# TODO:
# Change Transformer to Conformer.
# https://arxiv.org/abs/2005.08100
#對(duì)于文本分類等下游任務(wù),只需要用到Encoder部分即可
#nhead:multi_head_attention中head個(gè)數(shù)
#d_model:輸入的feature的個(gè)數(shù)
#dim_feedforward:feedforward network的維度
#dropout默認(rèn)0.1
#self.encoder_layer = nn.TransformerEncoderLayer(
#d_model=d_model, dim_feedforward=256, nhead=8
#)
#self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
self.conformer_block=ConformerBlock(
dim=d_model,
dim_head=64,
heads=8,
ff_mult=4,
conv_expansion_factor=2,
conv_kernel_size=31,
attn_dropout=dropout,
ff_dropout=dropout,
conv_dropout=dropout
)
# Project the the dimension of features from d_model into speaker nums.
self.pred_layer = nn.Sequential(
#nn.Linear(d_model, d_model),
#nn.ReLU(),
nn.Linear(d_model, n_spks),
)
def forward(self, mels):
"""
args:
mels: (batch size, length, 40)
return:
out: (batch size, n_spks)
"""
# out: (batch size, length, d_model) length=segment_len
out = self.prenet(mels)
# out: (length, batch size, d_model)
out = out.permute(1, 0, 2) #交換dim=0和dim=1
# The encoder layer expect features in the shape of (length, batch size, d_model).
out = self.conformer_block(out)
# out: (batch size, length, d_model)
out = out.transpose(0, 1) #轉(zhuǎn)置dim=0和dim=1
# mean pooling
stats = out.mean(dim=1) #可以理解為求平均并去除維度1 stats.size():(batch_size,d_model)
# out: (batch, n_spks)
out = self.pred_layer(stats)
return out
Self-attention pooling
self attention pooling論文
主要看論文中的self-attention pooling架構(gòu),和mean pooling相比之下,self-attention pooling是通過可學(xué)習(xí)參數(shù)來進(jìn)行pooling,相比mean pooling可以提取到一些信息。
參考大佬視頻講解
代碼:
#self attention pooling類實(shí)現(xiàn)
import torch.nn.functional as F
import torch.nn as nn
class Self_Attentive_Pooling(nn.Module):
def __init__(self,dim):
super(Self_Attentive_Pooling,self).__init__()
self.sap_linear=nn.Linear(dim,dim)
self.attention=nn.Parameter(torch.FloatTensor(dim,1))
def forward(self,x):
x=x.permute(0,2,1)
h=torch.tanh(self.sap_linear(x))
w=torch.matmul(h,self.attention).squeeze(dim=2)
w=F.softmax(w,dim=1).view(x.size(0),x.size(1),1)
x=torch.sum(x*w,dim=1)
return x
修改model:文章來源:http://www.zghlxwxcb.cn/news/detail-427294.html
import torch
import torch.nn as nn
import torch.nn.functional as F
class Classifier(nn.Module):
def __init__(self, d_model=512, n_spks=600, dropout=0.1):
super().__init__()
# Project the dimension of features from that of input into d_model.
self.prenet = nn.Linear(40, d_model)
# TODO:
# Change Transformer to Conformer.
# https://arxiv.org/abs/2005.08100
#對(duì)于文本分類等下游任務(wù),只需要用到Encoder部分即可
#nhead:multi_head_attention中head個(gè)數(shù)
#d_model:輸入的feature的個(gè)數(shù)
#dim_feedforward:feedforward network的維度
#dropout默認(rèn)0.1
#self.encoder_layer = nn.TransformerEncoderLayer(
#d_model=d_model, dim_feedforward=256, nhead=8
#)
#self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
self.conformer_block=ConformerBlock(
dim=d_model,
dim_head=64,
heads=8,
ff_mult=4,
conv_expansion_factor=2,
conv_kernel_size=31,
attn_dropout=dropout,
ff_dropout=dropout,
conv_dropout=dropout
)
# Project the the dimension of features from d_model into speaker nums.
self.pooling=Self_Attentive_Pooling(d_model)
self.pred_layer = nn.Sequential(
#nn.Linear(d_model, d_model),
#nn.ReLU(),
nn.Linear(d_model, n_spks),
)
def forward(self, mels):
"""
args:
mels: (batch size, length, 40)
return:
out: (batch size, n_spks)
"""
# out: (batch size, length, d_model) length=segment_len
out = self.prenet(mels)
# out: (length, batch size, d_model)
out = out.permute(1, 0, 2) #交換dim=0和dim=1
# The encoder layer expect features in the shape of (length, batch size, d_model).
out = self.conformer_block(out)
# out: (batch size, length, d_model)
#out = out.transpose(0, 1) #轉(zhuǎn)置dim=0和dim=1
# mean pooling
#stats = out.mean(dim=1) #可以理解為求平均并去除維度1 stats.size():(batch_size,d_model)
out=out.permute(1,2,0)
stats=self.pooling(out)
# out: (batch, n_spks)
out = self.pred_layer(stats)
return out
total_steps=70000
total_steps=100000文章來源地址http://www.zghlxwxcb.cn/news/detail-427294.html
到了這里,關(guān)于李宏毅_機(jī)器學(xué)習(xí)_作業(yè)4(詳解)_HW4 Classify the speakers的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!