Spaces:
Runtime error
Runtime error
File size: 7,109 Bytes
2ecc792 dce75a4 2ecc792 808bf67 5cd9bab 2ecc792 5cd9bab 9b1f57f 5cd9bab 2ecc792 5cd9bab 9b1f57f 5cd9bab 9ee00e3 ea406bd 9ee00e3 5cd9bab 782b6ea 5cd9bab fde59e3 5cd9bab 2ecc792 1f640f4 2ecc792 5cd9bab 2ecc792 fde59e3 5cd9bab 2ecc792 5cd9bab 2ecc792 9b1f57f 2ecc792 21bfd6f 9b1f57f 2ecc792 9b1f57f 2ecc792 5cd9bab b60b60f 5cd9bab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
from .utils.auth_funcs import *
from .utils.functools import *
from .utils.JWTBearer import *
from backend.models import *
from backend.services.db.utils.DBQueries import DBQueries
from backend.core.Exceptions import *
from backend.core.ExceptionHandlers import *
from backend.core.ConfigEnv import config
from backend import app
from fastapi import HTTPException, BackgroundTasks
from pydantic import ValidationError
from jose import jwt
# Base URL prefixed to the account-verification links built in ops_signup.
# The commented-out loopback value below is the local alternative.
# BASE_URL = "http://127.0.0.1:8000"
BASE_URL = "https://caffeinecrew-techdocs.hf.space"
async def ops_signup(bgtasks: BackgroundTasks, response_result: GeneralResponse, data: UserAuth):
    """Create an unverified account and send its verification e-mail.

    Args:
        bgtasks: BackgroundTasks. Part of the signature but not used by
            this function.
        response_result: GeneralResponse. Mutated in place; on success its
            ``status`` and ``message`` are filled in for the frontend.
        data: UserAuth. Prospective credentials of the new user; the
            ``username``, ``email`` and ``password`` attributes are read.

    Raises:
        ExistingUserException: If the 'auth' table already holds a row with
            the same username or email.
        EmailNotSentException: If the mail-server request does not report 200.
    """
    # NOTE(review): the filter is assembled by interpolating user-supplied
    # values into a SQL-looking condition string; confirm that DBQueries
    # escapes or parameterises it.
    duplicate_filter = f"username='{data.username}' OR email='{data.email}'"
    existing = DBQueries.fetch_data_from_database('auth', ['username', 'email'], duplicate_filter)
    if list(existing):
        # An account with this username or email is already registered.
        raise ExistingUserException(response_result)

    # The token subject carries "<username> <email>"; ops_verify_email splits
    # it back apart when the link is followed.
    token = Auth.create_access_token(f"{data.username} {data.email}", secret_name='VERIFICATION')
    mail_request = {
        "recipients": [data.email],
        "subject": "Welcome to Techdocs:[Account Verification]",
        "template_name": "email_verification.html",
        "template_kwargs": {
            "username": data.username,
            "verify_link": f"{BASE_URL}/auth/verify/{token}",
        },
    }
    if post_request(url=config.MAIL_SERVER_URL, data=mail_request, headers=None) != 200:
        raise EmailNotSentException()

    # The row is stored with an empty email and is_verified=0; the address is
    # written later by ops_verify_email.
    new_row = (data.username, Auth.get_password_hash(data.password), "", 0)
    DBQueries.insert_to_database('auth', new_row, ['username', 'password', 'email', 'is_verified'])

    response_result.status = 'success'
    response_result.message = [f'Activate your account by clicking on the link sent to {data.email}.\nMake sure to check your spam folder.']
def ops_login(data: LoginCreds):
    """Authenticate a user and issue JWT access and refresh tokens.

    Args:
        data: LoginCreds. Credentials from the frontend; the ``username``
            and ``password`` attributes are read.

    Returns:
        TokenSchema. Holds an access token and a refresh token, both created
        for ``data.username``.

    Raises:
        InvalidCredentialsException: If no 'auth' row matches the username,
            or if the password or username check fails.
        EmailNotVerifiedException: If the account's ``is_verified`` column
            is falsy.
    """
    response_result = GeneralResponse.get_instance(
        data={},
        status="not_allowed",
        message=["Not authenticated"],
    )

    # NOTE(review): the filter interpolates user input into a SQL-looking
    # condition string; confirm that DBQueries escapes or parameterises it.
    rows = list(
        DBQueries.fetch_data_from_database(
            'auth', ['username', 'password', 'is_verified'], f"username='{data.username}'"
        )
    )
    if not rows:
        # No account exists for the entered username.
        raise InvalidCredentialsException(response_result)

    row = rows[0]
    stored_username, stored_hash, is_verified = row[0], row[1], row[2]

    # Both checks must pass. The previous form `not A and B` parsed as
    # `(not A) and B`, so a wrong password combined with a failing username
    # check skipped the raise and tokens were still issued.
    credentials_ok = (
        Auth.verify_password(data.password, stored_hash)
        and Auth.verify_username(data.username, stored_username)
    )
    if not credentials_ok:
        raise InvalidCredentialsException(response_result)

    if not is_verified:
        raise EmailNotVerifiedException()

    return TokenSchema(
        access_token=Auth.create_access_token(data.username),
        refresh_token=Auth.create_access_token(data.username, secret_name='REFRESH'),
    )
