# app.py — Stock Photo Downloader, by Deadmon (commit 726835e, ~14.6 kB).
# NOTE(review): lines above were file-viewer chrome, converted to this comment.
import os
import requests
import zipfile
import gradio as gr
import shutil
from PIL import Image
import random
import time
import logging
# Configure logging: module-level logger, INFO and above to stderr.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration: everything is written under OUTPUT_DIR — IMAGES_DIR holds the
# downloaded files and ZIP_FILE is the archive served back to the user.
OUTPUT_DIR = "downloaded_images"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
ZIP_FILE = os.path.join(OUTPUT_DIR, "images.zip")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Constants
ITEMS_PER_PAGE = 40  # results requested per API page
DAILY_IMAGE_LIMIT = 2000  # self-imposed request budget used to cap page count
MAX_PAGES = min(DAILY_IMAGE_LIMIT // ITEMS_PER_PAGE, 10)  # pages sampled from (= 10)
IMAGES_PER_ROW = 4
MAX_ROWS = 6
TOTAL_IMAGES = IMAGES_PER_ROW * MAX_ROWS  # gallery capacity: 24 slots
# API Configurations (Your original keys)
# Per-provider request recipe:
#   base_url   — search endpoint
#   headers    — auth header (only Pexels authenticates this way)
#   image_key  — nested key path into each result item that yields the image URL
#   result_key — name of the result list in the JSON payload
#   delay      — seconds slept before each request (crude rate limiting)
# SECURITY NOTE(review): API keys are hardcoded in source; move them to
# environment variables / secrets before publishing this file.
API_CONFIGS = {
    "pexels": {
        "base_url": "https://api.pexels.com/v1/search",
        "headers": {"Authorization": "klHADHclpse2e2xSP9h747AgfE1Rx0wioemGhXYtedjZzvJ1WBUKwz7g"},
        "image_key": ["src", "medium"],
        "result_key": "photos",
        "delay": 2
    },
    "unsplash": {
        "base_url": "https://api.unsplash.com/search/photos",
        "headers": {},
        "image_key": ["urls", "small"],
        "result_key": "results",
        "delay": 2,
        "client_id": "UKkhpD_Rs5-s1gIlVX28iNs_8E4ysPhQniyIpDpKUnU"
    },
    "pixabay": {
        "base_url": "https://pixabay.com/api/",
        "headers": {},
        "image_key": ["webformatURL"],
        "result_key": "hits",
        "delay": 1,
        "api_key": "45122300-cd3621e1539e8e95430ee3efc"
    }
}
def fetch_image_urls(api_name, category, num_images):
    """Fetch up to ``num_images`` image URLs for ``category`` from one provider.

    Result pages are sampled at random (without replacement) from the first
    MAX_PAGES pages, so repeated runs tend to surface different images.

    Args:
        api_name: Key into API_CONFIGS ("pexels", "unsplash" or "pixabay").
        category: Search keyword; lower-cased before querying.
        num_images: Maximum number of URLs to return.

    Returns:
        A list of image URL strings — possibly shorter than ``num_images``
        (or empty) on request errors or empty result sets.
    """
    config = API_CONFIGS[api_name]
    # Ceiling division: pages required to cover num_images results.
    num_pages_needed = (num_images + ITEMS_PER_PAGE - 1) // ITEMS_PER_PAGE
    all_pages = list(range(1, MAX_PAGES + 1))
    if len(all_pages) < num_pages_needed:
        logger.warning(f"Insufficient pages available: {len(all_pages)} < {num_pages_needed}")
        return []
    selected_pages = random.sample(all_pages, num_pages_needed)
    logger.info(f"Selected pages for {api_name}: {selected_pages}")
    image_urls = []
    for page in selected_pages:
        # Build the request: Pixabay takes everything in the query string,
        # Unsplash passes its client_id as a param, Pexels uses header auth.
        if api_name == "pixabay":
            url = f"{config['base_url']}?key={config['api_key']}&q={category.lower()}&per_page={ITEMS_PER_PAGE}&page={page}"
            params = {}
        elif api_name == "unsplash":
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page,
                "client_id": config["client_id"]
            }
        else:  # pexels
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page
            }
        try:
            logger.info(f"Requesting {url} with params {params} for {api_name}")
            time.sleep(config.get("delay", 0))  # crude rate limiting
            response = requests.get(url, headers=config["headers"], params=params, timeout=10)
            response.raise_for_status()
            data_response = response.json()
            results = data_response.get(config["result_key"], [])
            if not results:
                logger.warning(f"No {config['result_key']} in response for {api_name}, page {page}: {data_response}")
                continue
            page_urls = []
            for item in results:
                # Count URLs collected on this page too, so we stop as soon as
                # the target is reached (final slice below still caps length).
                if len(image_urls) + len(page_urls) >= num_images:
                    break
                # Walk the nested key path (e.g. ["src", "medium"]) into the item.
                image_url = item
                for key in config["image_key"]:
                    image_url = image_url.get(key) if isinstance(image_url, dict) else None
                    if image_url is None:
                        break
                if image_url:
                    page_urls.append(image_url)
                else:
                    logger.warning(f"No image URL found in item for {api_name}: {item}")
            if page_urls:
                image_urls.extend(page_urls)
                logger.info(f"Successfully fetched {len(page_urls)} images from page {page} for {api_name}")
            else:
                logger.warning(f"No valid URLs extracted from page {page} for {api_name}")
        except requests.exceptions.RequestException as e:
            # BUG FIX: the local `response` is unbound when requests.get()
            # itself raised (connection error / timeout), which previously
            # caused a NameError here that masked the real failure. Use the
            # response attached to the exception, which may be None.
            logger.error(f"Error fetching page {page} from {api_name}: {e}")
            resp = e.response
            if resp is not None:
                logger.error(f"Response: {resp.text}")
                if resp.status_code == 401:
                    logger.error(f"401 Unauthorized for {api_name}. Replace API key.")
                elif resp.status_code == 429:
                    logger.error(f"429 Rate Limit Exceeded for {api_name}. Increase delay or wait.")
            break
    logger.info(f"Total URLs fetched for {api_name}: {len(image_urls)}")
    return image_urls[:num_images]
