File size: 4,963 Bytes
a92d566
68c9ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a92d566
68c9ac7
a92d566
68c9ac7
 
 
 
 
 
 
a92d566
68c9ac7
 
 
a92d566
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68c9ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f299c4
68c9ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Code based on cutechicken/whisper-webui-translate
import io
from contextlib import redirect_stderr
from tempfile import mkdtemp
from typing import List

import yt_dlp
from yt_dlp import YoutubeDL
from yt_dlp.postprocessor import PostProcessor


class FilenameCollectorPP(PostProcessor):
    """Post-processor that remembers the path of every file it is handed.

    After a download, ``filenames`` holds the ``"filepath"`` value of each
    info dict that was passed to :meth:`run`, in call order.
    """

    def __init__(self):
        # No downloader is bound at construction time, hence the explicit None.
        super().__init__(None)
        self.filenames = []

    def run(self, information):
        """Record ``information["filepath"]`` and hand the info dict back.

        Returns a ``(list, dict)`` pair: an empty list followed by the
        unmodified ``information``.
        NOTE(review): presumably the first element is yt-dlp's "files to
        delete" list -- confirm against the PostProcessor documentation.
        """
        collected_path = information["filepath"]
        self.filenames.append(collected_path)
        nothing_to_delete = []
        return nothing_to_delete, information


def download_url(
    url: str,
    maxDuration: int = None,
    destinationDirectory: str = None,
    playlistItems: str = "1",
    cookies_from_browser: str = None,
) -> tuple[bool, str, List[str]]:
    """Download ``url`` and report the outcome without raising.

    Args:
        url: Address of the video or playlist to download.
        maxDuration: Optional duration limit in seconds, forwarded to
            ``_perform_download``.
        destinationDirectory: Directory to download into, forwarded to
            ``_perform_download``.
        playlistItems: Playlist item selection, forwarded to
            ``_perform_download``.
        cookies_from_browser: Browser to load cookies from, forwarded to
            ``_perform_download``.

    Returns:
        A ``(success, error_message, paths)`` tuple. On success the error
        message is ``None`` and ``paths`` lists the downloaded files; on
        failure ``success`` is ``False``, the message describes the problem
        and ``paths`` is empty.
    """
    try:
        video_paths = _perform_download(
            url,
            maxDuration=maxDuration,
            outputTemplate=None,
            destinationDirectory=destinationDirectory,
            playlistItems=playlistItems,
            cookies_from_browser=cookies_from_browser,
        )
        return True, None, video_paths
    except yt_dlp.utils.DownloadError as e:
        # Only a "file name too long" OS error is worth a second attempt;
        # every other download error is reported as-is.
        if not (e.msg and "[Errno 36] File name too long" in e.msg):
            return False, str(e), []
    except ExceededMaximumDuration as e:
        return (
            False,
            f"Video exceeds maximum duration: {e.videoDuration}s > {e.maxDuration}s",
            [],
        )
    except Exception as e:
        return False, str(e), []

    # Retry with an output template that truncates the title. The caller's
    # destination directory and playlist selection must be forwarded again,
    # otherwise the retry would land in a new temporary directory and fall
    # back to the default playlist item.
    try:
        video_paths = _perform_download(
            url,
            maxDuration=maxDuration,
            outputTemplate="%(title).10s %(id)s.%(ext)s",
            destinationDirectory=destinationDirectory,
            playlistItems=playlistItems,
            cookies_from_browser=cookies_from_browser,
        )
        return True, None, video_paths
    except Exception as retry_error:
        return False, str(retry_error), []


