2026-02-25 15:05:58 +09:00
|
|
|
import cv2,json
|
|
|
|
|
from base64 import b64encode
|
|
|
|
|
|
|
|
|
|
from schema import *
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ParsingMsg():
|
|
|
|
|
|
|
|
|
|
IMAGE_FORMAT = 'jpg'
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.msg = None
|
|
|
|
|
self.img = None
|
2026-03-04 16:54:28 +09:00
|
|
|
self.ward_id = None
|
|
|
|
|
|
2026-02-25 15:05:58 +09:00
|
|
|
self.parsed_msg = None
|
|
|
|
|
|
2026-03-04 16:54:28 +09:00
|
|
|
def set(self, msg:list, img, ward_id):
|
2026-02-25 15:05:58 +09:00
|
|
|
self.msg = None
|
|
|
|
|
self.img = None
|
2026-03-04 16:54:28 +09:00
|
|
|
self.ward_id = None
|
2026-02-25 15:05:58 +09:00
|
|
|
|
|
|
|
|
self.msg = msg
|
|
|
|
|
self.img = img
|
2026-03-04 16:54:28 +09:00
|
|
|
self.ward_id = ward_id
|
2026-02-25 15:05:58 +09:00
|
|
|
self.parsed_msg = None
|
|
|
|
|
|
|
|
|
|
def get(self):
|
|
|
|
|
result = self.parsed_msg
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def image_encoding(self, img_data):
|
|
|
|
|
"""
|
|
|
|
|
이미지데이터(np.ndarray) 를 바이너리 데이터로 변환
|
|
|
|
|
:param img_data: 이미지 데이터
|
|
|
|
|
:return: base64 format
|
|
|
|
|
"""
|
|
|
|
|
_, JPEG = cv2.imencode(".jpg", img_data, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
|
|
|
|
|
# Base64 encode
|
2026-03-04 16:54:28 +09:00
|
|
|
b64 = b64encode(JPEG.tobytes())
|
2026-02-25 15:05:58 +09:00
|
|
|
|
|
|
|
|
return b64.decode("utf-8")
|
|
|
|
|
|
|
|
|
|
def image_cropping(self, img_data, bbox):
|
|
|
|
|
"""
|
|
|
|
|
이미지데이터(np.ndarray) 를 bbox 기준으로 크롭
|
|
|
|
|
:param img_data: 이미지 데이터
|
|
|
|
|
:param bbox: [x1, y1, x2, y2]
|
|
|
|
|
:return: cropped image data
|
|
|
|
|
"""
|
|
|
|
|
x1, y1, x2, y2 = bbox
|
|
|
|
|
cropped_img = img_data[y1:y2, x1:x2]
|
|
|
|
|
return cropped_img
|
|
|
|
|
|
|
|
|
|
def parse_header(self):
|
|
|
|
|
"""
|
|
|
|
|
*camera_id, ward_id, frame_id 미구현
|
|
|
|
|
"""
|
|
|
|
|
msg = Header(
|
|
|
|
|
camera_id= "CAMERA_001",
|
2026-03-04 16:54:28 +09:00
|
|
|
ward_id= self.ward_id,
|
2026-02-25 15:05:58 +09:00
|
|
|
timestamp= datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
|
frame_id= 0
|
|
|
|
|
)
|
|
|
|
|
return msg
|
|
|
|
|
|
|
|
|
|
def parse_summary(self, activate_cnt, total_cnt):
|
|
|
|
|
"""
|
|
|
|
|
*system_health 미구현
|
|
|
|
|
"""
|
|
|
|
|
msg = Summary(
|
|
|
|
|
total_objects_count= total_cnt,
|
|
|
|
|
active_alerts_count= activate_cnt,
|
|
|
|
|
system_health= SyestemHealth.ok
|
|
|
|
|
)
|
|
|
|
|
return msg
|
|
|
|
|
|
|
|
|
|
def parse_object(self):
|
|
|
|
|
"""
|
|
|
|
|
*bbox,skeleton 제외 전부 미구현
|
|
|
|
|
"""
|
|
|
|
|
msg = []
|
|
|
|
|
activate_cnt = 0
|
|
|
|
|
total_cnt = 0
|
|
|
|
|
|
|
|
|
|
if self.msg:
|
|
|
|
|
|
|
|
|
|
for i in self.msg:
|
|
|
|
|
|
|
|
|
|
has_img = True if i.get('person') and self.img is not None else False
|
|
|
|
|
b64_img = self.image_encoding(self.image_cropping(self.img, i.get('person'))) if has_img else ''
|
|
|
|
|
if i['result']['pose_type'] > 0:
|
|
|
|
|
activate_cnt += 1
|
|
|
|
|
total_cnt += 1
|
|
|
|
|
|
|
|
|
|
sub_msg = DetectedObject(
|
|
|
|
|
tracking_id= i['result']['object_id'],
|
|
|
|
|
status= Status.stable if i['result']['pose_type'] == 0 else Status.fall_detected,
|
|
|
|
|
status_detail= None if i['result']['pose_type'] == 0 else StatusDetail.sudden_fall,
|
|
|
|
|
severity= None if i['result']['pose_type'] == 0 else Severity.critical,
|
|
|
|
|
location_zone= 'ZONE_001',
|
|
|
|
|
duration= 0.0,
|
|
|
|
|
bbox= i.get('person'),
|
|
|
|
|
skeleton= [[round(x, 0) for x in kpt] + [round(conf,2)] for kpt, conf in zip(i.get('keypoints'), i.get('kpt_conf'))],
|
|
|
|
|
metrics=Metrics(
|
|
|
|
|
velocity=0.0,
|
|
|
|
|
angle=0.0
|
|
|
|
|
),
|
|
|
|
|
visual_data=ObjectVisualData(
|
|
|
|
|
format=self.IMAGE_FORMAT,
|
|
|
|
|
has_image= has_img,
|
|
|
|
|
base64_str= b64_img
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
msg.append(sub_msg)
|
|
|
|
|
return msg, activate_cnt, total_cnt
|
|
|
|
|
|
|
|
|
|
def parse_virtual(self):
|
|
|
|
|
has_img = True if self.img is not None else False
|
|
|
|
|
b64_img = self.image_encoding(self.img) if has_img else ''
|
|
|
|
|
|
|
|
|
|
msg= VisualData(
|
|
|
|
|
format=self.IMAGE_FORMAT,
|
|
|
|
|
has_image= has_img,
|
|
|
|
|
base64_str= b64_img
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return msg
|
|
|
|
|
|
|
|
|
|
def parse(self):
|
|
|
|
|
"""
|
|
|
|
|
* 위험note 여부는 수정 필요
|
|
|
|
|
"""
|
|
|
|
|
object_msg, activate_cnt, total_cnt = self.parse_object()
|
|
|
|
|
|
|
|
|
|
if activate_cnt == 0:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
header_msg = self.parse_header()
|
|
|
|
|
summary_msg = self.parse_summary(activate_cnt, total_cnt)
|
|
|
|
|
visual_msg = self.parse_virtual()
|
|
|
|
|
|
|
|
|
|
parsing_msg = FallDetectionSchema(
|
|
|
|
|
header= header_msg,
|
|
|
|
|
summary= summary_msg,
|
|
|
|
|
objects= object_msg,
|
|
|
|
|
visual_data= visual_msg
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return parsing_msg.model_dump_json()
|
|
|
|
|
|