362 lines
14 KiB
Python
362 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@file : cctv.py
|
|
@author: hsj100
|
|
@license: A2TEC
|
|
@brief: multi cctv
|
|
|
|
@section Modify History
|
|
- 2025-11-24 오후 1:38 hsj100 base
|
|
|
|
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import threading
|
|
import time
|
|
import os
|
|
|
|
from datetime import datetime
|
|
|
|
from demo_detect import DemoDetect
|
|
from utils import get_monitorsize
|
|
from config_loader import CFG
|
|
from parsing_msg import ParsingMsg
|
|
from mqtt import mqtt_publisher
|
|
|
|
# ==========================================================================================
|
|
# [Configuration Section]
|
|
# ==========================================================================================
|
|
|
|
_cctv_cfg = CFG.get('cctv', {})
|
|
|
|
# 1. Camera Switch Interval (Seconds)
|
|
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)
|
|
|
|
# 2. Camera Sources List
|
|
SOURCES = _cctv_cfg.get('sources', [])
|
|
if not SOURCES:
|
|
# 기본값 (설정이 없을 경우)
|
|
SOURCES = [
|
|
{"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
|
|
{"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
|
|
]
|
|
|
|
# ==========================================================================================
|
|
|
|
class CameraStream:
|
|
"""
|
|
개별 카메라 스트림을 관리하는 클래스.
|
|
별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다.
|
|
"""
|
|
def __init__(self, src, name="Camera"):
|
|
self.src = src
|
|
self.name = name
|
|
|
|
# RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지)
|
|
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
|
|
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
|
|
|
|
# OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음)
|
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
self.ret = False
|
|
self.latest_frame = None
|
|
self.running = False
|
|
self.is_active = False # [최적화] 현재 화면 표시 여부
|
|
self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지
|
|
self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화
|
|
self.frame_skip_count = 0
|
|
|
|
# 연결 확인
|
|
if not self.cap.isOpened():
|
|
print(f"[{self.name}] 접속 실패: {self.src}")
|
|
else:
|
|
print(f"[{self.name}] 접속 성공")
|
|
self.running = True
|
|
# 스레드 시작
|
|
self.thread = threading.Thread(target=self.update, args=())
|
|
self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료
|
|
self.thread.start()
|
|
|
|
def update(self):
|
|
"""백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)"""
|
|
while self.running:
|
|
# 1. 버퍼에서 압축된 데이터 가져오기 (grab은 항상 수행하여 버퍼가 쌓이지 않게 함)
|
|
if self.cap.grab():
|
|
# [최적화 로직] 활성 상태일 때만 디코딩(retrieve) 수행
|
|
# 비활성 상태일 때는 30프레임에 한 번만 디코딩하여 리소스 절약
|
|
self.frame_skip_count += 1
|
|
if self.is_active or (self.frame_skip_count % 30 == 0):
|
|
# 2. 실제 이미지로 디코딩 (무거움)
|
|
ret, frame = self.cap.retrieve()
|
|
|
|
if ret:
|
|
with self.lock:
|
|
self.ret = ret
|
|
self.latest_frame = frame
|
|
self.last_read_time = time.time() # 수신 시간 갱신
|
|
else:
|
|
# 디코딩을 건너뛰는 동안 CPU 부하를 아주 약간 줄이기 위해 미세한 대기
|
|
time.sleep(0.001)
|
|
else:
|
|
# 스트림이 끊기거나 끝난 경우 재접속 로직
|
|
print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
|
|
self.cap.release()
|
|
time.sleep(2.0)
|
|
|
|
# 재접속 시도 (TCP 강제 설정 유지)
|
|
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
|
|
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
|
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
if self.cap.isOpened():
|
|
print(f"[{self.name}] 재접속 성공")
|
|
else:
|
|
print(f"[{self.name}] 재접속 실패")
|
|
|
|
def get_frame(self):
|
|
"""
|
|
현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)
|
|
"""
|
|
with self.lock:
|
|
# 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리
|
|
if time.time() - self.last_read_time > 3.0:
|
|
return False, None, self.name
|
|
return self.ret, self.latest_frame, self.name
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
if self.thread.is_alive():
|
|
self.thread.join()
|
|
self.cap.release()
|
|
|
|
def main():
|
|
|
|
MQTT_TOPIC = "/hospital/ai1"
|
|
|
|
parser = ParsingMsg()
|
|
|
|
# Get Monitor Size inside main or top level if safe
|
|
MONITOR_RESOLUTION = get_monitorsize()
|
|
|
|
# 2. 객체탐지 모델 초기화
|
|
detector = DemoDetect()
|
|
print("모델 로딩 완료!")
|
|
|
|
# 3. 카메라 객체 생성 및 백그라운드 수신 시작
|
|
cameras = []
|
|
for i, s in enumerate(SOURCES):
|
|
cam = CameraStream(s["src"], s["name"])
|
|
# 첫 번째 카메라만 활성 상태로 시작
|
|
if i == 0:
|
|
cam.is_active = True
|
|
cameras.append(cam)
|
|
|
|
DISPLAY_TURN_ON = CFG.get('display').get('turn_on') # display 출력 여부
|
|
|
|
SNAPSHOT_SEND = CFG.get('img_snap_shot')
|
|
|
|
current_cam_index = 0 # 현재 보고 있는 카메라 인덱스
|
|
last_switch_time = time.time() # 마지막 전환 시간
|
|
switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환
|
|
|
|
# FPS 계산을 위한 변수
|
|
prev_frame_time = time.time()
|
|
fps = 0
|
|
|
|
# FPS 통계 변수
|
|
fps_min = float('inf')
|
|
fps_max = 0
|
|
fps_sum = 0
|
|
fps_count = 0
|
|
warmup_frames = 30 # 초반 불안정한 프레임 제외
|
|
|
|
# pubilsh time check
|
|
last_publish_time = None
|
|
delay_after_publish = 4.0 # publish 후 n초 동안은 추가 publish 금지, 0일경우 무제한
|
|
|
|
print("\n=== CCTV 시스템 시작 ===")
|
|
print(f"{switch_interval}초마다 자동으로 카메라가 전환됩니다.")
|
|
if DISPLAY_TURN_ON:
|
|
print("[Q] 또는 [ESC] : 종료\n")
|
|
else:
|
|
print("화면 출력이 꺼져 있습니다. 시스템은 백그라운드에서 계속 실행됩니다.")
|
|
print("종료하려면 터미널에서 Ctrl+C를 누르세요.\n")
|
|
|
|
if DISPLAY_TURN_ON:
|
|
# 전체화면 윈도우 설정 (프레임 없이)
|
|
window_name = "CCTV Control Center"
|
|
cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
|
|
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
|
|
|
while True:
|
|
# 1. 카메라 전환 처리 (자동)
|
|
current_time = time.time()
|
|
need_switch = False
|
|
|
|
if DISPLAY_TURN_ON:
|
|
# 키 입력 처리 (종료만 체크)
|
|
key = cv2.waitKey(1) & 0xFF
|
|
if key == ord('q') or key == 27: # 종료
|
|
break
|
|
|
|
# 자동 전환 타이머 체크
|
|
if current_time - last_switch_time >= switch_interval:
|
|
need_switch = True
|
|
|
|
if need_switch:
|
|
# 기존 카메라 비활성화 (리소스 절약 모드 진입)
|
|
cameras[current_cam_index].is_active = False
|
|
|
|
# 다음 카메라 인덱스 계산
|
|
current_cam_index = (current_cam_index + 1) % len(cameras)
|
|
|
|
# 새 카메라 활성화 (풀 디코딩 시작)
|
|
cameras[current_cam_index].is_active = True
|
|
|
|
last_switch_time = current_time
|
|
print(f"-> 자동 전환: {cameras[current_cam_index].name}")
|
|
|
|
# 2. 현재 선택된 카메라의 최신 프레임 가져오기
|
|
target_cam = cameras[current_cam_index]
|
|
ret, frame, name = target_cam.get_frame()
|
|
|
|
if ret and frame is not None:
|
|
# FPS 계산
|
|
current_frame_time = time.time()
|
|
time_diff = current_frame_time - prev_frame_time
|
|
fps = 1 / time_diff if time_diff > 0 else 0
|
|
prev_frame_time = current_frame_time
|
|
|
|
# FPS 통계 갱신 (워밍업 이후)
|
|
if fps > 0:
|
|
fps_count += 1
|
|
if fps_count > warmup_frames:
|
|
fps_sum += fps
|
|
real_count = fps_count - warmup_frames
|
|
|
|
fps_avg = fps_sum / real_count
|
|
fps_min = min(fps_min, fps)
|
|
fps_max = max(fps_max, fps)
|
|
|
|
# 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거)
|
|
# HPE
|
|
hpe_message = detector.inference_hpe(frame, False)
|
|
|
|
#NOTE(jwkim): od모델 사용 안함
|
|
# Helmet 탐지
|
|
# helmet_message = detector.inference_helmet(frame, False)
|
|
|
|
# 객체 탐지
|
|
# od_message_raw = detector.inference_od(frame, False, class_name_view=True)
|
|
|
|
# 필터링
|
|
# od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False)
|
|
|
|
od_message = []
|
|
|
|
parser.set(
|
|
msg=hpe_message,
|
|
img=frame if SNAPSHOT_SEND else None,
|
|
ward_id=name)
|
|
parsing_msg = parser.parse()
|
|
|
|
#mqtt publish
|
|
if parsing_msg is not None:
|
|
|
|
#time check
|
|
if last_publish_time is None:
|
|
last_publish_time = datetime.now()
|
|
mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1)
|
|
else:
|
|
current_time = datetime.now()
|
|
time_diff = current_time - last_publish_time
|
|
time_diff_seconds = time_diff.total_seconds()
|
|
|
|
# n초에 한 번만 publish
|
|
if time_diff_seconds < delay_after_publish:
|
|
pass
|
|
else:
|
|
last_publish_time = datetime.now()
|
|
mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1)
|
|
|
|
# 탐지 결과 라벨링 (원본 좌표 기준)
|
|
if DISPLAY_TURN_ON:
|
|
detector.hpe_labeling(frame, hpe_message)
|
|
detector.od_labeling(frame, od_message)
|
|
detector.border_labeling(frame, hpe_message, od_message)
|
|
|
|
# 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용)
|
|
frame = cv2.resize(frame, MONITOR_RESOLUTION)
|
|
|
|
# 화면에 카메라 이름과 시간 표시 (UI 효과)
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# 카메라 이름 (테두리 + 본문)
|
|
cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
|
|
cv2.putText(frame, cam_text, (20, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게)
|
|
cv2.putText(frame, cam_text, (20, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# 시간 (테두리 + 본문)
|
|
cv2.putText(frame, timestamp, (20, 80),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
|
|
cv2.putText(frame, timestamp, (20, 80),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# FPS (테두리 + 본문)
|
|
fps_text = f"FPS: {fps:.1f}"
|
|
cv2.putText(frame, fps_text, (20, 120),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
|
|
cv2.putText(frame, fps_text, (20, 120),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# FPS 통계 (AVG, MIN, MAX)
|
|
stat_val_min = fps_min if fps_min != float('inf') else 0
|
|
stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
|
|
|
|
stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
|
|
cv2.putText(frame, stats_text, (20, 160),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
|
|
cv2.putText(frame, stats_text, (20, 160),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색
|
|
|
|
cv2.imshow(window_name, frame)
|
|
|
|
elif DISPLAY_TURN_ON:
|
|
# 신호가 없을 때 보여줄 대기 화면
|
|
# MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함
|
|
blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
|
|
|
|
# "NO SIGNAL" 텍스트 설정
|
|
text = f"NO SIGNAL ({target_cam.name})"
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
font_scale = 1.5
|
|
thickness = 3
|
|
|
|
# 텍스트 중앙 정렬 계산
|
|
text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
|
|
text_x = (blank_screen.shape[1] - text_size[0]) // 2
|
|
text_y = (blank_screen.shape[0] + text_size[1]) // 2
|
|
|
|
# 텍스트 그리기 (빨간색)
|
|
cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
|
|
|
|
cv2.imshow(window_name, blank_screen)
|
|
|
|
# 4. 키 입력 처리 (종료만 처리)
|
|
if DISPLAY_TURN_ON:
|
|
key = cv2.waitKey(1) & 0xFF
|
|
if key == ord('q') or key == 27: # 종료
|
|
break
|
|
|
|
# 종료 처리
|
|
print("시스템 종료 중...")
|
|
for cam in cameras:
|
|
cam.stop()
|
|
cv2.destroyAllWindows()
|
|
|
|
if __name__ == "__main__":
|
|
main() |