# NOTE: "Spaces: Running Running" was Hugging Face Spaces UI chrome captured
# when this file was scraped from the Space's file viewer; it is not source code.
import logging | |
import os | |
import re | |
import shelve | |
import shutil | |
import subprocess | |
import tempfile | |
from pathlib import Path | |
import gradio as gr | |
import requests | |
from bs4 import BeautifulSoup | |
from dotenv import load_dotenv | |
# Import functions from your scripts (assuming they are structured appropriately) | |
# It's often better to refactor scripts into functions for easier import | |
try: | |
from huggingface_hub import InferenceClient | |
from src.create_presentation import ( | |
DEFAULT_LLM_MODEL, | |
DEFAULT_PRESENTATION_PROMPT_TEMPLATE, | |
generate_presentation_with_llm, | |
) | |
from src.create_video import ( | |
cleanup_temp_files, | |
concatenate_clips, | |
convert_pdf_to_images, | |
create_video_clips, | |
find_audio_files, | |
) | |
from src.transcription_to_audio import VOICE_ID, text_to_speech | |
except ImportError as e: | |
print(f"Error importing script functions: {e}") | |
print("Please ensure scripts are in the 'src' directory and structured correctly.") | |
exit(1) | |
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()
# --- Configuration & Setup --- | |
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# API key for the Hugging Face Inference API; may be None (handled below).
HF_API_KEY = os.getenv("HF_API_KEY")
# Model id and prompt template, overridable via environment.
LLM_MODEL = os.getenv("LLM_MODEL", DEFAULT_LLM_MODEL)
PRESENTATION_PROMPT = os.getenv(
    "PRESENTATION_PROMPT", DEFAULT_PRESENTATION_PROMPT_TEMPLATE
)
CACHE_DIR = ".cache"  # For TTS caching
URL_CACHE_DIR = ".url_cache"
# shelve DB file (extension added by the dbm backend) keyed by source URL.
URL_CACHE_FILE = os.path.join(URL_CACHE_DIR, "presentations_cache")
# Backslide template directory copied into each session's temp dir.
TEMPLATE_DIR = os.getenv("TEMPLATE_DIR", "app/template")
# Initialize clients (do this once if possible, or manage carefully in functions)
try:
    if HF_API_KEY:
        hf_client = InferenceClient(token=HF_API_KEY, provider="cohere")
    else:
        # Missing key is tolerated at import time; step 1 raises when the
        # client is actually needed.
        logger.warning("HF_API_KEY not found. LLM generation will fail.")
        hf_client = None
except Exception as e:
    logger.error(f"Failed to initialize Hugging Face client: {e}")
    hf_client = None
