Spaces:
Sleeping
Sleeping
File size: 7,191 Bytes
fb20480 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
"""
وحدة الاتصال بقاعدة البيانات
"""
import os
import sqlite3
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from contextlib import contextmanager
from datetime import datetime
import config
# إنشاء قاعدة البيانات الأساسية
Base = declarative_base()
class DatabaseConnector:
"""الفئة المسؤولة عن إدارة الاتصال بقاعدة البيانات"""
def __init__(self):
"""تهيئة موصل قاعدة البيانات"""
db_path = config.DB_PATH
db_dir = os.path.dirname(db_path)
# التأكد من وجود المجلد
if not os.path.exists(db_dir):
os.makedirs(db_dir)
# إنشاء محرك قاعدة البيانات بناءاً على نوع قاعدة البيانات
if config.DB_TYPE == "sqlite":
self.engine = create_engine(f'sqlite:///{db_path}', echo=config.DEBUG_MODE)
elif config.DB_TYPE == "mysql":
# في حالة استخدام MySQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py)
self.engine = create_engine(
f'mysql+pymysql://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}',
echo=config.DEBUG_MODE
)
elif config.DB_TYPE == "postgresql":
# في حالة استخدام PostgreSQL (يتطلب إضافة تفاصيل الاتصال في ملف config.py)
self.engine = create_engine(
f'postgresql+psycopg2://{config.DB_USER}:{config.DB_PASSWORD}@{config.DB_HOST}:{config.DB_PORT}/{config.DB_NAME}',
echo=config.DEBUG_MODE
)
else:
raise ValueError(f"نوع قاعدة البيانات غير مدعوم: {config.DB_TYPE}")
# إنشاء جلسة للتعامل مع قاعدة البيانات
self.Session = sessionmaker(bind=self.engine)
def create_tables(self):
"""إنشاء جداول قاعدة البيانات"""
Base.metadata.create_all(self.engine)
@contextmanager
def session_scope(self):
"""
توفير نطاق جلسة للتعامل مع قاعدة البيانات
يتم استخدامه مع 'with' لضمان إغلاق الجلسة بشكل صحيح
"""
session = self.Session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
def execute_query(self, query):
"""
تنفيذ استعلام SQL مباشر
المعلمات:
query: استعلام SQL
الإرجاع:
نتائج الاستعلام
"""
with self.engine.connect() as connection:
result = connection.execute(query)
return result.fetchall()
def backup_database(self, backup_path=None):
"""
إنشاء نسخة احتياطية من قاعدة البيانات
المعلمات:
backup_path: مسار النسخة الاحتياطية (اختياري)
الإرجاع:
مسار ملف النسخة الاحتياطية
"""
if config.DB_TYPE != "sqlite":
raise NotImplementedError("النسخ الاحتياطي مدعوم فقط لقواعد بيانات SQLite حاليًا")
# تحديد مسار النسخة الاحتياطية إذا لم يتم تحديده
if backup_path is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(
os.path.dirname(config.DB_PATH),
f"backup_{timestamp}.sqlite"
)
# إنشاء النسخة الاحتياطية
try:
# فتح اتصال مع قاعدة البيانات الأصلية
source_conn = sqlite3.connect(config.DB_PATH)
# فتح اتصال مع قاعدة البيانات الاحتياطية
dest_conn = sqlite3.connect(backup_path)
# نسخ البيانات
source_conn.backup(dest_conn)
# إغلاق الاتصالات
source_conn.close()
dest_conn.close()
return backup_path
except Exception as e:
raise RuntimeError(f"فشل في إنشاء النسخة الاحتياطية: {str(e)}")
def restore_database(self, backup_path):
"""
استعادة قاعدة البيانات من نسخة احتياطية
المعلمات:
backup_path: مسار ملف النسخة الاحتياطية
الإرجاع:
True في حالة نجاح الاستعادة
"""
if config.DB_TYPE != "sqlite":
raise NotImplementedError("استعادة النسخ الاحتياطي مدعومة فقط لقواعد بيانات SQLite حاليًا")
if not os.path.exists(backup_path):
raise FileNotFoundError(f"ملف النسخة الاحتياطية غير موجود: {backup_path}")
try:
# فتح اتصال مع قاعدة البيانات الاحتياطية
source_conn = sqlite3.connect(backup_path)
# فتح اتصال مع قاعدة البيانات الهدف
dest_conn = sqlite3.connect(config.DB_PATH)
# استعادة البيانات
source_conn.backup(dest_conn)
# إغلاق الاتصالات
source_conn.close()
dest_conn.close()
return True
except Exception as e:
raise RuntimeError(f"فشل في استعادة قاعدة البيانات: {str(e)}")
# إنشاء نموذج الاتصال بقاعدة البيانات
db_connector = DatabaseConnector()
# الحصول على جلسة للتعامل مع قاعدة البيانات
def get_db_session():
"""
الحصول على جلسة للتعامل مع قاعدة البيانات
الإرجاع:
كائن الجلسة
"""
return db_connector.Session()
# سياق الجلسة للتعامل مع قاعدة البيانات
def session_scope():
"""
توفير نطاق جلسة للتعامل مع قاعدة البيانات
استخدام:
with session_scope() as session:
# عمليات قاعدة البيانات
"""
return db_connector.session_scope() |