# Stock Photo Downloader — Gradio app that fetches stock photos from
# Pexels, Unsplash, and Pixabay and lets the user zip a selected subset.
import os | |
import requests | |
import zipfile | |
import gradio as gr | |
import shutil | |
from PIL import Image | |
import random | |
import time | |
import logging | |
# Configure logging
# Module-wide logger at INFO level so fetch/download progress is visible
# in the console while the app runs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
# Local working directories: downloaded images land in IMAGES_DIR and the
# user-selected subset is packed into ZIP_FILE.
OUTPUT_DIR = "downloaded_images"
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
ZIP_FILE = os.path.join(OUTPUT_DIR, "images.zip")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Constants
ITEMS_PER_PAGE = 40        # results requested per API page
DAILY_IMAGE_LIMIT = 2000   # assumed provider quota -- TODO confirm per API
MAX_PAGES = min(DAILY_IMAGE_LIMIT // ITEMS_PER_PAGE, 10)  # pages sampled per fetch
IMAGES_PER_ROW = 4         # gallery grid width
MAX_ROWS = 6               # gallery grid height
TOTAL_IMAGES = IMAGES_PER_ROW * MAX_ROWS  # max images shown/downloaded (24)
# API Configurations.
# SECURITY: API keys were hard-coded in source. They can now be overridden
# via environment variables (PEXELS_API_KEY, UNSPLASH_CLIENT_ID,
# PIXABAY_API_KEY); the original values remain as defaults for backward
# compatibility, but should be rotated and removed from the repository.
# Per-API fields:
#   image_key  - key path walked into each result item to reach the URL
#   result_key - JSON key holding the list of results
#   delay      - seconds slept before each request (rate limiting)
API_CONFIGS = {
    "pexels": {
        "base_url": "https://api.pexels.com/v1/search",
        "headers": {"Authorization": os.environ.get(
            "PEXELS_API_KEY",
            "klHADHclpse2e2xSP9h747AgfE1Rx0wioemGhXYtedjZzvJ1WBUKwz7g")},
        "image_key": ["src", "medium"],
        "result_key": "photos",
        "delay": 2
    },
    "unsplash": {
        "base_url": "https://api.unsplash.com/search/photos",
        "headers": {},
        "image_key": ["urls", "small"],
        "result_key": "results",
        "delay": 2,
        "client_id": os.environ.get(
            "UNSPLASH_CLIENT_ID",
            "UKkhpD_Rs5-s1gIlVX28iNs_8E4ysPhQniyIpDpKUnU")
    },
    "pixabay": {
        "base_url": "https://pixabay.com/api/",
        "headers": {},
        "image_key": ["webformatURL"],
        "result_key": "hits",
        "delay": 1,
        "api_key": os.environ.get(
            "PIXABAY_API_KEY",
            "45122300-cd3621e1539e8e95430ee3efc")
    }
}
def fetch_image_urls(api_name, category, num_images):
    """Fetch up to ``num_images`` image URLs for ``category`` from one API.

    Pages are sampled at random (without replacement) from the first
    MAX_PAGES result pages; each request is preceded by the API's
    configured courtesy delay.

    Args:
        api_name: A key of API_CONFIGS ("pexels", "unsplash", "pixabay").
        category: Search keyword; lower-cased before the request.
        num_images: Maximum number of URLs to return.

    Returns:
        List of image URL strings (possibly fewer than requested, or
        empty when nothing could be fetched).
    """
    config = API_CONFIGS[api_name]
    num_pages_needed = (num_images + ITEMS_PER_PAGE - 1) // ITEMS_PER_PAGE
    # Select random pages to fetch from (no tracking between calls)
    all_pages = list(range(1, MAX_PAGES + 1))
    if len(all_pages) < num_pages_needed:
        logger.warning(f"Insufficient pages available: {len(all_pages)} < {num_pages_needed}")
        return []
    selected_pages = random.sample(all_pages, num_pages_needed)
    logger.info(f"Selected pages for {api_name}: {selected_pages}")
    image_urls = []
    for page in selected_pages:
        # Build the request: pixabay takes everything in the query string,
        # unsplash needs a client_id param, pexels uses an auth header.
        if api_name == "pixabay":
            url = f"{config['base_url']}?key={config['api_key']}&q={category.lower()}&per_page={ITEMS_PER_PAGE}&page={page}"
            params = {}
        elif api_name == "unsplash":
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page,
                "client_id": config["client_id"]
            }
        else:  # pexels
            url = config["base_url"]
            params = {
                "query": category.lower(),
                "per_page": ITEMS_PER_PAGE,
                "page": page
            }
        try:
            logger.info(f"Requesting {url} with params {params} for {api_name}")
            time.sleep(config.get("delay", 0))  # stay under rate limits
            response = requests.get(url, headers=config["headers"], params=params, timeout=10)
            response.raise_for_status()
            data_response = response.json()
            results = data_response.get(config["result_key"], [])
            if not results:
                logger.warning(f"No {config['result_key']} in response for {api_name}, page {page}: {data_response}")
                continue
            page_urls = []
            for item in results:
                # Stop once enough URLs are collected overall, counting the
                # ones gathered on this page (original ignored page_urls).
                if len(image_urls) + len(page_urls) >= num_images:
                    break
                # Walk the configured key path down into the result item.
                image_url = item
                for key in config["image_key"]:
                    image_url = image_url.get(key) if isinstance(image_url, dict) else None
                    if image_url is None:
                        break
                if image_url:
                    page_urls.append(image_url)
                else:
                    logger.warning(f"No image URL found in item for {api_name}: {item}")
            if page_urls:
                image_urls.extend(page_urls)
                logger.info(f"Successfully fetched {len(page_urls)} images from page {page} for {api_name}")
            else:
                logger.warning(f"No valid URLs extracted from page {page} for {api_name}")
        except requests.exceptions.RequestException as e:
            # BUG FIX: on connection errors/timeouts the local `response`
            # was never assigned, so the original handler raised
            # UnboundLocalError and masked the real failure. Use the
            # response attached to the exception instead (may be None).
            logger.error(f"Error fetching page {page} from {api_name}: {e}")
            err_response = getattr(e, "response", None)
            if err_response is not None:
                logger.error(f"Response: {err_response.text}")
                if err_response.status_code == 401:
                    logger.error(f"401 Unauthorized for {api_name}. Replace API key.")
                elif err_response.status_code == 429:
                    logger.error(f"429 Rate Limit Exceeded for {api_name}. Increase delay or wait.")
            break  # abort remaining pages after any request failure
    logger.info(f"Total URLs fetched for {api_name}: {len(image_urls)}")
    return image_urls[:num_images]
