Spaces:
Runtime error
Runtime error
File size: 7,230 Bytes
2ecc792 808bf67 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 5cd9bab 2ecc792 808bf67 2ecc792 5cd9bab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
from .utils.auth_funcs import *
from .utils.JWTBearer import *
from backend.models import *
from backend.services.db.utils.DBQueries import DBQueries
from backend.core.Exceptions import *
from backend.core.ExceptionHandlers import *
from backend.core.ConfigEnv import config
from backend import app
from fastapi import HTTPException, BackgroundTasks
from pydantic import ValidationError
from jose import jwt
from fastapi_mail import MessageSchema, MessageType
# import openai
# from transformers import RobertaTokenizer, T5ForConditionalGeneration
async def ops_signup(bgtasks: BackgroundTasks, response_result: GeneralResponse, data: UserAuth):
"""Wrapper method to handle signup process.
Args:
response_result: FrontendResponseModel. A TypedDict to return the
response captured from the API to the frontend.
data: UserAuth. New user's prospective credentials from the frontend
to create their account.
Raises:
ExistingUserException: If account with entered AADHAR Number already exists.
"""
# querying database to check if user already exist
user = DBQueries.fetch_data_from_database('auth', ['username', 'email'], f"username='{data.username}' OR email='{data.email}'")
if len(list(user)) != 0:
# user with the entered credentials already exists
raise ExistingUserException(response_result)
verifiction_token = Auth.create_access_token(f"{data.username} {data.email}", secret_name='VERIFICATION')
verification_link = f"http://localhost:8000/auth/verify/{verifiction_token}"
email_body_params = {
"username": data.username,
"verify_link": verification_link
}
message = MessageSchema(
subject="Welcome to Techdocs:[Account Verification]",
recipients=[data.email], # List of recipients, as many as you can pass
template_body=email_body_params,
subtype=MessageType.html
)
bgtasks.add_task(app.state.mail_client.send_message, message=message, template_name="email_verification.html")
# await app.state.mail_client.send_message(message=message, template_name="email_verification.html")
DBQueries.insert_to_database('auth', (data.username, Auth.get_password_hash(data.password), "", 0),
['username', 'password', 'email', 'is_verified'])
response_result.status = 'success'
response_result.message = [f'Activate your account by clicking on the link sent to {data.email}.\nMake sure to check your spam folder.']
def ops_login(data:LoginCreds):
"""Wrapper method to handle login process.
Args:
data: LoginCreds. User's credentials from the frontend to login to their account.
Returns:
TokenSchema. A Pydantic BaseModel to return the JWT tokens to the frontend.
Raises:
InvalidCredentialsException: If account with entered credentials does not exist.
"""
# querying database to check if user already exist
response_result = GeneralResponse.get_instance(data={},
status="not_allowed",
message=["Not authenticated"]
)
user = DBQueries.fetch_data_from_database('auth', ['username', 'password', 'is_verified'], f"username='{data.username}'")
user = list(user)
if len(user) == 0:
# user with the entered credentials does not exist
raise InvalidCredentialsException(response_result)
user = user[0]
if not Auth.verify_password(data.password, user[1]) and Auth.verify_username(data.username, user[0]):
# password is incorrect
raise InvalidCredentialsException(response_result)
if not user[2]:
raise EmailNotVerifiedException()
# password is correct
return TokenSchema(access_token=Auth.create_access_token(data.username),
refresh_token=Auth.create_access_token(data.username, secret_name='REFRESH'),
)
def ops_regenerate_api_key(username:str) -> APIKey:
user_API_entry = DBQueries.fetch_data_from_database('api_key', 'apikey', f"username='{username}'")
user_API_entry = list(user_API_entry)
apikey = None
if len(user_API_entry) != 0:
apikey = APIKey(api_key=Auth.generate_api_key(username))
DBQueries.update_data_in_database('api_key','apikey',f"username='{username}'", apikey.api_key)
else:
apikey = Auth.generate_api_key(username)
DBQueries.insert_to_database('api_key', (username, apikey), ['username', 'apikey'])
apikey = APIKey(api_key=apikey)
return apikey
def ops_inference(source_code:str,api_key:str,username:str):
response_result = GeneralResponse.get_instance(data={},
status="not_allowed",
message=["Not authenticated"]
)
user=DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'")
if len(list(user)) == 0:
# user with the entered credentials does not exist
raise InfoNotFoundException(response_result,"User not found")
elif list(user)[0][0]!=api_key:
raise InvalidCredentialsException(response_result)
def generate_docstring(source_code_message: str):
llm_response = app.state.llmchain.run({"instruction": source_code_message})
docstring = Inference(docstr=llm_response)
return docstring
return generate_docstring(source_code)
def ops_verify_email(request: Request, response_result: GeneralResponse, token:str):
try:
payload = jwt.decode(
token, config.JWT_VERIFICATION_SECRET_KEY, algorithms=[config.ALGORITHM]
)
token_data = TokenPayload(**payload)
if datetime.fromtimestamp(token_data.exp)< datetime.now():
return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request})
username, email = token_data.sub.split(' ', maxsplit=1)
registered_email = DBQueries.fetch_data_from_database('auth', ['is_verified'], f"username='{username}'")
registered_email = list(registered_email)
if len(registered_email) == 0:
raise InfoNotFoundException(response_result,"User not found")
print(registered_email[0][0])
if registered_email[0][0]:
return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request})
DBQueries.update_data_in_database('auth','is_verified',f"username='{username}'", (True,))
DBQueries.update_data_in_database('auth','email',f"username='{username}'", email)
response_result.status = 'success'
response_result.message = [f'Email verified successfully']
return app.state.templates.TemplateResponse("verification_success.html", context={"request": request})
except (jwt.JWTError, ValidationError):
return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request})
|