일단 처음에는 데이터 불러오기부터 시작해보자. 코랩에서 코드를 작성했다.
데이터 불러오기
!pip install transformers datasets
from datasets import load_dataset
import pandas as pd
import numpy as np
import random
import time
import datetime
from tqdm import tqdm
from sklearn.preprocessing import MultiLabelBinarizer
import csv
import os
import torch
from torch.optim import AdamW
from transformers import pipeline
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, hamming_loss
!git clone https://github.com/adlnlp/K-MHaS.git
!ls -al K-MHaS
!ls -al K-MHaS/data
데이터 처리
df = pd.read_csv('/content/K-MHaS/data/kmhas_train.txt')
처음에 이렇게 작성했다가
ParserError: Error tokenizing data. C error: Expected 2 fields in line 33, saw 4 오류 발생 ㅜㅜ
이유는 판다스가 csv 파일을 읽을 때 구분값의 기본이 ,(콤마)인데, txt 파일 안 데이터를 살펴봤을 땐, /t 로 되어있었기 때문
df = pd.read_csv('/content/K-MHaS/data/kmhas_train.txt', sep="\t")
이렇게 수정해주니 오류가 발생하지 않고 데이터프레임에 넣을 수 있었다
그대로 train, test, valid 데이터가 이미 분리되어있어서 각 train_cs, valid_cs, test_cs에 넣어줬다
데이터를 확인해보니 라벨 전처리가 이미 숫자라서 따로 해주지 않고, BERT 계열 모델에 넣을 문장을 "특수 토큰"을 붙여서 전처리 해줬다 !
train_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', train_cs['label']))
validation_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', valid_cs['label']))
test_sentences = list(map(lambda x: '[CLS] ' + str(x) + ' [SEP]', test_cs['label']))
전처리 해주고 넘파이 배열로 수정해주는 작업도 해줬다.
사실 이거 왜 하는지 아직 이해하지 못했지만 강사님 코드 보고 따라하는 중... 추후에 좀 더 이해해보도록 하자...
train_labels = train_cs['labels'].to_numpy(dtype='int64')
validation_labels = valid_cs['labels'].to_numpy(dtype='int64')
test_labels = test_cs['labels'].to_numpy(dtype='int64')
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/tmp/ipython-input-2658705984.py in <cell line: 0>()
----> 1 train_labels = train_cs['label'].to_numpy(dtype='int64')
2 validation_labels = valid_cs['label'].to_numpy(dtype='int64')
3 test_labels = test_cs['label'].to_numpy(dtype='int64')
4 train_labels
/usr/local/lib/python3.12/dist-packages/pandas/core/base.py in to_numpy(self, dtype, copy, na_value, **kwargs)
660 values[np.asanyarray(isna(self))] = na_value
661
--> 662 result = np.asarray(values, dtype=dtype)
663
664 if (copy and not fillna) or (not copy and using_copy_on_write()):
ValueError: invalid literal for int() with base 10: '2,4'
해봤더니 오류가 발생했다... 왜냐면 to_numpy(dtype='int64')는 각 값을 정수(int)로 강제 변환하려고 하는데,
'2,4'는 int('2,4')가 불가능하니까 ValueError가 난 거 였다...
그래서 이거 하기 전에 먼저 멀티-핫 벡터로 바꿔주는 작업을 먼저 해야했다.
y_train = train_cs["label"].astype(str).str.split(",").apply(lambda xs: [int(x) for x in xs])
# 멀티핫 인코딩 (scikit-learn)
mlb = MultiLabelBinarizer()
Y_train = mlb.fit_transform(y_train)
Y_valid = mlb.transform(valid_cs["label"].astype(str).str.split(",").apply(lambda xs: [int(x) for x in xs]))
Y_test = mlb.transform(test_cs["label"].astype(str).str.split(",").apply(lambda xs: [int(x) for x in xs]))
mlb.classes_
# 결과 : array([0, 1, 2, 3, 4, 5, 6, 7, 8])
그 다음엔 손실함수 라벨을 float 텐서로 받기 때문에 float32로 변환하는 작업을 해준다.
Y_train = Y_train.astype("float32")
Y_valid = Y_valid.astype("float32")
Y_test = Y_test.astype("float32")
BERT 다운받기
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')
이제 잘 다듬은 문장 리스트와 라벨을 BERT에 넣기 좋은 텐서로 바꾸는 함수를 작성했다
MAX_LEN = 128
def data_to_tensor(sentences, labels, tokenizer):
# 정수 인코딩: 각 텍스트를 토큰화한 후 Vocabulary에 맵핑되는 정수 시퀀스로 변환
tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
input_ids = [torch.tensor(tokenizer.convert_tokens_to_ids(x)) for x in tokenized_texts]
# PyTorch의 pad_sequence 사용 (batch_first=True로 설정)
padded_inputs = pad_sequence(input_ids, batch_first=True, padding_value=0)
# 고정된 MAX_LEN 길이로 패딩 적용 (부족하면 0 추가, 길면 자름)
if padded_inputs.shape[1] < MAX_LEN:
padded_inputs = F.pad(padded_inputs, (0, MAX_LEN - padded_inputs.shape[1]), value=0)
else:
padded_inputs = padded_inputs[:, :MAX_LEN]
# Attention Mask 생성 (0이 아닌 값은 1, 0은 0)
attention_masks = (padded_inputs != 0).float()
# 텐서 변환
tensor_inputs = padded_inputs
tensor_labels = torch.tensor(labels, dtype=torch.float32) # 멀티라벨이라서 float32
tensor_masks = attention_masks
return tensor_inputs, tensor_labels, tensor_masks
이제 텐서로 변환이 잘 되는지 확인 해보자
train_inputs, train_labels, train_masks = data_to_tensor(train_sentences, Y_train, tokenizer)
validation_inputs, validation_labels, validation_masks = data_to_tensor(validation_sentences, Y_valid, tokenizer)
test_inputs, test_labels, test_masks = data_to_tensor(test_sentences, Y_test, tokenizer)
print(train_inputs[0])
print(train_masks[0])
지금 만들어둔 텐서들을 파이토치에서 학습/검증/테스트에 쓰기 좋은 배치 단위 뽑아주는 데이터로더로 포장해보자 !
batch_size = 32
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)
test_data = TensorDataset(test_inputs, test_masks, test_labels)
test_sampler = RandomSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=batch_size)
print('훈련 데이터의 크기:', len(train_labels))
print('검증 데이터의 크기:', len(validation_labels))
print('테스트 데이터의 크기:', len(test_labels))
아까 멀티 핫 인코딩을 했을때 결과값이 array([0, 1, 2, 3, 4, 5, 6, 7, 8]) 이렇게 나왔기 때문에 label의 개수는 9다.
거기에 멀티라벨 설정까지 해주기
num_labels = 9
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertForSequenceClassification.from_pretrained("klue/bert-base", num_labels=num_labels)
# 멀티라벨 설정
model.config.problem_type = "multi_label_classification"
model.cuda()
이제 학습을 위한 코드 작성
optimizer = AdamW(model.parameters(), lr = 2e-5, eps = 1e-8)
epochs = 5
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0, num_training_steps = total_steps)
def format_time(elapsed):
elapsed_rounded = int(round((elapsed)))
return str(datetime.timedelta(seconds=elapsed_rounded)) # hh:mm:ss
def metrics(predictions, labels):
y_pred = predictions
y_true = labels
accuracy = accuracy_score(y_true, y_pred)
f1_macro_average = f1_score(y_true=y_true, y_pred=y_pred, average='macro', zero_division=0)
f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro', zero_division=0)
f1_weighted_average = f1_score(y_true=y_true, y_pred=y_pred, average='weighted', zero_division=0)
metrics = {'accuracy': accuracy,
'f1_macro': f1_macro_average,
'f1_micro': f1_micro_average,
'f1_weighted': f1_weighted_average}
return metrics
seed_val = 2026
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)
진짜 학습을 시켜보자 !!
model.zero_grad()
for epoch_i in range(0, epochs):
print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
t0 = time.time()
total_loss = 0
model.train()
for step, batch in tqdm(enumerate(train_dataloader)):
if step % 500 == 0 and not step == 0:
elapsed = format_time(time.time() - t0)
print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))
batch = tuple(t.to(device) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
outputs = model(b_input_ids,
token_type_ids=None,
attention_mask=b_input_mask,
labels=b_labels)
loss = outputs[0]
total_loss += loss.item()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
model.zero_grad()
avg_train_loss = total_loss / len(train_dataloader)
print("")
print(" Average training loss: {0:.4f}".format(avg_train_loss))
print(" Training epcoh took: {:}".format(format_time(time.time() - t0)))
잘 학습이 되었는지 테스트 해보기
라벨도 argmax 해서 맞춰주기 !
for b in label_ids:
accum_label_ids.append(np.argmax(b)) 이렇게 !
t0 = time.time()
model.eval()
accum_logits, accum_label_ids = [], []
for batch in validation_dataloader:
batch = tuple(t.to(device) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
with torch.no_grad():
outputs = model(b_input_ids,
token_type_ids=None,
attention_mask=b_input_mask)
logits = outputs[0]
logits = logits.detach().cpu().numpy()
label_ids = b_labels.to('cpu').numpy()
for b in logits:
accum_logits.append(np.argmax(b))
for b in label_ids:
accum_label_ids.append(np.argmax(b))
accum_logits = np.array(accum_logits)
accum_label_ids = np.array(accum_label_ids)
results = metrics(accum_logits, accum_label_ids)
print("Accuracy: {0:.4f}".format(results['accuracy']))
print("F1 (Macro) Score: {0:.4f}".format(results['f1_macro']))
print("F1 (Micro) Score: {0:.4f}".format(results['f1_micro']))
print("F1 (Weighted) Score: {0:.4f}".format(results['f1_weighted']))
학습은 잘 된 거 확인하고, 허깅페이스 API 사용하여 테스트 해보면서 마무리 ~
pipe = pipeline('text-classification', model=model.cuda(), tokenizer=tokenizer,
device=0, max_length=512, function_to_apply='sigmoid')
result = pipe('잼민이들 왜 그렇게 민폐냐?')
print(result)
label_dict = {'LABEL_0' : '출신차별', 'LABEL_1' : '외모차별', 'LABEL_2' : '정치성향차별', \
'LABEL_3': '혐오욕설', 'LABEL_4': '연령차별', 'LABEL_5': '성차별', 'LABEL_6' : '인종차별', \
'LABEL_7': '종교차별', 'LABEL_8': '해당사항없음'}
def prediction(text):
result = pipe(text)
return [label_dict[res['label']] for res in result[0] if res['score'] > 0.5]
prediction('잼민이들 왜 그렇게 민폐냐?')
# ['연령차별']'개발 기록실 > 실험 & 구현' 카테고리의 다른 글
| [YOLOv8 + RNN] 편의점/매장 이상행동(전도·파손) 탐지 파이프라인 만들기 (0) | 2026.03.09 |
|---|---|
| [OpenCV + Machine Learning] Kaggle 주조 제품 불량 이미지를 이용한 Random Forest 분류기 만들기 (0) | 2026.03.09 |
| [FastAPI + PyTorch] 컴퓨터비전 모델 기반 이미지 분석 API 서버 구축하기 (0) | 2026.02.09 |
| [React-Native] 사진 업로드 시 EXIF 위치 정보 자동 추출 (0) | 2026.01.29 |
| [ React-Native ] 카카오 로그인 구현하기 (1) | 2026.01.27 |