Files
HOSPITAL_CCTV/parsing_msg.py

151 lines
4.6 KiB
Python
Raw Normal View History

2026-02-25 15:05:58 +09:00
import cv2,json
from base64 import b64encode
from schema import *
class ParsingMsg():
IMAGE_FORMAT = 'jpg'
def __init__(self):
self.msg = None
self.img = None
self.ward_id = None
2026-02-25 15:05:58 +09:00
self.parsed_msg = None
def set(self, msg:list, img, ward_id):
2026-02-25 15:05:58 +09:00
self.msg = None
self.img = None
self.ward_id = None
2026-02-25 15:05:58 +09:00
self.msg = msg
self.img = img
self.ward_id = ward_id
2026-02-25 15:05:58 +09:00
self.parsed_msg = None
def get(self):
result = self.parsed_msg
return result
def image_encoding(self, img_data):
"""
이미지데이터(np.ndarray) 바이너리 데이터로 변환
:param img_data: 이미지 데이터
:return: base64 format
"""
_, JPEG = cv2.imencode(".jpg", img_data, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
# Base64 encode
b64 = b64encode(JPEG.tobytes())
2026-02-25 15:05:58 +09:00
return b64.decode("utf-8")
def image_cropping(self, img_data, bbox):
"""
이미지데이터(np.ndarray) bbox 기준으로 크롭
:param img_data: 이미지 데이터
:param bbox: [x1, y1, x2, y2]
:return: cropped image data
"""
x1, y1, x2, y2 = bbox
cropped_img = img_data[y1:y2, x1:x2]
return cropped_img
def parse_header(self):
"""
*camera_id, ward_id, frame_id 미구현
"""
msg = Header(
camera_id= "CAMERA_001",
ward_id= self.ward_id,
2026-02-25 15:05:58 +09:00
timestamp= datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
frame_id= 0
)
return msg
def parse_summary(self, activate_cnt, total_cnt):
"""
*system_health 미구현
"""
msg = Summary(
total_objects_count= total_cnt,
active_alerts_count= activate_cnt,
system_health= SyestemHealth.ok
)
return msg
def parse_object(self):
"""
*bbox,skeleton 제외 전부 미구현
"""
msg = []
activate_cnt = 0
total_cnt = 0
if self.msg:
for i in self.msg:
has_img = True if i.get('person') and self.img is not None else False
b64_img = self.image_encoding(self.image_cropping(self.img, i.get('person'))) if has_img else ''
if i['result']['pose_type'] > 0:
activate_cnt += 1
total_cnt += 1
sub_msg = DetectedObject(
tracking_id= i['result']['object_id'],
status= Status.stable if i['result']['pose_type'] == 0 else Status.fall_detected,
status_detail= None if i['result']['pose_type'] == 0 else StatusDetail.sudden_fall,
severity= None if i['result']['pose_type'] == 0 else Severity.critical,
location_zone= 'ZONE_001',
duration= 0.0,
bbox= i.get('person'),
skeleton= [[round(x, 0) for x in kpt] + [round(conf,2)] for kpt, conf in zip(i.get('keypoints'), i.get('kpt_conf'))],
metrics=Metrics(
velocity=0.0,
angle=0.0
),
visual_data=ObjectVisualData(
format=self.IMAGE_FORMAT,
has_image= has_img,
base64_str= b64_img
)
)
msg.append(sub_msg)
return msg, activate_cnt, total_cnt
def parse_virtual(self):
has_img = True if self.img is not None else False
b64_img = self.image_encoding(self.img) if has_img else ''
msg= VisualData(
format=self.IMAGE_FORMAT,
has_image= has_img,
base64_str= b64_img
)
return msg
def parse(self):
"""
* 위험note 여부는 수정 필요
"""
object_msg, activate_cnt, total_cnt = self.parse_object()
if activate_cnt == 0:
return None
header_msg = self.parse_header()
summary_msg = self.parse_summary(activate_cnt, total_cnt)
visual_msg = self.parse_virtual()
parsing_msg = FallDetectionSchema(
header= header_msg,
summary= summary_msg,
objects= object_msg,
visual_data= visual_msg
)
return parsing_msg.model_dump_json()