CyberForge / src /api /services /user_service.py
Replit Deployment
Deployment from Replit
89ae94f
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update
from passlib.context import CryptContext
from typing import Optional, List, Dict, Any
import logging
from src.models.user import User
from src.api.schemas import UserCreate, UserUpdate, UserInDB
# Configure logger
logger = logging.getLogger(__name__)
# Password context for hashing and verification
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify password against hash.
Args:
plain_password: Plain password
hashed_password: Hashed password
Returns:
bool: True if password is correct
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
Hash password.
Args:
password: Plain password
Returns:
str: Hashed password
"""
return pwd_context.hash(password)
async def get_user_by_username(db: AsyncSession, username: str) -> Optional[UserInDB]:
"""
Get user by username.
Args:
db: Database session
username: Username
Returns:
Optional[UserInDB]: User if found, None otherwise
"""
try:
result = await db.execute(select(User).where(User.username == username))
user = result.scalars().first()
if not user:
return None
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error getting user by username: {e}")
return None
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[UserInDB]:
"""
Authenticate user.
Args:
db: Database session
username: Username
password: Plain password
Returns:
Optional[UserInDB]: User if authenticated, None otherwise
"""
user = await get_user_by_username(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
async def create_user(db: AsyncSession, user_data: UserCreate) -> Optional[UserInDB]:
"""
Create a new user.
Args:
db: Database session
user_data: User data
Returns:
Optional[UserInDB]: Created user
"""
try:
# Check if user already exists
existing_user = await get_user_by_username(db, user_data.username)
if existing_user:
return None
# Create new user
hashed_password = get_password_hash(user_data.password)
user = User(
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
hashed_password=hashed_password,
is_active=user_data.is_active
)
db.add(user)
await db.commit()
await db.refresh(user)
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error creating user: {e}")
await db.rollback()
return None
async def update_user(db: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserInDB]:
"""
Update user.
Args:
db: Database session
user_id: User ID
user_data: User data
Returns:
Optional[UserInDB]: Updated user
"""
try:
# Create update dictionary
update_data = user_data.dict(exclude_unset=True)
# Hash password if provided
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
# Update user
stmt = update(User).where(User.id == user_id).values(**update_data)
await db.execute(stmt)
await db.commit()
# Get updated user
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if not user:
return None
# Convert SQLAlchemy model to Pydantic model
user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns}
return UserInDB(**user_dict)
except Exception as e:
logger.error(f"Error updating user: {e}")
await db.rollback()
return None