diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..4b5a294 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda" +} \ No newline at end of file diff --git a/cctv.py b/cctv.py new file mode 100644 index 0000000..19cb672 --- /dev/null +++ b/cctv.py @@ -0,0 +1,337 @@ +# -*- coding: utf-8 -*- +""" +@file : cctv.py +@author: hsj100 +@license: A2TEC +@brief: multi cctv + +@section Modify History +- 2025-11-24 오후 1:38 hsj100 base + +""" + +import cv2 +import numpy as np +import threading +import time +import os + +from datetime import datetime + +from demo_detect import DemoDetect +from utils import get_monitorsize +from config_loader import CFG +from parsing_msg import ParsingMsg +from mqtt import mqtt_publisher + +# ========================================================================================== +# [Configuration Section] +# ========================================================================================== + +_cctv_cfg = CFG.get('cctv', {}) + +# 1. Camera Switch Interval (Seconds) +SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0) + +# 2. Camera Sources List +SOURCES = _cctv_cfg.get('sources', []) +if not SOURCES: + # 기본값 (설정이 없을 경우) + SOURCES = [ + {"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"}, + {"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"}, + ] + +# ========================================================================================== + +class CameraStream: + """ + 개별 카메라 스트림을 관리하는 클래스. + 별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다. + """ + def __init__(self, src, name="Camera"): + self.src = src + self.name = name + + # RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지) + os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" + self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) + + # OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음) + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + self.ret = False + self.latest_frame = None + self.running = False + self.is_active = False # [최적화] 현재 화면 표시 여부 + self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지 + self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화 + self.frame_skip_count = 0 + + # 연결 확인 + if not self.cap.isOpened(): + print(f"[{self.name}] 접속 실패: {self.src}") + else: + print(f"[{self.name}] 접속 성공") + self.running = True + # 스레드 시작 + self.thread = threading.Thread(target=self.update, args=()) + self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료 + self.thread.start() + + def update(self): + """백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)""" + while self.running: + # 1. 버퍼에서 압축된 데이터 가져오기 (grab은 항상 수행하여 버퍼가 쌓이지 않게 함) + if self.cap.grab(): + # [최적화 로직] 활성 상태일 때만 디코딩(retrieve) 수행 + # 비활성 상태일 때는 30프레임에 한 번만 디코딩하여 리소스 절약 + self.frame_skip_count += 1 + if self.is_active or (self.frame_skip_count % 30 == 0): + # 2. 실제 이미지로 디코딩 (무거움) + ret, frame = self.cap.retrieve() + + if ret: + with self.lock: + self.ret = ret + self.latest_frame = frame + self.last_read_time = time.time() # 수신 시간 갱신 + else: + # 디코딩을 건너뛰는 동안 CPU 부하를 아주 약간 줄이기 위해 미세한 대기 + time.sleep(0.001) + else: + # 스트림이 끊기거나 끝난 경우 재접속 로직 + print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...") + self.cap.release() + time.sleep(2.0) + + # 재접속 시도 (TCP 강제 설정 유지) + os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" + self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + if self.cap.isOpened(): + print(f"[{self.name}] 재접속 성공") + else: + print(f"[{self.name}] 재접속 실패") + + def get_frame(self): + """현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)""" + with self.lock: + # 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리 + if time.time() - self.last_read_time > 3.0: + return False, None + return self.ret, self.latest_frame + + def stop(self): + self.running = False + if self.thread.is_alive(): + self.thread.join() + self.cap.release() + +def main(): + + MQTT_TOPIC = "/hospital/ai1" + + parser = ParsingMsg() + + # Get Monitor Size inside main or top level if safe + MONITOR_RESOLUTION = get_monitorsize() + + # 2. 객체탐지 모델 초기화 + detector = DemoDetect() + print("모델 로딩 완료!") + + # 3. 카메라 객체 생성 및 백그라운드 수신 시작 + cameras = [] + for i, s in enumerate(SOURCES): + cam = CameraStream(s["src"], s["name"]) + # 첫 번째 카메라만 활성 상태로 시작 + if i == 0: + cam.is_active = True + cameras.append(cam) + + DISPLAY_TURN_ON = CFG.get('display').get('turn_on') # display 출력 여부 + + current_cam_index = 0 # 현재 보고 있는 카메라 인덱스 + last_switch_time = time.time() # 마지막 전환 시간 + switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환 + + # FPS 계산을 위한 변수 + prev_frame_time = time.time() + fps = 0 + + # FPS 통계 변수 + fps_min = float('inf') + fps_max = 0 + fps_sum = 0 + fps_count = 0 + warmup_frames = 30 # 초반 불안정한 프레임 제외 + + print("\n=== CCTV 시스템 시작 ===") + print(f"{switch_interval}초마다 자동으로 카메라가 전환됩니다.") + if DISPLAY_TURN_ON: + print("[Q] 또는 [ESC] : 종료\n") + else: + print("화면 출력이 꺼져 있습니다. 시스템은 백그라운드에서 계속 실행됩니다.") + print("종료하려면 터미널에서 Ctrl+C를 누르세요.\n") + + if DISPLAY_TURN_ON: + # 전체화면 윈도우 설정 (프레임 없이) + window_name = "CCTV Control Center" + cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN) + cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) + + while True: + # 1. 카메라 전환 처리 (자동) + current_time = time.time() + need_switch = False + + if DISPLAY_TURN_ON: + # 키 입력 처리 (종료만 체크) + key = cv2.waitKey(1) & 0xFF + if key == ord('q') or key == 27: # 종료 + break + + # 자동 전환 타이머 체크 + if current_time - last_switch_time >= switch_interval: + need_switch = True + + if need_switch: + # 기존 카메라 비활성화 (리소스 절약 모드 진입) + cameras[current_cam_index].is_active = False + + # 다음 카메라 인덱스 계산 + current_cam_index = (current_cam_index + 1) % len(cameras) + + # 새 카메라 활성화 (풀 디코딩 시작) + cameras[current_cam_index].is_active = True + + last_switch_time = current_time + print(f"-> 자동 전환: {cameras[current_cam_index].name}") + + # 2. 현재 선택된 카메라의 최신 프레임 가져오기 + target_cam = cameras[current_cam_index] + ret, frame = target_cam.get_frame() + + if ret and frame is not None: + # FPS 계산 + current_frame_time = time.time() + time_diff = current_frame_time - prev_frame_time + fps = 1 / time_diff if time_diff > 0 else 0 + prev_frame_time = current_frame_time + + # FPS 통계 갱신 (워밍업 이후) + if fps > 0: + fps_count += 1 + if fps_count > warmup_frames: + fps_sum += fps + real_count = fps_count - warmup_frames + + fps_avg = fps_sum / real_count + fps_min = min(fps_min, fps) + fps_max = max(fps_max, fps) + + # 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거) + # HPE + hpe_message = detector.inference_hpe(frame, False) + + #NOTE(jwkim): od모델 사용 안함 + # Helmet 탐지 + # helmet_message = detector.inference_helmet(frame, False) + + # 객체 탐지 + # od_message_raw = detector.inference_od(frame, False, class_name_view=True) + + # 필터링 + # od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False) + + od_message = [] + + parser.set(msg=hpe_message,img=frame) + parsing_msg = parser.parse() + + #mqtt publish + if parsing_msg is not None: + mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1) + + # 탐지 결과 라벨링 (원본 좌표 기준) + if DISPLAY_TURN_ON: + detector.hpe_labeling(frame, hpe_message) + detector.od_labeling(frame, od_message) + detector.border_labeling(frame, hpe_message, od_message) + + # 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용) + frame = cv2.resize(frame, MONITOR_RESOLUTION) + + # 화면에 카메라 이름과 시간 표시 (UI 효과) + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # 카메라 이름 (테두리 + 본문) + cam_text = f"CH {current_cam_index+1}: {target_cam.name}" + cv2.putText(frame, cam_text, (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게) + cv2.putText(frame, cam_text, (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # 시간 (테두리 + 본문) + cv2.putText(frame, timestamp, (20, 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 + cv2.putText(frame, timestamp, (20, 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # FPS (테두리 + 본문) + fps_text = f"FPS: {fps:.1f}" + cv2.putText(frame, fps_text, (20, 120), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 + cv2.putText(frame, fps_text, (20, 120), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # FPS 통계 (AVG, MIN, MAX) + stat_val_min = fps_min if fps_min != float('inf') else 0 + stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0 + + stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}" + cv2.putText(frame, stats_text, (20, 160), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA) + cv2.putText(frame, stats_text, (20, 160), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색 + + cv2.imshow(window_name, frame) + + + elif DISPLAY_TURN_ON: + # 신호가 없을 때 보여줄 대기 화면 + # MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함 + blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8) + + # "NO SIGNAL" 텍스트 설정 + text = f"NO SIGNAL ({target_cam.name})" + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1.5 + thickness = 3 + + # 텍스트 중앙 정렬 계산 + text_size = cv2.getTextSize(text, font, font_scale, thickness)[0] + text_x = (blank_screen.shape[1] - text_size[0]) // 2 + text_y = (blank_screen.shape[0] + text_size[1]) // 2 + + # 텍스트 그리기 (빨간색) + cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA) + + cv2.imshow(window_name, blank_screen) + + # 4. 키 입력 처리 (종료만 처리) + if DISPLAY_TURN_ON: + key = cv2.waitKey(1) & 0xFF + if key == ord('q') or key == 27: # 종료 + break + + # 종료 처리 + print("시스템 종료 중...") + for cam in cameras: + cam.stop() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cctv_origin.py b/cctv_origin.py new file mode 100644 index 0000000..1cc281f --- /dev/null +++ b/cctv_origin.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +""" +@file : cctv.py +@author: hsj100 +@license: A2TEC +@brief: multi cctv + +@section Modify History +- 2025-11-24 오후 1:38 hsj100 base + +""" + +import cv2 +import numpy as np +import threading +import time +import os +from datetime import datetime +from demo_detect import DemoDetect +from utils import get_monitorsize +from config_loader import CFG + +# ========================================================================================== +# [Configuration Section] +# ========================================================================================== + +_cctv_cfg = CFG.get('cctv', {}) + +# 1. Camera Switch Interval (Seconds) +SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0) + +# 2. Camera Sources List +SOURCES = _cctv_cfg.get('sources', []) +if not SOURCES: + # 기본값 (설정이 없을 경우) + SOURCES = [ + {"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"}, + {"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"}, + ] + +# ========================================================================================== + +class CameraStream: + """ + 개별 카메라 스트림을 관리하는 클래스. + 별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다. + """ + def __init__(self, src, name="Camera"): + self.src = src + self.name = name + + # RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지) + os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" + self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) + + # OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음) + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + self.ret = False + self.latest_frame = None + self.running = False + self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지 + self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화 + + # 연결 확인 + if not self.cap.isOpened(): + print(f"[{self.name}] 접속 실패: {self.src}") + else: + print(f"[{self.name}] 접속 성공") + self.running = True + # 스레드 시작 + self.thread = threading.Thread(target=self.update, args=()) + self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료 + self.thread.start() + + def update(self): + """백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)""" + while self.running: + # 1. 버퍼에서 압축된 데이터 가져오기 (가볍고 빠름) + if self.cap.grab(): + # 2. 실제 이미지로 디코딩 (무거움) + ret, frame = self.cap.retrieve() + + if ret: + with self.lock: + self.ret = ret + self.latest_frame = frame + self.last_read_time = time.time() # 수신 시간 갱신 + else: + # grab은 성공했으나 디코딩 실패 (드문 경우) + continue + else: + # 스트림이 끊기거나 끝난 경우 재접속 로직 + print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...") + self.cap.release() + time.sleep(2.0) + + # 재접속 시도 (TCP 강제 설정 유지) + os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" + self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + if self.cap.isOpened(): + print(f"[{self.name}] 재접속 성공") + else: + print(f"[{self.name}] 재접속 실패") + + def get_frame(self): + """현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)""" + with self.lock: + # 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리 + if time.time() - self.last_read_time > 3.0: + return False, None + return self.ret, self.latest_frame + + def stop(self): + self.running = False + if self.thread.is_alive(): + self.thread.join() + self.cap.release() + +def main(): + # Get Monitor Size inside main or top level if safe + MONITOR_RESOLUTION = get_monitorsize() + + # 2. 객체탐지 모델 초기화 + detector = DemoDetect() + print("모델 로딩 완료!") + + # 3. 카메라 객체 생성 및 백그라운드 수신 시작 + cameras = [] + for s in SOURCES: + cam = CameraStream(s["src"], s["name"]) + cameras.append(cam) + + current_cam_index = 0 # 현재 보고 있는 카메라 인덱스 + last_switch_time = time.time() # 마지막 전환 시간 + switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환 + + # FPS 계산을 위한 변수 + prev_frame_time = time.time() + fps = 0 + + # FPS 통계 변수 + fps_min = float('inf') + fps_max = 0 + fps_sum = 0 + fps_count = 0 + warmup_frames = 30 # 초반 불안정한 프레임 제외 + + print("\n=== CCTV 관제 시스템 시작 ===") + print("5초마다 자동으로 카메라가 전환됩니다.") + print("[Q] 또는 [ESC]를 눌러 종료합니다.\n") + + # 전체화면 윈도우 설정 (프레임 없이) + window_name = "CCTV Control Center" + cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN) + cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) + + while True: + # 자동 전환 로직: 5초마다 다음 카메라로 전환 + current_time = time.time() + if current_time - last_switch_time >= switch_interval: + current_cam_index = (current_cam_index + 1) % len(cameras) + last_switch_time = current_time + print(f"-> 자동 전환: {cameras[current_cam_index].name}") + + # 3. 현재 선택된 카메라의 최신 프레임 가져오기 + target_cam = cameras[current_cam_index] + ret, frame = target_cam.get_frame() + + if ret and frame is not None: + # FPS 계산 + current_frame_time = time.time() + time_diff = current_frame_time - prev_frame_time + fps = 1 / time_diff if time_diff > 0 else 0 + prev_frame_time = current_frame_time + + # FPS 통계 갱신 (워밍업 이후) + if fps > 0: + fps_count += 1 + if fps_count > warmup_frames: + fps_sum += fps + real_count = fps_count - warmup_frames + + fps_avg = fps_sum / real_count + fps_min = min(fps_min, fps) + fps_max = max(fps_max, fps) + + # 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거) + # HPE + hpe_message = detector.inference_hpe(frame, False) + + # Helmet 탐지 + helmet_message = detector.inference_helmet(frame, False) + + # 객체 탐지 + od_message_raw = detector.inference_od(frame, False, class_name_view=True) + + # 필터링 + od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False) + + # 탐지 결과 라벨링 (원본 좌표 기준) + detector.hpe_labeling(frame, hpe_message) + detector.od_labeling(frame, od_message) + detector.border_labeling(frame, hpe_message, od_message) + + # 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용) + frame = cv2.resize(frame, MONITOR_RESOLUTION) + + # 화면에 카메라 이름과 시간 표시 (UI 효과) + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + # 카메라 이름 (테두리 + 본문) + cam_text = f"CH {current_cam_index+1}: {target_cam.name}" + cv2.putText(frame, cam_text, (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게) + cv2.putText(frame, cam_text, (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # 시간 (테두리 + 본문) + cv2.putText(frame, timestamp, (20, 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 + cv2.putText(frame, timestamp, (20, 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # FPS (테두리 + 본문) + fps_text = f"FPS: {fps:.1f}" + cv2.putText(frame, fps_text, (20, 120), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 + cv2.putText(frame, fps_text, (20, 120), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 + + # FPS 통계 (AVG, MIN, MAX) + stat_val_min = fps_min if fps_min != float('inf') else 0 + stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0 + + stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}" + cv2.putText(frame, stats_text, (20, 160), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA) + cv2.putText(frame, stats_text, (20, 160), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색 + + cv2.imshow(window_name, frame) + else: + # 신호가 없을 때 보여줄 대기 화면 + # MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함 + blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8) + + # "NO SIGNAL" 텍스트 설정 + text = f"NO SIGNAL ({target_cam.name})" + font = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 1.5 + thickness = 3 + + # 텍스트 중앙 정렬 계산 + text_size = cv2.getTextSize(text, font, font_scale, thickness)[0] + text_x = (blank_screen.shape[1] - text_size[0]) // 2 + text_y = (blank_screen.shape[0] + text_size[1]) // 2 + + # 텍스트 그리기 (빨간색) + cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA) + + cv2.imshow(window_name, blank_screen) + + # 4. 키 입력 처리 (종료만 처리) + key = cv2.waitKey(1) & 0xFF + + # 종료 + if key == ord('q') or key == 27: + break + + # 종료 처리 + print("시스템 종료 중...") + for cam in cameras: + cam.stop() + cv2.destroyAllWindows() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/color_table.py b/color_table.py new file mode 100644 index 0000000..3f5a37e --- /dev/null +++ b/color_table.py @@ -0,0 +1,63 @@ +import project_config + +TEXT_COLOR_WHITE = [255,255,255] +TEXT_COLOR_BLACK = [0,0,0] + +class ColorInfo: + def __init__(self,label_color,font_color): + self.label_color = label_color + self.font_color = font_color + +dev_color_table = { + "person" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [183, 183, 183],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "truck" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [4, 63, 120],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_helmet_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [193, 193, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_helmet_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 0, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_gloves_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [184,255,205],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_gloves_work_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [70,251,124],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_gloves_insulated_on_1" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [59,216,106],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_gloves_insulated_on_2" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [45,169,82],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_gloves_insulated_on_3" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [29,114,55],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_gloves_insulated_on_4" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [14, 54, 26],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_rubber_insulated_sleeve_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [233, 210, 217],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_rubber_insulated_sleeve_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [195, 124, 142],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_boots_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [204, 242, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_boots_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [50, 194, 241],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_belt_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [209,165,253],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_belt_basic_on_1" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [158,68,248],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_belt_basic_on_2" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [130,54,206],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_belt_xband_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [102,40,164],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_belt_swing_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [71, 27, 116],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_vest_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [219,205,143],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_vest_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [142, 129, 69],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_suit_top_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [116, 186, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_suit_top_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 128, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_suit_bottom_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [255, 255, 208],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_suit_bottom_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [255, 255, 0],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "equipment_smartstick" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [116, 134, 217],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "equipment_con_switch" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [12, 32, 133],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "sign_board_information" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 255, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "sign_board_construction" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 209, 209],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "sign_board_traffic" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 114, 114],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "sign_traffic_cone" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [255, 81, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "truck_lift_bucket" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [175, 57, 175],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "wheel_chock" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [86, 9, 86],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "signal_light_stick" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [212,59,8],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]) + } + + +demo_color_table = { + "person" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [183, 183, 183],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_helmet_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [193, 193, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_helmet_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 0, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_gloves_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [184,255,205],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_gloves_insulated_on_1" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [59,216,106],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_gloves_insulated_on_2" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [45,169,82],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "safety_boots_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [204, 242, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_boots_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [50, 194, 241],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_belt_off" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [209,165,253],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "safety_belt_swing_on" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [71, 27, 116],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]), + "sign_board_information" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [0, 255, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "sign_traffic_cone" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [255, 81, 255],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [0, 0, 0]), + "signal_light_stick" : ColorInfo(label_color=TEXT_COLOR_WHITE if project_config.LABEL_ALL_WHITE else [212,59,8],font_color=TEXT_COLOR_BLACK if project_config.LABEL_ALL_WHITE else [255, 255, 255]) + } \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..bcafb6b --- /dev/null +++ b/config.yaml @@ -0,0 +1,87 @@ +# =================================================== +# KEPCO DEMO 설정 파일 +# 이 파일을 수정하여 프로그램 동작을 변경할 수 있습니다. +# =================================================== + +# --- 모델 설정 --- +pt_type: "dev" # "dev" 또는 "demo" +use_hpe_person: true # Human Pose Estimation 사용 여부 +use_helmet_model: true # 헬멧 전용 모델 사용 여부 + +# --- 라벨 설정 --- +view_conf_score: false # confidence score 표시 여부 +show_gloves: true # glove on/off 라벨링 여부 +label_all_white: true # bounding box 색상 전부 흰색 여부 +use_hpe_frame_check: false # n개 프레임 체크 후 HPE 위험 판단 + +# --- 디버그 --- +add_cross_arm: false # cross arm 디버그 모드 + +# --- 소스 설정 --- +# RTSP 카메라 프리셋 (source에서 프리셋 이름으로 사용 가능) +cameras: + MG: "rtsp://admin:admin1263!@10.20.10.99:28554/onvif/media?profile=Profile1" + LF: "rtsp://admin:!1q2w3e4r@10.20.10.100:50554/profile1/media.smp" + FC: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.101/axis-media/media.amp" + MG_74_LTE: "rtsp://admin:admin1263!@223.171.48.74:28554/trackID=1" + FERMAT: "rtsp://192.168.200.233:50399/wd" + +source: "FC" # 카메라 프리셋 이름 또는 파일/RTSP 경로 +save_path: "./250630_result" + +# --- 모델 가중치 파일 --- +weights: + pose: "yolov8l-pose.pt" + helmet: "yolov8_dev1_97.pt" + # pt_type별 OD 모델 + od_dev: "yolov11m_dev1_8.pt" + od_demo: "yolov8_dev1_66.pt" + +# --- 모델 파라미터 --- +model_confidence: 0.5 +model_image_size: 640 +kpt_min_confidence: 0.5 + +# --- HPE 임계값 --- +hpe: + falldown_tilt_ratio: 0.80 + falldown_tilt_angle: 15 + body_tilt_ratio: 0.30 + cross_arm_ratio_threshold: 0.10 + cross_arm_angle_threshold: [15.0, 165.0] + arm_angle_threshold_cross: [80.0, 135.0] # add_cross_arm: true 일 때 + arm_angle_threshold_default: [150.0, 195.0] # add_cross_arm: false 일 때 + elbow_angle_threshold: [150.0, 185.0] + hpe_frame_check_max_count: 3 + +# --- 디스플레이 설정 --- +display: + turn_on: false # 화면 표시 여부 + border_thickness: 40 + border_thickness_half: 20 + normal_thickness: 2 + warning_thickness: 4 + hpe_thickness_ratio: 1 + text_size: 1 + text_thickness: 0 + ppe_union_min_percent: 0.9 + loadstreams_img_buffer: 5 + fhd_resolution: [1920, 1080] + +# --- CCTV 관제 설정 --- +cctv: + switch_interval: 5.0 + sources: + # - {src: 0, name: "Realtek_Webcam_0"} + # - {src: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.101/axis-media/media.amp", name: "ipcam_1"} + # - {src: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.102/axis-media/media.amp", name: "ipcam_2"} + - {src: "rtsp://192.168.200.231:50199/wd", name: "dev1_stream"} + - {src: "rtsp://192.168.200.232:50299/wd", name: "dev2_stream"} + # - {src: "rtsp://192.168.200.236:8554/videodevice", name: "boss_webcam"} + # - {src: "rtsp://192.168.200.236:8554/1.mp4", name: "boss_stream1"} + # - {src: "rtsp://192.168.200.214/1.mp4", name: "sgm_stream1"} + # - {src: "rtsp://192.168.200.214/2.mp4", name: "sgm_stream2"} + # - {src: "rtsp://192.168.200.111/1.mp4", name: "hp_stream1"} + # - {src: "rtsp://192.168.200.111/2.mp4", name: "hp_stream2"} + # - {src: "rtsp://192.168.200.215/1.mp4", name: "msk_stream1"} + # - {src: "rtsp://192.168.200.215/2.mp4", name: "msk_stream2"} diff --git a/config_loader.py b/config_loader.py new file mode 100644 index 0000000..44d0198 --- /dev/null +++ b/config_loader.py @@ -0,0 +1,35 @@ +import os +import sys +import yaml + + +def _get_base_dir(): + """exe 실행 시 exe 옆, 개발 시 스크립트 옆에서 config.yaml을 찾는다.""" + if getattr(sys, 'frozen', False): + # PyInstaller exe + return os.path.dirname(sys.executable) + else: + return os.path.dirname(os.path.abspath(__file__)) + + +def load_config(): + base_dir = _get_base_dir() + config_path = os.path.join(base_dir, 'config.yaml') + + if not os.path.exists(config_path): + raise FileNotFoundError(f"config.yaml을 찾을 수 없습니다: {config_path}") + + with open(config_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + + +def get_weights_dir(): + """weights 폴더 경로. exe 내부 번들 또는 exe 옆 폴더.""" + if getattr(sys, 'frozen', False): + # PyInstaller 번들 내부 (--add-data로 포함된 경우) + return os.path.join(sys._MEIPASS, 'weights') + else: + return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'weights') + + +CFG = load_config() diff --git a/demo_const.py b/demo_const.py new file mode 100644 index 0000000..036c653 --- /dev/null +++ b/demo_const.py @@ -0,0 +1,127 @@ +import project_config +from config_loader import CFG, get_weights_dir +from color_table import dev_color_table, demo_color_table + +# === 카메라 프리셋 === +_cameras = CFG.get('cameras', {}) +MG = _cameras.get('MG', '') +LF = _cameras.get('LF', '') +FC = _cameras.get('FC', '') +MG_74_LTE = _cameras.get('MG_74_LTE', '') +FERMAT = _cameras.get('FERMAT', '') + +# === 소스 === +_source_raw = CFG.get('source', '') +SOURCE = _cameras.get(_source_raw, _source_raw) # 프리셋 이름이면 URL로, 아니면 그대로 + +SAVE_PATH = CFG.get('save_path', './250630_result') + +# === 모델 가중치 경로 === +WEIGHTS_PATH = get_weights_dir() +_weights = CFG.get('weights', {}) +WEIGHTS_POSE = WEIGHTS_PATH + "/" + _weights.get('pose', 'yolov8l-pose.pt') +WEIGHTS_YOLO_HELMET = WEIGHTS_PATH + "/" + _weights.get('helmet', 'yolov8_dev1_97.pt') + +# PT +HELMET_KPT = [0, 1, 2] +RUBBER_INSULATED_SLEEVE_KPT = [7, 8] +SUIT_TOP_KPT = [6, 5] +SUIT_BOTTOM_KPT = [13, 14] +HELMET_CLASS_NAME = ['safety_helmet_off', 'safety_helmet_on'] + +# PT dev +if project_config.PT_TYPE == 'dev': + color_table = dev_color_table + + WEIGHTS_YOLO = WEIGHTS_PATH + "/" + _weights.get('od_dev', 'yolov11m_dev1_8.pt') + UNVISIBLE_CLS = [4, 5, 6, 7, 8, 9] + OFF_CLASS_LIST = [2, 4, 12, 14] + OFF_TRIGGER_CLASS_LIST = [2, 4] # (helmet, gloves) + PPE_CLASS_LIST = list(range(2, 19)) + + HELMET_CID = [2, 3] # OFF, ON + RUBBER_INSULATED_SLEEVE_CID = [10, 11] # OFF, ON + SUIT_TOP_CID = [] # OFF, ON + SUIT_BOTTOM_CID = [] # OFF, ON + + HELMET_ON_CID = 3 + GLOVES_WORK_ON_CID = 6 + BOOTS_ON_CID = 13 + TRAFFIC_CONE_CID = 21 + LIGHT_STICK_CLS_ID = 23 + +# PT DEMO +elif project_config.PT_TYPE == 'demo': + color_table = demo_color_table + + WEIGHTS_YOLO = WEIGHTS_PATH + "/" + _weights.get('od_demo', 'yolov8_dev1_66.pt') + + UNVISIBLE_CLS = [3, 4, 5] + OFF_CLASS_LIST = [1, 3, 6, 8] + OFF_TRIGGER_CLASS_LIST = [1, 3] # (helmet, gloves) + PPE_CLASS_LIST = list(range(1, 9)) + + HELMET_CID = [1, 2] # OFF, ON + RUBBER_INSULATED_SLEEVE_CID = [] + SUIT_TOP_CID = [] # OFF, ON + SUIT_BOTTOM_CID = [6, 7] # OFF, ON + + HELMET_ON_CID = 2 + GLOVES_WORK_ON_CID = 4 + BOOTS_ON_CID = 7 + TRAFFIC_CONE_CID = 11 + +# === 모델 파라미터 === +_display = CFG.get('display', {}) + +HPE_FRAME_CHECK_MAX_COUNT = CFG.get('hpe', {}).get('hpe_frame_check_max_count', 3) +PPE_UNION_MIN_PERCENT = _display.get('ppe_union_min_percent', 0.9) + +MODEL_CONFIDENCE = CFG.get('model_confidence', 0.5) +MODEL_IMAGE_SIZE = CFG.get('model_image_size', 640) + +WD_BORDER_COLOR = [0, 255, 255] +WD_THICKNESS_ON = False + +BORDER_OD_TEXT = "Danger(OD)" +BORDER_HPE_TEXT = "Danger(HPE)" + +KPT_MIN_CONFIDENCE = CFG.get('kpt_min_confidence', 0.5) + +# loadstreams +LOADSTREAMS_IMG_BUFFER = _display.get('loadstreams_img_buffer', 5) + +_fhd = _display.get('fhd_resolution', [1920, 1080]) +FHD_RESOLUTION = tuple(_fhd) + +BORDER_THICKNESS = _display.get('border_thickness', 40) +BORDER_THICKNESS_HALF = _display.get('border_thickness_half', 20) +NORMAL_THICKNESS = _display.get('normal_thickness', 2) +WARNING_THICKNESS = _display.get('warning_thickness', 4) + +HPE_THICKNESS_RAITO = _display.get('hpe_thickness_ratio', 1) + +TEXT_SIZE = _display.get('text_size', 1) +TEXT_THICKNESS = _display.get('text_thickness', 0) +TEXT_OD_STARTING_POINT = (20, 30) +TEXT_HPE_STARTING_POINT = (20, 30) + +# POSE HUMAN LABEL COLOR (BGR) +POSE_NORMAL_COLOR = [0, 0, 0] +POSE_CROSS_COLOR = [0, 0, 255] +POSE_FALL_COLOR = [0, 0, 255] +POSE_ANGLE_ARM_COLOR = [0, 51, 102] + +TEXT_COLOR_WHITE = [255, 255, 255] +TEXT_COLOR_BLACK = [0, 0, 0] + +text_color_white_list = [ + "truck", + "safety_gloves_insulated_on_1", + "safety_gloves_insulated_on_2", + "safety_rubber_insulated_sleeve_on", + "safety_belt_swing_on", + "safety_vest_on", + "safety_suit_top_on", + "sign_board_traffic" +] diff --git a/demo_detect.py b/demo_detect.py new file mode 100644 index 0000000..a995c0c --- /dev/null +++ b/demo_detect.py @@ -0,0 +1,797 @@ +import os +import cv2 +import random +import numpy as np +import imghdr + +import project_config +import demo_const as AI_CONST + +from predict import ObjectDetect, PoseDetect +from load_models import model_manager + +from ultralytics.data.loaders import LOGGER +from ultralytics.data.loaders import LoadImagesAndVideos + +from utils import LoadStreamsDaool,CustomVideoCapture,CLASS_INFORMATION,CLASS_SWAP_INFO,get_monitorsize,img_resize + +LOGGER.setLevel("ERROR") + +MONITOR_RESOLUTION = get_monitorsize() + +class DemoDetect: + + POSETYPE_NORMAL = int(0x0000) + POSETYPE_FALL = int(0x0080) + POSETYPE_CROSS = int(0x0100) + """ + 시연용 + """ + def __init__(self): + self.image_data = None + self.crop_img = False + + self.model = model_manager.get_od() + self.object_detect = ObjectDetect() + self.object_detect.set_model(self.model) + + # helmet + if project_config.USE_HELMET_MODEL: + self.helmet_model = model_manager.get_helmet() + self.helmet_detect = ObjectDetect() + self.helmet_detect.set_model(self.helmet_model) + + # HPE + self.pose_model = model_manager.get_hpe() + self.pose_predict = PoseDetect() + self.pose_predict.set_model(self.pose_model) + self.hpe_frame_count = 0 + + self.color_table = AI_CONST.color_table + + self.source =AI_CONST.SOURCE + self.save = False + self.ext = None + self.video_capture = None + + def set(self, args): + self.source = args.source + self.save = args.save + self.clahe = args.clahe + + def run(self): + if os.path.exists(self.source): + dataset = LoadImagesAndVideos(path=self.source) + else: + if self.save: + raise Exception("스트림영상은 저장 불가") + dataset = LoadStreamsDaool(sources=self.source) + + if self.save: + if imghdr.what(self.source): + #img + self.ext = ".jpg" + else: + #video + self.ext = ".mp4" + + os.makedirs(AI_CONST.SAVE_PATH, exist_ok=True) + + for _, image, *_ in dataset: + + image = image[0] + #image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) ##영상이 좌로 90도 회전되어 들어오는 경우가 있어 임시로 추가함 + image = self.image_calibration(image) + + if self.ext == ".mp4" and not self.video_capture: + self.video_capture = CustomVideoCapture() + self.video_capture.set_frame_size(image=image) + # _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}") + _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{'.avi'}") + if self.clahe: + _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect_clahe{'.avi'}") + self.video_capture.set_video_writer(path=_video_path) + print(_video_path) + + # hpe person detect + hpe_message = self.inference_hpe(image, self.crop_img) if project_config.USE_HPE_PERSON else [] + + if project_config.USE_HPE_FRAME_CHECK: + hpe_message = self.hpe_frame_check(hpe_message) + + # helmet detect + _helmet_message = self.inference_helmet(image, self.crop_img) if project_config.USE_HELMET_MODEL else [] + + # object detect + od_message_raw = self.inference_od(image, self.crop_img,class_name_view=True) + # od_message_raw = [] + + # od filtering + od_message = self.od_filtering(image,od_message_raw,_helmet_message,hpe_message,add_siginal_man=False) + + self.hpe_labeling(image,hpe_message) + + # od_message = [] # NOTE(jwkim) od 라벨링 막음 다시 그리고싶다면 주석 + self.od_labeling(image,od_message) + self.border_labeling(image,hpe_message,od_message) + + if self.save: + if imghdr.what(self.source): + #img + _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}") + cv2.imwrite(_video_path,image) + else: + #video + self.video_capture.write_video(image) + else: + cv2.namedWindow("image", cv2.WND_PROP_FULLSCREEN) + cv2.setWindowProperty("image", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) + + if MONITOR_RESOLUTION != AI_CONST.FHD_RESOLUTION: + image = img_resize(image,MONITOR_RESOLUTION) + cv2.imshow("image", image) + + # Hit 'q' on the keyboard to quit! + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + def inference_hpe(self, image, crop_image): + """ + inference 완료 후 off class 필터링 + + :param frame_count:현재 프레임 수 + :param image: 이미지 데이터 + :param crop_image: crop_image 유무 + + :return: 탐지 결과 + """ + # 초기화 + message = [] + self.pose_predict.set_image(image) + message = self.pose_predict.predict(working=True,crop_image=crop_image) + + return message + + def inference_helmet(self, image, crop_image): + """ + inference 완료후 탐지된 리스트 반환 + + :param image: 이미지 데이터 + :param crop_image: crop_image 유무 + :return: 탐지 결과 + """ + # 초기화 + message = [] + self.helmet_detect.set_image(image) + raw_message = self.helmet_detect.predict(crop_image=crop_image, class_name=True) + + for i in raw_message: + #ON + if i["class_name"] == 'head' or i["class_name"] == 'safety_helmet_off': + i["class_id"] = CLASS_SWAP_INFO['safety_helmet_off'] + i["class_name"] = CLASS_INFORMATION[i["class_id"]] + message.append(i) + #OFF + elif i["class_name"] == 'helmet' or i["class_name"] == 'safety_helmet_on': + i["class_id"] = CLASS_SWAP_INFO['safety_helmet_on'] + i["class_name"] = CLASS_INFORMATION[i["class_id"]] + message.append(i) + else: + continue + + return message + + def inference_od(self, image, crop_image, class_name_view=False): + """ + inference 완료후 탐지된 리스트 반환 + + :param image: 이미지 데이터 + :param crop_image: crop_image 유무 + :return: 탐지 결과 + """ + # 초기화 + message = [] + self.object_detect.set_image(image) + message = self.object_detect.predict(crop_image=crop_image, class_name=class_name_view) + + return message + + def od_filtering(self,image,od_raw_message,helmet_message,hpe_message,add_siginal_man=True): + """ + 여러 모델 inference 결과를 조합하여 od 결과 추출 + + :param image: 원본 이미지 + :param od_raw_message: od inference 결과 + :param helmet_message: helmet inference 결과 + :param hpe_message: hpe inference 결과 + :param add_siginal_man: 신호수 추가 유무 + :return: 필터링된 od inference 결과 + """ + ppe_filtered_message = [] + + #od helmet 제거 , helmet 추가 + helmet_changed_message = self.update_helmet(helmet_message, od_raw_message) + + #od person 제거, hpe_person 추가 + person_changed_message = self.update_person(hpe_message, helmet_changed_message) + + # 필터링 작업 (현재 box 겹침만 사용) + ppe_filtered_message = self.od_ppe_class_filter(person_changed_message,hpe_message) + + # signalman 추가 + if add_siginal_man: + signal_man = self.signal_man_message(image, ppe_filtered_message) + if signal_man: + ppe_filtered_message.append(signal_man) + + return ppe_filtered_message + + def update_helmet(self, helmet_message, od_message): + """ + helmet message 파싱 후 , od message 에서 helmet 제거 + 그후 합친 결과 return + + :param helmet_message: helmet detect result + :param od_message: od detect result + :return: result + """ + _result = [] + if not helmet_message: + return od_message + elif project_config.USE_HELMET_MODEL is False: + return od_message + else: + #parsing helmet msg + _result = _result + helmet_message + + #remove od helmet + for _od in od_message: + if _od['class_id'] not in AI_CONST.HELMET_CID: + _result.append(_od) + + return _result + + def update_person(self, hpe_message, od_message): + """ + hpe message 파싱 후 , od message 에서 person 제거 + 그후 합친 결과 return + + :param hpe_message: _description_ + :param od_message: _description_ + :return: _description_ + """ + _result = [] + if not hpe_message: + return od_message + elif project_config.USE_HPE_PERSON is False: + return od_message + else: + #parsing hpe msg + for _hpe in hpe_message: + _person_info = {k: v for k, v in _hpe['result'].items() if k not in ('pose_type', 'pose_level')} + _result.append(_person_info) + + #remove od person + for _od in od_message: + if _od['class_id'] != 0 : + _result.append(_od) + + return _result + + def od_ppe_class_filter(self, od_message, kpt_message): + """ + PPE_CLASS_LIST 로 정의된 클래스가 사용할수 있는지 없는지 판단하여 결과물 return + + - PPE_CLASS_LIST 중 helmet, rubber_insulated_sleeve, suit_top, suit_bottom class 경우 bbox가 keypoint에 포함되면 _result에 추가, + 포함되지 않는다면 person class와 겹치는지 확인후 겹치면 _result 에 추가 + + - PPE_CLASS_LIST 나머지 class 는 person class와 겹치면 _result 에 추가 + + - PPE_CLASS_LIST 가 아닐경우 _result에 추가 + + :param od_message: od list + :param kpt_message: keypoint list + :return: list + """ + _result = [] + _person = [] + _ppe_cls = [] + + # split classes + for i in od_message: + if i['class_id'] == 0: + _person.append(i) + _result.append(i) + elif i['class_id'] in AI_CONST.PPE_CLASS_LIST: + _ppe_cls.append(i) + else: + _result.append(i) + + # filtering ppe classes + for ppe in _ppe_cls: + for kpt in kpt_message: + if AI_CONST.HELMET_CID and ppe['class_id'] in AI_CONST.HELMET_CID: + #HELMET + kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.HELMET_KPT,type=1) + + if kpt_include: + _result.append(ppe) + break + else: + intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) + if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: + _result.append(ppe) + break + + elif AI_CONST.RUBBER_INSULATED_SLEEVE_CID and ppe['class_id'] in AI_CONST.RUBBER_INSULATED_SLEEVE_CID: + #RUBBER_INSULATED_SLEEVE + kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.RUBBER_INSULATED_SLEEVE_KPT,type=0) + + if kpt_include: + _result.append(ppe) + break + else: + intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) + if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: + _result.append(ppe) + break + + elif AI_CONST.SUIT_TOP_CID and ppe['class_id'] in AI_CONST.SUIT_TOP_CID: + #SUIT_TOP + kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_TOP_KPT,type=1) + + if kpt_include: + _result.append(ppe) + break + else: + intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) + if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: + _result.append(ppe) + break + + elif AI_CONST.SUIT_BOTTOM_CID and ppe['class_id'] in AI_CONST.SUIT_BOTTOM_CID: + #SUIT_BOTTOM + kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_BOTTOM_KPT,type=1) + + if kpt_include: + _result.append(ppe) + break + else: + intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) + if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: + _result.append(ppe) + break + + else: + intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) + if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: + _result.append(ppe) + break + + return _result + + def check_union_area(self, person, object): + """ + 두개의 bbox 가 겹치는지 확인후 + 겹친다면 두개의 bbox 겹치는 영역 return + 아닐경우 false return + :param person: bbox 좌표 x1,y1,x2,y2 + :param object: bbox 좌표 x1,y1,x2,y2 + """ + person_left, person_top, person_right, person_bot = ( + int(person[0]), + int(person[1]), + int(person[2]), + int(person[3]), + ) + obj_left, obj_top, obj_right, obj_bot = ( + int(object[0]), + int(object[1]), + int(object[2]), + int(object[3]), + ) + + ## case1 오른쪽으로 벗어나 있는 경우 + if person_right < obj_left: + return 0 + + ## case2 왼쪽으로 벗어나 있는 경우 + if person_left > obj_right: + return 0 + + ## case3 위쪽으로 벗어나 있는 경우 + if person_bot < obj_top: + return 0 + + ## case4 아래쪽으로 벗어나 있는 경우 + if person_top > obj_bot: + return 0 + + # 교집합 영역 찾기 + modified_left, modified_top, modified_right, modified_bot = obj_left, obj_top, obj_right, obj_bot + # x좌표 기준으로 이동 + if modified_left < person_left : # object 왼쪽 겹침 + modified_left = person_left + elif modified_right > person_right : #object 오른쪽 겹침 + modified_right = person_right + + # y좌표 기준으로 이동 + if modified_top < person_top : # object 위쪽 겹침 + modified_top = person_top + elif modified_bot > person_bot : #object 아래쪽 겹침 + modified_bot = person_bot + + width = modified_right - modified_left + height = modified_bot - modified_top + + if width * height > 0: + return (width * height)/((obj_right-obj_left)*(obj_bot-obj_top)) + else: + return 0 + + def plot_one_box(self, x, img, color=None, text_color=AI_CONST.TEXT_COLOR_WHITE, label=None, line_thickness=3): + # Plots one bounding box on image img + tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness + color = color or [random.randint(0, 255) for _ in range(3)] + c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3])) + cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA) + if label: + + tf = max(tl - 1, 1) # font thickness + t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0] + c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3 + cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) # filled + cv2.putText( + img, + label, + (c1[0], c1[1] - 2), + 0, + tl / 3, + text_color, + thickness=tf, + lineType=cv2.LINE_AA, + ) + + def plot_skeleton_kpts(self, im, kpts, steps, orig_shape=None): + # Plot the skeleton and keypointsfor coco datatset + palette = np.array( + [ + [255, 128, 0], + [255, 153, 51], + [255, 178, 102], + [230, 230, 0], + [255, 153, 255], + [153, 204, 255], + [255, 102, 255], + [255, 51, 255], + [102, 178, 255], + [51, 153, 255], + [255, 153, 153], + [255, 102, 102], + [255, 51, 51], + [153, 255, 153], + [102, 255, 102], + [51, 255, 51], + [0, 255, 0], + [0, 0, 255], + [255, 0, 0], + [255, 255, 255], + ] + ) + + skeleton = [ + [16, 14], + [14, 12], + [17, 15], + [15, 13], + [12, 13], + [6, 12], + [7, 13], + [6, 7], + [6, 8], + [7, 9], + [8, 10], + [9, 11], + [2, 3], + [1, 2], + [1, 3], + [2, 4], + [3, 5], + [4, 6], + [5, 7], + ] + + pose_limb_color = palette[ + [9, 9, 9, 9, 7, 7, 7, 0, 0, 0, 0, 0, 16, 16, 16, 16, 16, 16, 16] + ] + pose_kpt_color = palette[[16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 9, 9]] + radius = 4 + num_kpts = len(kpts) // steps + + for kid in range(num_kpts): + r, g, b = pose_kpt_color[kid] + x_coord, y_coord = kpts[steps * kid], kpts[steps * kid + 1] + if not (x_coord % 640 == 0 or y_coord % 640 == 0): + if steps == 3: + conf = kpts[steps * kid + 2] + if conf < AI_CONST.KPT_MIN_CONFIDENCE: + continue + cv2.circle( + im, + (int(x_coord), int(y_coord)), + radius, + (int(r), int(g), int(b)), + -1 * AI_CONST.HPE_THICKNESS_RAITO, + ) + + for sk_id, sk in enumerate(skeleton): + r, g, b = pose_limb_color[sk_id] + pos1 = (int(kpts[(sk[0] - 1) * steps]), int(kpts[(sk[0] - 1) * steps + 1])) + pos2 = (int(kpts[(sk[1] - 1) * steps]), int(kpts[(sk[1] - 1) * steps + 1])) + if steps == 3: + conf1 = kpts[(sk[0] - 1) * steps + 2] + conf2 = kpts[(sk[1] - 1) * steps + 2] + if conf1 < AI_CONST.KPT_MIN_CONFIDENCE or conf2 < AI_CONST.KPT_MIN_CONFIDENCE: + continue + if pos1[0] % 640 == 0 or pos1[1] % 640 == 0 or pos1[0] < 0 or pos1[1] < 0: + continue + if pos2[0] % 640 == 0 or pos2[1] % 640 == 0 or pos2[0] < 0 or pos2[1] < 0: + continue + cv2.line(im, pos1, pos2, (int(r), int(g), int(b)), thickness=2 * AI_CONST.HPE_THICKNESS_RAITO) + + def hpe_labeling(self,image,hpe_data): + for hpe in hpe_data: + _kpt=[] + for kpt, conf in zip(hpe["keypoints"], hpe["kpt_conf"]): + if kpt == None: + _kpt.append(0) + _kpt.append(0) + else: + _kpt.append(kpt[0]) + _kpt.append(kpt[1]) + _kpt.append(conf) + label_kpt = np.array(_kpt) + self.plot_skeleton_kpts(im=image, kpts=np.array(label_kpt), steps=3) + + def od_labeling(self,image,od_data): + for od in od_data: + if not project_config.SHOW_GLOVES: + if od['class_id'] in AI_CONST.UNVISIBLE_CLS: + continue + _label_color = [] + _text_color = [] + + if od["class_name"] not in list(self.color_table.keys()): + _label_color = AI_CONST.TEXT_COLOR_WHITE + _text_color = AI_CONST.TEXT_COLOR_BLACK + elif od["class_name"] in AI_CONST.text_color_white_list: + _label_color = AI_CONST.TEXT_COLOR_WHITE + _text_color = AI_CONST.TEXT_COLOR_BLACK + else: + _label_color = self.color_table[od["class_name"]].label_color + _text_color = self.color_table[od["class_name"]].font_color + + + self.plot_one_box( + od["bbox"], + image, + label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}", + color=_label_color, + text_color=_text_color, + line_thickness=AI_CONST.NORMAL_THICKNESS, + ) + + # # NOTE(gyong min) person만 bbox 표시 + # if od["class_name"] == 'person': + + # self.plot_one_box( + # od["bbox"], + # image, + # label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}", + # color=_label_color, + # text_color=_text_color, + # line_thickness=AI_CONST.NORMAL_THICKNESS, + # ) + + def border_labeling(self,image,hpe_data,od_data): + _current_pose_type = 0 + _border_color = [] + + _off_helmet = False + _off_glove = False + + #hpe warning + for hpe in hpe_data: + _current_pose_type = max(_current_pose_type, int(hpe['result']['pose_type'])) + + if _current_pose_type >= self.POSETYPE_CROSS: + #cross + _border_color = AI_CONST.POSE_CROSS_COLOR + elif _current_pose_type < self.POSETYPE_CROSS and _current_pose_type >= self.POSETYPE_FALL: + #fall + _border_color = AI_CONST.POSE_FALL_COLOR + + #NOTE(SGM)빨간 경고테두리 그리기 + if _current_pose_type>0: + cv2.rectangle( + image, + pt1=( + AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF, + AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF, + ), + pt2=( + image.shape[1] + - AI_CONST.BORDER_THICKNESS + - AI_CONST.BORDER_THICKNESS_HALF, + image.shape[0] + - AI_CONST.BORDER_THICKNESS + - AI_CONST.BORDER_THICKNESS_HALF, + ), + color=_border_color, + thickness=AI_CONST.BORDER_THICKNESS, + ) + + + + # cv2.putText( + # image, + # AI_CONST.BORDER_HPE_TEXT, + # AI_CONST.TEXT_HPE_STARTING_POINT, + # AI_CONST.TEXT_THICKNESS, + # AI_CONST.TEXT_SIZE, + # [0, 0, 0], + # thickness=1, + # lineType=cv2.LINE_AA, + # ) + + # od warning + for od in od_data: + if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[0]: + _off_helmet = True + if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[1]: + _off_glove = True + + if _off_glove and _off_helmet : + cv2.rectangle( + image, + pt1=(AI_CONST.BORDER_THICKNESS_HALF, AI_CONST.BORDER_THICKNESS_HALF), + pt2=( + image.shape[1] - AI_CONST.BORDER_THICKNESS_HALF, + image.shape[0] - AI_CONST.BORDER_THICKNESS_HALF, + ), + color=AI_CONST.WD_BORDER_COLOR, + thickness=AI_CONST.BORDER_THICKNESS) + + cv2.putText( + image, + AI_CONST.BORDER_OD_TEXT, + AI_CONST.TEXT_OD_STARTING_POINT, + AI_CONST.TEXT_THICKNESS, + AI_CONST.TEXT_SIZE, + [0, 0, 0], + thickness=1, + lineType=cv2.LINE_AA, + ) + + def check_keypoint_include(self, ppe_bbox, kpt_list, kpt_index_list, type): + """ + bbox에 keypoint가 포함되어 있는지 확인 + + :param ppe_bbox: ppe xyxy 좌표 + :param kpt_list: keypoint 정보 + :param kpt_index_list: keypoint 인덱스 정보 + :param type: 0: null값을 제외한 keypoint가 최소 하나라도 있으면 인정, 1: null값을 제외한 keypoint가 전부 있어야 인정 + + :return: boolean + """ + include = True + left, right = ppe_bbox[0], ppe_bbox[2] + top, bottom = ppe_bbox[1], ppe_bbox[3] + + if type == 0: + # 최소 하나 + include = False + for kpt_idx in kpt_index_list: + kpt = kpt_list[kpt_idx] + if kpt != None: + if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom): + include = True + break + else: + # 전부 + _null_count = 0 + for kpt_idx in kpt_index_list: + kpt = kpt_list[kpt_idx] + if kpt != None: + if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom): + pass + else: + include = False + break + else: + _null_count += 1 + + if _null_count == len(kpt_index_list): + include = False + + return include + + def hpe_frame_check(self, hpe_message, threshold=int(0x0080), maxcount=AI_CONST.HPE_FRAME_CHECK_MAX_COUNT): + _alert = False + if hpe_message: + for i in hpe_message: + if i['result']['pose_type']>= threshold: + self.hpe_frame_count += 1 + _alert = True + break + + if _alert: + if self.hpe_frame_count == maxcount: + self.hpe_frame_count = 0 + return hpe_message + else: + self.hpe_frame_count = 0 + + for j in hpe_message: + j['result']['pose_type'] = 0 + j['result']['pose_level'] = 0 + + else: + self.hpe_frame_count = 0 + return hpe_message + + def image_calibration(self, image): + """ + 이미지 역광 보정 + """ + if self.clahe: + # 이미지를 LAB 컬러 공간으로 변환 (L 채널에 CLAHE 적용) + + lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) + l, a, b = cv2.split(lab) + + # CLAHE 객체 생성 및 적용 + # clipLimit: 대비 제한 임계값, tileGridSize: 타일의 크기 + clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) + cl = clahe.apply(l) + + # 보정된 L 채널과 기존 a, b 채널을 합쳐 다시 LAB 이미지 생성 + limg = cv2.merge((cl, a, b)) + + # LAB 이미지를 다시 BGR 컬러 공간으로 변환 + corrected_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR) + return corrected_img + + else: + return image + +def label_color(model,table): + label_colored=[] + text_colored=[] + model_key_list=list(model.model.names.values()) + + fixed_color=list(table.values()) + fixed_class=list(table.keys()) + + for cls in model_key_list: + if cls in fixed_class: + label_colored.append(table[cls]) + else: + while True: + _color = [random.randint(0, 255) for _ in range(3)] + + if _color not in fixed_color and _color not in label_colored: + break + label_colored.append(_color) + + if cls in AI_CONST.text_color_white_list: + text_colored.append(AI_CONST.TEXT_COLOR_WHITE) + else: + text_colored.append(AI_CONST.TEXT_COLOR_BLACK) + + return label_colored,text_colored + + + +if __name__ == "__main__": + pass diff --git a/hpe_classification/README.md b/hpe_classification/README.md new file mode 100644 index 0000000..471e20c --- /dev/null +++ b/hpe_classification/README.md @@ -0,0 +1,47 @@ +# UTILITY_YOLO_HPE_CLASSIFICATION +YOLO Pose 좌표 정보를 이용한 자세 추정 +

+### HPEClassification class +HPEClassification(pose_info, cross_ratio_threshold=0.1, cross_angle_threshold=(15.0, 165.0), falldown_tilt_ratio=0.8, falldown_tilt_angle=15, body_tilt_ratio=0.5, yolo_version=8) +- pose_info: pose keypoint 정보 (mandatory) + ```python + pose_info: { + 'person': (cx, cy, w, h, c), + 'keypoints': [ (x, y, c), (x, y, c), ... ] # 1번이 index 0, 2번 index 1, ... 17번 index 16 + } + ``` +- cross_ratio_threshold: 팔 교차시 교차점의 최소 위치 비율. 생략시 0.1 +- cross_angle_threshold: 팔 교차시 교차각 범위 지정. 생략시 (15.0, 165.0) +- falldown_tilt_ratio: 넘어짐/비틀거림 감지를 위한 body rect 비율 (세로/가로). 생략시 0.8 +- falldown_tilt_angle: 상체 기울어짐 판정각. 작업자가 '작업중'일때 넘어짐 판정에 반영됨 (생략시 15도) +- body_tilt_ratio: 상체 기울어짐 판정을 위한 상체 비율 임계치 (가로/세로). 생략시 0.3 +- yolo_version: 사용 YOLO 버전. 생략시 8 + - v8 에서는 잡히지 않은 keypoint => (0,0) + - v7 에서는 잡히지 않은 keypoint => None +

+#### cross arms +- is_cross_arms() -> True/False +- get_cross_point() -> (x, y) +- get_cross_ratio() -> (ratio1, ratio2) +- get_cross_angle() -> 교차각 +- set_cross_ratio_threshold(value) +- set_cross_angle_threshold(angle) +#### fall down +- is_falldown(is_working_on) -> True/False + - is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다. +- set_falldown_tilt_ratio(value) +#### etc +- get_hpe_type(is_working_on) -> 탐지된 type 종류가 모두 포함된 정보. + - is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다. + - HPETypeMask.NORMAL = 0x0000 + - HPETypeMask.FALL_DOWN = 0x0080 + - HPETypeMask.CROSS_ARM = 0x0100 +- get_hpe_type(query, is_working_on) -> True/False +- get_hpe_level(is_working_on) -> 위험도. 현재 0~9까지의 값. + - is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다. +
+### HPETypeMask class +HPEClassification.get_hpe_type() 에서 return 되는 정보를 구분하는 mask 값 +- NORMAL = 0x0000 +- FALL_DOWN = 0x0080 +- CROSS_ARM = 0x0100 \ No newline at end of file diff --git a/hpe_classification/__init__.py b/hpe_classification/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hpe_classification/config.py b/hpe_classification/config.py new file mode 100644 index 0000000..2a685b5 --- /dev/null +++ b/hpe_classification/config.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +""" +@file : config.py +@author: yunikim +@license: A2TEC & DAOOLDNS +@brief: + +@section Modify History +- 2023-11-21 yunikim base + +""" +import logging +import project_config +from config_loader import CFG + +_hpe = CFG.get('hpe', {}) + +DEFAULT_YOLO_VERSION = 8 + +FALLDOWN_TILT_RATIO = _hpe.get('falldown_tilt_ratio', 0.80) +FALLDOWN_TILT_ANGLE = _hpe.get('falldown_tilt_angle', 15) +BODY_TILT_RATIO = _hpe.get('body_tilt_ratio', 0.30) + +CROSS_ARM_RATIO_THRESHOLD = _hpe.get('cross_arm_ratio_threshold', 0.10) +CROSS_ARM_ANGLE_THRESHOLD = tuple(_hpe.get('cross_arm_angle_threshold', [15.0, 165.0])) + +if project_config.ADD_CROSS_ARM: + ARM_ANGLE_THRESHOLD = tuple(_hpe.get('arm_angle_threshold_cross', [80.0, 135.0])) +else: + ARM_ANGLE_THRESHOLD = tuple(_hpe.get('arm_angle_threshold_default', [150.0, 195.0])) +ELBOW_ANGLE_THRESHOLD = tuple(_hpe.get('elbow_angle_threshold', [150.0, 185.0])) + +DEFAULT_LOG_LEVEL = logging.INFO diff --git a/hpe_classification/hpe_classification.py b/hpe_classification/hpe_classification.py new file mode 100644 index 0000000..226e707 --- /dev/null +++ b/hpe_classification/hpe_classification.py @@ -0,0 +1,581 @@ +""" +@file : hpe_classification.py +@author: yunikim +@license: DAOOLDNS +@brief: HPE 자세 추정 알고리즘. + 작업중지동작(cross arm) 및 넘어짐 감지. + 기존의 두 선분의 교차여부, 교차점 좌표 및 해당 좌표의 각 선분 상 위치 비율을 얻는 로직을 통합함. + +@section Modify History +- 2023-06-21 yunikim base + +""" + +import math +import numpy as np +from hpe_classification import config +import logging +import project_config + + +class HPETypeMask: + NORMAL = 0x0000 + FALL_DOWN = 0x0080 + CROSS_ARM = 0x0100 + + +class HPEClassification: + # keypoint는 실제 사용번호와 index가 다르다. + # index 0~16 => keypoint 1~17 + class __KPIndex: + NOSE = 0 # 코 + RIGHT_EYE = 1 # 눈 + LEFT_EYE = 2 + RIGHT_EAR = 3 # 귀 + LEFT_EAR = 4 + RIGHT_SHOULDER = 5 # 어깨 + LEFT_SHOULDER = 6 + RIGHT_ELBOW = 7 # 팔꿈치 + LEFT_ELBOW = 8 + RIGHT_WRIST = 9 # 손목 + LEFT_WRIST = 10 + RIGHT_HIP = 11 # 엉덩이 + LEFT_HIP = 12 + RIGHT_KNEE = 13 # 무릎 + LEFT_KNEE = 14 + RIGHT_FEET = 15 # 발 + LEFT_FEET = 16 + MAX = 17 + + class __HPELevel: + NORMAL = 0 + FALLDOWN = 8 + CROSSARM = 9 + + # parameter: + # pose_info: { + # 'person': (cx, cy, w, h, c), + # 'keypoints': [ (x, y, c), None, (x, y, c), ... ] # 1번이 index 0, 2번 index 1, ... 17번 index 16 + # } + def __init__(self, pose_info: dict, + cross_ratio_threshold: float = config.CROSS_ARM_RATIO_THRESHOLD, + cross_angle_threshold: tuple = config.CROSS_ARM_ANGLE_THRESHOLD, + arm_angle_threshold: tuple = config.ARM_ANGLE_THRESHOLD, + elbow_angle_threshold: tuple = config.ELBOW_ANGLE_THRESHOLD, + falldown_tilt_ratio: float = config.FALLDOWN_TILT_RATIO, + falldown_tilt_angle: float = config.FALLDOWN_TILT_ANGLE, + body_tilt_ratio: float = config.BODY_TILT_RATIO, + yolo_version: int = config.DEFAULT_YOLO_VERSION): + """ + :param pose_info: HPE 좌표 정보 + :param cross_ratio_threshold: 팔 교차시 최소 비율 + :param cross_angle_threshold: 팔 교차시 검출 각도 범위. (min, max) + :param arm_angle_threshold: 팔 각도 범위. (min, max) + :param elbow_angle_threshold: 팔꿈치 각도 범위. (min, max) + :param falldown_tilt_ratio: 넘어짐 감지를 위한 비율 + :param falldown_tilt_angle: 작업중 상체 구부러짐 판정각 + :param body_tilt_ratio: 작업중 상체 구부러짐 판정을 위한 비율 임계치 + :param yolo_version: yolo version + """ + + self.__raw_data = pose_info + self.__bounding_box = pose_info['person'] + self.__keypoints = pose_info['keypoints'] + + # logger 설정 + self.__logger = self.__set_logger(self.__class__.__name__) + + # yolov8 대응 + # v8 에서는 잡히지 않은 point => (0,0) + # v7 에서는 잡히지 않은 point => None + if yolo_version == 8: + self.__convert_zero_point_to_none(self.__keypoints) + + # 작업중지동작 detect 관련 변수 + self.__cross_ratio_threshold = cross_ratio_threshold + self.__cross_angle_threshold = cross_angle_threshold + self.__cross_point = None + self.__cross_ratio = None + self.__cross_angle = None + self.__line1_point1 = self.__keypoints[self.__KPIndex.RIGHT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.RIGHT_ELBOW] else None + self.__line1_point2 = self.__keypoints[self.__KPIndex.RIGHT_WRIST][0:2] if self.__keypoints[self.__KPIndex.RIGHT_WRIST] else None + self.__line2_point1 = self.__keypoints[self.__KPIndex.LEFT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.LEFT_ELBOW] else None + self.__line2_point2 = self.__keypoints[self.__KPIndex.LEFT_WRIST][0:2] if self.__keypoints[self.__KPIndex.LEFT_WRIST] else None + + self.__detect_cross_arms() # 팔 교차 detect + + # 팔각도 detect 관련 변수 + self.__cross_arm_angle_threshold = arm_angle_threshold + self.__cross_left_arm_angle = None + self.__cross_right_arm_angle = None + + self.__cross_elbow_angle_threshold = elbow_angle_threshold + self.__cross_left_elbow_angle = None + self.__cross_right_elbow_angle = None + + self.__left_wrist = self.__keypoints[self.__KPIndex.LEFT_WRIST][0:2] if self.__keypoints[self.__KPIndex.LEFT_WRIST] else None + self.__left_elbow = self.__keypoints[self.__KPIndex.LEFT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.LEFT_ELBOW] else None + self.__left_shoulder = self.__keypoints[self.__KPIndex.LEFT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.LEFT_SHOULDER] else None + + self.__right_wrist = self.__keypoints[self.__KPIndex.RIGHT_WRIST][0:2] if self.__keypoints[self.__KPIndex.RIGHT_WRIST] else None + self.__right_elbow = self.__keypoints[self.__KPIndex.RIGHT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.RIGHT_ELBOW] else None + self.__right_shoulder = self.__keypoints[self.__KPIndex.RIGHT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.RIGHT_SHOULDER] else None + + self.__detect_angle_arms() # 팔 각도 detect + + # 넘어짐 detect 관련 변수 + self.__falldown_tilt_ratio = falldown_tilt_ratio + self.__falldown_tilt_angle = falldown_tilt_angle + self.__body_tilt_ratio = body_tilt_ratio + self.__body_tilt_angle = None + self.__overturn_upper = False + self.__overturn_lower = False + self.__badly_tilt = False + self.__kp_rhip = self.__keypoints[self.__KPIndex.RIGHT_HIP][0:2] if self.__keypoints[self.__KPIndex.RIGHT_HIP] else None + self.__kp_lhip = self.__keypoints[self.__KPIndex.LEFT_HIP][0:2] if self.__keypoints[self.__KPIndex.LEFT_HIP] else None + self.__kp_rsldr = self.__keypoints[self.__KPIndex.RIGHT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.RIGHT_SHOULDER] else None + self.__kp_lsldr = self.__keypoints[self.__KPIndex.LEFT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.LEFT_SHOULDER] else None + self.__kp_rft = self.__keypoints[self.__KPIndex.RIGHT_FEET][0:2] if self.__keypoints[self.__KPIndex.RIGHT_FEET] else None + self.__kp_lft = self.__keypoints[self.__KPIndex.LEFT_FEET][0:2] if self.__keypoints[self.__KPIndex.LEFT_FEET] else None + + self.__detect_falldown() # 넘어짐 detect + + @property + def falldown_ratio(self): + return self.__falldown_tilt_ratio + + @property + def cross_threshold(self): + return self.__cross_ratio_threshold + + @property + def body_tilt_angle(self): + return self.__body_tilt_angle + + def __convert_zero_point_to_none(self, points): + """ + 좌표가 (0,0)인 경우 None으로 대체 + (yolov8 대응) + :param points: 좌표리스트 + """ + for idx, point in enumerate(points): + if point is not None and point[0] == 0 and point[1] == 0: + points[idx] = None + + def get_pose_info(self): + return self.__raw_data + + def get_keypoints(self): + return self.__keypoints + + def get_bounding_box(self): + return self.__bounding_box + + def is_cross_arms(self): + """ + 팔이 cross 상태인지 여부 확인 + """ + if self.__cross_point is not None and self.__cross_ratio is not None and self.__cross_angle is not None: + if self.__cross_ratio[0] > self.__cross_ratio_threshold and self.__cross_ratio[1] > self.__cross_ratio_threshold: + if self.__cross_angle_threshold[0] < self.__cross_angle < self.__cross_angle_threshold[1]: + return True + + return False + + def is_angle_arms(self): + """ + 팔이 꺽여있는지 확인 + """ + if project_config.ADD_CROSS_ARM: + if self.__cross_left_arm_angle is not None and self.__cross_right_arm_angle is not None: + if self.__cross_arm_angle_threshold[0] <= self.__cross_left_arm_angle and self.__cross_arm_angle_threshold[1] >= self.__cross_left_arm_angle and \ + self.__cross_arm_angle_threshold[0] <= self.__cross_right_arm_angle and self.__cross_arm_angle_threshold[1] >= self.__cross_right_arm_angle: + return True + else: + if self.__cross_left_arm_angle is not None and self.__cross_right_arm_angle is not None and self.__cross_left_elbow_angle is not None and self.__cross_left_elbow_angle is not None: + if (self.__cross_arm_angle_threshold[0] <= self.__cross_left_arm_angle and self.__cross_arm_angle_threshold[1] >= self.__cross_left_arm_angle) and \ + (self.__cross_arm_angle_threshold[0] <= self.__cross_right_arm_angle and self.__cross_arm_angle_threshold[1] >= self.__cross_right_arm_angle) and \ + (self.__cross_elbow_angle_threshold[0] <= self.__cross_left_elbow_angle and self.__cross_elbow_angle_threshold[1] >= self.__cross_left_arm_angle) and \ + (self.__cross_elbow_angle_threshold[0] <= self.__cross_right_elbow_angle and self.__cross_elbow_angle_threshold[1] >= self.__cross_right_arm_angle): + return True + return False + + def get_cross_point(self): + if self.__cross_point is None: + self.__logger.debug("no cross info") + return self.__cross_point + + def get_cross_ratio(self): + if self.__cross_ratio is None: + self.__logger.debug("no cross info") + return self.__cross_ratio + + def get_cross_angle(self): + if self.__cross_angle is None: + self.__logger.debug("no cross info") + return self.__cross_angle + + def set_cross_ratio_threshold(self, threshold): + """ + 교차여부를 결정할 때 사용하는 교차점 위치 최소 비율 설정 + :param threshold: 0.0 ~ 0.5 사이의 값 + """ + self.__cross_ratio_threshold = threshold + + def set_cross_angle_threshold(self, angle_range: tuple): + """ + 교차여부를 결정할 때 사용하는 교차각 범위 설정 + :param angle_range: 교차각 범위, ex) (15.0, 165.0) + """ + self.__cross_angle_threshold = angle_range + + def set_falldown_tilt_ratio(self, tilt_ratio): + self.__falldown_tilt_ratio = tilt_ratio + self.__detect_falldown() + + def set_falldown_tilt_angle(self, tilt_angle): + """ + :param tilt_angle: 작업중 상체 구부러짐 판정각 + """ + self.__falldown_tilt_angle = tilt_angle + self.__detect_falldown() + + def is_falldown(self, is_working_on=True): + """ + 넘어짐 상태 여부 확인 + :param is_working_on: 현재 작업중인지 여부. 작업중인 상태에서는 상체 구부러짐 각도를 반영한다. + """ + result = True if self.__overturn_upper or self.__overturn_lower or self.__badly_tilt else False + tilt_angle = False + if self.__body_tilt_angle is not None: + if (self.__body_tilt_angle[0] < self.__falldown_tilt_angle) or (self.__body_tilt_angle[1] < self.__falldown_tilt_angle): + tilt_angle = True + + return result or (is_working_on and tilt_angle) + + def get_hpe_type(self, query=None, is_working_on=True): + """ + 감지된 Pose의 bit masking 획득하거나 특정 pose 여부를 확인한다. + :param query: 확인하고자 하는 Pose Mask. 생략시 Pose bit mask 리턴 + :param is_working_on: 현재 작업중인지 여부. 작업중인 상태에서는 상체 구부러짐 각도를 반영한다. + :return: query 입력시 True/False, 미입력시 Pose bit mask + """ + result = 0x0000 + + if self.is_falldown(is_working_on): + result |= HPETypeMask.FALL_DOWN + if project_config.ADD_CROSS_ARM: + if self.is_cross_arms(): + result |= HPETypeMask.CROSS_ARM + elif self.is_angle_arms(): + result |= HPETypeMask.CROSS_ARM + else: + if self.is_angle_arms(): + result |= HPETypeMask.CROSS_ARM + self.__logger.debug(msg=f"***angle arms -> left angle: {int(self.__cross_left_arm_angle)}, right angle: {int(self.__cross_right_arm_angle)}, left elbow: {int(self.__cross_left_elbow_angle)}, right elbow: {int(self.__cross_right_elbow_angle)}") + + if query is None: + return result + + return (result & query) != 0 + + # 현재 type별 level값을 return 하고 있으나, + # 추후 특정 계산식으로 level을 계산하게 될 예정 + def get_hpe_level(self, is_working_on=True): + """ + :param is_working_on: 현재 작업중인지 여부. 작업중인 상태에서는 상체 구부러짐 각도를 반영한다. + """ + result = 0 + + if self.is_falldown(is_working_on) and result < self.__HPELevel.FALLDOWN: + result = self.__HPELevel.FALLDOWN + + if (self.is_cross_arms() or self.is_angle_arms()) and result < self.__HPELevel.CROSSARM: + result = self.__HPELevel.CROSSARM + + return result + + def __detect_cross_arms(self): + self.__cross_point = self.__cross_ratio = self.__cross_angle = None + + if not self.__line1_point1 or not self.__line1_point2 or not self.__line2_point1 or not self.__line2_point2: + self.__logger.debug("need 4 points for detecting cross arms") + return + + # 손목이 팔꿈치 아래에 있는 경우는 제외 + if self.__line1_point1[1] < self.__line2_point2[1] and self.__line2_point1[1] < self.__line1_point2[1]: + self.__logger.debug("both wrist is below elbow.") + return + + line1_ccw = self.__ccw(self.__line1_point1, self.__line1_point2, self.__line2_point1) * \ + self.__ccw(self.__line1_point1, self.__line1_point2, self.__line2_point2) + line2_ccw = self.__ccw(self.__line2_point1, self.__line2_point2, self.__line1_point1) * \ + self.__ccw(self.__line2_point1, self.__line2_point2, self.__line1_point2) + + if line1_ccw < 0 and line2_ccw < 0: + self.__cross_point = self.__get_cross_point() + self.__cross_ratio = self.__get_cross_ratio() + self.__cross_angle = self.__get_cross_angle() + + # 두 선분의 교차지점 좌표 획득. + # 교차됨이 확인 된 후에 사용할 것. + # 출력: 교차지점 좌표 tuple + def __get_cross_point(self): + """ + 두 직선의 교차점 좌표를 구함 + :return: + """ + x1, y1 = self.__line1_point1[0:2] + x2, y2 = self.__line1_point2[0:2] + x3, y3 = self.__line2_point1[0:2] + x4, y4 = self.__line2_point2[0:2] + + cx = ((x1 * y2 - x2 * y1) * (x3 - x4) - (x1 - x2) * (x3 * y4 - x4 * y3)) / \ + ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)) + cy = ((x1 * y2 - x2 * y1) * (y3 - y4) - (y1 - y2) * (x3 * y4 - x4 * y3)) / \ + ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)) + + return cx, cy + + def __get_cross_angle(self, p1=None, p2=None, cross_point=None): + """ + 교차하는 팔의 각도를 구함 + :param p1: p2: 구하고자 하는 vector 방향의 각 좌표 + :param cross_point: 교차점 + """ + if cross_point is None and self.__cross_point is None: + return None + + p1 = self.__line1_point1[0:2] if p1 is None else p1 # 오른 팔꿈치 + p2 = self.__line2_point2[0:2] if p2 is None else p2 # 왼 손목 + cross_point = self.__cross_point if cross_point is None else cross_point + + return self.three_point_angle(p1, p2, cross_point) + + @staticmethod + def three_point_angle(p1, p2, cross_point): + """ + 세 점 사이의 각도를 구함 + """ + x1, y1 = p1[0:2] + x2, y2 = p2[0:2] + cx, cy = cross_point[0:2] + + radian = np.arctan2((y1-cy), (x1-cx)) - np.arctan2((y2-cy), (x2-cx)) + radian = np.fabs(radian * 180 / np.pi) + if radian > 180: + radian = 360 - radian + + return radian + + def __get_cross_ratio(self): + """ + 선분 위의 점이 해당 선분의 어느정도 위치에 있는지 확인 + (끝점에서 얼마나 떨어져 있는지 선분에 대한 비율) + :return: 작은 비율 기준으로 리턴됨. 0.0 ~ 0.5, 0.5 이면 가운데 + """ + length_line1 = math.dist(self.__line1_point1[0:2], self.__line1_point2[0:2]) + length_point1 = math.dist(self.__line1_point1[0:2], self.__cross_point) + + ratio1 = length_point1 / length_line1 + if ratio1 > 0.5: + ratio1 = math.fabs(1.0 - ratio1) + + length_line2 = math.dist(self.__line2_point1[0:2], self.__line2_point2[0:2]) + length_point2 = math.dist(self.__line2_point1[0:2], self.__cross_point) + + ratio2 = length_point2 / length_line2 + if ratio2 > 0.5: + ratio2 = math.fabs(1.0 - ratio2) + + return ratio1, ratio2 + + # 입력: 3개의 point, 각 (x, y) + # 출력: 0, 1, -1 + @staticmethod + def __ccw(point1, point2, point3): + """ + 3개의 점에 대한 CCW 계산 함수 + :return: -1, 0, 1 + """ + x1, y1 = point1[0:2] + x2, y2 = point2[0:2] + x3, y3 = point3[0:2] + cross_product = ((x2 - x1) * (y3 - y1)) - ((x3 - x1) * (y2 - y1)) + + if cross_product > 0: + return 1 # 반시계방향 (counter clockwise) + elif cross_product < 0: + return -1 # 시계방향 (clockwise) + + return 0 # 평행 (collinear) + + def __detect_angle_arms(self): + self.__cross_left_arm_angle = self.__cross_right_arm_angle = self.__cross_left_elbow_angle = self.__cross_right_elbow_angle = None + + if project_config.ADD_CROSS_ARM: + if self.__left_elbow is None or self.__left_shoulder is None or self.__right_elbow is None or self.__right_shoulder is None: + self.__logger.debug("need 4 points for detecting angle arms") + return + else: + if self.__left_elbow is None or self.__left_shoulder is None or self.__right_elbow is None or self.__right_shoulder is None or \ + self.__left_wrist is None or self.__right_wrist is None: + self.__logger.debug("need 6 points for detecting angle arms") + return + + self.__cross_left_arm_angle, self.__cross_right_arm_angle, self.__cross_left_elbow_angle ,self.__cross_right_elbow_angle = self.__get_angle_arms() + + @staticmethod + def three_point_arm_angle(p1, p2, cross_point): + """ + 세 점 사이(팔)의 각도를 구함 + p1: 몸쪽에서 가장 먼 keypoint + p2: 몸쪽에서 가장 가까운 keypoint + """ + x1, y1 = p1[0:2] + x2, y2 = p2[0:2] + cx, cy = cross_point[0:2] + + radian = np.arctan2((y1-cy), (x1-cx)) - np.arctan2((y2-cy), (x2-cx)) + radian = np.fabs(radian * 180 / np.pi) + if y1 > cy or y1 > y2: + return None + + if radian > 180 : + radian = 360 - radian + + return radian + + def __get_angle_arms(self): + left_angle = self.three_point_arm_angle(p1=self.__left_elbow,cross_point=self.__left_shoulder,p2=self.__right_shoulder) + right_angle = self.three_point_arm_angle(p1=self.__right_elbow,cross_point=self.__right_shoulder,p2=self.__left_shoulder) + left_elbow = None if project_config.ADD_CROSS_ARM else self.three_point_arm_angle(p1=self.__left_wrist,cross_point=self.__left_elbow,p2=self.__left_shoulder) + right_elbow = None if project_config.ADD_CROSS_ARM else self.three_point_arm_angle(p1=self.__right_wrist,cross_point=self.__right_elbow,p2=self.__right_shoulder) + + return left_angle,right_angle,left_elbow,right_elbow + + def __detect_falldown(self): + """ + 인스턴스 생성시 주어진 key points를 바탕으로 falldown 판단 + - shoulder 좌표가 hip 좌표 아래에 있는 경우: __overturn_upper set + - hip 좌표가 feet 좌표 아래에 있는 경우: __overturn_lower set + - shoulder 좌표 및 feet 좌표의 ratio를 계산하여 임계치 이하인 경우: __badly_tilt set + - shoulder 좌표 및 hip 좌표를 통해 body tilt angle 계산 __body_tilt_angle에 저장 + """ + self.__overturn_upper = False + self.__overturn_lower = False + self.__badly_tilt = False + self.__body_tilt_angle = None + + # 1. 상체 뒤집힘 + if self.__kp_lhip and self.__kp_rhip and self.__kp_rsldr and self.__kp_lsldr: + if self.__kp_lhip[1] < self.__kp_lsldr[1] and self.__kp_rhip[1] < self.__kp_rsldr[1]: + self.__overturn_upper = True + else: + self.__logger.debug("need hip and shoulder points for detecting upper body falldown") + + # 2. 하체 뒤집힘 + if self.__kp_rft and self.__kp_lft and self.__kp_lhip and self.__kp_rhip: + if self.__kp_lhip[1] > self.__kp_lft[1] and self.__kp_rhip[1] > self.__kp_rft[1]: + self.__overturn_lower = True + else: + self.__logger.debug("need feet points for detecting lower body falldown") + + # 3. 신체의 기울어짐 (좌우 어깨, 좌우 발) + if self.__kp_lft and self.__kp_rft and self.__kp_rsldr and self.__kp_lsldr: + xmax = np.max([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lft[0], self.__kp_rft[0]]) + ymax = np.max([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lft[1], self.__kp_rft[1]]) + xmin = np.min([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lft[0], self.__kp_rft[0]]) + ymin = np.min([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lft[1], self.__kp_rft[1]]) + body_width = math.fabs(xmax - xmin) + body_height = math.fabs(ymax - ymin) + + if (body_width > 1e-4) and ((body_height / body_width) < self.__falldown_tilt_ratio): + self.__badly_tilt = True + else: + self.__logger.debug("need feet and shoulder points for detecting badly tilt") + + # 4. 상체 기울어짐 (좌우 어깨, 좌우 엉덩이) + if self.__kp_lhip and self.__kp_rhip and self.__kp_rsldr and self.__kp_lsldr: + angle_left = self.three_point_angle(self.__kp_lsldr, self.__kp_rhip, self.__kp_lhip) + angle_right = self.three_point_angle(self.__kp_rsldr, self.__kp_lhip, self.__kp_rhip) + self.__body_tilt_angle = (angle_left, angle_right) + + xmax = np.max([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lhip[0], self.__kp_rhip[0]]) + ymax = np.max([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lhip[1], self.__kp_rhip[1]]) + xmin = np.min([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lhip[0], self.__kp_rhip[0]]) + ymin = np.min([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lhip[1], self.__kp_rhip[1]]) + body_width = math.fabs(xmax - xmin) + body_height = math.fabs(ymax - ymin) + + if (body_height > 1e-4) and ((body_width / body_height) < self.__body_tilt_ratio): + self.__body_tilt_angle = None + else: + self.__logger.debug("need feet and shoulder points for calculating body tilt angle") + + def __set_logger(self, name): + logger = logging.getLogger(name) + logger_handler = logging.StreamHandler() + logger_handler.setFormatter(logging.Formatter("[%(levelname)s][%(asctime)s][%(module)s::%(funcName)s()] %(message)s")) + logger.addHandler(logger_handler) + logger.setLevel(config.DEFAULT_LOG_LEVEL) + + return logger + + def log(self, level, message): + self.__logger.log(level, message) + + +if __name__ == "__main__": + # pose_info: { + # 'person': (cx, cy, w, h, c), + # 'keypoints': [ (x, y, c), None, (x, y, c), ... ] # 1번이 index 0, 2번 index 1, ... 17번 index 16 + # } + pose_info1 = { + 'person': (0, 0, 0, 0, 0), + 'keypoints': [ + (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), + (4.64, 3, 0), (-3.2, 2.85, 0), # 오른쪽, 왼쪽 어깨 + (-2.83, 2.2, 0), (5.45, 3.34, 0), # 오른쪽, 왼쪽 팔꿈치 + (6.5, 1.6, 0), (0.98, 1.77, 0), # 오른쪽, 왼쪽 손목 + (3.4, 3.9, 0), (-8.6, 3.2, 0), # 오른쪽, 왼쪽 엉덩이 + (0, 0, 0), (0, 0, 0), + (0, 20, 0), (0, 20, 0) # 오른쪽, 왼쪽 발 + ] + } + test1 = HPEClassification(pose_info1, cross_ratio_threshold=0.1) + print(f'is_cross_arms({test1.cross_threshold}):', test1.is_cross_arms()) + print('angle:', test1.get_cross_angle()) + test1.set_cross_ratio_threshold(0.05) + print(f'is_cross_arms({test1.cross_threshold}):', test1.is_cross_arms()) + print(test1.get_cross_point()) + print(test1.get_cross_ratio()) + print('angle:', test1.get_cross_angle()) + print('is_falldown:', test1.is_falldown(True)) + print(test1.get_hpe_type(HPETypeMask.CROSS_ARM)) + print(test1.get_hpe_type(HPETypeMask.FALL_DOWN)) + print(hex(test1.get_hpe_type())) + print('hpe level:', test1.get_hpe_level()) + print('--------------------------') + print('body tilt angle:', test1.body_tilt_angle) + + print('--------------------------') + + pose_info2 = { + 'person': (0, 0, 0, 0, 0), + 'keypoints': [ + (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), + (5, 20, 0), (3, 17, 0), # 오른쪽, 왼쪽 어깨 + (-2.83, 2.2, 0), (0.98, 1.77, 0), # 오른쪽, 왼쪽 팔꿈치 + (6.5, 1.6, 0), (5.45, 3.34, 0), # 오른쪽, 왼쪽 손목 + (0, 30, 0), (0, 30, 0), # 오른쪽, 왼쪽 엉덩이 + (0, 0, 0), (0, 0, 0), + (37, 31, 0), (40, 42, 0) # 오른쪽, 왼쪽 발 + ] + } + test2 = HPEClassification(pose_info2, falldown_tilt_ratio=0.5) + print('is_cross_arms:', test2.is_cross_arms()) + print(f'is_falldown({test2.falldown_ratio}):', test2.is_falldown()) + test2.set_falldown_tilt_ratio(0.7) + print(f'is_falldown({test2.falldown_ratio}):', test2.is_falldown()) + print('hpe level:', test2.get_hpe_level()) diff --git a/hpe_classification/requirements.txt b/hpe_classification/requirements.txt new file mode 100644 index 0000000..d773a2d --- /dev/null +++ b/hpe_classification/requirements.txt @@ -0,0 +1,2 @@ +numpy +opencv-python diff --git a/load_models.py b/load_models.py new file mode 100644 index 0000000..a9ffe03 --- /dev/null +++ b/load_models.py @@ -0,0 +1,108 @@ +import numpy as np +from ultralytics import YOLO + +import demo_const as AI_CONST +import project_config + +# from ai_engine.custom_logger.custom_log import log + +class AIObjectModel: + """ + Yolov8 OD model Load + """ + + MODEL_NAME = "OD" + WEIGHTS = AI_CONST.WEIGHTS_YOLO + + def __init__(self) -> None: + self.model = "" + self.set_model(weights=self.WEIGHTS) + + def set_model(self, weights): + self.model = YOLO(weights) + + # Check device status + import torch + device_status = "CUDA (GPU)" if torch.cuda.is_available() else "CPU" + print(f"[{self.MODEL_NAME}] Model loaded. Using: {device_status}") + + # warm up models (입력 영상 해상도가 다를경우 수정해야함) + img = np.zeros([1080,1920,3],dtype=np.uint8) # black image FHD 1920x1080 + # img = np.zeros([360,640,3],dtype=np.uint8) # black image HD 640×360 + self.model.predict(source=img, verbose=False) + + # log.info(f"YOLOV8 {self.MODEL_NAME} MODEL LOADED") + + def get_model(self): + return self.model + + def get_class_info(self): + return self.model.names + +class AIHelmetObjectModel(AIObjectModel): + """ + Yolov8 OD model Load + """ + + MODEL_NAME = "OD-Helmet" + WEIGHTS = AI_CONST.WEIGHTS_YOLO_HELMET + + +class AIHPEModel(AIObjectModel): + """ + Yolov8 HPE model Load + """ + + MODEL_NAME = "HPE" + WEIGHTS = AI_CONST.WEIGHTS_POSE + + +class AIModelManager: + def __init__(self) -> None: + self.od_model_info = "" + self.hpe_model_info = "" + + self.helmet_model_info = "" + + def set_od(self): # set od + if not self.od_model_info: + self.od_model_info = AIObjectModel() + + def set_hpe(self): # set hpe + if not self.hpe_model_info: + self.hpe_model_info = AIHPEModel() + + def set_helmet(self): # set od + if not self.helmet_model_info: + self.helmet_model_info = AIHelmetObjectModel() + + def get_od(self): + if not self.od_model_info: + raise Exception("YOLO model not loaded") + + return self.od_model_info + + def get_hpe(self): + if not self.hpe_model_info: + raise Exception("HPE model not loaded") + + return self.hpe_model_info + + def get_helmet(self): + if not self.helmet_model_info: + raise Exception("helmet model not loaded") + + return self.helmet_model_info + + +model_manager = "" + +if not model_manager: + model_manager = AIModelManager() + model_manager.set_od() + model_manager.set_hpe() + if project_config.USE_HELMET_MODEL: + model_manager.set_helmet() + +if __name__ == "__main__": + pass \ No newline at end of file diff --git a/mqtt.py b/mqtt.py new file mode 100644 index 0000000..6402f6d --- /dev/null +++ b/mqtt.py @@ -0,0 +1,124 @@ +import threading +import os, sys +import paho.mqtt.client as mqtt + +""" +todo + +log 설정 별도 필요 +상수값 설정 필요 +""" + +MQTT_USER_ID = "a2d2admin" +MQTT_USER_PW = "a2d24992!" +MQTT_HOST = "localhost" +MQTT_PORT = 52002 + +class MQTTManager: + """ + MQTT manager + subscribe 사용시 set_onmessage 함수 이용하여 on_message세팅 + """ + + def __init__(self, + name="MQTT", + id=MQTT_USER_ID, + pw=MQTT_USER_PW, + host=MQTT_HOST, + port=MQTT_PORT) -> None: + self.name = name + self.id = id + self.pw = pw + self.host = host + self.port = port + + self.status = False + self.status_subscribe = False + + self.client = mqtt.Client() + + self.client.username_pw_set(self.id, self.pw) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.on_subscribe = self.on_subscribe + self.client.on_unsubscribe = self.on_unsubscribe + self.client.on_message = self.on_message + + self.mqtt_lock = threading.Lock() + + def __del__(self): + self.end() + + def set_onmessage(self, function): + self.client.on_message = function + + def get_status(self): + return self.status + + def mqtt_connect(self): + try: + if self.status: + pass #log.warning('Already connected to mqtt') + else: + self.client.connect(self.host, self.port) + except Exception as e: + pass #log.error((f"{self.name} connect fail")) + raise ConnectionError(f"{self.name} connect fail") + + def on_connect(self, client, userdata, flags, rc): + """ + mqtt 연결 확인 함수 + 연결이 정상적으로 되었다면 rc = 0 + """ + if rc == 0: + self.mqtt_lock.acquire() + self.status = True + pass #log.info(f"{self.name} connected OK") + self.mqtt_lock.release() + else: + pass #log.warning("Bad connection Returned code=", rc) + + def on_disconnect(self, client, userdata, flags, rc=0): + pass #log.info(f"{self.name} disconnected") + self.mqtt_lock.acquire() + self.status = False + self.mqtt_lock.release() + + def on_subscribe(self, client, userdata, mid, granted_qos): + self.status_subscribe = True + pass #log.info("subscribed: " + str(mid) + " " + str(granted_qos)) + + def on_unsubscribe(self, client, userdata, mid): + self.status_subscribe = False + pass #log.info("unsubscribed: " + str(mid)) + + def on_message(self, client, userdata, msg): + """ + 구독중인 상태에서 메세지가 들어왔을때 처리 함수 + set_onmessage 에서 set 하여 사용 + """ + pass + + def start_pub(self): + self.client.loop_start() + + def start_sub(self, topic): + self.client.subscribe(topic) + self.client.loop_start() + + def end_sub(self, topic): + self.client.unsubscribe(topic) + # self.client.loop_stop() + + def end(self): + self.client.loop_stop() + self.client.disconnect() + pass #log.info(f"{self.name} Ending Connection") + + +mqtt_publisher = None + +if mqtt_publisher is None: + mqtt_publisher = MQTTManager() + mqtt_publisher.mqtt_connect() + mqtt_publisher.start_pub() \ No newline at end of file diff --git a/parsing_msg.py b/parsing_msg.py new file mode 100644 index 0000000..cf807f3 --- /dev/null +++ b/parsing_msg.py @@ -0,0 +1,147 @@ +import cv2,json +from base64 import b64encode + +from schema import * + + +class ParsingMsg(): + + IMAGE_FORMAT = 'jpg' + + def __init__(self): + self.msg = None + self.img = None + self.parsed_msg = None + + def set(self, msg:list, img): + self.msg = None + self.img = None + + self.msg = msg + self.img = img + self.parsed_msg = None + + def get(self): + result = self.parsed_msg + return result + + def image_encoding(self, img_data): + """ + 이미지데이터(np.ndarray) 를 바이너리 데이터로 변환 + :param img_data: 이미지 데이터 + :return: base64 format + """ + _, JPEG = cv2.imencode(".jpg", img_data, [int(cv2.IMWRITE_JPEG_QUALITY), 80]) + # Base64 encode + b64 = b64encode(JPEG) + + return b64.decode("utf-8") + + def image_cropping(self, img_data, bbox): + """ + 이미지데이터(np.ndarray) 를 bbox 기준으로 크롭 + :param img_data: 이미지 데이터 + :param bbox: [x1, y1, x2, y2] + :return: cropped image data + """ + x1, y1, x2, y2 = bbox + cropped_img = img_data[y1:y2, x1:x2] + return cropped_img + + def parse_header(self): + """ + *camera_id, ward_id, frame_id 미구현 + """ + msg = Header( + camera_id= "CAMERA_001", + ward_id= "WARD_001", + timestamp= datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + frame_id= 0 + ) + return msg + + def parse_summary(self, activate_cnt, total_cnt): + """ + *system_health 미구현 + """ + msg = Summary( + total_objects_count= total_cnt, + active_alerts_count= activate_cnt, + system_health= SyestemHealth.ok + ) + return msg + + def parse_object(self): + """ + *bbox,skeleton 제외 전부 미구현 + """ + msg = [] + activate_cnt = 0 + total_cnt = 0 + + if self.msg: + + for i in self.msg: + + has_img = True if i.get('person') and self.img is not None else False + b64_img = self.image_encoding(self.image_cropping(self.img, i.get('person'))) if has_img else '' + if i['result']['pose_type'] > 0: + activate_cnt += 1 + total_cnt += 1 + + sub_msg = DetectedObject( + tracking_id= i['result']['object_id'], + status= Status.stable if i['result']['pose_type'] == 0 else Status.fall_detected, + status_detail= None if i['result']['pose_type'] == 0 else StatusDetail.sudden_fall, + severity= None if i['result']['pose_type'] == 0 else Severity.critical, + location_zone= 'ZONE_001', + duration= 0.0, + bbox= i.get('person'), + skeleton= [[round(x, 0) for x in kpt] + [round(conf,2)] for kpt, conf in zip(i.get('keypoints'), i.get('kpt_conf'))], + metrics=Metrics( + velocity=0.0, + angle=0.0 + ), + visual_data=ObjectVisualData( + format=self.IMAGE_FORMAT, + has_image= has_img, + base64_str= b64_img + ) + ) + msg.append(sub_msg) + return msg, activate_cnt, total_cnt + + def parse_virtual(self): + has_img = True if self.img is not None else False + b64_img = self.image_encoding(self.img) if has_img else '' + + msg= VisualData( + format=self.IMAGE_FORMAT, + has_image= has_img, + base64_str= b64_img + ) + + return msg + + def parse(self): + """ + * 위험note 여부는 수정 필요 + """ + object_msg, activate_cnt, total_cnt = self.parse_object() + + if activate_cnt == 0: + return None + + header_msg = self.parse_header() + summary_msg = self.parse_summary(activate_cnt, total_cnt) + visual_msg = self.parse_virtual() + + parsing_msg = FallDetectionSchema( + header= header_msg, + summary= summary_msg, + objects= object_msg, + visual_data= visual_msg + ) + + return parsing_msg.model_dump_json() + \ No newline at end of file diff --git a/predict.py b/predict.py new file mode 100644 index 0000000..07c1251 --- /dev/null +++ b/predict.py @@ -0,0 +1,147 @@ +import cv2 +import demo_const as AI_CONST +from pydantic.main import BaseModel + +# from DL.custom_utils import crop_image,image_encoding +from hpe_classification.hpe_classification import HPEClassification +from hpe_classification import config as hpe_config + + +class HistoryInfo(BaseModel): + """ + history 에 저장될 정보 + """ + class_name: str + class_id: int + object_id: int = None + bbox: list + bbox_conf: float + keypoint: list = None + kpt_conf: list = None + +class ObjectDetect: + """ + ObjectDetect + """ + MODEL_CONFIDENCE = AI_CONST.MODEL_CONFIDENCE + + def __init__(self) -> None: + self.image = '' + self.model = '' + self.class_info = '' + + def predict(self,crop_image = True, class_name=True): + od_predict = self.model.model.predict(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE) + message_list = [] + for object_list in od_predict: + for box in object_list.boxes: + _parsing_data = self.parsing(bbox=box) + _name = _parsing_data.class_name if class_name else False + message_list.append(self._od_mqtt_message(_parsing_data, image= self.image, crop=crop_image, name=_name)) + return message_list + + def set_image(self,image): + self.image = image + + def set_model(self,model): + self.model = model + self.class_info = self.model.get_class_info() + + def _od_mqtt_message(self,data,image,crop,name): + message ={} + + # if crop: + # crop_img = image_encoding(crop_image(xyxy=data.bbox,img=image)) + # else: + crop_img = None + + if name: + message = { + "class_id" : data.class_id, + "class_name" : name, + "confidence" : data.bbox_conf, + "bbox" : data.bbox, + "object_id": data.object_id, + "parent_object_id": None, + "image": crop_img + } + + else: + message = { + "class_id" : data.class_id, + "confidence" : data.bbox_conf, + "bbox" : data.bbox, + "object_id": data.object_id, + "parent_object_id": None, + "image": crop_img + } + + return message + + def parsing(self, bbox, kpt=None): + """ + 데이터 파싱 + kpt(pose) 정보가 없을시 keypoint 관련정보 None으로 처리 + object id가 없을시 None으로 처리 + + :param bbox: object detect 정보 + :param kpt: pose detect 정보 + :return: HistoryInfo + """ + + _cls_id = int(bbox.cls[0].item()) + _history_info = HistoryInfo( + class_id = _cls_id, + class_name = self.class_info[_cls_id], + bbox = list(map(round, bbox.xyxy[0].detach().cpu().tolist())), + bbox_conf = round(bbox.conf[0].item(), 2), + ) + if bbox.id is not None: + _history_info.object_id = int(bbox.id[0].item()) + + if kpt: + _history_info.keypoint = kpt.xy[0].detach().cpu().tolist() + _history_info.kpt_conf = kpt.conf[0].detach().cpu().tolist() + + return _history_info + +class PoseDetect(ObjectDetect): + """ + PoseDetect + """ + MODEL_CONFIDENCE = 0.5 + + FALLDOWN_TILT_RATIO = hpe_config.FALLDOWN_TILT_RATIO + FALLDOWN_TILT_ANGLE = hpe_config.FALLDOWN_TILT_ANGLE + CROSS_RATIO_THRESHOLD = hpe_config.CROSS_ARM_RATIO_THRESHOLD + CROSS_ANGLE_THRESHOLD = hpe_config.CROSS_ARM_ANGLE_THRESHOLD + + def predict(self,working,crop_image=True): + + # pose_predict = self.model.model.predict(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE) + #NOTE(jwkim): pose predict + pose_predict = self.model.model.track(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE, persist=True) + + message_list = [] + for object_list in pose_predict: + for box, pose in zip(object_list.boxes, object_list.keypoints): + _parsing_data = self.parsing(bbox=box, kpt=pose) + current_pose={} + current_pose["person"]= _parsing_data.bbox + current_pose["keypoints"]= _parsing_data.keypoint + current_pose["kpt_conf"] = _parsing_data.kpt_conf + + # HPEClassification + hpe_classification = HPEClassification(pose_info=current_pose) + + current_pose["result"]=self._od_mqtt_message(_parsing_data, self.image, crop=crop_image, name=_parsing_data.class_name) + current_pose["result"]['pose_type'] = hpe_classification.get_hpe_type(is_working_on=working) + current_pose["result"]['pose_level'] = hpe_classification.get_hpe_level(is_working_on=working) + + message_list.append(current_pose) + return message_list + + +if __name__ == "__main__": + pass + diff --git a/project_config.py b/project_config.py new file mode 100644 index 0000000..34d6941 --- /dev/null +++ b/project_config.py @@ -0,0 +1,17 @@ +# === config.yaml에서 설정값 로드 === +from config_loader import CFG + +PT_TYPE = CFG.get('pt_type', 'dev') + +# === use model === +USE_HPE_PERSON = CFG.get('use_hpe_person', True) +USE_HELMET_MODEL = CFG.get('use_helmet_model', True) + +# === label === +VIEW_CONF_SCORE = CFG.get('view_conf_score', False) +SHOW_GLOVES = CFG.get('show_gloves', True) +LABEL_ALL_WHITE = CFG.get('label_all_white', True) +USE_HPE_FRAME_CHECK = CFG.get('use_hpe_frame_check', False) + +# === debug === +ADD_CROSS_ARM = CFG.get('add_cross_arm', False) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d04d8aa --- /dev/null +++ b/requirements.txt @@ -0,0 +1,18 @@ +# Core AI Framework +ultralytics + +# Image Processing & GUI +opencv-python + +# Configuration & Data Validation +PyYAML +pydantic + +# System Utilities +pyautogui + +# Math & Array (usually installed with ultralytics, but good to have) +numpy + +#MQTT Client +paho-mqtt \ No newline at end of file diff --git a/schema.py b/schema.py new file mode 100644 index 0000000..5f93336 --- /dev/null +++ b/schema.py @@ -0,0 +1,86 @@ +from pydantic.main import BaseModel +from enum import Enum +from pydantic import BaseModel +from typing import List, Optional +from datetime import datetime + +#========================================= +class Header(BaseModel): + camera_id: str + ward_id: str + timestamp: datetime + frame_id: int +#========================================= + + +#========================================= +class SyestemHealth(str,Enum): + ok = "OK" + overload = "OVERLOAD" + critical = "CRITICAL" + + +class Summary(BaseModel): + active_alerts_count: int + total_objects_count: int + system_health: SyestemHealth +#========================================= + + +#========================================= +class Status(str,Enum): + stable = "STABLE" + fall_detected = "FALL_DETECTED" + + +class StatusDetail(str,Enum): + sudden_fall = "SUDDEN_FALL" + slip_down = "SLIP_DOWN" + + +class Severity(str,Enum): + low = "LOW" + medium = "MEDIUM" + critical = "CRITICAL" + + +class Metrics(BaseModel): + velocity: float + angle: float + + +class ObjectVisualData(BaseModel): + format: str + has_image: bool + base64_str: str + + +class DetectedObject(BaseModel): + tracking_id: int|None + status: Status + status_detail: StatusDetail|None + severity: Severity|None + location_zone: str + duration: float + bbox: List[int] # [x1, y1, x2, y2] + skeleton: Optional[List[List[float]]] + metrics: Metrics + visual_data: ObjectVisualData +#========================================= + + +#========================================= +class VisualData(BaseModel): + format: str + has_image: bool + base64_str: str +#========================================= + + +#========================================= +class FallDetectionSchema(BaseModel): + header: Header + summary: Summary + objects: List[DetectedObject] + visual_data: VisualData +#========================================= diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..78afc0c --- /dev/null +++ b/utils.py @@ -0,0 +1,89 @@ +import numpy as np +import time +import cv2 +import demo_const as AI_CONST +import copy + +from load_models import model_manager +from ultralytics.data.loaders import LoadStreams,LOGGER + +def class_info(): + od_model = model_manager.get_od() # pt파일에 따라 값이 달라짐 + + class_dict = copy.deepcopy(od_model.model.names) + class_dict[1000] = 'signalman' + return class_dict + + +CLASS_INFORMATION = {} +if not CLASS_INFORMATION: + CLASS_INFORMATION = class_info() + CLASS_SWAP_INFO = {v:k for k,v in CLASS_INFORMATION.items()} + +class LoadStreamsDaool(LoadStreams): + + def update(self, i, cap, stream): + """Read stream `i` frames in daemon thread.""" + n, f = 0, self.frames[i] # frame number, frame array + while self.running and cap.isOpened() and n < (f - 1): + if len(self.imgs[i]) < AI_CONST.LOADSTREAMS_IMG_BUFFER: # keep a <=30-image buffer + n += 1 + cap.grab() # .read() = .grab() followed by .retrieve() + if n % self.vid_stride == 0: + success, im = cap.retrieve() + if not success: + im = np.zeros(self.shape[i], dtype=np.uint8) + LOGGER.warning("WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.") + cap.open(stream) # re-open stream if signal was lost + if self.buffer: + self.imgs[i].append(im) + else: + self.imgs[i] = [im] + else: + time.sleep(0.01) # wait until the buffer is empty + +class CustomVideoCapture: + def __init__(self) -> None: + self.fps = 30 + # self.fourcc = cv2.VideoWriter_fourcc(*'X264') + self.fourcc = cv2.VideoWriter_fourcc(*'XVID') + self.size = None + self.video_writer = None + + def set_frame_size(self, image): + if isinstance(image, np.ndarray): + height,width,ch = image.shape + self.size = (int(width), int (height)) + + def set_video_writer(self, path): + if self.size == None: + raise + + self.video_writer = cv2.VideoWriter(path, self.fourcc, self.fps, self.size) + + def write_video(self,frame): + if self.video_writer: + self.video_writer.write(frame) + + def release_video(self): + if self.video_writer: + self.video_writer.release() + self.video_writer = None + + +def get_monitorsize(): + width = 0 + height = 0 + try: + import pyautogui #pip install pyautogui + width = pyautogui.size()[0] # getting the width of the screen + height = pyautogui.size()[1] # getting the height of the screen + + except Exception: + (width,height) = AI_CONST.FHD_RESOLUTION + + finally: + return (width,height) + +def img_resize(image,size): + return cv2.resize(image, dsize=size, interpolation=cv2.INTER_AREA) \ No newline at end of file diff --git a/weights/yolov11m_dev1_8.pt b/weights/yolov11m_dev1_8.pt new file mode 100644 index 0000000..a5c7b65 Binary files /dev/null and b/weights/yolov11m_dev1_8.pt differ diff --git a/weights/yolov8_dev1_66.pt b/weights/yolov8_dev1_66.pt new file mode 100644 index 0000000..521df2b Binary files /dev/null and b/weights/yolov8_dev1_66.pt differ diff --git a/weights/yolov8_dev1_89.pt b/weights/yolov8_dev1_89.pt new file mode 100644 index 0000000..c966cc5 Binary files /dev/null and b/weights/yolov8_dev1_89.pt differ diff --git a/weights/yolov8_dev1_97.pt b/weights/yolov8_dev1_97.pt new file mode 100644 index 0000000..49aaed9 Binary files /dev/null and b/weights/yolov8_dev1_97.pt differ diff --git a/weights/yolov8l-pose.pt b/weights/yolov8l-pose.pt new file mode 100644 index 0000000..66c73e7 Binary files /dev/null and b/weights/yolov8l-pose.pt differ