# -*- coding: utf-8 -*-
"""
@File: extra.py
@Date: 2020-09-14
@author: A2TEC
@section MODIFYINFO Modification history
    - editor/date : change description
    - 2022-01-14/hsj100@a2tec.co.kr : refactoring
@brief: extra functions
"""

from hashlib import md5
from base64 import b64decode
from base64 import b64encode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from cryptography.fernet import Fernet  # symmetric encryption

# mail test
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

from itertools import groupby
from operator import attrgetter
import uuid

from rest.app.common.consts import NUM_RETRY_UUID_GEN, SMTP_HOST, SMTP_PORT
from rest.app.utils.date_utils import D
from rest.app import models as M
from rest.app.common.consts import AES_CBC_PUBLIC_KEY, AES_CBC_IV, FERNET_SECRET_KEY


async def send_mail(sender, sender_pw, title, recipient, contents_plain, contents_html, cc_list,
                    smtp_host=SMTP_HOST, smtp_port=SMTP_PORT):
    """
    Send a mail through an SMTP server using STARTTLS and password login.

    When using a Google account: access from "less secure apps" must be enabled.

    :param sender: sender address; also used as the SMTP login name
    :param sender_pw: SMTP login password
    :param title: mail subject
    :param recipient: value placed in the ``To`` header
    :param contents_plain: optional plain-text body (attached when truthy)
    :param contents_html: optional HTML body (attached when truthy)
    :param cc_list: optional iterable of address strings joined into the ``Cc`` header
    :param smtp_host: SMTP server host
    :param smtp_port: SMTP server port
    :return:
        None: success
        Str. Message: error
    """
    # NOTE(review): smtplib is blocking, so this coroutine blocks the event
    # loop while the mail is being sent.

    # check parameters (messages are returned to the caller, never raised)
    if not sender:
        return 'invalid sender'
    if not title:
        return 'invalid title'
    if not recipient:
        return 'invalid recipient'

    try:
        # message
        msg = MIMEMultipart()
        msg['From'] = sender
        msg['To'] = recipient
        if cc_list:
            msg['Cc'] = ','.join(cc_list)
        msg['Subject'] = title

        if contents_plain:
            msg.attach(MIMEText(contents_plain, 'plain'))
        if contents_html:
            msg.attach(MIMEText(contents_html, 'html'))

        # smtp server
        smtp_server = smtplib.SMTP(host=smtp_host, port=smtp_port)
        try:
            smtp_server.ehlo()
            smtp_server.starttls()
            smtp_server.ehlo()
            smtp_server.login(sender, sender_pw)
            smtp_server.send_message(msg)
            smtp_server.quit()
        finally:
            # Release the socket even when TLS/login/send fails; close() is
            # harmless after a successful quit().
            smtp_server.close()
        return None
    except Exception as e:
        # Best-effort API: every failure is reported as its message string.
        return str(e)


def query_to_groupby(query_result, key, first=False):
    """
    Group a query result (list) by the value of the attribute ``key``.

    The input does not need to be sorted: items sharing a key value are
    collected together regardless of their position.

    :param query_result: list of query result objects
    :param key: attribute name to group by (resolved with ``attrgetter``)
    :param first: when truthy, keep only the first item seen for each key
                  instead of a list of all items
    :return: dict mapping key value -> list of items (or a single item when ``first``)
    """
    get_key = attrgetter(key)
    group_info = dict()
    for item in query_result:
        k = get_key(item)
        if first:
            # setdefault keeps the item that was stored first for this key
            group_info.setdefault(k, item)
        else:
            group_info.setdefault(k, []).append(item)
    return group_info


def query_to_groupby_date(query_result, key):
    """
    Group a query result (list) by the day of the attribute ``key``.

    The attribute value must provide ``strftime``; it is formatted as
    ``%Y-%m-%d`` and that string is used as the dictionary key.

    :param query_result: list of query result objects
    :param key: attribute name holding the date value
    :return: dict mapping 'YYYY-MM-DD' -> list of items
    """
    get_key = attrgetter(key)
    group_info = dict()
    for item in query_result:
        day_str = get_key(item).strftime("%Y-%m-%d")
        group_info.setdefault(day_str, []).append(item)
    return group_info


