version: v0.0.1

This commit is contained in:
2026-02-25 15:05:58 +09:00
parent 1f8e1e3c04
commit 0b446eed54
26 changed files with 3129 additions and 0 deletions

4
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"python-envs.defaultEnvManager": "ms-python.python:conda",
"python-envs.defaultPackageManager": "ms-python.python:conda"
}

337
cctv.py Normal file
View File

@@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
"""
@file : cctv.py
@author: hsj100
@license: A2TEC
@brief: multi cctv
@section Modify History
- 2025-11-24 오후 1:38 hsj100 base
"""
import cv2
import numpy as np
import threading
import time
import os
from datetime import datetime
from demo_detect import DemoDetect
from utils import get_monitorsize
from config_loader import CFG
from parsing_msg import ParsingMsg
from mqtt import mqtt_publisher
# ==========================================================================================
# [Configuration Section]
# ==========================================================================================
# Pull the CCTV section from the global config; fall back to sane defaults.
_cctv_cfg = CFG.get('cctv', {})

# 1. Camera switch interval in seconds.
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)

# 2. Camera source list; the hard-coded defaults kick in when config has none.
SOURCES = _cctv_cfg.get('sources', []) or [
    {"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
    {"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
]
# ==========================================================================================
class CameraStream:
    """Manage a single camera stream.

    A background thread continuously grabs frames and keeps the most recent
    decoded image in ``latest_frame``. While ``is_active`` is False the
    stream only decodes every 30th frame to save CPU; consumers poll frames
    through :meth:`get_frame`.
    """

    def __init__(self, src, name="Camera"):
        self.src = src
        self.name = name
        # Force RTSP over TCP (avoids 461 Unsupported Transport / packet loss).
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
        self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
        # Minimise OpenCV buffering to reduce latency (backend may ignore this).
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.ret = False
        self.latest_frame = None
        self.running = False
        self.is_active = False              # [optimisation] currently shown on screen?
        # BUGFIX: always define the thread attribute so stop() cannot raise
        # AttributeError for a camera whose initial connection failed.
        self.thread = None
        self.lock = threading.Lock()        # guards frame read/write
        self.last_read_time = time.time()   # time of the last received frame
        self.frame_skip_count = 0
        # Connection check; the reader thread starts only on success.
        if not self.cap.isOpened():
            print(f"[{self.name}] 접속 실패: {self.src}")
        else:
            print(f"[{self.name}] 접속 성공")
            self.running = True
            self.thread = threading.Thread(target=self.update, args=())
            self.thread.daemon = True  # killed together with the main program
            self.thread.start()

    def update(self):
        """Background loop reading frames (grab + retrieve split)."""
        while self.running:
            # 1. Pull compressed data; grab always runs so the buffer never piles up.
            if self.cap.grab():
                # [optimisation] decode (retrieve) every frame only while active;
                # an inactive stream decodes once every 30 frames to save CPU.
                self.frame_skip_count += 1
                if self.is_active or (self.frame_skip_count % 30 == 0):
                    # 2. Decode into an actual image (expensive).
                    ret, frame = self.cap.retrieve()
                    if ret:
                        with self.lock:
                            self.ret = ret
                            self.latest_frame = frame
                            self.last_read_time = time.time()  # refresh receive time
                else:
                    # Tiny sleep to shave a little CPU while decoding is skipped.
                    time.sleep(0.001)
            else:
                # Stream dropped or ended: reconnect after a short pause.
                print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
                self.cap.release()
                time.sleep(2.0)
                # Reconnect attempt (keep forcing TCP).
                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
                self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                if self.cap.isOpened():
                    print(f"[{self.name}] 재접속 성공")
                else:
                    print(f"[{self.name}] 재접속 실패")

    def get_frame(self):
        """Return ``(ret, frame)`` for the newest frame.

        ``ret`` is False (frame is None) when nothing has been received for
        more than 3 seconds, which the caller treats as "no signal".
        """
        with self.lock:
            if time.time() - self.last_read_time > 3.0:
                return False, None
            return self.ret, self.latest_frame

    def stop(self):
        """Stop the reader thread (if it was started) and release the capture."""
        self.running = False
        # BUGFIX: the thread exists only when the initial connect succeeded;
        # the bounded join avoids hanging on a grab() that never returns.
        if self.thread is not None and self.thread.is_alive():
            self.thread.join(timeout=2.0)
        self.cap.release()
def main():
    """Run the CCTV rotation loop.

    Rotates through SOURCES every SWITCH_INTERVAL seconds, runs pose (HPE)
    inference on the active camera, publishes parsed results over MQTT and,
    when ``display.turn_on`` is set in config, renders a fullscreen HUD.
    Blocks until Q/ESC (display mode) or Ctrl+C (headless mode).
    """
    MQTT_TOPIC = "/hospital/ai1"
    parser = ParsingMsg()
    # Monitor size, used only for the display-time resize.
    MONITOR_RESOLUTION = get_monitorsize()
    # Object-detection model initialisation.
    detector = DemoDetect()
    print("모델 로딩 완료!")
    # Create camera objects; each starts receiving frames in the background.
    cameras = []
    for i, s in enumerate(SOURCES):
        cam = CameraStream(s["src"], s["name"])
        # Only the first camera starts in the active (full-decode) state.
        if i == 0:
            cam.is_active = True
        cameras.append(cam)
    # BUGFIX: use defaults so a missing 'display' section in config.yaml can
    # no longer raise AttributeError (CFG.get('display') could return None).
    DISPLAY_TURN_ON = CFG.get('display', {}).get('turn_on', False)
    current_cam_index = 0               # camera currently shown/active
    last_switch_time = time.time()      # time of the last rotation
    switch_interval = SWITCH_INTERVAL   # auto-rotate period (seconds)
    # FPS measurement state.
    prev_frame_time = time.time()
    fps = 0
    # FPS statistics.
    fps_min = float('inf')
    fps_max = 0
    fps_sum = 0
    fps_count = 0
    warmup_frames = 30  # skip the unstable initial frames
    print("\n=== CCTV 시스템 시작 ===")
    print(f"{switch_interval}초마다 자동으로 카메라가 전환됩니다.")
    if DISPLAY_TURN_ON:
        print("[Q] 또는 [ESC] : 종료\n")
    else:
        print("화면 출력이 꺼져 있습니다. 시스템은 백그라운드에서 계속 실행됩니다.")
        print("종료하려면 터미널에서 Ctrl+C를 누르세요.\n")
    if DISPLAY_TURN_ON:
        # Frameless fullscreen window.
        window_name = "CCTV Control Center"
        cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
        cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    while True:
        # 1. Automatic camera rotation.
        current_time = time.time()
        if current_time - last_switch_time >= switch_interval:
            # Old camera enters the resource-saving (skip-decode) mode ...
            cameras[current_cam_index].is_active = False
            current_cam_index = (current_cam_index + 1) % len(cameras)
            # ... and the new one starts full decoding.
            cameras[current_cam_index].is_active = True
            last_switch_time = current_time
            print(f"-> 자동 전환: {cameras[current_cam_index].name}")
        # 2. Newest frame from the selected camera.
        target_cam = cameras[current_cam_index]
        ret, frame = target_cam.get_frame()
        if ret and frame is not None:
            # FPS calculation.
            current_frame_time = time.time()
            time_diff = current_frame_time - prev_frame_time
            fps = 1 / time_diff if time_diff > 0 else 0
            prev_frame_time = current_frame_time
            # FPS statistics (after warm-up). The average is derived from
            # fps_sum / fps_count at display time, so nothing else is kept.
            if fps > 0:
                fps_count += 1
                if fps_count > warmup_frames:
                    fps_sum += fps
                    fps_min = min(fps_min, fps)
                    fps_max = max(fps_max, fps)
            # Inference at the original resolution (no extra resize).
            hpe_message = detector.inference_hpe(frame, False)
            # NOTE(jwkim): the OD/helmet models are intentionally disabled here.
            od_message = []
            parser.set(msg=hpe_message, img=frame)
            parsing_msg = parser.parse()
            # MQTT publish of the parsed detection result.
            if parsing_msg is not None:
                mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1)
            # HUD drawing (original coordinates) only when display is enabled.
            if DISPLAY_TURN_ON:
                detector.hpe_labeling(frame, hpe_message)
                detector.od_labeling(frame, od_message)
                detector.border_labeling(frame, hpe_message, od_message)
                # Resize to the monitor resolution (after labeling).
                frame = cv2.resize(frame, MONITOR_RESOLUTION)
                # Overlay camera name and timestamp.
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                # Camera name (outline + body).
                cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
                cv2.putText(frame, cam_text, (20, 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA)  # black outline (thick)
                cv2.putText(frame, cam_text, (20, 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)  # green body
                # Timestamp (outline + body).
                cv2.putText(frame, timestamp, (20, 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA)  # black outline
                cv2.putText(frame, timestamp, (20, 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)  # green body
                # Instantaneous FPS (outline + body).
                fps_text = f"FPS: {fps:.1f}"
                cv2.putText(frame, fps_text, (20, 120),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA)  # black outline
                cv2.putText(frame, fps_text, (20, 120),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)  # green body
                # FPS statistics (AVG, MIN, MAX).
                stat_val_min = fps_min if fps_min != float('inf') else 0
                stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
                stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
                cv2.putText(frame, stats_text, (20, 160),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
                cv2.putText(frame, stats_text, (20, 160),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA)  # yellow
                cv2.imshow(window_name, frame)
        elif DISPLAY_TURN_ON:
            # Standby screen while there is no signal.
            # MONITOR_RESOLUTION is (width, height) -> numpy shape (height, width, 3).
            blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
            # "NO SIGNAL" text settings.
            text = f"NO SIGNAL ({target_cam.name})"
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1.5
            thickness = 3
            # Center the text on screen.
            text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
            text_x = (blank_screen.shape[1] - text_size[0]) // 2
            text_y = (blank_screen.shape[0] + text_size[1]) // 2
            # Draw the text in red.
            cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
            cv2.imshow(window_name, blank_screen)
        # 3. Key handling (quit only). waitKey also services the GUI event loop.
        # BUGFIX: previously the keyboard was polled twice per iteration.
        if DISPLAY_TURN_ON:
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q') or key == 27:
                break
    # Shutdown.
    print("시스템 종료 중...")
    for cam in cameras:
        cam.stop()
    cv2.destroyAllWindows()
# Script entry point.
if __name__ == "__main__":
    main()

280
cctv_origin.py Normal file
View File

@@ -0,0 +1,280 @@
# -*- coding: utf-8 -*-
"""
@file : cctv.py
@author: hsj100
@license: A2TEC
@brief: multi cctv
@section Modify History
- 2025-11-24 오후 1:38 hsj100 base
"""
import cv2
import numpy as np
import threading
import time
import os
from datetime import datetime
from demo_detect import DemoDetect
from utils import get_monitorsize
from config_loader import CFG
# ==========================================================================================
# [Configuration Section]
# ==========================================================================================
# Pull the CCTV section from the global config; fall back to sane defaults.
_cctv_cfg = CFG.get('cctv', {})

# 1. Camera switch interval in seconds.
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)

# 2. Camera source list; the hard-coded defaults kick in when config has none.
SOURCES = _cctv_cfg.get('sources', []) or [
    {"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
    {"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
]
# ==========================================================================================
class CameraStream:
    """Manage a single camera stream.

    A dedicated thread keeps reading frames and stores the newest decoded
    image in ``latest_frame``; consumers poll it via :meth:`get_frame`.
    """

    def __init__(self, src, name="Camera"):
        self.src = src
        self.name = name
        # Force RTSP over TCP (avoids 461 Unsupported Transport / packet loss).
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
        self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
        # Minimise OpenCV buffering to reduce latency (backend may ignore this).
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.ret = False
        self.latest_frame = None
        self.running = False
        # BUGFIX: always define the thread attribute so stop() cannot raise
        # AttributeError for a camera whose initial connection failed.
        self.thread = None
        self.lock = threading.Lock()        # guards frame read/write
        self.last_read_time = time.time()   # time of the last received frame
        # Connection check; the reader thread starts only on success.
        if not self.cap.isOpened():
            print(f"[{self.name}] 접속 실패: {self.src}")
        else:
            print(f"[{self.name}] 접속 성공")
            self.running = True
            self.thread = threading.Thread(target=self.update, args=())
            self.thread.daemon = True  # killed together with the main program
            self.thread.start()

    def update(self):
        """Background loop reading frames (grab + retrieve split)."""
        while self.running:
            # 1. Pull compressed data from the buffer (cheap and fast).
            if self.cap.grab():
                # 2. Decode into an actual image (expensive).
                ret, frame = self.cap.retrieve()
                if ret:
                    with self.lock:
                        self.ret = ret
                        self.latest_frame = frame
                        self.last_read_time = time.time()  # refresh receive time
                else:
                    # grab succeeded but decoding failed (rare).
                    continue
            else:
                # Stream dropped or ended: reconnect after a short pause.
                print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
                self.cap.release()
                time.sleep(2.0)
                # Reconnect attempt (keep forcing TCP).
                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
                self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
                self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                if self.cap.isOpened():
                    print(f"[{self.name}] 재접속 성공")
                else:
                    print(f"[{self.name}] 재접속 실패")

    def get_frame(self):
        """Return ``(ret, frame)`` for the newest frame.

        ``ret`` is False (frame is None) when nothing has been received for
        more than 3 seconds, which the caller treats as "no signal".
        """
        with self.lock:
            if time.time() - self.last_read_time > 3.0:
                return False, None
            return self.ret, self.latest_frame

    def stop(self):
        """Stop the reader thread (if it was started) and release the capture."""
        self.running = False
        # BUGFIX: the thread exists only when the initial connect succeeded;
        # the bounded join avoids hanging on a grab() that never returns.
        if self.thread is not None and self.thread.is_alive():
            self.thread.join(timeout=2.0)
        self.cap.release()
def main():
    """Legacy CCTV control loop (superseded by cctv.py).

    Rotates through the cameras on a timer, runs full OD/helmet/pose
    inference on the shown stream and renders a fullscreen HUD. Blocks
    until the user presses Q or ESC.
    """
    # Monitor size, used only for the display-time resize.
    MONITOR_RESOLUTION = get_monitorsize()
    # 2. Object-detection model initialisation.
    detector = DemoDetect()
    print("모델 로딩 완료!")
    # 3. Create camera objects; each starts receiving frames in the background.
    cameras = []
    for s in SOURCES:
        cam = CameraStream(s["src"], s["name"])
        cameras.append(cam)
    current_cam_index = 0               # camera currently shown
    last_switch_time = time.time()      # time of the last rotation
    switch_interval = SWITCH_INTERVAL   # auto-rotate period (seconds)
    # FPS measurement state.
    prev_frame_time = time.time()
    fps = 0
    # FPS statistics.
    fps_min = float('inf')
    fps_max = 0
    fps_sum = 0
    fps_count = 0
    warmup_frames = 30  # skip the unstable initial frames
    print("\n=== CCTV 관제 시스템 시작 ===")
    print("5초마다 자동으로 카메라가 전환됩니다.")
    print("[Q] 또는 [ESC]를 눌러 종료합니다.\n")
    # Frameless fullscreen window.
    window_name = "CCTV Control Center"
    cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
    cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    while True:
        # Auto-rotation: switch to the next camera every switch_interval seconds.
        current_time = time.time()
        if current_time - last_switch_time >= switch_interval:
            current_cam_index = (current_cam_index + 1) % len(cameras)
            last_switch_time = current_time
            print(f"-> 자동 전환: {cameras[current_cam_index].name}")
        # 3. Fetch the newest frame of the selected camera.
        target_cam = cameras[current_cam_index]
        ret, frame = target_cam.get_frame()
        if ret and frame is not None:
            # FPS calculation.
            current_frame_time = time.time()
            time_diff = current_frame_time - prev_frame_time
            fps = 1 / time_diff if time_diff > 0 else 0
            prev_frame_time = current_frame_time
            # Update FPS statistics (after warm-up).
            if fps > 0:
                fps_count += 1
                if fps_count > warmup_frames:
                    fps_sum += fps
                    real_count = fps_count - warmup_frames
                    fps_avg = fps_sum / real_count
                    fps_min = min(fps_min, fps)
                    fps_max = max(fps_max, fps)
            # Run inference at the original resolution (no extra resize).
            # Human pose estimation.
            hpe_message = detector.inference_hpe(frame, False)
            # Helmet detection.
            helmet_message = detector.inference_helmet(frame, False)
            # Object detection.
            od_message_raw = detector.inference_od(frame, False, class_name_view=True)
            # Filtering.
            od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False)
            # Draw detection results (original coordinates).
            detector.hpe_labeling(frame, hpe_message)
            detector.od_labeling(frame, od_message)
            detector.border_labeling(frame, hpe_message, od_message)
            # Resize to the monitor resolution for display (after labeling).
            frame = cv2.resize(frame, MONITOR_RESOLUTION)
            # Overlay camera name and timestamp.
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # Camera name (outline + body).
            cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
            cv2.putText(frame, cam_text, (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA)  # black outline (thick)
            cv2.putText(frame, cam_text, (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)  # green body
            # Timestamp (outline + body).
            cv2.putText(frame, timestamp, (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA)  # black outline
            cv2.putText(frame, timestamp, (20, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)  # green body
            # Instantaneous FPS (outline + body).
            fps_text = f"FPS: {fps:.1f}"
            cv2.putText(frame, fps_text, (20, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA)  # black outline
            cv2.putText(frame, fps_text, (20, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)  # green body
            # FPS statistics (AVG, MIN, MAX).
            stat_val_min = fps_min if fps_min != float('inf') else 0
            stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
            stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
            cv2.putText(frame, stats_text, (20, 160),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
            cv2.putText(frame, stats_text, (20, 160),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA)  # yellow
            cv2.imshow(window_name, frame)
        else:
            # Standby screen while there is no signal.
            # MONITOR_RESOLUTION is (width, height) -> numpy shape (height, width, 3).
            blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
            # "NO SIGNAL" text settings.
            text = f"NO SIGNAL ({target_cam.name})"
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1.5
            thickness = 3
            # Center the text on screen.
            text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
            text_x = (blank_screen.shape[1] - text_size[0]) // 2
            text_y = (blank_screen.shape[0] + text_size[1]) // 2
            # Draw the text in red.
            cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
            cv2.imshow(window_name, blank_screen)
        # 4. Key handling (quit only).
        key = cv2.waitKey(1) & 0xFF
        # Quit.
        if key == ord('q') or key == 27:
            break
    # Shutdown.
    print("시스템 종료 중...")
    for cam in cameras:
        cam.stop()
    cv2.destroyAllWindows()
# Script entry point.
if __name__ == "__main__":
    main()

63
color_table.py Normal file
View File

@@ -0,0 +1,63 @@
import project_config
# BGR color constants used when LABEL_ALL_WHITE forces a uniform scheme.
TEXT_COLOR_WHITE = [255, 255, 255]
TEXT_COLOR_BLACK = [0, 0, 0]


class ColorInfo:
    """Pairs a bounding-box label color with its text (font) color."""

    def __init__(self, label_color, font_color):
        self.label_color = label_color
        self.font_color = font_color
def _make_color(label_color, font_color):
    """Build a ColorInfo for one class.

    When project_config.LABEL_ALL_WHITE is set, every entry collapses to a
    white label with black text; otherwise the given BGR pair is used.
    Replaces the per-entry ternary that was repeated on every line.
    """
    if project_config.LABEL_ALL_WHITE:
        return ColorInfo(label_color=TEXT_COLOR_WHITE, font_color=TEXT_COLOR_BLACK)
    return ColorInfo(label_color=label_color, font_color=font_color)


# Per-class colors ([B, G, R] label, [B, G, R] font) for the "dev" class set.
dev_color_table = {
    "person": _make_color([183, 183, 183], [0, 0, 0]),
    "truck": _make_color([4, 63, 120], [255, 255, 255]),
    "safety_helmet_off": _make_color([193, 193, 255], [0, 0, 0]),
    "safety_helmet_on": _make_color([0, 0, 255], [0, 0, 0]),
    "safety_gloves_off": _make_color([184, 255, 205], [0, 0, 0]),
    "safety_gloves_work_on": _make_color([70, 251, 124], [0, 0, 0]),
    "safety_gloves_insulated_on_1": _make_color([59, 216, 106], [255, 255, 255]),
    "safety_gloves_insulated_on_2": _make_color([45, 169, 82], [255, 255, 255]),
    "safety_gloves_insulated_on_3": _make_color([29, 114, 55], [255, 255, 255]),
    "safety_gloves_insulated_on_4": _make_color([14, 54, 26], [255, 255, 255]),
    "safety_rubber_insulated_sleeve_off": _make_color([233, 210, 217], [0, 0, 0]),
    "safety_rubber_insulated_sleeve_on": _make_color([195, 124, 142], [255, 255, 255]),
    "safety_boots_off": _make_color([204, 242, 255], [0, 0, 0]),
    "safety_boots_on": _make_color([50, 194, 241], [0, 0, 0]),
    "safety_belt_off": _make_color([209, 165, 253], [0, 0, 0]),
    "safety_belt_basic_on_1": _make_color([158, 68, 248], [255, 255, 255]),
    "safety_belt_basic_on_2": _make_color([130, 54, 206], [255, 255, 255]),
    "safety_belt_xband_on": _make_color([102, 40, 164], [255, 255, 255]),
    "safety_belt_swing_on": _make_color([71, 27, 116], [255, 255, 255]),
    "safety_vest_off": _make_color([219, 205, 143], [0, 0, 0]),
    "safety_vest_on": _make_color([142, 129, 69], [255, 255, 255]),
    "safety_suit_top_off": _make_color([116, 186, 255], [0, 0, 0]),
    "safety_suit_top_on": _make_color([0, 128, 255], [0, 0, 0]),
    "safety_suit_bottom_off": _make_color([255, 255, 208], [0, 0, 0]),
    "safety_suit_bottom_on": _make_color([255, 255, 0], [0, 0, 0]),
    "equipment_smartstick": _make_color([116, 134, 217], [0, 0, 0]),
    "equipment_con_switch": _make_color([12, 32, 133], [255, 255, 255]),
    "sign_board_information": _make_color([0, 255, 255], [0, 0, 0]),
    "sign_board_construction": _make_color([0, 209, 209], [0, 0, 0]),
    "sign_board_traffic": _make_color([0, 114, 114], [255, 255, 255]),
    "sign_traffic_cone": _make_color([255, 81, 255], [0, 0, 0]),
    "truck_lift_bucket": _make_color([175, 57, 175], [255, 255, 255]),
    "wheel_chock": _make_color([86, 9, 86], [255, 255, 255]),
    "signal_light_stick": _make_color([212, 59, 8], [255, 255, 255]),
}

# Per-class colors for the reduced "demo" class set.
demo_color_table = {
    "person": _make_color([183, 183, 183], [0, 0, 0]),
    "safety_helmet_off": _make_color([193, 193, 255], [0, 0, 0]),
    "safety_helmet_on": _make_color([0, 0, 255], [0, 0, 0]),
    "safety_gloves_off": _make_color([184, 255, 205], [0, 0, 0]),
    "safety_gloves_insulated_on_1": _make_color([59, 216, 106], [255, 255, 255]),
    "safety_gloves_insulated_on_2": _make_color([45, 169, 82], [255, 255, 255]),
    "safety_boots_off": _make_color([204, 242, 255], [0, 0, 0]),
    "safety_boots_on": _make_color([50, 194, 241], [0, 0, 0]),
    "safety_belt_off": _make_color([209, 165, 253], [0, 0, 0]),
    "safety_belt_swing_on": _make_color([71, 27, 116], [255, 255, 255]),
    "sign_board_information": _make_color([0, 255, 255], [0, 0, 0]),
    "sign_traffic_cone": _make_color([255, 81, 255], [0, 0, 0]),
    "signal_light_stick": _make_color([212, 59, 8], [255, 255, 255]),
}

87
config.yaml Normal file
View File

@@ -0,0 +1,87 @@
# ===================================================
# KEPCO DEMO 설정 파일
# 이 파일을 수정하여 프로그램 동작을 변경할 수 있습니다.
# ===================================================
# --- 모델 설정 ---
pt_type: "dev" # "dev" 또는 "demo"
use_hpe_person: true # Human Pose Estimation 사용 여부
use_helmet_model: true # 헬멧 전용 모델 사용 여부
# --- 라벨 설정 ---
view_conf_score: false # confidence score 표시 여부
show_gloves: true # glove on/off 라벨링 여부
label_all_white: true # bounding box 색상 전부 흰색 여부
use_hpe_frame_check: false # n개 프레임 체크 후 HPE 위험 판단
# --- 디버그 ---
add_cross_arm: false # cross arm 디버그 모드
# --- 소스 설정 ---
# RTSP 카메라 프리셋 (source에서 프리셋 이름으로 사용 가능)
cameras:
MG: "rtsp://admin:admin1263!@10.20.10.99:28554/onvif/media?profile=Profile1"
LF: "rtsp://admin:!1q2w3e4r@10.20.10.100:50554/profile1/media.smp"
FC: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.101/axis-media/media.amp"
MG_74_LTE: "rtsp://admin:admin1263!@223.171.48.74:28554/trackID=1"
FERMAT: "rtsp://192.168.200.233:50399/wd"
source: "FC" # 카메라 프리셋 이름 또는 파일/RTSP 경로
save_path: "./250630_result"
# --- 모델 가중치 파일 ---
weights:
pose: "yolov8l-pose.pt"
helmet: "yolov8_dev1_97.pt"
# pt_type별 OD 모델
od_dev: "yolov11m_dev1_8.pt"
od_demo: "yolov8_dev1_66.pt"
# --- 모델 파라미터 ---
model_confidence: 0.5
model_image_size: 640
kpt_min_confidence: 0.5
# --- HPE 임계값 ---
hpe:
falldown_tilt_ratio: 0.80
falldown_tilt_angle: 15
body_tilt_ratio: 0.30
cross_arm_ratio_threshold: 0.10
cross_arm_angle_threshold: [15.0, 165.0]
arm_angle_threshold_cross: [80.0, 135.0] # add_cross_arm: true 일 때
arm_angle_threshold_default: [150.0, 195.0] # add_cross_arm: false 일 때
elbow_angle_threshold: [150.0, 185.0]
hpe_frame_check_max_count: 3
# --- 디스플레이 설정 ---
display:
turn_on: false # 화면 표시 여부
border_thickness: 40
border_thickness_half: 20
normal_thickness: 2
warning_thickness: 4
hpe_thickness_ratio: 1
text_size: 1
text_thickness: 0
ppe_union_min_percent: 0.9
loadstreams_img_buffer: 5
fhd_resolution: [1920, 1080]
# --- CCTV 관제 설정 ---
cctv:
switch_interval: 5.0
sources:
# - {src: 0, name: "Realtek_Webcam_0"}
# - {src: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.101/axis-media/media.amp", name: "ipcam_1"}
# - {src: "rtsp://daool:Ekdnfeldpsdptm1@192.168.200.102/axis-media/media.amp", name: "ipcam_2"}
- {src: "rtsp://192.168.200.231:50199/wd", name: "dev1_stream"}
- {src: "rtsp://192.168.200.232:50299/wd", name: "dev2_stream"}
# - {src: "rtsp://192.168.200.236:8554/videodevice", name: "boss_webcam"}
# - {src: "rtsp://192.168.200.236:8554/1.mp4", name: "boss_stream1"}
# - {src: "rtsp://192.168.200.214/1.mp4", name: "sgm_stream1"}
# - {src: "rtsp://192.168.200.214/2.mp4", name: "sgm_stream2"}
# - {src: "rtsp://192.168.200.111/1.mp4", name: "hp_stream1"}
# - {src: "rtsp://192.168.200.111/2.mp4", name: "hp_stream2"}
# - {src: "rtsp://192.168.200.215/1.mp4", name: "msk_stream1"}
# - {src: "rtsp://192.168.200.215/2.mp4", name: "msk_stream2"}

35
config_loader.py Normal file
View File

@@ -0,0 +1,35 @@
import os
import sys
import yaml
def _get_base_dir():
"""exe 실행 시 exe 옆, 개발 시 스크립트 옆에서 config.yaml을 찾는다."""
if getattr(sys, 'frozen', False):
# PyInstaller exe
return os.path.dirname(sys.executable)
else:
return os.path.dirname(os.path.abspath(__file__))
def load_config():
    """Locate, read and parse config.yaml; raise FileNotFoundError if absent."""
    config_path = os.path.join(_get_base_dir(), 'config.yaml')
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"config.yaml을 찾을 수 없습니다: {config_path}")
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
def get_weights_dir():
    """Path of the weights folder: unpacked from the PyInstaller bundle when
    frozen, otherwise next to this script."""
    if getattr(sys, 'frozen', False):
        # Bundled via --add-data; extracted to _MEIPASS at runtime.
        return os.path.join(sys._MEIPASS, 'weights')
    here = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(here, 'weights')
# Module-level singleton: the parsed configuration, loaded once at import time.
CFG = load_config()

127
demo_const.py Normal file
View File

@@ -0,0 +1,127 @@
import project_config
from config_loader import CFG, get_weights_dir
from color_table import dev_color_table, demo_color_table

# === Camera presets (named stream URLs from config.yaml) ===
_cameras = CFG.get('cameras', {})
MG = _cameras.get('MG', '')
LF = _cameras.get('LF', '')
FC = _cameras.get('FC', '')
MG_74_LTE = _cameras.get('MG_74_LTE', '')
FERMAT = _cameras.get('FERMAT', '')

# === Source selection ===
_source_raw = CFG.get('source', '')
SOURCE = _cameras.get(_source_raw, _source_raw)  # preset name -> URL, anything else used as-is
SAVE_PATH = CFG.get('save_path', './250630_result')

# === Model weight paths ===
WEIGHTS_PATH = get_weights_dir()
_weights = CFG.get('weights', {})
WEIGHTS_POSE = WEIGHTS_PATH + "/" + _weights.get('pose', 'yolov8l-pose.pt')
WEIGHTS_YOLO_HELMET = WEIGHTS_PATH + "/" + _weights.get('helmet', 'yolov8_dev1_97.pt')

# Keypoint indices used to associate PPE boxes with body parts (COCO order).
HELMET_KPT = [0, 1, 2]
RUBBER_INSULATED_SLEEVE_KPT = [7, 8]
SUIT_TOP_KPT = [6, 5]
SUIT_BOTTOM_KPT = [13, 14]
HELMET_CLASS_NAME = ['safety_helmet_off', 'safety_helmet_on']

# Class-id layout for the 'dev' weights
if project_config.PT_TYPE == 'dev':
    color_table = dev_color_table
    WEIGHTS_YOLO = WEIGHTS_PATH + "/" + _weights.get('od_dev', 'yolov11m_dev1_8.pt')
    UNVISIBLE_CLS = [4, 5, 6, 7, 8, 9]
    OFF_CLASS_LIST = [2, 4, 12, 14]
    OFF_TRIGGER_CLASS_LIST = [2, 4]  # (helmet, gloves)
    PPE_CLASS_LIST = list(range(2, 19))
    HELMET_CID = [2, 3]  # OFF, ON
    RUBBER_INSULATED_SLEEVE_CID = [10, 11]  # OFF, ON
    SUIT_TOP_CID = []  # OFF, ON
    SUIT_BOTTOM_CID = []  # OFF, ON
    HELMET_ON_CID = 3
    GLOVES_WORK_ON_CID = 6
    BOOTS_ON_CID = 13
    TRAFFIC_CONE_CID = 21
    LIGHT_STICK_CLS_ID = 23
# Class-id layout for the 'demo' weights
elif project_config.PT_TYPE == 'demo':
    color_table = demo_color_table
    WEIGHTS_YOLO = WEIGHTS_PATH + "/" + _weights.get('od_demo', 'yolov8_dev1_66.pt')
    UNVISIBLE_CLS = [3, 4, 5]
    OFF_CLASS_LIST = [1, 3, 6, 8]
    OFF_TRIGGER_CLASS_LIST = [1, 3]  # (helmet, gloves)
    PPE_CLASS_LIST = list(range(1, 9))
    HELMET_CID = [1, 2]  # OFF, ON
    RUBBER_INSULATED_SLEEVE_CID = []
    SUIT_TOP_CID = []  # OFF, ON
    SUIT_BOTTOM_CID = [6, 7]  # OFF, ON
    HELMET_ON_CID = 2
    GLOVES_WORK_ON_CID = 4
    BOOTS_ON_CID = 7
    TRAFFIC_CONE_CID = 11

# === Model / display parameters ===
_display = CFG.get('display', {})
HPE_FRAME_CHECK_MAX_COUNT = CFG.get('hpe', {}).get('hpe_frame_check_max_count', 3)
PPE_UNION_MIN_PERCENT = _display.get('ppe_union_min_percent', 0.9)
MODEL_CONFIDENCE = CFG.get('model_confidence', 0.5)
MODEL_IMAGE_SIZE = CFG.get('model_image_size', 640)
WD_BORDER_COLOR = [0, 255, 255]
WD_THICKNESS_ON = False
BORDER_OD_TEXT = "Danger(OD)"
BORDER_HPE_TEXT = "Danger(HPE)"
KPT_MIN_CONFIDENCE = CFG.get('kpt_min_confidence', 0.5)
# loadstreams
LOADSTREAMS_IMG_BUFFER = _display.get('loadstreams_img_buffer', 5)
_fhd = _display.get('fhd_resolution', [1920, 1080])
FHD_RESOLUTION = tuple(_fhd)
BORDER_THICKNESS = _display.get('border_thickness', 40)
BORDER_THICKNESS_HALF = _display.get('border_thickness_half', 20)
NORMAL_THICKNESS = _display.get('normal_thickness', 2)
WARNING_THICKNESS = _display.get('warning_thickness', 4)
HPE_THICKNESS_RAITO = _display.get('hpe_thickness_ratio', 1)
TEXT_SIZE = _display.get('text_size', 1)
TEXT_THICKNESS = _display.get('text_thickness', 0)
TEXT_OD_STARTING_POINT = (20, 30)
TEXT_HPE_STARTING_POINT = (20, 30)
# Pose / human label colors (BGR)
POSE_NORMAL_COLOR = [0, 0, 0]
POSE_CROSS_COLOR = [0, 0, 255]
POSE_FALL_COLOR = [0, 0, 255]
POSE_ANGLE_ARM_COLOR = [0, 51, 102]
TEXT_COLOR_WHITE = [255, 255, 255]
TEXT_COLOR_BLACK = [0, 0, 0]
# Classes whose labels render white box / black text regardless of the color table.
text_color_white_list = [
    "truck",
    "safety_gloves_insulated_on_1",
    "safety_gloves_insulated_on_2",
    "safety_rubber_insulated_sleeve_on",
    "safety_belt_swing_on",
    "safety_vest_on",
    "safety_suit_top_on",
    "sign_board_traffic"
]

797
demo_detect.py Normal file
View File

@@ -0,0 +1,797 @@
import os
import cv2
import random
import numpy as np
import imghdr
import project_config
import demo_const as AI_CONST
from predict import ObjectDetect, PoseDetect
from load_models import model_manager
from ultralytics.data.loaders import LOGGER
from ultralytics.data.loaders import LoadImagesAndVideos
from utils import LoadStreamsDaool,CustomVideoCapture,CLASS_INFORMATION,CLASS_SWAP_INFO,get_monitorsize,img_resize
# Silence the ultralytics loader logging; this module renders its own output.
LOGGER.setLevel("ERROR")
# Physical monitor resolution, queried once for fullscreen display scaling.
MONITOR_RESOLUTION = get_monitorsize()
class DemoDetect:
    """Demo pipeline: runs OD / helmet / HPE models on a source and renders results."""

    # Pose-type bit flags; values mirror HPETypeMask (NORMAL / FALL_DOWN / CROSS_ARM).
    POSETYPE_NORMAL = int(0x0000)
    POSETYPE_FALL = int(0x0080)
    POSETYPE_CROSS = int(0x0100)

    def __init__(self):
        self.image_data = None
        self.crop_img = False
        # Object-detection model
        self.model = model_manager.get_od()
        self.object_detect = ObjectDetect()
        self.object_detect.set_model(self.model)
        # Dedicated helmet model (optional)
        if project_config.USE_HELMET_MODEL:
            self.helmet_model = model_manager.get_helmet()
            self.helmet_detect = ObjectDetect()
            self.helmet_detect.set_model(self.helmet_model)
        # Human pose estimation model
        self.pose_model = model_manager.get_hpe()
        self.pose_predict = PoseDetect()
        self.pose_predict.set_model(self.pose_model)
        self.hpe_frame_count = 0
        self.color_table = AI_CONST.color_table
        self.source = AI_CONST.SOURCE
        self.save = False
        # BUGFIX: default CLAHE off. image_calibration() reads self.clahe, which
        # previously only existed after set() was called -> AttributeError otherwise.
        self.clahe = False
        self.ext = None
        self.video_capture = None
def set(self, args):
self.source = args.source
self.save = args.save
self.clahe = args.clahe
    def run(self):
        """Main loop: read frames from a file or stream, run the models, then save or display."""
        # Local files use the image/video loader; anything else is treated as a stream.
        if os.path.exists(self.source):
            dataset = LoadImagesAndVideos(path=self.source)
        else:
            if self.save:
                # Streams cannot be saved.
                raise Exception("스트림영상은 저장 불가")
            dataset = LoadStreamsDaool(sources=self.source)
        if self.save:
            if imghdr.what(self.source):
                # still image
                self.ext = ".jpg"
            else:
                # video
                self.ext = ".mp4"
            os.makedirs(AI_CONST.SAVE_PATH, exist_ok=True)
        for _, image, *_ in dataset:
            image = image[0]
            #image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) ## kept: some sources arrive rotated 90 degrees
            image = self.image_calibration(image)
            # Lazily create the writer on the first frame (frame size is needed first).
            if self.ext == ".mp4" and not self.video_capture:
                self.video_capture = CustomVideoCapture()
                self.video_capture.set_frame_size(image=image)
                # _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}")
                _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{'.avi'}")
                if self.clahe:
                    _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect_clahe{'.avi'}")
                self.video_capture.set_video_writer(path=_video_path)
                print(_video_path)
            # HPE person detect
            hpe_message = self.inference_hpe(image, self.crop_img) if project_config.USE_HPE_PERSON else []
            if project_config.USE_HPE_FRAME_CHECK:
                hpe_message = self.hpe_frame_check(hpe_message)
            # helmet detect
            _helmet_message = self.inference_helmet(image, self.crop_img) if project_config.USE_HELMET_MODEL else []
            # object detect
            od_message_raw = self.inference_od(image, self.crop_img, class_name_view=True)
            # od_message_raw = []
            # od filtering
            od_message = self.od_filtering(image, od_message_raw, _helmet_message, hpe_message, add_siginal_man=False)
            self.hpe_labeling(image, hpe_message)
            # od_message = [] # NOTE(jwkim) od labelling suppressed; uncomment to blank it again
            self.od_labeling(image, od_message)
            self.border_labeling(image, hpe_message, od_message)
            if self.save:
                if imghdr.what(self.source):
                    # still image
                    _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}")
                    cv2.imwrite(_video_path, image)
                else:
                    # video
                    self.video_capture.write_video(image)
            else:
                cv2.namedWindow("image", cv2.WND_PROP_FULLSCREEN)
                cv2.setWindowProperty("image", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
                if MONITOR_RESOLUTION != AI_CONST.FHD_RESOLUTION:
                    image = img_resize(image, MONITOR_RESOLUTION)
                cv2.imshow("image", image)
                # Hit 'q' on the keyboard to quit!
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
def inference_hpe(self, image, crop_image):
"""
inference 완료 후 off class 필터링
:param frame_count:현재 프레임 수
:param image: 이미지 데이터
:param crop_image: crop_image 유무
:return: 탐지 결과
"""
# 초기화
message = []
self.pose_predict.set_image(image)
message = self.pose_predict.predict(working=True,crop_image=crop_image)
return message
    def inference_helmet(self, image, crop_image):
        """Run the dedicated helmet model and normalise its class ids.

        Raw classes 'head'/'helmet' are remapped onto the project-wide
        safety_helmet_off / safety_helmet_on ids via CLASS_SWAP_INFO.

        :param image: BGR frame
        :param crop_image: whether the predictor should crop detections
        :return: remapped detection list
        """
        message = []
        self.helmet_detect.set_image(image)
        raw_message = self.helmet_detect.predict(crop_image=crop_image, class_name=True)
        for i in raw_message:
            # OFF: bare head (original #ON/#OFF comments were swapped).
            if i["class_name"] == 'head' or i["class_name"] == 'safety_helmet_off':
                i["class_id"] = CLASS_SWAP_INFO['safety_helmet_off']
                i["class_name"] = CLASS_INFORMATION[i["class_id"]]
                message.append(i)
            # ON: helmet worn.
            elif i["class_name"] == 'helmet' or i["class_name"] == 'safety_helmet_on':
                i["class_id"] = CLASS_SWAP_INFO['safety_helmet_on']
                i["class_name"] = CLASS_INFORMATION[i["class_id"]]
                message.append(i)
            else:
                continue
        return message
def inference_od(self, image, crop_image, class_name_view=False):
"""
inference 완료후 탐지된 리스트 반환
:param image: 이미지 데이터
:param crop_image: crop_image 유무
:return: 탐지 결과
"""
# 초기화
message = []
self.object_detect.set_image(image)
message = self.object_detect.predict(crop_image=crop_image, class_name=class_name_view)
return message
def od_filtering(self,image,od_raw_message,helmet_message,hpe_message,add_siginal_man=True):
"""
여러 모델 inference 결과를 조합하여 od 결과 추출
:param image: 원본 이미지
:param od_raw_message: od inference 결과
:param helmet_message: helmet inference 결과
:param hpe_message: hpe inference 결과
:param add_siginal_man: 신호수 추가 유무
:return: 필터링된 od inference 결과
"""
ppe_filtered_message = []
#od helmet 제거 , helmet 추가
helmet_changed_message = self.update_helmet(helmet_message, od_raw_message)
#od person 제거, hpe_person 추가
person_changed_message = self.update_person(hpe_message, helmet_changed_message)
# 필터링 작업 (현재 box 겹침만 사용)
ppe_filtered_message = self.od_ppe_class_filter(person_changed_message,hpe_message)
# signalman 추가
if add_siginal_man:
signal_man = self.signal_man_message(image, ppe_filtered_message)
if signal_man:
ppe_filtered_message.append(signal_man)
return ppe_filtered_message
def update_helmet(self, helmet_message, od_message):
"""
helmet message 파싱 후 , od message 에서 helmet 제거
그후 합친 결과 return
:param helmet_message: helmet detect result
:param od_message: od detect result
:return: result
"""
_result = []
if not helmet_message:
return od_message
elif project_config.USE_HELMET_MODEL is False:
return od_message
else:
#parsing helmet msg
_result = _result + helmet_message
#remove od helmet
for _od in od_message:
if _od['class_id'] not in AI_CONST.HELMET_CID:
_result.append(_od)
return _result
def update_person(self, hpe_message, od_message):
"""
hpe message 파싱 후 , od message 에서 person 제거
그후 합친 결과 return
:param hpe_message: _description_
:param od_message: _description_
:return: _description_
"""
_result = []
if not hpe_message:
return od_message
elif project_config.USE_HPE_PERSON is False:
return od_message
else:
#parsing hpe msg
for _hpe in hpe_message:
_person_info = {k: v for k, v in _hpe['result'].items() if k not in ('pose_type', 'pose_level')}
_result.append(_person_info)
#remove od person
for _od in od_message:
if _od['class_id'] != 0 :
_result.append(_od)
return _result
def od_ppe_class_filter(self, od_message, kpt_message):
"""
PPE_CLASS_LIST 로 정의된 클래스가 사용할수 있는지 없는지 판단하여 결과물 return
- PPE_CLASS_LIST 중 helmet, rubber_insulated_sleeve, suit_top, suit_bottom class 경우 bbox가 keypoint에 포함되면 _result에 추가,
포함되지 않는다면 person class와 겹치는지 확인후 겹치면 _result 에 추가
- PPE_CLASS_LIST 나머지 class 는 person class와 겹치면 _result 에 추가
- PPE_CLASS_LIST 가 아닐경우 _result에 추가
:param od_message: od list
:param kpt_message: keypoint list
:return: list
"""
_result = []
_person = []
_ppe_cls = []
# split classes
for i in od_message:
if i['class_id'] == 0:
_person.append(i)
_result.append(i)
elif i['class_id'] in AI_CONST.PPE_CLASS_LIST:
_ppe_cls.append(i)
else:
_result.append(i)
# filtering ppe classes
for ppe in _ppe_cls:
for kpt in kpt_message:
if AI_CONST.HELMET_CID and ppe['class_id'] in AI_CONST.HELMET_CID:
#HELMET
kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.HELMET_KPT,type=1)
if kpt_include:
_result.append(ppe)
break
else:
intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox'])
if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT:
_result.append(ppe)
break
elif AI_CONST.RUBBER_INSULATED_SLEEVE_CID and ppe['class_id'] in AI_CONST.RUBBER_INSULATED_SLEEVE_CID:
#RUBBER_INSULATED_SLEEVE
kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.RUBBER_INSULATED_SLEEVE_KPT,type=0)
if kpt_include:
_result.append(ppe)
break
else:
intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox'])
if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT:
_result.append(ppe)
break
elif AI_CONST.SUIT_TOP_CID and ppe['class_id'] in AI_CONST.SUIT_TOP_CID:
#SUIT_TOP
kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_TOP_KPT,type=1)
if kpt_include:
_result.append(ppe)
break
else:
intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox'])
if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT:
_result.append(ppe)
break
elif AI_CONST.SUIT_BOTTOM_CID and ppe['class_id'] in AI_CONST.SUIT_BOTTOM_CID:
#SUIT_BOTTOM
kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_BOTTOM_KPT,type=1)
if kpt_include:
_result.append(ppe)
break
else:
intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox'])
if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT:
_result.append(ppe)
break
else:
intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox'])
if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT:
_result.append(ppe)
break
return _result
def check_union_area(self, person, object):
"""
두개의 bbox 가 겹치는지 확인후
겹친다면 두개의 bbox 겹치는 영역 return
아닐경우 false return
:param person: bbox 좌표 x1,y1,x2,y2
:param object: bbox 좌표 x1,y1,x2,y2
"""
person_left, person_top, person_right, person_bot = (
int(person[0]),
int(person[1]),
int(person[2]),
int(person[3]),
)
obj_left, obj_top, obj_right, obj_bot = (
int(object[0]),
int(object[1]),
int(object[2]),
int(object[3]),
)
## case1 오른쪽으로 벗어나 있는 경우
if person_right < obj_left:
return 0
## case2 왼쪽으로 벗어나 있는 경우
if person_left > obj_right:
return 0
## case3 위쪽으로 벗어나 있는 경우
if person_bot < obj_top:
return 0
## case4 아래쪽으로 벗어나 있는 경우
if person_top > obj_bot:
return 0
# 교집합 영역 찾기
modified_left, modified_top, modified_right, modified_bot = obj_left, obj_top, obj_right, obj_bot
# x좌표 기준으로 이동
if modified_left < person_left : # object 왼쪽 겹침
modified_left = person_left
elif modified_right > person_right : #object 오른쪽 겹침
modified_right = person_right
# y좌표 기준으로 이동
if modified_top < person_top : # object 위쪽 겹침
modified_top = person_top
elif modified_bot > person_bot : #object 아래쪽 겹침
modified_bot = person_bot
width = modified_right - modified_left
height = modified_bot - modified_top
if width * height > 0:
return (width * height)/((obj_right-obj_left)*(obj_bot-obj_top))
else:
return 0
    def plot_one_box(self, x, img, color=None, text_color=AI_CONST.TEXT_COLOR_WHITE, label=None, line_thickness=3):
        """Draw one bounding box (and optional filled label) on *img* in place.

        :param x: bbox coordinates x1, y1, x2, y2
        :param img: target image (modified in place)
        :param color: box BGR color; random when None
        :param text_color: label text BGR color
        :param label: label text; skipped when falsy
        :param line_thickness: line/font thickness; derived from image size when falsy
        """
        tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
        color = color or [random.randint(0, 255) for _ in range(3)]
        c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
        cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
        if label:
            tf = max(tl - 1, 1)  # font thickness
            t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
            # Label background sits just above the box's top-left corner.
            c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
            cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filled
            cv2.putText(
                img,
                label,
                (c1[0], c1[1] - 2),
                0,
                tl / 3,
                text_color,
                thickness=tf,
                lineType=cv2.LINE_AA,
            )
    def plot_skeleton_kpts(self, im, kpts, steps, orig_shape=None):
        """Draw COCO-17 keypoints and limb connections on *im* in place.

        :param im: target image
        :param kpts: flat array [x1, y1(, c1), x2, ...] with *steps* values per point
        :param steps: values per keypoint (3 includes a confidence score)
        :param orig_shape: unused; kept for caller compatibility
        """
        # BGR palette shared by joints and limbs.
        palette = np.array(
            [
                [255, 128, 0],
                [255, 153, 51],
                [255, 178, 102],
                [230, 230, 0],
                [255, 153, 255],
                [153, 204, 255],
                [255, 102, 255],
                [255, 51, 255],
                [102, 178, 255],
                [51, 153, 255],
                [255, 153, 153],
                [255, 102, 102],
                [255, 51, 51],
                [153, 255, 153],
                [102, 255, 102],
                [51, 255, 51],
                [0, 255, 0],
                [0, 0, 255],
                [255, 0, 0],
                [255, 255, 255],
            ]
        )
        # Limb endpoint pairs in 1-based COCO keypoint numbering.
        skeleton = [
            [16, 14],
            [14, 12],
            [17, 15],
            [15, 13],
            [12, 13],
            [6, 12],
            [7, 13],
            [6, 7],
            [6, 8],
            [7, 9],
            [8, 10],
            [9, 11],
            [2, 3],
            [1, 2],
            [1, 3],
            [2, 4],
            [3, 5],
            [4, 6],
            [5, 7],
        ]
        pose_limb_color = palette[
            [9, 9, 9, 9, 7, 7, 7, 0, 0, 0, 0, 0, 16, 16, 16, 16, 16, 16, 16]
        ]
        pose_kpt_color = palette[[16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 9, 9]]
        radius = 4
        num_kpts = len(kpts) // steps
        for kid in range(num_kpts):
            r, g, b = pose_kpt_color[kid]
            x_coord, y_coord = kpts[steps * kid], kpts[steps * kid + 1]
            # Coordinates on a 640 multiple (including 0) are treated as missing points.
            if not (x_coord % 640 == 0 or y_coord % 640 == 0):
                if steps == 3:
                    conf = kpts[steps * kid + 2]
                    if conf < AI_CONST.KPT_MIN_CONFIDENCE:
                        continue
                cv2.circle(
                    im,
                    (int(x_coord), int(y_coord)),
                    radius,
                    (int(r), int(g), int(b)),
                    -1 * AI_CONST.HPE_THICKNESS_RAITO,
                )
        for sk_id, sk in enumerate(skeleton):
            r, g, b = pose_limb_color[sk_id]
            pos1 = (int(kpts[(sk[0] - 1) * steps]), int(kpts[(sk[0] - 1) * steps + 1]))
            pos2 = (int(kpts[(sk[1] - 1) * steps]), int(kpts[(sk[1] - 1) * steps + 1]))
            if steps == 3:
                conf1 = kpts[(sk[0] - 1) * steps + 2]
                conf2 = kpts[(sk[1] - 1) * steps + 2]
                # Skip limbs whose endpoints are below the confidence threshold.
                if conf1 < AI_CONST.KPT_MIN_CONFIDENCE or conf2 < AI_CONST.KPT_MIN_CONFIDENCE:
                    continue
            # Skip limbs whose endpoints are missing or outside the frame.
            if pos1[0] % 640 == 0 or pos1[1] % 640 == 0 or pos1[0] < 0 or pos1[1] < 0:
                continue
            if pos2[0] % 640 == 0 or pos2[1] % 640 == 0 or pos2[0] < 0 or pos2[1] < 0:
                continue
            cv2.line(im, pos1, pos2, (int(r), int(g), int(b)), thickness=2 * AI_CONST.HPE_THICKNESS_RAITO)
def hpe_labeling(self,image,hpe_data):
for hpe in hpe_data:
_kpt=[]
for kpt, conf in zip(hpe["keypoints"], hpe["kpt_conf"]):
if kpt == None:
_kpt.append(0)
_kpt.append(0)
else:
_kpt.append(kpt[0])
_kpt.append(kpt[1])
_kpt.append(conf)
label_kpt = np.array(_kpt)
self.plot_skeleton_kpts(im=image, kpts=np.array(label_kpt), steps=3)
def od_labeling(self,image,od_data):
for od in od_data:
if not project_config.SHOW_GLOVES:
if od['class_id'] in AI_CONST.UNVISIBLE_CLS:
continue
_label_color = []
_text_color = []
if od["class_name"] not in list(self.color_table.keys()):
_label_color = AI_CONST.TEXT_COLOR_WHITE
_text_color = AI_CONST.TEXT_COLOR_BLACK
elif od["class_name"] in AI_CONST.text_color_white_list:
_label_color = AI_CONST.TEXT_COLOR_WHITE
_text_color = AI_CONST.TEXT_COLOR_BLACK
else:
_label_color = self.color_table[od["class_name"]].label_color
_text_color = self.color_table[od["class_name"]].font_color
self.plot_one_box(
od["bbox"],
image,
label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}",
color=_label_color,
text_color=_text_color,
line_thickness=AI_CONST.NORMAL_THICKNESS,
)
# # NOTE(gyong min) person만 bbox 표시
# if od["class_name"] == 'person':
# self.plot_one_box(
# od["bbox"],
# image,
# label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}",
# color=_label_color,
# text_color=_text_color,
# line_thickness=AI_CONST.NORMAL_THICKNESS,
# )
    def border_labeling(self, image, hpe_data, od_data):
        """Draw full-frame warning borders from HPE and OD results.

        A red inner border is drawn for any pose alert (fall / cross-arm);
        a yellow border plus "Danger(OD)" text is drawn only when BOTH a
        helmet-off and a glove-off detection are present in the same frame.
        """
        _current_pose_type = 0
        _border_color = []
        _off_helmet = False
        _off_glove = False
        # HPE warning: keep the most severe pose type across all persons.
        for hpe in hpe_data:
            _current_pose_type = max(_current_pose_type, int(hpe['result']['pose_type']))
        if _current_pose_type >= self.POSETYPE_CROSS:
            # cross-arm (work-stop) gesture
            _border_color = AI_CONST.POSE_CROSS_COLOR
        elif _current_pose_type < self.POSETYPE_CROSS and _current_pose_type >= self.POSETYPE_FALL:
            # fall detected
            _border_color = AI_CONST.POSE_FALL_COLOR
        # Red warning border when any pose alert is active.
        if _current_pose_type > 0:
            cv2.rectangle(
                image,
                pt1=(
                    AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF,
                    AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF,
                ),
                pt2=(
                    image.shape[1]
                    - AI_CONST.BORDER_THICKNESS
                    - AI_CONST.BORDER_THICKNESS_HALF,
                    image.shape[0]
                    - AI_CONST.BORDER_THICKNESS
                    - AI_CONST.BORDER_THICKNESS_HALF,
                ),
                color=_border_color,
                thickness=AI_CONST.BORDER_THICKNESS,
            )
            # cv2.putText(
            #     image,
            #     AI_CONST.BORDER_HPE_TEXT,
            #     AI_CONST.TEXT_HPE_STARTING_POINT,
            #     AI_CONST.TEXT_THICKNESS,
            #     AI_CONST.TEXT_SIZE,
            #     [0, 0, 0],
            #     thickness=1,
            #     lineType=cv2.LINE_AA,
            # )
        # OD warning: require both trigger classes (helmet off, gloves off).
        for od in od_data:
            if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[0]:
                _off_helmet = True
            if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[1]:
                _off_glove = True
        if _off_glove and _off_helmet:
            cv2.rectangle(
                image,
                pt1=(AI_CONST.BORDER_THICKNESS_HALF, AI_CONST.BORDER_THICKNESS_HALF),
                pt2=(
                    image.shape[1] - AI_CONST.BORDER_THICKNESS_HALF,
                    image.shape[0] - AI_CONST.BORDER_THICKNESS_HALF,
                ),
                color=AI_CONST.WD_BORDER_COLOR,
                thickness=AI_CONST.BORDER_THICKNESS)
            cv2.putText(
                image,
                AI_CONST.BORDER_OD_TEXT,
                AI_CONST.TEXT_OD_STARTING_POINT,
                AI_CONST.TEXT_THICKNESS,
                AI_CONST.TEXT_SIZE,
                [0, 0, 0],
                thickness=1,
                lineType=cv2.LINE_AA,
            )
def check_keypoint_include(self, ppe_bbox, kpt_list, kpt_index_list, type):
"""
bbox에 keypoint가 포함되어 있는지 확인
:param ppe_bbox: ppe xyxy 좌표
:param kpt_list: keypoint 정보
:param kpt_index_list: keypoint 인덱스 정보
:param type: 0: null값을 제외한 keypoint가 최소 하나라도 있으면 인정, 1: null값을 제외한 keypoint가 전부 있어야 인정
:return: boolean
"""
include = True
left, right = ppe_bbox[0], ppe_bbox[2]
top, bottom = ppe_bbox[1], ppe_bbox[3]
if type == 0:
# 최소 하나
include = False
for kpt_idx in kpt_index_list:
kpt = kpt_list[kpt_idx]
if kpt != None:
if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom):
include = True
break
else:
# 전부
_null_count = 0
for kpt_idx in kpt_index_list:
kpt = kpt_list[kpt_idx]
if kpt != None:
if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom):
pass
else:
include = False
break
else:
_null_count += 1
if _null_count == len(kpt_index_list):
include = False
return include
    def hpe_frame_check(self, hpe_message, threshold=int(0x0080), maxcount=AI_CONST.HPE_FRAME_CHECK_MAX_COUNT):
        """Debounce pose alerts across frames.

        Increments a counter when any detection's pose_type reaches *threshold*;
        once the counter hits *maxcount* it resets and the alert passes through.
        Frames without an alert reset the counter and zero out the pose fields.
        NOTE(review): the nesting below was reconstructed from a whitespace-mangled
        view — confirm the else-pairings against the original file.
        """
        _alert = False
        if hpe_message:
            for i in hpe_message:
                if i['result']['pose_type'] >= threshold:
                    self.hpe_frame_count += 1
                    _alert = True
                    break
            if _alert:
                if self.hpe_frame_count == maxcount:
                    self.hpe_frame_count = 0
                    return hpe_message
            else:
                self.hpe_frame_count = 0
                # Suppress sub-threshold pose info on non-alert frames.
                for j in hpe_message:
                    j['result']['pose_type'] = 0
                    j['result']['pose_level'] = 0
        else:
            self.hpe_frame_count = 0
        return hpe_message
def image_calibration(self, image):
"""
이미지 역광 보정
"""
if self.clahe:
# 이미지를 LAB 컬러 공간으로 변환 (L 채널에 CLAHE 적용)
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
# CLAHE 객체 생성 및 적용
# clipLimit: 대비 제한 임계값, tileGridSize: 타일의 크기
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
cl = clahe.apply(l)
# 보정된 L 채널과 기존 a, b 채널을 합쳐 다시 LAB 이미지 생성
limg = cv2.merge((cl, a, b))
# LAB 이미지를 다시 BGR 컬러 공간으로 변환
corrected_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)
return corrected_img
else:
return image
def label_color(model, table):
    """Build per-class (label color, text color) lists for a model's classes.

    Classes present in *table* keep their fixed color; every other class gets a
    random BGR color that collides with neither the fixed colors nor any color
    already assigned.
    """
    label_colored = []
    text_colored = []
    fixed_color = list(table.values())
    fixed_class = list(table.keys())
    for cls in list(model.model.names.values()):
        if cls in fixed_class:
            label_colored.append(table[cls])
        else:
            _color = [random.randint(0, 255) for _ in range(3)]
            while _color in fixed_color or _color in label_colored:
                _color = [random.randint(0, 255) for _ in range(3)]
            label_colored.append(_color)
        is_white = cls in AI_CONST.text_color_white_list
        text_colored.append(AI_CONST.TEXT_COLOR_WHITE if is_white else AI_CONST.TEXT_COLOR_BLACK)
    return label_colored, text_colored


if __name__ == "__main__":
    pass

View File

@@ -0,0 +1,47 @@
# UTILITY_YOLO_HPE_CLASSIFICATION
YOLO Pose 좌표 정보를 이용한 자세 추정
<br><br>
### HPEClassification class
HPEClassification(pose_info, cross_ratio_threshold=0.1, cross_angle_threshold=(15.0, 165.0), falldown_tilt_ratio=0.8, falldown_tilt_angle=15, body_tilt_ratio=0.3, yolo_version=8)
- pose_info: pose keypoint 정보 (mandatory)
```python
pose_info: <dict> {
'person': (cx, cy, w, h, c),
'keypoints': [ (x, y, c), (x, y, c), ... ] # 1번이 index 0, 2번 index 1, ... 17번 index 16
}
```
- cross_ratio_threshold: 팔 교차시 교차점의 최소 위치 비율. 생략시 0.1
- cross_angle_threshold: 팔 교차시 교차각 범위 지정. 생략시 (15.0, 165.0)
- falldown_tilt_ratio: 넘어짐/비틀거림 감지를 위한 body rect 비율 (세로/가로). 생략시 0.8
- falldown_tilt_angle: 상체 기울어짐 판정각. 작업자가 '작업중'일때 넘어짐 판정에 반영됨 (생략시 15도)
- body_tilt_ratio: 상체 기울어짐 판정을 위한 상체 비율 임계치 (가로/세로). 생략시 0.3
- yolo_version: 사용 YOLO 버전. 생략시 8
- v8 에서는 잡히지 않은 keypoint => (0,0)
- v7 에서는 잡히지 않은 keypoint => None
<br><br>
#### cross arms
- is_cross_arms() -> True/False
- get_cross_point() -> (x, y)
- get_cross_ratio() -> (ratio1, ratio2)
- get_cross_angle() -> 교차각
- set_cross_ratio_threshold(value)
- set_cross_angle_threshold(angle)
#### fall down
- is_falldown(is_working_on) -> True/False
- is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다.
- set_falldown_tilt_ratio(value)
#### etc
- get_hpe_type(is_working_on) -> 탐지된 type 종류가 모두 포함된 정보.
- is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다.
- HPETypeMask.NORMAL = 0x0000
- HPETypeMask.FALL_DOWN = 0x0080
- HPETypeMask.CROSS_ARM = 0x0100
- get_hpe_type(query, is_working_on) -> True/False
- get_hpe_level(is_working_on) -> 위험도. 현재 0~9까지의 값.
- is_working_on: 현재 작업자가 작업중인지 여부.(True/False) 작업중인 경우에는 상체 구부러짐을 반영한다.
<br>
### HPETypeMask class
HPEClassification.get_hpe_type() 에서 return 되는 정보를 구분하는 mask 값
- NORMAL = 0x0000
- FALL_DOWN = 0x0080
- CROSS_ARM = 0x0100

View File

View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
@file : config.py
@author: yunikim
@license: A2TEC & DAOOLDNS
@brief: HPE classification thresholds, read from config.yaml with fallbacks.
@section Modify History
 - 2023-11-21 yunikim base
"""
import logging
import project_config
from config_loader import CFG

_hpe = CFG.get('hpe', {})

# YOLO pose output format version (v8 reports missing keypoints as (0, 0)).
DEFAULT_YOLO_VERSION = 8
# Fall-down detection: body-rect ratio and upper-body tilt thresholds.
FALLDOWN_TILT_RATIO = _hpe.get('falldown_tilt_ratio', 0.80)
FALLDOWN_TILT_ANGLE = _hpe.get('falldown_tilt_angle', 15)
BODY_TILT_RATIO = _hpe.get('body_tilt_ratio', 0.30)
# Cross-arm (work-stop) gesture thresholds.
CROSS_ARM_RATIO_THRESHOLD = _hpe.get('cross_arm_ratio_threshold', 0.10)
CROSS_ARM_ANGLE_THRESHOLD = tuple(_hpe.get('cross_arm_angle_threshold', [15.0, 165.0]))
# The arm-angle window differs when the stricter cross-arm variant is enabled.
if project_config.ADD_CROSS_ARM:
    ARM_ANGLE_THRESHOLD = tuple(_hpe.get('arm_angle_threshold_cross', [80.0, 135.0]))
else:
    ARM_ANGLE_THRESHOLD = tuple(_hpe.get('arm_angle_threshold_default', [150.0, 195.0]))
ELBOW_ANGLE_THRESHOLD = tuple(_hpe.get('elbow_angle_threshold', [150.0, 185.0]))
DEFAULT_LOG_LEVEL = logging.INFO

View File

@@ -0,0 +1,581 @@
"""
@file : hpe_classification.py
@author: yunikim
@license: DAOOLDNS
@brief: HPE 자세 추정 알고리즘.
작업중지동작(cross arm) 및 넘어짐 감지.
기존의 두 선분의 교차여부, 교차점 좌표 및 해당 좌표의 각 선분 상 위치 비율을 얻는 로직을 통합함.
@section Modify History
- 2023-06-21 yunikim base
"""
import math
import numpy as np
from hpe_classification import config
import logging
import project_config
class HPETypeMask:
    # Bit flags returned by HPEClassification.get_hpe_type(); combinable with |.
    NORMAL = 0x0000
    FALL_DOWN = 0x0080
    CROSS_ARM = 0x0100
class HPEClassification:
    # Keypoints use COCO numbering 1..17 but are stored at list index 0..16.
    class __KPIndex:
        NOSE = 0          # nose
        RIGHT_EYE = 1     # eyes
        LEFT_EYE = 2
        RIGHT_EAR = 3     # ears
        LEFT_EAR = 4
        RIGHT_SHOULDER = 5  # shoulders
        LEFT_SHOULDER = 6
        RIGHT_ELBOW = 7   # elbows
        LEFT_ELBOW = 8
        RIGHT_WRIST = 9   # wrists
        LEFT_WRIST = 10
        RIGHT_HIP = 11    # hips
        LEFT_HIP = 12
        RIGHT_KNEE = 13   # knees
        LEFT_KNEE = 14
        RIGHT_FEET = 15   # feet
        LEFT_FEET = 16
        MAX = 17
    # Risk levels reported by get_hpe_level(): 0 normal ... 9 most severe.
    class __HPELevel:
        NORMAL = 0
        FALLDOWN = 8
        CROSSARM = 9
    # parameter:
    #  pose_info: <dict> {
    #      'person': (cx, cy, w, h, c),
    #      'keypoints': [ (x, y, c), None, (x, y, c), ... ]  # keypoint 1 at index 0, 2 at index 1, ... 17 at index 16
    #  }
    def __init__(self, pose_info: dict,
                 cross_ratio_threshold: float = config.CROSS_ARM_RATIO_THRESHOLD,
                 cross_angle_threshold: tuple = config.CROSS_ARM_ANGLE_THRESHOLD,
                 arm_angle_threshold: tuple = config.ARM_ANGLE_THRESHOLD,
                 elbow_angle_threshold: tuple = config.ELBOW_ANGLE_THRESHOLD,
                 falldown_tilt_ratio: float = config.FALLDOWN_TILT_RATIO,
                 falldown_tilt_angle: float = config.FALLDOWN_TILT_ANGLE,
                 body_tilt_ratio: float = config.BODY_TILT_RATIO,
                 yolo_version: int = config.DEFAULT_YOLO_VERSION):
        """
        :param pose_info: HPE coordinate information
        :param cross_ratio_threshold: minimum cross-point position ratio for arm crossing
        :param cross_angle_threshold: crossing-angle detection window (min, max)
        :param arm_angle_threshold: arm-angle window (min, max)
        :param elbow_angle_threshold: elbow-angle window (min, max)
        :param falldown_tilt_ratio: body-rect ratio used for fall detection
        :param falldown_tilt_angle: upper-body tilt angle applied while working
        :param body_tilt_ratio: upper-body ratio threshold applied while working
        :param yolo_version: yolo version
        """
        self.__raw_data = pose_info
        self.__bounding_box = pose_info['person']
        self.__keypoints = pose_info['keypoints']
        # logger setup
        self.__logger = self.__set_logger(self.__class__.__name__)
        # yolov8 compatibility:
        # v8 reports an undetected point as (0,0); v7 reports None.
        if yolo_version == 8:
            self.__convert_zero_point_to_none(self.__keypoints)
        # Work-stop gesture (cross-arm) detection state.
        self.__cross_ratio_threshold = cross_ratio_threshold
        self.__cross_angle_threshold = cross_angle_threshold
        self.__cross_point = None
        self.__cross_ratio = None
        self.__cross_angle = None
        # Forearm segments (elbow -> wrist) used for the crossing test.
        self.__line1_point1 = self.__keypoints[self.__KPIndex.RIGHT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.RIGHT_ELBOW] else None
        self.__line1_point2 = self.__keypoints[self.__KPIndex.RIGHT_WRIST][0:2] if self.__keypoints[self.__KPIndex.RIGHT_WRIST] else None
        self.__line2_point1 = self.__keypoints[self.__KPIndex.LEFT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.LEFT_ELBOW] else None
        self.__line2_point2 = self.__keypoints[self.__KPIndex.LEFT_WRIST][0:2] if self.__keypoints[self.__KPIndex.LEFT_WRIST] else None
        self.__detect_cross_arms()  # detect crossed arms
        # Arm-angle detection state.
        self.__cross_arm_angle_threshold = arm_angle_threshold
        self.__cross_left_arm_angle = None
        self.__cross_right_arm_angle = None
        self.__cross_elbow_angle_threshold = elbow_angle_threshold
        self.__cross_left_elbow_angle = None
        self.__cross_right_elbow_angle = None
        self.__left_wrist = self.__keypoints[self.__KPIndex.LEFT_WRIST][0:2] if self.__keypoints[self.__KPIndex.LEFT_WRIST] else None
        self.__left_elbow = self.__keypoints[self.__KPIndex.LEFT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.LEFT_ELBOW] else None
        self.__left_shoulder = self.__keypoints[self.__KPIndex.LEFT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.LEFT_SHOULDER] else None
        self.__right_wrist = self.__keypoints[self.__KPIndex.RIGHT_WRIST][0:2] if self.__keypoints[self.__KPIndex.RIGHT_WRIST] else None
        self.__right_elbow = self.__keypoints[self.__KPIndex.RIGHT_ELBOW][0:2] if self.__keypoints[self.__KPIndex.RIGHT_ELBOW] else None
        self.__right_shoulder = self.__keypoints[self.__KPIndex.RIGHT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.RIGHT_SHOULDER] else None
        self.__detect_angle_arms()  # detect arm angles
        # Fall-down detection state.
        self.__falldown_tilt_ratio = falldown_tilt_ratio
        self.__falldown_tilt_angle = falldown_tilt_angle
        self.__body_tilt_ratio = body_tilt_ratio
        self.__body_tilt_angle = None
        self.__overturn_upper = False
        self.__overturn_lower = False
        self.__badly_tilt = False
        self.__kp_rhip = self.__keypoints[self.__KPIndex.RIGHT_HIP][0:2] if self.__keypoints[self.__KPIndex.RIGHT_HIP] else None
        self.__kp_lhip = self.__keypoints[self.__KPIndex.LEFT_HIP][0:2] if self.__keypoints[self.__KPIndex.LEFT_HIP] else None
        self.__kp_rsldr = self.__keypoints[self.__KPIndex.RIGHT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.RIGHT_SHOULDER] else None
        self.__kp_lsldr = self.__keypoints[self.__KPIndex.LEFT_SHOULDER][0:2] if self.__keypoints[self.__KPIndex.LEFT_SHOULDER] else None
        self.__kp_rft = self.__keypoints[self.__KPIndex.RIGHT_FEET][0:2] if self.__keypoints[self.__KPIndex.RIGHT_FEET] else None
        self.__kp_lft = self.__keypoints[self.__KPIndex.LEFT_FEET][0:2] if self.__keypoints[self.__KPIndex.LEFT_FEET] else None
        self.__detect_falldown()  # detect fall-down
    @property
    def falldown_ratio(self):
        # Configured fall-down body-rect ratio threshold.
        return self.__falldown_tilt_ratio

    @property
    def cross_threshold(self):
        # Configured minimum cross-point position ratio.
        return self.__cross_ratio_threshold

    @property
    def body_tilt_angle(self):
        # Upper-body tilt angle; may be None when it could not be computed.
        return self.__body_tilt_angle
def __convert_zero_point_to_none(self, points):
"""
좌표가 (0,0)인 경우 None으로 대체
(yolov8 대응)
:param points: 좌표리스트
"""
for idx, point in enumerate(points):
if point is not None and point[0] == 0 and point[1] == 0:
points[idx] = None
    def get_pose_info(self):
        """Return the raw pose_info dict passed to the constructor."""
        return self.__raw_data

    def get_keypoints(self):
        """Return the keypoint list (missing points are None after v8 conversion)."""
        return self.__keypoints

    def get_bounding_box(self):
        """Return the person bounding box (cx, cy, w, h, c)."""
        return self.__bounding_box
def is_cross_arms(self):
"""
팔이 cross 상태인지 여부 확인
"""
if self.__cross_point is not None and self.__cross_ratio is not None and self.__cross_angle is not None:
if self.__cross_ratio[0] > self.__cross_ratio_threshold and self.__cross_ratio[1] > self.__cross_ratio_threshold:
if self.__cross_angle_threshold[0] < self.__cross_angle < self.__cross_angle_threshold[1]:
return True
return False
def is_angle_arms(self):
    """
    Return True when both arms are bent within the configured angle ranges.

    In ADD_CROSS_ARM mode only the shoulder-arm angles are checked; otherwise
    the elbow angles must also fall inside the elbow threshold range.
    """
    arm_lo, arm_hi = self.__cross_arm_angle_threshold
    if project_config.ADD_CROSS_ARM:
        if self.__cross_left_arm_angle is not None and self.__cross_right_arm_angle is not None:
            if arm_lo <= self.__cross_left_arm_angle <= arm_hi and \
                    arm_lo <= self.__cross_right_arm_angle <= arm_hi:
                return True
    else:
        # BUG FIX: the original None-checked __cross_left_elbow_angle twice and
        # never __cross_right_elbow_angle (TypeError when it is None), and
        # compared the elbow thresholds against the *arm* angles instead of the
        # elbow angles (copy-paste error).
        angles = (self.__cross_left_arm_angle, self.__cross_right_arm_angle,
                  self.__cross_left_elbow_angle, self.__cross_right_elbow_angle)
        if all(a is not None for a in angles):
            elbow_lo, elbow_hi = self.__cross_elbow_angle_threshold
            if (arm_lo <= self.__cross_left_arm_angle <= arm_hi and
                    arm_lo <= self.__cross_right_arm_angle <= arm_hi and
                    elbow_lo <= self.__cross_left_elbow_angle <= elbow_hi and
                    elbow_lo <= self.__cross_right_elbow_angle <= elbow_hi):
                return True
    return False
def get_cross_point(self):
    """Return the arm-crossing point (x, y), or None when arms do not cross."""
    point = self.__cross_point
    if point is None:
        self.__logger.debug("no cross info")
    return point
def get_cross_ratio(self):
    """Return the (ratio1, ratio2) cross-point position ratios, or None."""
    ratio = self.__cross_ratio
    if ratio is None:
        self.__logger.debug("no cross info")
    return ratio
def get_cross_angle(self):
    """Return the crossing angle in degrees, or None when arms do not cross."""
    angle = self.__cross_angle
    if angle is None:
        self.__logger.debug("no cross info")
    return angle
def set_cross_ratio_threshold(self, threshold):
    """
    Set the minimum cross-point position ratio used when deciding whether
    the arms are crossed.
    :param threshold: value between 0.0 and 0.5
    """
    self.__cross_ratio_threshold = threshold
def set_cross_angle_threshold(self, angle_range: tuple):
    """
    Set the crossing-angle range used when deciding whether the arms cross.
    :param angle_range: (low, high) in degrees, e.g. (15.0, 165.0)
    """
    self.__cross_angle_threshold = angle_range
def set_falldown_tilt_ratio(self, tilt_ratio):
    """Update the body tilt ratio threshold and re-run falldown detection."""
    self.__falldown_tilt_ratio = tilt_ratio
    self.__detect_falldown()
def set_falldown_tilt_angle(self, tilt_angle):
    """
    Update the upper-body bend threshold and re-run falldown detection.
    :param tilt_angle: bend angle (degrees) that counts as falldown while working
    """
    self.__falldown_tilt_angle = tilt_angle
    self.__detect_falldown()
def is_falldown(self, is_working_on=True):
    """
    Return True when the pose is classified as fallen.
    :param is_working_on: while working, the upper-body bend angle also counts
    """
    overturned = self.__overturn_upper or self.__overturn_lower or self.__badly_tilt
    tilted = False
    angles = self.__body_tilt_angle
    if angles is not None:
        threshold = self.__falldown_tilt_angle
        tilted = angles[0] < threshold or angles[1] < threshold
    return overturned or (is_working_on and tilted)
def get_hpe_type(self, query=None, is_working_on=True):
    """
    Return the detected-pose bit mask, or test for a specific pose.
    :param query: pose mask to test; when omitted, the full bit mask is returned
    :param is_working_on: while working, the upper-body bend angle also counts as falldown
    :return: True/False when query is given, otherwise the pose bit mask
    """
    result = 0x0000
    if self.is_falldown(is_working_on):
        result |= HPETypeMask.FALL_DOWN
    if project_config.ADD_CROSS_ARM:
        # In ADD_CROSS_ARM mode either the geometric cross or the angle test sets the bit.
        if self.is_cross_arms():
            result |= HPETypeMask.CROSS_ARM
        elif self.is_angle_arms():
            result |= HPETypeMask.CROSS_ARM
    else:
        if self.is_angle_arms():
            result |= HPETypeMask.CROSS_ARM
            # Debug line kept inside the branch: the angles are only guaranteed
            # non-None when is_angle_arms() returned True.
            self.__logger.debug(msg=f"***angle arms -> left angle: {int(self.__cross_left_arm_angle)}, right angle: {int(self.__cross_right_arm_angle)}, left elbow: {int(self.__cross_left_elbow_angle)}, right elbow: {int(self.__cross_right_elbow_angle)}")
    if query is None:
        return result
    return (result & query) != 0
# Currently returns the per-type level directly; a dedicated formula is
# planned to compute the level later.
def get_hpe_level(self, is_working_on=True):
    """
    Return the highest alert level among the detected poses.
    :param is_working_on: while working, the upper-body bend angle also counts as falldown
    """
    level = 0
    if self.is_falldown(is_working_on):
        level = max(level, self.__HPELevel.FALLDOWN)
    if self.is_cross_arms() or self.is_angle_arms():
        level = max(level, self.__HPELevel.CROSSARM)
    return level
def __detect_cross_arms(self):
    """
    Detect whether the two forearm segments cross; on success fills
    __cross_point / __cross_ratio / __cross_angle, otherwise they stay None.
    """
    self.__cross_point = self.__cross_ratio = self.__cross_angle = None
    if not self.__line1_point1 or not self.__line1_point2 or not self.__line2_point1 or not self.__line2_point2:
        self.__logger.debug("need 4 points for detecting cross arms")
        return
    # Exclude the case where the wrists sit below the elbows.
    if self.__line1_point1[1] < self.__line2_point2[1] and self.__line2_point1[1] < self.__line1_point2[1]:
        self.__logger.debug("both wrist is below elbow.")
        return
    # Segment-intersection test: each segment must straddle the line through
    # the other, detected via the sign product of two CCW tests.
    line1_ccw = self.__ccw(self.__line1_point1, self.__line1_point2, self.__line2_point1) * \
        self.__ccw(self.__line1_point1, self.__line1_point2, self.__line2_point2)
    line2_ccw = self.__ccw(self.__line2_point1, self.__line2_point2, self.__line1_point1) * \
        self.__ccw(self.__line2_point1, self.__line2_point2, self.__line1_point2)
    if line1_ccw < 0 and line2_ccw < 0:
        self.__cross_point = self.__get_cross_point()
        self.__cross_ratio = self.__get_cross_ratio()
        self.__cross_angle = self.__get_cross_angle()
# Coordinates of the intersection of the two segments.
# Use only after the crossing has been confirmed.
# Output: intersection coordinate tuple
def __get_cross_point(self):
    """
    Intersection of the two (infinite) lines through the segments.
    The denominator is non-zero because __detect_cross_arms only calls this
    after the CCW test confirmed a proper crossing (non-parallel lines).
    :return: (cx, cy)
    """
    x1, y1 = self.__line1_point1[0:2]
    x2, y2 = self.__line1_point2[0:2]
    x3, y3 = self.__line2_point1[0:2]
    x4, y4 = self.__line2_point2[0:2]
    cx = ((x1 * y2 - x2 * y1) * (x3 - x4) - (x1 - x2) * (x3 * y4 - x4 * y3)) / \
        ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4))
    cy = ((x1 * y2 - x2 * y1) * (y3 - y4) - (y1 - y2) * (x3 * y4 - x4 * y3)) / \
        ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4))
    return cx, cy
def __get_cross_angle(self, p1=None, p2=None, cross_point=None):
    """
    Angle (degrees) between the two arm directions at the crossing point.
    :param p1: first direction point (defaults to __line1_point1, the right elbow)
    :param p2: second direction point (defaults to __line2_point2, the left wrist)
    :param cross_point: vertex of the angle (defaults to the detected crossing point)
    """
    if cross_point is None and self.__cross_point is None:
        return None
    if p1 is None:
        p1 = self.__line1_point1[0:2]  # right elbow
    if p2 is None:
        p2 = self.__line2_point2[0:2]  # left wrist
    if cross_point is None:
        cross_point = self.__cross_point
    return self.three_point_angle(p1, p2, cross_point)
@staticmethod
def three_point_angle(p1, p2, cross_point):
"""
세 점 사이의 각도를 구함
"""
x1, y1 = p1[0:2]
x2, y2 = p2[0:2]
cx, cy = cross_point[0:2]
radian = np.arctan2((y1-cy), (x1-cx)) - np.arctan2((y2-cy), (x2-cx))
radian = np.fabs(radian * 180 / np.pi)
if radian > 180:
radian = 360 - radian
return radian
def __get_cross_ratio(self):
    """
    Relative position of the crossing point on each segment: the distance
    from the first endpoint as a fraction of the segment length, folded into
    0.0-0.5 (0.5 means the midpoint).
    :return: (ratio1, ratio2)
    """
    ratios = []
    for start, end in ((self.__line1_point1, self.__line1_point2),
                       (self.__line2_point1, self.__line2_point2)):
        segment_length = math.dist(start[0:2], end[0:2])
        fraction = math.dist(start[0:2], self.__cross_point) / segment_length
        if fraction > 0.5:
            fraction = math.fabs(1.0 - fraction)
        ratios.append(fraction)
    return tuple(ratios)
# Input: 3 points, each (x, y)
# Output: 0, 1, -1
@staticmethod
def __ccw(point1, point2, point3):
    """
    Orientation (CCW test) of three points via the 2D cross product.
    :return: 1 counter-clockwise, -1 clockwise, 0 collinear
    """
    ax, ay = point1[0:2]
    bx, by = point2[0:2]
    cx, cy = point3[0:2]
    cross = (bx - ax) * (cy - ay) - (cx - ax) * (by - ay)
    if cross == 0:
        return 0  # collinear
    return 1 if cross > 0 else -1
def __detect_angle_arms(self):
    """
    Compute the shoulder-arm (and, outside ADD_CROSS_ARM mode, elbow) angles
    used by is_angle_arms(); all four stay None when keypoints are missing.
    """
    self.__cross_left_arm_angle = self.__cross_right_arm_angle = self.__cross_left_elbow_angle = self.__cross_right_elbow_angle = None
    if project_config.ADD_CROSS_ARM:
        # Shoulder/elbow points are enough for the arm angles alone.
        if self.__left_elbow is None or self.__left_shoulder is None or self.__right_elbow is None or self.__right_shoulder is None:
            self.__logger.debug("need 4 points for detecting angle arms")
            return
    else:
        # Wrists are additionally required for the elbow angles.
        if self.__left_elbow is None or self.__left_shoulder is None or self.__right_elbow is None or self.__right_shoulder is None or \
                self.__left_wrist is None or self.__right_wrist is None:
            self.__logger.debug("need 6 points for detecting angle arms")
            return
    self.__cross_left_arm_angle, self.__cross_right_arm_angle, self.__cross_left_elbow_angle, self.__cross_right_elbow_angle = self.__get_angle_arms()
@staticmethod
def three_point_arm_angle(p1, p2, cross_point):
"""
세 점 사이(팔)의 각도를 구함
p1: 몸쪽에서 가장 먼 keypoint
p2: 몸쪽에서 가장 가까운 keypoint
"""
x1, y1 = p1[0:2]
x2, y2 = p2[0:2]
cx, cy = cross_point[0:2]
radian = np.arctan2((y1-cy), (x1-cx)) - np.arctan2((y2-cy), (x2-cx))
radian = np.fabs(radian * 180 / np.pi)
if y1 > cy or y1 > y2:
return None
if radian > 180 :
radian = 360 - radian
return radian
def __get_angle_arms(self):
    """
    Return (left_arm, right_arm, left_elbow, right_elbow) angles.
    Arm angles use the opposite shoulder as the reference direction; elbow
    angles are skipped (None) in ADD_CROSS_ARM mode.
    """
    left_angle = self.three_point_arm_angle(p1=self.__left_elbow,cross_point=self.__left_shoulder,p2=self.__right_shoulder)
    right_angle = self.three_point_arm_angle(p1=self.__right_elbow,cross_point=self.__right_shoulder,p2=self.__left_shoulder)
    left_elbow = None if project_config.ADD_CROSS_ARM else self.three_point_arm_angle(p1=self.__left_wrist,cross_point=self.__left_elbow,p2=self.__left_shoulder)
    right_elbow = None if project_config.ADD_CROSS_ARM else self.three_point_arm_angle(p1=self.__right_wrist,cross_point=self.__right_elbow,p2=self.__right_shoulder)
    return left_angle,right_angle,left_elbow,right_elbow
def __detect_falldown(self):
    """
    Classify falldown from the keypoints supplied at construction time.
    - shoulders below the hips            -> __overturn_upper set
    - hips below the feet                 -> __overturn_lower set
    - shoulder/feet aspect ratio below threshold -> __badly_tilt set
    - shoulder/hip angles stored in __body_tilt_angle (image y grows downward)
    """
    self.__overturn_upper = False
    self.__overturn_lower = False
    self.__badly_tilt = False
    self.__body_tilt_angle = None
    # 1. Upper body overturned (hips higher than shoulders in the image)
    if self.__kp_lhip and self.__kp_rhip and self.__kp_rsldr and self.__kp_lsldr:
        if self.__kp_lhip[1] < self.__kp_lsldr[1] and self.__kp_rhip[1] < self.__kp_rsldr[1]:
            self.__overturn_upper = True
    else:
        self.__logger.debug("need hip and shoulder points for detecting upper body falldown")
    # 2. Lower body overturned (hips lower than feet in the image)
    if self.__kp_rft and self.__kp_lft and self.__kp_lhip and self.__kp_rhip:
        if self.__kp_lhip[1] > self.__kp_lft[1] and self.__kp_rhip[1] > self.__kp_rft[1]:
            self.__overturn_lower = True
    else:
        self.__logger.debug("need feet points for detecting lower body falldown")
    # 3. Body tilt (bounding extent of shoulders + feet: flat box => lying down)
    if self.__kp_lft and self.__kp_rft and self.__kp_rsldr and self.__kp_lsldr:
        xmax = np.max([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lft[0], self.__kp_rft[0]])
        ymax = np.max([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lft[1], self.__kp_rft[1]])
        xmin = np.min([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lft[0], self.__kp_rft[0]])
        ymin = np.min([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lft[1], self.__kp_rft[1]])
        body_width = math.fabs(xmax - xmin)
        body_height = math.fabs(ymax - ymin)
        # Guard against division by ~zero width before testing the ratio.
        if (body_width > 1e-4) and ((body_height / body_width) < self.__falldown_tilt_ratio):
            self.__badly_tilt = True
    else:
        self.__logger.debug("need feet and shoulder points for detecting badly tilt")
    # 4. Upper-body bend (shoulders + hips); discarded when the torso box is
    #    too wide relative to its height (likely a sideways pose, not a bend).
    if self.__kp_lhip and self.__kp_rhip and self.__kp_rsldr and self.__kp_lsldr:
        angle_left = self.three_point_angle(self.__kp_lsldr, self.__kp_rhip, self.__kp_lhip)
        angle_right = self.three_point_angle(self.__kp_rsldr, self.__kp_lhip, self.__kp_rhip)
        self.__body_tilt_angle = (angle_left, angle_right)
        xmax = np.max([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lhip[0], self.__kp_rhip[0]])
        ymax = np.max([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lhip[1], self.__kp_rhip[1]])
        xmin = np.min([self.__kp_lsldr[0], self.__kp_rsldr[0], self.__kp_lhip[0], self.__kp_rhip[0]])
        ymin = np.min([self.__kp_lsldr[1], self.__kp_rsldr[1], self.__kp_lhip[1], self.__kp_rhip[1]])
        body_width = math.fabs(xmax - xmin)
        body_height = math.fabs(ymax - ymin)
        if (body_height > 1e-4) and ((body_width / body_height) < self.__body_tilt_ratio):
            self.__body_tilt_angle = None
    else:
        self.__logger.debug("need feet and shoulder points for calculating body tilt angle")
def __set_logger(self, name):
    """Create a stream logger named `name` at config.DEFAULT_LOG_LEVEL."""
    logger = logging.getLogger(name)
    logger_handler = logging.StreamHandler()
    logger_handler.setFormatter(logging.Formatter("[%(levelname)s][%(asctime)s][%(module)s::%(funcName)s()] %(message)s"))
    # NOTE(review): a new handler is attached on every call; constructing many
    # instances with the same logger name would duplicate log lines — consider
    # guarding with logger.hasHandlers().
    logger.addHandler(logger_handler)
    logger.setLevel(config.DEFAULT_LOG_LEVEL)
    return logger
def log(self, level, message):
    """Proxy to this instance's logger (logging.Logger.log)."""
    self.__logger.log(level, message)
if __name__ == "__main__":
    # Manual smoke test for HPEClassification with two synthetic poses.
    # pose_info: <dict> {
    #     'person': (cx, cy, w, h, c),
    #     'keypoints': [ (x, y, c), None, (x, y, c), ... ]  # keypoint 1 -> index 0, 2 -> index 1, ..., 17 -> index 16
    # }
    pose_info1 = {
        'person': (0, 0, 0, 0, 0),
        'keypoints': [
            (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0),
            (4.64, 3, 0), (-3.2, 2.85, 0),     # right / left shoulder
            (-2.83, 2.2, 0), (5.45, 3.34, 0),  # right / left elbow
            (6.5, 1.6, 0), (0.98, 1.77, 0),    # right / left wrist
            (3.4, 3.9, 0), (-8.6, 3.2, 0),     # right / left hip
            (0, 0, 0), (0, 0, 0),
            (0, 20, 0), (0, 20, 0)             # right / left foot
        ]
    }
    test1 = HPEClassification(pose_info1, cross_ratio_threshold=0.1)
    print(f'is_cross_arms({test1.cross_threshold}):', test1.is_cross_arms())
    print('angle:', test1.get_cross_angle())
    # Lowering the ratio threshold should make the cross test more permissive.
    test1.set_cross_ratio_threshold(0.05)
    print(f'is_cross_arms({test1.cross_threshold}):', test1.is_cross_arms())
    print(test1.get_cross_point())
    print(test1.get_cross_ratio())
    print('angle:', test1.get_cross_angle())
    print('is_falldown:', test1.is_falldown(True))
    print(test1.get_hpe_type(HPETypeMask.CROSS_ARM))
    print(test1.get_hpe_type(HPETypeMask.FALL_DOWN))
    print(hex(test1.get_hpe_type()))
    print('hpe level:', test1.get_hpe_level())
    print('--------------------------')
    print('body tilt angle:', test1.body_tilt_angle)
    print('--------------------------')
    pose_info2 = {
        'person': (0, 0, 0, 0, 0),
        'keypoints': [
            (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0),
            (5, 20, 0), (3, 17, 0),            # right / left shoulder
            (-2.83, 2.2, 0), (0.98, 1.77, 0),  # right / left elbow
            (6.5, 1.6, 0), (5.45, 3.34, 0),    # right / left wrist
            (0, 30, 0), (0, 30, 0),            # right / left hip
            (0, 0, 0), (0, 0, 0),
            (37, 31, 0), (40, 42, 0)           # right / left foot
        ]
    }
    test2 = HPEClassification(pose_info2, falldown_tilt_ratio=0.5)
    print('is_cross_arms:', test2.is_cross_arms())
    print(f'is_falldown({test2.falldown_ratio}):', test2.is_falldown())
    # Raising the tilt ratio should flip the falldown verdict for this pose.
    test2.set_falldown_tilt_ratio(0.7)
    print(f'is_falldown({test2.falldown_ratio}):', test2.is_falldown())
    print('hpe level:', test2.get_hpe_level())

View File

@@ -0,0 +1,2 @@
numpy
opencv-python

108
load_models.py Normal file
View File

@@ -0,0 +1,108 @@
import numpy as np
from ultralytics import YOLO
import demo_const as AI_CONST
import project_config
# from ai_engine.custom_logger.custom_log import log
class AIObjectModel:
    """
    Loads the YOLOv8 object-detection model and warms it up with one
    dummy inference.
    """
    MODEL_NAME = "OD"                     # display name used in log output
    WEIGHTS = AI_CONST.WEIGHTS_YOLO       # weights file path from demo_const
    def __init__(self) -> None:
        self.model = ""
        self.set_model(weights=self.WEIGHTS)
    def set_model(self, weights):
        """Load the YOLO weights, report the device, and run a warm-up pass."""
        self.model = YOLO(weights)
        # Check device status
        import torch
        device_status = "CUDA (GPU)" if torch.cuda.is_available() else "CPU"
        print(f"[{self.MODEL_NAME}] Model loaded. Using: {device_status}")
        # Warm up the model (adjust if the input video resolution differs).
        img = np.zeros([1080,1920,3],dtype=np.uint8)  # black image FHD 1920x1080
        # img = np.zeros([360,640,3],dtype=np.uint8)  # black image HD 640x360
        self.model.predict(source=img, verbose=False)
        # log.info(f"YOLOV8 {self.MODEL_NAME} MODEL LOADED")
    def get_model(self):
        """Return the loaded ultralytics YOLO model object."""
        return self.model
    def get_class_info(self):
        """Return the model's class-id -> class-name mapping."""
        return self.model.names
class AIHelmetObjectModel(AIObjectModel):
    """
    YOLOv8 OD model specialized for helmet detection (different weights only).
    """
    MODEL_NAME = "OD-Helmet"
    WEIGHTS = AI_CONST.WEIGHTS_YOLO_HELMET
class AIHPEModel(AIObjectModel):
    """
    YOLOv8 human-pose-estimation model (different weights only).
    """
    MODEL_NAME = "HPE"
    WEIGHTS = AI_CONST.WEIGHTS_POSE
class AIModelManager:
    """
    Lazy container for the OD / HPE / helmet models.

    Each ``set_*`` loads its model at most once; each ``get_*`` raises when
    the model has not been loaded yet.
    """
    def __init__(self) -> None:
        # None = not loaded yet (original used "" as the sentinel; all checks
        # are truthiness-based, so this is backward-compatible).
        self.od_model_info = None
        self.hpe_model_info = None
        self.helmet_model_info = None
    def set_od(self):
        """Load the object-detection model once."""
        if not self.od_model_info:
            self.od_model_info = AIObjectModel()
    def set_hpe(self):
        """Load the pose-estimation model once."""
        if not self.hpe_model_info:
            self.hpe_model_info = AIHPEModel()
    def set_helmet(self):
        """Load the helmet-detection model once."""
        if not self.helmet_model_info:
            self.helmet_model_info = AIHelmetObjectModel()
    def get_od(self):
        """Return the OD model wrapper; raises if set_od() was never called."""
        if not self.od_model_info:
            raise Exception("YOLO model not loaded")
        return self.od_model_info
    def get_hpe(self):
        """Return the HPE model wrapper; raises if set_hpe() was never called."""
        if not self.hpe_model_info:
            raise Exception("HPE model not loaded")
        return self.hpe_model_info
    def get_helmet(self):
        """Return the helmet model wrapper; raises if set_helmet() was never called."""
        if not self.helmet_model_info:
            raise Exception("helmet model not loaded")
        return self.helmet_model_info
# Module-level singleton: the models are loaded exactly once at first import.
model_manager = ""
if not model_manager:
    model_manager = AIModelManager()
    model_manager.set_od()
    model_manager.set_hpe()
    if project_config.USE_HELMET_MODEL:
        model_manager.set_helmet()
if __name__ == "__main__":
    # Import-only module; models load as a side effect of importing.
    pass

124
mqtt.py Normal file
View File

@@ -0,0 +1,124 @@
import threading
import os, sys
import paho.mqtt.client as mqtt
"""
todo
log 설정 별도 필요
상수값 설정 필요
"""
# NOTE(review): credentials are hard-coded in source — move them to
# config.yaml / environment variables before shipping.
MQTT_USER_ID = "a2d2admin"
MQTT_USER_PW = "a2d24992!"
MQTT_HOST = "localhost"
MQTT_PORT = 52002
class MQTTManager:
    """
    MQTT connection manager (wraps paho-mqtt Client).

    For subscriptions, install a message handler via set_onmessage() before
    calling start_sub().
    """
    def __init__(self,
                 name="MQTT",
                 id=MQTT_USER_ID,
                 pw=MQTT_USER_PW,
                 host=MQTT_HOST,
                 port=MQTT_PORT) -> None:
        self.name = name
        self.id = id
        self.pw = pw
        self.host = host
        self.port = port
        self.status = False            # connected flag, toggled by callbacks
        self.status_subscribe = False  # subscription flag, toggled by callbacks
        self.client = mqtt.Client()
        self.client.username_pw_set(self.id, self.pw)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_subscribe = self.on_subscribe
        self.client.on_unsubscribe = self.on_unsubscribe
        self.client.on_message = self.on_message
        # Guards `status`, which is written from the paho network thread.
        self.mqtt_lock = threading.Lock()
    def __del__(self):
        self.end()
    def set_onmessage(self, function):
        """Install a custom on_message callback for subscriptions."""
        self.client.on_message = function
    def get_status(self):
        """Return True while connected (as reported by the callbacks)."""
        return self.status
    def mqtt_connect(self):
        """Connect to the broker; raises ConnectionError on failure."""
        try:
            if self.status:
                pass #log.warning('Already connected to mqtt')
            else:
                self.client.connect(self.host, self.port)
        except Exception as e:
            pass #log.error((f"{self.name} connect fail"))
            raise ConnectionError(f"{self.name} connect fail")
    def on_connect(self, client, userdata, flags, rc):
        """
        Connection-result callback; rc == 0 means the connection succeeded.
        """
        if rc == 0:
            self.mqtt_lock.acquire()
            self.status = True
            pass #log.info(f"{self.name} connected OK")
            self.mqtt_lock.release()
        else:
            pass #log.warning("Bad connection Returned code=", rc)
    def on_disconnect(self, client, userdata, flags, rc=0):
        pass #log.info(f"{self.name} disconnected")
        self.mqtt_lock.acquire()
        self.status = False
        self.mqtt_lock.release()
    def on_subscribe(self, client, userdata, mid, granted_qos):
        self.status_subscribe = True
        pass #log.info("subscribed: " + str(mid) + " " + str(granted_qos))
    def on_unsubscribe(self, client, userdata, mid):
        self.status_subscribe = False
        pass #log.info("unsubscribed: " + str(mid))
    def on_message(self, client, userdata, msg):
        """
        Handler invoked for incoming messages while subscribed.
        Default is a no-op; replace it via set_onmessage().
        """
        pass
    def start_pub(self):
        """Start the network loop for publishing."""
        self.client.loop_start()
    def start_sub(self, topic):
        """Subscribe to `topic` and start the network loop."""
        self.client.subscribe(topic)
        self.client.loop_start()
    def end_sub(self, topic):
        """Unsubscribe from `topic` (the loop keeps running)."""
        self.client.unsubscribe(topic)
        # self.client.loop_stop()
    def end(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
        pass #log.info(f"{self.name} Ending Connection")
# Module-level publisher singleton: connects to the broker at import time.
# NOTE(review): importing this module therefore requires a reachable broker.
mqtt_publisher = None
if mqtt_publisher is None:
    mqtt_publisher = MQTTManager()
    mqtt_publisher.mqtt_connect()
    mqtt_publisher.start_pub()

147
parsing_msg.py Normal file
View File

@@ -0,0 +1,147 @@
import cv2,json
from base64 import b64encode
from schema import *
class ParsingMsg():
    """
    Builds the fall-detection MQTT payload (FallDetectionSchema JSON) from
    one frame's detection results and its image.
    """
    IMAGE_FORMAT = 'jpg'
    def __init__(self):
        self.msg = None         # latest detection list (set via set())
        self.img = None         # latest frame image (np.ndarray)
        self.parsed_msg = None  # NOTE(review): never written by parse() — get() always returns None; confirm intent
    def set(self, msg:list, img):
        """Store a new frame's detections and image, clearing previous state."""
        self.msg = None
        self.img = None
        self.msg = msg
        self.img = img
        self.parsed_msg = None
    def get(self):
        """Return the cached parsed message (see NOTE on parsed_msg above)."""
        result = self.parsed_msg
        return result
    def image_encoding(self, img_data):
        """
        Encode an image (np.ndarray) as JPEG and return it base64-encoded.
        :param img_data: image data
        :return: base64 string (utf-8)
        """
        _, JPEG = cv2.imencode(".jpg", img_data, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
        # Base64 encode
        b64 = b64encode(JPEG)
        return b64.decode("utf-8")
    def image_cropping(self, img_data, bbox):
        """
        Crop an image (np.ndarray) to the given bounding box.
        :param img_data: image data
        :param bbox: [x1, y1, x2, y2]
        :return: cropped image data
        """
        x1, y1, x2, y2 = bbox
        cropped_img = img_data[y1:y2, x1:x2]
        return cropped_img
    def parse_header(self):
        """
        Build the message header.
        * camera_id, ward_id, frame_id are not implemented yet (hard-coded).
        """
        msg = Header(
            camera_id= "CAMERA_001",
            ward_id= "WARD_001",
            timestamp= datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            frame_id= 0
        )
        return msg
    def parse_summary(self, activate_cnt, total_cnt):
        """
        Build the summary section.
        * system_health is not implemented yet (always OK).
        """
        msg = Summary(
            total_objects_count= total_cnt,
            active_alerts_count= activate_cnt,
            system_health= SyestemHealth.ok
        )
        return msg
    def parse_object(self):
        """
        Build the per-object list.
        * everything except bbox/skeleton is not implemented yet (placeholders).
        :return: (objects, active_alert_count, total_count)
        """
        msg = []
        activate_cnt = 0
        total_cnt = 0
        if self.msg:
            for i in self.msg:
                # Crop the person region only when both bbox and frame exist.
                has_img = True if i.get('person') and self.img is not None else False
                b64_img = self.image_encoding(self.image_cropping(self.img, i.get('person'))) if has_img else ''
                # pose_type > 0 means some alert pose bit is set.
                if i['result']['pose_type'] > 0:
                    activate_cnt += 1
                total_cnt += 1
                sub_msg = DetectedObject(
                    tracking_id= i['result']['object_id'],
                    status= Status.stable if i['result']['pose_type'] == 0 else Status.fall_detected,
                    status_detail= None if i['result']['pose_type'] == 0 else StatusDetail.sudden_fall,
                    severity= None if i['result']['pose_type'] == 0 else Severity.critical,
                    location_zone= 'ZONE_001',
                    duration= 0.0,
                    bbox= i.get('person'),
                    # skeleton rows: [x, y, confidence] with coords rounded to ints.
                    skeleton= [[round(x, 0) for x in kpt] + [round(conf,2)] for kpt, conf in zip(i.get('keypoints'), i.get('kpt_conf'))],
                    metrics=Metrics(
                        velocity=0.0,
                        angle=0.0
                    ),
                    visual_data=ObjectVisualData(
                        format=self.IMAGE_FORMAT,
                        has_image= has_img,
                        base64_str= b64_img
                    )
                )
                msg.append(sub_msg)
        return msg, activate_cnt, total_cnt
    def parse_virtual(self):
        """Build the frame-level visual data (whole frame, base64 JPEG)."""
        has_img = True if self.img is not None else False
        b64_img = self.image_encoding(self.img) if has_img else ''
        msg= VisualData(
            format=self.IMAGE_FORMAT,
            has_image= has_img,
            base64_str= b64_img
        )
        return msg
    def parse(self):
        """
        Build the full payload; returns None when there is no active alert.
        * the "only send when dangerous" policy may need revisiting.
        """
        object_msg, activate_cnt, total_cnt = self.parse_object()
        if activate_cnt == 0:
            return None
        header_msg = self.parse_header()
        summary_msg = self.parse_summary(activate_cnt, total_cnt)
        visual_msg = self.parse_virtual()
        parsing_msg = FallDetectionSchema(
            header= header_msg,
            summary= summary_msg,
            objects= object_msg,
            visual_data= visual_msg
        )
        return parsing_msg.model_dump_json()

147
predict.py Normal file
View File

@@ -0,0 +1,147 @@
import cv2
import demo_const as AI_CONST
from pydantic.main import BaseModel
# from DL.custom_utils import crop_image,image_encoding
from hpe_classification.hpe_classification import HPEClassification
from hpe_classification import config as hpe_config
class HistoryInfo(BaseModel):
    """
    Per-detection record kept in history.
    """
    class_name: str
    class_id: int
    # NOTE(review): defaults of None on non-Optional annotations rely on
    # pydantic not validating defaults; consider `int | None = None` etc. —
    # confirm pydantic version/behavior before changing.
    object_id: int = None
    bbox: list          # [x1, y1, x2, y2] (rounded ints, see ObjectDetect.parsing)
    bbox_conf: float    # detection confidence, rounded to 2 decimals
    keypoint: list = None   # pose keypoints when available, else None
    kpt_conf: list = None   # per-keypoint confidences when available, else None
class ObjectDetect:
    """
    Runs YOLO object detection on the current image and converts results
    into MQTT-ready dicts.
    """
    MODEL_CONFIDENCE = AI_CONST.MODEL_CONFIDENCE
    def __init__(self) -> None:
        self.image = ''       # current frame, set via set_image()
        self.model = ''       # model wrapper, set via set_model()
        self.class_info = ''  # class-id -> name mapping from the model
    def predict(self,crop_image = True, class_name=True):
        """
        Run detection on the stored image.
        :param crop_image: request per-object crops (currently disabled below)
        :param class_name: include the class name in each message
        :return: list of per-object message dicts
        """
        od_predict = self.model.model.predict(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE)
        message_list = []
        for object_list in od_predict:
            for box in object_list.boxes:
                _parsing_data = self.parsing(bbox=box)
                _name = _parsing_data.class_name if class_name else False
                message_list.append(self._od_mqtt_message(_parsing_data, image= self.image, crop=crop_image, name=_name))
        return message_list
    def set_image(self,image):
        """Set the frame to run detection on."""
        self.image = image
    def set_model(self,model):
        """Attach a model wrapper and cache its class-name mapping."""
        self.model = model
        self.class_info = self.model.get_class_info()
    def _od_mqtt_message(self,data,image,crop,name):
        """Build one MQTT message dict from a HistoryInfo record."""
        message ={}
        # NOTE(review): cropping is currently disabled — crop_img is always None.
        # if crop:
        #     crop_img = image_encoding(crop_image(xyxy=data.bbox,img=image))
        # else:
        crop_img = None
        if name:
            message = {
                "class_id" : data.class_id,
                "class_name" : name,
                "confidence" : data.bbox_conf,
                "bbox" : data.bbox,
                "object_id": data.object_id,
                "parent_object_id": None,
                "image": crop_img
            }
        else:
            message = {
                "class_id" : data.class_id,
                "confidence" : data.bbox_conf,
                "bbox" : data.bbox,
                "object_id": data.object_id,
                "parent_object_id": None,
                "image": crop_img
            }
        return message
    def parsing(self, bbox, kpt=None):
        """
        Parse one detection into a HistoryInfo record.
        Keypoint fields stay None when no pose (kpt) info is given;
        object_id stays None when the tracker assigned no id.
        :param bbox: object-detection box (ultralytics Boxes element)
        :param kpt: pose-detection keypoints (ultralytics Keypoints element)
        :return: HistoryInfo
        """
        _cls_id = int(bbox.cls[0].item())
        _history_info = HistoryInfo(
            class_id = _cls_id,
            class_name = self.class_info[_cls_id],
            bbox = list(map(round, bbox.xyxy[0].detach().cpu().tolist())),
            bbox_conf = round(bbox.conf[0].item(), 2),
        )
        if bbox.id is not None:
            _history_info.object_id = int(bbox.id[0].item())
        if kpt:
            _history_info.keypoint = kpt.xy[0].detach().cpu().tolist()
            _history_info.kpt_conf = kpt.conf[0].detach().cpu().tolist()
        return _history_info
class PoseDetect(ObjectDetect):
    """
    Pose detection + tracking; augments each detection with HPE pose
    classification (falldown / cross-arm type and level).
    """
    MODEL_CONFIDENCE = 0.5
    FALLDOWN_TILT_RATIO = hpe_config.FALLDOWN_TILT_RATIO
    FALLDOWN_TILT_ANGLE = hpe_config.FALLDOWN_TILT_ANGLE
    CROSS_RATIO_THRESHOLD = hpe_config.CROSS_ARM_RATIO_THRESHOLD
    CROSS_ANGLE_THRESHOLD = hpe_config.CROSS_ARM_ANGLE_THRESHOLD
    def predict(self,working,crop_image=True):
        """
        Run pose tracking on the stored image and classify each pose.
        :param working: whether the person is currently working (affects
                        the falldown tilt-angle criterion)
        :param crop_image: request per-object crops (see _od_mqtt_message)
        :return: list of dicts with person bbox, keypoints and 'result'
        """
        # pose_predict = self.model.model.predict(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE)
        #NOTE(jwkim): pose predict
        # track() (not predict()) so detections carry persistent object ids.
        pose_predict = self.model.model.track(source=self.image, show=False, stream=True, save=False, conf=self.MODEL_CONFIDENCE, verbose=False, imgsz=AI_CONST.MODEL_IMAGE_SIZE, persist=True)
        message_list = []
        for object_list in pose_predict:
            for box, pose in zip(object_list.boxes, object_list.keypoints):
                _parsing_data = self.parsing(bbox=box, kpt=pose)
                current_pose={}
                current_pose["person"]= _parsing_data.bbox
                current_pose["keypoints"]= _parsing_data.keypoint
                current_pose["kpt_conf"] = _parsing_data.kpt_conf
                # HPEClassification decides falldown / cross-arm from keypoints.
                hpe_classification = HPEClassification(pose_info=current_pose)
                current_pose["result"]=self._od_mqtt_message(_parsing_data, self.image, crop=crop_image, name=_parsing_data.class_name)
                current_pose["result"]['pose_type'] = hpe_classification.get_hpe_type(is_working_on=working)
                current_pose["result"]['pose_level'] = hpe_classification.get_hpe_level(is_working_on=working)
                message_list.append(current_pose)
        return message_list
if __name__ == "__main__":
    # Import-only module; no CLI entry point.
    pass

17
project_config.py Normal file
View File

@@ -0,0 +1,17 @@
# === Load settings from config.yaml ===
from config_loader import CFG
PT_TYPE = CFG.get('pt_type', 'dev')
# === model selection ===
USE_HPE_PERSON = CFG.get('use_hpe_person', True)
USE_HELMET_MODEL = CFG.get('use_helmet_model', True)
# === label / overlay options ===
VIEW_CONF_SCORE = CFG.get('view_conf_score', False)
SHOW_GLOVES = CFG.get('show_gloves', True)
LABEL_ALL_WHITE = CFG.get('label_all_white', True)
USE_HPE_FRAME_CHECK = CFG.get('use_hpe_frame_check', False)
# === debug ===
ADD_CROSS_ARM = CFG.get('add_cross_arm', False)

18
requirements.txt Normal file
View File

@@ -0,0 +1,18 @@
# Core AI Framework
ultralytics
# Image Processing & GUI
opencv-python
# Configuration & Data Validation
PyYAML
pydantic
# System Utilities
pyautogui
# Math & Array (usually installed with ultralytics, but good to have)
numpy
#MQTT Client
paho-mqtt

86
schema.py Normal file
View File

@@ -0,0 +1,86 @@
from pydantic.main import BaseModel
from enum import Enum
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
#=========================================
class Header(BaseModel):
    # Frame-level message metadata.
    camera_id: str
    ward_id: str
    timestamp: datetime
    frame_id: int
#=========================================
#=========================================
class SyestemHealth(str,Enum):
    # Overall pipeline health. NOTE(review): name is a typo for "SystemHealth";
    # renaming would break importers, so it is kept as-is.
    ok = "OK"
    overload = "OVERLOAD"
    critical = "CRITICAL"
class Summary(BaseModel):
    # Frame-level counts and pipeline health.
    active_alerts_count: int
    total_objects_count: int
    system_health: SyestemHealth
#=========================================
#=========================================
class Status(str,Enum):
    # Per-object verdict for the current frame.
    stable = "STABLE"
    fall_detected = "FALL_DETECTED"
class StatusDetail(str,Enum):
    # Fall sub-classification (only meaningful when status is FALL_DETECTED).
    sudden_fall = "SUDDEN_FALL"
    slip_down = "SLIP_DOWN"
class Severity(str,Enum):
    # Alert severity level.
    low = "LOW"
    medium = "MEDIUM"
    critical = "CRITICAL"
class Metrics(BaseModel):
    # Motion metrics (currently always 0.0 placeholders — see ParsingMsg).
    velocity: float
    angle: float
class ObjectVisualData(BaseModel):
    # Per-object cropped image, base64-encoded.
    format: str
    has_image: bool
    base64_str: str
class DetectedObject(BaseModel):
    # One tracked person and its fall-detection verdict for the current frame.
    tracking_id: int|None
    status: Status
    status_detail: StatusDetail|None
    severity: Severity|None
    location_zone: str
    duration: float
    bbox: List[int]  # [x1, y1, x2, y2]
    skeleton: Optional[List[List[float]]]  # rows of [x, y, confidence]
    metrics: Metrics
    visual_data: ObjectVisualData
#=========================================
#=========================================
class VisualData(BaseModel):
    # Whole-frame image, base64-encoded.
    format: str
    has_image: bool
    base64_str: str
#=========================================
#=========================================
class FallDetectionSchema(BaseModel):
    # Top-level MQTT payload (serialized with model_dump_json in ParsingMsg).
    header: Header
    summary: Summary
    objects: List[DetectedObject]
    visual_data: VisualData
#=========================================

89
utils.py Normal file
View File

@@ -0,0 +1,89 @@
import numpy as np
import time
import cv2
import demo_const as AI_CONST
import copy
from load_models import model_manager
from ultralytics.data.loaders import LoadStreams,LOGGER
def class_info():
    """Return a copy of the OD model's class map with the extra
    'signalman' pseudo-class (id 1000) appended.

    Values depend on the loaded .pt weights file.
    """
    od_model = model_manager.get_od()
    names = copy.deepcopy(od_model.model.names)
    names[1000] = 'signalman'
    return names
# Build the class-name lookup once at import time (module-level cache).
CLASS_INFORMATION = {}
if not CLASS_INFORMATION:
    CLASS_INFORMATION = class_info()
# Reverse mapping: class name -> class id.
CLASS_SWAP_INFO = {v:k for k,v in CLASS_INFORMATION.items()}
class LoadStreamsDaool(LoadStreams):
    """ultralytics LoadStreams with a configurable frame-buffer size
    (AI_CONST.LOADSTREAMS_IMG_BUFFER instead of the stock limit)."""
    def update(self, i, cap, stream):
        """Read stream `i` frames in daemon thread."""
        n, f = 0, self.frames[i]  # frame number, frame array
        while self.running and cap.isOpened() and n < (f - 1):
            if len(self.imgs[i]) < AI_CONST.LOADSTREAMS_IMG_BUFFER:  # keep a <=30-image buffer
                n += 1
                cap.grab()  # .read() = .grab() followed by .retrieve()
                if n % self.vid_stride == 0:
                    success, im = cap.retrieve()
                    if not success:
                        # Substitute a black frame and try to re-open the stream.
                        im = np.zeros(self.shape[i], dtype=np.uint8)
                        LOGGER.warning("WARNING ⚠️ Video stream unresponsive, please check your IP camera connection.")
                        cap.open(stream)  # re-open stream if signal was lost
                    if self.buffer:
                        self.imgs[i].append(im)
                    else:
                        self.imgs[i] = [im]
            else:
                time.sleep(0.01)  # wait until the buffer is empty
class CustomVideoCapture:
    """
    Thin wrapper around cv2.VideoWriter: record frame size from a sample
    frame, then write frames to an XVID-encoded video file.
    """
    def __init__(self) -> None:
        self.fps = 30
        # self.fourcc = cv2.VideoWriter_fourcc(*'X264')
        self.fourcc = cv2.VideoWriter_fourcc(*'XVID')
        self.size = None          # (width, height); set via set_frame_size()
        self.video_writer = None  # cv2.VideoWriter, created lazily
    def set_frame_size(self, image):
        """Record (width, height) from a sample frame (np.ndarray, HxWxC)."""
        if isinstance(image, np.ndarray):
            height, width, ch = image.shape
            self.size = (int(width), int(height))
    def set_video_writer(self, path):
        """
        Create the writer for `path`.
        :raises RuntimeError: when set_frame_size() was never called.
        """
        # BUG FIX: the original used a bare `raise` with no active exception,
        # which itself raises an opaque RuntimeError; raise one with a message.
        if self.size is None:
            raise RuntimeError("frame size not set; call set_frame_size() first")
        self.video_writer = cv2.VideoWriter(path, self.fourcc, self.fps, self.size)
    def write_video(self, frame):
        """Append one frame; no-op when the writer was never created."""
        if self.video_writer:
            self.video_writer.write(frame)
    def release_video(self):
        """Flush and close the writer (idempotent)."""
        if self.video_writer:
            self.video_writer.release()
            self.video_writer = None
def get_monitorsize():
    """
    Return (width, height) of the primary monitor in pixels.

    Uses pyautogui when available; falls back to AI_CONST.FHD_RESOLUTION
    when the import or the size query fails (e.g. headless environment).
    """
    try:
        import pyautogui  # pip install pyautogui
        width, height = pyautogui.size()
    except Exception:
        # BUG FIX: the original returned from a `finally` block, which
        # silently swallowed *any* exception — including ones raised while
        # computing the fallback. Handle the failure explicitly instead.
        width, height = AI_CONST.FHD_RESOLUTION
    return (width, height)
def img_resize(image,size):
    """Resize `image` to `size` (width, height) with INTER_AREA interpolation."""
    return cv2.resize(image, dsize=size, interpolation=cv2.INTER_AREA)

BIN
weights/yolov11m_dev1_8.pt Normal file

Binary file not shown.

BIN
weights/yolov8_dev1_66.pt Normal file

Binary file not shown.

BIN
weights/yolov8_dev1_89.pt Normal file

Binary file not shown.

BIN
weights/yolov8_dev1_97.pt Normal file

Binary file not shown.

BIN
weights/yolov8l-pose.pt Normal file

Binary file not shown.