Compare commits

...

16 Commits

Author SHA1 Message Date
b508d32aff edit: 서비스파일 변경 2026-01-02 10:30:41 +09:00
80f5e50c31 edit : 안경이미지 검색시 temp 이미지 사용 안함 2026-01-02 09:45:29 +09:00
e2190eadba edit : 서비스파일 추가 2025-12-22 10:41:28 +09:00
113626cc5d edit : api key 파일로 관리 2025-12-19 15:32:05 +09:00
7eed2e9868 edit : 요청값보다 벡터파일에 등록된 갯수가 적을경우 각 항목 null로 처리 2025-12-04 16:48:17 +09:00
fce9eedfae edit : hugging face login 로컬에 로그인되어있으면 무시 2025-12-04 15:24:49 +09:00
ff54e1cc2d relarse 0.2.0 2025-12-02 14:45:45 +09:00
8b7ad2e973 edit : 벡터파일 생성 관련 분리 2025-12-02 14:15:49 +09:00
18b24dcafa edit: api 변경 - 벡터이미지검색 -> 결과물에 parts 추가, 파츠검색 api 추가 2025-11-27 15:27:22 +09:00
12b12a95f5 edit : 이미지생성 model 변경, api 키 변경 2025-11-19 16:29:08 +09:00
8c2e146136 edit: 기본모델타입 b32로 변경 2025-11-10 10:49:32 +09:00
55d92878d5 edit : 이미지 파일 이름 추가 2025-09-24 10:34:16 +09:00
39d5683eee edit : 이미지생성 api(gemini) 추가 2025-09-23 09:55:40 +09:00
605a7660c4 edit: 안광학 서버 입고 세팅 2025-09-22 14:43:31 +09:00
4ef360448a edit : 이미지생성 api 생성갯수 삭제, 이미지생성 api 데이터를 return해주는 api 추가, 입력이미지 입력받아 검색하는 api 추가 2025-08-07 16:54:46 +09:00
5c20b9ccb5 edit : 오타 수정 2025-08-07 13:38:39 +09:00
29 changed files with 1155 additions and 325 deletions

5
.gitignore vendored
View File

