SA-SAJCOAI / database /db_connector.py
EGYADMIN's picture
Upload 75 files
fb20480 verified
"""
وحدة الاتصال بقاعدة البيانات
"""
import os
import sqlite3
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from contextlib import contextmanager
from datetime import datetime
import config
# إنشاء قاعدة البيانات الأساسية
Base = declarative_base()
class DatabaseConnector:
"""الفئة المسؤولة عن إدارة الاتصال بقاعدة البيانات"""
def __init__(self):
"""تهيئة موصل قاعدة البيانات"""
db_path = config.DB_PATH
db_dir = os.path.dirname(db_path)
# التأكد من وجود المجلد
if not os.path.exists(db_dir):
os.makedirs(db_dir)
# إنشاء محرك قاعدة البيانات بناءاً على نوع قاعدة البيانات
if config.DB_TYPE == "sqlite":
self.engine = create_engine(f'sqlite:///{db_path}', echo=config.DEBUG_MODE)
elif config.DB_TYPE == "mysql":
# في حالة استخدام MySQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py)
self.engine = create_engine(
f'mysql+pymysql://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}',
echo=config.DEBUG_MODE
)
elif config.DB_TYPE == "postgresql":
# في حالة استخدام PostgreSQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py)
self.engine = create_engine(
f'postgresql+psycopg2://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}',
echo=config.DEBUG_MODE
)
else:
raise ValueError(f"نوع قاعدة البيانات غير مدعوم: {config.DB_TYPE}")
# إنشاء جلسة للتعامل مع قاعدة البيانات
self.Session = sessionmaker(bind=self.engine)
def create_tables(self):
"""إنشاء جداول قاعدة البيانات"""
Base.metadata.create_all(self.engine)
@contextmanager
def session_scope(self):
"""
توفير نطاق جلسة للتعامل مع قاعدة البيانات
يتم استخدامه مع 'with' لضمان إغلاق الجلسة بشكل صحيح
"""
session = self.Session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def execute_query(self, query):
"""
تنفيذ استعلام SQL مباشر
المعلمات:
query: استعلام SQL
الإرجاع:
نتائج الاستعلام
"""
with self.engine.connect() as connection:
result = connection.execute(query)
return result.fetchall()
def backup_database(self, backup_path=None):
"""
إنشاء نسخة احتياطية من قاعدة البيانات
المعلمات:
backup_path: مسار النسخة الاحتياطية (اختياري)
الإرجاع:
مسار ملف النسخة الاحتياطية
"""
if config.DB_TYPE != "sqlite":
raise NotImplementedError("النسخ الاحتياطي مدعوم فقط لقواعد بيانات SQLite حاليًا")
# تحديد مسار النسخة الاحتياطية إذا لم يتم تحديده
if backup_path is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(
os.path.dirname(config.DB_PATH),
f"backup_{timestamp}.sqlite"
)
# إنشاء النسخة الاحتياطية
try:
# فتح اتصال مع قاعدة البيانات الأصلية
source_conn = sqlite3.connect(config.DB_PATH)
# فتح اتصال مع قاعدة البيانات الاحتياطية
dest_conn = sqlite3.connect(backup_path)
# نسخ البيانات
source_conn.backup(dest_conn)
# إغلاق الاتصالات
source_conn.close()
dest_conn.close()
return backup_path
except Exception as e:
raise RuntimeError(f"فشل في إنشاء النسخة الاحتياطية: {str(e)}")
def restore_database(self, backup_path):
"""
استعادة قاعدة البيانات من نسخة احتياطية
المعلمات:
backup_path: مسار ملف النسخة الاحتياطية
الإرجاع:
True في حالة نجاح الاستعادة
"""
if config.DB_TYPE != "sqlite":
raise NotImplementedError("استعادة النسخ الاحتياطي مدعومة فقط لقواعد بيانات SQLite حاليًا")
if not os.path.exists(backup_path):
raise FileNotFoundError(f"ملف النسخة الاحتياطية غير موجود: {backup_path}")
try:
# فتح اتصال مع قاعدة البيانات الاحتياطية
source_conn = sqlite3.connect(backup_path)
# فتح اتصال مع قاعدة البيانات الهدف
dest_conn = sqlite3.connect(config.DB_PATH)
# استعادة البيانات
source_conn.backup(dest_conn)
# إغلاق الاتصالات
source_conn.close()
dest_conn.close()
return True
except Exception as e:
raise RuntimeError(f"فشل في استعادة قاعدة البيانات: {str(e)}")
# إنشاء نموذج الاتصال بقاعدة البيانات
db_connector = DatabaseConnector()
# الحصول على جلسة للتعامل مع قاعدة البيانات
def get_db_session():
"""
الحصول على جلسة للتعامل مع قاعدة البيانات
الإرجاع:
كائن الجلسة
"""
return db_connector.Session()
# سياق الجلسة للتعامل مع قاعدة البيانات
def session_scope():
"""
توفير نطاق جلسة للتعامل مع قاعدة البيانات
استخدام:
with session_scope() as session:
# عمليات قاعدة البيانات
"""
return db_connector.session_scope()