# Spaces: Sleeping  (hosting-page status text captured with the file; not part of the program)
import json
import logging
import os
import shutil
import tempfile
from collections import Counter
from io import BytesIO
from pathlib import Path

import gradio as gr
from gradio_bbox_annotator import BBoxAnnotator
from PIL import Image
# Logging: INFO level on the root logger, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Per-image cap on how many boxes may carry each label.
CATEGORY_LIMITS = dict(
    advertisement=1,  # at most one advertisement box per image
    text=2,  # at most two text boxes per image
)
# Label names, in the same order as the limits above.
CATEGORIES = [*CATEGORY_LIMITS]
# Upper bound [width, height] for resized images.
MAX_SIZE = [1024, 1024]
class ImageProcessor:
    """Write size-limited copies of images into a folder under the temp dir."""

    def __init__(self):
        # Layout: <system temp dir>/annotation_tool/resized_images
        self.base_dir = os.path.join(tempfile.gettempdir(), "annotation_tool")
        self.resized_dir = os.path.join(self.base_dir, "resized_images")
        self._setup_directories()
        logger.info(f"Initialized ImageProcessor with directory: {self.base_dir}")

    def _setup_directories(self):
        """Make sure the output folder exists (no error if it already does)."""
        os.makedirs(self.resized_dir, exist_ok=True)
        logger.info(f"Set up directories: {self.resized_dir}")

    def resize_image(self, image_path):
        """Save a thumbnail of ``image_path`` and return the new file's path.

        The copy is bounded by ``MAX_SIZE`` via ``Image.thumbnail`` and is
        written to ``self.resized_dir`` as ``resized_<original basename>``.

        Raises:
            Exception: whatever reading, decoding or saving raised; it is
                logged here and then re-raised unchanged.
        """
        try:
            logger.info(f"Processing image: {image_path}")
            # Load the whole file into memory so the image is decoded from a
            # buffer rather than from the open file handle.
            with open(image_path, "rb") as source:
                raw_bytes = source.read()
            picture = Image.open(BytesIO(raw_bytes))
            picture.thumbnail(MAX_SIZE, Image.Resampling.LANCZOS)

            # Output name keeps the original basename behind a fixed prefix.
            target_name = f"resized_{os.path.basename(image_path)}"
            target_path = os.path.join(self.resized_dir, target_name)
            picture.save(target_path)
            logger.info(f"Saved resized image to: {target_path}")
            return target_path
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            raise
class AnnotationManager:
    """Validate annotator payloads and collect them per image filename."""

    # Prefix that ImageProcessor.resize_image puts in front of resized copies;
    # it is stripped again so annotations are keyed by the original filename.
    _RESIZED_PREFIX = "resized_"

    def __init__(self):
        # filename -> list of {"annotation": [four coords], "label": str}
        self.annotations = {}
        self.image_processor = ImageProcessor()

    def validate_annotations(self, bbox_data):
        """Validate the annotation data and return (is_valid, error_message).

        Args:
            bbox_data: Expected to be a 2-tuple ``(image_path, annotations)``
                where ``image_path`` is a ``str`` and ``annotations`` is a
                sequence of 5-item entries: four numeric coordinates followed
                by a label from ``CATEGORIES``.

        Returns:
            ``(True, "")`` when the payload is acceptable, otherwise
            ``(False, message)`` describing the first problem found.
        """
        # The length check keeps a malformed tuple from blowing up the
        # two-name unpacking below.
        if (
            not bbox_data
            or not isinstance(bbox_data, tuple)
            or len(bbox_data) != 2
        ):
            return False, "No image or annotations provided"

        image_path, annotations = bbox_data
        if not isinstance(image_path, str):
            return False, "Invalid image format"
        if not annotations:
            return False, "No annotations drawn"

        category_counts = Counter()
        for ann in annotations:
            # Non-sequence entries are rejected here instead of raising on len().
            if not isinstance(ann, (list, tuple)) or len(ann) != 5:
                return False, "Invalid annotation format"
            *coords, label = ann
            if any(not isinstance(coord, (int, float)) for coord in coords):
                return False, "Invalid coordinate values"
            if not label or label not in CATEGORIES:
                return False, f"Invalid or missing label. Must be one of: {', '.join(CATEGORIES)}"
            category_counts[label] += 1

        # Enforce the per-image cap for every known category.
        for category, limit in CATEGORY_LIMITS.items():
            if category_counts[category] > limit:
                return False, f"Too many {category} annotations. Maximum allowed: {limit}"
        return True, ""

    def process_upload(self, image_path):
        """Resize an uploaded image.

        Returns:
            The path of the resized copy, or ``None`` when ``image_path`` has
            an unsupported type or resizing failed (the failure is logged).
        """
        if not isinstance(image_path, (str, bytes, os.PathLike)):
            logger.warning(f"Invalid image path type: {type(image_path)}")
            return None
        try:
            logger.info(f"Processing upload: {image_path}")
            resized_path = self.image_processor.resize_image(image_path)
        except Exception as e:
            # Best-effort by design: callers get None, the log keeps the traceback.
            logger.exception("Error in process_upload: %s", e)
            return None
        logger.info(f"Successfully processed upload: {resized_path}")
        return resized_path

    def add_annotation(self, bbox_data):
        """Add or replace the annotations stored for one image.

        Coordinates are stored in exactly the order they arrive.
        NOTE(review): earlier code named them y1, y2, x1, x2 — confirm the
        actual order against the BBoxAnnotator payload before relying on it.

        Returns:
            ``(json_text, status_message)``: the full collection as JSON and a
            success or error line for the status area.
        """
        is_valid, error_msg = self.validate_annotations(bbox_data)
        if not is_valid:
            return self.get_json_annotations(), f"❌ Error: {error_msg}"

        image_path, annotations = bbox_data
        filename = os.path.basename(image_path)
        if filename.startswith(self._RESIZED_PREFIX):
            filename = filename[len(self._RESIZED_PREFIX):]

        self.annotations[filename] = [
            {"annotation": coords, "label": label}
            for *coords, label in annotations
        ]

        # Summarise how many boxes of each category were saved.
        counts = Counter(label for *_, label in annotations)
        counts_str = ", ".join(f"{counts[cat]} {cat}" for cat in CATEGORIES)
        success_msg = f"✅ Successfully saved for {filename}: {counts_str}"
        return self.get_json_annotations(), success_msg

    def get_json_annotations(self):
        """Return all stored annotations as an indented JSON string."""
        return json.dumps(self.annotations, indent=2)

    def clear_annotations(self):
        """Drop every stored annotation; returns (empty JSON text, status)."""
        self.annotations = {}
        return "", "🗑️ All annotations cleared"