def ops_regenerate_api_key(username: str) -> APIKey:
    """Generate a new API key for ``username`` and persist it.

    If the 'api_key' table already has a row for the user, that row's
    ``apikey`` column is updated; otherwise a new row is inserted.

    Args:
        username: Account whose key is (re)generated.

    Returns:
        APIKey wrapping the newly generated key.
    """
    current_rows = list(
        DBQueries.fetch_data_from_database('api_key', 'apikey', f"username='{username}'")
    )

    if not current_rows:
        # First key for this user: insert a fresh row.
        fresh_key = Auth.generate_api_key(username)
        DBQueries.insert_to_database('api_key', (username, fresh_key), ['username', 'apikey'])
        return APIKey(api_key=fresh_key)

    # A key already exists: overwrite it with the new one.
    rotated = APIKey(api_key=Auth.generate_api_key(username))
    DBQueries.update_data_in_database('api_key', 'apikey', f"username='{username}'", rotated.api_key)
    return rotated
def ops_inference(source_code: str, api_key: str, username: str):
    """Validate the caller's API key, then ask the LLM chain for a docstring.

    Args:
        source_code: Text sent to the LLM chain as the ``instruction`` input.
        api_key: API key presented by the caller.
        username: Account whose stored key is compared against ``api_key``.

    Returns:
        Inference built from the LLM chain's response.

    Raises:
        InfoNotFoundException: If the 'api_key' table has no row for
            ``username``.
        InvalidCredentialsException: If the stored key differs from
            ``api_key``.
    """
    response_result = GeneralResponse.get_instance(
        data={},
        status="not_allowed",
        message=["Not authenticated"],
    )

    # Materialise the result exactly once. Calling list() on it a second time
    # would yield nothing if the query helper returns a one-shot iterator,
    # turning the key comparison below into an IndexError.
    key_rows = list(
        DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'")
    )
    if not key_rows:
        # No API key is registered for this username.
        raise InfoNotFoundException(response_result, "User not found")
    if key_rows[0][0] != api_key:
        raise InvalidCredentialsException(response_result)

    llm_response = app.state.llmchain.invoke({"instruction": source_code})
    print(llm_response)  # existing diagnostic output, kept as-is
    return Inference(docstr=llm_response)
def ops_verify_email(request: Request, response_result: GeneralResponse, token: str):
    """Handle a click on the e-mail verification link.

    Decodes the verification token, splits its subject into username and
    email, and marks the account as verified while storing the email.

    Args:
        request: Request. Passed into the template context.
        response_result: GeneralResponse. Mutated in place on success.
        token: Verification JWT taken from the link.

    Returns:
        The rendered "verification_success.html" template on success. The
        rendered "verification_failure.html" template if the token is
        invalid, expired or malformed, or the account is already verified.

    Raises:
        InfoNotFoundException: If no 'auth' row exists for the username
            carried by the token.
    """
    def _render(template_name: str):
        # Every outcome renders a template with the same minimal context.
        return app.state.templates.TemplateResponse(template_name, context={"request": request})

    try:
        payload = jwt.decode(
            token, config.JWT_VERIFICATION_SECRET_KEY, algorithms=[config.ALGORITHM]
        )
        token_data = TokenPayload(**payload)
        if datetime.fromtimestamp(token_data.exp) < datetime.now():
            return _render("verification_failure.html")

        # The subject is expected to be "<username> <email>". A subject
        # without a space previously raised an uncaught ValueError on
        # unpacking; treat it as a failed verification instead.
        # NOTE(review): splitting on the first space mis-parses usernames
        # that contain spaces -- confirm usernames cannot contain them.
        subject_parts = token_data.sub.split(' ', maxsplit=1)
        if len(subject_parts) != 2:
            return _render("verification_failure.html")
        username, email = subject_parts

        # NOTE(review): the filter interpolates a token-derived value into a
        # SQL-looking condition string; confirm DBQueries handles escaping.
        user_filter = f"username='{username}'"
        rows = list(DBQueries.fetch_data_from_database('auth', ['is_verified'], user_filter))
        if not rows:
            raise InfoNotFoundException(response_result, "User not found")

        already_verified = rows[0][0]
        print(already_verified)  # existing diagnostic output, kept as-is
        if already_verified:
            # The link was already used; report failure rather than re-verify.
            return _render("verification_failure.html")

        DBQueries.update_data_in_database('auth', 'is_verified', user_filter, (True,))
        DBQueries.update_data_in_database('auth', 'email', user_filter, email)
        response_result.status = 'success'
        response_result.message = ['Email verified successfully']
        return _render("verification_success.html")
    except (jwt.JWTError, ValidationError):
        return _render("verification_failure.html")
|