Spaces:
Running
Running
File size: 4,769 Bytes
89ae94f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update
from passlib.context import CryptContext
from typing import Optional, List, Dict, Any
import logging
from src.models.user import User
from src.api.schemas import UserCreate, UserUpdate, UserInDB
# Configure logger
logger = logging.getLogger(__name__)
# Password context for hashing and verification
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify password against hash.
Args:
plain_password: Plain password
hashed_password: Hashed password
Returns:
bool: True if password is correct
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Hash password.
Args:
password: Plain password
Returns:
str: Hashed password
"""
return pwd_context.hash(password)
async def get_user_by_username(db: AsyncSession, username: str) -> Optional[UserInDB]:
"""
Get user by username.
Args:
db: Database session
username: Username
Returns:
Optional[UserInDB]: User if found, None otherwise
"""
try:
result = await db.execute(select(User).where(User.username == username))
user = result.scalars().first()
if not user:
return None
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error getting user by username: {e}")
return None
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[UserInDB]:
"""
Authenticate user.
Args:
db: Database session
username: Username
password: Plain password
Returns:
Optional[UserInDB]: User if authenticated, None otherwise
"""
user = await get_user_by_username(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
async def create_user(db: AsyncSession, user_data: UserCreate) -> Optional[UserInDB]:
"""
Create a new user.
Args:
db: Database session
user_data: User data
Returns:
Optional[UserInDB]: Created user
"""
try:
# Check if user already exists
existing_user = await get_user_by_username(db, user_data.username)
if existing_user:
return None
# Create new user
hashed_password = get_password_hash(user_data.password)
user = User(
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
hashed_password=hashed_password,
is_active=user_data.is_active
)
db.add(user)
await db.commit()
await db.refresh(user)
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error creating user: {e}")
await db.rollback()
return None
async def update_user(db: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserInDB]:
"""
Update user.
Args:
db: Database session
user_id: User ID
user_data: User data
Returns:
Optional[UserInDB]: Updated user
"""
try:
# Create update dictionary
update_data = user_data.dict(exclude_unset=True)
# Hash password if provided
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Update user
stmt = update(User).where(User.id == user_id).values(**update_data)
await db.execute(stmt)
await db.commit()
# Get updated user
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if not user:
return None
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error updating user: {e}")
await db.rollback()
return None |