def _perform_download(
    url: str,
    maxDuration: int = None,
    outputTemplate: str = None,
    destinationDirectory: str = None,
    playlistItems: str = "1",
    onlyAudio: bool = False,
    cookies_from_browser: str = None,
):
    """Download ``url`` with yt-dlp and return the paths of the files written.

    Args:
        url: Address of the video or playlist to download.
        maxDuration: When positive, the summed duration (seconds) of all
            entries is checked before downloading.
        outputTemplate: Optional yt-dlp output template (``outtmpl``).
        destinationDirectory: Target directory; a fresh temporary directory
            is created when ``None``.
        playlistItems: Value for yt-dlp's ``playlist_items`` option; skipped
            when empty.
        onlyAudio: Use the ``bestaudio/best`` format selector instead of the
            video+audio selector.
        cookies_from_browser: Browser name to load cookies from. A tuple or
            list in yt-dlp's ``cookiesfrombrowser`` layout is accepted too.

    Returns:
        List of file paths reported by the post-processor.

    Raises:
        ExceededMaximumDuration: Total duration is ``>= maxDuration``.
        Exception: No file was produced; the message includes the ``ERROR``
            lines yt-dlp wrote to stderr, if any.
    """
    # Create a temporary directory to store the downloaded files
    if destinationDirectory is None:
        destinationDirectory = mkdtemp()

    if onlyAudio:
        format_selector = "bestaudio/best"
    else:
        format_selector = "worstvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/worst"

    ydl_opts = {
        "format": format_selector,
        "paths": {"home": destinationDirectory},
        "ignoreerrors": True,
    }

    if playlistItems:
        ydl_opts["playlist_items"] = playlistItems

    if cookies_from_browser:
        # YoutubeDL reads this setting from "cookiesfrombrowser" and expects
        # a tuple whose first element is the browser name.
        if isinstance(cookies_from_browser, str):
            ydl_opts["cookiesfrombrowser"] = (cookies_from_browser,)
        else:
            ydl_opts["cookiesfrombrowser"] = tuple(cookies_from_browser)

    # Add output template if specified
    if outputTemplate:
        ydl_opts["outtmpl"] = outputTemplate

    # Echo everything yt-dlp writes to stderr in red while also keeping it
    # for the failure message below.
    errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m"))

    filename_collector = FilenameCollectorPP()
    with redirect_stderr(errStrIO), YoutubeDL(ydl_opts) as ydl:
        proceed = True

        if maxDuration and maxDuration > 0:
            info = ydl.extract_info(url, download=False)
            if not info:
                # Nothing could be extracted; skip the download so the
                # "cannot download" error below is raised.
                proceed = False
            else:
                entries = info.get("entries") or [info]

                # Compute total duration, ignoring empty playlist slots.
                # NOTE(review): an entry without a numeric "duration" raises
                # KeyError/TypeError here -- confirm whether that is intended.
                total_duration = 0
                for entry in entries:
                    if entry:
                        total_duration += float(entry["duration"])

                if total_duration >= maxDuration:
                    raise ExceededMaximumDuration(
                        videoDuration=total_duration,
                        maxDuration=maxDuration,
                        message="Video is too long",
                    )

        if proceed:
            ydl.add_post_processor(filename_collector)
            ydl.download([url])

    if not filename_collector.filenames:
        error_lines = [
            line
            for line in errStrIO.getvalue().split("\n")
            if line.startswith("ERROR")
        ]
        # Always name the URL; append yt-dlp's error lines only if present.
        message = f"Cannot download {url}"
        if error_lines:
            message += ", " + "\n".join(error_lines)
        raise Exception(message)

    result = list(filename_collector.filenames)
    for filename in result:
        print("Downloaded " + filename)

    return result


class ExceededMaximumDuration(Exception):
    """Signals that the measured duration reached the configured limit.

    Attributes are set from the constructor arguments:
    ``videoDuration`` is the measured duration and ``maxDuration`` the limit
    it was compared against. ``message`` becomes the exception text.
    """

    def __init__(self, videoDuration, maxDuration, message):
        super().__init__(message)
        # Keep both numbers so handlers can build their own report.
        self.maxDuration = maxDuration
        self.videoDuration = videoDuration


class EventStringIO(io.StringIO):
    """In-memory text buffer that reports every write to a callback.

    Args:
        on_write: Optional callable invoked with the written text after each
            ``write``; no callback is made when it is falsy.
        *args, **kwargs: Forwarded unchanged to ``io.StringIO``.
    """

    def __init__(self, on_write=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.on_write = on_write

    def write(self, text):
        """Append ``text`` to the buffer, notify the callback, return the count.

        Returns the number of characters written, as ``io.StringIO.write``
        does, so the object keeps behaving like a regular text stream.
        """
        written = super().write(text)
        if self.on_write:
            self.on_write(text)
        return written