Spaces:
Running
on
Zero
Running
on
Zero
# Code based on cutechicken/whisper-webui-translate | |
import io | |
from contextlib import redirect_stderr | |
from tempfile import mkdtemp | |
from typing import List | |
import yt_dlp | |
from yt_dlp import YoutubeDL | |
from yt_dlp.postprocessor import PostProcessor | |
class FilenameCollectorPP(PostProcessor): | |
def __init__(self): | |
super(FilenameCollectorPP, self).__init__(None) | |
self.filenames = [] | |
def run(self, information): | |
self.filenames.append(information["filepath"]) | |
return [], information | |
def download_url( | |
url: str, | |
maxDuration: int = None, | |
destinationDirectory: str = None, | |
playlistItems: str = "1", | |
cookies_from_browser: str = None, | |
) -> tuple[bool, str, List[str]]: | |
try: | |
video_paths = _perform_download( | |
url, | |
maxDuration=maxDuration, | |
outputTemplate=None, | |
destinationDirectory=destinationDirectory, | |
playlistItems=playlistItems, | |
cookies_from_browser=cookies_from_browser, | |
) | |
return True, None, video_paths | |
except yt_dlp.utils.DownloadError as e: | |
# In case of an OS error, try again with a different output template | |
if e.msg and e.msg.find("[Errno 36] File name too long") >= 0: | |
try: | |
video_paths = _perform_download( | |
url, | |
maxDuration=maxDuration, | |
outputTemplate="%(title).10s %(id)s.%(ext)s", | |
cookies_from_browser=cookies_from_browser, | |
) | |
return True, None, video_paths | |
except Exception as retry_error: | |
return False, str(retry_error), [] | |
return False, str(e), [] | |
except ExceededMaximumDuration as e: | |
return ( | |
False, | |
f"Video exceeds maximum duration: {e.videoDuration}s > {e.maxDuration}s", | |
[], | |
) | |
except Exception as e: | |
return False, str(e), [] | |
def _perform_download( | |
url: str, | |
maxDuration: int = None, | |
outputTemplate: str = None, | |
destinationDirectory: str = None, | |
playlistItems: str = "1", | |
onlyAudio: bool = False, | |
cookies_from_browser: str = None, | |
): | |
# Create a temporary directory to store the downloaded files | |
if destinationDirectory is None: | |
destinationDirectory = mkdtemp() | |
ydl_opts = { | |
"format": "bestaudio/best" | |
if onlyAudio | |
else "worstvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/worst", | |
"paths": {"home": destinationDirectory}, | |
"ignoreerrors": True, | |
} | |
if playlistItems: | |
ydl_opts["playlist_items"] = playlistItems | |
if cookies_from_browser: | |
ydl_opts["cookies_from_browser"] = cookies_from_browser | |
# Add output template if specified | |
if outputTemplate: | |
ydl_opts["outtmpl"] = outputTemplate | |
errStrIO = EventStringIO(on_write=lambda text: print(f"\033[91m{text}\033[0m")) | |
filename_collector = FilenameCollectorPP() | |
with redirect_stderr(errStrIO): | |
for _ in (True,): | |
with YoutubeDL(ydl_opts) as ydl: | |
if maxDuration and maxDuration > 0: | |
info = ydl.extract_info(url, download=False) | |
if not info: | |
break | |
entries = "entries" in info and info["entries"] or [info] | |
total_duration = 0 | |
# Compute total duration | |
for entry in entries: | |
if entry: | |
total_duration += float(entry["duration"]) | |
if total_duration >= maxDuration: | |
raise ExceededMaximumDuration( | |
videoDuration=total_duration, | |
maxDuration=maxDuration, | |
message="Video is too long", | |
) | |
ydl.add_post_processor(filename_collector) | |
ydl.download([url]) | |
errMsg = errStrIO.getvalue() | |
errMsg = ( | |
[text for text in errMsg.split("\n") if text.startswith("ERROR")] | |
if errMsg | |
else "" | |
) | |
if len(filename_collector.filenames) <= 0: | |
raise Exception( | |
f"Cannot download {url}, " + "\n".join(errMsg) if errMsg else "" | |
) | |
result = [] | |
for filename in filename_collector.filenames: | |
result.append(filename) | |
print("Downloaded " + filename) | |
return result | |
class ExceededMaximumDuration(Exception): | |
def __init__(self, videoDuration, maxDuration, message): | |
self.videoDuration = videoDuration | |
self.maxDuration = maxDuration | |
super().__init__(message) | |
class EventStringIO(io.StringIO): | |
def __init__(self, on_write=None, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.on_write = on_write | |
def write(self, text): | |
super().write(text) | |
if self.on_write: | |
self.on_write(text) | |