# --- Helper Functions --- | |
def fetch_webpage_content(url):
    """Fetch *url* and extract rough text content (headings, paragraphs, lists).

    Args:
        url: Web page URL to download.

    Returns:
        Extracted text with one element per line (headings first, then
        paragraphs, then ``- `` bullet items), or ``None`` on any fetch or
        parse error.
    """
    logger.info(f"Fetching content from: {url}")
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()  # Raise an exception for bad status codes
        soup = BeautifulSoup(response.text, "html.parser")
        # Basic text extraction (can be improved significantly)
        paragraphs = soup.find_all("p")
        headings = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"])
        list_items = soup.find_all("li")
        content = (
            "\n".join([h.get_text() for h in headings])
            + "\n\n"
            + "\n".join([p.get_text() for p in paragraphs])
            + "\n\n"
            + "\n".join(["- " + li.get_text() for li in list_items])
        )
        # Cleanup: collapse runs of spaces/tabs and squeeze 3+ consecutive
        # newlines down to a blank line. The previous r"\s\s+" pattern also
        # matched newline pairs, which flattened the joined structure above
        # onto a single line and lost the per-element separation.
        content = re.sub(r"[ \t]{2,}", " ", content)
        content = re.sub(r"\n{3,}", "\n\n", content).strip()
        logger.info(
            f"Successfully fetched and parsed content (length: {len(content)})."
        )
        return content
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching URL {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error parsing URL {url}: {e}")
        return None
def parse_presentation_markdown(markdown_content):
    """Split presentation markdown into per-slide dicts.

    Slides are separated by ``\\n\\n---\\n\\n``; within a slide, an optional
    ``???`` marker divides visible content from speaker notes.

    Returns:
        List of ``{"id": int, "content": str, "notes": str}`` dicts, with
        ``notes`` empty when a slide has no ``???`` marker.
    """
    slides = []
    for idx, raw_slide in enumerate(re.split(r"\n\n---\n\n", markdown_content)):
        # partition() yields ("", "") for marker+notes when "???" is absent,
        # which naturally produces an empty notes field.
        content, _, notes = raw_slide.partition("???")
        slides.append(
            {"id": idx, "content": content.strip(), "notes": notes.strip()}
        )
    logger.info(f"Parsed {len(slides)} slides from markdown.")
    return slides
def reconstruct_presentation_markdown(slides_data):
    """Rebuild the full markdown string from per-slide dicts.

    Inverse of ``parse_presentation_markdown``: joins slides with
    ``\\n\\n---\\n\\n`` and re-attaches speaker notes after a ``???`` marker.

    Args:
        slides_data: List of ``{"content": str, "notes": str}`` dicts.

    Returns:
        The reconstructed presentation markdown.
    """
    full_md = []
    for slide in slides_data:
        slide_md = slide["content"]
        # Only add the notes separator if notes exist and are not just
        # whitespace. (The previous truthiness check let whitespace-only
        # notes through, emitting a dangling "???" marker.)
        if slide["notes"].strip():
            slide_md += f"\n\n???\n{slide['notes'].strip()}"
        full_md.append(slide_md.strip())  # Ensure each slide part is stripped
    return "\n\n---\n\n".join(full_md)
def generate_pdf_from_markdown(markdown_file_path, output_pdf_path):
    """Generate a PDF from a Markdown file using `bs export` + decktape.

    Two external CLI tools are required on PATH: backslide (``bs``) renders
    the markdown to an HTML deck, and ``decktape`` prints that HTML to PDF
    via headless Chrome.

    Args:
        markdown_file_path: Path to the presentation markdown; ``bs`` is run
            with its directory as CWD so the local template dir is found.
        output_pdf_path: Destination path for the generated PDF.

    Returns:
        ``output_pdf_path`` on success.

    Raises:
        gr.Error: On any tool-missing, subprocess-failure, timeout, or
            missing-output condition (surfaced directly in the Gradio UI).
    """
    logger.info(f"Attempting PDF gen: {markdown_file_path} -> {output_pdf_path}")
    working_dir = os.path.dirname(markdown_file_path)
    markdown_filename = os.path.basename(markdown_file_path)
    html_output_dir_name = "bs_html_output"
    html_output_dir_abs = os.path.join(working_dir, html_output_dir_name)
    # bs names the exported HTML after the markdown file's stem.
    expected_html_filename = os.path.splitext(markdown_filename)[0] + ".html"
    generated_html_path_abs = os.path.join(html_output_dir_abs, expected_html_filename)
    pdf_gen_success = False
    # ---- Step 1: Generate HTML using bs export ----
    try:
        Path(html_output_dir_abs).mkdir(parents=True, exist_ok=True)
        # -o is relative to CWD, hence running bs from the markdown's dir.
        export_command = ["bs", "export", markdown_filename, "-o", html_output_dir_name]
        logger.info(f"Running: {' '.join(export_command)} in CWD: {working_dir}")
        export_result = subprocess.run(
            export_command,
            cwd=working_dir,
            capture_output=True,
            text=True,
            check=True,
            timeout=60,
        )
        logger.info("Backslide (bs export) OK.")
        logger.debug(f"bs export stdout:\n{export_result.stdout}")
        logger.debug(f"bs export stderr:\n{export_result.stderr}")
        # bs can exit 0 without producing the expected file; verify explicitly.
        if not os.path.exists(generated_html_path_abs):
            logger.error(f"Expected HTML not found: {generated_html_path_abs}")
            try:
                # Log directory contents to aid debugging a name mismatch.
                files_in_dir = os.listdir(html_output_dir_abs)
                logger.error(f"Files in {html_output_dir_abs}: {files_in_dir}")
            except FileNotFoundError:
                logger.error(
                    f"HTML output directory {html_output_dir_abs} not found after bs run."
                )
            raise FileNotFoundError(
                f"Generated HTML not found: {generated_html_path_abs}"
            )
    except FileNotFoundError:
        # Covers both a missing `bs` binary and the explicit raise above.
        logger.error(
            "`bs` command not found. Install backslide (`npm install -g backslide`)."
        )
        raise gr.Error("HTML generation tool (backslide/bs) not found.")
    except subprocess.CalledProcessError as e:
        logger.error(f"Backslide (bs export) failed (code {e.returncode}).")
        logger.error(f"bs stderr:\n{e.stderr}")
        raise gr.Error(f"Backslide HTML failed: {e.stderr[:500]}...")
    except subprocess.TimeoutExpired:
        logger.error("Backslide (bs export) timed out.")
        raise gr.Error("HTML generation timed out (backslide).")
    except Exception as e:
        logger.error(f"Unexpected error during bs export: {e}", exc_info=True)
        raise gr.Error(f"Unexpected error during HTML generation: {e}")
    # ---- Step 2: Generate PDF from HTML using decktape ----
    try:
        Path(output_pdf_path).parent.mkdir(parents=True, exist_ok=True)
        # decktape needs a file:// URL for local HTML.
        html_file_url = Path(generated_html_path_abs).as_uri()
        decktape_command = [
            "decktape",
            "-p",
            "1000",  # pause (ms) between slides while printing
            "--chrome-arg=--allow-running-insecure-content",
            "--chrome-arg=--disable-web-security",
            "--chrome-arg=--no-sandbox",  # required in most container environments
            html_file_url,
            str(output_pdf_path),
        ]
        logger.info(f"Running PDF conversion: {' '.join(decktape_command)}")
        decktape_result = subprocess.run(
            decktape_command,
            capture_output=True,
            text=True,
            check=True,
            timeout=120,
        )
        logger.info("Decktape command executed successfully.")
        logger.debug(f"decktape stdout:\n{decktape_result.stdout}")
        logger.debug(f"decktape stderr:\n{decktape_result.stderr}")
        if os.path.exists(output_pdf_path):
            logger.info(f"PDF generated successfully: {output_pdf_path}")
            pdf_gen_success = True
            # The finally block below still runs cleanup before returning.
            return output_pdf_path
        else:
            logger.error("Decktape command finished but output PDF not found.")
            raise gr.Error("Decktape finished, but the PDF file was not created.")
    except FileNotFoundError:
        logger.error(
            "`decktape` command not found. Install decktape (`npm install -g decktape`)."
        )
        raise gr.Error("PDF generation tool (decktape) not found.")
    except subprocess.CalledProcessError as e:
        logger.error(f"Decktape command failed (code {e.returncode}).")
        logger.error(f"decktape stderr:\n{e.stderr}")
        raise gr.Error(f"Decktape PDF failed: {e.stderr[:500]}...")
    except subprocess.TimeoutExpired:
        logger.error("Decktape command timed out.")
        raise gr.Error("PDF generation timed out (decktape).")
    except Exception as e:
        logger.error(
            f"Unexpected error during decktape PDF generation: {e}", exc_info=True
        )
        raise gr.Error(f"Unexpected error during PDF generation: {e}")
    finally:
        # --- Cleanup HTML output directory ---
        # Runs on success and failure; the intermediate HTML is never needed
        # after decktape has (or has not) produced the PDF.
        if os.path.exists(html_output_dir_abs):
            try:
                shutil.rmtree(html_output_dir_abs)
                logger.info(f"Cleaned up HTML temp dir: {html_output_dir_abs}")
            except Exception as cleanup_e:
                logger.warning(
                    f"Could not cleanup HTML dir {html_output_dir_abs}: {cleanup_e}"
                )
        # Log final status
        if pdf_gen_success:
            logger.info(f"PDF generation process completed for {output_pdf_path}.")
        else:
            logger.error(f"PDF generation process failed for {output_pdf_path}.")
# --- Helper Function to Read CSS --- | |
def load_css(css_filename="style.scss"):
    """Return the contents of a stylesheet from the template directory.

    Falls back to an empty string when the file is missing or unreadable,
    so callers can always concatenate the result safely.
    """
    css_path = os.path.join(TEMPLATE_DIR, css_filename)
    try:
        return Path(css_path).read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.warning(f"CSS file not found at {css_path}. No custom styles applied.")
        return ""
    except Exception as e:
        logger.error(f"Error reading CSS file {css_path}: {e}")
        return ""
# --- Gradio Workflow Functions --- | |
def step1_fetch_and_generate_presentation(url, progress=gr.Progress(track_tqdm=True)):
    """Fetches content, generates presentation markdown, prepares editor, and copies template. Uses caching based on URL.

    Generator for the Step-1 Gradio handler. Every yield is a 7-tuple
    matching the outputs list:
    (status textbox, temp-dir state, markdown-path state, slides-data state,
     editor-column visibility, build-slides-button visibility,
     fetch-button interactivity).

    Successful results are cached per URL in a shelve DB so repeat requests
    skip the fetch + LLM call entirely.

    Raises:
        gr.Error: Only for missing URL; other failures are reported via the
            status textbox so the app stays alive (see final except block).
    """
    if not url:
        raise gr.Error("Please enter a URL.")
    logger.info(f"Step 1: Fetching & Generating for {url}")
    status_update = f"Starting Step 1: Fetching content from {url}..."
    yield (
        gr.update(value=status_update),
        None,
        None,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )
    # --- Cache Check ---
    try:
        os.makedirs(URL_CACHE_DIR, exist_ok=True)  # Ensure cache dir exists
        with shelve.open(URL_CACHE_FILE) as cache:
            if url in cache:
                logger.info(f"Cache hit for URL: {url}")
                progress(0.5, desc="Loading cached presentation...")
                cached_data = cache[url]
                presentation_md = cached_data.get("presentation_md")
                slides_data = cached_data.get("slides_data")
                # Either field may be missing from a partial/old cache entry;
                # in that case we fall through to full regeneration below.
                if not presentation_md:
                    logger.warning(
                        f"Cache for {url} missing 'presentation_md'. Regenerating."
                    )
                if not slides_data:
                    logger.warning(
                        f"Cache for {url} missing 'slides_data'. Regenerating."
                    )
                if presentation_md and slides_data:
                    logger.info(
                        f"Found complete cache entry for {url} with {len(slides_data)} slides."
                    )
                    # Fresh session dir even on cache hit: later steps write
                    # PDFs/audio next to the markdown.
                    temp_dir = tempfile.mkdtemp()
                    md_path = os.path.join(temp_dir, "presentation.md")
                    try:
                        with open(md_path, "w", encoding="utf-8") as f:
                            f.write(presentation_md)
                        logger.info(
                            f"Wrote cached presentation to temp file: {md_path}"
                        )
                        # --- Copy Template Directory for Cached Item ---
                        # backslide expects its template next to the markdown.
                        template_src_dir = TEMPLATE_DIR
                        template_dest_dir = os.path.join(
                            temp_dir, os.path.basename(TEMPLATE_DIR)
                        )
                        if os.path.isdir(template_src_dir):
                            try:
                                shutil.copytree(template_src_dir, template_dest_dir)
                                logger.info(
                                    f"Copied template dir to {template_dest_dir} (cached)"
                                )
                            except Exception as copy_e:
                                logger.error(
                                    f"Failed to copy template dir for cache: {copy_e}"
                                )
                                shutil.rmtree(temp_dir)
                                raise gr.Error(f"Failed to prepare template: {copy_e}")
                        else:
                            logger.error(
                                f"Template source dir '{template_src_dir}' not found."
                            )
                            shutil.rmtree(temp_dir)
                            raise gr.Error(
                                f"Required template '{template_src_dir}' not found."
                            )
                        progress(0.9, desc="Preparing editor from cache...")
                        status_update = (
                            "Loaded presentation from cache. Preparing editor..."
                        )
                        yield (
                            gr.update(value=status_update),
                            None,
                            None,
                            [],
                            gr.update(),
                            gr.update(),
                            gr.update(),
                        )
                        logger.info(f"Using cached data for {len(slides_data)} slides.")
                        # Final yield must match outputs list
                        yield (
                            gr.update(value=status_update),
                            temp_dir,
                            md_path,
                            slides_data,
                            gr.update(visible=True),  # editor_column
                            gr.update(
                                visible=True
                            ),  # btn_build_slides (Enable PDF button next)
                            gr.update(
                                interactive=False
                            ),  # btn_fetch_generate (disable)
                        )
                        return  # End generator here for cache hit
                    except Exception as e:
                        # NOTE(review): the gr.Error raises from the template
                        # copy above are also caught here and re-wrapped; this
                        # then propagates to the outer handler rather than
                        # regenerating — confirm that's intended.
                        logger.error(f"Error writing cached markdown: {e}")
                        if os.path.exists(temp_dir):
                            shutil.rmtree(temp_dir)
                        # If writing cache fails, raise to trigger full regeneration flow
                        raise gr.Error(f"Failed to write cached markdown: {e}")
                else:
                    # This case is now covered by the more specific logging above
                    pass  # Continue to regeneration
        # --- Cache Miss or Failed Cache Load ---
        logger.info(f"Cache miss for URL: {url}. Proceeding with generation.")
        progress(0.1, desc="Fetching webpage content...")
        if not hf_client:
            raise gr.Error("LLM Client not initialized. Check API Key.")
        status_update = "Fetching webpage content..."
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        web_content = fetch_webpage_content(url)
        if not web_content:
            raise gr.Error("Failed to fetch or parse content from the URL.")
        progress(0.3, desc="Generating presentation with LLM...")
        status_update = "Generating presentation with LLM..."
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        try:
            presentation_md = generate_presentation_with_llm(
                hf_client, LLM_MODEL, PRESENTATION_PROMPT, web_content, url
            )
        except Exception as e:
            logger.error(f"Error during LLM call: {e}", exc_info=True)
            raise gr.Error(f"Failed to generate presentation from LLM: {e}")
        if not presentation_md:
            logger.error("LLM generation returned None.")
            raise gr.Error("LLM generation failed (received None).")
        # Check for basic structure early, but parsing handles final validation
        if "---" not in presentation_md:
            logger.warning(
                "LLM output missing slide separators ('---'). Parsing might fail."
            )
        if "???" not in presentation_md:
            logger.warning(
                "LLM output missing notes separators ('???'). Notes might be empty."
            )
        progress(0.7, desc="Parsing presentation slides...")
        status_update = "Parsing presentation slides..."
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        slides_data = parse_presentation_markdown(presentation_md)
        if not slides_data:
            logger.error("Parsing markdown resulted in zero slides.")
            raise gr.Error("Failed to parse generated presentation markdown.")
        # Create a temporary directory for this session
        temp_dir = tempfile.mkdtemp()
        md_path = os.path.join(temp_dir, "presentation.md")
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(presentation_md)
        logger.info(f"Presentation markdown saved to temp file: {md_path}")
        # --- Copy Template Directory for New Item ---
        template_src_dir = TEMPLATE_DIR
        template_dest_dir = os.path.join(temp_dir, os.path.basename(TEMPLATE_DIR))
        if os.path.isdir(template_src_dir):
            try:
                shutil.copytree(template_src_dir, template_dest_dir)
                logger.info(f"Copied template directory to {template_dest_dir}")
            except Exception as copy_e:
                logger.error(f"Failed to copy template directory: {copy_e}")
                shutil.rmtree(temp_dir)
                raise gr.Error(f"Failed to prepare template: {copy_e}")
        else:
            logger.error(f"Template source dir '{template_src_dir}' not found.")
            shutil.rmtree(temp_dir)
            raise gr.Error(f"Required template '{template_src_dir}' not found.")
        # --- Store in Cache ---
        # Best-effort: a cache write failure is logged but never blocks the flow.
        try:
            with shelve.open(URL_CACHE_FILE) as cache_write:
                cache_write[url] = {
                    "presentation_md": presentation_md,
                    "slides_data": slides_data,
                }
                logger.info(
                    f"Stored generated presentation in cache for URL: {url}"
                )
        except Exception as e:
            logger.error(f"Failed to write to cache for URL {url}: {e}")
        progress(0.9, desc="Preparing editor...")
        status_update = "Generated presentation. Preparing editor..."
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        logger.info(f"Prepared data for {len(slides_data)} slides.")
        # Final yield must match outputs list
        yield (
            gr.update(value=status_update),
            temp_dir,
            md_path,
            slides_data,
            gr.update(visible=True),  # editor_column
            gr.update(visible=True),  # btn_build_slides (Enable PDF button next)
            gr.update(interactive=False),  # btn_fetch_generate (disable)
        )
    except Exception as e:
        # Top-level guard: report the error through the UI and restore the
        # fetch button instead of crashing the Gradio app.
        logger.error(f"Error in step 1 (fetch/generate): {e}", exc_info=True)
        status_update = f"Error during presentation setup: {e}"
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        # Yield a final tuple matching outputs, indicating error state if possible
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(visible=False),  # Keep editor hidden
            gr.update(visible=False),  # Keep build button hidden
            gr.update(interactive=True),  # Re-enable fetch button
        )
        # Do not re-raise if we want the Gradio app to stay alive
        # raise gr.Error(f"Error during presentation setup: {e}")