class FernetCrypto:
    """Thin wrapper around ``cryptography.fernet.Fernet`` accepting str or bytes."""

    def __init__(self, key=FERNET_SECRET_KEY):
        self.key = key
        self.f = Fernet(self.key)

    @staticmethod
    def _to_bytes(data):
        # bytes are used as-is; anything else is expected to be a str
        if isinstance(data, bytes):
            return data
        return data.encode('utf-8')

    def encrypt(self, data, is_out_string=True):
        """
        Encrypt ``data`` (bytes, or str which is UTF-8 encoded first).

        :param data: bytes or str to encrypt
        :param is_out_string: when exactly ``True``, return the token as str;
                              otherwise return bytes
        :return: encrypted token
        """
        ou = self.f.encrypt(self._to_bytes(data))
        if is_out_string is True:
            return ou.decode('utf-8')
        return ou

    def decrypt(self, data, is_out_string=True):
        """
        Decrypt ``data`` (bytes, or str which is UTF-8 encoded first).

        :param data: bytes or str token to decrypt
        :param is_out_string: when exactly ``True``, return the plaintext as str;
                              otherwise return bytes
        :return: decrypted plaintext
        """
        ou = self.f.decrypt(self._to_bytes(data))
        if is_out_string is True:
            return ou.decode('utf-8')
        return ou


class AESCryptoCBC:
    """AES-CBC helper with PKCS#7-style padding and base64 transport encoding."""

    def __init__(self, key=AES_CBC_PUBLIC_KEY, iv=AES_CBC_IV):
        # An all-zero 16-byte IV would be:
        #   chr(0) * 16           (pycrypto)
        #   bytes([0x00] * 16)    (pycryptodomex)
        self.key = key
        self.iv = iv
        # Kept as an attribute for backward compatibility; it also validates
        # key/iv at construction time. encrypt()/decrypt() do not use it.
        self.crypto = self._new_cipher()

    def _new_cipher(self):
        # CBC cipher objects are stateful: one object cannot both encrypt and
        # decrypt, and repeated calls chain from the previous ciphertext.
        # A fresh object per operation keeps every call independent.
        return AES.new(self.key, AES.MODE_CBC, self.iv)

    def encrypt(self, data):
        """
        Pad ``data`` to the AES block size, encrypt it, and base64-encode it.

        :param data: bytes to encrypt
        :return: base64-encoded ciphertext (bytes)
        """
        enc = self._new_cipher().encrypt(pad(data, AES.block_size))
        return b64encode(enc)

    def decrypt(self, enc):
        """
        Base64-decode ``enc``, decrypt it, and remove the padding.

        :param enc: base64-encoded ciphertext
        :return: decrypted bytes
        """
        dec = self._new_cipher().decrypt(b64decode(enc))
        return unpad(dec, AES.block_size)


class AESCipher:
    """AES-CBC helper whose output is ``base64(iv + ciphertext)``."""

    # SECURITY NOTE(review): a fixed IV makes equal plaintexts produce equal
    # ciphertexts. Left unchanged because peers may depend on this value;
    # consider a random IV per message (decrypt() already reads the IV from
    # the message itself).
    _STATIC_IV = 'daooldns12345678'.encode('utf-8')

    def __init__(self, key):
        # The key is the UTF-8 encoding of ``key`` (no hashing is applied).
        self.key = key.encode('utf-8')

    def encrypt(self, data):
        """
        Encrypt the str ``data`` and return ``base64(iv + ciphertext)``.

        The cipher is used exactly once per call: an extra encrypt() on the
        same CBC object would advance its chaining state, so the returned
        ciphertext would no longer match the prepended IV and its first block
        could not be decrypted.

        :param data: str to encrypt (UTF-8 encoded, then padded)
        :return: base64-encoded bytes of IV followed by ciphertext
        """
        iv = self._STATIC_IV
        self.cipher = AES.new(self.key, AES.MODE_CBC, iv)
        padded = pad(data.encode('utf-8'), AES.block_size)
        return b64encode(iv + self.cipher.encrypt(padded))

    def decrypt(self, data):
        """
        Decrypt a value produced by :meth:`encrypt`.

        :param data: base64 of IV (first ``AES.block_size`` bytes) + ciphertext
        :return: decrypted bytes with padding removed
        """
        raw = b64decode(data)
        self.cipher = AES.new(self.key, AES.MODE_CBC, raw[:AES.block_size])
        return unpad(self.cipher.decrypt(raw[AES.block_size:]), AES.block_size)


def cls_list_to_dict_list(list):  # noqa: A002 - parameter name kept for callers
    """
    Convert a list of objects exposing ``.dict()`` into a list of dicts.

    If any element is already a ``dict``, the input list itself is returned
    unchanged (the same object, not a copy).

    :param list: list of dicts or of objects providing a ``dict()`` method
    :return: list of dicts
    """
    _result = []
    for i in list:
        if isinstance(i, dict):
            return list
        _result.append(i.dict())
    return _result