Spaces:
Runtime error
Runtime error
"""Utility class to leverage encryption, verification of entered credentials | |
and generation of JWT access tokens. | |
""" | |
from datetime import datetime, timedelta | |
from typing import Union, Any | |
import secrets | |
from jose import jwt | |
from passlib.context import CryptContext | |
from pydantic import ValidationError | |
from fastapi.exceptions import HTTPException | |
from backend.core.ConfigEnv import config | |
from backend.core.Exceptions import * | |
from backend.models import TokenPayload, TokenSchema | |
token_expiry_info = { | |
'ACCESS_TOKEN_EXPIRE_MINUTES': 30, # 30 minutes | |
'REFRESH_TOKEN_EXPIRE_MINUTES': 60 * 24 * 3, # 3 days | |
'VERIFICATION_TOKEN_EXPIRE_MINUTES': 20, # 20 minutes | |
} | |
class Auth: | |
"""Utility class to perform - 1.encryption via `bcrypt` scheme. | |
2.password hashing 3.verification of credentials and generating | |
access tokens. | |
Attrs: | |
pwd_context: CryptContext. Helper for hashing & verifying passwords | |
using `bcrypt` algorithm. | |
""" | |
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
def get_password_hash(cls,password: str) -> str: | |
"""Encrypts the entered password. | |
Args: | |
password: str. Entered password. | |
Returns: | |
returns hashed(encrypted) password string. | |
""" | |
return cls.pwd_context.hash(password) | |
def verify_password(cls, plain_password: str, hashed_password: str) -> bool: | |
"""Validates if the entered password matches the actual password. | |
Args: | |
plain_password: str. Entered password by user. | |
hashed_password: str. hashed password from the database. | |
Returns: | |
bool value indicating whether the passwords match or not. | |
""" | |
return cls.pwd_context.verify(plain_password, hashed_password) | |
def verify_username(entered_username: str, db_username: str) -> bool: | |
"""Validates if the entered username matches the actual username. | |
Args: | |
entered_username: str. Entered `username` by user. | |
db_username: str. username from the database. | |
Returns: | |
bool value indicating whether the village names match or not. | |
""" | |
return entered_username == db_username | |
def create_access_token(subject: Union[str, Any], expires_delta: int = None, secret_name: str = None) -> str: | |
"""Creates JWT access token. | |
Args: | |
subject: Union[Any, str]. Hash_key to generate access token from. | |
expires_delta: int = None. Expiry time for the JWT. | |
Returns: | |
encoded_jwt: str. Encoded JWT token from the subject of interest. | |
""" | |
secret_key = config.JWT_SECRET_KEY | |
token_expiration = token_expiry_info['ACCESS_TOKEN_EXPIRE_MINUTES'] | |
if secret_name is not None: | |
secret_key = config.dict()["JWT_" + secret_name.upper() + "_SECRET_KEY"] | |
token_expiration = token_expiry_info[secret_name.upper() + "_TOKEN_EXPIRE_MINUTES"] | |
if expires_delta is not None: | |
expires_delta = datetime.utcnow() + expires_delta | |
else: | |
expires_delta = datetime.utcnow() + timedelta(minutes=token_expiration) | |
to_encode = {"exp": expires_delta, "sub": str(subject)} | |
encoded_jwt = jwt.encode(to_encode, secret_key, config.ALGORITHM) | |
return encoded_jwt | |
def generate_access_tokens_from_refresh_tokens(token: str) -> TokenSchema: | |
"""Generates a new pair of tokens by implementing rotating | |
refresh_access_tokens. | |
Args: | |
token: str. Current valid refresh access token. | |
Returns: | |
tokens: TokenSchema. New tokens with new validity. | |
Raises: | |
LoginFailedException: If the current refresh access token is | |
invalid. | |
""" | |
tokens = TokenSchema.get_instance( | |
access_token= "", | |
refresh_token= "", | |
) | |
try: | |
payload = jwt.decode( | |
token, config.JWT_REFRESH_SECRET_KEY, algorithms=[config.ALGORITHM] | |
) | |
token_data = TokenPayload(**payload) | |
if datetime.fromtimestamp(token_data.exp)< datetime.now(): | |
raise HTTPException(status_code=403, detail="Invalid token or expired token.") | |
except (jwt.JWTError, ValidationError): | |
raise InvalidCredentialsException(tokens) | |
tokens['access_token'] = Auth.create_access_token(token_data.sub) | |
tokens['refresh_token'] = Auth.create_access_token(token_data.sub, secret_name='REFRESH') | |
return tokens | |
def generate_api_key(cls, username: str): | |
return cls.get_password_hash(username + secrets.token_urlsafe(25 - len(username))) | |
def get_user_credentials(cls,access_token:str): | |
response_result = GeneralResponse.get_instance(data={}, | |
status="not_allowed", | |
message=["Not authenticated"] | |
) | |
try: | |
payload = jwt.decode( | |
access_token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM] | |
) | |
token_data = TokenPayload(**payload) | |
return token_data.sub | |
except (jwt.JWTError, ValidationError): | |
raise InvalidCredentialsException(response_result) | |
def verify_apikey(cls,user_api_key:str,true_api_key:str): | |
return user_api_key == true_api_key | |