File size: 6,225 Bytes
2ecc792
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Utility class to leverage encryption, verification of entered credentials
and generation of JWT access tokens.
"""
from datetime import datetime, timedelta
from typing import Union, Any
import secrets

from jose import jwt
from passlib.context import CryptContext
from pydantic import ValidationError

from fastapi.exceptions import HTTPException

from backend.core.ConfigEnv import config
from backend.core.Exceptions import *
from backend.models import TokenPayload, TokenSchema



ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 30 minutes
REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 3 # 3 days


class Auth:
    """Utility class to perform -  1.encryption via `bcrypt` scheme.
    2.password hashing 3.verification of credentials and generating
    access tokens.

    Attrs:
        pwd_context: CryptContext. Helper for hashing & verifying passwords
                                   using `bcrypt` algorithm.
    """
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    

    @classmethod
    def get_password_hash(cls,password: str) -> str:
        """Encrypts the entered password.

        Args:
            password: str. Entered password.

        Returns:
            returns hashed(encrypted) password string.
        """
        return cls.pwd_context.hash(password)

    @classmethod    
    def verify_password(cls, plain_password: str, hashed_password: str) -> bool:
        """Validates if the entered password matches the actual password.

        Args:
            plain_password: str. Entered password by user.
            hashed_password: str. hashed password from the database.

        Returns:
            bool value indicating whether the passwords match or not.
        """
        return cls.pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def verify_username(entered_username: str, db_username: str) -> bool:
        """Validates if the entered username matches the actual username.

        Args:
            entered_username: str. Entered `username` by user.
            db_username: str. username from the database.

        Returns:
            bool value indicating whether the village names match or not.
        """
        return entered_username == db_username

    @staticmethod
    def create_access_token(subject: Union[str, Any], expires_delta: int = None) -> str:
        """Creates JWT access token.

        Args:
            subject: Union[Any, str]. Hash_key to generate access token from.
            expires_delta: int = None. Expiry time for the JWT.

        Returns:
            encoded_jwt: str. Encoded JWT token from the subject of interest.
        """
        if expires_delta is not None:
            expires_delta = datetime.utcnow() + expires_delta
        else:
            expires_delta = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

        to_encode = {"exp": expires_delta, "sub": str(subject)}
        encoded_jwt = jwt.encode(to_encode, config.JWT_SECRET_KEY, config.ALGORITHM)
        return encoded_jwt

    @staticmethod    
    def create_refresh_token(subject: Union[str, Any], expires_delta: int = None) -> str:
        """Creates JWT refresh access token.

        Args:
            subject: Union[Any, str]. Hash_key to generate access token from.
            expires_delta: int = None. Expiry time for the JWT.

        Returns:
            encoded_jwt: str. Encoded JWT token from the subject of interest.
        """
        if expires_delta is not None:
            expires_delta = datetime.utcnow() + expires_delta
        else:
            expires_delta = datetime.utcnow() + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
        
        to_encode = {"exp": expires_delta, "sub": str(subject)}
        encoded_jwt = jwt.encode(to_encode, config.JWT_REFRESH_SECRET_KEY, config.ALGORITHM)
        return encoded_jwt

    @staticmethod
    def generate_access_tokens_from_refresh_tokens(token: str) -> TokenSchema:
        """Generates a new pair of tokens by implementing rotating
        refresh_access_tokens.

        Args:
            token: str. Current valid refresh access token.

        Returns:
            tokens: TokenSchema. New tokens with new validity.

        Raises:
            LoginFailedException: If the current refresh access token is
                                  invalid.
        """
        tokens = TokenSchema.get_instance(
                                        access_token= "",
                                        refresh_token= "",
                                        )
        try:
            payload = jwt.decode(
                token, config.JWT_REFRESH_SECRET_KEY, algorithms=[config.ALGORITHM]
            )
            token_data = TokenPayload(**payload)
            if datetime.fromtimestamp(token_data.exp)< datetime.now():
                raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        except (jwt.JWTError, ValidationError):
            raise InvalidCredentialsException(tokens)
        tokens['access_token'] = Auth.create_access_token(token_data.sub)
        tokens['refresh_token'] = Auth.create_refresh_token(token_data.sub)
        tokens['status'] = 'login successful'
        tokens['role'] = token_data.sub.split("_")[1]
        return tokens

    @classmethod
    def generate_api_key(cls, username: str):
        return cls.get_password_hash(username + secrets.token_urlsafe(25 - len(username)))
    
    @classmethod
    def get_user_credentials(cls,access_token:str):
        response_result = GeneralResponse.get_instance(data={},
                                      status="not_allowed",
                                      message=["Not authenticated"]
                                      )
        try:
            payload = jwt.decode(
                access_token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM]
            )
            token_data = TokenPayload(**payload)
            return token_data.sub
        except (jwt.JWTError, ValidationError):
            raise InvalidCredentialsException(response_result)
        
    @classmethod
    def verify_apikey(cls,user_api_key:str,true_api_key:str):
        return user_api_key == true_api_key