사쿠의 데이터 블로그

Anomaly Detection with AE (1) 본문

Python(데이터 분석, 딥러닝)

Anomaly Detection with AE (1)

토스트먹어 2019. 5. 25. 16:37

★★ 이상점 탐지 ★★

 

 

다른 블로그를 참조해보면 이상점 탐지는 여러 분야에서 활용하고 있다고 한다. 

 

하지만 내가 들어보고 직접 경험해본 예시로는 아래 세 가지가 있다. 

 

넷마블 - 비정상 유저 탐지 (링크)
NCSoft - You Are a Game Bot! (링크)
AWS .. (추후 수정)

 

모두들 각 분야의 문제를 효율적으로 해결했다. 

 

그중, 넷마블이 소개했던 Auto Encoder를 활용한 이상점 탐지를 재현해보려 한다. 

 

 

이상점 탐지란!?!

회귀분석을 공부했던 분이라면 Outlier를 떠올려도 무방할 것 같다.

데이터 set에서 나올 수 없는 혹은 나오기 힘든 데이터를 이상점이라고 말한다. 

 

이전 포스팅 AE 개념에서 다뤘던 Mnist 데이터를 이용해 진행하려 한다. 

 

 

그림1. Mnist는 숫자 이미지 데이터 (구글 검색)

Mnist에서 나올 수 없는 데이터는?

노이즈 데이터와 문자(A)가 그 예시가 될 수 있다.

그림2. Mnist의 데이터에서 나올 수 없는 이상점

 

 

 

Mnist 데이터로 이상점을 탐지한다는 말을 정리해 보면

정상 데이터가 아닌, 노이즈나 다른 문자(A)를 찾는 과정이라고 할 수 있다. 

그림3. Mnist에서 이상점을 탐지

 

 

이번 포스팅에서는 AE를 만드는 방법을 소개하려고 합니다.

AutoEncoder는 Encoder와 Decoder 2개의 부분으로 나눌 수 있습니다.

 

 

그림4. AutoEncoder의 구조

 

 

 

Data 불러오기

1
2
3
4
5
6
7
8
9
10
11
import os
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import save_image
 
#  맨 처음 한번만 다운로드 하기
dataset = MNIST('./data', transform=img_transform, download=True)
cs

필요한 라이브러리와 Mnist를 다운로드한 뒤 dataset이라는 변수에 저장한다. 

토치를 설치하는 방법은 여기를 참조 부탁드립니다.

 

 

AE 정의

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class encoder(nn.Module):
    def __init__(self):
        super(encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.BatchNorm1d(num_features=28 * 28),
            nn.Linear(28 * 282),
            nn.ReLU(True)
        )
 
    def forward(self, x):
        x = self.encoder(x)
        return x
 
 
class decoder(nn.Module):
    def __init__(self):
        super(decoder, self).__init__()
 
        self.decoder = nn.Sequential(
            nn.Linear(228 * 28),
            nn.BatchNorm1d(num_features=28 * 28),
            nn.Tanh()
        )
 
    def forward(self, x):
        x = self.decoder(x)
        return x
cs

AE는 Encoder와 Decoder 부분으로 나누어 정의한다. 

 

각각의 역할을 정리하면 다음과 같다. 

 

<Encoder>

1. 28 * 28 이미지를 Input으로 받는다. 

2. BatchNorm을 태운다. 

3. 28 * 28(=784) 차원에서 2차원으로 줄인다.

4. ReLu를 통과한다.

 

<Decoder>

1. 2차원 데이터를 784차원으로 확대.

2. BatchNorm을 태운다.

3. Tanh로 Output를 만들어 낸다.

 

 

AE 불러오기 & Optimizer 설정

1
2
3
4
5
6
7
8
9
10
#  모델 설정
encoder = encoder().cuda().train()
decoder = decoder().cuda().train()
 
 
#  모델 Optimizer 설정
criterion = nn.MSELoss()
encoder_optimizer = torch.optim.Adam( encoder.parameters(), lr=learning_rate, weight_decay=1e-5)
decoder_optimizer = torch.optim.Adam( decoder.parameters(), lr=learning_rate, weight_decay=1e-5)
 
cs

.cuda()의 의미는 GPU를 이용한다는 뜻이다.

 

