File size: 4,769 Bytes
89ae94f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update
from passlib.context import CryptContext
from typing import Optional, List, Dict, Any
import logging

from src.models.user import User
from src.api.schemas import UserCreate, UserUpdate, UserInDB

# Configure logger
logger = logging.getLogger(__name__)

# Password context for hashing and verification
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify password against hash.
    
    Args:
        plain_password: Plain password
        hashed_password: Hashed password
        
    Returns:
        bool: True if password is correct
    """
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """
    Hash password.
    
    Args:
        password: Plain password
        
    Returns:
        str: Hashed password
    """
    return pwd_context.hash(password)

async def get_user_by_username(db: AsyncSession, username: str) -> Optional[UserInDB]:
    """
    Get user by username.
    
    Args:
        db: Database session
        username: Username
        
    Returns:
        Optional[UserInDB]: User if found, None otherwise
    """
    try:
        result = await db.execute(select(User).where(User.username == username))
        user = result.scalars().first()
        
        if not user:
            return None
            
        # Convert SQLAlchemy model to Pydantic model
        user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
        return UserInDB(**user_dict)
    except Exception as e:
        logger.error(f"Error getting user by username: {e}")
        return None

async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[UserInDB]:
    """
    Authenticate user.
    
    Args:
        db: Database session
        username: Username
        password: Plain password
        
    Returns:
        Optional[UserInDB]: User if authenticated, None otherwise
    """
    user = await get_user_by_username(db, username)
    
    if not user:
        return None
        
    if not verify_password(password, user.hashed_password):
        return None
        
    return user

async def create_user(db: AsyncSession, user_data: UserCreate) -> Optional[UserInDB]:
    """
    Create a new user.
    
    Args:
        db: Database session
        user_data: User data
        
    Returns:
        Optional[UserInDB]: Created user
    """
    try:
        # Check if user already exists
        existing_user = await get_user_by_username(db, user_data.username)
        if existing_user:
            return None
            
        # Create new user
        hashed_password = get_password_hash(user_data.password)
        user = User(
            username=user_data.username,
            email=user_data.email,
            full_name=user_data.full_name,
            hashed_password=hashed_password,
            is_active=user_data.is_active
        )
        
        db.add(user)
        await db.commit()
        await db.refresh(user)
        
        # Convert SQLAlchemy model to Pydantic model
        user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
        return UserInDB(**user_dict)
    except Exception as e:
        logger.error(f"Error creating user: {e}")
        await db.rollback()
        return None

async def update_user(db: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserInDB]:
    """
    Update user.
    
    Args:
        db: Database session
        user_id: User ID
        user_data: User data
        
    Returns:
        Optional[UserInDB]: Updated user
    """
    try:
        # Create update dictionary
        update_data = user_data.dict(exclude_unset=True)
        
        # Hash password if provided
        if "password" in update_data:
            update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
            
        # Update user
        stmt = update(User).where(User.id == user_id).values(**update_data)
        await db.execute(stmt)
        await db.commit()
        
        # Get updated user
        result = await db.execute(select(User).where(User.id == user_id))
        user = result.scalars().first()
        
        if not user:
            return None
            
        # Convert SQLAlchemy model to Pydantic model
        user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
        return UserInDB(**user_dict)
    except Exception as e:
        logger.error(f"Error updating user: {e}")
        await db.rollback()
        return None