def step2_build_slides(
    state_temp_dir,
    state_md_path,
    state_slides_data,
    *editors,
    progress=gr.Progress(track_tqdm=True),
):
    """Save edited markdown, render it to PDF, and convert the PDF to images.

    (Renamed from step2_generate_pdf.)

    Generator for the Step-2 Gradio handler. Every yield is a 6-tuple
    matching the outputs list:
    (status textbox, pdf-path state, pdf-images state,
     generate-audio-button visibility, build-slides-button visibility,
     pdf-download-link update).

    Args:
        state_temp_dir: Session temp directory from step 1.
        state_md_path: Path of the presentation markdown from step 1.
        state_slides_data: Parsed slide dicts from step 1.
        *editors: Flat list of 2*MAX_SLIDES editor values — first MAX_SLIDES
            content editors, then MAX_SLIDES notes textboxes.

    Raises:
        gr.Error: On missing state, editor-count mismatch, or any
            save/PDF/image failure.
    """
    if not all([state_temp_dir, state_md_path, state_slides_data]):
        raise gr.Error("Session state missing.")
    logger.info("Step 2: Building Slides (PDF + Images)")
    status_update = "Starting Step 2: Building slides..."
    yield (
        gr.update(value=status_update),
        None,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )
    num_slides = len(state_slides_data)
    MAX_SLIDES = 20  # must match the number of editor widgets built in the UI
    all_editors = list(editors)
    if len(all_editors) != MAX_SLIDES * 2:
        raise gr.Error(f"Incorrect editor inputs: {len(all_editors)}")
    # First half = content editors, second half = notes; keep only the
    # entries for slides that actually exist.
    edited_contents = all_editors[:MAX_SLIDES][:num_slides]
    edited_notes_list = all_editors[MAX_SLIDES:][:num_slides]
    if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides:
        raise gr.Error("Editor input mismatch.")
    progress(0.1, desc="Saving edited markdown...")
    status_update = "Saving edited markdown..."
    yield (
        gr.update(value=status_update),
        None,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )
    updated_slides = []
    for i in range(num_slides):
        updated_slides.append(
            {"id": i, "content": edited_contents[i], "notes": edited_notes_list[i]}
        )
    updated_md = reconstruct_presentation_markdown(updated_slides)
    try:
        with open(state_md_path, "w", encoding="utf-8") as f:
            f.write(updated_md)
        logger.info(f"Saved edited markdown: {state_md_path}")
    except IOError as e:
        status_update = f"Failed to save markdown: {e}"
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        # Final yield must match outputs
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(visible=True),
            gr.update(),
        )
        raise gr.Error(f"Failed to save markdown: {e}")
    progress(0.3, desc="Generating PDF...")
    status_update = "Generating PDF..."
    yield (
        gr.update(value=status_update),
        None,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )
    pdf_output_path = os.path.join(state_temp_dir, "presentation.pdf")
    try:
        generated_pdf_path = generate_pdf_from_markdown(state_md_path, pdf_output_path)
        if not generated_pdf_path:
            raise gr.Error("PDF generation failed (check logs).")
    except gr.Error as e:
        # Re-raise tool errors as-is after reporting via the status textbox.
        status_update = f"PDF Generation Error: {e}"
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        # Final yield must match outputs
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(visible=True),
            gr.update(),
        )
        raise e
    except Exception as e:
        status_update = f"Unexpected PDF Error: {e}"
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        # Final yield must match outputs
        yield (
            gr.update(value=status_update),
            None,
            [],
            gr.update(),
            gr.update(visible=True),
            gr.update(),
        )
        raise gr.Error(f"Unexpected error generating PDF: {e}")
    progress(0.7, desc="Converting PDF to images...")
    status_update = "Converting PDF to images..."
    yield (
        gr.update(value=status_update),
        generated_pdf_path,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )
    pdf_images = []
    try:
        pdf_images = convert_pdf_to_images(
            generated_pdf_path, dpi=150
        )  # Use generated path
        if not pdf_images:
            raise gr.Error("PDF to image conversion failed.")
        logger.info(f"Converted PDF to {len(pdf_images)} images.")
        if len(pdf_images) != num_slides:
            warning_msg = f"Warning: PDF page count ({len(pdf_images)}) != slide count ({num_slides}). Images might mismatch."
            gr.Warning(warning_msg)
            status_update += f" ({warning_msg})"
            # Pad or truncate? For now, just return what we have, UI update logic handles MAX_SLIDES
    except Exception as e:
        logger.error(f"Error converting PDF to images: {e}", exc_info=True)
        status_update = f"Failed to convert PDF to images: {e}"
        yield (
            gr.update(value=status_update),
            generated_pdf_path,
            [],
            gr.update(),
            gr.update(),
            gr.update(),
        )
        # Final yield must match outputs
        yield (
            gr.update(value=status_update),
            generated_pdf_path,
            [],
            gr.update(),
            gr.update(visible=True),
            gr.update(value=generated_pdf_path, visible=True),
        )
        # Proceed without images? Or raise error? Let's raise.
        raise gr.Error(f"Failed to convert PDF to images: {e}")
    info_msg = f"Built {len(pdf_images)} slide images. Ready for Step 3."
    logger.info(info_msg)
    progress(1.0, desc="Slide build complete.")
    status_update = f"Step 2 Complete: {info_msg}"
    yield (
        gr.update(value=status_update),
        generated_pdf_path,
        pdf_images,  # Return the list of image paths
        gr.update(visible=True),  # btn_generate_audio
        gr.update(visible=False),  # btn_build_slides
        gr.update(value=generated_pdf_path, visible=True),  # pdf_download_link
    )
