[ 문제 상황 ]
시험 과제로 편의점/매장 CCTV 영상에서 “이상행동”을 자동으로 탐지하는 파이프라인을 구현해야 했다.
요구 조건을 정리하면:
- 데이터셋: AI-Hub 편의점·매장 이상행동 데이터 (전도, 파손 등)
- 입력: CCTV 영상
- 출력:
- 프레임 번호
- Person ID
- 해당 시점에 이상행동(Abnormal behavior) 이 감지되었는지
- 제약:
- Google Colab 환경에서 동작
- 제공된
Sample.zip안의 폴더 구조를 그대로 활용 - YOLOv8으로 사람을 추적하고, 그 궤적을 RNN(LSTM/GRU)으로 분류
- XML 라벨(이벤트 구간)을 활용해 정밀하게 학습할 것
단순히 “YOLO 한 번 돌리고 끝”이 아니라,
- 객체 탐지(Detection)
- 추적(Tracking)
- 시계열 특징 추출(Tracking trajectory + Aspect Ratio)
- RNN 기반 행동 분류
- XML 기반 이벤트 구간 필터링 + 데이터 증강 + 클래스 가중치
까지 모두 연결된, 꽤 현실적인 이상행동 탐지 파이프라인을 만드는 것이 목표였다.
[ 데이터 준비: Sample.zip에서 영상 자동 탐색하기 ]
우선 Colab에서 필요한 라이브러리를 설치하고, 경로를 정의했다.
import os
import numpy as np
import torch
import torch.nn as nn
import cv2
import xml.etree.ElementTree as ET
from pathlib import Path
from collections import defaultdict
import zipfile
from ultralytics import YOLO
from torch.utils.data import TensorDataset, DataLoader
DATA_PATH와 압축 해제 경로를 지정한 뒤, Sample.zip을 풀어준다.
DATA_PATH = Path("/content/drive/MyDrive/KDT 수업 내용 파일/시험/OpenCV & RNN_GAN/data")
UNZIP_DIR = Path("/content/sample_data")
sample_zip = DATA_PATH / "Sample.zip"
if sample_zip.exists():
UNZIP_DIR.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(sample_zip, "r") as zf:
zf.extractall(UNZIP_DIR)
print(f"압축 해제 완료: {sample_zip} -> {UNZIP_DIR}")
else:
UNZIP_DIR.mkdir(parents=True, exist_ok=True)
print(f"Sample.zip 없음: {sample_zip} (압축 해제 생략)")
영상 파일은 압축 해제된 폴더 안에서 첫 번째로 발견되는 mp4/avi 파일을 사용하도록 했다.
def find_first_video(root_path, extensions=(".mp4", ".avi", ".mov", ".mkv", ".MP4", ".AVI")):
root = Path(root_path)
if not root.exists():
return None
for ext in extensions:
for p in root.rglob(f"*{ext}"):
if p.is_file():
return str(p)
return None
unzip_path = UNZIP_DIR
VIDEO_PATH = find_first_video(unzip_path) or find_first_video(DATA_PATH)
if VIDEO_PATH is None:
VIDEO_PATH = "/content/sample_video.mp4"
print("압축 해제 폴더 및 DATA_PATH에 영상이 없습니다. 사용할 경로(기본값):", VIDEO_PATH)
else:
print("사용할 샘플 영상:", VIDEO_PATH)
이렇게 해 두면, 경로가 조금 달라져도 “가장 먼저 보이는 샘플 영상”을 자동으로 선택해 준다.
[ YOLOv8 + ByteTrack: 사람 탐지 & 추적 ]
이상행동은 “사람의 행동”이기 때문에, 먼저 사람을 robust 하게 잡아야 한다.
그래서 YOLOv8n + 내장 트래커(track=True) 를 사용해, 사람 박스의 연속 궤적을 얻었다.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
SEQ_LEN = 30
FEATURE_DIM = 5 # (x, y, w, h, w/h) — 종횡비로 전도(누움) 탐지 강화
HIDDEN_SIZE = 64
NUM_LAYERS = 2
NUM_CLASSES = 2 # 0: 정상, 1: 이상행동
LR = 1e-3
EPOCHS = 20
BATCH_SIZE = 32
OUTPUT_VIDEO_PATH = "/content/output_detection_tracking.mp4"
FINAL_RESULT_PATH = "/content/final_result.mp4"
def run_detection_and_tracking(video_path, person_only=True, conf=0.1, imgsz=640):
model = YOLO("yolov8n.pt")
results = model.track(
source=video_path,
persist=True,
classes=[0], # person 클래스만
iou=0.5,
conf=conf, # 임계값을 낮춰 누워있는 사람도 잡게 함
imgsz=imgsz,
verbose=False,
stream=True,
)
tracks = []
by_frame = defaultdict(list)
for frame_idx, r in enumerate(results):
if r.boxes is None:
continue
boxes = r.boxes
has_id = boxes.id is not None
for i in range(len(boxes)):
cls = int(boxes.cls[i].item())
if person_only and cls != 0:
continue
if has_id:
tid = int(boxes.id[i].item())
else:
# ID가 안 붙는 경우(누움 초기 등)는 프레임·인덱스 기반 임시 ID 할당
tid = -(frame_idx * 1000 + i)
x1, y1, x2, y2 = boxes.xyxy[i].cpu().numpy()
x_center = (x1 + x2) / 2
y_center = (y1 + y2) / 2
w = x2 - x1
h = y2 - y1
tracks.append((frame_idx, tid, x_center, y_center, w, h))
by_frame[frame_idx].append((tid, float(x1), float(y1), float(x2), float(y2)))
return tracks, by_frame
이렇게 얻은 tracks는 이후 RNN 입력으로 쓰이고,by_frame은 시각화(바운딩 박스 + ID + 라벨)에 사용된다.
[ 궤적을 시계열 특징으로 만들기: (x, y, w, h, w/h) ]
YOLO의 프레임별 박스만으로는 “행동”을 알기 어렵다.
그래서 각 Person ID마다 시간 축으로 좌표를 이어붙인 시퀀스를 만들었다.
def build_sequences(by_id, seq_len=SEQ_LEN, feature_dim=FEATURE_DIM, normalize=True):
sequences = []
for track_id, points in by_id.items():
if len(points) < seq_len:
continue
pts = np.array(points, dtype=np.float32)
frame_idx = pts[:, 0]
feats = pts[:, 1:5] # x, y, w, h
aspect_ratio = feats[:, 2] / (feats[:, 3] + 1e-6) # w/h
feats = np.concatenate([feats, aspect_ratio[:, np.newaxis]], axis=1) # (N, 5)
if normalize:
feats[:, :4] = feats[:, :4] / 640.0
feats[:, :4] = np.clip(feats[:, :4], 0, 2)
feats[:, 4] = np.clip(feats[:, 4], 0, 3) # w/h 보통 0.3~2.5
for start in range(0, len(feats) - seq_len + 1):
end = start + seq_len
seq = feats[start:end][:, :feature_dim]
f_start = int(frame_idx[start])
f_end = int(frame_idx[end - 1])
sequences.append((f_start, f_end, track_id, seq))
return sequences
여기서 핵심은 Aspect Ratio(종횡비) w/h:
- 서 있을 때: 보통
h > w→w/h < 1 - 누워 있을 때(전도):
w >> h→w/h > 1
즉, 전도(fall) 는 시계열 상에서 종횡비가 급격히 바뀌는 패턴을 보이기 때문에,
RNN이 이 특징을 잡아 학습할 수 있도록 한 것이다.
[ XML 이벤트 구간으로 정밀 레이블링: 전도 vs 파손 ]
AI-Hub 데이터에는 XML 라벨이 함께 제공된다.
여기에는 각 영상에 대해,
fall_start,fall_endbroken_start,broken_end
같은 태그가 들어 있어, 전도/파손이 일어나는 정확한 프레임 구간을 알려준다.
1. XML 파서 구현
def parse_event_xml(xml_path):
xml_path = Path(xml_path)
if not xml_path.exists():
return {"fall": [], "broken": []}
try:
tree = ET.parse(str(xml_path))
root = tree.getroot()
except Exception as e:
print(f"XML 파싱 실패: {xml_path.name} - {e}")
return {"fall": [], "broken": []}
def collect_intervals(prefix):
starts = [int(el.text) for el in root.iter(f"{prefix}_start") if el.text and el.text.isdigit()]
ends = [int(el.text) for el in root.iter(f"{prefix}_end") if el.text and el.text.isdigit()]
intervals = []
for s, e in zip(starts, ends):
if e >= s:
intervals.append((s, e))
return intervals
fall_intervals = collect_intervals("fall")
broken_intervals = collect_intervals("broken")
return {"fall": fall_intervals, "broken": broken_intervals}
def in_any_interval(frame_idx, intervals):
for s, e in intervals:
if s <= frame_idx <= e:
return True
return False
2. 전도/파손 폴더와 XML 자동 매칭
폴더 구조는 아래처럼 정리했다.
/content/sample_data/전도→ 클래스 0/content/sample_data/파손→ 클래스 1
각 영상에 대해 동일한 파일명 + .xml 확장자를 가진 XML을 찾아, 그 안의 구간을 활용한다.
PATH_CLASS0 = Path("/content/sample_data/전도")
PATH_CLASS1 = Path("/content/sample_data/파손")
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv", ".MP4", ".AVI")
def get_all_video_paths(dir_path):
root = Path(dir_path)
if not root.exists():
return []
paths = []
for ext in VIDEO_EXTENSIONS:
paths.extend(root.rglob(f"*{ext}"))
return sorted(set(p for p in paths if p.is_file()))
all_sequences = []
all_labels = []
for path_dir, label in [(PATH_CLASS0, 0), (PATH_CLASS1, 1)]:
video_paths = get_all_video_paths(path_dir)
name = "전도" if label == 0 else "파손"
print(f"[{name}] 영상 {len(video_paths)}개 발견")
for vp in video_paths:
try:
raw_tracks, _ = run_detection_and_tracking(str(vp))
by_id_v = defaultdict(list)
for (fi, tid, xc, yc, w, h) in raw_tracks:
by_id_v[tid].append((fi, xc, yc, w, h))
seqs = build_sequences(by_id_v, seq_len=SEQ_LEN, feature_dim=FEATURE_DIM)
xml_path = vp.with_suffix(".xml")
if xml_path.exists():
events = parse_event_xml(xml_path)
fall_intervals = events["fall"]
broken_intervals = events["broken"]
filtered_seqs, filtered_labels = [], []
for (f_start, f_end, tid, seq) in seqs:
mid_frame = int((f_start + f_end) / 2)
if label == 0:
# 전도 폴더: fall 구간에 포함되는 시퀀스만 사용
if in_any_interval(mid_frame, fall_intervals):
filtered_seqs.append((f_start, f_end, tid, seq))
filtered_labels.append(0)
else:
# 파손 폴더: broken 구간에 포함되는 시퀀스만 사용
if in_any_interval(mid_frame, broken_intervals):
filtered_seqs.append((f_start, f_end, tid, seq))
filtered_labels.append(1)
if filtered_seqs:
all_sequences.extend(filtered_seqs)
all_labels.extend(filtered_labels)
else:
# XML은 있지만 이벤트 매칭이 안 되면 전체 구간을 그대로 사용
all_sequences.extend(seqs)
all_labels.extend([label] * len(seqs))
else:
# XML이 없으면 폴더 레이블 전체 사용
all_sequences.extend(seqs)
all_labels.extend([label] * len(seqs))
except Exception as e:
print(f"영상 처리 스킵 ({name}): {vp.name} - {e}")
이렇게 하면,
- 전도 폴더의 영상이라도
fall구간이 아닌 시퀀스는 학습에서 제외되고, - 파손 폴더는
broken구간만 사용하게 되어, - 이벤트가 실제로 발생한 타이밍에 맞춘 정밀한 레이블링이 가능해진다.
[ 데이터 증강: 노이즈 + 좌우 반전으로 3배 늘리기 ]
실제 영상 수는 많지 않기 때문에, RNN이 일반화되도록 시퀀스 단위 데이터 증강도 추가했다.
MIN_SEQ_FOR_AUGMENT = 300 # 이보다 적으면 증강 수행 (원본 + 노이즈 + 플립 → 약 3배)
def augment_sequence(seq, aug_type="noise", noise_std=0.02):
seq = seq.copy().astype(np.float32)
if aug_type == "noise":
seq += np.random.randn(*seq.shape).astype(np.float32) * noise_std
seq = np.clip(seq, 0, 3) # 정규화 범위 유지
elif aug_type == "flip":
# x 좌표 좌우 반전 (정규화 기준: 2 - x)
seq[:, 0] = 2.0 - seq[:, 0]
seq[:, 0] = np.clip(seq[:, 0], 0, 2)
return seq
if len(all_sequences) >= 1 and len(all_sequences) < MIN_SEQ_FOR_AUGMENT:
orig_count = len(all_sequences)
extra_seqs, extra_labels = [], []
for i, (f_start, f_end, tid, seq) in enumerate(all_sequences):
label = all_labels[i]
# 노이즈 버전
seq_noise = augment_sequence(seq, "noise", 0.02)
extra_seqs.append((f_start, f_end, tid, seq_noise))
extra_labels.append(label)
# 좌우 반전 버전
seq_flip = augment_sequence(seq, "flip")
extra_seqs.append((f_start, f_end, tid, seq_flip))
extra_labels.append(label)
all_sequences.extend(extra_seqs)
all_labels.extend(extra_labels)
print(f"데이터 증강: {orig_count} → {len(all_sequences)} 시퀀스 (노이즈·플립 적용, 약 3배)")
시험 당시 결과:
생성된 시퀀스 수: 1468 (전도(0): 1043, 파손(1): 425)
[ RNN 모델: GRU로 전도(0) / 파손(1) 분류하기 ]
시퀀스 입력은 (batch, seq_len, feature_dim) 형태이며,
여기서는 feature_dim = 5로 (x, y, w, h, w/h)를 사용했다.
1. 모델 정의 (LSTM / GRU)
class BehaviorLSTM(nn.Module):
def __init__(self, input_size=FEATURE_DIM, hidden_size=HIDDEN_SIZE, num_layers=NUM_LAYERS, num_classes=NUM_CLASSES):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, num_classes)
def forward(self, x):
out, (h_n, _) = self.lstm(x)
last_h = h_n[-1]
logits = self.fc(last_h)
return logits
class BehaviorGRU(nn.Module):
def __init__(self, input_size=FEATURE_DIM, hidden_size=HIDDEN_SIZE, num_layers=NUM_LAYERS, num_classes=NUM_CLASSES):
super().__init__()
self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, num_classes)
def forward(self, x):
out, h_n = self.gru(x)
last_h = h_n[-1]
logits = self.fc(last_h)
return logits
실제 학습에서는 GRU 버전을 사용했다.
model = BehaviorGRU(
input_size=FEATURE_DIM,
hidden_size=HIDDEN_SIZE,
num_layers=NUM_LAYERS,
num_classes=NUM_CLASSES,
).to(DEVICE)
print(model)
2. 클래스 불균형을 고려한 가중치 설정
전도(0)와 파손(1)의 시퀀스 개수가 다르기 때문에,
소수 클래스에 더 큰 가중치를 주는 방식으로 CrossEntropyLoss를 구성했다.
np.random.seed(2026)
X = np.array([s[3] for s in all_sequences], dtype=np.float32)
if len(X) == 0:
X = np.random.randn(50, SEQ_LEN, FEATURE_DIM).astype(np.float32)
all_sequences = [(0, SEQ_LEN - 1, 1, X[i]) for i in range(50)]
all_labels = [np.random.randint(0, 2) for _ in range(50)]
y = np.array(all_labels, dtype=np.int64)
X_t = torch.from_numpy(X)
y_t = torch.from_numpy(y).long()
dataset = TensorDataset(X_t, y_t)
loader = DataLoader(dataset, batch_size=min(BATCH_SIZE, len(X)), shuffle=True)
n_class0 = int((y == 0).sum())
n_class1 = int((y == 1).sum())
n_total = len(y)
if n_class0 > 0 and n_class1 > 0 and n_class0 != n_class1:
w0 = n_total / (NUM_CLASSES * n_class0)
w1 = n_total / (NUM_CLASSES * n_class1)
class_weights = torch.tensor([w0, w1], dtype=torch.float32).to(DEVICE)
criterion = nn.CrossEntropyLoss(weight=class_weights)
print(f"클래스 가중치 적용: 전도(0)={n_class0}개→w={w0:.3f}, 파손(1)={n_class1}개→w={w1:.3f}")
else:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
3. 학습 루프
model.train()
for epoch in range(EPOCHS):
total_loss = 0.0
for batch_x, batch_y in loader:
batch_x, batch_y = batch_x.to(DEVICE), batch_y.to(DEVICE)
optimizer.zero_grad()
logits = model(batch_x)
loss = criterion(logits, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}/{EPOCHS} Loss: {total_loss/len(loader):.4f}")
print("학습 완료.")
예시 출력:
클래스 가중치 적용: 전도(0)=1043개→w=0.704, 파손(1)=425개→w=1.727
Epoch 5/20 Loss: 0.4391
Epoch 10/20 Loss: 0.2970
Epoch 15/20 Loss: 0.2676
Epoch 20/20 Loss: 0.2409
학습 완료.
[ 추론 및 결과 출력: Frame / Person ID / Abnormal behavior detected ]
시험에서 요구한 출력 형식은 다음과 같다.
Frame 38
Person ID 3
Abnormal behavior detected
이를 맞추기 위해, 각 시퀀스에 대해 모델을 돌리고,
이상행동(여기서는 파손/전도를 통합해 “이상행동=1”)으로 분류된 경우에만 출력했다.
model.eval()
results_to_print = []
with torch.no_grad():
for (f_start, f_end, track_id, seq) in all_sequences:
x = torch.from_numpy(seq).float().unsqueeze(0).to(DEVICE)
logits = model(x)
pred = logits.argmax(dim=1).item()
if pred == 1: # 이상행동
results_to_print.append((f_end, track_id)) # 마지막 프레임 기준
seen = set()
for (frame_num, person_id) in sorted(results_to_print, key=lambda t: (t[0], t[1])):
key = (frame_num, person_id)
if key in seen:
continue
seen.add(key)
print(f"Frame {frame_num}")
print(f"Person ID {person_id}")
print("Abnormal behavior detected")
print()
실행 결과, 특정 구간에서 여러 Person ID에 대해 지속적으로 “Abnormal behavior detected”가 출력되는 것을 확인할 수 있었다.
[ 시각화: 바운딩 박스 + ID + 라벨 오버레이 ]
텍스트 출력만으로는 직관적 이해가 어렵기 때문에,
YOLO 추적 결과와 RNN 예측 라벨을 영상 위에 직접 오버레이하는 함수도 만들었다.
def visualize_results(video_path, by_frame, output_path, frame_track_to_label=None, label_text=None, default_text=None):
cap = cv2.VideoCapture(video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
for (tid, x1, y1, x2, y2) in by_frame.get(frame_idx, []):
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, f"ID {tid}", (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
pred = frame_track_to_label.get((frame_idx, tid)) if frame_track_to_label else None
if pred is not None and label_text:
text = label_text.get(pred, "?")
elif default_text:
text = default_text
else:
text = None
if text:
cv2.putText(frame, text, (x1, y1 - 28), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
out.write(frame)
frame_idx += 1
cap.release()
out.release()
초기 단계에서는 frame_track_to_label 없이 ID만 시각화하고,
이후 학습된 모델의 예측 라벨(전도/파손)을 매핑해 “FALLING DETECTED / VANDALISM DETECTED” 같은 텍스트를 띄울 수 있다.
[ 핵심 정리 ]
| 항목 | 내용 |
|---|---|
| 데이터셋 | AI-Hub 편의점·매장 이상행동 데이터 (전도, 파손) |
| 탐지/추적 | YOLOv8n + track=True (ByteTrack) |
| 시계열 특징 | (x, y, w, h, w/h) + 슬라이딩 윈도우(SEQ_LEN=30) |
| 라벨링 | XML의 fall_start/end, broken_start/end 구간만 사용해 전도(0)/파손(1) 부여 |
| 데이터 증강 | 시퀀스 단위 노이즈 추가 + 좌우 반전으로 약 3배 확대 |
| 모델 | GRU 기반 RNN (input 5D, hidden 64, 2-layer, output 2-class) |
| 클래스 불균형 처리 | CrossEntropyLoss에 클래스 가중치 적용 (소수 클래스에 더 큰 비중) |
| 출력 형식 | Frame N, Person ID K, Abnormal behavior detected |
| 시각화 | 바운딩 박스 + ID + 라벨 텍스트를 영상 위에 직접 오버레이 |
[ 오늘의 포인트 ]
“이상행동 탐지는 결국 Detection · Tracking · Sequence Modeling · Labeling을 모두 엮는 작업”이라는 것
- YOLO로 “어디에 사람이 있는지”를 보고,
- RNN으로 “시간에 따라 어떻게 움직이는지”를 보고,
- XML로 “언제부터 언제까지가 진짜 이벤트인지”를 잡아주고,
- 증강과 클래스 가중치로 데이터 부족과 불균형을 보정한다.
이 과정을 한 번 끝까지 구현해 보니,
단순한 시험을 넘어서 실제 프로젝트에도 바로 응용 가능한 이상행동 파이프라인의 뼈대를 만들 수 있었다.
'개발 기록실 > 실험 & 구현' 카테고리의 다른 글
| 랜덤 벽 GridWorld에서 TD Learning으로 상태가치 함수 배우기 (0) | 2026.03.13 |
|---|---|
| REINFORCE로 CartPole-v1 학습하기 – 정책 기반 에이전트 실습 (0) | 2026.03.13 |
| [OpenCV + Machine Learning] Kaggle 주조 제품 불량 이미지를 이용한 Random Forest 분류기 만들기 (0) | 2026.03.09 |
| [FastAPI + PyTorch] 컴퓨터비전 모델 기반 이미지 분석 API 서버 구축하기 (0) | 2026.02.09 |
| [React-Native] 사진 업로드 시 EXIF 위치 정보 자동 추출 (0) | 2026.01.29 |