280 lines
11 KiB
Python
280 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@file : cctv.py
|
|
@author: hsj100
|
|
@license: A2TEC
|
|
@brief: multi cctv
|
|
|
|
@section Modify History
|
|
- 2025-11-24 오후 1:38 hsj100 base
|
|
|
|
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import threading
|
|
import time
|
|
import os
|
|
from datetime import datetime
|
|
from demo_detect import DemoDetect
|
|
from utils import get_monitorsize
|
|
from config_loader import CFG
|
|
|
|
# ==========================================================================================
|
|
# [Configuration Section]
|
|
# ==========================================================================================
|
|
|
|
_cctv_cfg = CFG.get('cctv', {})
|
|
|
|
# 1. Camera Switch Interval (Seconds)
|
|
SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0)
|
|
|
|
# 2. Camera Sources List
|
|
SOURCES = _cctv_cfg.get('sources', [])
|
|
if not SOURCES:
|
|
# 기본값 (설정이 없을 경우)
|
|
SOURCES = [
|
|
{"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"},
|
|
{"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"},
|
|
]
|
|
|
|
# ==========================================================================================
|
|
|
|
class CameraStream:
|
|
"""
|
|
개별 카메라 스트림을 관리하는 클래스.
|
|
별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다.
|
|
"""
|
|
def __init__(self, src, name="Camera"):
|
|
self.src = src
|
|
self.name = name
|
|
|
|
# RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지)
|
|
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
|
|
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
|
|
|
|
# OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음)
|
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
self.ret = False
|
|
self.latest_frame = None
|
|
self.running = False
|
|
self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지
|
|
self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화
|
|
|
|
# 연결 확인
|
|
if not self.cap.isOpened():
|
|
print(f"[{self.name}] 접속 실패: {self.src}")
|
|
else:
|
|
print(f"[{self.name}] 접속 성공")
|
|
self.running = True
|
|
# 스레드 시작
|
|
self.thread = threading.Thread(target=self.update, args=())
|
|
self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료
|
|
self.thread.start()
|
|
|
|
def update(self):
|
|
"""백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)"""
|
|
while self.running:
|
|
# 1. 버퍼에서 압축된 데이터 가져오기 (가볍고 빠름)
|
|
if self.cap.grab():
|
|
# 2. 실제 이미지로 디코딩 (무거움)
|
|
ret, frame = self.cap.retrieve()
|
|
|
|
if ret:
|
|
with self.lock:
|
|
self.ret = ret
|
|
self.latest_frame = frame
|
|
self.last_read_time = time.time() # 수신 시간 갱신
|
|
else:
|
|
# grab은 성공했으나 디코딩 실패 (드문 경우)
|
|
continue
|
|
else:
|
|
# 스트림이 끊기거나 끝난 경우 재접속 로직
|
|
print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...")
|
|
self.cap.release()
|
|
time.sleep(2.0)
|
|
|
|
# 재접속 시도 (TCP 강제 설정 유지)
|
|
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
|
|
self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG)
|
|
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
if self.cap.isOpened():
|
|
print(f"[{self.name}] 재접속 성공")
|
|
else:
|
|
print(f"[{self.name}] 재접속 실패")
|
|
|
|
def get_frame(self):
|
|
"""현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함)"""
|
|
with self.lock:
|
|
# 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리
|
|
if time.time() - self.last_read_time > 3.0:
|
|
return False, None
|
|
return self.ret, self.latest_frame
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
if self.thread.is_alive():
|
|
self.thread.join()
|
|
self.cap.release()
|
|
|
|
def main():
|
|
# Get Monitor Size inside main or top level if safe
|
|
MONITOR_RESOLUTION = get_monitorsize()
|
|
|
|
# 2. 객체탐지 모델 초기화
|
|
detector = DemoDetect()
|
|
print("모델 로딩 완료!")
|
|
|
|
# 3. 카메라 객체 생성 및 백그라운드 수신 시작
|
|
cameras = []
|
|
for s in SOURCES:
|
|
cam = CameraStream(s["src"], s["name"])
|
|
cameras.append(cam)
|
|
|
|
current_cam_index = 0 # 현재 보고 있는 카메라 인덱스
|
|
last_switch_time = time.time() # 마지막 전환 시간
|
|
switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환
|
|
|
|
# FPS 계산을 위한 변수
|
|
prev_frame_time = time.time()
|
|
fps = 0
|
|
|
|
# FPS 통계 변수
|
|
fps_min = float('inf')
|
|
fps_max = 0
|
|
fps_sum = 0
|
|
fps_count = 0
|
|
warmup_frames = 30 # 초반 불안정한 프레임 제외
|
|
|
|
print("\n=== CCTV 관제 시스템 시작 ===")
|
|
print("5초마다 자동으로 카메라가 전환됩니다.")
|
|
print("[Q] 또는 [ESC]를 눌러 종료합니다.\n")
|
|
|
|
# 전체화면 윈도우 설정 (프레임 없이)
|
|
window_name = "CCTV Control Center"
|
|
cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)
|
|
cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
|
|
|
while True:
|
|
# 자동 전환 로직: 5초마다 다음 카메라로 전환
|
|
current_time = time.time()
|
|
if current_time - last_switch_time >= switch_interval:
|
|
current_cam_index = (current_cam_index + 1) % len(cameras)
|
|
last_switch_time = current_time
|
|
print(f"-> 자동 전환: {cameras[current_cam_index].name}")
|
|
|
|
# 3. 현재 선택된 카메라의 최신 프레임 가져오기
|
|
target_cam = cameras[current_cam_index]
|
|
ret, frame = target_cam.get_frame()
|
|
|
|
if ret and frame is not None:
|
|
# FPS 계산
|
|
current_frame_time = time.time()
|
|
time_diff = current_frame_time - prev_frame_time
|
|
fps = 1 / time_diff if time_diff > 0 else 0
|
|
prev_frame_time = current_frame_time
|
|
|
|
# FPS 통계 갱신 (워밍업 이후)
|
|
if fps > 0:
|
|
fps_count += 1
|
|
if fps_count > warmup_frames:
|
|
fps_sum += fps
|
|
real_count = fps_count - warmup_frames
|
|
|
|
fps_avg = fps_sum / real_count
|
|
fps_min = min(fps_min, fps)
|
|
fps_max = max(fps_max, fps)
|
|
|
|
# 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거)
|
|
# HPE
|
|
hpe_message = detector.inference_hpe(frame, False)
|
|
|
|
# Helmet 탐지
|
|
helmet_message = detector.inference_helmet(frame, False)
|
|
|
|
# 객체 탐지
|
|
od_message_raw = detector.inference_od(frame, False, class_name_view=True)
|
|
|
|
# 필터링
|
|
od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False)
|
|
|
|
# 탐지 결과 라벨링 (원본 좌표 기준)
|
|
detector.hpe_labeling(frame, hpe_message)
|
|
detector.od_labeling(frame, od_message)
|
|
detector.border_labeling(frame, hpe_message, od_message)
|
|
|
|
# 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용)
|
|
frame = cv2.resize(frame, MONITOR_RESOLUTION)
|
|
|
|
# 화면에 카메라 이름과 시간 표시 (UI 효과)
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# 카메라 이름 (테두리 + 본문)
|
|
cam_text = f"CH {current_cam_index+1}: {target_cam.name}"
|
|
cv2.putText(frame, cam_text, (20, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게)
|
|
cv2.putText(frame, cam_text, (20, 40),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# 시간 (테두리 + 본문)
|
|
cv2.putText(frame, timestamp, (20, 80),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
|
|
cv2.putText(frame, timestamp, (20, 80),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# FPS (테두리 + 본문)
|
|
fps_text = f"FPS: {fps:.1f}"
|
|
cv2.putText(frame, fps_text, (20, 120),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리
|
|
cv2.putText(frame, fps_text, (20, 120),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문
|
|
|
|
# FPS 통계 (AVG, MIN, MAX)
|
|
stat_val_min = fps_min if fps_min != float('inf') else 0
|
|
stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0
|
|
|
|
stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}"
|
|
cv2.putText(frame, stats_text, (20, 160),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA)
|
|
cv2.putText(frame, stats_text, (20, 160),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색
|
|
|
|
cv2.imshow(window_name, frame)
|
|
else:
|
|
# 신호가 없을 때 보여줄 대기 화면
|
|
# MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함
|
|
blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8)
|
|
|
|
# "NO SIGNAL" 텍스트 설정
|
|
text = f"NO SIGNAL ({target_cam.name})"
|
|
font = cv2.FONT_HERSHEY_SIMPLEX
|
|
font_scale = 1.5
|
|
thickness = 3
|
|
|
|
# 텍스트 중앙 정렬 계산
|
|
text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
|
|
text_x = (blank_screen.shape[1] - text_size[0]) // 2
|
|
text_y = (blank_screen.shape[0] + text_size[1]) // 2
|
|
|
|
# 텍스트 그리기 (빨간색)
|
|
cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
|
|
|
|
cv2.imshow(window_name, blank_screen)
|
|
|
|
# 4. 키 입력 처리 (종료만 처리)
|
|
key = cv2.waitKey(1) & 0xFF
|
|
|
|
# 종료
|
|
if key == ord('q') or key == 27:
|
|
break
|
|
|
|
# 종료 처리
|
|
print("시스템 종료 중...")
|
|
for cam in cameras:
|
|
cam.stop()
|
|
cv2.destroyAllWindows()
|
|
|
|
if __name__ == "__main__":
|
|
main() |