ELECTRA로 Binary Classification

올해 초에 TA 문서 요약 서비스 개발 중 BART모델을 알게 되었다.
또 깃헙에 KoBART를 알게 되었고 예시 코드를 보던 중 Pytorch Lightning을 처음 접하게 되었다.
Pytorch Lightning의 깔끔한 구조와 자동으로 함수만 구현하고 trainer.fit()만 하면 학습되는 점에 매료되었던 것 같다.

처음에 ELECTRA로 다른 사람이 구현한 것을 바탕으로 모델을 구현하려고 했으나 에러가 났고, 왜 이런 에러가 나는지 알지 못해 LSTM 모델부터 구현해봤다.
시계열데이터가 처음이라 많이 헤멨고, 또 삼성 주식 예측하는 모델을 만들었는데 2000년대에 비해 너무 많이 올라서 LMSE loss값이 60000대씩 나오는 무지막지한 경험을 했다(실패ㅜ).
나중에 데이터 스케일링을 다시 해서 도전해볼 생각이다.

Pytorch Lightning 사용법을 익혀서 다시 ELECTRA에 도전하게 되었다!

ELECTRA with Pytorch Lightning

일단 개발하면서 느낀 것을 정리해보겠다.

  1. 먼저 cpu 기반으로 동작하게 해서 개발/테스트하는 게 훨씬 좋다.

    • gpu 기반으로 동작하고 에러가 날 때 cuda에러로 나는 경우가 많아 정확한 원인 파악이 힘들었다.
    • cpu 기반은 어떤 에러가 어디서 났는지 직관적으로 보여준다.
    • cpu 기반으로 train할 때 에러없이 validation이 진행될 때 gpu기반으로 바꿔주었다.
  2. Tensor Shape 확인하기

    • DataModule로 나오는 batch들의 데이터 shape을 항상 확인하기
    • Electra model, torchmetrics api input output 확인하기
      • 간혹 데이터 모듈에서 나온 batch['label'] shape이 인풋과 다른 경우가 있어서 dm 모듈에서 나온 output shape과 api들의 인풋 shape을 확인할 필요가 있었다.

코드는 깃헙에 올려두었다.


사용한 ELECTRA 모델 : 'monologg/koelectra-small-v3-discriminator'

ELECTRADataModule.py

Dataset과 DataModule을 같이 구현했다.
사용한 전처리는 NSMC with PyTorch-Lightning 1.3.0, GPU, Colab를 참고하였다.

import os
import re

import emoji
import numpy as np
import pandas as pd
from soynlp.normalizer import repeat_normalize

import torch
from torch.utils.data import DataLoader, Dataset

import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import transformers
from transformers import ElectraForSequenceClassification, ElectraTokenizer, AdamW

class ElectraClassificationDataset(Dataset) :
    def __init__(self, path, sep, doc_col, label_col, max_length, 
                num_workers=1, labels_dict=None) :
        self.tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")

        self.max_length = max_length
        self.doc_col = doc_col
        self.label_col = label_col

        # labels
        # None : label이 num으로 되어 있음
        # dict : label이 num이 아닌 것으로 되어 있음
        # ex : {True : 1, False : 0}
        self.labels_dict = labels_dict

        # dataset
        df = pd.read_csv(path, sep=sep)
        # nan 제거
        df = df.dropna(axis=0)
        # 중복제거
        df.drop_duplicates(subset=[self.doc_col], inplace=True)
        self.dataset = df

    def __len__(self) :
        return len(self.dataset)

    def cleanse(self, text) :
        emojis = ''.join(emoji.UNICODE_EMOJI.keys())  
        pattern = re.compile(f'[^ .,?!/@$%~%·∼()\x00-\x7Fㄱ-힣{emojis}]+')
        url_pattern = re.compile(
            r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
        )
        processed = pattern.sub(' ', text)
        processed = url_pattern.sub(' ', processed)
        processed = processed.strip()
        processed = repeat_normalize(processed, num_repeats=2)
      
        return processed

    def __getitem__(self, idx) :
        document = self.cleanse(self.dataset[self.doc_col].iloc[idx])
        inputs = self.tokenizer(
            document,
            return_tensors='pt',
            truncation=True,
            max_length=self.max_length,
            padding='max_length',
            add_special_tokens=True
        )

        if self.labels_dict :
            label = self.labels_dict[self.dataset[self.label_col].iloc[idx]]
        else :
            label = self.dataset[self.label_col].iloc[idx]

        return {
            'input_ids' : inputs['input_ids'][0],
            'attention_mask' : inputs['attention_mask'][0],
            'label' : int(label)
        }

