# -*- coding: utf-8 -*- """ @File: users.py @Date: 2020-09-14 @author: A2TEC @section MODIFYINFO 수정정보 - 수정자/수정일 : 수정내역 - 2022-01-14/hsj100@a2tec.co.kr : refactoring @brief: users api """ from fastapi import APIRouter from starlette.requests import Request import bcrypt from rest.app.common import consts from rest.app import models as M from rest.app.common.config import conf from rest.app.database.schema import Users from rest.app.database.crud import table_select, table_update, table_delete from rest.app.utils.extra import AESCryptoCBC router = APIRouter(prefix='/user') @router.get('/me', response_model=M.UserSearchRes, summary='접속자 정보') async def get_me(request: Request): """ ## 현재 접속된 자신정보 확인 ***현 버전 미지원(추후 상세 처리)*** **결과** - UserSearchRes """ target_table = Users search_info = None try: # request if conf().GLOBAL_TOKEN: raise Exception('not supported: use search api!') accessor_info = request.state.user if not accessor_info: raise Exception('invalid accessor') # search search_info = target_table.get(account=accessor_info.account) if not search_info: raise Exception('not found data') # result result_info = list() result_info.append(M.UserInfo.from_orm(search_info)) return M.UserSearchRes(data=result_info) except Exception as e: if search_info: search_info.close() return M.ResponseBase.set_error(str(e)) @router.post('/search', response_model=M.UserSearchPagingRes, summary='유저정보 검색') async def user_search(request: Request, request_body_info: M.UserSearchPagingReq): """ ## 유저정보 검색 (기본) 검색 조건은 유저 테이블 항목만 가능 (Request body: Schema 참조)\n 관련 정보 연동 검색은 별도 API 사용 **세부항목** - **paging**\n 항목 미사용시에는 페이징기능 없이 검색조건(search) 결과 모두 반환 - **search**\n 검색에 필요한 항목들을 search 에 포함시킨다.\n 검색에 사용된 각 항목들은 AND 조건으로 처리된다. - **전체검색**\n empty object 사용 ( {} )\n 예) "search": {} - **검색항목**\n - 부분검색 항목\n SQL 문법( %, _ )을 사용한다.\n __like: 시작포함(X%), 중간포함(%X%), 끝포함(%X) - 구간검색 항목 * __lt: 주어진 값보다 작은값 * __lte: 주어진 값보다 같거나 작은 값 * __gt: 주어진 값보다 큰값 * __gte: 주어진 값보다 같거나 큰값 **결과** - UserSearchPagingRes """ return await table_select(request.state.user, Users, request_body_info, M.UserSearchPagingRes, M.UserInfo) @router.put('/update', response_model=M.ResponseBase, summary='유저정보 변경') async def user_update(request: Request, request_body_info: M.UserUpdateMultiReq): """ ## 유저정보 변경 **search_info**: 변경대상\n **update_info**: 변경내용\n - **비밀번호** 제외 **결과** - ResponseBase """ return await table_update(request.state.user, Users, request_body_info, M.ResponseBase) @router.put('/update_pw', response_model=M.ResponseBase, summary='유저 비밀번호 변경') async def user_update_pw(request: Request, request_body_info: M.UserUpdatePWReq): """ ## 유저정보 비밀번호 변경 **account**의 **비밀번호**를 변경한다. **결과** - ResponseBase """ target_table = Users search_info = None try: # request accessor_info = request.state.user if not accessor_info: raise Exception('invalid accessor') if not request_body_info.account: raise Exception('invalid account') # decrypt pw try: decode_cur_pw = request_body_info.current_pw.encode('utf-8') desc_cur_pw = AESCryptoCBC().decrypt(decode_cur_pw) except Exception as e: raise Exception(f'failed decryption [current_pw]: {e}') try: decode_new_pw = request_body_info.new_pw.encode('utf-8') desc_new_pw = AESCryptoCBC().decrypt(decode_new_pw) except Exception as e: raise Exception(f'failed decryption [new_pw]: {e}') # search target_user = target_table.get(account=request_body_info.account) is_verified = bcrypt.checkpw(desc_cur_pw, target_user.pw.encode('utf-8')) if not is_verified: raise Exception('invalid password') search_info = target_table.filter(id=target_user.id) if not search_info.first(): raise Exception('not found data') # process hash_pw = bcrypt.hashpw(desc_new_pw, bcrypt.gensalt()) result_info = search_info.update(auto_commit=True, pw=hash_pw) if not result_info or not result_info.id: raise Exception('failed update') # result return M.ResponseBase() except Exception as e: if search_info: search_info.close() return M.ResponseBase.set_error(str(e)) @router.delete('/delete', response_model=M.ResponseBase, summary='유저정보 삭제') async def user_delete(request: Request, request_body_info: M.UserSearchReq): """ ## 유저정보 삭제 조건에 해당하는 정보를 모두 삭제한다.\n - **본 API는 DB에서 완적삭제를 하는 함수이며, 서버관리자가 사용하는 것을 권장한다.** - **update API를 사용하여 상태 항목을 변경해서 사용하는 것을 권장.** `유저삭제시 관계 테이블의 정보도 같이 삭제된다.` **결과** - ResponseBase """ return await table_delete(request.state.user, Users, request_body_info, M.ResponseBase) # NOTE(hsj100): apikey """ """ # @router.get('/apikeys', response_model=List[M.GetApiKeyList]) # async def get_api_keys(request: Request): # """ # API KEY 조회 # :param request: # :return: # """ # user = request.state.user # api_keys = ApiKeys.filter(user_id=user.id).all() # return api_keys # # # @router.post('/apikeys', response_model=M.GetApiKeys) # async def create_api_keys(request: Request, key_info: M.AddApiKey, session: Session = Depends(db.session)): # """ # API KEY 생성 # :param request: # :param key_info: # :param session: # :return: # """ # user = request.state.user # # api_keys = ApiKeys.filter(session, user_id=user.id, status='active').count() # if api_keys == MAX_API_KEY: # raise ex.MaxKeyCountEx() # # alphabet = string.ascii_letters + string.digits # s_key = ''.join(secrets.choice(alphabet) for _ in range(40)) # uid = None # while not uid: # uid_candidate = f'{str(uuid4())[:-12]}{str(uuid4())}' # uid_check = ApiKeys.get(access_key=uid_candidate) # if not uid_check: # uid = uid_candidate # # key_info = key_info.dict() # new_key = ApiKeys.create(session, auto_commit=True, secret_key=s_key, user_id=user.id, access_key=uid, **key_info) # return new_key # # # @router.put('/apikeys/{key_id}', response_model=M.GetApiKeyList) # async def update_api_keys(request: Request, key_id: int, key_info: M.AddApiKey): # """ # API KEY User Memo Update # :param request: # :param key_id: # :param key_info: # :return: # """ # user = request.state.user # key_data = ApiKeys.filter(id=key_id) # if key_data and key_data.first().user_id == user.id: # return key_data.update(auto_commit=True, **key_info.dict()) # raise ex.NoKeyMatchEx() # # # @router.delete('/apikeys/{key_id}') # async def delete_api_keys(request: Request, key_id: int, access_key: str): # user = request.state.user # await check_api_owner(user.id, key_id) # search_by_key = ApiKeys.filter(access_key=access_key) # if not search_by_key.first(): # raise ex.NoKeyMatchEx() # search_by_key.delete(auto_commit=True) # return MessageOk() # # # @router.get('/apikeys/{key_id}/whitelists', response_model=List[M.GetAPIWhiteLists]) # async def get_api_keys(request: Request, key_id: int): # user = request.state.user # await check_api_owner(user.id, key_id) # whitelists = ApiWhiteLists.filter(api_key_id=key_id).all() # return whitelists # # # @router.post('/apikeys/{key_id}/whitelists', response_model=M.GetAPIWhiteLists) # async def create_api_keys(request: Request, key_id: int, ip: M.CreateAPIWhiteLists, session: Session = Depends(db.session)): # user = request.state.user # await check_api_owner(user.id, key_id) # import ipaddress # try: # _ip = ipaddress.ip_address(ip.ip_addr) # except Exception as e: # raise ex.InvalidIpEx(ip.ip_addr, e) # if ApiWhiteLists.filter(api_key_id=key_id).count() == MAX_API_WHITELIST: # raise ex.MaxWLCountEx() # ip_dup = ApiWhiteLists.get(api_key_id=key_id, ip_addr=ip.ip_addr) # if ip_dup: # return ip_dup # ip_reg = ApiWhiteLists.create(session=session, auto_commit=True, api_key_id=key_id, ip_addr=ip.ip_addr) # return ip_reg # # # @router.delete('/apikeys/{key_id}/whitelists/{list_id}') # async def delete_api_keys(request: Request, key_id: int, list_id: int): # user = request.state.user # await check_api_owner(user.id, key_id) # ApiWhiteLists.filter(id=list_id, api_key_id=key_id).delete() # # return MessageOk() # # # async def check_api_owner(user_id, key_id): # api_keys = ApiKeys.get(id=key_id, user_id=user_id) # if not api_keys: # raise ex.NoKeyMatchEx()