mayureshagashe2105 commited on
Commit
cb77b29
·
1 Parent(s): 6bf71bb

Add auth servies and functools

Browse files
backend/services/auth/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .ops import *
2
+ from .utils.JWTBearer import *
backend/services/auth/ops.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .utils.auth_funcs import *
2
+ from .utils.JWTBearer import *
3
+ from docguptea.services import parser
4
+ from docguptea.models import *
5
+ from docguptea.services.db.utils.DBQueries import DBQueries
6
+ from docguptea.core.Exceptions import *
7
+ from docguptea import app
8
+
9
+ # import openai
10
+ # from transformers import RobertaTokenizer, T5ForConditionalGeneration
11
+
12
+ def ops_signup(response_result: GeneralResponse, data: UserAuth):
13
+ """Wrapper method to handle signup process.
14
+
15
+ Args:
16
+ response_result: FrontendResponseModel. A TypedDict to return the
17
+ response captured from the API to the frontend.
18
+ data: UserAuth. New user's prospective credentials from the frontend
19
+ to create their account.
20
+
21
+ Raises:
22
+ ExistingUserException: If account with entered AADHAR Number already exists.
23
+ """
24
+ # querying database to check if user already exist
25
+ user = DBQueries.fetch_data_from_database('auth', ['username', 'email'], f"username='{data.username}' OR email='{data.email}'")
26
+ if len(list(user)) != 0:
27
+ # user with the entered credentials already exists
28
+ raise ExistingUserException(response_result)
29
+
30
+ DBQueries.insert_to_database('auth', (data.username, Auth.get_password_hash(data.password), data.email),
31
+ ['username', 'password', 'email'])
32
+
33
+ response_result.status = 'success'
34
+ response_result.message = [f'User created successfully']
35
+
36
+ def ops_login(data:LoginCreds):
37
+ """Wrapper method to handle login process.
38
+
39
+ Args:
40
+ data: LoginCreds. User's credentials from the frontend to login to their account.
41
+
42
+ Returns:
43
+ TokenSchema. A Pydantic BaseModel to return the JWT tokens to the frontend.
44
+
45
+ Raises:
46
+ InvalidCredentialsException: If account with entered credentials does not exist.
47
+ """
48
+ # querying database to check if user already exist
49
+ response_result = GeneralResponse.get_instance(data={},
50
+ status="not_allowed",
51
+ message=["Not authenticated"]
52
+ )
53
+ user = DBQueries.fetch_data_from_database('auth', ['username', 'password'], f"username='{data.username}'")
54
+ user = list(user)
55
+ if len(user) == 0:
56
+ # user with the entered credentials does not exist
57
+ raise InvalidCredentialsException(response_result)
58
+ user = user[0]
59
+ if not Auth.verify_password(data.password, user[1]) and Auth.verify_username(data.username, user[0]):
60
+ # password is incorrect
61
+ raise InvalidCredentialsException(response_result)
62
+
63
+ # password is correct
64
+ return TokenSchema(access_token=Auth.create_access_token(data.username),
65
+ refresh_token=Auth.create_refresh_token(data.username),
66
+ )
67
+
68
+ def ops_regenerate_api_key(username:str) -> APIKey:
69
+
70
+ user_API_entry = DBQueries.fetch_data_from_database('api_key', 'apikey', f"username='{username}'")
71
+ user_API_entry = list(user_API_entry)
72
+ apikey = None
73
+
74
+ if len(user_API_entry) != 0:
75
+ apikey = APIKey(api_key=Auth.generate_api_key(username))
76
+ DBQueries.update_data_in_database('api_key','apikey',f"username='{username}'", apikey.api_key)
77
+
78
+ else:
79
+ apikey = Auth.generate_api_key(username)
80
+ DBQueries.insert_to_database('api_key', (username, apikey), ['username', 'apikey'])
81
+ apikey = APIKey(api_key=apikey)
82
+
83
+ return apikey
84
+
85
+
86
+
87
+ def ops_inference(source_code:str,api_key:str,username:str):
88
+ response_result = GeneralResponse.get_instance(data={},
89
+ status="not_allowed",
90
+ message=["Not authenticated"]
91
+ )
92
+
93
+ user=DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'")
94
+ if len(list(user)) == 0:
95
+ # user with the entered credentials does not exist
96
+ raise InfoNotFoundException(response_result,"User not found")
97
+ elif list(user)[0][0]!=api_key:
98
+ raise InvalidCredentialsException(response_result)
99
+
100
+ def generate_docstring(source_code_message: str):
101
+
102
+
103
+ llm_response = app.state.llmchain.run({"instruction": source_code_message})
104
+
105
+ docstring = Inference(docstr=llm_response)
106
+
107
+
108
+ return docstring
109
+
110
+ return generate_docstring(source_code)
backend/services/auth/utils/JWTBearer.py ADDED
@@ -0,0 +1,75 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Custom Authentication & Authorization bearer to authenticate and authorize
2
+ users based on the following factors:
3
+ 1. username
4
+ 2.password
5
+ 3.email
6
+
7
+ This utility class validates generated JWTs and grants scoped access to users
8
+ according to their roles.
9
+ """
10
+ from docguptea.core.ConfigEnv import config
11
+ from docguptea.models import TokenPayload
12
+
13
+ from datetime import datetime
14
+
15
+ from fastapi import Request, HTTPException
16
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
17
+ from pydantic import ValidationError
18
+ from jose import jwt
19
+
20
+
21
+ class JWTBearer(HTTPBearer):
22
+ """Custom bearer to validate access tokens.
23
+
24
+ Args:
25
+ auto_error: bool = True. Internal param to allow auto error detection.
26
+
27
+ Raises:
28
+ HHTTPException(403): If authentication scheme is not `Bearer`.
29
+ HTTPException(403): If the access token is invalid or expired.
30
+ HTTPException(403): If authorization code is invalid.
31
+ """
32
+ def __init__(self, auto_error: bool = True):
33
+ super(JWTBearer, self).__init__(auto_error=auto_error)
34
+
35
+ async def __call__(self, request: Request) -> str:
36
+ credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
37
+ if credentials:
38
+ if not credentials.scheme == "Bearer":
39
+ raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
40
+ else:
41
+ is_valid = JWTBearer.token_validation(credentials.credentials)
42
+ if not is_valid:
43
+ raise HTTPException(status_code=403, detail="Invalid token or expired token.")
44
+ return credentials.credentials
45
+ else:
46
+ raise HTTPException(status_code=403, detail="Invalid authorization code.")
47
+
48
+ @staticmethod
49
+ def token_validation(token: str) -> bool:
50
+ """Decodes JWTs to check their validity by inspecting expiry and
51
+ authorization code.
52
+
53
+ Args:
54
+ token: str. Authenticated `access_token` of the user.
55
+
56
+ Returns:
57
+ bool value to indicate validity of the access tokens.
58
+
59
+ Raises:
60
+ jwt.JWTError: If decode fails.
61
+ ValidationError: If JWTs are not in RFC 7519 standard.
62
+ """
63
+ try:
64
+ payload = jwt.decode(
65
+ token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM]
66
+ )
67
+ token_data = TokenPayload(**payload)
68
+
69
+ if datetime.fromtimestamp(token_data.exp) < datetime.now():
70
+ return False
71
+
72
+ except(jwt.JWTError, ValidationError):
73
+ return False
74
+
75
+ return True
backend/services/auth/utils/auth_funcs.py ADDED
@@ -0,0 +1,171 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Utility class to leverage encryption, verification of entered credentials
2
+ and generation of JWT access tokens.
3
+ """
4
+ from datetime import datetime, timedelta
5
+ from typing import Union, Any
6
+ import secrets
7
+
8
+ from jose import jwt
9
+ from passlib.context import CryptContext
10
+ from pydantic import ValidationError
11
+
12
+ from fastapi.exceptions import HTTPException
13
+
14
+ from docguptea.core.ConfigEnv import config
15
+ from docguptea.core.Exceptions import *
16
+ from docguptea.models import TokenPayload, TokenSchema
17
+
18
+
19
+
20
+ ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 30 minutes
21
+ REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 3 # 3 days
22
+
23
+
24
+ class Auth:
25
+ """Utility class to perform - 1.encryption via `bcrypt` scheme.
26
+ 2.password hashing 3.verification of credentials and generating
27
+ access tokens.
28
+
29
+ Attrs:
30
+ pwd_context: CryptContext. Helper for hashing & verifying passwords
31
+ using `bcrypt` algorithm.
32
+ """
33
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
34
+
35
+
36
+
37
+ @classmethod
38
+ def get_password_hash(cls,password: str) -> str:
39
+ """Encrypts the entered password.
40
+
41
+ Args:
42
+ password: str. Entered password.
43
+
44
+ Returns:
45
+ returns hashed(encrypted) password string.
46
+ """
47
+ return cls.pwd_context.hash(password)
48
+
49
+ @classmethod
50
+ def verify_password(cls, plain_password: str, hashed_password: str) -> bool:
51
+ """Validates if the entered password matches the actual password.
52
+
53
+ Args:
54
+ plain_password: str. Entered password by user.
55
+ hashed_password: str. hashed password from the database.
56
+
57
+ Returns:
58
+ bool value indicating whether the passwords match or not.
59
+ """
60
+ return cls.pwd_context.verify(plain_password, hashed_password)
61
+
62
+ @staticmethod
63
+ def verify_username(entered_username: str, db_username: str) -> bool:
64
+ """Validates if the entered username matches the actual username.
65
+
66
+ Args:
67
+ entered_username: str. Entered `username` by user.
68
+ db_username: str. username from the database.
69
+
70
+ Returns:
71
+ bool value indicating whether the village names match or not.
72
+ """
73
+ return entered_username == db_username
74
+
75
+ @staticmethod
76
+ def create_access_token(subject: Union[str, Any], expires_delta: int = None) -> str:
77
+ """Creates JWT access token.
78
+
79
+ Args:
80
+ subject: Union[Any, str]. Hash_key to generate access token from.
81
+ expires_delta: int = None. Expiry time for the JWT.
82
+
83
+ Returns:
84
+ encoded_jwt: str. Encoded JWT token from the subject of interest.
85
+ """
86
+ if expires_delta is not None:
87
+ expires_delta = datetime.utcnow() + expires_delta
88
+ else:
89
+ expires_delta = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
90
+
91
+ to_encode = {"exp": expires_delta, "sub": str(subject)}
92
+ encoded_jwt = jwt.encode(to_encode, config.JWT_SECRET_KEY, config.ALGORITHM)
93
+ return encoded_jwt
94
+
95
+ @staticmethod
96
+ def create_refresh_token(subject: Union[str, Any], expires_delta: int = None) -> str:
97
+ """Creates JWT refresh access token.
98
+
99
+ Args:
100
+ subject: Union[Any, str]. Hash_key to generate access token from.
101
+ expires_delta: int = None. Expiry time for the JWT.
102
+
103
+ Returns:
104
+ encoded_jwt: str. Encoded JWT token from the subject of interest.
105
+ """
106
+ if expires_delta is not None:
107
+ expires_delta = datetime.utcnow() + expires_delta
108
+ else:
109
+ expires_delta = datetime.utcnow() + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
110
+
111
+ to_encode = {"exp": expires_delta, "sub": str(subject)}
112
+ encoded_jwt = jwt.encode(to_encode, config.JWT_REFRESH_SECRET_KEY, config.ALGORITHM)
113
+ return encoded_jwt
114
+
115
+ @staticmethod
116
+ def generate_access_tokens_from_refresh_tokens(token: str) -> TokenSchema:
117
+ """Generates a new pair of tokens by implementing rotating
118
+ refresh_access_tokens.
119
+
120
+ Args:
121
+ token: str. Current valid refresh access token.
122
+
123
+ Returns:
124
+ tokens: TokenSchema. New tokens with new validity.
125
+
126
+ Raises:
127
+ LoginFailedException: If the current refresh access token is
128
+ invalid.
129
+ """
130
+ tokens = TokenSchema.get_instance(
131
+ access_token= "",
132
+ refresh_token= "",
133
+ )
134
+ try:
135
+ payload = jwt.decode(
136
+ token, config.JWT_REFRESH_SECRET_KEY, algorithms=[config.ALGORITHM]
137
+ )
138
+ token_data = TokenPayload(**payload)
139
+ if datetime.fromtimestamp(token_data.exp)< datetime.now():
140
+ raise HTTPException(status_code=403, detail="Invalid token or expired token.")
141
+ except (jwt.JWTError, ValidationError):
142
+ raise InvalidCredentialsException(tokens)
143
+ tokens['access_token'] = Auth.create_access_token(token_data.sub)
144
+ tokens['refresh_token'] = Auth.create_refresh_token(token_data.sub)
145
+ tokens['status'] = 'login successful'
146
+ tokens['role'] = token_data.sub.split("_")[1]
147
+ return tokens
148
+
149
+ @classmethod
150
+ def generate_api_key(cls, username: str):
151
+ return cls.get_password_hash(username + secrets.token_urlsafe(25 - len(username)))
152
+
153
+ @classmethod
154
+ def get_user_credentials(cls,access_token:str):
155
+ response_result = GeneralResponse.get_instance(data={},
156
+ status="not_allowed",
157
+ message=["Not authenticated"]
158
+ )
159
+ try:
160
+ payload = jwt.decode(
161
+ access_token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM]
162
+ )
163
+ token_data = TokenPayload(**payload)
164
+ return token_data.sub
165
+ except (jwt.JWTError, ValidationError):
166
+ raise InvalidCredentialsException(response_result)
167
+
168
+ @classmethod
169
+ def verify_apikey(cls,user_api_key:str,true_api_key:str):
170
+ return user_api_key == true_api_key
171
+