블로그를 티스토리에서 깃허브 블로그로 옮깁니다ㅎㅎ
티스토리가 편하긴 하지만 깃허브 블로그의 매력을 무시못하겠더라구요. 여기 업로드 했던 글은 천천히 옮기도록 하겠습니다.
아래 블로그 많이 와주세요~
감사합니다!
Dongdong’s Devlog
Dongdong’s Blog
youngdong2.github.io
블로그를 티스토리에서 깃허브 블로그로 옮깁니다ㅎㅎ
티스토리가 편하긴 하지만 깃허브 블로그의 매력을 무시못하겠더라구요. 여기 업로드 했던 글은 천천히 옮기도록 하겠습니다.
아래 블로그 많이 와주세요~
감사합니다!
Dongdong’s Devlog
Dongdong’s Blog
youngdong2.github.io
이번 포스팅에서는 pytorch의 기본 흐름에 대해 적어보겠다. dataset은 car_evaluation data를 사용한다.
car_evaluation
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
dataset = pd.read_csv('./car_evaluation.csv')
dataset.head()
fig_size = plt.rcParams['figure.figsize']
fig_size[0] = 8
fig_size[1] = 6
plt.rcParams['figure.figsize'] = fig_size
dataset.output.value_counts().plot(kind='pie', autopct='%0.05f%%',
colors=['lightblue', 'lightgreen', 'orange', 'pink'],
explode=(0.05, 0.05, 0.05, 0.05))
먼저 astype() 메서드를 이용하여 범주 특성을 갖는 데이터를 범주형 타입으로 변환한다. 또한, 파이토치를 이용한 모델 학습을 해야 하므로 범주형 타입을 텐서로 변환해야 한다.
categorical_col = ['price', 'maint', 'doors', 'persons', 'lug_capacity', 'safety']
for category in categorical_col:
dataset[category] = dataset[category].astype('category')
price = dataset['price'].cat.codes.values # 범주형 데이터를 텐서로 변환하기 위해 다음과 같은 절차가 필요하다.
maint = dataset['maint'].cat.codes.values
doors = dataset['doors'].cat.codes.values
persons = dataset['persons'].cat.codes.values
lug_capacity = dataset['lug_capacity'].cat.codes.values
safety = dataset['safety'].cat.codes.values
categorical_data = np.stack([price, maint, doors, persons, lug_capacity, safety], 1)
범주형 데이터를 숫자로 변환하기 위해 cat.codes를 사용한다. cat.codes는 어떤 클래스가 어떤 숫자로 매핑되어 있는지 확인이 어려운 단점이 있으므로 주의해야 한다.
이제 torch 모듈을 이용하여 배열을 텐서로 변환한다.
categorical_data = torch.tensor(categorical_data, dtype=torch.int64)
categorical_data
마지막으로 레이블로 사용할 컬럼에 대해서도 텐서로 변환해준다. 이번에는 get_dummies를 이용하여 넘파이 배열로 변환한다.
outputs = pd.get_dummies(dataset.output)
outputs = outputs.values
outputs = torch.tensor(outputs).flatten() # 1차원 텐서로 변환
print(categorical_data.shape)
print(outputs.shape)
워드 임베딩은 유사한 단어끼리 유사하게 인코딩되도록 표현하는 방법이다. 또한, 높은 차원의 임베딩일수록 단어 간의 세부적인 관계를 잘 파악할 수 있다. 따라서 단일 숫자로 변환된 넘파이 배열을 N차원으로 변경하여 사용한다.
배열은 N차원으로 변환하기 위해 먼저 모든 범주형 칼럼에 대한 임베딩 크기를 정의한다. 보통 컬럼의 고유 값 수를 2로 나누는 것을 많이 사용한다.
다음은 (모든 범주형 칼럼의 고유값 수, 차원의 크기) 형태의 배열을 출력한 결과이다.
categorical_column_sizes = [len(dataset[column].cat.categories) for column in categorical_col]
categorical_embedding_sizes = [(col_size, min(50, (col_size+1)//2)) for col_size in categorical_column_sizes]
print(categorical_embedding_sizes)
데이터셋을 훈련과 테스트 용도로 분리한다.
total_records = 1728
test_records = int(total_records * .2) # 전체 데이터 중 20%를 테스트로 사용
categorical_train = categorical_data[:total_records - test_records]
categorical_test = categorical_data[total_records - test_records:total_records]
train_outputs = outputs[:total_records - test_records]
test_outputs = outputs[total_records - test_records:total_records]
class Model(nn.Module):
def __init__(self, embedding_size, output_size, layers, p=0.4):
super().__init__()
self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])
self.embedding_dropout = nn.Dropout(p)
all_layers = []
num_categorical_cols = sum((nf for ni, nf in embedding_size))
input_size = num_categorical_cols # 입력층의 크기를 찾기 위해 범주형 칼럼 개수를 input_size에 저장
for i in layers:
all_layers.append(nn.Linear(input_size, i))
all_layers.append(nn.ReLU(inplace=True))
all_layers.append(nn.BatchNorm1d(i))
all_layers.append(nn.Dropout(p))
input_size = i
all_layers.append(nn.Linear(layers[-1], output_size))
self.layers = nn.Sequential(*all_layers) # 신경망의 모든 계층이 순차적으로 실행되도록 모든 계층에 대한 목록을 nn.Sequential 클래스로 전달
def forward(self, x_categorical):
embeddings = []
for i, e in enumerate(self.all_embeddings):
embeddings.append(e(x_categorical[:, i]))
x = torch.cat(embeddings, 1)
x = self.embedding_dropout(x)
x = self.layers(x)
return x
model = Model(categorical_embedding_sizes, 4, [200, 100, 50], p=0.4)
print(model)
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cpu')
epochs = 500
aggregated_losses = []
train_outputs = train_outputs.to(device=device, dtype=torch.int64)
for i in range(epochs):
i += 1
y_pred = model(categorical_train)
single_loss = loss_function(y_pred, train_outputs)
aggregated_losses.append(single_loss)
if i%25 == 1:
print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')
optimizer.zero_grad()
single_loss.backward() # 가중치를 업데이트
optimizer.step() # 기울기 업데이트
print(f'epoch: {i:3} loss: {single_loss.item():10.10f}')
테스트 데이터셋으로 예측을 해보자.
test_outputs = test_outputs.to(device=device, dtype=torch.int64)
with torch.no_grad():
y_val = model(categorical_test)
loss = loss_function(y_val, test_outputs)
print(f'Loss: {loss:.8f}')
tensor의 복사방법 (0) | 2022.08.06 |
---|---|
[Pytorch] register_buffer (0) | 2022.08.01 |
pytorch - nn.function과 nn의 차이점 (0) | 2022.06.19 |
torch - GPU 사용하기 (0) | 2022.06.13 |
Pytorch nn.ModuleList (0) | 2022.05.30 |
Attention is all you need논문의 pytorch Implementation 코드를 리뷰하는 포스팅이다.
https://github.com/jadore801120/attention-is-all-you-need-pytorch
GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer model in "Attention is All
A PyTorch implementation of the Transformer model in "Attention is All You Need". - GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer mo...
github.com
Transformer 코드 리뷰는 세 개의 포스팅으로 나누어 진행을 할 것이고 본 포스팅은 세 번째로 모델 구조 부분이다.
다만 자세한 모델 설명은 이전의 포스팅에서 설명했으므로 생략하겠다.
지난 포스팅에서 Transformer 모델에 대한 클래스를 정의했다. 이제 이를 호출해오자. 각각의 변수들은 주석을 통해 확인할 수 있다.
transformer = Transformer(
opt.src_vocab_size, # src_vocab_size, vocab 내 token의 갯수
opt.trg_vocab_size, # trg_vocab_size, vocab내 token의 갯수
src_pad_idx=opt.src_pad_idx, # src vocab의 padding token의 index
trg_pad_idx=opt.trg_pad_idx, # trg vocab의 padding token의 index
trg_emb_prj_weight_sharing=opt.proj_share_weight,
emb_src_trg_weight_sharing=opt.embs_share_weight,
d_k=opt.d_k, # multi head attention에서 사용할 key 차원 / (d_mode / n_head를 따름)
d_v=opt.d_v, # multi head attention에서 사용할 value 차원
d_model=opt.d_model, # encoder decoder에서의 정해진 입력과 출력의 크기, embedding vector의 차원과 동일
d_word_vec=opt.d_word_vec, #word ebedding 차원
d_inner=opt.d_inner_hid, # position wise layer 은닉층 크기
n_layers=opt.n_layers, # encoder,decoder stack 층 갯수
n_head=opt.n_head, # multi head 값
dropout=opt.dropout, # drop out 값
scale_emb_or_prj=opt.scale_emb_or_prj).to(device)
Attention is All you Need 논문에서는 lr scheduler를 새로 정의하고 이 함수를 통해 학습률을 통제하는 전략을 사용한다.
$$lr = d_{model}^{-0.5} \cdot min(step\_num^{-0.5}, step\_num \cdot warmup\_steps^{-1.5})$$
이 함수의 특징은 다음과 같다.
class ScheduledOptim():
'''A simple wrapper class for learning rate scheduling'''
def __init__(self, optimizer, lr_mul, d_model, n_warmup_steps):
self._optimizer = optimizer
self.lr_mul = lr_mul
self.d_model = d_model
self.n_warmup_steps = n_warmup_steps
self.n_steps = 0
def step_and_update_lr(self):
"Step with the inner optimizer"
self._update_learning_rate()
self._optimizer.step()
def zero_grad(self):
"Zero out the gradients with the inner optimizer"
self._optimizer.zero_grad()
def _get_lr_scale(self):
d_model = self.d_model
n_steps, n_warmup_steps = self.n_steps, self.n_warmup_steps
return (d_model ** -0.5) * min(n_steps ** (-0.5), n_steps * n_warmup_steps ** (-1.5))
def _update_learning_rate(self):
''' Learning rate scheduling per step '''
self.n_steps += 1
lr = self.lr_mul * self._get_lr_scale()
for param_group in self._optimizer.param_groups:
param_group['lr'] = lr
optimizer = ScheduledOptim(
optim.Adam(transformer.parameters(), betas=(0.9, 0.98), eps=1e-09),
opt.lr_mul, opt.d_model, opt.n_warmup_steps)
def cal_performance(pred, gold, trg_pad_idx, smoothing=False):
''' Apply label smoothing if needed '''
# loss 값을 구한다
loss = cal_loss(pred, gold, trg_pad_idx, smoothing=smoothing)
# 전체 단어 중에 가장 값이 높은 index 검색
pred = pred.max(1)[1]
gold = gold.contiguous().view(-1)
# 정답지에서 padding index가 아닌거 조회
non_pad_mask = gold.ne(trg_pad_idx)
# 예측단어 중 정답을 맞춘거의 갯수
n_correct = pred.eq(gold).masked_select(non_pad_mask).sum().item()
# 정답 label 의 갯수
n_word = non_pad_mask.sum().item()
return loss, n_correct, n_word
# loss, 맞춘갯수, 전체갯수
def cal_loss(pred, gold, trg_pad_idx, smoothing=False):
''' Calculate cross entropy loss, apply label smoothing if needed. '''
gold = gold.contiguous().view(-1)
if smoothing:
eps = 0.1
n_class = pred.size(1)
# gold값을 pred의 shape으로 바꿔서, one-hot encoding 적용한다.
one_hot = torch.zeros_like(pred).scatter(1, gold.view(-1, 1), 1)
# epsilon 값에 따라 smoothing 처리한다, [0,1,0,0] → [0.03, 0.9, 0.03, 0.03]
one_hot = one_hot * (1 - eps) + (1 - one_hot) * eps / (n_class - 1)
# pred 값을 softmax를 이용하여 확률값으로 변환한다.
log_prb = F.log_softmax(pred, dim=1)
# gold에서 padding이 아닌 값의 index를 뽑는다
non_pad_mask = gold.ne(trg_pad_idx)
# 예측값과 smoothing된 정답값을 곱하여 loss를 산출한다.
loss = -(one_hot * log_prb).sum(dim=1)
# 해당 loss값에서 mask되지 않은 값을 제거하고 loss를 구한다
loss = loss.masked_select(non_pad_mask).sum() # average later
else:
# smoothing을 사용하지 않은 경우에는 cross entropy를 사용하여 loss를 구한다.
loss = F.cross_entropy(pred, gold, ignore_index=trg_pad_idx, reduction='sum')
return loss
def train_epoch(model, training_data, optimizer, opt, device, smoothing):
''' Epoch operation in training phase'''
# model.train() 학습할때 필요한 drop out, batch_normalization 등의 기능을 활성화
# model.eval()과 model.train()을 병행하므로, 모델 학습시에는 model.train() 호출해야함
model.train()
total_loss, n_word_total, n_word_correct = 0, 0, 0
desc = ' - (Training) '
for batch in tqdm(training_data, mininterval=2, desc=desc, leave=False):
# prepare data
# ① src_seq.trg_seq, trg에 대한 정답 label "gold" 생성
src_seq = patch_src(batch.src, opt.src_pad_idx).to(device)
trg_seq, gold = map(lambda x: x.to(device), patch_trg(batch.trg, opt.trg_pad_idx))
# forward
# backward 전 optimizer의 기울기를 초기화해야만 새로운 가중치 편향에 대해서 새로운 기울기를 구할 수 있습니다.
optimizer.zero_grad()
# ② model 예측값 생성
pred = model(src_seq, trg_seq) # 256 * (trg 문장길이-1), 10077(vocab)
# ③ loss값 계산
loss, n_correct, n_word = cal_performance(
pred, gold, opt.trg_pad_idx, smoothing=smoothing)
# ④ parameter update 진행
loss.backward()
optimizer.step_and_update_lr()
# note keeping
n_word_total += n_word
n_word_correct += n_correct
total_loss += loss.item()
# 평균 loss
loss_per_word = total_loss/n_word_total
# 평균 acc
accuracy = n_word_correct/n_word_total
return loss_per_word, accuracy
def eval_epoch(model, validation_data, device, opt):
''' Epoch operation in evaluation phase '''
model.eval()
total_loss, n_word_total, n_word_correct = 0, 0, 0
desc = ' - (Validation) '
with torch.no_grad():
for batch in tqdm(validation_data, mininterval=2, desc=desc, leave=False):
# prepare data
src_seq = patch_src(batch.src, opt.src_pad_idx).to(device)
trg_seq, gold = map(lambda x: x.to(device), patch_trg(batch.trg, opt.trg_pad_idx))
# forward
pred = model(src_seq, trg_seq)
loss, n_correct, n_word = cal_performance(
pred, gold, opt.trg_pad_idx, smoothing=False)
# note keeping
n_word_total += n_word
n_word_correct += n_correct
total_loss += loss.item()
loss_per_word = total_loss/n_word_total
accuracy = n_word_correct/n_word_total
return loss_per_word, accuracy
def train(model, training_data, validation_data, optimizer, device, opt):
''' Start training '''
# Use tensorboard to plot curves, e.g. perplexity, accuracy, learning rate
if opt.use_tb:
print("[Info] Use Tensorboard")
from torch.utils.tensorboard import SummaryWriter
tb_writer = SummaryWriter(log_dir=os.path.join(opt.output_dir, 'tensorboard'))
log_train_file = os.path.join(opt.output_dir, 'train.log')
log_valid_file = os.path.join(opt.output_dir, 'valid.log')
print('[Info] Training performance will be written to file: {} and {}'.format(
log_train_file, log_valid_file))
with open(log_train_file, 'w') as log_tf, open(log_valid_file, 'w') as log_vf:
log_tf.write('epoch,loss,ppl,accuracy\n')
log_vf.write('epoch,loss,ppl,accuracy\n')
def print_performances(header, ppl, accu, start_time, lr):
print(' - {header:12} ppl: {ppl: 8.5f}, accuracy: {accu:3.3f} %, lr: {lr:8.5f}, '\
'elapse: {elapse:3.3f} min'.format(
header=f"({header})", ppl=ppl,
accu=100*accu, elapse=(time.time()-start_time)/60, lr=lr))
#valid_accus = []
valid_losses = []
for epoch_i in range(opt.epoch):
print('[ Epoch', epoch_i, ']')
start = time.time()
train_loss, train_accu = train_epoch(
model, training_data, optimizer, opt, device, smoothing=opt.label_smoothing)
train_ppl = math.exp(min(train_loss, 100)) # train_loss : loss_per_word, PPL = exp(cross_entropy)
# Current learning rate
lr = optimizer._optimizer.param_groups[0]['lr']
print_performances('Training', train_ppl, train_accu, start, lr)
start = time.time()
valid_loss, valid_accu = eval_epoch(model, validation_data, device, opt)
valid_ppl = math.exp(min(valid_loss, 100))
print_performances('Validation', valid_ppl, valid_accu, start, lr)
valid_losses += [valid_loss]
checkpoint = {'epoch': epoch_i, 'settings': opt, 'model': model.state_dict()}
if opt.save_mode == 'all':
model_name = 'model_accu_{accu:3.3f}.chkpt'.format(accu=100*valid_accu)
torch.save(checkpoint, model_name)
elif opt.save_mode == 'best':
model_name = 'model.chkpt'
if valid_loss <= min(valid_losses):
torch.save(checkpoint, os.path.join(opt.output_dir, model_name))
print(' - [Info] The checkpoint file has been updated.')
with open(log_train_file, 'a') as log_tf, open(log_valid_file, 'a') as log_vf:
log_tf.write('{epoch},{loss: 8.5f},{ppl: 8.5f},{accu:3.3f}\n'.format(
epoch=epoch_i, loss=train_loss,
ppl=train_ppl, accu=100*train_accu))
log_vf.write('{epoch},{loss: 8.5f},{ppl: 8.5f},{accu:3.3f}\n'.format(
epoch=epoch_i, loss=valid_loss,
ppl=valid_ppl, accu=100*valid_accu))
if opt.use_tb:
tb_writer.add_scalars('ppl', {'train': train_ppl, 'val': valid_ppl}, epoch_i)
tb_writer.add_scalars('accuracy', {'train': train_accu*100, 'val': valid_accu*100}, epoch_i)
tb_writer.add_scalar('learning_rate', lr, epoch_i)
여기서 PPL(Perplexity)은 언어 모델을 평가하기 위한 지표이다. PPL은 곧 언어 모델의 분기계수인데, 분기계수란 tree자료구조에서 branch의 개수를 의미하고, 한 가지 경우를 골라야 하는 task에서 선택지의 개수를 뜻한다. 언어모델에서 분기계수는 이전 단어로 다음 단어를 예측할 때 몇개의 단어 후보를 고려하는지를 의미한다.
즉, PPL 값이 낮을수록 언어 모델이 쉽게 정답을 찾아내는 것이므로 성능이 우수하다고 평가할 수 있다.
번역은 크게 세가지 구조로 구성되어 있다.
먼저 test 데이터를 불러와야 한다.
data = pickle.load(open(opt.data_pkl, 'rb'))
SRC, TRG = data['vocab']['src'], data['vocab']['trg']
# padding index와 시작, 끝 index를 가져온다.
opt.src_pad_idx = SRC.vocab.stoi[Constants.PAD_WORD]
opt.trg_pad_idx = TRG.vocab.stoi[Constants.PAD_WORD]
opt.trg_bos_idx = TRG.vocab.stoi[Constants.BOS_WORD]
opt.trg_eos_idx = TRG.vocab.stoi[Constants.EOS_WORD]
test_loader = Dataset(examples=data['test'], fields={'src': SRC, 'trg': TRG})
그 다음 학습한 모델을 불러온다.
''' Translate input text with trained model. '''
import torch
import dill as pickle
from tqdm import tqdm
def load_model(opt, device):
# load model
checkpoint = torch.load(opt.model, map_location=device)
# model의 option load
model_opt = checkpoint['settings']
# transformer model을 model option에 따라 재생성
model = Transformer(
model_opt.src_vocab_size,
model_opt.trg_vocab_size,
model_opt.src_pad_idx,
model_opt.trg_pad_idx,
trg_emb_prj_weight_sharing=model_opt.proj_share_weight,
emb_src_trg_weight_sharing=model_opt.embs_share_weight,
d_k=model_opt.d_k,
d_v=model_opt.d_v,
d_model=model_opt.d_model,
d_word_vec=model_opt.d_word_vec,
d_inner=model_opt.d_inner_hid,
n_layers=model_opt.n_layers,
n_head=model_opt.n_head,
dropout=model_opt.dropout).to(device)
# 학습된 weight를 model에 반영
model.load_state_dict(checkpoint['model'])
print('[Info] Trained model state loaded.')
return model
- 추후 내용 추가 예정
[코드리뷰] Attention is All You Need-Pytorch [2] Model Architecture (0) | 2022.09.29 |
---|---|
[코드리뷰] Attention is All You Need-Pytorch [1] Preprocess (0) | 2022.08.29 |
Attention is all you need논문의 pytorch Implementation 코드를 리뷰하는 포스팅이다.
https://github.com/jadore801120/attention-is-all-you-need-pytorch
GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer model in "Attention is All
A PyTorch implementation of the Transformer model in "Attention is All You Need". - GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer mo...
github.com
Transformer 코드 리뷰는 세 개의 포스팅으로 나누어 진행을 할 것이고 본 포스팅은 두 번째로 모델 구조 부분이다.
다만 자세한 모델 설명은 이전의 포스팅에서 설명했으므로 생략하겠다.
지난 전처리에서 vocab size는 10077이였다. 이 vocab 내 모든 단어들을 고유 벡터로 만들어서 512차원으로 embedding 시켜주기 위해 nn.Embedding을 사용한다.
import torch.nn as nn
vacab_len = 10077
import torch.nn as nn
embedding_layer = nn.Embedding(num_embeddings=vacab_len,
embedding_dim=512,
padding_idx=1)
print('vocab 내 모든 단어들을 고유 벡터로 만들어서 512차원으로 embedding 시켜줌')
print(f'LOOK UP TABLE SIZE: {embedding_layer.weight.shape}')
print(embedding_layer.weight)
vocab 내 모든 단어들을 고유 벡터로 만들어서 512차원으로 embedding 시켜줌
LOOK UP TABLE SIZE: torch.Size([10077, 512])
Parameter containing:
tensor([[ 0.9914, -0.4633, -1.2087, ..., -2.0206, -2.9503, -0.4875],
[ 0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[-0.5413, -0.2046, -0.0866, ..., -0.1290, 0.0060, 0.8837],
...,
[-0.6048, -0.5161, 0.6198, ..., -0.0604, -0.0853, 0.4896],
[ 0.2955, 0.1371, 0.4321, ..., -0.3920, 0.7107, 0.5773],
[-0.6375, -0.2004, 0.9800, ..., 0.6815, 0.3131, 1.8058]],
requires_grad=True)
transformer에서 문장 내 각 token의 위치정보를 넣어주기 위해 positional encoding을 해준다. Positional Encoding은 embedding과 차원이 동일하며, embedding vector와 더하여 위치정보를 추가하게 된다.
class PositionalEncoding(nn.Module):
def __init__(self, d_hid, n_position=200):
super(PositionalEncoding, self).__init__()
# Not a parameter
self.register_buffer('pos_table', self._get_sinusoid_encoding_table(n_position, d_hid)) # ①
def _get_sinusoid_encoding_table(self, n_position, d_hid):
'''Sinusoid position encoding table'''
def get_position_angle_vec(position):
return [position / np.power(10000, 2 * (hid_j // 2) / d_hid) for hid_j in range(d_hid)]
sinusoid_table = np.array([get_position_angle_vec(pos_i) for pos_i in range(n_position)])
sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2]) # dim 2i
sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2]) # dim 2i+1
return torch.FloatTensor(sinusoid_table).unsqueeze(0)
def forward(self, x):
return x + self.pos_table[:, :x.size(1)].clone().detach() # x : torch.size([328, 36, 512])
class ScaledDotProductAttention(nn.Module):
''' Scaled Dot-Product Attention '''
def __init__(self, temperature, attn_dropout=0.1):
super().__init__()
self.temperature = temperature
self.dropout = nn.Dropout(attn_dropout)
def forward(self, q, k, v, mask=None):
# ① attention score 계산
attn = torch.matmul(q / self.temperature, k.transpose(2, 3))
# ② attention을 계산하지 않는 위치를 매우작은값으로 치환
# - 매우작은 값은 softmax를 거칠때 0과 가까운 값이 되므로 무시한다.
if mask is not None:
attn = attn.masked_fill(mask == 0, -1e9)
# ③ soft max를 이용하여 attention weight 계산
attn = self.dropout(F.softmax(attn, dim=-1))
# ④ 해당 분포값에 v를 곱하여 attention value를 구한다
output = torch.matmul(attn, v)
return output, attn
# attention values, attention weight
class MultiHeadAttention(nn.Module):
''' Multi-Head Attention module '''
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
super().__init__()
self.n_head = n_head
self.d_k = d_k
self.d_v = d_v
self.w_qs = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_ks = nn.Linear(d_model, n_head * d_k, bias=False)
self.w_vs = nn.Linear(d_model, n_head * d_v, bias=False)
self.fc = nn.Linear(n_head * d_v, d_model, bias=False)
self.attention = ScaledDotProductAttention(temperature=d_k ** 0.5)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
def forward(self, q, k, v, mask=None):
d_k, d_v, n_head = self.d_k, self.d_v, self.n_head
sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1)
residual = q
# Pass through the pre-attention projection: b x lq x (n*dv)
# Separate different heads: b x lq x n x dv
q = self.w_qs(q).view(sz_b, len_q, n_head, d_k) # [256, len, 8, 64]
k = self.w_ks(k).view(sz_b, len_k, n_head, d_k) # [256, len, 8, 64]
v = self.w_vs(v).view(sz_b, len_v, n_head, d_v) # [256, len, 8, 64]
# Transpose for attention dot product: b x n x lq x dv
q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2) # [256, 8, len, 64]
if mask is not None:
mask = mask.unsqueeze(1) # For head axis broadcasting.
q, attn = self.attention(q, k, v, mask=mask) # [256, 8, len, 64], [256, 8, len, 36]
# Transpose to move the head dimension back: b x lq x n x dv
# Combine the last two dimensions to concatenate all the heads together: b x lq x (n*dv)
q = q.transpose(1, 2).contiguous().view(sz_b, len_q, -1) # [256, 36, 512]
q = self.dropout(self.fc(q))
q += residual
q = self.layer_norm(q)
return q, attn
Multi Head Attention은 세가지 요소를 입력으로 받는다.
class PositionwiseFeedForward(nn.Module):
''' A two-feed-forward-layer module '''
def __init__(self, d_in, d_hid, dropout=0.1):
super().__init__()
self.w_1 = nn.Linear(d_in, d_hid) # position-wise 512 -> 2048
self.w_2 = nn.Linear(d_hid, d_in) # position-wise 2048 -> 512
self.layer_norm = nn.LayerNorm(d_in, eps=1e-6)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
residual = x
x = self.w_2(F.relu(self.w_1(x)))
x = self.dropout(x)
x += residual
x = self.layer_norm(x)
return x
다음은 하나의 인코더 레이어를 정의한다. 인코더 레이어는 입력과 출력의 차원이 같다. 이러한 특징을 이용해 인코더 레이어를 여러 번 중첩해 사용할 수 있다.
class EncoderLayer(nn.Module):
''' Compose with two layers '''
def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout=0.1):
super(EncoderLayer, self).__init__()
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout=dropout)
def forward(self, enc_input, slf_attn_mask=None):
enc_output, enc_slf_attn = self.slf_attn(
enc_input, enc_input, enc_input, mask=slf_attn_mask)
enc_output = self.pos_ffn(enc_output)
return enc_output, enc_slf_attn
여기서 slf_attn_mask는 pad_idx에 대하여 mask값을 0으로 처리하는 다음과 같은 함수이다. 이는 필요 없는 패딩 인덱스에 대해 attention 연산을 안 하겠다는 의미이다.
def get_pad_mask(seq, pad_idx):
return (seq != pad_idx).unsqueeze(-2)
이 클래스에서 전체 인코더 아키텍처를 정의한다.
class Encoder(nn.Module):
''' A encoder model with self attention mechanism. '''
def __init__(
self, n_src_vocab, d_word_vec, n_layers, n_head, d_k, d_v,
d_model, d_inner, pad_idx, dropout=0.1, n_position=200, scale_emb=False):
super().__init__()
# 모든 단어들을 embedding (고유 백터를 가진 차원으로 변경)
# 현재 코드에서는10077의 vocab 내 단어들을 512 차원으로 embedding 시킴
self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=pad_idx)
# 문장의 최대길이까지 positional encoding을함
# positional encoding : rnn이 아니므로 순서에 대한 정보를 반영하기위한 방법
self.position_enc = PositionalEncoding(d_word_vec, n_position=n_position)
self.dropout = nn.Dropout(p=dropout)
# multiple encoder
# encoder layer stack으로 encoder 층을 6개로 쌓는다.
# encoder layer : multihead attentions, feedforward로 구성
self.layer_stack = nn.ModuleList([
EncoderLayer(d_model, d_inner, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
# layer_norm
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
self.scale_emb = scale_emb
self.d_model = d_model
def forward(self, src_seq, src_mask, return_attns=True):
enc_slf_attn_list = []
# -- Forward
# ① word embedding
enc_output = self.src_word_emb(src_seq)
if self.scale_emb:
enc_output *= self.d_model ** 0.5
# ② positional encoding
enc_output = self.dropout(self.position_enc(enc_output))
# normalization
enc_output = self.layer_norm(enc_output)
# ③ stacked encoder layers
for enc_layer in self.layer_stack:
enc_output, enc_slf_attn = enc_layer(enc_output, slf_attn_mask=src_mask)
enc_slf_attn_list += [enc_slf_attn] if return_attns else []
if return_attns:
return enc_output, enc_slf_attn_list
return enc_output,
decoder layer도 encoder layer와 같이 입력과 출력의 차원이 같다. 이러한 특징을 이용해 트랜스포머의 decoder는 decoder layer를 여러 번 중첩해 사용한다.
또한 decoder layer는 encoder layer와 다르게 두 개의 multi head attention이 사용된다.
Masked Multi-head Attention에서는 뒤쪽 단어의 점수를 참고하지 않도록 대각행렬 위쪽을 마스킹 처리를 해준다.
# masked multi-head attention을 하기 위한 mask 과정
def get_subsequent_mask(seq):
''' For masking out the subsequent info. '''
sz_b, len_s = seq.size()
subsequent_mask = (1 - torch.triu(
torch.ones((1, len_s, len_s), device=seq.device), diagonal=1)).bool() # 대각행렬 윗부분을 False로 치환
return subsequent_mask
class DecoderLayer(nn.Module):
''' Compose with three layers '''
def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout=0.1):
super(DecoderLayer, self).__init__()
#
self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout=dropout)
def forward(
self, dec_input, enc_output,
slf_attn_mask=None, dec_enc_attn_mask=None):
# ① sublayer 1: decoder input 값에 대한 self attention
dec_output, dec_slf_attn = self.slf_attn(dec_input, dec_input, dec_input, mask=slf_attn_mask)
# ② sublayer 2 : ①의 결과값을 query로, key,value는 encoder의 output 값으로 attention
dec_output, dec_enc_attn = self.enc_attn(dec_output, enc_output, enc_output, mask=dec_enc_attn_mask)
# ③ sublayer 3 : position wise feed forward를 통과
dec_output = self.pos_ffn(dec_output)
return dec_output, dec_slf_attn, dec_enc_attn
class Decoder(nn.Module):
''' A decoder model with self attention mechanism. '''
def __init__(
self, n_trg_vocab, d_word_vec, n_layers, n_head, d_k, d_v,
d_model, d_inner, pad_idx, n_position=200, dropout=0.1, scale_emb=False):
super().__init__()
# target word embedding
self.trg_word_emb = nn.Embedding(n_trg_vocab, d_word_vec, padding_idx=pad_idx)
# positional encoding
self.position_enc = PositionalEncoding(d_word_vec, n_position=n_position)
self.dropout = nn.Dropout(p=dropout)
# stacked decoder layers
self.layer_stack = nn.ModuleList([
DecoderLayer(d_model, d_inner, n_head, d_k, d_v, dropout=dropout)
for _ in range(n_layers)])
# layer_normalization
self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
self.scale_emb = scale_emb
self.d_model = d_model
def forward(self, trg_seq, trg_mask, enc_output, src_mask, return_attns=True):
dec_slf_attn_list, dec_enc_attn_list = [], []
# -- Forward
# ① target words embedding
dec_output = self.trg_word_emb(trg_seq)
if self.scale_emb:
dec_output *= self.d_model ** 0.5
# ② positional encoding
dec_output = self.dropout(self.position_enc(dec_output))
dec_output = self.layer_norm(dec_output)
# ③ decoder_layer stacked
for dec_layer in self.layer_stack:
dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
dec_output, enc_output, slf_attn_mask=trg_mask, dec_enc_attn_mask=src_mask)
dec_slf_attn_list += [dec_slf_attn] if return_attns else []
dec_enc_attn_list += [dec_enc_attn] if return_attns else []
if return_attns:
return dec_output, dec_slf_attn_list, dec_enc_attn_list
return dec_output,
마지막으로 transformer model을 구현해보자. transformer 구조는 크게보면 다음과 같다.
''' Define the Transformer model '''
import torch
import torch.nn as nn
import numpy as np
#from transformer.Layers import EncoderLayer, DecoderLayer
class Transformer(nn.Module):
''' A sequence to sequence model with attention mechanism. '''
def __init__(
self, n_src_vocab, n_trg_vocab, src_pad_idx, trg_pad_idx,
d_word_vec=512, d_model=512, d_inner=2048,
n_layers=6, n_head=8, d_k=64, d_v=64, dropout=0.1, n_position=200,
trg_emb_prj_weight_sharing=True, emb_src_trg_weight_sharing=True,
scale_emb_or_prj='prj'):
super().__init__()
# padding index 저장
self.src_pad_idx, self.trg_pad_idx = src_pad_idx, trg_pad_idx
assert scale_emb_or_prj in ['emb', 'prj', 'none']
scale_emb = (scale_emb_or_prj == 'emb') if trg_emb_prj_weight_sharing else False
self.scale_prj = (scale_emb_or_prj == 'prj') if trg_emb_prj_weight_sharing else False
self.d_model = d_model
# encoder 정의
self.encoder = Encoder(
n_src_vocab=n_src_vocab, n_position=n_position,
d_word_vec=d_word_vec, d_model=d_model, d_inner=d_inner,
n_layers=n_layers, n_head=n_head, d_k=d_k, d_v=d_v,
pad_idx=src_pad_idx, dropout=dropout, scale_emb=scale_emb)
# decoder 정의
self.decoder = Decoder(
n_trg_vocab=n_trg_vocab, n_position=n_position,
d_word_vec=d_word_vec, d_model=d_model, d_inner=d_inner,
n_layers=n_layers, n_head=n_head, d_k=d_k, d_v=d_v,
pad_idx=trg_pad_idx, dropout=dropout, scale_emb=scale_emb)
# 최종 output layers 정의
self.trg_word_prj = nn.Linear(d_model, n_trg_vocab, bias=False)
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
assert d_model == d_word_vec, \
'To facilitate the residual connections, \
the dimensions of all module outputs shall be the same.'
if trg_emb_prj_weight_sharing:
# Share the weight between target word embedding & last dense layer
self.trg_word_prj.weight = self.decoder.trg_word_emb.weight
if emb_src_trg_weight_sharing:
self.encoder.src_word_emb.weight = self.decoder.trg_word_emb.weight
def forward(self, src_seq, trg_seq):
# ① source, target 데이터에 대한 mask 생성
src_mask = get_pad_mask(src_seq, self.src_pad_idx)
trg_mask = get_pad_mask(trg_seq, self.trg_pad_idx) & get_subsequent_mask(trg_seq)
# ② encoder 층을 통과
#enc_output, *_ = self.encoder(src_seq, src_mask)
enc_output, *_ = self.encoder(src_seq, src_mask)
# ③ decoder 층을 통과
#dec_output, *_ = self.decoder(trg_seq, trg_mask, enc_output, src_mask)
dec_output, attention1, attention2 = self.decoder(trg_seq, trg_mask, enc_output, src_mask)
# ④ 최종 weight 층을 통과
seq_logit = self.trg_word_prj(dec_output)
if self.scale_prj:
seq_logit *= self.d_model ** -0.5
return seq_logit.view(-1, seq_logit.size(2))
여기까지 transformer를 구성하는 모듈들에 대해 알아보았다. 다음 포스팅에는 전체적인 학습과 옵티마이저, 예측 함수에 대해 알아보도록 하겠다.
[코드리뷰] Attention is All You Need-Pytorch [3] 모델 학습 및 번역 (0) | 2022.12.16 |
---|---|
[코드리뷰] Attention is All You Need-Pytorch [1] Preprocess (0) | 2022.08.29 |
Attention is all you need논문의 pytorch Implementation 코드를 리뷰하는 포스팅이다.
https://github.com/jadore801120/attention-is-all-you-need-pytorch
GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer model in "Attention is All
A PyTorch implementation of the Transformer model in "Attention is All You Need". - GitHub - jadore801120/attention-is-all-you-need-pytorch: A PyTorch implementation of the Transformer mo...
github.com
Transformer 코드 리뷰는 세 개의 포스팅으로 나누어 진행을 할 것이고 본 포스팅은 그 첫 번째로 전처리 부분이다.
먼저 필요한 라이브러리를 임포트해주고 파라미터를 EasyDict를 통해 선언해주자.
import spacy
import torchtext.data # torchtext의 버전은 0.3.1을 사용
import torchtext.datasets
import dill as pickle
import easydict
opt = easydict.EasyDict({
"lang_src" : 'de_core_news_sm',# source 언어 / spacy 사용시
"lang_trg" : 'en_core_web_sm', # target 언어 / spacy 사용시
"save_data" : True,
"data_src" : None,
"data_trg" : None,
"max_len" :100, # 한 문장에 들어가는 최대 token 수 / 해당 갯수 이상이면 버림
"min_word_count" : 3, # vocab을 만들어 줄때 최소 갯수 : 해당 갯수 미만이면 <unk>로 vocab이 형성됨
"keep_case": True, # 대소문자 구분
"share_vocab" : "store_true", # target vocab과 source vocab을 하나로 합치는지 여부
})
tokenizers는 문장을 개별 토큰으로 변환해주는 데 사용된다. 본 코드에서는 nlp를 쉽게 처리할 수 있도록 도와주는 패키지인 spacy를 이용하여 토큰화를 한다. 먼저 영어와 독일어 전처리 모듈을 설치하자.
!python -m spacy download en
!python -m spacy download de
모듈을 설치한 후 독일어와 영어를 토큰화 해주자.
src_lang_model = spacy.load('de_core_news_sm') # 독일어 토큰화
trg_lang_model = spacy.load('en_core_web_sm') # 영어 토큰화
토큰화의 예시는 다음과 같다.
# 토큰화 예시
tokenized = trg_lang_model.tokenizer('I am a student.')
for i, token in enumerate(tokenized):
print(f'index {i}: {token.text}')
index 0: I
index 1: am
index 2: a
index 3: student
index 4: .
독일어와 영어의 토큰화 함수를 정의한다.
# 독일어 문장을 토큰화하는 함수
def tokenize_src(text):
return [tok.text for tok in src_lang_model.tokenizer(text)]
# 영어 문장을 토큰화하는 함수
def tokenize_trg(text):
return [tok.text for tok in trg_lang_model.tokenizer(text)]
default 토큰(padding unknown, start, end token)을 정의하고 torchtext의 Field 라이브러리를 사용하여 데이터 처리를 준비한다.
Field는 데이터타입과 이를 텐서로 변환할 지시사항과 함께 정의하는 것이다. Field는 텐서로 표현될 수 있는 덱스트 데이터 타입을 처리하고, 각 토큰을 숫자 인덱스로 맵핑시켜주는 단어장(vocab) 객체가 있다. 또한 토큰화 하는 함수, 전처리 등을 지정할 수 있다.
PAD_WORD = '<blank>' # padding token
UNK_WORD = '<unk>' # unknown token
BOS_WORD = '<s>' # start token
EOS_WORD = '</s>' # end token
SRC = torchtext.data.Field(
tokenize=tokenize_src, lower=False,
pad_token=PAD_WORD, init_token=BOS_WORD, eos_token=EOS_WORD)
TRG = torchtext.data.Field(
tokenize=tokenize_trg, lower=False,
pad_token=PAD_WORD, init_token=BOS_WORD, eos_token=EOS_WORD)
이제 영어-독일어 번역 데이터셋인 Multi30k를 불러오자. (약 3만개의 영어, 독일어 문장)
MAX_LEN = opt.max_len # 문장의 최대 허용 token 갯수, token의 갯수보다 문장내 토큰의 갯수가 return False
MIN_FREQ = opt.min_word_count # vocab을 만들어 줄때 최소 갯수 : 해당 갯수 미만이면 <unk>로 vocab이 형성됨
def filter_examples_with_length(x):
return len(vars(x)['src']) <= MAX_LEN and len(vars(x)['trg']) <= MAX_LEN
train, val, test = torchtext.datasets.Multi30k.splits(
exts = ('.de', '.en'),
fields = (SRC, TRG),
filter_pred = filter_examples_with_length)
print(f"학습 데이터셋(training dataset) 크기: {len(train.examples)}개")
print(f"평가 데이터셋(validation dataset) 크기: {len(val.examples)}개")
print(f"테스트 데이터셋(testing dataset) 크기: {len(test.examples)}개")
학습 데이터셋(training dataset) 크기: 29000개
평가 데이터셋(validation dataset) 크기: 1014개
테스트 데이터셋(testing dataset) 크기: 1000개
학습 데이터 중 하나를 선택해 출력해보자.
# 학습 데이터 중 하나를 선택해 출력
print(vars(train.examples[25])['src'])
print(vars(train.examples[25])['trg'])
['Eine', 'Person', 'in', 'einem', 'blauen', 'Mantel', 'steht', 'auf', 'einem', 'belebten', 'Gehweg', 'und', 'betrachtet', 'ein', 'Gemälde', 'einer', 'Straßenszene', '.']
['A', 'person', 'dressed', 'in', 'a', 'blue', 'coat', 'is', 'standing', 'in', 'on', 'a', 'busy', 'sidewalk', ',', 'studying', 'painting', 'of', 'a', 'street', 'scene', '.']
'Eine Person in einem blauen Mantel steht auf einem belebten Gehweg und betrachtet ein Gemälde einer Straßenszene .'
=> 'A person in a blue coat stands on a busy sidewalk and looks at a painting of a street scene'
(파란 코트를 입은 사람이 붐비는 인도에 서서 거리의 풍경을 보고 있다.)
field 객체의 build_vocab 메서드를 이용해 영어와 독일어 단어 사전을 생성한다. 여기서는 최소 3번 이상 등장한 단어만을 선택하는데 이때, 2번 이하로 나오는 단어는 '<unk>'token으로 변환된다.
SRC.build_vocab(train.src, min_freq=MIN_FREQ)
TRG.build_vocab(train.trg, min_freq=MIN_FREQ)
print(f'len(SRC) : {len(SRC.vocab)}')
print(f'len(TRG) : {len(TRG.vocab)}')
len(SRC) : 5497
len(TRG) : 4727
영어 단어의 토큰화 예시를 살펴보자.
print(TRG.vocab.stoi['abcabc']) # 없는 단어 : 0
print(TRG.vocab.stoi[TRG.pad_token]) # 패딩 : 1
print(TRG.vocab.stoi['<s>']) # <s> : 2
print(TRG.vocab.stoi['</s>']) # </s> : 3
print(TRG.vocab.stoi['python']) # 2번 이하 단어
print(TRG.vocab.stoi["world"])
0
1
2
3
0
1870
본 코드에서는 share_vocab함수를 사용한다. 이 함수는 여러 개의 tokenizer를 사용할 때만 사용하는데 여러 개일 경우 같은 단어가 복수의 토큰 값을 가지게 되기 때문에 하나로 합치는 과정이 필요하기 때문에 사용한다. 본 논문은 독어와 영어에 중복되는 단어가 있기 때문에 사용한 것 같다.
for w, _ in SRC.vocab.stoi.items():
# TODO: Also update the `freq`, although it is not likely to be used.
if w not in TRG.vocab.stoi:
TRG.vocab.stoi[w] = len(TRG.vocab.stoi)
TRG.vocab.itos = [None] * len(TRG.vocab.stoi)
for w, i in TRG.vocab.stoi.items():
TRG.vocab.itos[i] = w
SRC.vocab.stoi = TRG.vocab.stoi
SRC.vocab.itos = TRG.vocab.itos
print('[Info] Get merged vocabulary size:', len(TRG.vocab))
[Info] Get merged vocabulary size: 10079
위의 과정을 통해 생성한 option과 vocab(src, trg), train, valid, test data를 저장해야 한다.
data = {
'settings': opt,
'vocab': {'src': SRC, 'trg': TRG},
'train': train.examples,
'valid': val.examples,
'test': test.examples}
print('[Info] Dumping the processed data to pickle file', opt.save_data)
with open("m30k_deen_shr.pkl","wb") as f:
pickle.dump(data,f)
[Info] Dumping the processed data to pickle file True
다음 포스팅에서는 Attention is all you need의 메인이 되는 Transformer 아키택쳐에 대해 리뷰해보겠다.
[코드리뷰] Attention is All You Need-Pytorch [3] 모델 학습 및 번역 (0) | 2022.12.16 |
---|---|
[코드리뷰] Attention is All You Need-Pytorch [2] Model Architecture (0) | 2022.09.29 |