모델의 학습은 MSE를 이용한다. (자세한 내용은 다음 코드를 참조)

 

Optimizer로는 Adam을 이용한다.

 

 

 

모델 학습

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for epoch in range(num_epochs):
    for data in dataloader:
        img, _ = data  # label 은 가져오지 않는다.
        img = img.view(img.size(0), -1)
        img = Variable(img).cuda()
        # ===================forward=====================
        latent_z = encoder(img)
        output = decoder(latent_z )
        # ===================backward====================
        loss = criterion(output, img)
 
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()
    # ===================log========================
    print('epoch [{}/{}], loss:{:.4f}' .format(epoch + 1, num_epochs, float(loss.data) ))
 
    if epoch % 10 == 0:
        # pic = to_img(output.cpu().data)
        pic = output.cpu().data
        pic = pic.view(pic.size(0), 12828)
 
        save_image(pic, './AE_img/output_image_{}.png'.format(epoch))
 
#  모델 저장
torch.save(encoder.state_dict(), './encoder.pth')
torch.save(decoder.state_dict(), './decoder.pth')
cs

 

 학습을 마친 AE는 원본과 비슷한 Output을 만들어 낸다.

 

그림5. 학습이 끝난 AE의 Output

 

 

전체 코드 - 깃허브

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#  라이브러리
import os
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import save_image
 
 
# 미리 만들어둔 모델 불러오기
from AE_model import encoder, decoder
 
#  이미지를 저장할 폴더 생성
if not os.path.exists('./AE_img'):
    os.mkdir('./AE_img')
 
def to_img(x):
    x = 0.5 * (x + 1)
    x = x.clamp(01)
    x = x.view(x.size(0), 12828)
    return x
 
 
img_transform = transforms.Compose([
    transforms.ToTensor()
])
 
#  Hyper Parameter 설정
num_epochs = 100
batch_size = 128
learning_rate = 1e-3
 
 
#  맨 처음 한번만 다운로드 하기
# dataset = MNIST('./data', transform=img_transform, download=True)
 
#  데이터 불러오기
dataset = MNIST('./data', transform=img_transform)
dataloader = DataLoader(dataset, batch_size=batch_size , shuffle=True)
 
 
#  모델 설정
encoder = encoder().cuda().train()
decoder = decoder().cuda().train()
 
 
#  모델 Optimizer 설정
criterion = nn.MSELoss()
encoder_optimizer = torch.optim.Adam( encoder.parameters(), lr=learning_rate, weight_decay=1e-5)
decoder_optimizer = torch.optim.Adam( decoder.parameters(), lr=learning_rate, weight_decay=1e-5)
 
 
for epoch in range(num_epochs):
    for data in dataloader:
        img, _ = data  # label 은 가져오지 않는다.
        img = img.view(img.size(0), -1)
        img = Variable(img).cuda()
        # ===================forward=====================
        latent_z = encoder(img)
        output = decoder(latent_z )
        # ===================backward====================
        loss = criterion(output, img)
 
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        loss.backward()
        encoder_optimizer.step()
        decoder_optimizer.step()
    # ===================log========================
    print('epoch [{}/{}], loss:{:.4f}' .format(epoch + 1, num_epochs, float(loss.data) ))
 
    if epoch % 10 == 0:
        # pic = to_img(output.cpu().data)
        pic = output.cpu().data
        pic = pic.view(pic.size(0), 12828)
 
        save_image(pic, './AE_img/output_image_{}.png'.format(epoch))
 
#  모델 저장
torch.save(encoder.state_dict(), './encoder.pth')
torch.save(decoder.state_dict(), './decoder.pth')
cs

 깃허브에도 동일한 코드를 공개했습니다. 

 

 

다음에는 AE를 이용해 Anomaly Detection을 실행해 보겠습니다.

 

'Python(데이터 분석, 딥러닝)' 카테고리의 다른 글

Python에서 Bigquery 연동하기  (0) 2022.01.23
변수 중요도 측정 - Permutation importance  (0) 2019.10.27
Anomaly Detection with AE (2)  (0) 2019.05.26
Auto Encoder(개념)  (1) 2019.05.25
Pytorch 설치  (0) 2019.05.19