import datetime as _dt  # aliased so it cannot collide with names star-imported from schema
import json
from base64 import b64encode

import cv2

from schema import *


class ParsingMsg():
    """Convert raw detection results and a camera frame into a JSON message.

    Typical use: call :meth:`set` with the detections, the frame and the ward
    id, then :meth:`parse` to build the serialized message. :meth:`get`
    returns the result of the most recent :meth:`parse` call.
    """

    # Image container format; used both for encoding and for the ``format``
    # field reported in the outgoing message, so the two cannot drift apart.
    IMAGE_FORMAT = 'jpg'
    # JPEG quality passed to the OpenCV encoder.
    JPEG_QUALITY = 80

    def __init__(self):
        self.msg = None
        self.img = None
        self.ward_id = None
        self.parsed_msg = None

    def set(self, msg:list, img, ward_id):
        """Store the inputs for the next :meth:`parse` call.

        :param msg: list of per-object detection dicts (read with the keys
            ``'person'``, ``'result'``, ``'keypoints'`` and ``'kpt_conf'``)
        :param img: frame the detections refer to, or ``None``
        :param ward_id: ward identifier copied into the message header
        """
        self.msg = msg
        self.img = img
        self.ward_id = ward_id
        # Any previously parsed message no longer matches the new inputs.
        self.parsed_msg = None

    def get(self):
        """Return the message produced by the last :meth:`parse` call.

        :return: JSON string, or ``None`` if nothing has been parsed since the
            last :meth:`set` or the last parse produced no message
        """
        return self.parsed_msg

    def image_encoding(self, img_data):
        """Encode image data (np.ndarray) as a base64 string.

        :param img_data: image data
        :return: base64 text of the encoded image; ``''`` when the encoder
            reports failure
        """
        ok, encoded = cv2.imencode(
            f".{self.IMAGE_FORMAT}",
            img_data,
            [int(cv2.IMWRITE_JPEG_QUALITY), self.JPEG_QUALITY],
        )
        if not ok:
            return ''
        return b64encode(encoded.tobytes()).decode("utf-8")

    def image_cropping(self, img_data, bbox):
        """Crop image data (np.ndarray) to a bounding box.

        Coordinates are cast to ``int`` and clamped to the image bounds, so
        float boxes do not raise and negative values do not wrap around via
        negative indexing.

        :param img_data: image data
        :param bbox: [x1, y1, x2, y2]
        :return: cropped image data (may be empty if the box lies outside the
            image)
        """
        x1, y1, x2, y2 = (int(v) for v in bbox)
        height, width = img_data.shape[:2]
        x1 = min(max(x1, 0), width)
        x2 = min(max(x2, 0), width)
        y1 = min(max(y1, 0), height)
        y2 = min(max(y2, 0), height)
        return img_data[y1:y2, x1:x2]

    def _encode_or_empty(self, img_data):
        """Return ``(has_image, base64_str)``, tolerating missing or empty images."""
        if img_data is None or img_data.size == 0:
            return False, ''
        encoded = self.image_encoding(img_data)
        return bool(encoded), encoded

    def parse_header(self):
        """Build the message header.

        * camera_id and frame_id are not implemented yet (fixed placeholders).

        :return: ``Header`` instance stamped with the current local time
        """
        return Header(
            camera_id="CAMERA_001",
            ward_id=self.ward_id,
            timestamp=_dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            frame_id=0,
        )

    def parse_summary(self, activate_cnt, total_cnt):
        """Build the message summary.

        * system_health is not implemented yet (always reported as ok).

        :param activate_cnt: number of objects with an active alert
        :param total_cnt: total number of detected objects
        :return: ``Summary`` instance
        """
        return Summary(
            total_objects_count=total_cnt,
            active_alerts_count=activate_cnt,
            system_health=SyestemHealth.ok,
        )

    def parse_object(self):
        """Build one ``DetectedObject`` per detection in ``self.msg``.

        * Everything except bbox and skeleton is not implemented yet.

        :return: tuple ``(objects, activate_cnt, total_cnt)``
        """
        objects = []
        activate_cnt = 0
        total_cnt = 0
        for det in self.msg or []:
            bbox = det.get('person')
            if bbox and self.img is not None:
                has_img, b64_img = self._encode_or_empty(
                    self.image_cropping(self.img, bbox)
                )
            else:
                has_img, b64_img = False, ''

            result = det['result']
            pose_type = result['pose_type']
            # NOTE(review): alerts are counted for pose_type > 0, while any
            # non-zero pose_type is reported as a fall below; confirm whether
            # negative values can occur.
            if pose_type > 0:
                activate_cnt += 1
            total_cnt += 1
            is_stable = pose_type == 0

            keypoints = det.get('keypoints')
            kpt_conf = det.get('kpt_conf')
            if keypoints is None or kpt_conf is None:
                # A detection without pose data yields an empty skeleton
                # instead of failing inside zip().
                skeleton = []
            else:
                skeleton = [
                    [round(x, 0) for x in kpt] + [round(conf, 2)]
                    for kpt, conf in zip(keypoints, kpt_conf)
                ]

            objects.append(DetectedObject(
                tracking_id=result['object_id'],
                status=Status.stable if is_stable else Status.fall_detected,
                status_detail=None if is_stable else StatusDetail.sudden_fall,
                severity=None if is_stable else Severity.critical,
                location_zone='ZONE_001',
                duration=0.0,
                bbox=bbox,
                skeleton=skeleton,
                metrics=Metrics(
                    velocity=0.0,
                    angle=0.0,
                ),
                visual_data=ObjectVisualData(
                    format=self.IMAGE_FORMAT,
                    has_image=has_img,
                    base64_str=b64_img,
                ),
            ))
        return objects, activate_cnt, total_cnt

    def parse_virtual(self):
        """Build the full-frame visual payload.

        :return: ``VisualData`` holding the encoded frame, or an empty payload
            when no usable frame is set
        """
        has_img, b64_img = self._encode_or_empty(self.img)
        return VisualData(
            format=self.IMAGE_FORMAT,
            has_image=has_img,
            base64_str=b64_img,
        )

    def parse(self):
        """Build the complete message and serialize it to JSON.

        * The criterion for deciding whether a frame is an alert still needs
          revision.

        The result is also cached so that :meth:`get` can return it.

        :return: JSON string, or ``None`` when no object has an active alert
        """
        object_msg, activate_cnt, total_cnt = self.parse_object()
        if activate_cnt == 0:
            self.parsed_msg = None
            return None

        parsing_msg = FallDetectionSchema(
            header=self.parse_header(),
            summary=self.parse_summary(activate_cnt, total_cnt),
            objects=object_msg,
            visual_data=self.parse_virtual(),
        )
        self.parsed_msg = parsing_msg.model_dump_json()
        return self.parsed_msg