import datetime
import io
import json
import os

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload


class GDriveUtils:
    """Static helpers for uploading to and downloading from Google Drive.

    All helpers authenticate with a service account whose JSON key is passed
    in as a string, or read from the ``GOOGLE_SERVICE_ACC_CREDS`` environment
    variable when no string is given.
    """

    # When True, the helpers print progress messages to stdout.
    LOG_EVENTS = True

    @staticmethod
    def _log(*parts) -> None:
        """Print ``parts`` exactly as ``print`` would, if LOG_EVENTS is on."""
        if GDriveUtils.LOG_EVENTS:
            print(*parts)

    @staticmethod
    def get_gdrive_service(creds_stringified: str | None = None):
        """Build a Drive v3 API client from service-account credentials.

        Args:
            creds_stringified: The service-account key as a JSON string. When
                falsy, the ``GOOGLE_SERVICE_ACC_CREDS`` environment variable
                is used instead.

        Returns:
            The client object returned by ``build("drive", "v3", ...)``.

        Raises:
            ValueError: If no credentials were passed and the environment
                variable is unset or empty, or if the credentials are not
                valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        SCOPES = ["https://www.googleapis.com/auth/drive"]

        if not creds_stringified:
            GDriveUtils._log(
                "Attempting to use google drive creds from environment variable"
            )
            creds_stringified = os.getenv("GOOGLE_SERVICE_ACC_CREDS")

        if not creds_stringified:
            # Fail with an actionable message instead of the opaque TypeError
            # that json.loads(None) would raise.
            raise ValueError(
                "No Google service account credentials provided and the "
                "GOOGLE_SERVICE_ACC_CREDS environment variable is not set"
            )

        creds_dict = json.loads(creds_stringified)
        creds = service_account.Credentials.from_service_account_info(
            creds_dict, scopes=SCOPES
        )
        return build("drive", "v3", credentials=creds)

    @staticmethod
    def upload_file_to_gdrive(
        local_file_path,
        drive_parent_folder_id: str,
        drive_filename: str | None = None,
        creds_stringified: str | None = None,
    ) -> str:
        """Upload a local file into a Drive folder and return its file id.

        Args:
            local_file_path: Path of the local file to upload.
            drive_parent_folder_id: Id of the Drive folder to upload into.
            drive_filename: Name to give the file on Drive. Defaults to the
                basename of ``local_file_path``.
            creds_stringified: Service-account key as a JSON string; see
                ``get_gdrive_service``.

        Returns:
            The ``id`` field of the file resource returned by the Drive API.
        """
        service = GDriveUtils.get_gdrive_service(creds_stringified)

        if not drive_filename:
            drive_filename = os.path.basename(local_file_path)

        file_metadata = {
            "name": drive_filename,
            "parents": [drive_parent_folder_id],
        }
        # The local path is handed to the client as media_body unchanged.
        created = (
            service.files()
            .create(body=file_metadata, media_body=local_file_path)
            .execute()
        )
        file_id = created.get("id")
        GDriveUtils._log("File uploaded, drive file id: ", file_id)
        return file_id

    @staticmethod
    def upload_file_to_gdrive_sanity_check(
        drive_parent_folder_id: str,
        creds_stringified: str | None = None,
    ):
        """Upload a small timestamped text file to verify uploads work.

        A temporary file is created in the current working directory,
        uploaded, and then deleted locally whether or not the upload
        succeeded.

        Args:
            drive_parent_folder_id: Id of the Drive folder to upload into.
            creds_stringified: Service-account key as a JSON string; see
                ``get_gdrive_service``.

        Returns:
            The Drive file id of the uploaded test file.
        """
        # The file name is labelled "UTC", so take the timestamp in UTC
        # rather than in the machine's local timezone.
        curr_time_utc = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%d_%H-%M-%S"
        )
        # Bound before the try block so the cleanup below can always refer
        # to it without risking a NameError that would mask the real error.
        file_name = f"gdrive_upload_test_{curr_time_utc}_UTC.txt"

        GDriveUtils._log("Creating local file to upload: ", file_name)
        try:
            with open(file_name, "w") as f:
                f.write(f"gdrive_upload_test_{curr_time_utc}_UTC")
            return GDriveUtils.upload_file_to_gdrive(
                file_name,
                drive_parent_folder_id,
                creds_stringified=creds_stringified,
            )
        finally:
            if os.path.exists(file_name):
                GDriveUtils._log("Deleting local file: ", file_name)
                os.remove(file_name)

    @staticmethod
    def download_file_from_gdrive(
        drive_file_id: str,
        local_file_path: str | None = None,
        creds_stringified: str | None = None,
    ):
        """Download a Drive file to the local filesystem.

        The content is fetched in 25 MiB chunks into an in-memory buffer and
        written to disk once the download is complete, so the whole file is
        held in memory during the download.

        Args:
            drive_file_id: Id of the Drive file to download.
            local_file_path: Destination path. Defaults to
                ``"<drive_file_id>_<drive file name>"`` in the current
                working directory. Missing parent directories are created.
            creds_stringified: Service-account key as a JSON string; see
                ``get_gdrive_service``.

        Returns:
            The local path the file was written to.
        """
        service = GDriveUtils.get_gdrive_service(creds_stringified)

        drive_filename = (
            service.files()
            .get(fileId=drive_file_id, fields="name")
            .execute()
            .get("name")
        )
        if not local_file_path:
            local_file_path = f"{drive_file_id}_{drive_filename}"

        request = service.files().get_media(fileId=drive_file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(
            buffer, request, chunksize=25 * 1024 * 1024
        )

        done = False
        while not done:
            status, done = downloader.next_chunk()
            GDriveUtils._log(
                f"Downloading gdrive file {drive_filename} to local file {local_file_path}: {int(status.progress() * 100)}%."
            )

        parent_dir = os.path.dirname(local_file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(local_file_path, "wb") as f:
            f.write(buffer.getvalue())

        GDriveUtils._log("Downloaded file locally to: ", local_file_path)
        return local_file_path

    @staticmethod
    def download_file_from_gdrive_sanity_check(
        drive_parent_folder_id: str,
        creds_stringified: str | None = None,
    ):
        """Round-trip check: upload a test file, then download it again.

        The downloaded copy is left on disk under the default name chosen by
        ``download_file_from_gdrive``.

        Args:
            drive_parent_folder_id: Id of the Drive folder used for the test
                upload.
            creds_stringified: Service-account key as a JSON string; see
                ``get_gdrive_service``.

        Returns:
            The local path of the downloaded test file.
        """
        file_id = GDriveUtils.upload_file_to_gdrive_sanity_check(
            drive_parent_folder_id, creds_stringified
        )
        return GDriveUtils.download_file_from_gdrive(
            file_id, creds_stringified=creds_stringified
        )


def stringify_json_creds(json_file: str, txt_file: str) -> str:
    """Re-serialize a JSON credentials file as a single-line string.

    The JSON in ``json_file`` is parsed and written to ``txt_file`` using
    ``json.dumps`` defaults (no indentation), which is the form accepted by
    the ``creds_stringified`` parameters of ``GDriveUtils``.

    Args:
        json_file: Path of the JSON file to read.
        txt_file: Path of the text file to write the stringified JSON to.

    Returns:
        The stringified JSON that was written to ``txt_file``.
    """
    with open(json_file, "r", encoding="utf-8") as f:
        creds_dict = json.load(f)

    stringified = json.dumps(creds_dict)
    with open(txt_file, "w", encoding="utf-8") as f:
        f.write(stringified)
    return stringified