Files
HOSPITAL_CCTV/cctv.py

362 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""
@file : cctv.py
@author: hsj100
@license: A2TEC
@brief: multi cctv
@section Modify History
- 2025-11-24 오후 1:38 hsj100 base
"""
import cv2
import numpy as np
import threading
import time
import os
from datetime import datetime
from demo_detect import DemoDetect
from utils import get_monitorsize
from config_loader import CFG
from parsing_msg import ParsingMsg
from mqtt import mqtt_publisher
# ==========================================================================================
# [Configuration Section]
# ==========================================================================================
_cctv_cfg = CFG.get('cctv', {})
# 1. Camera Switch Interval (Seconds)
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)
# 2. Camera Sources List
SOURCES = _cctv_cfg.get('sources', [])
if not SOURCES:
# 기본값 (설정이 없을 경우)
SOURCES = [
{"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
{"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
]
# ==========================================================================================
class CameraStream:
"""
개별 카메라 스트림을 관리하는 클래스.
별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다.
"""
def __init__(self, src, name="Camera"):
self.src = src
self.name = name
# RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지)
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
# OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.ret = False
self.latest_frame = None
self.running = False
self.is_active = False # [최적화] 현재 화면 표시 여부
self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지
self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화
self.frame_skip_count = 0
# 연결 확인
if not self.cap.isOpened():
print(f"[{self.name}] 접속 실패: {self.src}")
else:
print(f"[{self.name}] 접속 성공")
self.running = True
# 스레드 시작
self.thread = threading.Thread(target=self.update, args=())
self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료
self.thread.start()
def update(self):
"""백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)"""
while self.running:
# 1. 버퍼에서 압축된 데이터 가져오기 (grab은 항상 수행하여 버퍼가 쌓이지 않게 함)
if self.cap.grab():
# [최적화 로직] 활성 상태일 때만 디코딩(retrieve) 수행
# 비활성 상태일 때는 30프레임에 한 번만 디코딩하여 리소스 절약
self.frame_skip_count += 1
if self.is_active or (self.frame_skip_count % 30 == 0):
# 2. 실제 이미지로 디코딩 (무거움)
ret, frame = self.cap.retrieve()
if ret:
with self.lock:
self.ret = ret
self.latest_frame = frame
self.last_read_time = time.time() # 수신 시간 갱신
else:
# 디코딩을 건너뛰는 동안 CPU 부하를 아주 약간 줄이기 위해 미세한 대기
time.sleep(0.001)
else:
# 스트림이 끊기거나 끝난 경우 재접속 로직
print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
self.cap.release()
time.sleep(2.0)
# 재접속 시도 (TCP 강제 설정 유지)
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if self.cap.isOpened():
print(f"[{self.name}] 재접속 성공")
else:
print(f"[{self.name}] 재접속 실패")
def get_frame(self):
"""
현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)
"""
with self.lock:
# 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리
if time.time() - self.last_read_time > 3.0:
return False, None, self.name
return self.ret, self.latest_frame, self.name
def stop(self):
self.running = False
if self.thread.is_alive():
self.thread.join()
self.cap.release()
def main():
MQTT_TOPIC = "/hospital/ai1"
parser = ParsingMsg()
# Get Monitor Size inside main or top level if safe
MONITOR_RESOLUTION = get_monitorsize()
# 2. 객체탐지 모델 초기화
detector = DemoDetect()
print("모델 로딩 완료!")
# 3. 카메라 객체 생성 및 백그라운드 수신 시작
cameras = []
for i, s in enumerate(SOURCES):
cam = CameraStream(s["src"], s["name"])
# 첫 번째 카메라만 활성 상태로 시작
if i == 0:
cam.is_active = True
cameras.append(cam)
DISPLAY_TURN_ON = CFG.get('display').get('turn_on') # display 출력 여부
SNAPSHOT_SEND = CFG.get('img_snap_shot')
current_cam_index = 0 # 현재 보고 있는 카메라 인덱스
last_switch_time = time.time() # 마지막 전환 시간
switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환
# FPS 계산을 위한 변수
prev_frame_time = time.time()
fps = 0
# FPS 통계 변수
fps_min = float('inf')
fps_max = 0
fps_sum = 0
fps_count = 0
warmup_frames = 30 # 초반 불안정한 프레임 제외
# pubilsh time check
last_publish_time = None
delay_after_publish = 4.0 # publish 후 n초 동안은 추가 publish 금지, 0일경우 무제한
print("\n=== CCTV 시스템 시작 ===")
print(f"{switch_interval}초마다 자동으로 카메라가 전환됩니다.")
if DISPLAY_TURN_ON:
print("[Q] 또는 [ESC] : 종료\n")
else:
print("화면 출력이 꺼져 있습니다. 시스템은 백그라운드에서 계속 실행됩니다.")
print("종료하려면 터미널에서 Ctrl+C를 누르세요.\n")
if DISPLAY_TURN_ON:
# 전체화면 윈도우 설정 (프레임 없이)
window_name = "CCTV Control Center"
cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
while True:
# 1. 카메라 전환 처리 (자동)
current_time = time.time()
need_switch = False
if DISPLAY_TURN_ON:
# 키 입력 처리 (종료만 체크)
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27: # 종료
break
# 자동 전환 타이머 체크
if current_time - last_switch_time >= switch_interval:
need_switch = True
if need_switch:
# 기존 카메라 비활성화 (리소스 절약 모드 진입)
cameras[current_cam_index].is_active = False
# 다음 카메라 인덱스 계산
current_cam_index = (current_cam_index + 1) % len(cameras)
# 새 카메라 활성화 (풀 디코딩 시작)
cameras[current_cam_index].is_active = True
last_switch_time = current_time
print(f"-> 자동 전환: {cameras[current_cam_index].name}")
# 2. 현재 선택된 카메라의 최신 프레임 가져오기
target_cam = cameras[current_cam_index]
ret, frame, name = target_cam.get_frame()
if ret and frame is not None:
# FPS 계산
current_frame_time = time.time()
time_diff = current_frame_time - prev_frame_time
fps = 1 / time_diff if time_diff > 0 else 0
prev_frame_time = current_frame_time
# FPS 통계 갱신 (워밍업 이후)
if fps > 0:
fps_count += 1
if fps_count > warmup_frames:
fps_sum += fps
real_count = fps_count - warmup_frames
fps_avg = fps_sum / real_count
fps_min = min(fps_min, fps)
fps_max = max(fps_max, fps)
# 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거)
# HPE
hpe_message = detector.inference_hpe(frame, False)
#NOTE(jwkim): od모델 사용 안함
# Helmet 탐지
# helmet_message = detector.inference_helmet(frame, False)
# 객체 탐지
# od_message_raw = detector.inference_od(frame, False, class_name_view=True)
# 필터링
# od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False)
od_message = []
parser.set(
msg=hpe_message,
img=frame if SNAPSHOT_SEND else None,
ward_id=name)
parsing_msg = parser.parse()
#mqtt publish
if parsing_msg is not None:
#time check
if last_publish_time is None:
last_publish_time = datetime.now()
mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1)
else:
current_time = datetime.now()
time_diff = current_time - last_publish_time
time_diff_seconds = time_diff.total_seconds()
# n초에 한 번만 publish
if time_diff_seconds < delay_after_publish:
pass
else:
last_publish_time = datetime.now()
mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1)
# 탐지 결과 라벨링 (원본 좌표 기준)
if DISPLAY_TURN_ON:
detector.hpe_labeling(frame, hpe_message)
detector.od_labeling(frame, od_message)
detector.border_labeling(frame, hpe_message, od_message)
# 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용)
frame = cv2.resize(frame, MONITOR_RESOLUTION)
# 화면에 카메라 이름과 시간 표시 (UI 효과)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 카메라 이름 (테두리 + 본문)
cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
cv2.putText(frame, cam_text, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게)
cv2.putText(frame, cam_text, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# 시간 (테두리 + 본문)
cv2.putText(frame, timestamp, (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
cv2.putText(frame, timestamp, (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# FPS (테두리 + 본문)
fps_text = f"FPS: {fps:.1f}"
cv2.putText(frame, fps_text, (20, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
cv2.putText(frame, fps_text, (20, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# FPS 통계 (AVG, MIN, MAX)
stat_val_min = fps_min if fps_min != float('inf') else 0
stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
cv2.putText(frame, stats_text, (20, 160),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
cv2.putText(frame, stats_text, (20, 160),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색
cv2.imshow(window_name, frame)
elif DISPLAY_TURN_ON:
# 신호가 없을 때 보여줄 대기 화면
# MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함
blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
# "NO SIGNAL" 텍스트 설정
text = f"NO SIGNAL ({target_cam.name})"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1.5
thickness = 3
# 텍스트 중앙 정렬 계산
text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
text_x = (blank_screen.shape[1] - text_size[0]) // 2
text_y = (blank_screen.shape[0] + text_size[1]) // 2
# 텍스트 그리기 (빨간색)
cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
cv2.imshow(window_name, blank_screen)
# 4. 키 입력 처리 (종료만 처리)
if DISPLAY_TURN_ON:
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27: # 종료
break
# 종료 처리
print("시스템 종료 중...")
for cam in cameras:
cam.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()