Files
GLASSES_AI_SERVER/rest/app/utils/extra.py

205 lines
6.2 KiB
Python

# -*- coding: utf-8 -*-
"""
@File: extra.py
@Date: 2020-09-14
@author: A2TEC
@section MODIFYINFO 수정정보
- 수정자/수정일 : 수정내역
- 2022-01-14/hsj100@a2tec.co.kr : refactoring
@brief: extra functions
"""
from hashlib import md5
from base64 import b64decode
from base64 import b64encode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from cryptography.fernet import Fernet # symmetric encryption
# mail test
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from itertools import groupby
from operator import attrgetter
import uuid
from rest.app.common.consts import NUM_RETRY_UUID_GEN, SMTP_HOST, SMTP_PORT
from rest.app.utils.date_utils import D
from rest.app import models as M
from rest.app.common.consts import AES_CBC_PUBLIC_KEY, AES_CBC_IV, FERNET_SECRET_KEY
async def send_mail(sender, sender_pw, title, recipient, contents_plain, contents_html, cc_list, smtp_host=SMTP_HOST, smtp_port=SMTP_PORT):
"""
구글 계정사용시 : 보안 수준이 낮은 앱에서의 접근 활성화
:return:
None: success
Str. Message: error
"""
try:
# check parameters
if not sender:
raise Exception('invalid sender')
if not title:
raise Exception('invalid title')
if not recipient:
raise Exception('invalid recipient')
# sender info.
# sender = consts.ADMIN_INIT_ACCOUNT_INFO.email
# sender_pw = consts.ADMIN_INIT_ACCOUNT_INFO.email_pw
# message
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = recipient
if cc_list:
list_cc = cc_list
str_cc = ','.join(list_cc)
msg['Cc'] = str_cc
msg['Subject'] = title
if contents_plain:
msg.attach(MIMEText(contents_plain, 'plain'))
if contents_html:
msg.attach(MIMEText(contents_html, 'html'))
# smtp server
smtp_server = smtplib.SMTP(host=smtp_host, port=smtp_port)
smtp_server.ehlo()
smtp_server.starttls()
smtp_server.ehlo()
smtp_server.login(sender, sender_pw)
smtp_server.send_message(msg)
smtp_server.quit()
return None
except Exception as e:
return str(e)
def query_to_groupby(query_result, key, first=False):
"""
쿼리 결과물(list)을 항목값(key)으로 그룹화한다.
:param query_result: 쿼리 결과 리스트
:param key: 그룹 항목값
:return: dict
"""
group_info = dict()
for k, g in groupby(query_result, attrgetter(key)):
if k not in group_info:
if not first:
group_info[k] = list(g)
else:
group_info[k] = list(g)[0]
else:
if not first:
group_info[k].extend(list(g))
return group_info
def query_to_groupby_date(query_result, key):
"""
쿼리 결과물(list)을 항목값(key)으로 그룹화한다.
:param query_result: 쿼리 결과 리스트
:param key: 그룹 항목값
:return: dict
"""
group_info = dict()
for k, g in groupby(query_result, attrgetter(key)):
day_str = k.strftime("%Y-%m-%d")
if day_str not in group_info:
group_info[day_str] = list(g)
else:
group_info[day_str].extend(list(g))
return group_info
class FernetCrypto:
def __init__(self, key=FERNET_SECRET_KEY):
self.key = key
self.f = Fernet(self.key)
def encrypt(self, data, is_out_string=True):
if isinstance(data, bytes):
ou = self.f.encrypt(data) # 바이트형태이면 바로 암호화
else:
ou = self.f.encrypt(data.encode('utf-8')) # 인코딩 후 암호화
if is_out_string is True:
return ou.decode('utf-8') # 출력이 문자열이면 디코딩 후 반환
else:
return ou
def decrypt(self, data, is_out_string=True):
if isinstance(data, bytes):
ou = self.f.decrypt(data) # 바이트형태이면 바로 복호화
else:
ou = self.f.decrypt(data.encode('utf-8')) # 인코딩 후 복호화
if is_out_string is True:
return ou.decode('utf-8') # 출력이 문자열이면 디코딩 후 반환
else:
return ou
class AESCryptoCBC:
def __init__(self, key=AES_CBC_PUBLIC_KEY, iv=AES_CBC_IV):
# Initial vector를 0으로 초기화하여 16바이트 할당함
# iv = chr(0) * 16 #pycrypto 기준
# iv = bytes([0x00] * 16) #pycryptodomex 기준
# aes cbc 생성
self.key = key
self.iv = iv
self.crypto = AES.new(self.key, AES.MODE_CBC, self.iv)
def encrypt(self, data):
# 암호화 message는 16의 배수여야 한다.
# enc = self.crypto.encrypt(data)
# return enc
enc = self.crypto.encrypt(pad(data, AES.block_size))
return b64encode(enc)
def decrypt(self, enc):
# 복호화 enc는 16의 배수여야 한다.
# dec = self.crypto.decrypt(enc)
# return dec
enc = b64decode(enc)
dec = self.crypto.decrypt(enc)
return unpad(dec, AES.block_size)
class AESCipher:
def __init__(self, key):
# self.key = md5(key.encode('utf8')).digest()
self.key = bytes(key.encode('utf-8'))
def encrypt(self, data):
# iv = get_random_bytes(AES.block_size)
iv = bytes('daooldns12345678'.encode('utf-8'))
self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
t = b64encode(self.cipher.encrypt(pad(data.encode('utf-8'), AES.block_size)))
return b64encode(iv + self.cipher.encrypt(pad(data.encode('utf-8'), AES.block_size)))
def decrypt(self, data):
raw = b64decode(data)
self.cipher = AES.new(self.key, AES.MODE_CBC, raw[:AES.block_size])
return unpad(self.cipher.decrypt(raw[AES.block_size:]), AES.block_size)
def cls_list_to_dict_list(list):
"""
list 내부 element가 dict로 변환 가능한 class일경우
내부 element를 dict 로 변경
"""
_result = []
for i in list:
if isinstance(i, dict):
_result = list
break
_result.append(i.dict())
return _result