class ElectraClassificationDataModule(pl.LightningDataModule) :
    def __init__(self, train_path, valid_path, max_length, batch_size, sep,
                doc_col, label_col, num_workers=1, labels_dict=None) :
        super().__init__()
        self.batch_size = batch_size
        self.train_path = train_path
        self.valid_path = valid_path
        self.max_length = max_length
        self.doc_col = doc_col
        self.label_col = label_col
        self.sep = sep
        self.num_workers = num_workers
        self.labels_dict = labels_dict

    def setup(self, stage=None) :
        self.set_train = ElectraClassificationDataset(self.train_path, sep=self.sep,
                                            doc_col=self.doc_col, label_col=self.label_col,
                                            max_length = self.max_length, labels_dict=self.labels_dict)
        self.set_valid = ElectraClassificationDataset(self.valid_path, sep=self.sep,
                                            doc_col=self.doc_col, label_col=self.label_col,
                                            max_length = self.max_length, labels_dict=self.labels_dict)

    def train_dataloader(self) :
        train = DataLoader(self.set_train, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=True)
        return train
    
    def val_dataloader(self) :
        val = DataLoader(self.set_valid, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False)
        return val
    
    def test_dataloader(self) :
        test = DataLoader(self.set_valid, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False)
        return test

Datamodule을 구현하면서 pandas DataFrame에서 iloc을 왜 쓰는지 알게 되었다.
dataframe[column][idx]를 하게 되면 dropna 등에서 index값이 사라져서 KeyError가 발생하였는데 iloc을 사용하면 리스트에서 인덱스 값 고르듯이 참조해서 에러가 말끔히 사라졌다.

위 데이터 모듈을 동해 dm 객체를 생성하게 되면 다음과 같은 output이 나온다.

from ElectraDataModule import *
from ElectraBinaryClassification import *

electra = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-small-v3-discriminator")

dm = ElectraClassificationDataModule(batch_size=8, train_path='./ratings_train_pre.txt', valid_path='./ratings_test_pre.txt',
                                    max_length=256, sep='\t', doc_col='document', label_col='label', num_workers=1)

dm.setup()

t = dm.train_dataloader()

for idx, data in enumerate(t) :
    print(idx, data['input_ids'].shape, data['attention_mask'].shape, data['label'].shape)

v = dm.val_dataloader()
for idx, data in enumerate(v) :
    print(idx, data['input_ids'].shape, data['attention_mask'].shape, data['label'].shape)
idx, data = enumerate(t)

print(data['input_ids'])
print(data['input_ids'].shape)

print(data['attention_mask'])
print(data['attention_mask'].shape)

print(data['label'])
print(data['label'].shape)

output = electra.forward(data['input_ids'], attention_mask=data['attention_mask'], labels=data['label'].view([-1,1]))

print(output.loss)
# print(output.loss.shape)
print(output.logits)
print(output.logits.shape)

softmax = nn.functional.softmax(output.logits, dim=1)
print('softmax', softmax)
pred = softmax.argmax(dim=1)
print('pred', pred)

y_true = data['label'].tolist()
y_pred = pred.tolist()

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred)
rec = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f'acc : {acc}, prec : {prec}, rec : {rec}, f1 : {f1}')

위 코드를 실행시키면 다음과 같은 결과를 확인할 수 있다.
예제는 batch_size=8이라 [8, something]의 형태로 나온다.

tensor([[    2, 30261,  4029,  ...,     0,     0,     0],
        [    2,  3274,  4153,  ...,     0,     0,     0],
        [    2,  6395,  4835,  ...,     0,     0,     0],
        ...,
        [    2,  9186,  4032,  ...,     0,     0,     0],
        [    2, 24254,  4114,  ...,     0,     0,     0],
        [    2,  3280,  8274,  ...,     0,     0,     0]])
torch.Size([8, 256])
tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])
torch.Size([8, 256])
tensor([1, 1, 1, 0, 0, 1, 1, 1])
torch.Size([8])
tensor(0.6884, grad_fn=<NllLossBackward0>)
tensor([[-1.5446e-02, -1.7352e-03],
        [-1.4640e-02,  7.5216e-03],
        [-1.9785e-02, -4.1623e-03],
        [-1.4653e-02, -2.2624e-03],
        [-1.3766e-02,  3.6900e-05],
        [-1.8751e-02, -1.1004e-04],
        [-2.0297e-02, -3.1179e-03],
        [-8.0904e-03,  7.0416e-03]], grad_fn=<AddmmBackward0>)
torch.Size([8, 2])
softmax tensor([[0.4966, 0.5034],
        [0.4945, 0.5055],
        [0.4961, 0.5039],
        [0.4969, 0.5031],
        [0.4965, 0.5035],
        [0.4953, 0.5047],
        [0.4957, 0.5043],
        [0.4962, 0.5038]], grad_fn=<SoftmaxBackward0>)
