non use ai
BIN
AI_ENGINE/DATA/4NI_0400.MOV_20230104_101349.426.png
Normal file
|
After Width: | Height: | Size: 2.1 MiB |
BIN
AI_ENGINE/DATA/CON.mp4
Normal file
BIN
AI_ENGINE/DATA/FR.mov
Normal file
BIN
AI_ENGINE/DATA/PPE.mp4
Normal file
BIN
AI_ENGINE/DATA/WD_2.mp4
Normal file
BIN
AI_ENGINE/DATA/agics.jpg
Normal file
|
After Width: | Height: | Size: 7.2 KiB |
BIN
AI_ENGINE/DATA/facerec_worker1.png
Normal file
|
After Width: | Height: | Size: 12 KiB |
BIN
AI_ENGINE/DATA/facerec_worker2.png
Normal file
|
After Width: | Height: | Size: 25 KiB |
BIN
AI_ENGINE/DATA/facerec_worker3.png
Normal file
|
After Width: | Height: | Size: 22 KiB |
BIN
AI_ENGINE/DATA/ftp_data/bi.jpg
Normal file
|
After Width: | Height: | Size: 65 KiB |
BIN
AI_ENGINE/DATA/ftp_data/con_setup.jpg
Normal file
|
After Width: | Height: | Size: 76 KiB |
BIN
AI_ENGINE/DATA/ftp_data/fr.jpg
Normal file
|
After Width: | Height: | Size: 74 KiB |
BIN
AI_ENGINE/DATA/ftp_data/local.jpg
Normal file
|
After Width: | Height: | Size: 72 KiB |
BIN
AI_ENGINE/DATA/ftp_data/ppe.jpg
Normal file
|
After Width: | Height: | Size: 77 KiB |
BIN
AI_ENGINE/DATA/ftp_data/wd.jpg
Normal file
|
After Width: | Height: | Size: 77 KiB |
BIN
AI_ENGINE/DATA/jangys.jpg
Normal file
|
After Width: | Height: | Size: 81 KiB |
BIN
AI_ENGINE/DATA/jangys_re.jpg
Normal file
|
After Width: | Height: | Size: 39 KiB |
BIN
AI_ENGINE/DATA/kepco1.jpg
Normal file
|
After Width: | Height: | Size: 2.0 KiB |
BIN
AI_ENGINE/DATA/kepco1_1.jpg
Normal file
|
After Width: | Height: | Size: 5.7 KiB |
BIN
AI_ENGINE/DATA/kepco1_2.jpg
Normal file
|
After Width: | Height: | Size: 4.9 KiB |
BIN
AI_ENGINE/DATA/kepco2.jpg
Normal file
|
After Width: | Height: | Size: 2.1 KiB |
BIN
AI_ENGINE/DATA/kepco2_1.jpg
Normal file
|
After Width: | Height: | Size: 3.8 KiB |
BIN
AI_ENGINE/DATA/kepco2_10.jpg
Executable file
|
After Width: | Height: | Size: 14 KiB |
BIN
AI_ENGINE/DATA/kepco2_11.jpg
Executable file
|
After Width: | Height: | Size: 20 KiB |
BIN
AI_ENGINE/DATA/kepco2_12.jpg
Executable file
|
After Width: | Height: | Size: 74 KiB |
BIN
AI_ENGINE/DATA/kepco2_13.jpg
Executable file
|
After Width: | Height: | Size: 67 KiB |
BIN
AI_ENGINE/DATA/kepco2_2.jpg
Normal file
|
After Width: | Height: | Size: 3.9 KiB |
BIN
AI_ENGINE/DATA/kepco2_3.jpg
Normal file
|
After Width: | Height: | Size: 4.5 KiB |
BIN
AI_ENGINE/DATA/kepco2_4.jpg
Executable file
|
After Width: | Height: | Size: 15 KiB |
BIN
AI_ENGINE/DATA/kepco2_5.jpg
Executable file
|
After Width: | Height: | Size: 14 KiB |
BIN
AI_ENGINE/DATA/kepco2_6.jpg
Executable file
|
After Width: | Height: | Size: 14 KiB |
BIN
AI_ENGINE/DATA/kepco2_7.jpg
Executable file
|
After Width: | Height: | Size: 15 KiB |
BIN
AI_ENGINE/DATA/kepco2_8.jpg
Executable file
|
After Width: | Height: | Size: 10 KiB |
BIN
AI_ENGINE/DATA/kepco2_9.jpg
Executable file
|
After Width: | Height: | Size: 9.0 KiB |
BIN
AI_ENGINE/DATA/kimjw.jpg
Normal file
|
After Width: | Height: | Size: 282 KiB |
BIN
AI_ENGINE/DATA/kimjw_re.jpg
Normal file
|
After Width: | Height: | Size: 45 KiB |
BIN
AI_ENGINE/DATA/ksy_re.jpg
Normal file
|
After Width: | Height: | Size: 159 KiB |
BIN
AI_ENGINE/DATA/whangsj.jpg
Normal file
|
After Width: | Height: | Size: 16 KiB |
BIN
AI_ENGINE/DATA/yunikim.jpg
Executable file
|
After Width: | Height: | Size: 88 KiB |
52
AI_ENGINE/demo_utils.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import threading
|
||||
import ai_engine_const as AI_CONST
|
||||
import cv2
|
||||
import os
|
||||
import paramiko
|
||||
|
||||
#demo
|
||||
global DEMO_WD_BI_CONST
|
||||
DEMO_WD_BI_CONST = 0
|
||||
|
||||
demo_lock = threading.Lock()
|
||||
def demo_wd_bi(i):
|
||||
global DEMO_WD_BI_CONST
|
||||
demo_lock.acquire()
|
||||
|
||||
DEMO_WD_BI_CONST = i
|
||||
|
||||
demo_lock.release()
|
||||
print(DEMO_WD_BI_CONST)
|
||||
|
||||
def bi_snap_shot():
|
||||
rtsp = AI_CONST.RTSP
|
||||
local_path = AI_CONST.FTP_BI_RESULT
|
||||
|
||||
if os.path.exists(local_path):
|
||||
os.remove(local_path)
|
||||
|
||||
input_movie = cv2.VideoCapture(rtsp)
|
||||
|
||||
ret, frame = input_movie.read()
|
||||
print(local_path)
|
||||
cv2.imwrite(local_path,frame)
|
||||
_bi_sftp_upload()
|
||||
cv2.destroyAllWindows()
|
||||
print(f"bi uploaded")
|
||||
|
||||
def _bi_sftp_upload():
|
||||
try:
|
||||
transprot = paramiko.Transport((AI_CONST.FTP_IP,AI_CONST.FTP_PORT))
|
||||
transprot.connect(username = AI_CONST.FTP_ID, password = AI_CONST.FTP_PW)
|
||||
sftp = paramiko.SFTPClient.from_transport(transprot)
|
||||
|
||||
remotepath = AI_CONST.FTP_LOCATION + os.sep + AI_CONST.FTP_BI_FILE_NAME + '.jpg'
|
||||
|
||||
#sftp.put(AI_CONST.FTP_BI_RESULT, remotepath)
|
||||
|
||||
sftp.close()
|
||||
transprot.close()
|
||||
|
||||
return remotepath
|
||||
except Exception as e:
|
||||
return ""
|
||||
693
AI_ENGINE/instance_queue.py
Normal file
@@ -0,0 +1,693 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@file : instance_queue.py
|
||||
@author: jwkim
|
||||
@license: A2TEC & DAOOLDNS
|
||||
|
||||
@section Modify History
|
||||
- 2023-01-22 오전 11:31 jwkim base
|
||||
|
||||
"""
|
||||
|
||||
import os, sys
|
||||
import time
|
||||
import threading
|
||||
from queue import Queue
|
||||
|
||||
DL_BASE_PATH = os.path.abspath(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
|
||||
sys.path.append(DL_BASE_PATH)
|
||||
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))) # const
|
||||
|
||||
import json
|
||||
|
||||
import ai_engine_const as AI_CONST
|
||||
from mqtt_publish import client
|
||||
from DL import d2_od_detect
|
||||
from DL.FR.d2_face_detect import FaceDetect
|
||||
|
||||
from REST_AI_ENGINE_CONTROL.app.utils.date_utils import D
|
||||
from REST_AI_ENGINE_CONTROL.app import models as M
|
||||
from DL.d2_od_detect import D2_ROOT
|
||||
|
||||
|
||||
class ThreadValues:
|
||||
"""
|
||||
thread간 공유 데이터
|
||||
"""
|
||||
STATUS_START = 'start'
|
||||
STATUS_COMPLETE = 'complete'
|
||||
STATUS_NEW = 'new'
|
||||
STATUS_TIMEOUT = 'timeout'
|
||||
STATUS_STOP = 'stop'
|
||||
STATUS_ERROR = '[error] ai model error : '
|
||||
STATUS_NONE = 'None'
|
||||
STATUS_DETECT = 'detect'
|
||||
|
||||
date_utill = D()
|
||||
|
||||
def __init__(self):
|
||||
self.current_status = self.STATUS_NONE
|
||||
self.result = []
|
||||
self.report_unit_result = []
|
||||
self.frame_result = {}
|
||||
self.timeout_status = False
|
||||
|
||||
self.wd_ri = 0
|
||||
|
||||
|
||||
class WorkQueue:
|
||||
# max_workers = 2
|
||||
# current_workers = 0
|
||||
|
||||
message_queue = Queue()
|
||||
|
||||
# event_thread = threading.Event()
|
||||
|
||||
def __init__(self):
|
||||
# self.current_workers = 0
|
||||
self.message_queue = Queue()
|
||||
# self.lock = threading.Lock()
|
||||
# self.event_thread = threading.Event()
|
||||
receiver_start_thread = threading.Thread(target=self.classifier, daemon=True)
|
||||
receiver_start_thread.start()
|
||||
# event_check_thread = threading.Thread(target=self.event, daemon=True).start()
|
||||
|
||||
def sender(self, data):
|
||||
self.message_queue.put(data)
|
||||
|
||||
# TODO(jwkim): 이벤트 처리
|
||||
# def event(self):
|
||||
# if self.current_workers <= self.max_workers:
|
||||
# print("-----------------event--------------------",self.current_workers)
|
||||
# self.event_thread.set()
|
||||
# else :
|
||||
# pass
|
||||
|
||||
def classifier(self):
|
||||
"""
|
||||
sender를 통해 받은 메세지 처리
|
||||
ai_model을 통해 해당 model queue에 전송
|
||||
"""
|
||||
while True:
|
||||
queue_data = self.message_queue.get()
|
||||
|
||||
# TODO(jwkim): 이벤트 worker 체킹
|
||||
# self.event_thread.wait()
|
||||
|
||||
# manager.lock.acquire()
|
||||
# manager.current_workers += 1
|
||||
# manager.lock.release()
|
||||
|
||||
if queue_data['ai_model'] == AI_CONST.MODEL_CON:
|
||||
CON.con_sender(queue_data)
|
||||
elif queue_data['ai_model'] == AI_CONST.MODEL_PPE:
|
||||
PPE.ppe_sender(queue_data)
|
||||
elif queue_data['ai_model'] == AI_CONST.MODEL_WORK_DETECT:
|
||||
WD.wd_sender(queue_data)
|
||||
elif queue_data['ai_model'] == AI_CONST.MODEL_FACE_RECOGNIZE:
|
||||
FR.fr_sender(queue_data)
|
||||
elif queue_data['ai_model'] == AI_CONST.MODEL_BIO_INFO:
|
||||
pass
|
||||
|
||||
|
||||
class PPEManager:
|
||||
"""
|
||||
PPE Manager
|
||||
"""
|
||||
TYPE_DETECT_SEQ = 0 # 순서적 처리
|
||||
TYPE_DETECT_PAL = 1
|
||||
|
||||
def __init__(self, detect_type: int = TYPE_DETECT_SEQ):
|
||||
self.ppe_queue = Queue()
|
||||
|
||||
self.detect_type = detect_type
|
||||
|
||||
self.ppe_receiver_thread = threading.Thread(target=self.ppe_receiver, daemon=True, name='ppe_receiver')
|
||||
self.ppe_receiver_thread.start()
|
||||
|
||||
self.worker_event = threading.Event() # worker check
|
||||
self.stop_event = threading.Event() # inference stop trigger
|
||||
self.timeout_event = threading.Event() # timeout check
|
||||
|
||||
self.current_worker = ''
|
||||
|
||||
def ppe_sender(self, data):
|
||||
self.ppe_queue.put(data)
|
||||
|
||||
def ppe_receiver(self):
|
||||
"""
|
||||
ppe_queue를 통해 데이터를 받아서 inference 시작, 정지 실행
|
||||
|
||||
ppe_thread가 실행중인 상황에서 또다른 데이터가 들어올시
|
||||
ppe_thread 종료후 새로운 ppe_thread 실행
|
||||
|
||||
병렬처리 미구현
|
||||
"""
|
||||
while True:
|
||||
queue_data = self.ppe_queue.get()
|
||||
if queue_data:
|
||||
if self.detect_type == PPEManager.TYPE_DETECT_SEQ:
|
||||
|
||||
# start inference
|
||||
if queue_data["signal"] == AI_CONST.SIGNAL_INFERENCE:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
|
||||
self.worker_event.clear()
|
||||
ppe_thread = threading.Thread(target=self.ppe_start, daemon=True, args=(queue_data,),
|
||||
name='ppe_thread').start()
|
||||
|
||||
# stop inference
|
||||
elif queue_data["signal"] == AI_CONST.SIGNAL_STOP:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
self.worker_event.clear()
|
||||
|
||||
def ppe_start(self, data):
|
||||
"""
|
||||
inference 와 timeout 관리
|
||||
timeout 발생시 inference 종료
|
||||
inference 종료시 timeout_thread와 같이 종료
|
||||
:param data: queue_data
|
||||
"""
|
||||
|
||||
try:
|
||||
share_value = ThreadValues()
|
||||
|
||||
data['argument']['source'] = self._input_source(data['engine_info'].input_video)
|
||||
|
||||
ppe_detect = d2_od_detect.PPEDetect(
|
||||
request=data['request'],
|
||||
engine_info=data['engine_info'],
|
||||
thread_value=share_value,
|
||||
yolo_argument=data['argument'],
|
||||
worker_event=self.worker_event,
|
||||
stop_event=self.stop_event,
|
||||
timeout_event=self.timeout_event,
|
||||
queue_info=self.ppe_queue,
|
||||
)
|
||||
self.current_worker = threading.Thread(target=ppe_detect.run, daemon=True, kwargs=data['argument'],
|
||||
name='ppe_inference')
|
||||
self.current_worker.start()
|
||||
|
||||
timeout_value = data['request'].limit_time_min * 60
|
||||
timeout_thread = threading.Thread(target=self._timeout, daemon=True, args=(timeout_value, share_value,),
|
||||
name='timeout')
|
||||
timeout_thread.start()
|
||||
|
||||
# TODO(jwkim) : worker check
|
||||
# if manager.current_workers > 0:
|
||||
# manager.lock.acquire()
|
||||
# manager.current_workers -= 1
|
||||
# manager.lock.release()
|
||||
|
||||
except Exception as e:
|
||||
result_message = {
|
||||
"datetime": share_value.date_utill.date_str_micro_sec(),
|
||||
"status": share_value.STATUS_ERROR + str(e),
|
||||
"result": share_value.result
|
||||
}
|
||||
client.publish(AI_CONST.MQTT_PPE_TOPIC, json.dumps(result_message), 0)
|
||||
|
||||
if not self.timeout_event.is_set():
|
||||
self.timeout_event.set()
|
||||
self.timeout_event.clear()
|
||||
|
||||
self.stop_event.clear()
|
||||
self.worker_event.set()
|
||||
|
||||
def _timeout(self, timeout_value, share_value):
|
||||
"""
|
||||
time out 관련 처리
|
||||
-> timeout 발생시 timeout_event는 set 상태가 아님
|
||||
|
||||
:param timeout_value: 타임아웃 시간 (초)
|
||||
:param share_value: 스레드간 공유 변수
|
||||
"""
|
||||
self.timeout_event.wait(timeout=timeout_value)
|
||||
|
||||
# 외부에서 set
|
||||
if self.timeout_event.is_set():
|
||||
share_value.timeout_status = False
|
||||
|
||||
# stop
|
||||
elif share_value.current_status == share_value.STATUS_STOP:
|
||||
share_value.timeout_status = False
|
||||
|
||||
# time out
|
||||
else:
|
||||
share_value.timeout_status = True
|
||||
|
||||
def _input_source(self, args):
|
||||
"""
|
||||
입력source를 받아서 parsing
|
||||
:param args: input_video 정보
|
||||
:return: parsing된 정보
|
||||
"""
|
||||
for i in args:
|
||||
if i.model == M.AEAIModelType.PPE:
|
||||
return i.connect_url
|
||||
|
||||
|
||||
class CONManager:
|
||||
"""
|
||||
CON_SETUP Manager
|
||||
"""
|
||||
TYPE_DETECT_SEQ = 0 # 순서적 처리
|
||||
TYPE_DETECT_PAL = 1
|
||||
|
||||
def __init__(self, detect_type: int = TYPE_DETECT_SEQ):
|
||||
self.con_queue = Queue()
|
||||
|
||||
self.detect_type = detect_type
|
||||
|
||||
self.con_receiver_thread = threading.Thread(target=self.con_receiver, daemon=True, name='con_receiver')
|
||||
self.con_receiver_thread.start()
|
||||
|
||||
self.worker_event = threading.Event() # worker check
|
||||
self.stop_event = threading.Event() # inference stop trigger
|
||||
self.timeout_event = threading.Event() # timeout check
|
||||
|
||||
self.current_worker = ''
|
||||
|
||||
def con_sender(self, data):
|
||||
self.con_queue.put(data)
|
||||
|
||||
def con_receiver(self):
|
||||
"""
|
||||
con_queue를 통해 데이터를 받아서 inference 시작, 정지 실행
|
||||
|
||||
con_thread가 실행중인 상황에서 또다른 데이터가 들어올시
|
||||
con_thread 종료후 새로운 con_thread 실행
|
||||
|
||||
병렬처리 미구현
|
||||
"""
|
||||
while True:
|
||||
queue_data = self.con_queue.get()
|
||||
if queue_data:
|
||||
if self.detect_type == CONManager.TYPE_DETECT_SEQ:
|
||||
|
||||
# start inference
|
||||
if queue_data["signal"] == AI_CONST.SIGNAL_INFERENCE:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
|
||||
self.worker_event.clear()
|
||||
con_thread = threading.Thread(target=self.con_start, daemon=True, args=(queue_data,),
|
||||
name='con_thread').start()
|
||||
|
||||
# stop inference
|
||||
elif queue_data["signal"] == AI_CONST.SIGNAL_STOP:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
self.worker_event.clear()
|
||||
|
||||
def con_start(self, data):
|
||||
"""
|
||||
inference 와 timeout 관리
|
||||
timeout 발생시 inference 종료
|
||||
inference 종료시 timeout_thread와 같이 종료
|
||||
:param data: queue_data
|
||||
"""
|
||||
|
||||
try:
|
||||
share_value = ThreadValues()
|
||||
|
||||
data['argument']['source'] = self._input_source(data['engine_info'].input_video)
|
||||
|
||||
con_detect = d2_od_detect.CONDetect(
|
||||
request=data['request'],
|
||||
engine_info=data['engine_info'],
|
||||
thread_value=share_value,
|
||||
yolo_argument=data['argument'],
|
||||
worker_event=self.worker_event,
|
||||
stop_event=self.stop_event,
|
||||
timeout_event=self.timeout_event,
|
||||
queue_info=self.con_queue,
|
||||
)
|
||||
self.current_worker = threading.Thread(target=con_detect.run, daemon=True, kwargs=data['argument'],
|
||||
name='con_inference')
|
||||
self.current_worker.start()
|
||||
|
||||
timeout_value = data['request'].limit_time_min * 60
|
||||
timeout_thread = threading.Thread(target=self._timeout, daemon=True, args=(timeout_value, share_value,),
|
||||
name='timeout')
|
||||
timeout_thread.start()
|
||||
|
||||
# TODO(jwkim) : worker check
|
||||
# if manager.current_workers > 0:
|
||||
# manager.lock.acquire()
|
||||
# manager.current_workers -= 1
|
||||
# manager.lock.release()
|
||||
|
||||
except Exception as e:
|
||||
result_message = {
|
||||
"datetime": share_value.date_utill.date_str_micro_sec(),
|
||||
"status": share_value.STATUS_ERROR + str(e),
|
||||
"result": share_value.result
|
||||
}
|
||||
client.publish(AI_CONST.MQTT_CON_TOPIC, json.dumps(result_message), 0)
|
||||
|
||||
if not self.timeout_event.is_set():
|
||||
self.timeout_event.set()
|
||||
self.timeout_event.clear()
|
||||
|
||||
self.stop_event.clear()
|
||||
self.worker_event.set()
|
||||
|
||||
def _timeout(self, timeout_value, share_value):
|
||||
"""
|
||||
time out 관련 처리
|
||||
-> timeout 발생시 timeout_event는 set 상태가 아님
|
||||
|
||||
:param timeout_value: 타임아웃 시간 (초)
|
||||
:param share_value: 스레드간 공유 변수
|
||||
"""
|
||||
self.timeout_event.wait(timeout=timeout_value)
|
||||
|
||||
# 외부에서 set
|
||||
if self.timeout_event.is_set():
|
||||
share_value.timeout_status = False
|
||||
|
||||
# stop
|
||||
elif share_value.current_status == share_value.STATUS_STOP:
|
||||
share_value.timeout_status = False
|
||||
|
||||
# time out
|
||||
else:
|
||||
share_value.timeout_status = True
|
||||
|
||||
def _input_source(self, args):
|
||||
"""
|
||||
입력source를 받아서 parsing
|
||||
:param args: input_video 정보
|
||||
:return: parsing된 정보
|
||||
"""
|
||||
for i in args:
|
||||
if i.model == M.AEAIModelType.CON:
|
||||
return i.connect_url
|
||||
|
||||
class WDManager:
|
||||
"""
|
||||
WorkDetect Manager
|
||||
"""
|
||||
TYPE_DETECT_SEQ = 0 # 순서적 처리
|
||||
TYPE_DETECT_PAL = 1
|
||||
|
||||
def __init__(self, detect_type: int = TYPE_DETECT_SEQ):
|
||||
self.wd_queue = Queue()
|
||||
|
||||
self.detect_type = detect_type
|
||||
|
||||
self.wd_receiver_thread = threading.Thread(target=self.wd_receiver, daemon=True, name='wd_receiver')
|
||||
self.wd_receiver_thread.start()
|
||||
|
||||
self.worker_event = threading.Event() # worker check
|
||||
self.stop_event = threading.Event() # inference stop trigger
|
||||
# self.timeout_event = threading.Event() # timeout check
|
||||
|
||||
self.lock = threading.Lock()
|
||||
|
||||
self.current_worker = ''
|
||||
|
||||
def wd_sender(self, data):
|
||||
self.wd_queue.put(data)
|
||||
|
||||
def wd_receiver(self):
|
||||
"""
|
||||
wd_queue를 통해 데이터를 받아서 inference 시작, 정지 실행
|
||||
|
||||
wd_thread가 실행중인 상황에서 또다른 데이터가 들어올시
|
||||
wd_thread 종료후 새로운 wd_thread 실행
|
||||
|
||||
병렬처리 미구현
|
||||
"""
|
||||
while True:
|
||||
queue_data = self.wd_queue.get()
|
||||
if queue_data:
|
||||
if self.detect_type == WDManager.TYPE_DETECT_SEQ:
|
||||
|
||||
# start inference
|
||||
if queue_data["signal"] == AI_CONST.SIGNAL_INFERENCE:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
|
||||
self.worker_event.clear()
|
||||
wd_thread = threading.Thread(target=self.wd_start, daemon=True, args=(queue_data,),
|
||||
name='wd_thread')
|
||||
wd_thread.start()
|
||||
|
||||
# stop inference
|
||||
elif queue_data["signal"] == AI_CONST.SIGNAL_STOP:
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
self.worker_event.clear()
|
||||
|
||||
def wd_start(self, data):
|
||||
"""
|
||||
inference 관리
|
||||
|
||||
외부에서 stop 발생시 inference 종료
|
||||
|
||||
:param data: queue_data
|
||||
"""
|
||||
|
||||
try:
|
||||
share_value = ThreadValues()
|
||||
|
||||
data['argument']['source'] = self._input_source(data['engine_info'].input_video)
|
||||
|
||||
wd_detect = d2_od_detect.WDDetect(
|
||||
request=data['request'],
|
||||
engine_info=data['engine_info'],
|
||||
thread_value=share_value,
|
||||
yolo_argument=data['argument'],
|
||||
worker_event=self.worker_event,
|
||||
stop_event=self.stop_event,
|
||||
queue_info=self.wd_queue
|
||||
)
|
||||
self.current_worker = threading.Thread(target=wd_detect.run, daemon=True, kwargs=data['argument'],
|
||||
name='wd_inference')
|
||||
self.current_worker.start()
|
||||
|
||||
# TODO(jwkim) : worker check
|
||||
# if manager.current_workers > 0:
|
||||
# manager.lock.acquire()
|
||||
# manager.current_workers -= 1
|
||||
# manager.lock.release()
|
||||
|
||||
except Exception as e:
|
||||
result_message = {
|
||||
"datetime": share_value.date_utill.date_str_micro_sec(),
|
||||
"status": share_value.STATUS_ERROR + str(e),
|
||||
"result": {
|
||||
'construction_type': data['request'].ri.construction_code,
|
||||
'procedure_no': data['request'].ri.work_no,
|
||||
'procedure_ri': data['request'].ri.work_define_ri,
|
||||
'ri': share_value.wd_ri,
|
||||
'detect_list': share_value.result
|
||||
}
|
||||
}
|
||||
client.publish(AI_CONST.MQTT_WD_TOPIC, json.dumps(result_message), 0)
|
||||
|
||||
# if not self.timeout_event.is_set():
|
||||
# self.timeout_event.set()
|
||||
# self.timeout_event.clear()
|
||||
|
||||
self.stop_event.clear()
|
||||
self.worker_event.set()
|
||||
|
||||
def _input_source(self, args):
|
||||
"""
|
||||
모델에서 입력소스를 받아서 parsing
|
||||
만약 stream data와 files data가 같이있으면 stream data return
|
||||
:param args: input_video 정보
|
||||
:return: parsing된 정보
|
||||
"""
|
||||
|
||||
result = ''
|
||||
|
||||
streams = [] # webcam, RTSP
|
||||
files = []
|
||||
|
||||
for i in args:
|
||||
if i.model == M.AEAIModelType.WORK:
|
||||
if os.path.isfile(i.connect_url):
|
||||
files.append(i.connect_url)
|
||||
elif i.connect_url.isnumeric() or i.connect_url.lower().startswith(
|
||||
('rtsp://', 'rtmp://', 'http://', 'https://')):
|
||||
streams.append(i.connect_url)
|
||||
|
||||
if streams:
|
||||
f = open(AI_CONST.STREAMS_PATH, 'w')
|
||||
for k in streams:
|
||||
# if k.isnumeric():
|
||||
# k = int(k)
|
||||
f.write(k + "\n")
|
||||
f.close()
|
||||
result = AI_CONST.STREAMS_PATH
|
||||
|
||||
elif files:
|
||||
result = files[0]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class FRManager:
|
||||
"""
|
||||
Face Recognition Manager
|
||||
"""
|
||||
# fr_queue = Queue()
|
||||
TYPE_DETECT_SEQ = 0
|
||||
TYPE_DETECT_PAL = 1
|
||||
|
||||
# DETECT_WORKER_MAX = 10
|
||||
|
||||
def __init__(self, detect_type: int = TYPE_DETECT_SEQ):
|
||||
self.fr_queue = Queue()
|
||||
|
||||
# self.status_detect = False
|
||||
self.detect_type = detect_type # 0: 순서적처리 1: 병렬처리
|
||||
# self.detect_worker_list = []
|
||||
# self.detect_worker_cur_idx = 0
|
||||
|
||||
self.stop_event = threading.Event() # inference stop trigger
|
||||
self.worker_event = threading.Event() # worker check
|
||||
self.timeout_event = threading.Event() # timeout check
|
||||
|
||||
self.fr_receiver_thread = threading.Thread(target=self._fr_receiver, daemon=True, name='fr_receiver')
|
||||
self.fr_receiver_thread.start()
|
||||
|
||||
# self.frlock = threading.Lock()
|
||||
self.fr_thread = ''
|
||||
self.current_worker = ''
|
||||
|
||||
self.fr_id_info = {}
|
||||
|
||||
def fr_sender(self, data):
|
||||
self.fr_queue.put(data)
|
||||
|
||||
def _fr_receiver(self):
|
||||
"""
|
||||
FR_sender 를 통해 받은 메세지 처리
|
||||
signal data를 입력받아 inference 시작, 강제종료 시킴
|
||||
TODO(jwkim) 병렬처리
|
||||
"""
|
||||
while True:
|
||||
# message get
|
||||
queue_data = self.fr_queue.get()
|
||||
|
||||
if queue_data:
|
||||
if self.detect_type == FRManager.TYPE_DETECT_SEQ:
|
||||
|
||||
if queue_data["signal"] == AI_CONST.SIGNAL_INFERENCE:
|
||||
# start inference
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
self.worker_event.clear()
|
||||
self.fr_thread = threading.Thread(target=self._fr_start, daemon=True, args=(queue_data,),
|
||||
name='fr_thread')
|
||||
self.fr_thread.start()
|
||||
|
||||
elif queue_data["signal"] == AI_CONST.SIGNAL_STOP:
|
||||
# stop inference
|
||||
if self.current_worker:
|
||||
if self.current_worker.is_alive():
|
||||
self.stop_event.set()
|
||||
self.worker_event.wait()
|
||||
self.worker_event.clear()
|
||||
|
||||
# elif self.detect_type == FRManager.TYPE_DETECT_PAL:
|
||||
# #TODO(jwkim): 병렬처리
|
||||
|
||||
# self.detect_worker_list.append(threading.Thread(target=self.FR_start, self.detect_worker_cur_idx, daemon=True).start())
|
||||
# self.detect_worker_cur_idx+=1
|
||||
# pass
|
||||
|
||||
def _fr_start(self, data):
|
||||
"""
|
||||
timeout 스레드와 inference 스레드 관리
|
||||
|
||||
:param data: _description_
|
||||
"""
|
||||
try:
|
||||
self.timeout_event.clear()
|
||||
share_value = ThreadValues()
|
||||
|
||||
input_video = self._input_source(data['engine_info'].input_video)
|
||||
|
||||
face_detect = FaceDetect(
|
||||
request=data['request'],
|
||||
thread_value=share_value,
|
||||
stop_event=self.stop_event,
|
||||
worker_event=self.worker_event,
|
||||
timeout_event=self.timeout_event,
|
||||
queue_info=self.fr_queue,
|
||||
input_video=input_video,
|
||||
engine_info=data['engine_info'],
|
||||
fr_manager=self
|
||||
)
|
||||
|
||||
self.current_worker = threading.Thread(target=face_detect.inference, daemon=True, name='fr_inference')
|
||||
self.current_worker.start()
|
||||
|
||||
limit_time_min = data['request'].limit_time_min * 60
|
||||
self.timeout_thread = threading.Thread(target=self._timeout, daemon=True,
|
||||
args=(limit_time_min, share_value,), name='timeout')
|
||||
self.timeout_thread.start()
|
||||
|
||||
except Exception as e:
|
||||
result_message = {
|
||||
"datetime": share_value.date_utill.date_str_micro_sec(),
|
||||
"status": share_value.STATUS_ERROR + str(e),
|
||||
"result": share_value.result
|
||||
}
|
||||
client.publish(AI_CONST.MQTT_FR_TOPIC, json.dumps(result_message), 0)
|
||||
|
||||
def _timeout(self, limit_min: int, share_value):
|
||||
"""
|
||||
입력받은 시간을 토대로 current_worker에 timeout 발생
|
||||
:param limit_min: 제한시간(분)
|
||||
"""
|
||||
self.timeout_event.wait(timeout=limit_min)
|
||||
|
||||
# 외부에서 set
|
||||
if self.timeout_event.is_set():
|
||||
share_value.timeout_status = False
|
||||
|
||||
# stop
|
||||
elif share_value.current_status == share_value.STATUS_STOP:
|
||||
share_value.timeout_status = False
|
||||
|
||||
# time out
|
||||
else:
|
||||
share_value.timeout_status = True
|
||||
|
||||
def _input_source(self, args):
|
||||
for i in args:
|
||||
if i.model == M.AEAIModelType.FR:
|
||||
return i.connect_url
|
||||
|
||||
|
||||
manager = WorkQueue()
|
||||
CON = CONManager()
|
||||
PPE = PPEManager()
|
||||
WD = WDManager()
|
||||
FR = FRManager()
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
44
AI_ENGINE/mqtt_publish.py
Normal file
@@ -0,0 +1,44 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@file : mqtt_publish.py
|
||||
@author: jwkim
|
||||
@license: A2TEC & DAOOLDNS
|
||||
|
||||
@section Modify History
|
||||
- 2023-01-11 오전 11:31 jwkim base
|
||||
|
||||
"""
|
||||
import os, sys
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
|
||||
|
||||
import ai_engine_const as AI_CONST
|
||||
|
||||
import json
|
||||
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
print("MQTT connected OK")
|
||||
else:
|
||||
print("Bad connection Returned code=", rc)
|
||||
|
||||
|
||||
def on_disconnect(client, userdata, flags, rc=0):
|
||||
print(str(rc))
|
||||
|
||||
|
||||
def on_publish(client, userdata, mid):
|
||||
print("publish = ", mid)
|
||||
|
||||
|
||||
# 새로운 클라이언트 생성
|
||||
client = mqtt.Client()
|
||||
# 콜백 함수 설정 on_connect(브로커에 접속), on_disconnect(브로커에 접속중료), on_publish(메세지 발행)
|
||||
client.on_connect = on_connect
|
||||
client.on_disconnect = on_disconnect
|
||||
# client.on_publish = on_publish
|
||||
client.username_pw_set(AI_CONST.MQTT_USER_ID,AI_CONST.MQTT_USER_PW)
|
||||
# address : localhost, port: 1883 에 연결
|
||||
client.connect(AI_CONST.MQTT_HOST, AI_CONST.MQTT_PORT)
|
||||
64
AI_ENGINE/mqtt_subscribe.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import os, sys
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
import cv2
|
||||
from base64 import b64decode
|
||||
import numpy as np
|
||||
|
||||
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
|
||||
|
||||
import ai_engine_const as AI_CONST
|
||||
|
||||
|
||||
def on_connect(mqtt_object, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
print("connected OK")
|
||||
else:
|
||||
print("Bad connection Returned code=", rc)
|
||||
|
||||
|
||||
def on_disconnect(mqtt_object, userdata, flags, rc=0):
|
||||
print(str(rc))
|
||||
|
||||
|
||||
def on_subscribe(mqtt_object, userdata, mid, granted_qos):
|
||||
print("subscribed: " + str(mid) + " " + str(granted_qos))
|
||||
|
||||
|
||||
def on_message(mqtt_object, userdata, msg):
|
||||
"""
|
||||
:param mqtt_object: 정의된mqtt client
|
||||
:param userdata: ?
|
||||
:param msg: 전송받은 메세지 정보
|
||||
"""
|
||||
|
||||
if msg.topic == AI_CONST.MQTT_PPE_TOPIC:
|
||||
object_message = json.loads(msg.payload.decode("utf-8"))
|
||||
# if object_message["img_data"]:
|
||||
# img_data = b64decode(object_message["img_data"])
|
||||
#
|
||||
# save_image = cv2.imdecode(np.frombuffer(img_data,dtype=np.uint8), cv2.IMREAD_COLOR)
|
||||
# cv2.imwrite("./test.jpg", save_image)
|
||||
print(object_message)
|
||||
|
||||
elif msg.topic == AI_CONST.MQTT_FR_TOPIC:
|
||||
print(msg.payload.decode("utf-8"))
|
||||
|
||||
|
||||
client = mqtt.Client()
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_disconnect = on_disconnect
|
||||
client.on_subscribe = on_subscribe
|
||||
client.on_message = on_message
|
||||
|
||||
client.username_pw_set(AI_CONST.MQTT_USER_ID, AI_CONST.MQTT_USER_PW)
|
||||
client.connect(AI_CONST.MQTT_HOST, AI_CONST.MQTT_PORT)
|
||||
client.subscribe(AI_CONST.MQTT_PPE_TOPIC) # ppe
|
||||
client.subscribe(AI_CONST.MQTT_FR_TOPIC) # FR
|
||||
|
||||
|
||||
|
||||
client.loop_forever()
|
||||
|
||||
|
||||
63
AI_ENGINE/old_queue.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
@File: queue.py
|
||||
@Date: 2022-06-30
|
||||
@author: jwkim
|
||||
@section MODIFYINFO 수정정보
|
||||
- 수정자/수정일 : 수정내역
|
||||
@brief: queue
|
||||
"""
|
||||
|
||||
import os, sys
|
||||
import time
|
||||
import threading
|
||||
from queue import Queue
|
||||
|
||||
DL_BASE_PATH = os.path.abspath(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
|
||||
sys.path.append(DL_BASE_PATH)
|
||||
|
||||
from DL import d2_od_detect
|
||||
|
||||
|
||||
class WorkQueue:
|
||||
max_workers = 4
|
||||
message_queue = Queue()
|
||||
|
||||
def __init__(self):
|
||||
self.current_workers = 0
|
||||
self.message_queue = Queue()
|
||||
self.lock = threading.Lock()
|
||||
receiver_start_thread = threading.Thread(target=self.classifier, daemon=True).start()
|
||||
|
||||
def sender(self, data):
|
||||
self.message_queue.put(data)
|
||||
|
||||
def classifier(self):
|
||||
while True:
|
||||
queue_data = self.message_queue.get()
|
||||
work_thread = threading.Thread(target=self.check_worker, args=(queue_data,), daemon=True).start()
|
||||
|
||||
def check_worker(self, args):
|
||||
if self.max_workers > self.current_workers:
|
||||
self.start_worker(args)
|
||||
|
||||
elif self.max_workers <= self.current_workers:
|
||||
while self.max_workers <= self.current_workers:
|
||||
time.sleep(1)
|
||||
self.start_worker(args)
|
||||
|
||||
def start_worker(self, inference_args):
|
||||
self.lock.acquire()
|
||||
self.current_workers += 1
|
||||
self.lock.release()
|
||||
|
||||
d2_od_detect.run(**inference_args)
|
||||
|
||||
if self.current_workers > 0:
|
||||
self.lock.acquire()
|
||||
self.current_workers -= 1
|
||||
self.lock.release()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||