# -*- coding: utf-8 -*- """ @file : cctv.py @author: hsj100 @license: A2TEC @brief: multi cctv @section Modify History - 2025-11-24 오후 1:38 hsj100 base """ import cv2 import numpy as np import threading import time import os from datetime import datetime from demo_detect import DemoDetect from utils import get_monitorsize from config_loader import CFG from parsing_msg import ParsingMsg from mqtt import mqtt_publisher # ========================================================================================== # [Configuration Section] # ========================================================================================== _cctv_cfg = CFG.get('cctv', {}) # 1. Camera Switch Interval (Seconds) SWITCH_INTERVAL = _cctv_cfg.get('switch_interval', 5.0) # 2. Camera Sources List SOURCES = _cctv_cfg.get('sources', []) if not SOURCES: # 기본값 (설정이 없을 경우) SOURCES = [ {"src": "rtsp://192.168.200.231:50199/wd", "name": "dev1_stream"}, {"src": "rtsp://192.168.200.232:50299/wd", "name": "dev2_stream"}, ] # ========================================================================================== class CameraStream: """ 개별 카메라 스트림을 관리하는 클래스. 별도의 스레드에서 프레임을 계속 읽어와서 '최신 프레임(latest_frame)' 변수에 저장합니다. """ def __init__(self, src, name="Camera"): self.src = src self.name = name # RTSP를 TCP로 강제 설정 (461 Unsupported Transport, Packet Loss 방지) os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) # OpenCV 버퍼 사이즈를 최소화하여 레이턴시 감소 (백엔드에 따라 동작 안 할 수도 있음) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.ret = False self.latest_frame = None self.running = False self.is_active = False # [최적화] 현재 화면 표시 여부 self.lock = threading.Lock() # 프레임 읽기/쓰기 충돌 방지 self.last_read_time = time.time() # 마지막 프레임 수신 시간 초기화 self.frame_skip_count = 0 # 연결 확인 if not self.cap.isOpened(): print(f"[{self.name}] 접속 실패: {self.src}") else: print(f"[{self.name}] 접속 성공") self.running = True # 스레드 시작 self.thread = threading.Thread(target=self.update, args=()) self.thread.daemon = True # 메인 프로그램 종료 시 스레드도 강제 종료 self.thread.start() def update(self): """백그라운드에서 계속 프레임을 읽어오는 함수 (grab + retrieve 분리)""" while self.running: # 1. 버퍼에서 압축된 데이터 가져오기 (grab은 항상 수행하여 버퍼가 쌓이지 않게 함) if self.cap.grab(): # [최적화 로직] 활성 상태일 때만 디코딩(retrieve) 수행 # 비활성 상태일 때는 30프레임에 한 번만 디코딩하여 리소스 절약 self.frame_skip_count += 1 if self.is_active or (self.frame_skip_count % 30 == 0): # 2. 실제 이미지로 디코딩 (무거움) ret, frame = self.cap.retrieve() if ret: with self.lock: self.ret = ret self.latest_frame = frame self.last_read_time = time.time() # 수신 시간 갱신 else: # 디코딩을 건너뛰는 동안 CPU 부하를 아주 약간 줄이기 위해 미세한 대기 time.sleep(0.001) else: # 스트림이 끊기거나 끝난 경우 재접속 로직 print(f"[{self.name}] 신호 없음 (grab failed). 2초 후 재접속 시도...") self.cap.release() time.sleep(2.0) # 재접속 시도 (TCP 강제 설정 유지) os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp" self.cap = cv2.VideoCapture(self.src, cv2.CAP_FFMPEG) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if self.cap.isOpened(): print(f"[{self.name}] 재접속 성공") else: print(f"[{self.name}] 재접속 실패") def get_frame(self): """ 현재 저장된 가장 최신 프레임을 반환 (타임아웃 체크 포함) """ with self.lock: # 마지막 수신 후 3초 이상 지났으면 신호 없음(False) 처리 if time.time() - self.last_read_time > 3.0: return False, None, self.name return self.ret, self.latest_frame, self.name def stop(self): self.running = False if self.thread.is_alive(): self.thread.join() self.cap.release() def main(): MQTT_TOPIC = "/hospital/ai1" parser = ParsingMsg() # Get Monitor Size inside main or top level if safe MONITOR_RESOLUTION = get_monitorsize() # 2. 객체탐지 모델 초기화 detector = DemoDetect() print("모델 로딩 완료!") # 3. 카메라 객체 생성 및 백그라운드 수신 시작 cameras = [] for i, s in enumerate(SOURCES): cam = CameraStream(s["src"], s["name"]) # 첫 번째 카메라만 활성 상태로 시작 if i == 0: cam.is_active = True cameras.append(cam) DISPLAY_TURN_ON = CFG.get('display').get('turn_on') # display 출력 여부 SNAPSHOT_SEND = CFG.get('img_snap_shot') current_cam_index = 0 # 현재 보고 있는 카메라 인덱스 last_switch_time = time.time() # 마지막 전환 시간 switch_interval = SWITCH_INTERVAL # 5초마다 자동 전환 # FPS 계산을 위한 변수 prev_frame_time = time.time() fps = 0 # FPS 통계 변수 fps_min = float('inf') fps_max = 0 fps_sum = 0 fps_count = 0 warmup_frames = 30 # 초반 불안정한 프레임 제외 # pubilsh time check last_publish_time = None delay_after_publish = 4.0 # publish 후 n초 동안은 추가 publish 금지, 0일경우 무제한 print("\n=== CCTV 시스템 시작 ===") print(f"{switch_interval}초마다 자동으로 카메라가 전환됩니다.") if DISPLAY_TURN_ON: print("[Q] 또는 [ESC] : 종료\n") else: print("화면 출력이 꺼져 있습니다. 시스템은 백그라운드에서 계속 실행됩니다.") print("종료하려면 터미널에서 Ctrl+C를 누르세요.\n") if DISPLAY_TURN_ON: # 전체화면 윈도우 설정 (프레임 없이) window_name = "CCTV Control Center" cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN) cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) while True: # 1. 카메라 전환 처리 (자동) current_time = time.time() need_switch = False if DISPLAY_TURN_ON: # 키 입력 처리 (종료만 체크) key = cv2.waitKey(1) & 0xFF if key == ord('q') or key == 27: # 종료 break # 자동 전환 타이머 체크 if current_time - last_switch_time >= switch_interval: need_switch = True if need_switch: # 기존 카메라 비활성화 (리소스 절약 모드 진입) cameras[current_cam_index].is_active = False # 다음 카메라 인덱스 계산 current_cam_index = (current_cam_index + 1) % len(cameras) # 새 카메라 활성화 (풀 디코딩 시작) cameras[current_cam_index].is_active = True last_switch_time = current_time print(f"-> 자동 전환: {cameras[current_cam_index].name}") # 2. 현재 선택된 카메라의 최신 프레임 가져오기 target_cam = cameras[current_cam_index] ret, frame, name = target_cam.get_frame() if ret and frame is not None: # FPS 계산 current_frame_time = time.time() time_diff = current_frame_time - prev_frame_time fps = 1 / time_diff if time_diff > 0 else 0 prev_frame_time = current_frame_time # FPS 통계 갱신 (워밍업 이후) if fps > 0: fps_count += 1 if fps_count > warmup_frames: fps_sum += fps real_count = fps_count - warmup_frames fps_avg = fps_sum / real_count fps_min = min(fps_min, fps) fps_max = max(fps_max, fps) # 객체탐지 인퍼런스 수행 (원본 해상도로 추론하여 불필요한 리사이즈 제거) # HPE hpe_message = detector.inference_hpe(frame, False) #NOTE(jwkim): od모델 사용 안함 # Helmet 탐지 # helmet_message = detector.inference_helmet(frame, False) # 객체 탐지 # od_message_raw = detector.inference_od(frame, False, class_name_view=True) # 필터링 # od_message = detector.od_filtering(frame, od_message_raw, helmet_message, hpe_message, add_siginal_man=False) od_message = [] parser.set( msg=hpe_message, img=frame if SNAPSHOT_SEND else None, ward_id=name) parsing_msg = parser.parse() #mqtt publish if parsing_msg is not None: #time check if last_publish_time is None: last_publish_time = datetime.now() mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1) else: current_time = datetime.now() time_diff = current_time - last_publish_time time_diff_seconds = time_diff.total_seconds() # n초에 한 번만 publish if time_diff_seconds < delay_after_publish: pass else: last_publish_time = datetime.now() mqtt_publisher.client.publish(MQTT_TOPIC, parsing_msg, qos=1) # 탐지 결과 라벨링 (원본 좌표 기준) if DISPLAY_TURN_ON: detector.hpe_labeling(frame, hpe_message) detector.od_labeling(frame, od_message) detector.border_labeling(frame, hpe_message, od_message) # 모니터 해상도에 맞춰 리사이즈 (라벨링 완료 후 화면 표시용) frame = cv2.resize(frame, MONITOR_RESOLUTION) # 화면에 카메라 이름과 시간 표시 (UI 효과) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 카메라 이름 (테두리 + 본문) cam_text = f"CH {current_cam_index+1}: {target_cam.name}" cv2.putText(frame, cam_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 (진하게) cv2.putText(frame, cam_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 # 시간 (테두리 + 본문) cv2.putText(frame, timestamp, (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 cv2.putText(frame, timestamp, (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 # FPS (테두리 + 본문) fps_text = f"FPS: {fps:.1f}" cv2.putText(frame, fps_text, (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 5, cv2.LINE_AA) # 검은색 테두리 cv2.putText(frame, fps_text, (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) # 초록색 본문 # FPS 통계 (AVG, MIN, MAX) stat_val_min = fps_min if fps_min != float('inf') else 0 stat_val_avg = fps_sum / (fps_count - warmup_frames) if fps_count > warmup_frames else 0 stats_text = f"AVG: {stat_val_avg:.1f} MIN: {stat_val_min:.1f} MAX: {fps_max:.1f}" cv2.putText(frame, stats_text, (20, 160), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 5, cv2.LINE_AA) cv2.putText(frame, stats_text, (20, 160), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA) # 노란색 cv2.imshow(window_name, frame) elif DISPLAY_TURN_ON: # 신호가 없을 때 보여줄 대기 화면 # MONITOR_RESOLUTION은 (width, height)이므로 numpy shape는 (height, width, 3)이어야 함 blank_screen = np.zeros((MONITOR_RESOLUTION[1], MONITOR_RESOLUTION[0], 3), dtype=np.uint8) # "NO SIGNAL" 텍스트 설정 text = f"NO SIGNAL ({target_cam.name})" font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 1.5 thickness = 3 # 텍스트 중앙 정렬 계산 text_size = cv2.getTextSize(text, font, font_scale, thickness)[0] text_x = (blank_screen.shape[1] - text_size[0]) // 2 text_y = (blank_screen.shape[0] + text_size[1]) // 2 # 텍스트 그리기 (빨간색) cv2.putText(blank_screen, text, (text_x, text_y), font, font_scale, (0, 0, 255), thickness, cv2.LINE_AA) cv2.imshow(window_name, blank_screen) # 4. 키 입력 처리 (종료만 처리) if DISPLAY_TURN_ON: key = cv2.waitKey(1) & 0xFF if key == ord('q') or key == 27: # 종료 break # 종료 처리 print("시스템 종료 중...") for cam in cameras: cam.stop() cv2.destroyAllWindows() if __name__ == "__main__": main()