@@ -145,8 +145,7 @@ log
*output
*temp
datas/eyewear_all/*
datas/clip*
datas/openai*
datas/*
*result
*.env

View File

@@ -1,4 +1,4 @@
# FM_TEST_REST_SERVER
# GLASSES_AI_SERVER
RESTful API Server

View File

@@ -1,3 +1,5 @@
import os
class Config:
def __init__(self):
self.set_ict()
@@ -14,6 +16,8 @@ class Config:
self.sftp_id = "fermat"
self.sftp_pw = "1234"
self.api_path = "./gemini.env"
def set_dev(self):
"""
개발용
@@ -24,8 +28,10 @@ class Config:
self.sftp_port = 22
self.sftp_id = "fermat"
self.sftp_pw = "fermat3514"
self.db_pw = "1234"
self.api_path = "./gemini.env"
import os
if not os.path.exists(self.remote_folder):
os.makedirs(self.remote_folder)
@@ -33,5 +39,12 @@ class Config:
self.config = 'release'
self.local_folder = "./result" # 결과물 저장 폴더
self.db_pw = "Fermat3514!"
self.api_path = "../gemini.env"
if not os.path.exists(self.local_folder):
os.makedirs(self.local_folder)
rest_config = Config()
# rest_config.set_dev()

View File

@@ -1,3 +1,7 @@
from utils.api_key_manager import ApiKeyManager
TEMP_FOLDER = "./temp"
ILLEGAL_FILE_NAME = ['<', '>', ':', '"', '/', '\ ', '|', '?', '*']
API_KEY_MANAGER = ApiKeyManager()

View File

20
custom_apps/FEATURE_VECTOR_SIMILARITY_FAISS/const.py Executable file → Normal file
View File

@@ -19,10 +19,26 @@ ViTL14_336 = "clip-vit-large-patch14-336"
#download/save path
PRETRAINED_MODEL_PATH = "./datas"
FAISS_VECTOR_PATH = "./datas"
VECTOR_IMG_LIST_PATH = "./datas"
IMG_LIST_PATH = "../Eyewear_data"
INDEX_IMAGES_PATH = "./datas/eyewear_all"
# INDEX_IMAGES_PATH = "./datas/eyewear_all"
class VectorSearchItem:
glass = "glass"
tips = "Tips"
temple = "Temple"
rim = "Rim"
rim2 = "Rim2"
noesPad = "Nose pad"
noseArm = "Nose arm"
lens = "Lens"
endPiece = "End piece"
hinges = "Hinges"
screw = "Screw"
bridge = "Bridge"
class ImageDepths:
parts = "Parts"
thumnails = "Thumbnails"
# report image consts
class ReportInfoConst:

View File

@@ -100,24 +100,44 @@ def save_report_image(report_info, report_image_path):
table_setting(report_info, report_image_path)
def get_clip_info(model, query_image_path, top_k=4):
def get_clip_info(model, query_image_path, item_info, top_k=4):
"""
이미지 유사도 검색
"""
index_file_path = os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{DEFAULT_INDEX_NAME_SUFFIX}')
txt_file_path = os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}.txt')
index_file_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}_{DEFAULT_INDEX_NAME_SUFFIX}')
txt_file_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}.txt')
if not os.path.exists(index_file_path) or not os.path.exists(txt_file_path):
raise FileNotFoundError(f"Index file {index_file_path} or txt file {txt_file_path} does not exist.")
vector_model = VectorSimilarity(model_info=model,
index_file_path=index_file_path,
txt_file_path=txt_file_path)
# image_lists = []
# with open(txt_file_path, 'r') as f:
# image_lists = [line.strip() for line in f.readlines()]
# vector_model.create_feature_extraction_model(FEMUsageInfo.openaiclipvit)
vector_model.save_index_from_files(images_path=INDEX_IMAGES_PATH,
save_index_dir=FAISS_VECTOR_PATH,
save_txt_dir=VECTOR_IMG_LIST_PATH,
index_type=model.value[1].index_type)
# vector_model.save_index_from_files(images_path_lists=image_lists,
# save_index_dir=FAISS_VECTOR_PATH,
# save_txt_dir=VECTOR_IMG_LIST_PATH,
# item_info=item_info,
# index_type=model.value[1].index_type)
inference_times, result_img_paths, result_percents = vector_model.query_faiss(query_image_path, top_k=top_k)
if os.path.exists(query_image_path):
inference_times, result_img_paths, result_percents = vector_model.query_faiss(query_image_path, top_k=top_k)
elif query_image_path is None or query_image_path == "":
raise ValueError("query_image is None or empty.")
else:
inference_times, result_img_paths, result_percents = vector_model.query_faiss_image_data(query_image_path, top_k=top_k)
for i in range(len(result_percents)):
if float(result_percents[i]) < 0.0:
result_percents[i] = None
result_img_paths[i] = None
report_info = ReportInfo(
feature_extraction_model=ReportInfoConst.feature_extraction_model,
@@ -132,4 +152,55 @@ def get_clip_info(model, query_image_path, top_k=4):
return report_info
def find_glass_folder_images(root_folder):
"""
glass(thumnails) 폴더 내의 모든 이미지 파일 경로를 재귀적으로 탐색하여 리스트로 반환합니다.
Args:
root_folder (str): 탐색을 시작할 최상위 폴더
Returns:
list: 발견된 이미지 파일 전체 경로의 리스트.
"""
image_paths = []
for dirpath, dirnames, filenames in os.walk(root_folder):
if os.path.basename(dirpath) == ImageDepths.thumnails:
for filename in filenames:
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
# 파일의 전체 경로를 생성하여 리스트에 추가
full_path = os.path.join(dirpath, filename)
image_paths.append(full_path)
return image_paths
def find_parts_folder_images(root_folder, parts):
"""
parts 폴더 내의 해당하는 파츠 이미지 파일 경로를 재귀적으로 탐색하여 리스트로 반환합니다.
Args:
root_folder (str): 탐색을 시작할 최상위 폴더
Returns:
list: 발견된 이미지 파일 경로의 리스트.
"""
image_paths = []
for dirpath, dirnames, filenames in os.walk(root_folder):
if os.path.basename(dirpath) == ImageDepths.parts:
for filename in filenames:
if filename.lower().endswith(('.png', '.jpg', '.jpeg')) and parts in filename:
# 파일의 전체 경로를 생성하여 리스트에 추가
full_path = os.path.join(dirpath, filename)
image_paths.append(full_path)
return image_paths

View File

@@ -53,7 +53,7 @@ from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS import feature_extraction_model
# 사용할 이미지 임베딩 모델 클래스 추가
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.fem_openaiclipvit import FEOpenAIClipViT
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import *
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.utils import get_base64_bytes
"""
Definition
@@ -219,6 +219,24 @@ class VectorSimilarity:
feature_vectors = self.fem_model.image_embedding(image_data_np)
return feature_vectors
def image_embedding_from_b64data(self, b64_data=None):
"""
이미지 데이터(base64)에서 특징 벡터 추출
:param image_data_np: 이미지 데이터(numpy)
:return: 특징 벡터 or None
"""
import io
feature_vectors = None
if b64_data is None:
log.error(f'invalid data[{b64_data}]')
return feature_vectors
image = Image.open(io.BytesIO(b64_data)).convert("RGB")
feature_vectors = self.fem_model.image_embedding(image)
return feature_vectors
def image_embedding_from_data(self, image_data_np=None):
"""
이미지 데이터(numpy)에서 특징 벡터 추출
@@ -234,73 +252,67 @@ class VectorSimilarity:
feature_vectors = self.fem_model.image_embedding(image_data_np)
return feature_vectors
def save_index_from_files(self, images_path=None, save_index_dir=None, save_txt_dir=None, index_type=INDEX_TYPE_L2):
def save_index_from_files(self, images_path_lists=None, save_index_dir=None, save_txt_dir=None, item_info=None, index_type=INDEX_TYPE_L2):
"""
인덱스 정보 저장
:param images_path: 이미지 파일 경로
:param save_index_dir: 인덱스 저장 디렉토리
:param save_txt_dir: 이미지 경로 저장 디렉토리
:param item_info: 아이템 정보
:param index_type: 인덱스 타입 (L2, Cosine)
:return: 이미지 임베딩 차원 정보
"""
assert self.fem_model is not None, 'invalid fem_model'
assert images_path is not None, 'images_path is required'
assert images_path_lists is not None, 'images_path is required'
if not os.path.exists(images_path):
log.error(f'image_path={images_path} does not exist')
image_folder = images_path
result = None
# 모든 이미지 임베딩 모으기
if os.path.exists(image_folder):
image_paths = [os.path.join(image_folder, image_file) for image_file in os.listdir(image_folder)]
image_vectors = np.vstack([self.image_embedding_from_file(image_file) for image_file in image_paths])
log.debug(f'image_vectors.shape={image_vectors.shape}')
result = image_vectors.shape
image_paths = images_path_lists
image_vectors = np.vstack([self.image_embedding_from_file(image_file) for image_file in image_paths])
log.debug(f'image_vectors.shape={image_vectors.shape}')
result = image_vectors.shape
# 인덱스 생성 (L2)
dimension = image_vectors.shape[1]
if index_type == INDEX_TYPE_L2:
index = faiss.IndexFlatL2(dimension) # L2
elif index_type == INDEX_TYPE_COSINE:
index = faiss.IndexFlatIP(dimension) # cosine
faiss.normalize_L2(image_vectors)
else:
raise Exception(f'Invalid index_type={index_type}')
index.add(image_vectors)
# 인덱스 저장
if save_index_dir is None:
save_index_path = os.path.join(FILE_BASE_PATH,f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}_{DEFAULT_SAVE_INDEX_SUFFIX}')
self.index_file_path = save_index_path
else:
save_index_path = os.path.join(save_index_dir,f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}_{DEFAULT_SAVE_INDEX_SUFFIX}')
os.makedirs(save_index_dir, exist_ok=True)
# 이미지 경로 파일 저장
if save_txt_dir is None:
save_txt_path = os.path.join(FILE_BASE_PATH, f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}.txt')
self.txt_file_path = save_txt_path
else:
save_txt_path = os.path.join(save_txt_dir, f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}.txt')
os.makedirs(save_txt_dir, exist_ok=True)
if not os.path.exists(save_txt_path):
faiss.write_index(index, save_index_path)
log.debug(f'save_index_path={save_index_path}')
if not os.path.exists(save_txt_path):
# 이미지 경로 저장
with open(save_txt_path, 'w') as f:
for path in image_paths:
f.write(f"{path}\n")
log.debug(f'save_txt_path={save_txt_path}')
# 인덱스 생성 (L2)
dimension = image_vectors.shape[1]
if index_type == INDEX_TYPE_L2:
index = faiss.IndexFlatL2(dimension) # L2
elif index_type == INDEX_TYPE_COSINE:
index = faiss.IndexFlatIP(dimension) # cosine
faiss.normalize_L2(image_vectors)
else:
log.error(f'Image folder {image_folder} does not exist')
raise Exception(f'Invalid index_type={index_type}')
index.add(image_vectors)
# 인덱스 저장
if save_index_dir is None:
save_index_path = os.path.join(FILE_BASE_PATH,f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}_{item_info}_{DEFAULT_SAVE_INDEX_SUFFIX}')
self.index_file_path = save_index_path
else:
save_index_path = os.path.join(save_index_dir,f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}_{item_info}_{DEFAULT_SAVE_INDEX_SUFFIX}')
os.makedirs(save_index_dir, exist_ok=True)
# # 이미지 경로 파일 저장
# if save_txt_dir is None:
# save_txt_path = os.path.join(FILE_BASE_PATH, f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}.txt')
# self.txt_file_path = save_txt_path
# else:
# save_txt_path = os.path.join(save_txt_dir, f'{self.fem_model_name}_{os.path.basename(self.fem_model_args.trained_model)}_{index_type}.txt')
# os.makedirs(save_txt_dir, exist_ok=True)
if not os.path.exists(save_index_path):
faiss.write_index(index, save_index_path)
log.debug(f'save_index_path={save_index_path}')
# if not os.path.exists(save_txt_path):
# # 이미지 경로 저장
# with open(save_txt_path, 'w') as f:
# for path in image_paths:
# f.write(f"{path}\n")
# log.debug(f'save_txt_path={save_txt_path}')
return result
def set_index_path(self, index_path):
@@ -378,33 +390,80 @@ class VectorSimilarity:
return inference_times, result_img_paths, result_percents
def test():
"""
module test function
"""
log.info('\nModule: faiss_similarity_search.py')
def query_faiss_image_data(self, query_image_data=None, top_k=4):
# index_file_path = f'{FILE_BASE_PATH}openaiclipvit_clip-vit-base-patch32_index.faiss'
model = FEMUsageInfo.openaiclip_vit_l14_cos
index_file_path = os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{DEFAULT_INDEX_NAME_SUFFIX}')
if os.path.exists(self.txt_file_path):
with open(self.txt_file_path, 'r') as f:
image_paths = [line.strip() for line in f.readlines()]
else:
logging.error("Image path list TXT file not found.")
image_paths = []
cm = VectorSimilarity(model_info=model,
index_file_path=index_file_path,
txt_file_path=os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}.txt'))
b64_data = get_base64_bytes(query_image_data)
# cm.create_feature_extraction_model(FEMUsageInfo.openaiclipvit)
start_vector_time = datetime.now()
index = self.load_index(self.index_file_path)
query_vector = self.image_embedding_from_b64data(b64_data)
end_vector_time = datetime.now()
cm.save_index_from_files(images_path=INDEX_IMAGES_PATH,
save_index_dir=FAISS_VECTOR_PATH,
save_txt_dir=VECTOR_IMG_LIST_PATH,
index_type=model.value[1].index_type)
diff_vector_time = self.time_diff(start_vector_time,end_vector_time)
if self.index_type == INDEX_TYPE_COSINE:
faiss.normalize_L2(query_vector)
start_search_time = datetime.now()
distances, indices = index.search(query_vector, top_k)
end_search_time = datetime.now()
diff_search_time = self.time_diff(start_search_time,end_search_time)
diff_total_time = self.time_diff(start_vector_time,end_search_time)
inference_times = f"Total time - {diff_total_time}, vector_time - {diff_vector_time}, search_time - {diff_search_time}"
result_img_paths = []
result_percents = []
# 결과
# for i in range(top_k):
# print(f"{i + 1}: {image_paths[indices[0][i]]}, Distance: {distances[0][i]}")
for idx, dist in zip(indices[0], distances[0]):
log.debug(f"{idx} (거리: {dist:.4f})")
result_img_paths.append(image_paths[idx])
if self.index_type == INDEX_TYPE_COSINE:
result_percents.append(f"{dist*100:.2f}")
else:
result_percents.append(f"{((1 - dist)*100):.2f}")
return inference_times, result_img_paths, result_percents
# def test():
# """
# module test function
# """
# log.info('\nModule: faiss_similarity_search.py')
# # index_file_path = f'{FILE_BASE_PATH}openaiclipvit_clip-vit-base-patch32_index.faiss'
# model = FEMUsageInfo.openaiclip_vit_l14_cos
# index_file_path = os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{DEFAULT_INDEX_NAME_SUFFIX}')
# cm = VectorSimilarity(model_info=model,
# index_file_path=index_file_path,
# txt_file_path=os.path.join(VECTOR_IMG_LIST_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}.txt'))
# # cm.create_feature_extraction_model(FEMUsageInfo.openaiclipvit)
# cm.save_index_from_files(images_path_lists=INDEX_IMAGES_PATH,
# save_index_dir=FAISS_VECTOR_PATH,
# save_txt_dir=VECTOR_IMG_LIST_PATH,
# index_type=model.value[1].index_type)
cm.query_faiss(os.path.join(INDEX_IMAGES_PATH,"img1.png"))
# cm.query_faiss(os.path.join(INDEX_IMAGES_PATH,"img1.png"))
if __name__ == '__main__':
"""
test module
"""
test()
# test()

View File

@@ -31,6 +31,8 @@ import os, sys
from transformers import CLIPProcessor, CLIPModel
from huggingface_hub import login as huggingface_login
from huggingface_hub import whoami, logout
from huggingface_hub.utils import LocalTokenNotFoundError
"""
Package: custom
@@ -104,7 +106,18 @@ class FEOpenAIClipViT(FEM.FeatureExtractionModel):
# huggingface token
if self.huggingface_token:
huggingface_login(fem_arguments.token)
"""
토큰이 있다면 로그인이 되어있는지 확인
안되어있다면 로그인 시도
"""
try:
user_info = whoami()
except Exception as LocalTokenNotFoundError:
huggingface_login(fem_arguments.token)
except Exception as e:
log.error(f'Huggingface login error: {e}')
raise e
# model path check
if not os.path.exists(fem_arguments.trained_model):

View File

View File

@@ -0,0 +1,72 @@
import os
import base64
from pathlib import Path
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import *
def search_glass_parts(image_path_list):
result = []
for image_path in image_path_list:
if image_path is None:
result.append(None)
continue
elif not os.path.exists(image_path):
result.append(None)
continue
parts_path = Path(os.path.join(os.path.dirname(os.path.dirname(image_path)),ImageDepths.parts))
parts_files_generator = parts_path.rglob('*.png')
parts_list = []
# parts_list = [str(p.resolve()) for p in parts_files_generator] # 풀경로
for p in parts_files_generator:
# name,ext = os.path.splitext(p.name)
parts_list.append(p.name)
result.append(parts_list)
return result
def file_name_to_parts(file_path):
"""
파일경로로 파츠이름 파싱
:param file_path: 파일경로
:return: 파츠이름
"""
filename, ext = os.path.splitext(os.path.basename(file_path))
name_list = filename.split('_')
result = name_list[2:][0]
return result
def get_base64_bytes(data: str) -> bytes:
"""
문자열을 검사하여 유효한 Base64라면 디코딩된 bytes 데이터를 반환하고,
그렇지 않으면 ValueError 예외를 발생시킵니다.
"""
try:
# 1. 입력 문자열 전처리 (공백 제거 등)
stripped_data = data.strip()
# 2. bytes로 인코딩 (b64decode는 bytes-like object를 필요로 함)
encoded_input = stripped_data.encode('ascii')
# 3. Base64 디코딩 수행 (validate=True로 엄격한 검사)
# 성공 시 b'...' 형태의 바이트 데이터가 생성됨
decoded_bytes = base64.b64decode(encoded_input, validate=True)
return decoded_bytes
except Exception as e:
# Base64 형식이 아니거나 패딩 오류 등 발생 시
raise ValueError(f"유효한 Base64 데이터가 아닙니다. 변환 불가: {e}")
if __name__ == '__main__':
print(file_name_to_parts(os.path.join(FAISS_VECTOR_PATH,"Glass_001",ImageDepths.parts,"Glass_001_Temple_L.png")))

View File

@@ -7,18 +7,22 @@ from PIL import Image
from io import BytesIO
from main_rest.app.utils.date_utils import D
from const import TEMP_FOLDER
from const import TEMP_FOLDER, API_KEY_MANAGER
def gemini_image(prompt, folder=None):
from custom_logger.main_log import main_logger as LOG
image_path = ''
client = genai.Client(api_key="AIzaSyB7tu67y9gOkJkpQtvI5OAYSzUzwv9qwnE")
api_key = API_KEY_MANAGER.get_api_key()
if api_key is None:
raise Exception("API 키 세팅 필요! - 서버를 다시 구동하거나, API키 파일을 확인")
client = genai.Client(api_key=API_KEY_MANAGER.get_api_key()) # a2tec key
for i in range(3):
response = client.models.generate_content(
model="gemini-2.0-flash-preview-image-generation",
model="gemini-2.5-flash-image",
contents=prompt,
config=types.GenerateContentConfig(
response_modalities=['TEXT', 'IMAGE']

View File

@@ -2,7 +2,7 @@
"""
@file : custom_log.py
@author: hsj100
@license: DAOOLDNS
@license: A2TEC
@brief:
@section Modify History
- 2024-05-08 오후 2:33 hsj100 base

View File

@@ -8,15 +8,16 @@
- 2022-01-14/hsj100@a2tec.co.kr : refactoring
@brief: consts
"""
from config import rest_config
# SUPPORT PROJECT
SUPPORT_PROJECT_BASIC = 'PROJECT_BASIC'
PROJECT_NAME = 'FERMAT-TEST'
PROJECT_NAME = 'K_EYEWEAR'
SW_TITLE= f'{PROJECT_NAME} - REST API'
SW_VERSION = '0.1.0'
SW_VERSION = '0.2.0'
SW_DESCRIPTION = f'''
### FERMAT-TEST REST API
### K_EYEWEAR REST API
## API 이용법
- 개별 API 설명과 Request/Response schema 참조
@@ -87,7 +88,7 @@ NUM_RETRY_UUID_GEN = 3
DB_ADDRESS = "localhost"
DB_PORT = 53306
DB_USER_ID = 'root'
DB_USER_PW = '1234'
DB_USER_PW = rest_config.db_pw
DB_NAME = 'FM_TEST'
DB_CHARSET = 'utf8mb4'

View File

@@ -34,7 +34,6 @@ from custom_logger.main_log import main_logger as LOG
API_KEY_HEADER = APIKeyHeader(name='Authorization', auto_error=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
# When service starts.
@@ -42,6 +41,10 @@ async def lifespan(app: FastAPI):
import os
import const
from const import API_KEY_MANAGER
API_KEY_MANAGER.set_api_key()
if os.path.exists(const.TEMP_FOLDER):
for _file in os.scandir(const.TEMP_FOLDER):
os.remove(_file)

View File

@@ -30,7 +30,7 @@ from main_rest.app.common.consts import (
DEFAULT_USER_ACCOUNT_PW
)
from main_rest.app.utils.date_utils import D
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import VectorSearchItem
class SWInfo(BaseModel):
"""
@@ -587,9 +587,8 @@ class ImageGenerateReq(BaseModel):
"""
### [Request] image generate request
"""
prefix : Optional[str] = Field(default="하얀색 배경", description="접두사", example="하얀색 배경")
prompt : str = Field(description='프롬프트', example='검은색 안경')
downloadCount : int = Field(1, description='이미지 생성 갯수', example=1)
class BingCookieSetReq(BaseModel):
"""
@@ -598,7 +597,7 @@ class BingCookieSetReq(BaseModel):
cookie : str = Field('',description='쿠키 데이터', example='')
class VactorImageSearchReq(BaseModel):
class VectorImageSearchReq(BaseModel):
"""
### [Request] vector image search request
"""
@@ -612,17 +611,36 @@ class VectorImageSearchVitReq(BaseModel):
### [Request] vector image search vit
"""
prompt : str = Field(description='프롬프트', example='검은색 안경')
modelType : str = Field(VitModelType.l14, description='pretrained model 타입', example=VitModelType.l14)
modelType : str = Field(VitModelType.b32, description='pretrained model 타입', example=VitModelType.b32)
indexType : str = Field(VitIndexType.l2, description='인덱스 타입', example=VitIndexType.l2)
searchNum : int = Field(4, description='검색결과 이미지 갯수', example=4)
class VectorGlassesImageSearchVitInputImgReq(BaseModel):
"""
### [Request] vector image search vit - input image
"""
inputImage : str = Field(description='base64 이미지', example='')
modelType : str = Field(VitModelType.b32, description='pretrained model 타입', example=VitModelType.b32)
indexType : str = Field(VitIndexType.l2, description='인덱스 타입', example=VitIndexType.l2)
searchNum : int = Field(4, description='검색결과 이미지 갯수', example=4)
class VectorPartsImageSearchVitInputImgReq(VectorGlassesImageSearchVitInputImgReq):
"""
### [Request] vector image search vit data - input image
"""
inputImage : str = Field(description='파츠이미지 이름', example='')
class VectorImageSearchVitDataReq(VectorImageSearchVitReq):
"""
### [Request] vector image search vit data
"""
querySend: bool = Field(True, description='쿼리 이미지 전송 여부', example=True)
class VectorImageSearchVitReportReq(BaseModel):
"""
### [Request] vector image search vit request
@@ -631,10 +649,18 @@ class VectorImageSearchVitReportReq(BaseModel):
modelType : str = Field(VitModelType.l14, description='pretrained model 타입', example=VitModelType.l14)
indexType : str = Field(VitIndexType.l2, description='인덱스 타입', example=VitIndexType.l2)
class VectorImageResult(BaseModel):
class VectorGlassesImageResult(BaseModel):
image : str = Field("", description='이미지 데이터', example='')
percents: float = Field(0.0, description='percents 값', example='')
imageInfo : str = Field("", description='원본이미지 이름', example='')
parts: list = Field([], description='안경 파츠 정보', example=[])
class VectorPartsImageResult(BaseModel):
image : str | None = Field("", description='이미지 데이터', example='')
percents: float | None = Field(0.0, description='percents 값', example='')
imageInfo : str | None = Field("", description='원본이미지 이름', example='')
#===============================================================================
#===============================================================================
#===============================================================================
@@ -661,6 +687,26 @@ class ImageGenerateRes(ResponseBase):
ImageGenerateRes.imageLen = img_len
return ImageGenerateRes
class ImageGenerateDataRes(ResponseBase):
"""
### image generate Data response
"""
imageData : str = Field('', description='이미지 데이터', example='')
@staticmethod
def set_error(error,img_data=''):
ImageGenerateRes.result = False
ImageGenerateRes.error = str(error)
ImageGenerateRes.imageData = img_data
return ImageGenerateRes
@staticmethod
def set_message(img_data):
ImageGenerateRes.result = True
ImageGenerateRes.error = None
ImageGenerateRes.imageData = img_data
return ImageGenerateRes
class BingCookieSetRes(ResponseBase):
"""
@@ -684,12 +730,12 @@ class BingCookieSetRes(ResponseBase):
return BingCookieSetRes
class VectorImageSerachDataRes(ResponseBase):
class VectorGlassesImageSerachDataRes(ResponseBase):
"""
### vector image data response
"""
queryImage : str = Field("", description='쿼리 이미지', example="")
vectorResult : List[VectorImageResult] = Field([], description='벡터 검색 결과', example=[])
vectorResult : List[VectorGlassesImageResult] = Field([], description='벡터 검색 결과', example=[])
@staticmethod
def set_error(error,vector_result=[],query_img=""):
@@ -708,3 +754,7 @@ class VectorImageSerachDataRes(ResponseBase):
ImageGenerateRes.queryImage = query_img
return ImageGenerateRes
class VectorPartsImageSerachDataRes(VectorGlassesImageSerachDataRes):
vectorResult : List[VectorPartsImageResult] = Field([], description='벡터 검색 결과', example=[])

View File

@@ -135,3 +135,75 @@ async def test(request: Request):
return a
# @router.post("/vectorImageSearch/vit/imageGenerate/imagen/report", summary="벡터 이미지 검색(clip-vit) - imagen, report 생성", response_model=M.ResponseBase)
# async def vactor_vit_report(request: Request, request_body_info: M.VectorImageSearchVitReportReq):
# """
# ## 벡터 이미지 검색(clip-vit) - imagen, report 생성
# > imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 종합결과 이미지 생성
# ### Requriements
# > - googlecli 설치(https://cloud.google.com/sdk/docs/install?hl=ko#linux)
# ### options
# > - modelType -> b32,b16,l14,l14_336
# > - indexType -> l2,cos
# """
# response = M.ResponseBase()
# try:
# if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
# raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
# if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
# raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
# # query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
# query_image_path = gemini_image(request_body_info.prompt) #gemini
# report_image_path = f"{os.path.splitext(query_image_path)[0]}_report.png"
# vactor_request_data = {'query_image_path' : query_image_path,
# 'index_type' : request_body_info.indexType,
# 'model_type' : request_body_info.modelType,
# 'report_path' : report_image_path}
# vactor_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit/report', data=json.dumps(vactor_request_data))
# if vactor_response.status_code != 200:
# raise Exception(f"response error: {json.loads(vactor_response.text)['error']}")
# if json.loads(vactor_response.text)["error"] != None:
# raise Exception(f"vector error: {json.loads(vactor_response.text)['error']}")
# if rest_config.config != 'release':
# # remote 폴더에 이미지 저장
# sftp_client.remote_copy_data(local_path=report_image_path,
# remote_path=os.path.join(rest_config.remote_folder, f"imagen_report_vit_{request_body_info.prompt}_{D.date_file_name()}.png"))
# else:
# shutil.copy(report_image_path, os.path.join(rest_config.local_folder, f"imagen_report_vit_{request_body_info.prompt}_{D.date_file_name()}.png"))
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# if 'report_image_path' in locals():
# if os.path.exists(report_image_path):
# os.remove(report_image_path)
# del report_image_path
# return response.set_message()
# except Exception as e:
# LOG.error(traceback.format_exc())
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# if 'report_image_path' in locals():
# if os.path.exists(report_image_path):
# os.remove(report_image_path)
# del report_image_path
# return response.set_error(error=e)

View File

@@ -17,7 +17,7 @@ from typing import Annotated, List
from main_rest.app.common import consts
from main_rest.app import models as M
from main_rest.app.utils.date_utils import D
from main_rest.app.utils.parsing_utils import image_to_base64_string
from main_rest.app.utils.parsing_utils import image_to_base64_string,save_base64_as_image_file
from custom_logger.main_log import main_logger as LOG
# from custom_apps.bingimagecreator.utils import DallEArgument,dalle3_generate_image
@@ -30,6 +30,8 @@ from utils.custom_sftp import sftp_client
from config import rest_config
from const import TEMP_FOLDER
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import *
router = APIRouter(prefix="/services")
# @router.post("/bing/cookie/set", summary="bing 관련 쿠키 set", response_model=M.BingCookieSetRes)
@@ -123,60 +125,91 @@ router = APIRouter(prefix="/services")
# LOG.error(traceback.format_exc())
# return response.set_error(e)
@router.post("/imageGenerate/imagen", summary="이미지 생성(AI) - imagen", response_model=M.ImageGenerateRes)
async def imagen(request: Request, request_body_info: M.ImageGenerateReq):
"""
## 이미지 생성(AI) - imagen
> imagen AI를 이용하여 이미지 생성
# @router.post("/imageGenerate/imagen", summary="이미지 생성(AI) - imagen", response_model=M.ResponseBase)
# async def imagen(request: Request, request_body_info: M.ImageGenerateReq):
# """
# ## 이미지 생성(AI) - imagen
# > imagen AI를 이용하여 이미지 생성
# """
# # imagen 사용중단 gemini로 변경
# response = M.ResponseBase()
# try:
# # NOTE(JWKIM) : imagen 사용 중단
# # img_length = imagen_generate_image(prompt=request_body_info.prompt,
# # download_count=request_body_info.downloadCount
# # )
# temp_image_path = gemini_image(request_body_info.prompt)
"""
# imagen 사용중단 gemini로 변경
# if rest_config.config != 'release':
# _remote_folder = os.path.join(rest_config.remote_folder,"imagen")
# # remote save
# sftp_client.remote_copy_data(
# temp_image_path,
# os.path.join(_remote_folder,f"imagen_{request_body_info.prompt}_{1}_{D.date_file_name()}.png"))
# else:
# _local_forder = os.path.join(rest_config.local_folder,"image_generate","imagen")
# os.makedirs(_local_forder,exist_ok=True)
# shutil.copy(temp_image_path, os.path.join(_local_forder,f"imagen_{request_body_info.prompt}_{1}_{D.date_file_name()}.png"))
response = M.ImageGenerateRes()
try:
# # Clean up temporary files
# if 'temp_image_path' in locals():
# if os.path.exists(temp_image_path):
# os.remove(temp_image_path)
# del temp_image_path
if not download_range(request_body_info.downloadCount):
raise Exception(f"downloadCount is 1~4 (current value = {request_body_info.downloadCount})")
# return response.set_message()
# NOTE(JWKIM) : imagen 사용 중단
# img_length = imagen_generate_image(prompt=request_body_info.prompt,
# download_count=request_body_info.downloadCount
# )
temp_image_path = gemini_image(request_body_info.prompt)
# except Exception as e:
# LOG.error(traceback.format_exc())
if rest_config.config != 'release':
_remote_folder = os.path.join(rest_config.remote_folder,"imagen")
# remote save
sftp_client.remote_copy_data(
temp_image_path,
os.path.join(_remote_folder,f"imagen_{request_body_info.prompt}_{1}_{D.date_file_name()}.png"))
else:
_local_forder = os.path.join(rest_config.local_folder,"image_generate","imagen")
os.makedirs(_local_forder,exist_ok=True)
shutil.copy(temp_image_path, os.path.join(_local_forder,f"imagen_{request_body_info.prompt}_{1}_{D.date_file_name()}.png"))
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# Clean up temporary files
if 'temp_image_path' in locals():
if os.path.exists(temp_image_path):
os.remove(temp_image_path)
del temp_image_path
# return response.set_error(error=e)
return response.set_message(img_len=1)
# @router.post("/imageGenerate/imagen/data", summary="이미지 생성(AI) - imagen(data)", response_model=M.ImageGenerateDataRes)
# async def imagen_data(request: Request, request_body_info: M.ImageGenerateReq):
# """
# ## 이미지 생성(AI) - imagen
# > imagen AI를 이용하여 이미지 데이터생성
# """
# # imagen 사용중단 gemini로 변경
except Exception as e:
LOG.error(traceback.format_exc())
# response = M.ImageGenerateDataRes()
# try:
# # NOTE(JWKIM) : imagen 사용 중단
# # img_length = imagen_generate_image(prompt=request_body_info.prompt,
# # download_count=request_body_info.downloadCount
# # )
# temp_image_path = gemini_image(request_body_info.prompt)
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
os.remove(query_image_path)
del query_image_path
# b64data = image_to_base64_string(temp_image_path)
return response.set_error(error=e)
# # Clean up temporary files
# if 'temp_image_path' in locals():
# if os.path.exists(temp_image_path):
# os.remove(temp_image_path)
# del temp_image_path
# @router.post("/vactorImageSearch/imagenet/imageGenerate/imagen", summary="벡터 이미지 검색(imagenet) - imagen", response_model=M.ResponseBase)
# async def vactor_imagenet(request: Request, request_body_info: M.VactorImageSearchReq):
# return response.set_message(b64data)
# except Exception as e:
# LOG.error(traceback.format_exc())
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# return response.set_error(error=e)
# @router.post("/vectorImageSearch/imagenet/imageGenerate/imagen", summary="벡터 이미지 검색(imagenet) - imagen", response_model=M.ResponseBase)
# async def vactor_imagenet(request: Request, request_body_info: M.vectorImageSearchReq):
# """
# ## 벡터 이미지 검색 - imagen
# > imagen AI를 이용하여 이미지 생성 후 vector 검색
@@ -225,120 +258,261 @@ async def imagen(request: Request, request_body_info: M.ImageGenerateReq):
# LOG.error(traceback.format_exc())
# return response.set_error(error=e)
@router.post("/vactorImageSearch/vit/imageGenerate/imagen", summary="벡터 이미지 검색(clip-vit) - imagen", response_model=M.ResponseBase)
async def vactor_vit(request: Request, request_body_info: M.VectorImageSearchVitReq):
# @router.post("/vectorImageSearch/vit/imageGenerate/imagen", summary="벡터 이미지 검색(clip-vit) - imagen", response_model=M.ResponseBase)
# async def vactor_vit(request: Request, request_body_info: M.VectorImageSearchVitReq):
# """
# ## 벡터 이미지 검색(clip-vit) - imagen
# > imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 결과 이미지 생성
# ### options
# > - modelType -> b32,b16,l14,l14_336
# > - indexType -> l2,cos
# """
# response = M.ResponseBase()
# try:
# if not download_range(request_body_info.searchNum, max=10):
# raise Exception(f"downloadCound is invalid (current value = {request_body_info.searchNum})")
# if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
# raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
# if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
# raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
# # query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
# query_image_path = gemini_image(request_body_info.prompt) #gemini
# vector_request_data = {'query_image_path' : query_image_path,
# 'index_type' : request_body_info.indexType,
# 'model_type' : request_body_info.modelType,
# 'search_num' : request_body_info.searchNum}
# vector_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit', data=json.dumps(vector_request_data))
# vector_response_dict = json.loads(vector_response.text)
# if vector_response.status_code != 200:
# raise Exception(f"response error: {vector_response_dict['error']}")
# if vector_response_dict["error"] != None:
# raise Exception(f"vector error: {vector_response_dict['error']}")
# result_image_paths = vector_response_dict.get('img_list').get('result_image_paths')
# result_percents = vector_response_dict.get('img_list').get('result_percents')
# if rest_config.config != 'release':
# # 원격지 폴더 생성
# remote_directory = os.path.join(rest_config.remote_folder, f"imagen_query_{request_body_info.modelType}_{request_body_info.indexType}_{request_body_info.prompt}_{D.date_file_name()}")
# sftp_client.remote_mkdir(remote_directory)
# # 원격지에 이미지 저장
# sftp_client.remote_copy_data(local_path=query_image_path, remote_path=os.path.join(remote_directory,"query.png"))
# for img_path, img_percent in zip(result_image_paths,result_percents):
# sftp_client.remote_copy_data(local_path=img_path, remote_path=os.path.join(remote_directory,f"search_{img_percent}.png"))
# else:
# local_directory = os.path.join(rest_config.local_folder, f"imagen_query_{request_body_info.modelType}_{request_body_info.indexType}_{request_body_info.prompt}_{D.date_file_name()}")
# os.makedirs(local_directory, exist_ok=True)
# shutil.copy(query_image_path, os.path.join(local_directory,"query.png"))
# for img_path, img_percent in zip(result_image_paths,result_percents):
# shutil.copy(img_path, os.path.join(local_directory,f"search_{img_percent}.png"))
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# return response.set_message()
# except Exception as e:
# LOG.error(traceback.format_exc())
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# return response.set_error(error=e)
# @router.post("/vectorImageSearch/vit/imageGenerate/imagen/data", summary="벡터 이미지 검색(clip-vit) - imagen(data)", response_model=M.VectorImageSerachDataRes)
# async def vactor_vit_data(request: Request, request_body_info: M.VectorImageSearchVitDataReq):
# """
# ## 벡터 이미지 검색(clip-vit) - imagen
# > imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 결과 이미지데이터 return
# ### options
# > - modelType -> b32,b16,l14,l14_336
# > - indexType -> l2,cos
# """
# response = M.VectorImageSerachDataRes()
# query_image_data = ''
# try:
# if not download_range(request_body_info.searchNum, max=10):
# raise Exception(f"downloadCound is invalid (current value = {request_body_info.searchNum})")
# if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
# raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
# if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
# raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
# # query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
# query_image_path = gemini_image(request_body_info.prompt) #gemini
# vector_request_data = {'query_image_path' : query_image_path,
# 'index_type' : request_body_info.indexType,
# 'model_type' : request_body_info.modelType,
# 'search_num' : request_body_info.searchNum}
# vector_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit', data=json.dumps(vector_request_data))
# vector_response_dict = json.loads(vector_response.text)
# if vector_response.status_code != 200:
# raise Exception(f"response error: {vector_response_dict['error']}")
# if vector_response_dict["error"] != None:
# raise Exception(f"vector error: {vector_response_dict['error']}")
# result_image_paths = vector_response_dict.get('img_list').get('result_image_paths')
# result_percents = vector_response_dict.get('img_list').get('result_percents')
# # 이미지 데이터 생성
# vector_image_results = []
# for img, percents in zip(result_image_paths, result_percents):
# b64_data = image_to_base64_string(img)
# float_percent = float(percents)
# info = M.VectorImageResult(image=b64_data,percents=float_percent)
# vector_image_results.append(info)
# if request_body_info.querySend:
# query_image_data = image_to_base64_string(query_image_path)
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# return response.set_message(vector_result=vector_image_results,query_img=query_image_data)
# except Exception as e:
# LOG.error(traceback.format_exc())
# # Clean up temporary files
# if 'query_image_path' in locals():
# if os.path.exists(query_image_path):
# os.remove(query_image_path)
# del query_image_path
# return response.set_error(error=e)
@router.post("/imageGenerator", summary="이미지 데이터 생성", response_model=M.ImageGenerateDataRes)
async def image_generator(request: Request, request_body_info: M.ImageGenerateReq):
"""
## 벡터 이미지 검색(clip-vit) - imagen
> imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 결과 이미지 생성
## 이미지 생성
> 입력된 prompt로 이미지 생성 그후 결과 이미지데이터 return
### Requriements
> - googlecli 설치(https://cloud.google.com/sdk/docs/install?hl=ko#linux)
### Input
> 접두어(prefix)는 null값을 주어도 기본값 사용, 빈("")값 사용시 prefix는 사용안함
> 최종 입력시 prefix, prompt 조합하여 내부적으로 최종 문장 완성
### options
> - modelType -> b32,b16,l14,l14_336
> - indexType -> l2,cos
### Output
> - 결과(imageData)는 base64로 변환된 데이터(image)
### Options
> - prefix -> 접두어 (기본값: 하얀색 배경)
> - prompt -> 프롬프트
### Notice
> - 최종이미지에 배경이 없어야 이미지 검색시 유리하기때문에 꼭 prefix 옵션은 사용
"""
response = M.ResponseBase()
response = M.ImageGenerateDataRes()
try:
if not download_range(request_body_info.searchNum, max=10):
raise Exception(f"downloadCound is invalid (current value = {request_body_info.searchNum})")
if request_body_info.prefix == None:
request_body_info.prefix = M.ImageGenerateReq.model_fields.get("prefix").default
if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
full_prompt = f"{request_body_info.prefix}, {request_body_info.prompt}" if request_body_info.prefix else f"{request_body_info.prompt}"
if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
temp_image_path = gemini_image(full_prompt)
# query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
query_image_path = gemini_image(request_body_info.prompt) #gemini
if os.path.exists(temp_image_path):
image_data = image_to_base64_string(temp_image_path)
vector_request_data = {'query_image_path' : query_image_path,
'index_type' : request_body_info.indexType,
'model_type' : request_body_info.modelType,
'search_num' : request_body_info.searchNum}
# Clean up temporary files
if 'temp_image_path' in locals():
if os.path.exists(temp_image_path):
os.remove(temp_image_path)
del temp_image_path
vector_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit', data=json.dumps(vector_request_data))
vector_response_dict = json.loads(vector_response.text)
if vector_response.status_code != 200:
raise Exception(f"response error: {vector_response_dict['error']}")
if vector_response_dict["error"] != None:
raise Exception(f"vector error: {vector_response_dict['error']}")
result_image_paths = vector_response_dict.get('img_list').get('result_image_paths')
result_percents = vector_response_dict.get('img_list').get('result_percents')
if rest_config.config != 'release':
# 원격지 폴더 생성
remote_directory = os.path.join(rest_config.remote_folder, f"imagen_query_{request_body_info.modelType}_{request_body_info.indexType}_{request_body_info.prompt}_{D.date_file_name()}")
sftp_client.remote_mkdir(remote_directory)
# 원격지에 이미지 저장
sftp_client.remote_copy_data(local_path=query_image_path, remote_path=os.path.join(remote_directory,"query.png"))
for img_path, img_percent in zip(result_image_paths,result_percents):
sftp_client.remote_copy_data(local_path=img_path, remote_path=os.path.join(remote_directory,f"search_{img_percent}.png"))
return response.set_message(image_data)
else:
local_directory = os.path.join(rest_config.local_folder, f"imagen_query_{request_body_info.modelType}_{request_body_info.indexType}_{request_body_info.prompt}_{D.date_file_name()}")
os.makedirs(local_directory, exist_ok=True)
shutil.copy(query_image_path, os.path.join(local_directory,"query.png"))
for img_path, img_percent in zip(result_image_paths,result_percents):
shutil.copy(img_path, os.path.join(local_directory,f"search_{img_percent}.png"))
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
os.remove(query_image_path)
del query_image_path
return response.set_message()
raise Exception(f"image not generated(prefix: {request_body_info.prefix}, prompt:{request_body_info.prompt})")
except Exception as e:
LOG.error(traceback.format_exc())
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
os.remove(query_image_path)
del query_image_path
if 'temp_image_path' in locals():
if os.path.exists(temp_image_path):
os.remove(temp_image_path)
del temp_image_path
return response.set_error(error=e)
@router.post("/vactorImageSearch/vit/imageGenerate/imagen/data", summary="벡터 이미지 검색(clip-vit) - imagen(data)", response_model=M.VectorImageSerachDataRes)
async def vactor_vit_report_data(request: Request, request_body_info: M.VectorImageSearchVitDataReq):
@router.post("/vectorImageSearch/vit/inputImage/data/glasses", summary="벡터 이미지 검색(clip-vit) - input image(data): glasses", response_model=M.VectorGlassesImageSerachDataRes)
async def vactor_vit_input_glasses_img_data(request: Request, request_body_info: M.VectorGlassesImageSearchVitInputImgReq):
"""
## 벡터 이미지 검색(clip-vit) - imagen
> imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 결과 이미지데이터 return
## 벡터 이미지 검색(clip-vit) - inputimage : glasses
> 입력된 이미지로 vector 검색 그후 결과 이미지데이터 return
### Requriements
> - googlecli 설치(https://cloud.google.com/sdk/docs/install?hl=ko#linux)
### Input
> 입력이미지(inputImage)는 base64로 변환된 데이터 입력
### options
> - modelType -> b32,b16,l14,l14_336
> - indexType -> l2,cos
### Output
> - 결과(vectorResult)는 base64로 변환된 데이터(image), 유사도(percents), 파츠정보(parts)가 쌍으로 나오며, 요청한 searchNum 갯수에 맞춰서 결과가 나옴
> - queryImage는 Input시 입력한 inputImage 이미지 데이터
### Options
> - modelType -> b32, b16, l14, l14_336 (기본값: b32)
> - indexType -> l2, cos (기본값: l2)
> - searchNum -> 결과이미지 갯수 (기본값: 4)
### Notice
> - 일반검색시 indexType , modelType 은 기본값으로 사용
> - indexType , modelType - 새로운 조합 (ex l14, cos)으로 처음 요청할경우 모델을 빌드하는 과정이 추가됨
"""
response = M.VectorImageSerachDataRes()
response = M.VectorGlassesImageSerachDataRes()
query_image_data = ''
try:
if not download_range(request_body_info.searchNum, max=10):
raise Exception(f"downloadCound is invalid (current value = {request_body_info.searchNum})")
if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
# query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
query_image_path = gemini_image(request_body_info.prompt) #gemini
query_image_data = request_body_info.inputImage
# query_image_path = os.path.join(TEMP_FOLDER, f'input_{D.date_file_name()}_query.png')
# os.makedirs(TEMP_FOLDER, exist_ok=True)
# save_base64_as_image_file(request_body_info.inputImage ,query_image_path)
query_image_path = query_image_data
vector_request_data = {'query_image_path' : query_image_path,
'index_type' : request_body_info.indexType,
@@ -350,27 +524,36 @@ async def vactor_vit_report_data(request: Request, request_body_info: M.VectorIm
vector_response_dict = json.loads(vector_response.text)
if vector_response.status_code != 200:
raise Exception(f"response error: {vector_response_dict['error']}")
raise Exception(f"search server error: {vector_response_dict['error']}")
if vector_response_dict["error"] != None:
raise Exception(f"vector error: {vector_response_dict['error']}")
raise Exception(f"search result error: {vector_response_dict['error']}")
result_image_paths = vector_response_dict.get('img_list').get('result_image_paths')
result_percents = vector_response_dict.get('img_list').get('result_percents')
result_parts = vector_response_dict.get('img_list').get('result_parts_list')
# 이미지 데이터 생성
vector_image_results = []
for img, percents in zip(result_image_paths, result_percents):
b64_data = image_to_base64_string(img)
float_percent = float(percents)
for img, percents, parts in zip(result_image_paths, result_percents, result_parts):
info = M.VectorImageResult(image=b64_data,percents=float_percent)
b64_data = None
float_percent = None
img_info = None
if img is not None:
if os.path.exists(img):
b64_data = image_to_base64_string(img)
img_info = os.path.split(img)[-1]
if percents is not None:
if percents.isnumeric:
float_percent = float(percents)
info = M.VectorGlassesImageResult(image=b64_data, percents=float_percent, imageInfo=img_info, parts=parts)
vector_image_results.append(info)
if request_body_info.querySend:
query_image_data = image_to_base64_string(query_image_path)
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
@@ -390,21 +573,31 @@ async def vactor_vit_report_data(request: Request, request_body_info: M.VectorIm
return response.set_error(error=e)
@router.post("/vactorImageSearch/vit/imageGenerate/imagen/report", summary="벡터 이미지 검색(clip-vit) - imagen, report 생성", response_model=M.ResponseBase)
async def vactor_vit_report(request: Request, request_body_info: M.VectorImageSearchVitReportReq):
@router.post("/vectorImageSearch/vit/inputImage/data/parts", summary="벡터 이미지 검색(clip-vit) - input image(data): parts", response_model=M.VectorPartsImageSerachDataRes)
async def vactor_vit_input_parts_img_data(request: Request, request_body_info: M.VectorPartsImageSearchVitInputImgReq):
"""
## 벡터 이미지 검색(clip-vit) - imagen, report 생성
> imagen AI를 이용하여 이미지 생성 후 vector 검색 그후 종합결과 이미지 생성
## 벡터 이미지 검색(clip-vit) - inputImage : parts
> 입력된 이미지로 vector 검색 그후 결과 이미지데이터 return
### Requriements
> - googlecli 설치(https://cloud.google.com/sdk/docs/install?hl=ko#linux)
### Input
> 입력이미지(inputImage)는 파츠 이미지파일 이름 (확장자 포함)
> ⚠️ /vectorImageSearch/vit/inputImage/data/glasses API response인 parts 값중 하나를 선택
### options
> - modelType -> b32,b16,l14,l14_336
> - indexType -> l2,cos
### Output
> - 결과(vectorResult)는 base64로 변환된 데이터(image), 유사도(percents)가 쌍으로 나오며, 요청한 searchNum 갯수에 맞춰서 결과가 나옴
> - queryImage는 Input시 입력한 inputImage 이미지 데이터
### Options
> - modelType -> b32, b16, l14, l14_336 (기본값: b32)
> - indexType -> l2, cos (기본값: l2)
> - searchNum -> 결과이미지 갯수 (기본값: 4)
### Notice
> - 일반검색시 indexType , modelType 은 기본값으로 사용
> - indexType , modelType, inputImage - 새로운 조합 (ex l14, cos, Lens)으로 처음 요청할경우 모델을 빌드하는 과정이 추가됨
"""
response = M.ResponseBase()
response = M.VectorPartsImageSerachDataRes()
try:
if request_body_info.modelType not in [M.VitModelType.b32, M.VitModelType.b16, M.VitModelType.l14, M.VitModelType.l14_336]:
raise Exception(f"modelType is invalid (current value = {request_body_info.modelType})")
@@ -412,53 +605,52 @@ async def vactor_vit_report(request: Request, request_body_info: M.VectorImageSe
if request_body_info.indexType not in [M.VitIndexType.cos, M.VitIndexType.l2]:
raise Exception(f"indexType is invalid (current value = {request_body_info.indexType})")
# query_image_path = imagen_generate_temp_image_path(image_prompt=request_body_info.prompt) #imagen
query_image_path = gemini_image(request_body_info.prompt) #gemini
report_image_path = f"{os.path.splitext(query_image_path)[0]}_report.png"
glass_name = f"{request_body_info.inputImage.split('_')[0]}_{request_body_info.inputImage.split('_')[1]}"
query_image_path = os.path.join(IMG_LIST_PATH, glass_name, ImageDepths.parts, request_body_info.inputImage)
query_image_data = image_to_base64_string(query_image_path)
vactor_request_data = {'query_image_path' : query_image_path,
vector_request_data = {'query_image_path' : query_image_path,
'index_type' : request_body_info.indexType,
'model_type' : request_body_info.modelType,
'report_path' : report_image_path}
'search_num' : request_body_info.searchNum}
vactor_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit/report', data=json.dumps(vactor_request_data))
vector_response = requests.post('http://localhost:51002/api/services/faiss/vector/search/vit/parts', data=json.dumps(vector_request_data))
if vactor_response.status_code != 200:
raise Exception(f"response error: {json.loads(vactor_response.text)['error']}")
vector_response_dict = json.loads(vector_response.text)
if json.loads(vactor_response.text)["error"] != None:
raise Exception(f"vector error: {json.loads(vactor_response.text)['error']}")
if vector_response.status_code != 200:
raise Exception(f"search server error: {vector_response_dict['error']}")
if rest_config.config != 'release':
# remote 폴더에 이미지 저장
sftp_client.remote_copy_data(local_path=report_image_path,
remote_path=os.path.join(rest_config.remote_folder, f"imagen_report_vit_{request_body_info.prompt}_{D.date_file_name()}.png"))
else:
shutil.copy(report_image_path, os.path.join(rest_config.local_folder, f"imagen_report_vit_{request_body_info.prompt}_{D.date_file_name()}.png"))
if vector_response_dict["error"] != None:
raise Exception(f"search result error: {vector_response_dict['error']}")
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
os.remove(query_image_path)
del query_image_path
if 'report_image_path' in locals():
if os.path.exists(report_image_path):
os.remove(report_image_path)
del report_image_path
result_image_paths = vector_response_dict.get('img_list').get('result_image_paths')
result_percents = vector_response_dict.get('img_list').get('result_percents')
return response.set_message()
# 이미지 데이터 생성
vector_image_results = []
for img, percents in zip(result_image_paths, result_percents):
b64_data = None
float_percent = None
img_info = None
if img is not None:
if os.path.exists(img):
b64_data = image_to_base64_string(img)
img_info = os.path.split(img)[-1]
if percents is not None:
if percents.isnumeric:
float_percent = float(percents)
info = M.VectorPartsImageResult(image=b64_data, percents=float_percent, imageInfo=img_info)
vector_image_results.append(info)
return response.set_message(vector_result=vector_image_results,query_img=query_image_data)
except Exception as e:
LOG.error(traceback.format_exc())
# Clean up temporary files
if 'query_image_path' in locals():
if os.path.exists(query_image_path):
os.remove(query_image_path)
del query_image_path
if 'report_image_path' in locals():
if os.path.exists(report_image_path):
os.remove(report_image_path)
del report_image_path
return response.set_error(error=e)

View File

@@ -59,3 +59,16 @@ def image_to_base64_string(image_path: str) -> str:
except Exception as e:
# 그 외 예외 처리
raise Exception(f"An unexpected error occurred: {e}")
def save_base64_as_image_file(base64_data: str, output_path: str):
"""
Base64 문자열을 디코딩하여 이미지 파일로 저장합니다.
"""
try:
# Base64 문자열을 디코딩하여 이진 데이터로 변환합니다.
decoded_data = base64.b64decode(base64_data)
with open(output_path, "wb") as image_file:
image_file.write(decoded_data)
except Exception as e:
raise Exception(f"input image data error : {e}")

120
make_vector_files.py Normal file
View File

@@ -0,0 +1,120 @@
import os
import logging
import matplotlib.pyplot as plt
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.faiss_similarity_search import VectorSimilarity
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import *
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.faiss_functions import get_models, find_glass_folder_images, find_parts_folder_images
from vector_rest.app import models as VM
#log level
os.environ["HF_HUB_VERBOSITY"] = "info"
matplotlib_logger = logging.getLogger("matplotlib")
matplotlib_logger.setLevel(logging.INFO)
def make_image_files(item_info, index_type:VM.VitIndexType, model_type:VM.VitModelType):
model = get_models(index_type=index_type, model_type=model_type)
txt_file_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}.txt')
image_lists = []
if not os.path.exists(txt_file_path):
if item_info == VectorSearchItem.glass:
image_lists = find_glass_folder_images(IMG_LIST_PATH)
else:
image_lists = find_parts_folder_images(IMG_LIST_PATH, item_info)
if image_lists:
with open(txt_file_path, 'w') as f:
for img_path in image_lists:
f.write(f"{img_path}\n")
else:
temp_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}.bak')
try:
os.rename(txt_file_path, temp_path)
if item_info == VectorSearchItem.glass:
image_lists = find_glass_folder_images(IMG_LIST_PATH)
else:
image_lists = find_parts_folder_images(IMG_LIST_PATH, item_info)
if image_lists:
with open(txt_file_path, 'w') as f:
for img_path in image_lists:
f.write(f"{img_path}\n")
except Exception as e:
print(f"Error updating image file list: {e}")
finally:
if os.path.exists(txt_file_path):
os.remove(temp_path)
else:
os.rename(temp_path, txt_file_path)
return txt_file_path
def make_vector_files(item_info, index_type:VM.VitIndexType, model_type:VM.VitModelType):
model = get_models(index_type=index_type, model_type=model_type)
txt_file_path = make_image_files(item_info=item_info, index_type=index_type, model_type=model_type)
index_file_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}_{DEFAULT_INDEX_NAME_SUFFIX}')
if not os.path.exists(index_file_path):
vector_model = VectorSimilarity(model_info=model,
index_file_path=index_file_path,
txt_file_path=txt_file_path)
with open(txt_file_path, 'r') as f:
image_lists = [line.strip() for line in f.readlines()]
vector_model.save_index_from_files(images_path_lists=image_lists,
save_index_dir=FAISS_VECTOR_PATH,
item_info=item_info,
index_type=model.value[1].index_type)
else:
temp_path = os.path.join(FAISS_VECTOR_PATH,f'{model.name}_{os.path.basename(model.value[1].trained_model)}_{model.value[1].index_type}_{item_info}_{DEFAULT_INDEX_NAME_SUFFIX}.bak')
try:
os.rename(index_file_path, temp_path)
vector_model = VectorSimilarity(model_info=model,
index_file_path=index_file_path,
txt_file_path=txt_file_path)
with open(txt_file_path, 'r') as f:
image_lists = [line.strip() for line in f.readlines()]
vector_model.save_index_from_files(images_path_lists=image_lists,
save_index_dir=FAISS_VECTOR_PATH,
item_info=item_info,
index_type=model.value[1].index_type)
except Exception as e:
print(f"Error updating vector file: {e}")
finally:
if os.path.exists(index_file_path):
os.remove(temp_path)
else:
os.rename(temp_path, index_file_path)
print(f"Vector file created: {index_file_path} :::::::::: item_info: {item_info}, index_type: {index_type}, model_type: {model_type}")
return index_file_path
if __name__ == '__main__':
class_attributes = dict(VectorSearchItem.__dict__)
pure_data_dict = {
key: value
for key, value in class_attributes.items()
if not key.startswith('__')
}
for item_key, item_value in pure_data_dict.items():
make_vector_files(item_info=item_value, index_type=VM.VitIndexType.l2, model_type=VM.VitModelType.b32)

View File

@@ -13,6 +13,7 @@ pycryptodomex
pycryptodome
email-validator
requests
python-dotenv
#imagen
google-cloud-aiplatform

26
rest.main.service Normal file
View File

@@ -0,0 +1,26 @@
[Unit]
Description=Main REST Service for Glasses AI
After=rest.vector.service
Requires=rest.vector.service
[Service]
User=user
Group=user
# User=fermat
# Group=fermat
WorkingDirectory=/home/user/a2tec/glasses_ai
# WorkingDirectory=/home/fermat/project/glasses/rest
# 51002 포트가 열릴 때까지 대기 (bash 내장 TCP 체크 활용)
ExecStartPre=/bin/bash -c 'until timeout 1s bash -c "cat < /dev/null > /dev/tcp/localhost/51002"; do echo "Waiting for Vector Service on port 51002..."; sleep 2; done'
# rest 가상환경 파이썬 사용
ExecStart=/home/user/anaconda3/envs/rest/bin/python rest_main.py
# ExecStart=/mnt/clover_1TB/anaconda_data/fm_rest/bin/python rest_main.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

24
rest.vector.service Normal file
View File

@@ -0,0 +1,24 @@
[Unit]
Description=Vector Service for Glasses AI
After=network-online.target
Wants=network-online.target
PartOf=rest.main.service
[Service]
User=user
Group=user
# User=fermat
# Group=fermat
WorkingDirectory=/home/user/a2tec/glasses_ai
# WorkingDirectory=/home/fermat/project/glasses/rest
ExecStart=/home/user/anaconda3/envs/rest_vector/bin/python rest_vector.py
# ExecStart=/mnt/clover_1TB/anaconda_data/fm_rest_vector/bin/python rest_vector.py
Restart=always
RestartSec=3
[Install]
WantedBy=multi-user.target

33
utils/api_key_manager.py Normal file
View File

@@ -0,0 +1,33 @@
import os
from dotenv import load_dotenv
from config import rest_config
from custom_logger.main_log import main_logger as LOG
class ApiKeyManager:
def __init__(self):
self.api_key = None
def set_api_key(self, env_path=rest_config.api_path):
if os.path.exists(env_path):
load_dotenv(dotenv_path=env_path)
key = os.getenv("GEMINI_API_KEY")
if key is None:
LOG.error(f"api key 파일에 GEMINI_API_KEY라는 변수가 없습니다")
elif key == "":
LOG.error(f"api key 파일에 내부 변수 GEMINI_API_KEY 값이 빈값입니다 값을 설정해주세요")
else:
self.api_key = key
else:
LOG.error(f"api key 파일이 없습니다 : {os.path.abspath(env_path)}")
def get_api_key(self):
if self.api_key is not None:
return self.api_key

View File

@@ -8,6 +8,7 @@
- 2022-01-14/hsj100@a2tec.co.kr : refactoring
@brief: consts
"""
from config import rest_config
# SUPPORT PROJECT
SUPPORT_PROJECT_BASIC = 'PROJECT_BASIC'
@@ -87,7 +88,7 @@ NUM_RETRY_UUID_GEN = 3
DB_ADDRESS = "localhost"
DB_PORT = 53306
DB_USER_ID = 'root'
DB_USER_PW = '1234'
DB_USER_PW = rest_config.db_pw
DB_NAME = 'FM_TEST'
DB_CHARSET = 'utf8mb4'

View File

@@ -583,7 +583,7 @@ class VitModelType(str, Enum):
l14_336 = "l14_336"
class VactorSearchReq(BaseModel):
class VectorSearchReq(BaseModel):
"""
### [Request] vector 검색
"""
@@ -592,7 +592,7 @@ class VactorSearchReq(BaseModel):
search_num : int = Field(4, description='검색결과 이미지 갯수', example=4)
class VactorSearchVitReportReq(BaseModel):
class VectorSearchVitReportReq(BaseModel):
"""
### [Request] vector 검색(vit) 후 리포트 이미지 생성
"""
@@ -602,7 +602,7 @@ class VactorSearchVitReportReq(BaseModel):
report_path : str = Field(description='리포트 이미지 저장 경로', example='path')
class VactorSearchVitReq(BaseModel):
class VectorSearchVitReq(BaseModel):
"""
### [Request] vector 검색(vit) 후 이미지 생성
"""
@@ -611,21 +611,21 @@ class VactorSearchVitReq(BaseModel):
model_type : VitModelType = Field(VitModelType.l14, description='pretrained 모델 정보', example=VitModelType.l14)
search_num : int = Field(4, description='검색결과 이미지 갯수', example=4)
class VactorSearchVitRes(ResponseBase):
class VectorSearchVitRes(ResponseBase):
img_list : dict = Field({}, description='이미지 결과 리스트', example={})
@staticmethod
def set_error(error):
VactorSearchVitRes.img_list = {}
VactorSearchVitRes.result = False
VactorSearchVitRes.error = str(error)
VectorSearchVitRes.img_list = {}
VectorSearchVitRes.result = False
VectorSearchVitRes.error = str(error)
return VactorSearchVitRes
return VectorSearchVitRes
@staticmethod
def set_message(msg):
VactorSearchVitRes.img_list = msg
VactorSearchVitRes.result = True
VactorSearchVitRes.error = None
VectorSearchVitRes.img_list = msg
VectorSearchVitRes.result = True
VectorSearchVitRes.error = None
return VactorSearchVitRes
return VectorSearchVitRes

View File

@@ -39,7 +39,7 @@ def send_mail():
sender = 'jolimola@gmail.com'
sender_pw = '!ghkdtmdwns1'
# recipient = 'hsj100@a2tec.co.kr'
recipient = 'jwkim@daooldns.co.kr'
recipient = 'jwkim@a2tec.co.kr'
list_cc = ['cc1@gmail.com', 'cc2@naver.com']
str_cc = ','.join(list_cc)

View File

@@ -23,12 +23,14 @@ from custom_apps.faiss_imagenet.main import search_idxs
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.faiss_functions import get_clip_info, save_report_image, get_models
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.faiss_similarity_search import FEMUsageInfo
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.const import VectorSearchItem
from custom_apps.FEATURE_VECTOR_SIMILARITY_FAISS.utils import search_glass_parts, file_name_to_parts
router = APIRouter(prefix="/services")
@router.post("/faiss/vector/search/imagenet", summary="imagenet search", response_model=M.ResponseBase)
async def vactor_search(request: Request, request_body_info: M.VactorSearchReq):
async def vactor_search(request: Request, request_body_info: M.VectorSearchReq):
"""
## 벡터검색
@@ -54,7 +56,10 @@ async def vactor_search(request: Request, request_body_info: M.VactorSearchReq):
@router.post("/faiss/vector/search/vit/report", summary="vit search report", response_model=M.ResponseBase)
async def vactor_report_vit(request: Request, request_body_info: M.VactorSearchVitReportReq):
async def vactor_report_vit(request: Request, request_body_info: M.VectorSearchVitReportReq):
"""
이미지 경로를 입력받아 repport image 저장
"""
response = M.ResponseBase()
try:
if not os.path.exists(request_body_info.query_image_path):
@@ -72,20 +77,58 @@ async def vactor_report_vit(request: Request, request_body_info: M.VactorSearchV
return response.set_error(e)
@router.post("/faiss/vector/search/vit", summary="vit search", response_model=M.VactorSearchVitRes)
async def vactor_vit(request: Request, request_body_info: M.VactorSearchVitReq):
response = M.VactorSearchVitRes()
@router.post("/faiss/vector/search/vit", summary="vit search", response_model=M.VectorSearchVitRes)
async def vactor_vit(request: Request, request_body_info: M.VectorSearchVitReq):
"""
이미지 경로 를 입력받아 vit 방식으로 검색
"""
response = M.VectorSearchVitRes()
try:
# if not os.path.exists(request_body_info.query_image_path):
# raise FileNotFoundError(f"File {request_body_info.query_image_path} does not exist.")
model = get_models(index_type=request_body_info.index_type, model_type=request_body_info.model_type)
report_info = get_clip_info(model=model,
query_image_path=request_body_info.query_image_path,
item_info=VectorSearchItem.glass,
top_k=request_body_info.search_num)
parts = search_glass_parts(report_info.result_image_paths)
return response.set_message({
'result_image_paths': report_info.result_image_paths,
'result_percents': report_info.result_percents,
'result_parts_list': parts
})
except Exception as e:
LOG.error(traceback.format_exc())
return response.set_error(e)
@router.post("/faiss/vector/search/vit/parts", summary="vit search parts", response_model=M.VectorSearchVitRes)
async def vactor_vit_parts(request: Request, request_body_info: M.VectorSearchVitReq):
"""
이미지 경로 를 입력받아 vit 방식으로 parts 검색
"""
response = M.VectorSearchVitRes()
try:
if not os.path.exists(request_body_info.query_image_path):
raise FileNotFoundError(f"File {request_body_info.query_image_path} does not exist.")
model = get_models(index_type=request_body_info.index_type, model_type=request_body_info.model_type)
report_info = get_clip_info(model,request_body_info.query_image_path,top_k=request_body_info.search_num)
parts = file_name_to_parts(request_body_info.query_image_path)
report_info = get_clip_info(model=model,
query_image_path=request_body_info.query_image_path,
item_info=parts,
top_k=request_body_info.search_num)
return response.set_message({
'result_image_paths': report_info.result_image_paths,
'result_percents': report_info.result_percents
'result_percents': report_info.result_percents,
})
except Exception as e: