Spaces:
Runtime error
Runtime error
"""Custom Authentication & Authorization bearer to authenticate and authorize | |
users based on the following factors:
1. username
2. password
3. email
This utility class validates generated JWTs and grants scoped access to users
according to their roles.
""" | |
from backend.core.ConfigEnv import config | |
from backend.models import TokenPayload | |
from datetime import datetime | |
from fastapi import Request, HTTPException | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from pydantic import ValidationError | |
from jose import jwt | |
class JWTBearer(HTTPBearer):
    """Custom bearer dependency that validates JWT access tokens.

    Args:
        auto_error: bool = True. Forwarded unchanged to `HTTPBearer`.

    Raises:
        HTTPException(403): If authentication scheme is not `Bearer`.
        HTTPException(403): If the access token is invalid or expired.
        HTTPException(403): If authorization code is invalid.
    """

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> str:
        """Extract the bearer token from `request` and validate it.

        Args:
            request: Request. Incoming request passed to `HTTPBearer.__call__`.

        Returns:
            The raw access token string once it has passed validation.

        Raises:
            HTTPException(403): If the parent yields no credentials, the
                scheme is not `Bearer`, or the token fails validation.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)

        # A falsy result from the parent is treated as missing credentials.
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")

        # HTTP authentication scheme names are case-insensitive, so
        # "bearer" and "Bearer" must be treated alike.
        if credentials.scheme.lower() != "bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")

        if not JWTBearer.token_validation(credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")

        return credentials.credentials

    @staticmethod
    def token_validation(token: str) -> bool:
        """Decode a JWT and report whether it is valid and unexpired.

        Declared as a static method so it behaves the same whether it is
        called on the class or on an instance.

        Args:
            token: str. Authenticated `access_token` of the user.

        Returns:
            True if the token decodes with the configured key and algorithm,
            its payload is accepted by `TokenPayload`, and its `exp` claim is
            not in the past; False otherwise. Decode, payload-validation and
            timestamp-conversion errors are reported as False rather than
            raised.
        """
        # Local import: `timezone` is not among the module-level imports.
        from datetime import timezone

        try:
            payload = jwt.decode(
                token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM]
            )
            token_data = TokenPayload(**payload)
        except (jwt.JWTError, ValidationError):
            return False

        try:
            # Compare in UTC with aware datetimes; naive local times are
            # ambiguous around DST transitions.
            expires_at = datetime.fromtimestamp(token_data.exp, tz=timezone.utc)
        except (OverflowError, OSError, ValueError, TypeError):
            # An out-of-range or non-numeric `exp` means an unusable token,
            # not a server error.
            return False

        return expires_at >= datetime.now(tz=timezone.utc)