Files
HOSPITAL_CCTV/cctv_origin.py
2026-02-25 15:05:58 +09:00

280 lines
11 KiB
Python

# -*- coding: utf-8 -*-
"""
@file : cctv.py
@author: hsj100
@license: A2TEC
@brief: multi cctv
@section Modify History
- 2025-11-24 오후 1:38 hsj100 base
"""
import cv2
import numpy as np
import threading
import time
import os
from datetime import datetime
from demo_detect import DemoDetect
from utils import get_monitorsize
from config_loader import CFG
# ==========================================================================================
# [Configuration Section]
# ==========================================================================================
_cctv_cfg = CFG.get('cctv', {})
# 1. Camera Switch Interval (Seconds)
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)
# 2. Camera Sources List
SOURCES = _cctv_cfg.get('sources', [])
if not SOURCES:
# 기본값 (설정이 없을 경우)
SOURCES = [
{"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
{"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
]
# ==========================================================================================
class CameraStream:
"""
개별 카메라 스트림을 관리하는 클래스.
별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다.
"""
def __init__(self, src, name="Camera"):
self.src = src
self.name = name
# RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지)
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
# OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.ret = False
self.latest_frame = None
self.running = False
self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지
self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화
# 연결 확인
if not self.cap.isOpened():
print(f"[{self.name}] 접속 실패: {self.src}")
else:
print(f"[{self.name}] 접속 성공")
self.running = True
# 스레드 시작
self.thread = threading.Thread(target=self.update, args=())
self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료
self.thread.start()
def update(self):
"""백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)"""
while self.running:
# 1. 버퍼에서 압축된 데이터 가져오기 (가볍고 빠름)
if self.cap.grab():
# 2. 실제 이미지로 디코딩 (무거움)
ret, frame = self.cap.retrieve()
if ret:
with self.lock:
self.ret = ret
self.latest_frame = frame
self.last_read_time = time.time() # 수신 시간 갱신
else:
# grab은 성공했으나 디코딩 실패 (드문 경우)
continue
else:
# 스트림이 끊기거나 끝난 경우 재접속 로직
print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
self.cap.release()
time.sleep(2.0)
# 재접속 시도 (TCP 강제 설정 유지)
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if self.cap.isOpened():
print(f"[{self.name}] 재접속 성공")
else:
print(f"[{self.name}] 재접속 실패")
def get_frame(self):
"""현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)"""
with self.lock:
# 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리
if time.time() - self.last_read_time > 3.0:
return False, None
return self.ret, self.latest_frame
def stop(self):
self.running = False
if self.thread.is_alive():
self.thread.join()
self.cap.release()
def main():
# Get Monitor Size inside main or top level if safe
MONITOR_RESOLUTION = get_monitorsize()
# 2. 객체탐지 모델 초기화
detector = DemoDetect()
print("모델 로딩 완료!")
# 3. 카메라 객체 생성 및 백그라운드 수신 시작
cameras = []
for s in SOURCES:
cam = CameraStream(s["src"], s["name"])
cameras.append(cam)
current_cam_index = 0 # 현재 보고 있는 카메라 인덱스
last_switch_time = time.time() # 마지막 전환 시간
switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환
# FPS 계산을 위한 변수
prev_frame_time = time.time()
fps = 0
# FPS 통계 변수
fps_min = float('inf')
fps_max = 0
fps_sum = 0
fps_count = 0
warmup_frames = 30 # 초반 불안정한 프레임 제외
print("\n=== CCTV 관제 시스템 시작 ===")
print("5초마다 자동으로 카메라가 전환됩니다.")
print("[Q] 또는 [ESC]를 눌러 종료합니다.\n")
# 전체화면 윈도우 설정 (프레임 없이)
window_name = "CCTV Control Center"
cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
while True:
# 자동 전환 로직: 5초마다 다음 카메라로 전환
current_time = time.time()
if current_time - last_switch_time >= switch_interval:
current_cam_index = (current_cam_index + 1) % len(cameras)
last_switch_time = current_time
print(f"-> 자동 전환: {cameras[current_cam_index].name}")
# 3. 현재 선택된 카메라의 최신 프레임 가져오기
target_cam = cameras[current_cam_index]
ret, frame = target_cam.get_frame()
if ret and frame is not None:
# FPS 계산
current_frame_time = time.time()
time_diff = current_frame_time - prev_frame_time
fps = 1 / time_diff if time_diff > 0 else 0
prev_frame_time = current_frame_time
# FPS 통계 갱신 (워밍업 이후)
if fps > 0:
fps_count += 1
if fps_count > warmup_frames:
fps_sum += fps
real_count = fps_count - warmup_frames
fps_avg = fps_sum / real_count
fps_min = min(fps_min, fps)
fps_max = max(fps_max, fps)
# 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거)
# HPE
hpe_message = detector.inference_hpe(frame, False)
# Helmet 탐지
helmet_message = detector.inference_helmet(frame, False)
# 객체 탐지
od_message_raw = detector.inference_od(frame, False, class_name_view=True)
# 필터링
od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False)
# 탐지 결과 라벨링 (원본 좌표 기준)
detector.hpe_labeling(frame, hpe_message)
detector.od_labeling(frame, od_message)
detector.border_labeling(frame, hpe_message, od_message)
# 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용)
frame = cv2.resize(frame, MONITOR_RESOLUTION)
# 화면에 카메라 이름과 시간 표시 (UI 효과)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 카메라 이름 (테두리 + 본문)
cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
cv2.putText(frame, cam_text, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게)
cv2.putText(frame, cam_text, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# 시간 (테두리 + 본문)
cv2.putText(frame, timestamp, (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
cv2.putText(frame, timestamp, (20, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# FPS (테두리 + 본문)
fps_text = f"FPS: {fps:.1f}"
cv2.putText(frame, fps_text, (20, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
cv2.putText(frame, fps_text, (20, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
# FPS 통계 (AVG, MIN, MAX)
stat_val_min = fps_min if fps_min != float('inf') else 0
stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
cv2.putText(frame, stats_text, (20, 160),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
cv2.putText(frame, stats_text, (20, 160),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색
cv2.imshow(window_name, frame)
else:
# 신호가 없을 때 보여줄 대기 화면
# MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함
blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
# "NO SIGNAL" 텍스트 설정
text = f"NO SIGNAL ({target_cam.name})"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1.5
thickness = 3
# 텍스트 중앙 정렬 계산
text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
text_x = (blank_screen.shape[1] - text_size[0]) // 2
text_y = (blank_screen.shape[0] + text_size[1]) // 2
# 텍스트 그리기 (빨간색)
cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
cv2.imshow(window_name, blank_screen)
# 4. 키 입력 처리 (종료만 처리)
key = cv2.waitKey(1) & 0xFF
# 종료
if key == ord('q') or key == 27:
break
# 종료 처리
print("시스템 종료 중...")
for cam in cameras:
cam.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()