def step3_generate_audio(*args, progress=gr.Progress(track_tqdm=True)):
    """Generates audio files for the speaker notes using edited content.

    Generator for the Step-3 Gradio handler. Every yield is a tuple of
    4 fixed outputs (status textbox, audio-dir state, generate-video-button
    visibility, generate-audio-button visibility) followed by MAX_SLIDES
    audio-player updates and MAX_SLIDES regenerate-button updates.

    Slides with empty notes get a short ffmpeg-generated silent clip so the
    video step still has one audio file per slide. TTS failures are
    collected and surfaced as a warning rather than aborting the run.
    """
    # Args structure adjustment:
    # args[0]: state_temp_dir
    # args[1]: state_md_path
    # args[2]: original_slides_data
    # args[3...]: editors
    state_temp_dir = args[0]
    state_md_path = args[1]
    original_slides_data = args[2]
    # editors = args[3:] # Old slicing
    num_slides = len(original_slides_data)
    if num_slides == 0:
        logger.error("Step 3 (Audio) called with zero slides data.")
        raise gr.Error("No slide data available. Please start over.")
    MAX_SLIDES = 20  # Ensure this matches UI definition
    # --- Adjust indices based on removing status_textbox input ---
    # Start editors from index 3 now
    code_editors_start_index = 3
    notes_textboxes_start_index = 3 + MAX_SLIDES
    # Slice the *actual* edited values based on num_slides
    edited_contents = args[
        code_editors_start_index : code_editors_start_index + num_slides
    ]
    edited_notes_list = args[
        notes_textboxes_start_index : notes_textboxes_start_index + num_slides
    ]
    if not state_temp_dir or not state_md_path:
        raise gr.Error("Session state lost (Audio step). Please start over.")
    # Check slicing
    if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides:
        logger.error(
            f"Input slicing error (Audio step): Expected {num_slides}, got {len(edited_contents)} contents, {len(edited_notes_list)} notes."
        )
        raise gr.Error(
            f"Input processing error: Mismatch after slicing ({num_slides} slides)."
        )
    logger.info(f"Processing {num_slides} slides for audio generation.")
    status_update = "Starting Step 3: Generating audio..."
    yield (gr.update(value=status_update), None, gr.update(), gr.update()) + (
        gr.update(),
    ) * MAX_SLIDES * 2
    audio_dir = os.path.join(state_temp_dir, "audio")
    os.makedirs(audio_dir, exist_ok=True)
    # --- Update the presentation.md file AGAIN in case notes changed after PDF ---
    # This might be redundant if users don't edit notes between PDF and Audio steps,
    # but ensures the audio matches the *latest* notes displayed.
    progress(0.1, desc="Saving latest notes...")
    status_update = "Saving latest notes..."
    yield (gr.update(value=status_update), None, gr.update(), gr.update()) + (
        gr.update(),
    ) * MAX_SLIDES * 2
    updated_slides_data = []
    for i in range(num_slides):
        updated_slides_data.append(
            {
                "id": original_slides_data[i]["id"],  # Keep original ID
                "content": edited_contents[i],  # Use sliced edited content
                "notes": edited_notes_list[i],  # Use sliced edited notes
            }
        )
    updated_markdown = reconstruct_presentation_markdown(updated_slides_data)
    try:
        with open(state_md_path, "w", encoding="utf-8") as f:
            f.write(updated_markdown)
        logger.info(f"Updated presentation markdown before audio gen: {state_md_path}")
    except IOError as e:
        warning_msg = f"Warning: Could not save latest notes to markdown file: {e}"
        gr.Warning(warning_msg)
        status_update += f" ({warning_msg})"
        yield (gr.update(value=status_update), None, gr.update(), gr.update()) + (
            gr.update(),
        ) * MAX_SLIDES * 2
        # Note: We continue processing audio even if saving markdown fails
        # If we need to stop and return final state, do it here:
        # yield (gr.update(value=status_update), state_audio_dir, gr.update(), gr.update(visible=True), *[gr.update()]*N, *[gr.update()]*N)
        # raise gr.Error(f"Failed to save updated markdown before audio gen: {e}") # Old raise
    # Pre-size the results list; "" marks a slide whose audio failed.
    generated_audio_paths = ["" for _ in range(num_slides)]
    audio_generation_failed = False
    successful_audio_count = 0
    for i in range(num_slides):
        note_text = edited_notes_list[i]
        slide_num = i + 1
        # Map loop progress onto the 0.1–0.9 band of the progress bar.
        progress_val = (i + 1) / num_slides * 0.8 + 0.1
        progress(
            progress_val,
            desc=f"Audio slide {slide_num}/{num_slides}",
        )
        status_update = f"Generating audio for slide {slide_num}/{num_slides}..."
        yield (gr.update(value=status_update), audio_dir, gr.update(), gr.update()) + (
            gr.update(),
        ) * MAX_SLIDES * 2
        # Files are named 1.wav, 2.wav, ... so natural sort matches slide order.
        output_file_path = Path(audio_dir) / f"{slide_num}.wav"
        if not note_text or not note_text.strip():
            try:  # Generate silence
                # 0.1s of silence keeps slide/audio counts aligned for the video step.
                subprocess.run(
                    [
                        "ffmpeg",
                        "-y",
                        "-f",
                        "lavfi",
                        "-i",
                        "anullsrc=r=44100:cl=mono",
                        "-t",
                        "0.1",
                        "-q:a",
                        "9",
                        str(output_file_path),
                    ],
                    check=True,
                    capture_output=True,
                    text=True,
                )
                generated_audio_paths[i] = str(output_file_path)
            except Exception as e:
                audio_generation_failed = True
                logger.error(f"Silence gen failed slide {i + 1}: {e}")
            continue
        try:  # Generate TTS
            success = text_to_speech(
                note_text, output_file_path, voice=VOICE_ID, cache_dir=CACHE_DIR
            )
            if success:
                generated_audio_paths[i] = str(output_file_path)
                successful_audio_count += 1
            else:
                audio_generation_failed = True
                logger.error(f"TTS failed slide {i + 1}")
        except Exception as e:
            audio_generation_failed = True
            logger.error(f"TTS exception slide {i + 1}: {e}", exc_info=True)
    # --- Prepare outputs for Gradio ---
    # Show a player only for slides whose audio file actually exists.
    audio_player_updates = [
        gr.update(value=p if p else None, visible=bool(p and os.path.exists(p)))
        for p in generated_audio_paths
    ]
    regen_button_updates = [gr.update(visible=True)] * num_slides
    # Pad both lists out to MAX_SLIDES with hidden widgets.
    audio_player_updates.extend(
        [gr.update(value=None, visible=False)] * (MAX_SLIDES - num_slides)
    )
    regen_button_updates.extend([gr.update(visible=False)] * (MAX_SLIDES - num_slides))
    info_msg = f"Generated {successful_audio_count}/{num_slides} audio clips. "
    if audio_generation_failed:
        info_msg += "Some audio failed. Review/Regenerate before video."
        gr.Warning(info_msg)
    else:
        info_msg += "Ready for Step 4."
    logger.info(info_msg)
    progress(1.0, desc="Audio generation complete.")
    status_update = f"Step 3 Complete: {info_msg}"
    # Return tuple including status update + original outputs
    # Final yield must match outputs list
    yield (
        gr.update(value=status_update),
        audio_dir,
        gr.update(visible=True),  # btn_generate_video
        gr.update(visible=False),  # btn_generate_audio
        *audio_player_updates,
        *regen_button_updates,
    )
