File size: 4,496 Bytes
8aed5f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
import io
import json
import os

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload


class GDriveUtils:
    LOG_EVENTS = True

    @staticmethod
    def get_gdrive_service(creds_stringified: str | None = None):
        SCOPES = ["https://www.googleapis.com/auth/drive"]
        if not creds_stringified:
            print(
                "Attempting to use google drive creds from environment variable"
            ) if GDriveUtils.LOG_EVENTS else None
            creds_stringified = os.getenv("GOOGLE_SERVICE_ACC_CREDS")
        creds_dict = json.loads(creds_stringified)
        creds = service_account.Credentials.from_service_account_info(
            creds_dict, scopes=SCOPES
        )
        return build("drive", "v3", credentials=creds)

    @staticmethod
    def upload_file_to_gdrive(
        local_file_path,
        drive_parent_folder_id: str,
        drive_filename: str | None = None,
        creds_stringified: str | None = None,
    ) -> str:
        service = GDriveUtils.get_gdrive_service(creds_stringified)

        if not drive_filename:
            drive_filename = os.path.basename(local_file_path)

        file_metadata = {
            "name": drive_filename,
            "parents": [drive_parent_folder_id],
        }
        file = (
            service.files()
            .create(body=file_metadata, media_body=local_file_path)
            .execute()
        )
        print(
            "File uploaded, drive file id: ", file.get("id")
        ) if GDriveUtils.LOG_EVENTS else None
        return file.get("id")

    @staticmethod
    def upload_file_to_gdrive_sanity_check(
        drive_parent_folder_id: str,
        creds_stringified: str | None = None,
    ):
        try:
            curr_time_utc = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            file_name = f"gdrive_upload_test_{curr_time_utc}_UTC.txt"
            print(
                "Creating local file to upload: ", file_name
            ) if GDriveUtils.LOG_EVENTS else None
            with open(file_name, "w") as f:
                f.write(f"gdrive_upload_test_{curr_time_utc}_UTC")
            return GDriveUtils.upload_file_to_gdrive(
                file_name, drive_parent_folder_id, creds_stringified=creds_stringified
            )
        except Exception as e:
            raise e
        finally:
            if os.path.exists(file_name):
                print(
                    "Deleting local file: ", file_name
                ) if GDriveUtils.LOG_EVENTS else None
                os.remove(file_name)

    @staticmethod
    def download_file_from_gdrive(
        drive_file_id: str,
        local_file_path: str | None = None,
        creds_stringified: str | None = None,
    ):
        service = GDriveUtils.get_gdrive_service(creds_stringified)

        drive_filename = service.files().get(fileId=drive_file_id, fields="name").execute().get('name')

        if not local_file_path:
            local_file_path = f"{drive_file_id}_{drive_filename}"

        request = service.files().get_media(fileId=drive_file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request, chunksize= 25 * 1024 * 1024)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            print(f"Downloading gdrive file {drive_filename} to local file {local_file_path}: {int(status.progress() * 100)}%.") if GDriveUtils.LOG_EVENTS else None

        if os.path.dirname(local_file_path):
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        with open(local_file_path, "wb") as f:
            f.write(file.getvalue())
        print(
            "Downloaded file locally to: ", local_file_path
        ) if GDriveUtils.LOG_EVENTS else None

    @staticmethod
    def download_file_from_gdrive_sanity_check(
        drive_parent_folder_id: str,
        creds_stringified: str | None = None,
    ):
        file_id = GDriveUtils.upload_file_to_gdrive_sanity_check(
            drive_parent_folder_id, creds_stringified
        )
        GDriveUtils.download_file_from_gdrive(
            file_id, creds_stringified=creds_stringified
        )

    def stringify_json_creds(json_file: str, txt_file: str) -> str:
        with open(json_file, "r") as f:
            creds_dict = json.load(f)
        with open(txt_file, "w") as f:
            f.write(json.dumps(creds_dict))