이번 포스팅에서는 pytorch의 기본 흐름에 대해 적어보겠다. dataset은 car_evaluation data를 사용한다.
Data
car_evaluation
- price : 자동차 가격
- maint : 자동차 유지 비용
- doors : 자동차 문 개수
- persons : 수용 인원
- lug_capacity : 수하물 용량
- safety : 안정성
- output : 차 상태 - 이 데이터는 unacc, acc, good, vgood 중 하나의 값을 가진다.
Library
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
Data Load
dataset = pd.read_csv('./car_evaluation.csv')
dataset.head()

fig_size = plt.rcParams['figure.figsize']
fig_size[0] = 8
fig_size[1] = 6
plt.rcParams['figure.figsize'] = fig_size
dataset.output.value_counts().plot(kind='pie', autopct='%0.05f%%',
colors=['lightblue', 'lightgreen', 'orange', 'pink'],
explode=(0.05, 0.05, 0.05, 0.05))

Preprocessing
먼저 astype() 메서드를 이용하여 범주 특성을 갖는 데이터를 범주형 타입으로 변환한다. 또한, 파이토치를 이용한 모델 학습을 해야 하므로 범주형 타입을 텐서로 변환해야 한다.
categorical_col = ['price', 'maint', 'doors', 'persons', 'lug_capacity', 'safety']
for category in categorical_col:
dataset[category] = dataset[category].astype('category')
price = dataset['price'].cat.codes.values # 범주형 데이터를 텐서로 변환하기 위해 다음과 같은 절차가 필요하다.
maint = dataset['maint'].cat.codes.values
doors = dataset['doors'].cat.codes.values
persons = dataset['persons'].cat.codes.values
lug_capacity = dataset['lug_capacity'].cat.codes.values
safety = dataset['safety'].cat.codes.values
categorical_data = np.stack([price, maint, doors, persons, lug_capacity, safety], 1)
범주형 데이터를 숫자로 변환하기 위해 cat.codes를 사용한다. cat.codes는 어떤 클래스가 어떤 숫자로 매핑되어 있는지 확인이 어려운 단점이 있으므로 주의해야 한다.
이제 torch 모듈을 이용하여 배열을 텐서로 변환한다.
categorical_data = torch.tensor(categorical_data, dtype=torch.int64)
categorical_data

마지막으로 레이블로 사용할 컬럼에 대해서도 텐서로 변환해준다. 이번에는 get_dummies를 이용하여 넘파이 배열로 변환한다.
outputs = pd.get_dummies(dataset.output)
outputs = outputs.values
outputs = torch.tensor(outputs).flatten() # 1차원 텐서로 변환
print(categorical_data.shape)
print(outputs.shape)

워드 임베딩은 유사한 단어끼리 유사하게 인코딩되도록 표현하는 방법이다. 또한, 높은 차원의 임베딩일수록 단어 간의 세부적인 관계를 잘 파악할 수 있다. 따라서 단일 숫자로 변환된 넘파이 배열을 N차원으로 변경하여 사용한다.
배열은 N차원으로 변환하기 위해 먼저 모든 범주형 칼럼에 대한 임베딩 크기를 정의한다. 보통 컬럼의 고유 값 수를 2로 나누는 것을 많이 사용한다.
다음은 (모든 범주형 칼럼의 고유값 수, 차원의 크기) 형태의 배열을 출력한 결과이다.
categorical_column_sizes = [len(dataset[column].cat.categories) for column in categorical_col]
categorical_embedding_sizes = [(col_size, min(50, (col_size+1)//2)) for col_size in categorical_column_sizes]
print(categorical_embedding_sizes)

데이터셋을 훈련과 테스트 용도로 분리한다.
total_records = 1728
test_records = int(total_records * .2) # 전체 데이터 중 20%를 테스트로 사용
categorical_train = categorical_data[:total_records - test_records]
categorical_test = categorical_data[total_records - test_records:total_records]
train_outputs = outputs[:total_records - test_records]
test_outputs = outputs[total_records - test_records:total_records]
Modeling
class Model(nn.Module):
def __init__(self, embedding_size, output_size, layers, p=0.4):
super().__init__()
self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])
self.embedding_dropout = nn.Dropout(p)
all_layers = []
num_categorical_cols = sum((nf for ni, nf in embedding_size))
input_size = num_categorical_cols # 입력층의 크기를 찾기 위해 범주형 칼럼 개수를 input_size에 저장
for i in layers:
all_layers.append(nn.Linear(input_size, i))
all_layers.append(nn.ReLU(inplace=True))
all_layers.append(nn.BatchNorm1d(i))
all_layers.append(nn.Dropout(p))
input_size = i
all_layers.append(nn.Linear(layers[-1], output_size))
self.layers = nn.Sequential(*all_layers) # 신경망의 모든 계층이 순차적으로 실행되도록 모든 계층에 대한 목록을 nn.Sequential 클래스로 전달
def forward(self, x_categorical):
embeddings = []
for i, e in enumerate(self.all_embeddings):
embeddings.append(e(x_categorical[:, i]))
x = torch.cat(embeddings, 1)
x = self.embedding_dropout(x)
x = self.layers(x)
return x
model = Model(categorical_embedding_sizes, 4, [200, 100, 50], p=0.4)
print(model)

loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cpu')
epochs = 500
aggregated_losses = []
train_outputs = train_outputs.to(device=device, dtype=torch.int64)
for i in range(epochs):
i += 1
y_pred = model(categorical_train)
single_loss = loss_function(y_pred, train_outputs)
aggregated_losses.append(single_loss)
if i%25 == 1:
print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')
optimizer.zero_grad()
single_loss.backward() # 가중치를 업데이트
optimizer.step() # 기울기 업데이트
print(f'epoch: {i:3} loss: {single_loss.item():10.10f}')

테스트 데이터셋으로 예측을 해보자.
test_outputs = test_outputs.to(device=device, dtype=torch.int64)
with torch.no_grad():
y_val = model(categorical_test)
loss = loss_function(y_val, test_outputs)
print(f'Loss: {loss:.8f}')

'딥러닝 > Pytorch' 카테고리의 다른 글
tensor의 복사방법 (0) | 2022.08.06 |
---|---|
[Pytorch] register_buffer (0) | 2022.08.01 |
pytorch - nn.function과 nn의 차이점 (0) | 2022.06.19 |
torch - GPU 사용하기 (0) | 2022.06.13 |
Pytorch nn.ModuleList (0) | 2022.05.30 |