import gradio as gr from google import genai from google.genai import types from PIL import Image from io import BytesIO import base64 import os import json import random import urllib.parse import time import logging # Set up logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # Initialize the Google Generative AI client with the API key from environment variables try: api_key = os.environ['GEMINI_API_KEY'] except KeyError: raise ValueError("Please set the GEMINI_API_KEY environment variable.") client = genai.Client(api_key=api_key) # Define safety settings to disable all filters for content generation SAFETY_SETTINGS = [ types.SafetySetting( category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, threshold=types.HarmBlockThreshold.BLOCK_NONE, ), types.SafetySetting( category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, threshold=types.HarmBlockThreshold.BLOCK_NONE, ), types.SafetySetting( category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, threshold=types.HarmBlockThreshold.BLOCK_NONE, ), types.SafetySetting( category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, threshold=types.HarmBlockThreshold.BLOCK_NONE, ), types.SafetySetting( category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, threshold=types.HarmBlockThreshold.BLOCK_NONE, ), ] # Define progress stages and corresponding witty messages PROGRESS_STAGES = { "initializing": [ "Warming up the creativity engine... 🚀", "Getting ready to spark some magic... ✨", "Loading the idea generator... ⚡" ], "generating_ideas": [ "Brainstorming brilliant ideas... 💡", "Cooking up some viral concepts... 🍳", "Mixing creativity with a pinch of flair... 🎨" ], "creating_image": [ "Painting a stunning visual... 🖌️", "Snapping the perfect shot... 📸", "Crafting a masterpiece pixel by pixel... 🖼️" ], "retrying_image": [ "Tweaking the image for perfection... 🔧", "Giving it another go for that wow factor... 🔄", "Adjusting the lens for a better shot... 🎥" ], "generating_video": [ "Rolling the cameras for a viral video... 🎬", "Animating the scene with cinematic vibes... 🌟", "Producing a TikTok-worthy clip... 🎞️" ], "finalizing": [ "Putting the final touches... 🎀", "Polishing your content to shine... 💎", "Almost ready to blow up the feed... 🔥" ] } def clean_response_text(response_text): """ Clean the API response by removing Markdown code block markers. """ logging.debug("Cleaning response text") cleaned_text = response_text.strip() if cleaned_text.startswith("```json"): cleaned_text = cleaned_text[len("```json"):].strip() if cleaned_text.endswith("```"): cleaned_text = cleaned_text[:-len("```")].strip() return cleaned_text def generate_ideas(user_input): """ Generate a diverse set of ideas based on the user's input concept using the LLM. """ logging.debug(f"Generating ideas for input: {user_input}") prompt = f""" The user has provided the concept: "{user_input}". You must generate 5 diverse and creative ideas for a TikTok video that are directly and explicitly related to "{user_input}". Each idea must clearly incorporate and focus on the core theme of "{user_input}" without deviating into unrelated topics. Each idea should be a short sentence describing a specific scene or concept. Return the response as a JSON object with a single key 'ideas' containing a list of 5 ideas. Ensure the response is strictly in JSON format. """ try: response = client.models.generate_content( model='gemini-2.0-flash-lite', contents=[prompt], config=types.GenerateContentConfig( temperature=1.2, safety_settings=SAFETY_SETTINGS ) ) logging.debug(f"Generate ideas response: {response.text}") if not response.text or response.text.isspace(): raise ValueError("Empty response from API") cleaned_text = clean_response_text(response.text) response_json = json.loads(cleaned_text) if 'ideas' not in response_json or not isinstance(response_json['ideas'], list) or len(response_json['ideas']) != 5: raise ValueError("Invalid JSON format: 'ideas' key missing, not a list, or incorrect length") logging.debug(f"Generated ideas: {response_json['ideas']}") return response_json['ideas'] except Exception as e: logging.error(f"Error generating ideas: {e}") return [ f"A dramatic {user_input} scene with cinematic lighting", f"A close-up of {user_input} in a futuristic setting", f"A high-energy {user_input} moment with vibrant colors", f"A serene {user_input} scene with soft focus", f"An action-packed {user_input} challenge with dynamic angles" ] def generate_item(user_input, ideas, generate_video=False, max_retries=3): """ Generate a single feed item with progress updates. Yields progress stage and message for UI updates. """ logging.debug("Starting generate_item") video_base64 = None max_total_attempts = 3 timeout_seconds = 60 # Timeout for API calls total_attempts = 0 while total_attempts < max_total_attempts: total_attempts += 1 logging.debug(f"Total attempt {total_attempts}") yield {"stage": "initializing", "message": random.choice(PROGRESS_STAGES["initializing"]), "progress": 10} generated_image = None text = None img_str = None image_prompt = None for image_attempt in range(max_retries): logging.debug(f"Image attempt {image_attempt + 1}") yield {"stage": "creating_image", "message": random.choice(PROGRESS_STAGES["creating_image"]), "progress": 30 + (image_attempt * 10)} selected_idea = random.choice(ideas) prompt = f""" The user has provided the concept: "{user_input}". Based on this concept and the specific idea "{selected_idea}", create content for a TikTok video. Return a JSON object with two keys: - 'caption': A short, viral TikTok-style caption with hashtags that reflects "{user_input}". - 'image_prompt': A detailed image prompt for generating a high-quality visual scene, ensuring the theme of "{user_input}" is central. The image prompt should describe the scene vividly, specify a perspective and style, and ensure no text or letters are included. Ensure the response is strictly in JSON format. """ try: start_time = time.time() response = client.models.generate_content( model='gemini-2.0-flash-lite', contents=[prompt], config=types.GenerateContentConfig( temperature=1.2, safety_settings=SAFETY_SETTINGS ) ) if time.time() - start_time > timeout_seconds: raise TimeoutError("Image caption generation timed out") logging.debug(f"Generate content response: {response.text}") cleaned_text = clean_response_text(response.text) response_json = json.loads(cleaned_text) text = response_json['caption'] image_prompt = response_json['image_prompt'] except Exception as e: logging.error(f"Error generating item: {e}") text = f"Amazing {user_input}! 🔥 #{user_input.replace(' ', '')}" image_prompt = f"A vivid scene of {selected_idea} related to {user_input}, in a vibrant pop art style, no text or letters" try: start_time = time.time() imagen = client.models.generate_images( model='imagen-3.0-generate-002', prompt=image_prompt, config=types.GenerateImagesConfig( aspect_ratio="9:16", number_of_images=1 ) ) if time.time() - start_time > timeout_seconds: raise TimeoutError("Image generation timed out") if imagen.generated_images and len(imagen.generated_images) > 0: generated_image = imagen.generated_images[0] image = Image.open(BytesIO(generated_image.image.image_bytes)) target_width = 360 target_height = int(target_width / 9 * 16) image = image.resize((target_width, target_height), Image.LANCZOS) buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() logging.debug("Image generated successfully") break else: logging.warning("No images generated") if image_attempt == max_retries - 1 and total_attempts == max_total_attempts: image = Image.new('RGB', (360, 640), color='gray') buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Returning with placeholder image") return { 'text': text, 'image_base64': img_str, 'video_base64': None, 'ideas': ideas } yield {"stage": "retrying_image", "message": random.choice(PROGRESS_STAGES["retrying_image"]), "progress": 40 + (image_attempt * 10)} except Exception as e: logging.error(f"Error generating image: {e}") if image_attempt == max_retries - 1 and total_attempts == max_total_attempts: image = Image.new('RGB', (360, 640), color='gray') buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Returning with placeholder image") return { 'text': text, 'image_base64': img_str, 'video_base64': None, 'ideas': ideas } yield {"stage": "retrying_image", "message": random.choice(PROGRESS_STAGES["retrying_image"]), "progress": 40 + (image_attempt * 10)} if generate_video and generated_image is not None: logging.debug("Attempting video generation") yield {"stage": "generating_video", "message": random.choice(PROGRESS_STAGES["generating_video"]), "progress": 70} try: video_prompt = f""" The user concept is "{user_input}". Based on this and the scene: {image_prompt}, create a video. Use a close-up shot with a slow dolly shot circling around the subject, using shallow focus on the main subject to emphasize details, in a realistic style with cinematic lighting. """ start_time = time.time() operation = client.models.generate_videos( model="veo-2.0-generate-001", prompt=video_prompt, image=generated_image.image, config=types.GenerateVideosConfig( aspect_ratio="9:16", number_of_videos=1, duration_seconds=5, negative_prompt="blurry, low quality, text, letters" ) ) while not operation.done: if time.time() - start_time > timeout_seconds: raise TimeoutError("Video generation timed out") time.sleep(5) operation = client.operations.get(operation) if operation.error: raise ValueError(f"Video generation failed: {operation.error.message}") if operation.response and hasattr(operation.response, 'generated_videos') and operation.response.generated_videos: if len(operation.response.generated_videos) > 0: video = operation.response.generated_videos[0] if video and hasattr(video, 'video'): video_data = client.files.download(file=video.video) video_bytes = video_data if isinstance(video_data, bytes) else BytesIO(video_data).getvalue() video_base64 = base64.b64encode(video_bytes).decode() yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Video generated successfully") return { 'text': text, 'image_base64': img_str, 'video_base64': video_base64, 'ideas': ideas } raise ValueError("No valid video generated") else: raise ValueError("Video generation operation failed: No generated_videos in response") except Exception as e: logging.error(f"Error generating video: {e}") logging.debug("Falling back to text-to-video") yield {"stage": "generating_video", "message": random.choice(PROGRESS_STAGES["generating_video"]), "progress": 80} try: start_time = time.time() operation = client.models.generate_videos( model="veo-2.0-generate-001", prompt=video_prompt, config=types.GenerateVideosConfig( aspect_ratio="9:16", number_of_videos=1, duration_seconds=5, negative_prompt="blurry, low quality, text, letters" ) ) while not operation.done: if time.time() - start_time > timeout_seconds: raise TimeoutError("Text-to-video generation timed out") time.sleep(5) operation = client.operations.get(operation) if operation.error: raise ValueError(f"Video generation failed: {operation.error.message}") if operation.response and hasattr(operation.response, 'generated_videos') and operation.response.generated_videos: if len(operation.response.generated_videos) > 0: video = operation.response.generated_videos[0] if video and hasattr(video, 'video'): video_data = client.files.download(file=video.video) video_bytes = video_data if isinstance(video_data, bytes) else BytesIO(video_data).getvalue() video_base64 = base64.b64encode(video_bytes).decode() yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Text-to-video generated successfully") return { 'text': text, 'image_base64': img_str, 'video_base64': video_base64, 'ideas': ideas } raise ValueError("No valid video generated") else: raise ValueError("Video generation operation failed: No generated_videos in response") except Exception as e: logging.error(f"Error generating text-to-video: {e}") if total_attempts == max_total_attempts: yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Returning without video") return { 'text': text, 'image_base64': img_str, 'video_base64': None, 'ideas': ideas } if img_str is not None: yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Returning with image only") return { 'text': text, 'image_base64': img_str, 'video_base64': video_base64, 'ideas': ideas } image = Image.new('RGB', (360, 640), color='gray') buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() yield {"stage": "finalizing", "message": random.choice(PROGRESS_STAGES["finalizing"]), "progress": 90} logging.debug("Returning with placeholder image") return { 'text': f"Amazing {user_input}! 🔥 #{user_input.replace(' ', '')}", 'image_base64': img_str, 'video_base64': None, 'ideas': ideas } def start_feed(user_input, generate_video, current_index, feed_items): """ Start or update the feed based on the user input with progress updates. """ logging.debug("Starting start_feed") if not user_input.strip(): user_input = "trending" current_user_input = user_input is_loading = True share_links = "" timeout_seconds = 120 # Timeout for entire generation process html_content = generate_html([], False, 0, user_input, is_loading, "initializing", random.choice(PROGRESS_STAGES["initializing"]), 10) yield current_user_input, current_index, feed_items, html_content, share_links, is_loading try: start_time = time.time() html_content = generate_html([], False, 0, user_input, is_loading, "generating_ideas", random.choice(PROGRESS_STAGES["generating_ideas"]), 20) yield current_user_input, current_index, feed_items, html_content, share_links, is_loading ideas = generate_ideas(user_input) item_generator = generate_item(user_input, ideas, generate_video=generate_video) try: while True: if time.time() - start_time > timeout_seconds: logging.error("Generation timed out") raise TimeoutError("Feed generation timed out") progress = next(item_generator) logging.debug(f"Progress update: {progress}") if isinstance(progress, dict) and "stage" in progress: html_content = generate_html([], False, 0, user_input, is_loading, progress["stage"], progress["message"], progress["progress"]) yield current_user_input, current_index, feed_items, html_content, share_links, is_loading else: logging.debug(f"Received final item: {progress}") item = progress feed_items = [item] current_index = 0 share_links = generate_share_links(item['image_base64'], item['video_base64'], item['text']) is_loading = False html_content = generate_html(feed_items, False, current_index, user_input, is_loading) logging.debug("Feed generation complete") yield current_user_input, current_index, feed_items, html_content, share_links, is_loading return except StopIteration as e: # Handle the generator's return value if e.value is not None: logging.debug(f"Generator returned final item: {e.value}") item = e.value feed_items = [item] current_index = 0 share_links = generate_share_links(item['image_base64'], item['video_base64'], item['text']) is_loading = False html_content = generate_html(feed_items, False, current_index, user_input, is_loading) logging.debug("Feed generation complete via StopIteration") yield current_user_input, current_index, feed_items, html_content, share_links, is_loading return else: logging.warning("Generator returned without a final item") raise ValueError("Generator did not return a valid item") except Exception as e: logging.error(f"Error in start_feed: {e}") html_content = """
Error generating content: {str(e)}. Please try again!
Error generating content: {str(e)}. Please try again!
Download the media to share:
Click a share button below to start a post with the caption, then manually upload the downloaded image or video.
""" share_links = """ """ youtube_share = "" if video_base64: youtube_share = f""" """ return f"""Enter a concept or idea to start your feed!