def download_images(image_urls):
    """Download each URL into IMAGES_DIR and validate it as an image.

    IMAGES_DIR is recreated from scratch on every call so stale files
    never leak into the gallery. URLs that fail to download or fail PIL
    verification are skipped, and invalid files are removed from disk.

    Args:
        image_urls: List of image URL strings.

    Returns:
        Tuple ``(downloaded_count, image_paths)`` where ``image_paths``
        holds the local paths of successfully downloaded, valid images.
    """
    if not image_urls:
        logger.warning("No image URLs provided to download")
        return 0, []
    if os.path.exists(IMAGES_DIR):
        shutil.rmtree(IMAGES_DIR)
    os.makedirs(IMAGES_DIR, exist_ok=True)
    downloaded_count = 0
    image_paths = []
    for idx, url in enumerate(image_urls, 1):
        image_path = os.path.join(IMAGES_DIR, f"img{idx}.jpg")
        try:
            # FIX: use the response as a context manager so the streamed
            # connection is always released (original leaked connections).
            with requests.get(url, stream=True, timeout=10) as response:
                response.raise_for_status()
                with open(image_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            # Verify the payload really is a decodable image before
            # counting it; verify() only inspects, it does not decode fully.
            with Image.open(image_path) as img:
                img.verify()
            downloaded_count += 1
            image_paths.append(image_path)
            logger.info(f"Downloaded {idx}/{len(image_urls)}: {url}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error downloading {url}: {e}")
            # Remove any partially-written file so it cannot be zipped later.
            if os.path.exists(image_path):
                os.remove(image_path)
        except Exception as e:
            logger.error(f"Invalid image or error saving {url}: {e}")
            # FIX: the original left corrupt files on disk; delete them.
            if os.path.exists(image_path):
                os.remove(image_path)
    logger.info(f"Total images downloaded: {downloaded_count}/{len(image_urls)}")
    return downloaded_count, image_paths
def create_zip_file(selected_image_paths):
    """Bundle the given image files into ZIP_FILE and return its path."""
    # Drop any archive left over from a previous run.
    if os.path.exists(ZIP_FILE):
        os.remove(ZIP_FILE)
    with zipfile.ZipFile(ZIP_FILE, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in selected_image_paths:
            # Store entries relative to OUTPUT_DIR so the archive keeps
            # the images/ folder structure without absolute paths.
            archive.write(path, os.path.relpath(path, OUTPUT_DIR))
    return ZIP_FILE
def process_and_display(api_name, category, num_images):
    """Fetch, download, and prepare images for the fixed gallery grid.

    Returns a 5-tuple:
        (status message, zip path placeholder (always None),
         downloaded image paths,
         per-slot image values padded to TOTAL_IMAGES,
         per-slot checkbox values padded to TOTAL_IMAGES).
    """
    # Clamp the request to the grid capacity.
    requested = min(int(num_images), TOTAL_IMAGES)
    logger.info(f"Starting process for {api_name} with category '{category}' and {requested} images")

    image_urls = fetch_image_urls(api_name, category, requested)
    empty_grid = ([None] * TOTAL_IMAGES, [False] * TOTAL_IMAGES)
    if not image_urls:
        logger.warning(f"No images fetched from {api_name}")
        return ("No images available or API limit reached. Check logs for details.",
                None, [], *empty_grid)

    logger.info(f"Proceeding to download {len(image_urls)} images from {api_name}")
    downloaded_count, image_paths = download_images(image_urls)
    if downloaded_count == 0:
        logger.warning(f"No images downloaded from {api_name}")
        return ("No images were successfully downloaded. Check logs for details.",
                None, [], *empty_grid)

    status = f"Successfully downloaded {downloaded_count}/{requested} images from {api_name}. Select images to include in ZIP below."
    # Pad the per-slot lists out to the fixed grid size.
    image_outputs = (image_paths + [None] * TOTAL_IMAGES)[:TOTAL_IMAGES]
    checkbox_outputs = [i < len(image_paths) for i in range(TOTAL_IMAGES)]
    logger.info(f"Process completed for {api_name}: {downloaded_count} images prepared for display")
    return status, None, image_paths, image_outputs, checkbox_outputs
def process_zip_submission(image_paths, *checkbox_states):
    """Create a ZIP of the checked images.

    Args:
        image_paths: Local paths from the last fetch (None/empty when
            nothing was fetched yet).
        *checkbox_states: One boolean per gallery slot, in slot order.
            There are TOTAL_IMAGES slots but usually fewer paths.

    Returns:
        Tuple ``(status message, zip path or None)``.
    """
    if not image_paths:
        return "No images available to process.", None
    # FIX: guard i < len(image_paths) -- a True state for a slot beyond
    # the downloaded list (more checkboxes than images) raised IndexError.
    selected_image_paths = [
        image_paths[i]
        for i, state in enumerate(checkbox_states)
        if state and i < len(image_paths)
    ]
    if not selected_image_paths:
        return "No images selected for ZIP.", None
    zip_path = create_zip_file(selected_image_paths)
    logger.info(f"ZIP file created with {len(selected_image_paths)} images at {zip_path}")
    return f"ZIP file created with {len(selected_image_paths)} images at {zip_path}", zip_path
# Gradio Interface
# Custom CSS injected into gr.Blocks: styles the fetch/zip buttons, the
# status box, the parameter group, and the image tiles with their
# checkbox overlay in the bottom-right corner.
css = """
.fetch-button { background-color: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.fetch-button:hover { background-color: #45a049; }
.zip-button { background-color: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
.zip-button:hover { background-color: #1e88e5; }
.status-box { border: 1px solid #ddd; background-color: #f9f9f9; padding: 10px; border-radius: 5px; }
.input-group { border: 1px solid #ddd; padding: 15px; border-radius: 5px; background-color: #f0f0f0; }
.image-container { position: relative; width: 100%; height: 150px; overflow: hidden; border-radius: 5px; }
.image-container img { width: 100%; height: 100%; object-fit: cover; }
.overlay { position: absolute; bottom: 5px; right: 5px; background-color: rgba(0, 0, 0, 0.6); padding: 5px; border-radius: 5px; display: flex; align-items: center; gap: 5px; color: white; font-size: 12px; }
.overlay label { margin: 0; color: white; }
.overlay input[type="checkbox"] { margin: 0; }
"""
# Build the Gradio UI. The output ordering of the click handlers below is
# position-sensitive and must match the component lists passed to .click().
with gr.Blocks(title="Stock Photo Downloader", css=css) as demo:
    # --- Page header ---
    gr.Markdown("<h1 style='text-align: center; color: #333;'>📸 Stock Photo Downloader</h1>")
    gr.Markdown("<p style='text-align: center; color: #666;'>Fetch high-quality stock photos from Pexels, Unsplash, and Pixabay.</p>")
    # --- Search parameters ---
    with gr.Group(elem_classes=["input-group"]):
        gr.Markdown("### 🔍 Choose Your Parameters")
        with gr.Row():
            api_input = gr.Dropdown(label="API Source", choices=["pexels", "unsplash", "pixabay"], value="pexels", info="Select the stock photo provider.")
            category_input = gr.Dropdown(label="Category", choices=["nature", "business", "people", "technology", "food", "travel", "animals", "fashion"], value="nature", allow_custom_value=True, info="Choose a category or enter a custom keyword.")
            num_images_input = gr.Dropdown(label="Number of Images (Max 24)", choices=["4", "8", "12", "16", "20", "24"], value="4", info="How many images to fetch (up to 24).")
        download_button = gr.Button("Fetch and Display Images", elem_classes=["fetch-button"])
    # Combine Status and Download sections in a single row with two columns
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📊 Status")
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                placeholder="Status updates will appear here...",
                elem_classes=["status-box"],
                show_label=False
            )
        with gr.Column():
            gr.Markdown("### 💾 Download Your Images")
            # Hidden until on_submit actually produces a ZIP.
            zip_output = gr.File(label="Download ZIP", visible=False)
    # --- Gallery ---
    gr.Markdown("### 🖼️ Image Gallery")
    gr.Markdown("<p style='color: #666;'>Select images to include in your ZIP file.</p>")
    # Carries the list of downloaded file paths between the two handlers.
    image_paths_state = gr.State()
    # Fixed grid layout with dynamically visible rows
    image_outputs = []      # one gr.Image per grid slot
    checkbox_outputs = []   # one gr.Checkbox per grid slot (same index)
    gallery_rows = []       # row containers, toggled visible per fetch
    for row in range(MAX_ROWS):
        with gr.Row(visible=False) as row_component:
            gallery_rows.append(row_component)
            for col in range(IMAGES_PER_ROW):
                idx = row * IMAGES_PER_ROW + col  # flat slot index
                with gr.Column(min_width=150):
                    with gr.Group(elem_classes=["image-container"]):
                        image_output = gr.Image(
                            label=f"Image {idx+1}",
                            show_label=False,
                            visible=False,
                            height=150,
                            width=150
                        )
                        with gr.Row(elem_classes=["overlay"]):
                            checkbox_output = gr.Checkbox(
                                label=f"Image {idx+1}",
                                value=False,
                                visible=False,
                                scale=0
                            )
                        image_outputs.append(image_output)
                        checkbox_outputs.append(checkbox_output)
    gr.Markdown("### 📦 Create ZIP File")
    submit_button = gr.Button("Create ZIP of Selected Images", elem_classes=["zip-button"])
    def on_download(api_name, category, num_images):
        """Click handler: fetch + download images, then update every grid slot."""
        status, zip_path, image_paths, image_outs, checkbox_outs = process_and_display(api_name, category, num_images)
        num_downloaded = len(image_paths)
        # Update visibility for images and checkboxes
        image_updates = [
            gr.Image(value=img, visible=img is not None, label=f"Image {i+1}", height=150, width=150)
            for i, img in enumerate(image_outs)
        ]
        checkbox_updates = [
            gr.Checkbox(value=chk, visible=i < num_downloaded, label=f"Image {i+1}", scale=0)
            for i, chk in enumerate(checkbox_outs)
        ]
        # Update row visibility: show a row only if it contains at least one visible image
        row_updates = []
        for row_idx in range(MAX_ROWS):
            start_idx = row_idx * IMAGES_PER_ROW
            end_idx = start_idx + IMAGES_PER_ROW
            row_has_images = any(image_outs[i] is not None for i in range(start_idx, min(end_idx, len(image_outs))))
            row_updates.append(gr.Row(visible=row_has_images))
        # Tuple order must match download_button.click(outputs=...) below:
        # status, zip file, state, then images, checkboxes, rows.
        return (
            status, zip_path, image_paths,
            *image_updates,
            *checkbox_updates,
            *row_updates
        )
    def on_submit(image_paths, *checkbox_states):
        """Click handler: zip the checked images and reveal the download file."""
        status, zip_path = process_zip_submission(image_paths, *checkbox_states)
        return status, gr.File(value=zip_path, visible=True) if zip_path else gr.File(visible=False)
    download_button.click(
        fn=on_download,
        inputs=[api_input, category_input, num_images_input],
        outputs=[status_output, zip_output, image_paths_state] + image_outputs + checkbox_outputs + gallery_rows
    )
    submit_button.click(
        fn=on_submit,
        inputs=[image_paths_state] + checkbox_outputs,
        outputs=[status_output, zip_output]
    )
demo.launch()