def step4_generate_video(
    state_temp_dir,
    state_audio_dir,
    state_pdf_path,  # Use PDF path from state
    progress=gr.Progress(track_tqdm=True),
):
    """Generates the final video using PDF images and audio files.

    Gradio generator handler for Step 4. Each ``yield`` emits a 3-tuple
    matching the handler's registered outputs: (status_textbox,
    video_output, btn_generate_video). Intermediate yields update only the
    status text (the other two slots are empty ``gr.update()``); the final
    yield reveals the finished video and hides the button.

    Args:
        state_temp_dir: Session temp directory; the mp4 is written here.
        state_audio_dir: Directory containing per-slide WAV narration files.
        state_pdf_path: Path to the slide-deck PDF produced in Step 2.
        progress: Gradio progress tracker (injected by Gradio).

    Raises:
        gr.Error: On missing session state/inputs or any assembly failure.
    """
    # Validate session state before doing any work.
    if not state_temp_dir or not state_audio_dir or not state_pdf_path:
        raise gr.Error("Session state lost (Video step). Please start over.")
    if not os.path.exists(state_pdf_path):
        raise gr.Error(f"PDF file not found: {state_pdf_path}. Cannot generate video.")
    if not os.path.isdir(state_audio_dir):
        raise gr.Error(
            f"Audio directory not found: {state_audio_dir}. Cannot generate video."
        )
    video_output_path = os.path.join(state_temp_dir, "final_presentation.mp4")
    status_update = "Starting Step 4: Generating video..."
    yield (gr.update(value=status_update), gr.update(), gr.update())
    progress(0.1, desc="Preparing video components...")
    pdf_images = []  # Initialize to ensure cleanup happens
    try:
        # Find audio files (natsorted)
        audio_files = find_audio_files(state_audio_dir, "*.wav")
        if not audio_files:
            # Missing audio is a warning, not an abort: processing continues
            # and a silent (or partially silent) video may be produced.
            warning_msg = f"Warning: No WAV files found in {state_audio_dir}. Video might lack audio."
            logger.warning(warning_msg)
            status_update += f" ({warning_msg})"
            # Decide whether to proceed with silent video or error out
            # raise gr.Error(f"No audio files found in {state_audio_dir}")
        # Convert PDF to images
        progress(0.2, desc="Converting PDF to images...")
        status_update = "Converting PDF back to images for video..."
        yield (gr.update(value=status_update), gr.update(), gr.update())
        pdf_images = convert_pdf_to_images(state_pdf_path, dpi=150)
        if not pdf_images:
            raise gr.Error(f"Failed to convert PDF ({state_pdf_path}) to images.")
        # Allow video generation even if audio is missing or count mismatch
        # The create_video_clips function should handle missing audio gracefully (e.g., use image duration)
        if len(pdf_images) != len(audio_files):
            warning_msg = f"Warning: Mismatch: {len(pdf_images)} PDF pages vs {len(audio_files)} audio files. Video clips might have incorrect durations or missing audio."
            logger.warning(warning_msg)
            status_update += f" ({warning_msg})"
            # yield status_update # Old yield
            yield (gr.update(value=status_update), gr.update(), gr.update())
        progress(0.5, desc="Creating individual video clips...")
        status_update = "Creating individual video clips..."
        # yield status_update # Old yield
        yield (gr.update(value=status_update), gr.update(), gr.update())
        buffer_seconds = 1.0  # extra hold time per slide
        output_fps = 10  # low FPS suffices for static slide frames
        video_clips = create_video_clips(
            pdf_images, audio_files, buffer_seconds, output_fps
        )
        if not video_clips:
            raise gr.Error("Failed to create any video clips.")
        progress(0.8, desc="Concatenating clips...")
        status_update = "Concatenating clips into final video..."
        # yield status_update # Old yield
        yield (gr.update(value=status_update), gr.update(), gr.update())
        concatenate_clips(video_clips, video_output_path, output_fps)
        logger.info(f"Video concatenation complete: {video_output_path}")
        progress(0.95, desc="Cleaning up temp images...")
        status_update = "Cleaning up temporary image files..."
        # yield status_update # Old yield
        yield (gr.update(value=status_update), gr.update(), gr.update())
        cleanup_temp_files(pdf_images)  # Pass the list of image paths
    except Exception as e:
        # Best-effort removal of any images created before the failure.
        # Note this clause also catches the gr.Error raises from inside the
        # try block and re-wraps them in the generic failure message below.
        if pdf_images:
            cleanup_temp_files(pdf_images)
        logger.error(f"Video generation failed: {e}", exc_info=True)
        status_update = f"Video generation failed: {e}"
        # NOTE(review): two consecutive yields here — the first is immediately
        # superseded by the second; it looks redundant. Confirm before removing.
        yield (gr.update(value=status_update), gr.update(), gr.update())
        # Final yield must match outputs
        yield (gr.update(value=status_update), gr.update(), gr.update(visible=True))
        raise gr.Error(f"Video generation failed: {e}")
    info_msg = f"Video generated: {os.path.basename(video_output_path)}"
    logger.info(info_msg)
    progress(1.0, desc="Video Complete.")
    status_update = f"Step 4 Complete: {info_msg}"
    # Return tuple including status update
    yield (
        gr.update(value=status_update),
        gr.update(value=video_output_path, visible=True),  # video_output
        gr.update(visible=False),  # btn_generate_video
    )
