신사(SinSa)

운동 자세 중에서 특정 자세가 포함된 경우 해당 자세가 위험한 사람에게 경고 안내가 필요한 상황이 있다. 스쿼트라거나 푸시업처럼 동작명이 아닌 무릎이 머리보다 높아지는 자세를 취할 때 위험하다고 판단되는 질병이나 질환 또는 주기 상 위험한 상태인 사람에게 시청각 자료가 위험할 수 있으니 자세에 주의를 권하는 메세지가 필요하다. 근데 경고 자체가 안전하다는 이유로 모든 영상에 경고를 부착하는 것은 오히려 효과가 반감되고 다른 지표들에 영향을 줄 수 있기 때문에 비슷한 자세라고 판단될 때 경고하는 편이 좋겠다고 생각했다. 직접 코드를 구현하는 것은 지양하는 편이지만 오랜만에 기술 공부도 할 겸 골연령 측정을 위해 컴퓨터 비전 공부했던 것들도 떠올라서 천천히 시도해봤다.

 

생각보다 금방 잘 되긴 했는데 비디오 앵글이 굉장히 다이나믹한 편이 아니라 잘 됐던 것일 수도 있다. 반대로 말해 여러 비디오와 각도에 대해서는 평가해보지 못했다. 그래도 애초에 높은 정확도를 요구하거나 비즈니스에서 중요한 건 아니었기 때문에 비전문가의 코드를 살릴 수 있었던 것이기도 하니 스스로 잘했다고 뿌듯하게 넘어가려고 한다. (ㅎㅎ)

 

import cv2
import torch
import torchvision.models as models
import torchvision.transforms as T
from tqdm import tqdm
import csv
import os

device = torch.device("mps") #M1 애플 실리콘 최적화를 위해 선언

class LegLiftDetector:
    def __init__(self):
        self.model = models.detection.keypointrcnn_resnet50_fpn(pretrained=True)
        self.model.eval()
        self.transform = T.Compose([T.ToPILImage(), T.Resize(300), T.ToTensor()])
        
        if torch.cuda.is_available():
            self.model.cuda()

    def detect_leg_lift(self, frame):
        with torch.no_grad():
            img_tensor = self.transform(frame).unsqueeze(0)
            if torch.cuda.is_available():
                img_tensor = img_tensor.cuda()
            prediction = self.model(img_tensor)
        
        # 사람 키포인트 설정
        if len(prediction[0]['scores']) and prediction[0]['scores'][0] > 0.9:
            keypoints = prediction[0]['keypoints'][0].cpu().numpy()
            head = keypoints[0]
            knee = keypoints[13]  # right knee in COCO keypoints
            if knee[1] < head[1]:  # 무릎의 y-coordinate가 머리의 y-coordinate보다 높아지는지 확인
                return True
        return False

def get_video_fps(video_path):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()
    return fps

def detect_leg_lifts_seconds_optimized(video_path, batch_size, skip_seconds):
    cap = cv2.VideoCapture(video_path)
    detector = LegLiftDetector()
    
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    skip_frames = int(skip_seconds * fps) - 1  # Adjusting for 0-based index
    results = []
    start_time = None
    leg_lift_detected_in_prev_frame = False
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frames = []

    for frame_num in tqdm(range(0, total_frames, skip_frames + 1), desc="Processing video", ncols=100):
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)

        if len(frames) == batch_size or frame_num + skip_frames + 1 >= total_frames:
            with torch.no_grad():
                tensors = [detector.transform(f).unsqueeze(0) for f in frames]
                batch = torch.cat(tensors)
                if torch.cuda.is_available():
                    batch = batch.cuda()
                predictions = detector.model(batch)

            for prediction in predictions:
                if len(prediction['scores']) and prediction['scores'][0] > 0.9:
                    keypoints = prediction['keypoints'][0].cpu().numpy()
                    head = keypoints[0]
                    knee = keypoints[13]  # right knee in COCO keypoints
                    if knee[1] < head[1]:
                        leg_lift_detected = True
                    else:
                        leg_lift_detected = False
                else:
                    leg_lift_detected = False

                current_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000  # Convert to seconds

                if leg_lift_detected and not leg_lift_detected_in_prev_frame:
                    start_time = current_time
                    print("detected", start_time)
                elif not leg_lift_detected and leg_lift_detected_in_prev_frame and start_time is not None:
                    end_time = current_time
                    results.append((video_path, 'Leg Lift', start_time, end_time))
                    start_time = None

                leg_lift_detected_in_prev_frame = leg_lift_detected

            frames = []

    cap.release()
    return results

def save_results_to_csv(results, csv_path):
    with open(csv_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Video Path", "Action", "Start Time (ms)", "End Time (ms)"])
        for row in results:
            writer.writerow(row)

def detect_leg_lifts_and_save_to_csv(video_path, csv_output_path):
	#skip_seconds를 낮게 할 수록 프레임을 더 많이 나누어 자세를 분석한다.
    results = detect_leg_lifts_seconds_optimized(video_path, batch_size=8, skip_seconds=0.1)
    save_results_to_csv(results, csv_output_path)

def process_all_videos_in_folder(folder_path, csv_output_path):
    try:
        video_files = [f for f in os.listdir(folder_path) if f and f.endswith(('.mp4', '.avi', '.mkv', '.mov'))]
    except TypeError:
        print(f"Error reading files from {folder_path}. Skipping this folder.")
        return
    all_results = []

    for video_file in video_files:
        if video_file:  # Ensure video_file is not None
          video_path = os.path.join(folder_path, video_file)
          print(f"Processing {video_file}...")
          results = detect_leg_lifts_and_save_to_csv(video_path, csv_output_path)
          if results is not None:
                all_results.extend(results)
          else:
                print(f"Warning: No results returned for {video_file}")

    save_results_to_csv(all_results, csv_output_path)

process_all_videos_in_folder("video_folder_path", "csv_output_path")

 

profile

신사(SinSa)

@신사(SinSa)

포스팅이 좋았다면 "좋아요❤️" 눌러주세요!