pred tensor([1, 1, 1, 1, 1, 1, 1, 1])
acc : 0.75, prec : 0.75, rec : 1.0, f1 : 0.8571428571428571

ElectraBinaryClassification.py

import os

import torch
import torchmetrics
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import transformers
from transformers import ElectraForSequenceClassification, ElectraTokenizer, AdamW

device = torch.device("cuda")

# https://medium.com/huggingface/multi-label-text-classification-using-bert-the-mighty-transformer-69714fa3fb3d
# https://huggingface.co/docs/transformers/v4.15.0/en/model_doc/electra#transformers.ElectraForSequenceClassification

class ElectraClassification(pl.LightningModule) :
    def __init__(self, learning_rate) :
        super().__init__()
        self.learning_rate = learning_rate
        self.save_hyperparameters()
        
        self.electra = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-small-v3-discriminator")

        self.metric_acc = torchmetrics.Accuracy()
        self.metric_f1 = torchmetrics.F1(num_classes=2)
        self.metric_rec = torchmetrics.Recall(num_classes=2)
        self.metric_pre = torchmetrics.Precision(num_classes=2)

        self.loss_func = nn.CrossEntropyLoss()

    def forward(self, input_ids, attention_mask, labels=None) :
        output = self.electra(input_ids=input_ids, 
                                attention_mask=attention_mask, 
                                labels=labels)
        return output

    def training_step(self, batch, batch_idx) :
        '''
        ##########################################################
        electra forward input shape information
        * input_ids.shape (batch_size, max_length)
        * attention_mask.shape (batch_size, max_length)
        * label.shape (batch_size,)
        ##########################################################
        '''

        # change label shape (list -> torch.Tensor((batch_size, 1)))
        label = batch['label'].view([-1,1])

        output = self(input_ids=batch['input_ids'].to(device),
                        attention_mask=batch['attention_mask'].to(device),
                        labels=label.to(device))
        '''
        ##########################################################
        electra forward output shape information
        * loss.shape (1,)
        * logits.shape (batch_size, config.num_labels=2)
        '''
        logits = output.logits

        loss = output.loss
        # loss = self.loss_func(logits.to(device), batch['label'].to(device))

        softmax = nn.functional.softmax(logits, dim=1)
        preds = softmax.argmax(dim=1)

        self.log("train_loss", loss, prog_bar=True)
        
        return {
            'loss' : loss,
            'pred' : preds,
            'label' : batch['label']
        }

    def training_epoch_end(self, outputs, state='train') :
        y_true = []
        y_pred = []
        for i in outputs :
            y_true += i['label'].tolist()
            y_pred += i['pred'].tolist()

        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred)
        rec = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)

        # self.log(state+'_acc', acc, on_epoch=True, prog_bar=True)
        # self.log(state+'_precision', prec, on_epoch=True, prog_bar=True)
        # self.log(state+'_recall', rec, on_epoch=True, prog_bar=True)
        # self.log(state+'_f1', f1, on_epoch=True, prog_bar=True)
        print(f'[Epoch {self.trainer.current_epoch} {state.upper()}] Acc: {acc}, Prec: {prec}, Rec: {rec}, F1: {f1}')

    def validation_step(self, batch, batch_idx) :
        '''
        ##########################################################
        electra forward input shape information
        * input_ids.shape (batch_size, max_length)
        * attention_mask.shape (batch_size, max_length)
        ##########################################################
        '''
        output = self(input_ids=batch['input_ids'].to(device),
                        attention_mask=batch['attention_mask'].to(device))
        logits = output.logits
        preds = nn.functional.softmax(logits, dim=1).argmax(dim=1)

        labels = batch['label']
        accuracy = self.metric_acc(preds, labels)
        f1 = self.metric_f1(preds, labels)
        recall = self.metric_rec(preds, labels)
        precision = self.metric_pre(preds, labels)
        self.log('val_accuracy', accuracy, on_epoch=True, prog_bar=True)
        self.log('val_f1', f1, on_epoch=True, prog_bar=True)
        self.log('val_recall', recall, on_epoch=True, prog_bar=True)
        self.log('val_precision', precision, on_epoch=True, prog_bar=True)

        return {
            'accuracy' : accuracy,
            'f1' : f1,
            'recall' : recall,
            'precision' : precision
        }

    def validation_epoch_end(self, outputs) :
        val_acc = torch.stack([i['accuracy'] for i in outputs]).mean()
        val_f1 = torch.stack([i['f1'] for i in outputs]).mean()
        val_rec = torch.stack([i['recall'] for i in outputs]).mean()
        val_pre = torch.stack([i['precision'] for i in outputs]).mean()
        # self.log('val_f1', val_f1, on_epoch=True, prog_bar=True)
        # self.log('val_acc', val_acc, on_epoch=True, prog_bar=True)
        print(f'val_accuracy : {val_acc}, val_f1 : {val_f1}, val_recall : {val_rec}, val_precision : {val_pre}')
        
    
    def configure_optimizers(self) :
        optimizer = torch.optim.AdamW(self.electra.parameters(), lr=self.learning_rate)
        lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        
        return {
            'optimizer' : optimizer,
            'lr_scheduler' : lr_scheduler
        }