def cleanup_session(temp_dir):
    """Delete the session's temporary directory and report the outcome.

    Args:
        temp_dir: Path of the directory to remove; may be None, a non-string,
            or a path that no longer exists.

    Returns:
        A human-readable status string describing what happened.
    """
    # Guard clause: anything other than an existing string path is rejected.
    usable = bool(temp_dir) and isinstance(temp_dir, str) and os.path.exists(temp_dir)
    if not usable:
        logger.warning(f"Cleanup called but temp_dir invalid or not found: {temp_dir}")
        return "No valid temporary directory found to clean."
    try:
        shutil.rmtree(temp_dir)
    except Exception as err:
        # Deletion problems are reported, never raised — this runs best-effort.
        logger.error(f"Error cleaning up temp directory {temp_dir}: {err}")
        return f"Error during cleanup: {err}"
    logger.info(f"Cleaned up temporary directory: {temp_dir}")
    return "Cleaned up session files."
# --- Dependency Check and Installation --- (Added)
def check_and_install_npm_dependency(command_name, package_name, install_instructions):
    """Ensure a CLI tool is on PATH, attempting a global npm install if missing.

    Args:
        command_name: Executable name to look for (e.g. "decktape").
        package_name: npm package expected to provide that executable.
        install_instructions: Human-readable fallback steps, logged when npm
            itself is unavailable.

    Returns:
        True if the command is available (already present, or after a
        successful install); False on any failure.
    """
    # Fast path: tool already installed.
    if shutil.which(command_name):
        logger.info(f"Command '{command_name}' found.")
        return True

    logger.warning(
        f"Command '{command_name}' not found. Attempting to install '{package_name}' globally via npm..."
    )
    install_cmd = ["npm", "install", "-g", package_name]
    try:
        completed = subprocess.run(
            install_cmd, check=True, capture_output=True, text=True, timeout=300
        )
        logger.info(f"Successfully installed '{package_name}'.")
        logger.debug(f"npm stdout:\n{completed.stdout}")
        logger.debug(f"npm stderr:\n{completed.stderr}")
        # npm can exit 0 while the binary is still absent from PATH,
        # so re-verify before reporting success.
        if not shutil.which(command_name):
            logger.error(
                f"Installation of '{package_name}' reported success, but command '{command_name}' still not found. Check npm logs and PATH."
            )
            return False
        logger.info(f"Command '{command_name}' is now available.")
        return True
    except FileNotFoundError:
        # npm itself is missing — point the operator at the manual steps.
        logger.error(
            f"`npm` command not found. Cannot install '{package_name}'. {install_instructions}"
        )
        return False
    except subprocess.CalledProcessError as err:
        logger.error(
            f"Failed to install '{package_name}' (return code {err.returncode})."
        )
        logger.error(f"npm stdout:\n{err.stdout}")
        logger.error(f"npm stderr:\n{err.stderr}")
        logger.error(
            "Installation might require administrator privileges (e.g., run with 'sudo')."
        )
        return False
    except subprocess.TimeoutExpired:
        logger.error(f"Installation of '{package_name}' timed out.")
        return False
    except Exception as err:
        logger.error(
            f"An unexpected error occurred during '{package_name}' installation: {err}",
            exc_info=True,
        )
        return False