def download_images(image_urls):
    """Download each URL into IMAGES_DIR and validate it as an image.

    IMAGES_DIR is recreated from scratch on every call. Files that fail to
    download or fail PIL validation are skipped and removed, so every returned
    path points at a readable image file.

    Args:
        image_urls: List of image URL strings.

    Returns:
        (downloaded_count, image_paths): number of images successfully saved
        and their file paths, in download order.
    """
    if not image_urls:
        logger.warning("No image URLs provided to download")
        return 0, []
    # Start clean so leftovers from a previous run never leak into the gallery.
    if os.path.exists(IMAGES_DIR):
        shutil.rmtree(IMAGES_DIR)
    os.makedirs(IMAGES_DIR, exist_ok=True)
    downloaded_count = 0
    image_paths = []
    for idx, url in enumerate(image_urls, 1):
        image_path = os.path.join(IMAGES_DIR, f"img{idx}.jpg")
        try:
            response = requests.get(url, stream=True, timeout=10)
            response.raise_for_status()
            with open(image_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            # BUG FIX: use a context manager so the handle PIL opens for
            # verification is closed (Image.open(...).verify() leaked it).
            with Image.open(image_path) as img:
                img.verify()
            downloaded_count += 1
            image_paths.append(image_path)
            logger.info(f"Downloaded {idx}/{len(image_urls)}: {url}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error downloading {url}: {e}")
            # Drop any partially-written file so it cannot be zipped later.
            if os.path.exists(image_path):
                os.remove(image_path)
        except Exception as e:
            logger.error(f"Invalid image or error saving {url}: {e}")
            # Remove the corrupt file for the same reason.
            if os.path.exists(image_path):
                os.remove(image_path)
    logger.info(f"Total images downloaded: {downloaded_count}/{len(image_urls)}")
    return downloaded_count, image_paths
def create_zip_file(selected_image_paths):
    """Bundle the chosen image files into ZIP_FILE and return its path.

    Any archive left over from a previous run is deleted first; entries are
    stored relative to OUTPUT_DIR.
    """
    if os.path.exists(ZIP_FILE):
        os.remove(ZIP_FILE)
    with zipfile.ZipFile(ZIP_FILE, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in selected_image_paths:
            archive.write(path, os.path.relpath(path, OUTPUT_DIR))
    return ZIP_FILE
def process_and_display(api_name, category, num_images):
    """Fetch and download images, then assemble gallery state for the UI.

    Returns a 5-tuple: status message, zip-path placeholder (always None
    here), the list of downloaded file paths, one image value per gallery
    slot (None for empty slots), and one checkbox value per slot.
    """
    requested = min(int(num_images), TOTAL_IMAGES)  # clamp to gallery capacity
    logger.info(f"Starting process for {api_name} with category '{category}' and {requested} images")
    urls = fetch_image_urls(api_name, category, requested)
    if not urls:
        logger.warning(f"No images fetched from {api_name}")
        return ("No images available or API limit reached. Check logs for details.",
                None, [], [None] * TOTAL_IMAGES, [False] * TOTAL_IMAGES)
    logger.info(f"Proceeding to download {len(urls)} images from {api_name}")
    count, paths = download_images(urls)
    if count == 0:
        logger.warning(f"No images downloaded from {api_name}")
        return ("No images were successfully downloaded. Check logs for details.",
                None, [], [None] * TOTAL_IMAGES, [False] * TOTAL_IMAGES)
    status = f"Successfully downloaded {count}/{requested} images from {api_name}. Select images to include in ZIP below."
    slots = range(TOTAL_IMAGES)
    image_outputs = [paths[i] if i < len(paths) else None for i in slots]
    checkbox_outputs = [i < len(paths) for i in slots]
    logger.info(f"Process completed for {api_name}: {count} images prepared for display")
    return status, None, paths, image_outputs, checkbox_outputs
def process_zip_submission(image_paths, *checkbox_states):
    """Create a ZIP containing the images whose checkboxes are ticked.

    Args:
        image_paths: Paths of the downloaded images (gallery order).
        *checkbox_states: One boolean per gallery checkbox (may be longer
            than image_paths, since the gallery always has TOTAL_IMAGES slots).

    Returns:
        (status_message, zip_path) — zip_path is None when nothing was zipped.
    """
    if not image_paths:
        return "No images available to process.", None
    # ROBUSTNESS FIX: bound-check the index — there are more checkboxes than
    # downloaded images, so a stray True past the end must not IndexError.
    selected_image_paths = [
        image_paths[i]
        for i, state in enumerate(checkbox_states)
        if state and i < len(image_paths)
    ]
    if not selected_image_paths:
        return "No images selected for ZIP.", None
    zip_path = create_zip_file(selected_image_paths)
    logger.info(f"ZIP file created with {len(selected_image_paths)} images at {zip_path}")
    return f"ZIP file created with {len(selected_image_paths)} images at {zip_path}", zip_path
# Gradio Interface
# Custom CSS: green fetch / blue zip buttons, a bordered status box, and an
# absolutely-positioned checkbox overlay in the corner of each image tile.
css = """
.fetch-button { background-color: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.fetch-button:hover { background-color: #45a049; }
.zip-button { background-color: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.zip-button:hover { background-color: #1e88e5; }
.status-box { border: 1px solid #ddd; background-color: #f9f9f9; padding: 10px; border-radius: 5px; }
.input-group { border: 1px solid #ddd; padding: 15px; border-radius: 5px; background-color: #f0f0f0; }
.image-container { position: relative; width: 100%; height: 150px; overflow: hidden; border-radius: 5px; }
.image-container img { width: 100%; height: 100%; object-fit: cover; }
.overlay { position: absolute; bottom: 5px; right: 5px; background-color: rgba(0, 0, 0, 0.6); padding: 5px; border-radius: 5px; display: flex; align-items: center; gap: 5px; color: white; font-size: 12px; }
.overlay label { margin: 0; color: white; }
.overlay input[type="checkbox"] { margin: 0; }
"""
# Build the UI. NOTE(review): source indentation was stripped in transit;
# nesting below is reconstructed from the Gradio context-manager structure.
with gr.Blocks(title="Stock Photo Downloader", css=css) as demo:
    gr.Markdown("<h1 style='text-align: center; color: #333;'>📸 Stock Photo Downloader</h1>")
    gr.Markdown("<p style='text-align: center; color: #666;'>Fetch high-quality stock photos from Pexels, Unsplash, and Pixabay.</p>")
    # Input controls: provider, search category, and image count.
    with gr.Group(elem_classes=["input-group"]):
        gr.Markdown("### 🔍 Choose Your Parameters")
        with gr.Row():
            api_input = gr.Dropdown(label="API Source", choices=["pexels", "unsplash", "pixabay"], value="pexels", info="Select the stock photo provider.")
            category_input = gr.Dropdown(label="Category", choices=["nature", "business", "people", "technology", "food", "travel", "animals", "fashion"], value="nature", allow_custom_value=True, info="Choose a category or enter a custom keyword.")
            num_images_input = gr.Dropdown(label="Number of Images (Max 24)", choices=["4", "8", "12", "16", "20", "24"], value="4", info="How many images to fetch (up to 24).")
        download_button = gr.Button("Fetch and Display Images", elem_classes=["fetch-button"])
    # Combine Status and Download sections in a single row with two columns
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📊 Status")
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                placeholder="Status updates will appear here...",
                elem_classes=["status-box"],
                show_label=False
            )
        with gr.Column():
            gr.Markdown("### 💾 Download Your Images")
            # Hidden until a ZIP is actually produced (see on_submit).
            zip_output = gr.File(label="Download ZIP", visible=False)
    gr.Markdown("### 🖼️ Image Gallery")
    gr.Markdown("<p style='color: #666;'>Select images to include in your ZIP file.</p>")
    # Server-side state: file paths of the last downloaded batch.
    image_paths_state = gr.State()
    # Fixed grid layout with dynamically visible rows:
    # MAX_ROWS x IMAGES_PER_ROW slots are created up front; visibility is
    # toggled per fetch in on_download rather than rebuilding components.
    image_outputs = []
    checkbox_outputs = []
    gallery_rows = []
    for row in range(MAX_ROWS):
        with gr.Row(visible=False) as row_component:
            gallery_rows.append(row_component)
            for col in range(IMAGES_PER_ROW):
                idx = row * IMAGES_PER_ROW + col
                with gr.Column(min_width=150):
                    with gr.Group(elem_classes=["image-container"]):
                        image_output = gr.Image(
                            label=f"Image {idx+1}",
                            show_label=False,
                            visible=False,
                            height=150,
                            width=150
                        )
                        # Checkbox rendered as an overlay in the tile corner
                        # (styled by the .overlay CSS class).
                        with gr.Row(elem_classes=["overlay"]):
                            checkbox_output = gr.Checkbox(
                                label=f"Image {idx+1}",
                                value=False,
                                visible=False,
                                scale=0
                            )
                # Keep flat lists in slot order for event outputs.
                image_outputs.append(image_output)
                checkbox_outputs.append(checkbox_output)
    gr.Markdown("### 📦 Create ZIP File")
    submit_button = gr.Button("Create ZIP of Selected Images", elem_classes=["zip-button"])
def on_download(api_name, category, num_images):
    """Gradio handler: fetch images and emit updates for every output slot.

    Returns, in order: status text, zip-path placeholder, the path-list state,
    then one update per image slot, per checkbox, and per gallery row.
    """
    status, zip_path, image_paths, image_outs, checkbox_outs = process_and_display(
        api_name, category, num_images
    )
    n_ready = len(image_paths)
    # One gr.Image update per slot: show it only when a file landed there.
    img_updates = []
    for pos, img in enumerate(image_outs):
        img_updates.append(
            gr.Image(value=img, visible=img is not None, label=f"Image {pos+1}", height=150, width=150)
        )
    # Matching checkbox updates: visible for exactly the downloaded slots.
    chk_updates = []
    for pos, chk in enumerate(checkbox_outs):
        chk_updates.append(
            gr.Checkbox(value=chk, visible=pos < n_ready, label=f"Image {pos+1}", scale=0)
        )
    # A row becomes visible when at least one of its slots holds an image.
    row_updates = []
    for row_idx in range(MAX_ROWS):
        lo = row_idx * IMAGES_PER_ROW
        hi = min(lo + IMAGES_PER_ROW, len(image_outs))
        occupied = any(image_outs[i] is not None for i in range(lo, hi))
        row_updates.append(gr.Row(visible=occupied))
    return (status, zip_path, image_paths, *img_updates, *chk_updates, *row_updates)
def on_submit(image_paths, *checkbox_states):
    """Gradio handler: build the ZIP and reveal the download widget on success."""
    status, zip_path = process_zip_submission(image_paths, *checkbox_states)
    if zip_path:
        return status, gr.File(value=zip_path, visible=True)
    return status, gr.File(visible=False)
# Wire up events. on_download fans out to every gallery slot — outputs are
# ordered: status, zip, state, then all images, all checkboxes, all rows,
# matching the tuple order returned by on_download.
download_button.click(
    fn=on_download,
    inputs=[api_input, category_input, num_images_input],
    outputs=[status_output, zip_output, image_paths_state] + image_outputs + checkbox_outputs + gallery_rows
)
# on_submit reads the stored paths plus every checkbox's current state.
submit_button.click(
    fn=on_submit,
    inputs=[image_paths_state] + checkbox_outputs,
    outputs=[status_output, zip_output]
)
demo.launch()