def _instructions_markdown():
    """Build the header Markdown; the limits list is derived from CATEGORY_LIMITS."""
    limit_lines = [
        f"- {category}: Maximum {limit} annotation{'' if limit == 1 else 's'}"
        for category, limit in CATEGORY_LIMITS.items()
    ]
    return "\n".join([
        "# Advertisement and Text Annotation Tool",
        "",
        "**Instructions:**",
        "",
        f"1. Upload an image (will be automatically resized to max {MAX_SIZE[0]}x{MAX_SIZE[1]})",
        "2. Draw bounding boxes and select the appropriate label",
        "3. Click 'Save Current Image Annotations' to add to the collection",
        "4. Repeat for all images",
        "5. Copy the combined JSON when finished",
        "",
        "**Annotation Limits per Image:**",
        "",
        *limit_lines,
    ])


def create_interface():
    """Assemble the Gradio Blocks UI for the annotation tool.

    Returns:
        The ``gr.Blocks`` app; the caller is responsible for launching it.
    """
    # NOTE(review): one manager is created per interface and captured by the
    # handlers below, so presumably every session of the app shares the same
    # annotation collection — confirm whether per-user state is wanted.
    annotation_mgr = AnnotationManager()

    def handle_image_upload(bbox_data):
        """Resize a newly uploaded image.

        Returns:
            ``(annotator_value, status_text)``. On success the annotator value
            is ``(resized_path, annotations)`` and the status is cleared; on
            any failure the annotator value is ``None`` and the status text
            explains why.
        """
        try:
            if (
                not bbox_data
                or not isinstance(bbox_data, tuple)
                or len(bbox_data) != 2
            ):
                return None, "No image uploaded"
            image_path, annotations = bbox_data
            if not image_path:
                return None, "No image path provided"

            logger.info(f"Handling upload for: {image_path}")
            resized_path = annotation_mgr.process_upload(image_path)
            if resized_path and os.path.exists(resized_path):
                logger.info(f"Processed image path: {resized_path}")
                # Swap in the resized file, keep any boxes already drawn.
                return (resized_path, annotations), ""

            error_msg = "Failed to process image"
            logger.error(error_msg)
            return None, error_msg
        except Exception as e:
            # UI boundary: report the problem in the status area, don't crash.
            error_msg = f"Error in upload handler: {str(e)}"
            logger.error(error_msg)
            return None, error_msg

    with gr.Blocks() as demo:
        gr.Markdown(_instructions_markdown())

        with gr.Row():
            with gr.Column(scale=2):
                bbox_input = BBoxAnnotator(
                    show_label=True,
                    label="Draw Bounding Boxes",
                    show_download_button=True,
                    interactive=True,
                    categories=CATEGORIES,
                )
            with gr.Column(scale=1):
                json_output = gr.TextArea(
                    label="Combined Annotations JSON",
                    interactive=True,
                    lines=15,
                    show_copy_button=True,
                )

        with gr.Row():
            save_btn = gr.Button("Save Current Image Annotations", variant="primary")
            clear_btn = gr.Button("Clear All Annotations", variant="secondary")

        # Status line shared by upload, save and clear.
        status_msg = gr.Markdown(label="Status")

        # The handler returns (annotator value, status text), so both
        # components must be listed as outputs; with only the annotator wired,
        # the error messages never reached the status area.
        bbox_input.upload(
            fn=handle_image_upload,
            inputs=[bbox_input],
            outputs=[bbox_input, status_msg],
        )
        save_btn.click(
            fn=annotation_mgr.add_annotation,
            inputs=[bbox_input],
            outputs=[json_output, status_msg],
        )
        clear_btn.click(
            fn=annotation_mgr.clear_annotations,
            inputs=[],
            outputs=[json_output, status_msg],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the Blocks app, then start serving it.
    demo = create_interface()
    demo.launch()