# --- Gradio Interface ---
# Load custom CSS
custom_css = load_css()
# Top-level UI: a 4-tab wizard (generate -> build slides -> audio -> video),
# a shared status textbox, and one shared slide-editor column reused by all
# steps.
with gr.Blocks(
    theme=gr.themes.Soft(), css=custom_css, title="Webpage to Video"
) as demo:
    gr.Markdown("# Webpage to Video Presentation Generator")
    # State variables
    state_temp_dir = gr.State(None)  # per-session temp dir holding all artifacts
    state_md_path = gr.State(None)  # path to the generated presentation markdown
    state_audio_dir = gr.State(None)  # directory of per-slide WAV files (Step 3)
    state_pdf_path = gr.State(None)  # path to the slides PDF (Step 2)
    state_slides_data = gr.State([])  # per-slide dicts with "content" and "notes"
    state_pdf_image_paths = gr.State([])  # per-slide image previews of the PDF
    MAX_SLIDES = 20  # hard cap: editor widgets are pre-built for this many slides
    # --- Tabbed Interface ---
    with gr.Tabs(elem_id="tabs") as tabs_widget:
        # Tab 1: Generate Presentation
        with gr.TabItem("1. Generate Presentation", id=0):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 1:** Enter URL")
                    input_url = gr.Textbox(
                        label="Webpage URL",
                        value="https://huggingface.co./blog/llm-course",
                    )
                    btn_fetch_generate = gr.Button(
                        value="1. Fetch & Generate", variant="primary"
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Enter URL & click 'Fetch & Generate'.\n2. Editor appears below.\n3. Go to next tab."
                    )
        # Tab 2: Build Slides
        with gr.TabItem("2. Build Slides", id=1):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 2:** Review/Edit, then build slides.")
                    btn_build_slides = gr.Button(
                        value="2. Build Slides", variant="secondary", visible=False
                    )
                    pdf_download_link = gr.File(
                        label="Download PDF", visible=False, interactive=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Edit content/notes below.\n2. Click 'Build Slides'. Images appear.\n3. Download PDF from sidebar.\n4. Go to next tab."
                    )
        # Tab 3: Generate Audio
        with gr.TabItem("3. Generate Audio", id=2):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 3:** Review/Edit notes, then generate audio.")
                    btn_generate_audio = gr.Button(
                        value="3. Generate Audio", variant="primary", visible=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Finalize notes below.\n2. Click 'Generate Audio'.\n3. Regenerate if needed.\n4. Go to next tab."
                    )
        # Tab 4: Generate Video
        with gr.TabItem("4. Create Video", id=3):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 4:** Create the final video.")
                    btn_generate_video = gr.Button(
                        value="4. Create Video", variant="primary", visible=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        # NOTE(review): .replace("\n", "\n") is a no-op — it
                        # replaces a newline with itself. Presumably meant to
                        # force markdown line breaks ("\n" -> "  \n" or
                        # "\n\n"); confirm intent.
                        "### Instructions\n1. Click 'Create Video'.\n2. Video appears below.".replace(
                            "\n", "\n"
                        )  # Ensure newlines are rendered
                    )
                    video_output = gr.Video(label="Final Video", visible=False)
    # --- Status Textbox (Moved BEFORE editor_column) ---
    with gr.Row():
        status_textbox = gr.Textbox(
            label="Status",
            value="Enter a URL and click 'Fetch & Generate' to start.",
            interactive=False,
            lines=1,
            max_lines=1,
        )
    # Define the shared editor structure once, AFTER tabs and status box
    slide_editors_group = []
    with gr.Column(visible=False) as editor_column:  # Initially hidden
        gr.Markdown("--- \n## Edit Slides & Notes")
        gr.Markdown("_(PDF uses content & notes, Audio uses notes only)_")
        for i in range(MAX_SLIDES):
            with gr.Accordion(f"Slide {i + 1}", open=(i == 0), visible=False) as acc:
                with gr.Row():  # Row for Content/Preview/Image
                    with gr.Column(scale=1):
                        code_editor = gr.Code(
                            label="Content (Markdown)",
                            language="markdown",
                            lines=15,
                            interactive=True,
                            visible=False,
                        )
                        notes_textbox = (
                            gr.Textbox(  # Changed from gr.Code to gr.Textbox
                                label="Script/Notes (for Audio)",
                                lines=8,
                                # language="markdown", # Removed language parameter
                                interactive=True,
                                visible=False,
                            )
                        )
                    with gr.Column(scale=1):
                        slide_image = gr.Image(
                            label="Slide Image",
                            visible=False,
                            interactive=False,
                            # height=300, # Removed fixed height
                        )
                        md_preview = gr.Markdown(visible=False)
                with gr.Row():  # Row for audio controls
                    audio_player = gr.Audio(
                        label="Generated Audio",
                        visible=False,
                        interactive=False,
                        scale=3,
                    )
                    regen_button = gr.Button(
                        value="Regen Audio", visible=False, scale=1, size="sm"
                    )
            # One 7-tuple per slide so handlers can fan updates out in a fixed
            # order: (accordion, code, preview, notes, audio, regen, image).
            slide_editors_group.append(
                (
                    acc,
                    code_editor,
                    md_preview,
                    notes_textbox,
                    audio_player,
                    regen_button,
                    slide_image,
                )
            )
            # Live markdown preview. Registered inside the loop so each call
            # binds the current iteration's editor to its own preview.
            code_editor.change(
                fn=lambda x: x,
                inputs=code_editor,
                outputs=md_preview,
                show_progress="hidden",
            )
    # --- Component Lists for Updates ---
    # Flattened / per-kind views of the tuples above, matching the tuple
    # indices used when they were appended.
    all_editor_components = [comp for group in slide_editors_group for comp in group]
    all_code_editors = [group[1] for group in slide_editors_group]
    all_notes_textboxes = [group[3] for group in slide_editors_group]
    all_audio_players = [group[4] for group in slide_editors_group]
    all_regen_buttons = [group[5] for group in slide_editors_group]
    all_slide_images = [group[6] for group in slide_editors_group]
    # --- Main Button Click Handlers --- (Outputs use locally defined component vars)
    # Step 1 Click Handler
    step1_outputs = [
        status_textbox,  # Added status output
        state_temp_dir,
        state_md_path,
        state_slides_data,
        editor_column,  # Show the editor column
        btn_build_slides,  # Enable the button in Tab 2
        btn_fetch_generate,  # Disable self
    ]
    btn_fetch_generate.click(
        fn=step1_fetch_and_generate_presentation,
        inputs=[input_url],
        outputs=step1_outputs,
        show_progress="full",
    ).then(
        # Populate the editor widgets from the slide data: one status update,
        # then 7 component updates per slide (visible for real slides, hidden
        # for the remaining MAX_SLIDES - len(s_data) placeholder rows).
        fn=lambda s_data: (
            gr.update(
                value="Editor populated. Proceed to Step 2."
            ),  # Status update first
            # Then unpack the list/tuple of editor updates
            *[
                upd
                for i, slide in enumerate(s_data)
                if i < MAX_SLIDES
                for upd in [
                    gr.update(
                        # Get cleaned first line for title, then use in f-string
                        label=f"Slide {i + 1}: {(slide['content'].splitlines()[0] if slide['content'] else '').strip()[:30]}...",
                        visible=True,
                        open=(i == 0),
                    ),  # Accordion
                    gr.update(value=slide["content"], visible=True),  # Code Editor
                    gr.update(value=slide["content"], visible=True),  # MD Preview
                    gr.update(value=slide["notes"], visible=True),  # Notes Textbox
                    gr.update(value=None, visible=False),  # Audio Player
                    gr.update(visible=False),  # Regen Button
                    gr.update(value=None, visible=False),  # Slide Image
                ]
            ]
            + [
                upd
                for i in range(len(s_data), MAX_SLIDES)
                for upd in [gr.update(visible=False)] * 7
            ],
        ),  # Correctly construct the output tuple
        inputs=[state_slides_data],
        outputs=[status_textbox] + all_editor_components,  # Add status_textbox output
        show_progress="hidden",
    ).then(lambda: gr.update(selected=1), outputs=tabs_widget)  # Switch to Tab 2
    # Step 2 Click Handler
    step2_inputs = (
        [state_temp_dir, state_md_path, state_slides_data]
        + all_code_editors
        + all_notes_textboxes
    )
    step2_outputs = [
        status_textbox,  # Added status output
        state_pdf_path,
        state_pdf_image_paths,
        btn_generate_audio,  # Enable button in Tab 3
        btn_build_slides,  # Disable self
        pdf_download_link,  # Update download link in Tab 2
    ]
    btn_build_slides.click(
        fn=step2_build_slides,
        inputs=step2_inputs,
        outputs=step2_outputs,
        show_progress="full",
    ).then(
        # Push the freshly rendered PDF page images into the slide editors.
        fn=lambda image_paths: (
            gr.update(
                value="Slide images updated. Proceed to Step 3."
            ),  # Status update first
            # Then unpack the list/tuple of image updates
            *[
                gr.update(
                    value=image_paths[i] if i < len(image_paths) else None,
                    visible=(i < len(image_paths)),
                )
                for i in range(MAX_SLIDES)
            ],
        ),  # Correctly construct the output tuple
        inputs=[state_pdf_image_paths],
        outputs=[status_textbox] + all_slide_images,  # Add status_textbox output
        show_progress="hidden",
    ).then(lambda: gr.update(selected=2), outputs=tabs_widget)  # Switch to Tab 3
    # Step 3 Click Handler
    step3_inputs = (
        [state_temp_dir, state_md_path, state_slides_data]
        + all_code_editors
        + all_notes_textboxes
    )
    step3_outputs = (
        [
            status_textbox,  # Added status output
            state_audio_dir,
            btn_generate_video,  # Enable button in Tab 4
            btn_generate_audio,  # Disable self
        ]
        + all_audio_players
        + all_regen_buttons
    )
    btn_generate_audio.click(
        fn=step3_generate_audio,
        inputs=step3_inputs,
        outputs=step3_outputs,
        show_progress="full",
    ).then(
        fn=lambda: (
            gr.update(value="Audio generated. Proceed to Step 4."),
            gr.update(selected=3),
        ),  # Status update is handled by the main function yield now
        # No inputs needed for this simple status update + tab switch
        outputs=[status_textbox, tabs_widget],
        show_progress="hidden",
    )
    # Step 4 Click Handler
    step4_inputs = [state_temp_dir, state_audio_dir, state_pdf_path]
    step4_outputs = [
        status_textbox,  # Added status output
        video_output,  # Update video output in Tab 4
        btn_generate_video,  # Disable self
    ]
    btn_generate_video.click(
        fn=step4_generate_video,
        inputs=step4_inputs,
        outputs=step4_outputs,
        show_progress="full",
    )
if __name__ == "__main__":
    # Create cache directories up front so TTS and URL caching can write.
    os.makedirs(CACHE_DIR, exist_ok=True)
    os.makedirs(URL_CACHE_DIR, exist_ok=True)
    # --- Check/Install Dependencies --- (Added)
    logger.info("Checking for external dependencies...")
    # The npm package "backslide" is expected to provide the `bs` command.
    backslide_ok = check_and_install_npm_dependency(
        "bs",
        "backslide",
        "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g backslide'",
    )
    decktape_ok = check_and_install_npm_dependency(
        "decktape",
        "decktape",
        "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g decktape'",
    )
    # NOTE(review): gr.Warning is called here outside any event handler, at
    # startup — presumably it only logs and cannot surface in the UI at this
    # point; confirm this is intended.
    if not backslide_ok:
        gr.Warning(
            "Backslide (bs) command check/install failed. PDF generation might fail. Check logs."
        )
    if not decktape_ok:
        gr.Warning(
            "Decktape command check/install failed. PDF generation might fail. Check logs."
        )
    # --- End Dependency Check ---
    # queue() enables the generator (streaming) event handlers used above.
    demo.queue().launch(debug=True)