import os import cv2 import random import numpy as np import imghdr import project_config import demo_const as AI_CONST from predict import ObjectDetect, PoseDetect from load_models import model_manager from ultralytics.data.loaders import LOGGER from ultralytics.data.loaders import LoadImagesAndVideos from utils import LoadStreamsDaool,CustomVideoCapture,CLASS_INFORMATION,CLASS_SWAP_INFO,get_monitorsize,img_resize LOGGER.setLevel("ERROR") MONITOR_RESOLUTION = get_monitorsize() class DemoDetect: POSETYPE_NORMAL = int(0x0000) POSETYPE_FALL = int(0x0080) POSETYPE_CROSS = int(0x0100) """ 시연용 """ def __init__(self): self.image_data = None self.crop_img = False self.model = model_manager.get_od() self.object_detect = ObjectDetect() self.object_detect.set_model(self.model) # helmet if project_config.USE_HELMET_MODEL: self.helmet_model = model_manager.get_helmet() self.helmet_detect = ObjectDetect() self.helmet_detect.set_model(self.helmet_model) # HPE self.pose_model = model_manager.get_hpe() self.pose_predict = PoseDetect() self.pose_predict.set_model(self.pose_model) self.hpe_frame_count = 0 self.color_table = AI_CONST.color_table self.source =AI_CONST.SOURCE self.save = False self.ext = None self.video_capture = None def set(self, args): self.source = args.source self.save = args.save self.clahe = args.clahe def run(self): if os.path.exists(self.source): dataset = LoadImagesAndVideos(path=self.source) else: if self.save: raise Exception("스트림영상은 저장 불가") dataset = LoadStreamsDaool(sources=self.source) if self.save: if imghdr.what(self.source): #img self.ext = ".jpg" else: #video self.ext = ".mp4" os.makedirs(AI_CONST.SAVE_PATH, exist_ok=True) for _, image, *_ in dataset: image = image[0] #image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) ##영상이 좌로 90도 회전되어 들어오는 경우가 있어 임시로 추가함 image = self.image_calibration(image) if self.ext == ".mp4" and not self.video_capture: self.video_capture = CustomVideoCapture() self.video_capture.set_frame_size(image=image) # _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}") _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{'.avi'}") if self.clahe: _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect_clahe{'.avi'}") self.video_capture.set_video_writer(path=_video_path) print(_video_path) # hpe person detect hpe_message = self.inference_hpe(image, self.crop_img) if project_config.USE_HPE_PERSON else [] if project_config.USE_HPE_FRAME_CHECK: hpe_message = self.hpe_frame_check(hpe_message) # helmet detect _helmet_message = self.inference_helmet(image, self.crop_img) if project_config.USE_HELMET_MODEL else [] # object detect od_message_raw = self.inference_od(image, self.crop_img,class_name_view=True) # od_message_raw = [] # od filtering od_message = self.od_filtering(image,od_message_raw,_helmet_message,hpe_message,add_siginal_man=False) self.hpe_labeling(image,hpe_message) # od_message = [] # NOTE(jwkim) od 라벨링 막음 다시 그리고싶다면 주석 self.od_labeling(image,od_message) self.border_labeling(image,hpe_message,od_message) if self.save: if imghdr.what(self.source): #img _video_path = os.path.join(AI_CONST.SAVE_PATH,f"{os.path.splitext(os.path.split(self.source)[-1])[0]}_detect{self.ext}") cv2.imwrite(_video_path,image) else: #video self.video_capture.write_video(image) else: cv2.namedWindow("image", cv2.WND_PROP_FULLSCREEN) cv2.setWindowProperty("image", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) if MONITOR_RESOLUTION != AI_CONST.FHD_RESOLUTION: image = img_resize(image,MONITOR_RESOLUTION) cv2.imshow("image", image) # Hit 'q' on the keyboard to quit! if cv2.waitKey(1) & 0xFF == ord('q'): break def inference_hpe(self, image, crop_image): """ inference 완료 후 off class 필터링 :param frame_count:현재 프레임 수 :param image: 이미지 데이터 :param crop_image: crop_image 유무 :return: 탐지 결과 """ # 초기화 message = [] self.pose_predict.set_image(image) message = self.pose_predict.predict(working=True,crop_image=crop_image) return message def inference_helmet(self, image, crop_image): """ inference 완료후 탐지된 리스트 반환 :param image: 이미지 데이터 :param crop_image: crop_image 유무 :return: 탐지 결과 """ # 초기화 message = [] self.helmet_detect.set_image(image) raw_message = self.helmet_detect.predict(crop_image=crop_image, class_name=True) for i in raw_message: #ON if i["class_name"] == 'head' or i["class_name"] == 'safety_helmet_off': i["class_id"] = CLASS_SWAP_INFO['safety_helmet_off'] i["class_name"] = CLASS_INFORMATION[i["class_id"]] message.append(i) #OFF elif i["class_name"] == 'helmet' or i["class_name"] == 'safety_helmet_on': i["class_id"] = CLASS_SWAP_INFO['safety_helmet_on'] i["class_name"] = CLASS_INFORMATION[i["class_id"]] message.append(i) else: continue return message def inference_od(self, image, crop_image, class_name_view=False): """ inference 완료후 탐지된 리스트 반환 :param image: 이미지 데이터 :param crop_image: crop_image 유무 :return: 탐지 결과 """ # 초기화 message = [] self.object_detect.set_image(image) message = self.object_detect.predict(crop_image=crop_image, class_name=class_name_view) return message def od_filtering(self,image,od_raw_message,helmet_message,hpe_message,add_siginal_man=True): """ 여러 모델 inference 결과를 조합하여 od 결과 추출 :param image: 원본 이미지 :param od_raw_message: od inference 결과 :param helmet_message: helmet inference 결과 :param hpe_message: hpe inference 결과 :param add_siginal_man: 신호수 추가 유무 :return: 필터링된 od inference 결과 """ ppe_filtered_message = [] #od helmet 제거 , helmet 추가 helmet_changed_message = self.update_helmet(helmet_message, od_raw_message) #od person 제거, hpe_person 추가 person_changed_message = self.update_person(hpe_message, helmet_changed_message) # 필터링 작업 (현재 box 겹침만 사용) ppe_filtered_message = self.od_ppe_class_filter(person_changed_message,hpe_message) # signalman 추가 if add_siginal_man: signal_man = self.signal_man_message(image, ppe_filtered_message) if signal_man: ppe_filtered_message.append(signal_man) return ppe_filtered_message def update_helmet(self, helmet_message, od_message): """ helmet message 파싱 후 , od message 에서 helmet 제거 그후 합친 결과 return :param helmet_message: helmet detect result :param od_message: od detect result :return: result """ _result = [] if not helmet_message: return od_message elif project_config.USE_HELMET_MODEL is False: return od_message else: #parsing helmet msg _result = _result + helmet_message #remove od helmet for _od in od_message: if _od['class_id'] not in AI_CONST.HELMET_CID: _result.append(_od) return _result def update_person(self, hpe_message, od_message): """ hpe message 파싱 후 , od message 에서 person 제거 그후 합친 결과 return :param hpe_message: _description_ :param od_message: _description_ :return: _description_ """ _result = [] if not hpe_message: return od_message elif project_config.USE_HPE_PERSON is False: return od_message else: #parsing hpe msg for _hpe in hpe_message: _person_info = {k: v for k, v in _hpe['result'].items() if k not in ('pose_type', 'pose_level')} _result.append(_person_info) #remove od person for _od in od_message: if _od['class_id'] != 0 : _result.append(_od) return _result def od_ppe_class_filter(self, od_message, kpt_message): """ PPE_CLASS_LIST 로 정의된 클래스가 사용할수 있는지 없는지 판단하여 결과물 return - PPE_CLASS_LIST 중 helmet, rubber_insulated_sleeve, suit_top, suit_bottom class 경우 bbox가 keypoint에 포함되면 _result에 추가, 포함되지 않는다면 person class와 겹치는지 확인후 겹치면 _result 에 추가 - PPE_CLASS_LIST 나머지 class 는 person class와 겹치면 _result 에 추가 - PPE_CLASS_LIST 가 아닐경우 _result에 추가 :param od_message: od list :param kpt_message: keypoint list :return: list """ _result = [] _person = [] _ppe_cls = [] # split classes for i in od_message: if i['class_id'] == 0: _person.append(i) _result.append(i) elif i['class_id'] in AI_CONST.PPE_CLASS_LIST: _ppe_cls.append(i) else: _result.append(i) # filtering ppe classes for ppe in _ppe_cls: for kpt in kpt_message: if AI_CONST.HELMET_CID and ppe['class_id'] in AI_CONST.HELMET_CID: #HELMET kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.HELMET_KPT,type=1) if kpt_include: _result.append(ppe) break else: intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: _result.append(ppe) break elif AI_CONST.RUBBER_INSULATED_SLEEVE_CID and ppe['class_id'] in AI_CONST.RUBBER_INSULATED_SLEEVE_CID: #RUBBER_INSULATED_SLEEVE kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.RUBBER_INSULATED_SLEEVE_KPT,type=0) if kpt_include: _result.append(ppe) break else: intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: _result.append(ppe) break elif AI_CONST.SUIT_TOP_CID and ppe['class_id'] in AI_CONST.SUIT_TOP_CID: #SUIT_TOP kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_TOP_KPT,type=1) if kpt_include: _result.append(ppe) break else: intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: _result.append(ppe) break elif AI_CONST.SUIT_BOTTOM_CID and ppe['class_id'] in AI_CONST.SUIT_BOTTOM_CID: #SUIT_BOTTOM kpt_include = self.check_keypoint_include(ppe_bbox=ppe['bbox'],kpt_list=kpt['keypoints'],kpt_index_list=AI_CONST.SUIT_BOTTOM_KPT,type=1) if kpt_include: _result.append(ppe) break else: intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: _result.append(ppe) break else: intersect_area = self.check_union_area(person=kpt['person'],object=ppe['bbox']) if intersect_area >= AI_CONST.PPE_UNION_MIN_PERCENT: _result.append(ppe) break return _result def check_union_area(self, person, object): """ 두개의 bbox 가 겹치는지 확인후 겹친다면 두개의 bbox 겹치는 영역 return 아닐경우 false return :param person: bbox 좌표 x1,y1,x2,y2 :param object: bbox 좌표 x1,y1,x2,y2 """ person_left, person_top, person_right, person_bot = ( int(person[0]), int(person[1]), int(person[2]), int(person[3]), ) obj_left, obj_top, obj_right, obj_bot = ( int(object[0]), int(object[1]), int(object[2]), int(object[3]), ) ## case1 오른쪽으로 벗어나 있는 경우 if person_right < obj_left: return 0 ## case2 왼쪽으로 벗어나 있는 경우 if person_left > obj_right: return 0 ## case3 위쪽으로 벗어나 있는 경우 if person_bot < obj_top: return 0 ## case4 아래쪽으로 벗어나 있는 경우 if person_top > obj_bot: return 0 # 교집합 영역 찾기 modified_left, modified_top, modified_right, modified_bot = obj_left, obj_top, obj_right, obj_bot # x좌표 기준으로 이동 if modified_left < person_left : # object 왼쪽 겹침 modified_left = person_left elif modified_right > person_right : #object 오른쪽 겹침 modified_right = person_right # y좌표 기준으로 이동 if modified_top < person_top : # object 위쪽 겹침 modified_top = person_top elif modified_bot > person_bot : #object 아래쪽 겹침 modified_bot = person_bot width = modified_right - modified_left height = modified_bot - modified_top if width * height > 0: return (width * height)/((obj_right-obj_left)*(obj_bot-obj_top)) else: return 0 def plot_one_box(self, x, img, color=None, text_color=AI_CONST.TEXT_COLOR_WHITE, label=None, line_thickness=3): # Plots one bounding box on image img tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness color = color or [random.randint(0, 255) for _ in range(3)] c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3])) cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA) if label: tf = max(tl - 1, 1) # font thickness t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0] c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3 cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) # filled cv2.putText( img, label, (c1[0], c1[1] - 2), 0, tl / 3, text_color, thickness=tf, lineType=cv2.LINE_AA, ) def plot_skeleton_kpts(self, im, kpts, steps, orig_shape=None): # Plot the skeleton and keypointsfor coco datatset palette = np.array( [ [255, 128, 0], [255, 153, 51], [255, 178, 102], [230, 230, 0], [255, 153, 255], [153, 204, 255], [255, 102, 255], [255, 51, 255], [102, 178, 255], [51, 153, 255], [255, 153, 153], [255, 102, 102], [255, 51, 51], [153, 255, 153], [102, 255, 102], [51, 255, 51], [0, 255, 0], [0, 0, 255], [255, 0, 0], [255, 255, 255], ] ) skeleton = [ [16, 14], [14, 12], [17, 15], [15, 13], [12, 13], [6, 12], [7, 13], [6, 7], [6, 8], [7, 9], [8, 10], [9, 11], [2, 3], [1, 2], [1, 3], [2, 4], [3, 5], [4, 6], [5, 7], ] pose_limb_color = palette[ [9, 9, 9, 9, 7, 7, 7, 0, 0, 0, 0, 0, 16, 16, 16, 16, 16, 16, 16] ] pose_kpt_color = palette[[16, 16, 16, 16, 16, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 9, 9]] radius = 4 num_kpts = len(kpts) // steps for kid in range(num_kpts): r, g, b = pose_kpt_color[kid] x_coord, y_coord = kpts[steps * kid], kpts[steps * kid + 1] if not (x_coord % 640 == 0 or y_coord % 640 == 0): if steps == 3: conf = kpts[steps * kid + 2] if conf < AI_CONST.KPT_MIN_CONFIDENCE: continue cv2.circle( im, (int(x_coord), int(y_coord)), radius, (int(r), int(g), int(b)), -1 * AI_CONST.HPE_THICKNESS_RAITO, ) for sk_id, sk in enumerate(skeleton): r, g, b = pose_limb_color[sk_id] pos1 = (int(kpts[(sk[0] - 1) * steps]), int(kpts[(sk[0] - 1) * steps + 1])) pos2 = (int(kpts[(sk[1] - 1) * steps]), int(kpts[(sk[1] - 1) * steps + 1])) if steps == 3: conf1 = kpts[(sk[0] - 1) * steps + 2] conf2 = kpts[(sk[1] - 1) * steps + 2] if conf1 < AI_CONST.KPT_MIN_CONFIDENCE or conf2 < AI_CONST.KPT_MIN_CONFIDENCE: continue if pos1[0] % 640 == 0 or pos1[1] % 640 == 0 or pos1[0] < 0 or pos1[1] < 0: continue if pos2[0] % 640 == 0 or pos2[1] % 640 == 0 or pos2[0] < 0 or pos2[1] < 0: continue cv2.line(im, pos1, pos2, (int(r), int(g), int(b)), thickness=2 * AI_CONST.HPE_THICKNESS_RAITO) def hpe_labeling(self,image,hpe_data): for hpe in hpe_data: _kpt=[] for kpt, conf in zip(hpe["keypoints"], hpe["kpt_conf"]): if kpt == None: _kpt.append(0) _kpt.append(0) else: _kpt.append(kpt[0]) _kpt.append(kpt[1]) _kpt.append(conf) label_kpt = np.array(_kpt) self.plot_skeleton_kpts(im=image, kpts=np.array(label_kpt), steps=3) def od_labeling(self,image,od_data): for od in od_data: if not project_config.SHOW_GLOVES: if od['class_id'] in AI_CONST.UNVISIBLE_CLS: continue _label_color = [] _text_color = [] if od["class_name"] not in list(self.color_table.keys()): _label_color = AI_CONST.TEXT_COLOR_WHITE _text_color = AI_CONST.TEXT_COLOR_BLACK elif od["class_name"] in AI_CONST.text_color_white_list: _label_color = AI_CONST.TEXT_COLOR_WHITE _text_color = AI_CONST.TEXT_COLOR_BLACK else: _label_color = self.color_table[od["class_name"]].label_color _text_color = self.color_table[od["class_name"]].font_color self.plot_one_box( od["bbox"], image, label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}", color=_label_color, text_color=_text_color, line_thickness=AI_CONST.NORMAL_THICKNESS, ) # # NOTE(gyong min) person만 bbox 표시 # if od["class_name"] == 'person': # self.plot_one_box( # od["bbox"], # image, # label=f"{od['class_name']} {od['confidence']}" if project_config.VIEW_CONF_SCORE else f"{od['class_name']}", # color=_label_color, # text_color=_text_color, # line_thickness=AI_CONST.NORMAL_THICKNESS, # ) def border_labeling(self,image,hpe_data,od_data): _current_pose_type = 0 _border_color = [] _off_helmet = False _off_glove = False #hpe warning for hpe in hpe_data: _current_pose_type = max(_current_pose_type, int(hpe['result']['pose_type'])) if _current_pose_type >= self.POSETYPE_CROSS: #cross _border_color = AI_CONST.POSE_CROSS_COLOR elif _current_pose_type < self.POSETYPE_CROSS and _current_pose_type >= self.POSETYPE_FALL: #fall _border_color = AI_CONST.POSE_FALL_COLOR #NOTE(SGM)빨간 경고테두리 그리기 if _current_pose_type>0: cv2.rectangle( image, pt1=( AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF, AI_CONST.BORDER_THICKNESS + AI_CONST.BORDER_THICKNESS_HALF, ), pt2=( image.shape[1] - AI_CONST.BORDER_THICKNESS - AI_CONST.BORDER_THICKNESS_HALF, image.shape[0] - AI_CONST.BORDER_THICKNESS - AI_CONST.BORDER_THICKNESS_HALF, ), color=_border_color, thickness=AI_CONST.BORDER_THICKNESS, ) # cv2.putText( # image, # AI_CONST.BORDER_HPE_TEXT, # AI_CONST.TEXT_HPE_STARTING_POINT, # AI_CONST.TEXT_THICKNESS, # AI_CONST.TEXT_SIZE, # [0, 0, 0], # thickness=1, # lineType=cv2.LINE_AA, # ) # od warning for od in od_data: if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[0]: _off_helmet = True if od['class_id'] == AI_CONST.OFF_TRIGGER_CLASS_LIST[1]: _off_glove = True if _off_glove and _off_helmet : cv2.rectangle( image, pt1=(AI_CONST.BORDER_THICKNESS_HALF, AI_CONST.BORDER_THICKNESS_HALF), pt2=( image.shape[1] - AI_CONST.BORDER_THICKNESS_HALF, image.shape[0] - AI_CONST.BORDER_THICKNESS_HALF, ), color=AI_CONST.WD_BORDER_COLOR, thickness=AI_CONST.BORDER_THICKNESS) cv2.putText( image, AI_CONST.BORDER_OD_TEXT, AI_CONST.TEXT_OD_STARTING_POINT, AI_CONST.TEXT_THICKNESS, AI_CONST.TEXT_SIZE, [0, 0, 0], thickness=1, lineType=cv2.LINE_AA, ) def check_keypoint_include(self, ppe_bbox, kpt_list, kpt_index_list, type): """ bbox에 keypoint가 포함되어 있는지 확인 :param ppe_bbox: ppe xyxy 좌표 :param kpt_list: keypoint 정보 :param kpt_index_list: keypoint 인덱스 정보 :param type: 0: null값을 제외한 keypoint가 최소 하나라도 있으면 인정, 1: null값을 제외한 keypoint가 전부 있어야 인정 :return: boolean """ include = True left, right = ppe_bbox[0], ppe_bbox[2] top, bottom = ppe_bbox[1], ppe_bbox[3] if type == 0: # 최소 하나 include = False for kpt_idx in kpt_index_list: kpt = kpt_list[kpt_idx] if kpt != None: if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom): include = True break else: # 전부 _null_count = 0 for kpt_idx in kpt_index_list: kpt = kpt_list[kpt_idx] if kpt != None: if (kpt[0] >= left and kpt[0] <= right) and (kpt[1] >= top and kpt[1] <= bottom): pass else: include = False break else: _null_count += 1 if _null_count == len(kpt_index_list): include = False return include def hpe_frame_check(self, hpe_message, threshold=int(0x0080), maxcount=AI_CONST.HPE_FRAME_CHECK_MAX_COUNT): _alert = False if hpe_message: for i in hpe_message: if i['result']['pose_type']>= threshold: self.hpe_frame_count += 1 _alert = True break if _alert: if self.hpe_frame_count == maxcount: self.hpe_frame_count = 0 return hpe_message else: self.hpe_frame_count = 0 for j in hpe_message: j['result']['pose_type'] = 0 j['result']['pose_level'] = 0 else: self.hpe_frame_count = 0 return hpe_message def image_calibration(self, image): """ 이미지 역광 보정 """ if self.clahe: # 이미지를 LAB 컬러 공간으로 변환 (L 채널에 CLAHE 적용) lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) # CLAHE 객체 생성 및 적용 # clipLimit: 대비 제한 임계값, tileGridSize: 타일의 크기 clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8)) cl = clahe.apply(l) # 보정된 L 채널과 기존 a, b 채널을 합쳐 다시 LAB 이미지 생성 limg = cv2.merge((cl, a, b)) # LAB 이미지를 다시 BGR 컬러 공간으로 변환 corrected_img = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR) return corrected_img else: return image def label_color(model,table): label_colored=[] text_colored=[] model_key_list=list(model.model.names.values()) fixed_color=list(table.values()) fixed_class=list(table.keys()) for cls in model_key_list: if cls in fixed_class: label_colored.append(table[cls]) else: while True: _color = [random.randint(0, 255) for _ in range(3)] if _color not in fixed_color and _color not in label_colored: break label_colored.append(_color) if cls in AI_CONST.text_color_white_list: text_colored.append(AI_CONST.TEXT_COLOR_WHITE) else: text_colored.append(AI_CONST.TEXT_COLOR_BLACK) return label_colored,text_colored if __name__ == "__main__": pass