sklearn은 처음 참고한 블로그에서 해당 라이브러리를 써서 테스트 해봤고 그 후에는 torchmetrics 를 사용해서 score를 계산했다.

gpu_binary_train.py

from ElectraDataModule import *
from ElectraBinaryClassification import *

if __name__ == "__main__" :
    model = ElectraClassification(learning_rate=0.0001)

    dm = ElectraClassificationDataModule(batch_size=8, train_path='./ratings_train_pre.txt', valid_path='./ratings_test_pre.txt',
                                    max_length=256, sep='\t', doc_col='document', label_col='label', num_workers=1)
    
    checkpoint_callback = pl.callbacks.ModelCheckpoint(monitor='val_accuracy',
                                                    dirpath='./sample_electra_binary_nsmc_chpt',
                                                    filename='KoELECTRA/{epoch:02d}-{val_accuracy:.3f}',
                                                    verbose=True,
                                                    save_last=True,
                                                    mode='max',
                                                    save_top_k=-1,
                                                    )
    
    tb_logger = pl_loggers.TensorBoardLogger(os.path.join('./sample_electra_binary_nsmc_chpt', 'tb_logs'))

    lr_logger = pl.callbacks.LearningRateMonitor()

    trainer = pl.Trainer(
        default_root_dir='./sample_electra_binary_nsmc_chpt/checkpoints',
        logger = tb_logger,
        callbacks = [checkpoint_callback, lr_logger],
        max_epochs=3,
        gpus=1
    )

    trainer.fit(model, dm)

nsmc데이터는 epoch을 3을 주고 학습 시켰다.

Epoch 2: 100%|███████████████████████████████████████████| 23837/23837 [27:27<00:00, 14.47it/s, loss=0.307, v_num=0, train_loss=0.472, val_accuracy=0.875, val_f1=0.875, val_recall=0.875, val_precision=0.875]
Saving latest checkpoint.

val_accuracy값 0.875가 측정된 것으로 체크포인트가 저장됐다.

service.py

import re

import emoji
from soynlp.normalizer import repeat_normalize

from ElectraDataModule import *
from ElectraBinaryClassification import *


def infer(x, path) :
    model = ElectraClassification.load_from_checkpoint(path)
    tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")
    
    emojis = ''.join(emoji.UNICODE_EMOJI.keys())  
    pattern = re.compile(f'[^ .,?!/@$%~%·∼()\x00-\x7Fㄱ-힣{emojis}]+')
    url_pattern = re.compile(
        r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
    )
    processed = pattern.sub(' ', x)
    processed = url_pattern.sub(' ', processed)
    processed = processed.strip()
    processed = repeat_normalize(processed, num_repeats=2)

    tokenized = tokenizer(processed, return_tensors='pt')

    output = model(tokenized.input_ids, tokenized.attention_mask)
    return nn.functional.softmax(output.logits, dim=-1)

text = '요딴 걸 영화라고 만들어놨네'
print(infer(text,'sample_electra_binary_nsmc_chpt/KoELECTRA/epoch=02-val_accuracy=0.875.ckpt'))

위 코드를 실행시키면 다음의 결과가 나온다.

tensor([[0.9807, 0.0193]], grad_fn=<SoftmaxBackward0>)

argamax(dim=1)해주면 결과는 0이 나오니 부정으로 잘 나오는 것을 알 수 있다.

text = '요딴 걸 영화라고 만들어놨네  내가 본 최고의 영화'

text를 위와 같이 해서 돌리면

tensor([[0.4957, 0.5043]], grad_fn=<SoftmaxBackward0>)

결과는 1로, 미약하지만 긍정으로 나오는 것을 볼 수 있다.

몇 가지 문장을 시도해봤는데 만족스러운 결과가 나왔다.


모델 구현에 도움을 준 학교 선배님과 친구에게 감사의 인사를 드립니다!

이제 Multi Class Classification을 구현해야겠다!

References

Data References

Code References

좋은 웹페이지 즐겨찾기