# SoundingStreet/SoundMapper.py
from DepthEstimator import DepthEstimator
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import os
from GenerateCaptions import generate_caption
import re
from config import LOGS_DIR
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
import torch
import spacy
import gc
class SoundMapper:
def __init__(self):
self.depth_estimator = DepthEstimator()
        # Depth maps: list of dicts with keys "predicted_depth" (torch.Tensor)
        # and "depth" (PIL.Image). Loaded lazily via _load_depth_maps().
self.device = "cuda"
# self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
self.map_list = None
self.image_dir = self.depth_estimator.image_dir
# self.nlp = spacy.load("en_core_web_sm")
self.nlp = None
self.dino = None
self.dino_processor = None
# self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny").to(self.device)
# self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny")
def _load_nlp(self):
if self.nlp is None:
self.nlp = spacy.load("en_core_web_sm")
return self.nlp
def _load_depth_maps(self):
if self.map_list is None:
self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
return self.map_list
def process_depth_maps(self) -> list:
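        """
        Convert each depth map's PIL image into a float array scaled to [0, 1]
        (division by 255.0 assumes the estimator returns 8-bit depth images).
        Returns a list of dicts with the original image under "original" and
        the scaled array under "normalization".
        """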
depth_maps = self._load_depth_maps()
processed_maps = []
for item in depth_maps:
depth_map = item["depth"]
depth_array = np.array(depth_map)
normalization = depth_array / 255.0
processed_maps.append({
"original": depth_map,
"normalization": normalization
})
return processed_maps
# def create_depth_zone(self, processed_maps : list, num_zones = 3):
# zones_data = []
# for depth_data in processed_maps:
# normalized = depth_data["normalization"]
# thresholds = np.linspace(0, 1, num_zones+1)
# zones = []
# for i in range(num_zones):
# zone_mask = (normalized >= thresholds[i]) & (normalized < thresholds[i+1])
# zone_percentage = zone_mask.sum() / zone_mask.size
# zones.append({
# "range": (thresholds[i], thresholds[i+1]),
# "percentage": zone_percentage,
# "mask": zone_mask
# })
# zones_data.append(zones)
# return zones_data
def detect_sound_sources(self, caption_text: str) -> dict:
"""
Extract nouns and their sound descriptions from caption text.
Returns a dictionary mapping nouns to their descriptions.
"""
sound_sources = {}
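        # Expected caption format (inferred from the parsing below): one sound
        # source per line, e.g. "1. Car: engine idling and passing traffic".
        # List numbering and markdown characters are stripped, anything after a
        # dash separator is dropped, and the noun maps to the remaining
        # description, e.g. {"car": "engine idling and passing traffic"}.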
nlp = self._load_nlp()
print(f"\n[DEBUG] Beginning sound source detection")
print(f"Raw caption text length: {len(caption_text)}")
print(f"First 100 chars: {caption_text[:100]}...")
# Split the caption by newlines to separate entries
lines = caption_text.strip().split('\n')
print(f"Found {len(lines)} lines after splitting")
for i, line in enumerate(lines):
# Skip empty lines
if not line.strip():
continue
print(f"Processing line {i}: {line[:50]}{'...' if len(line) > 50 else ''}")
# Check if line matches the expected format (Noun: description)
if ':' in line:
parts = line.split(':', 1) # Split only on the first colon
# Clean up the noun part - remove numbers and leading/trailing whitespace
noun_part = parts[0].strip().lower()
# Remove list numbering (e.g., "1. ", "2. ", etc.)
noun_part = re.sub(r'^\d+\.\s*', '', noun_part)
description = parts[1].strip()
# Clean any markdown formatting
noun = re.sub(r'[*()]', '', noun_part).strip()
description = re.sub(r'[*()]', '', description).strip()
# Separate the description at em dash if present
if ' — ' in description:
description = description.split(' — ', 1)[0].strip()
elif ' - ' in description:
description = description.split(' - ', 1)[0].strip()
print(f" - Found potential noun: '{noun}' with description: '{description[:30]}...'")
# Skip if noun contains invalid characters or is too short
if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
sound_sources[noun] = description
print(f" √ Added to sound sources")
else:
print(f" × Skipped (invalid format)")
# If no structured format found, try to extract nouns from the text
if not sound_sources:
print("No structured format found, falling back to noun extraction")
all_nouns = []
doc = nlp(caption_text)
for token in doc:
if token.pos_ == "NOUN" and len(token.text) > 1:
if token.text[0].isalpha():
all_nouns.append(token.text.lower())
print(f" - Extracted noun: '{token.text.lower()}'")
for noun in all_nouns:
sound_sources[noun] = "" # Empty description
print(f"[DEBUG] Final detected sound sources: {list(sound_sources.keys())}")
return sound_sources
def map_bbox_to_depth_zone(self, bbox, depth_map, num_zones=3):
x1, y1, x2, y2 = [int(coord) for coord in bbox]
height, width = depth_map.shape
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(width, x2), min(height, y2)
depth_roi = depth_map[y1:y2, x1:x2]
if depth_roi.size == 0:
return num_zones - 1
mean_depth = np.mean(depth_roi)
thresholds = self.create_histogram_depth_zones(depth_map, num_zones)
for i in range(num_zones):
if thresholds[i] <= mean_depth < thresholds[i+1]:
return i
return num_zones - 1
def detect_objects(self, nouns : list, image: Image):
filtered_nouns = []
for noun in nouns:
if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
filtered_nouns.append(noun)
print(f"Detecting objects for nouns: {filtered_nouns}")
if self.dino is None:
self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-base").to(self.device)
self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
else:
self.dino = self.dino.to(self.device)
text_prompt = " . ".join(filtered_nouns)
inputs = self.dino_processor(images=image, text=text_prompt, return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.dino(**inputs)
results = self.dino_processor.post_process_grounded_object_detection(
outputs,
inputs.input_ids,
box_threshold=0.25,
text_threshold=0.25,
target_sizes=[image.size[::-1]]
)
result = results[0]
labels = result["labels"]
bboxes = result["boxes"]
clean_labels = []
for label in labels:
clean_label = re.sub(r'##\w+', '', label)
clean_label = self._split_combined_words(clean_label, filtered_nouns)
clean_labels.append(clean_label)
self.dino = self.dino.to("cpu")
torch.cuda.empty_cache()
del inputs, outputs, results
print(f"Detected objects: {clean_labels}")
return (clean_labels, bboxes)
def _split_combined_words(self, text, nouns=None):
nlp = self._load_nlp()
if nouns is None:
known_words = set()
doc = nlp(text)
for token in doc:
if token.pos_ == "NOUN" and len(token.text) > 1:
known_words.add(token.text.lower())
else:
known_words = set(nouns)
result = []
for word in text.split():
if word in known_words:
result.append(word)
continue
found = False
for known in known_words:
if known in word and len(known) > 2:
result.append(known)
found = True
if not found:
result.append(word)
return " ".join(result)
def process_dino_labels(self, labels):
processed_labels = []
nlp = self._load_nlp()
for label in labels:
if label.startswith('##'):
continue
label = re.sub(r'[*()]', '', label).strip()
parts = label.split()
for part in parts:
if part.startswith('##'):
continue
doc = nlp(part)
for token in doc:
if token.pos_ == "NOUN" and len(token.text) > 1:
processed_labels.append(token.text.lower())
unique_labels = []
for label in processed_labels:
if label not in unique_labels:
unique_labels.append(label)
return unique_labels
def create_histogram_depth_zones(self, depth_map, num_zones = 3):
# using 50 bins because it is faster
hist, bin_edge = np.histogram(depth_map.flatten(), bins=50, range=(0, 1))
cumulative = np.cumsum(hist) / np.sum(hist)
thresholds = [0.0]
for i in range(1, num_zones):
target = i / num_zones
idx = np.argmin(np.abs(cumulative - target))
thresholds.append(bin_edge[idx + 1])
thresholds.append(1.0)
return thresholds
def analyze_object_depths(self, image_path, depth_map, lat, lon, caption_data=None, all_objects=False):
image = Image.open(image_path)
if caption_data is None:
caption = generate_caption(lat, lon)
if not caption:
print(f"Failed to generate caption for {image_path}")
return []
caption_text = caption.get("sound_description", "")
else:
caption_text = caption_data.get("sound_description", "")
# Debug: Print the raw caption text
print(f"\n[DEBUG] Raw caption text for {os.path.basename(image_path)}:")
print(caption_text)
print("-" * 50)
if not caption_text:
print(f"No caption text available for {image_path}")
return []
# Extract nouns and their sound descriptions
sound_sources = self.detect_sound_sources(caption_text)
# Debug: Print the extracted sound sources
print(f"[DEBUG] Extracted sound sources:")
for noun, desc in sound_sources.items():
print(f" - {noun}: {desc}")
print("-" * 50)
if not sound_sources:
print(f"No sound sources detected in caption for {image_path}")
return []
# Get list of nouns only for object detection
nouns = list(sound_sources.keys())
# Debug: Print the list of nouns being used for detection
print(f"[DEBUG] Nouns for object detection: {nouns}")
print("-" * 50)
labels, bboxes = self.detect_objects(nouns, image)
if len(labels) == 0 or len(bboxes) == 0:
print(f"No objects detected in {image_path}")
return []
object_data = []
known_objects = set(nouns) if nouns else set()
        for label, bbox in zip(labels, bboxes):
if '##' in label:
continue
x1, y1, x2, y2 = [int(coord) for coord in bbox]
height, width = depth_map.shape
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(width, x2), min(height, y2)
depth_roi = depth_map[y1:y2, x1:x2]
if depth_roi.size == 0:
continue
mean_depth = np.mean(depth_roi)
matched_noun = None
matched_desc = None
for word in label.split():
word = word.lower()
if word in sound_sources:
matched_noun = word
matched_desc = sound_sources[word]
break
if matched_noun is None:
for noun in sound_sources:
if noun in label.lower():
matched_noun = noun
matched_desc = sound_sources[noun]
break
if matched_noun is None:
for word in label.split():
if len(word) > 1 and word[0].isalpha() and '##' not in word:
matched_noun = word.lower()
matched_desc = "" # No description available
break
if matched_noun:
thresholds = self.create_histogram_depth_zones(depth_map, num_zones=3)
zone = 0 # The default is 0 which is the closest zone
for i in range(3):
if thresholds[i] <= mean_depth < thresholds[i+1]:
zone = i
break
object_data.append({
"original_label": matched_noun,
"bbox": bbox.tolist(),
"depth_zone": zone,
"zone_description": ["near", "medium", "far"][zone],
"mean_depth": mean_depth,
"weight": 1.0 - mean_depth,
"sound_description": matched_desc
})
if all_objects:
object_data.sort(key=lambda x: x["mean_depth"])
return object_data
else:
if not object_data:
return []
closest_object = min(object_data, key=lambda x: x["mean_depth"])
return [closest_object]
def cleanup(self):
if hasattr(self, 'depth_estimator') and self.depth_estimator is not None:
del self.depth_estimator
self.depth_estimator = None
if self.map_list is not None:
del self.map_list
self.map_list = None
if self.dino is not None:
self.dino = self.dino.to("cpu")
del self.dino
self.dino = None
del self.dino_processor
self.dino_processor = None
if self.nlp is not None:
del self.nlp
self.nlp = None
torch.cuda.empty_cache()
gc.collect()
def test_object_depth_analysis(self):
"""
Test the object depth analysis on all images in the directory.
"""
# Process depth maps first
processed_maps = self.process_depth_maps()
# Get list of original image paths
image_dir = self.depth_estimator.image_dir
image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith(".jpg")]
results = []
        # For each image and its corresponding depth map (assumes estimate_depth
        # returns maps in the same order as this directory listing)
        for image_path, processed_map in zip(image_paths, processed_maps):
# Extract the normalized depth map
depth_map = processed_map["normalization"]
# Analyze objects and their depths
            # NOTE: analyze_object_depths needs lat/lon (or precomputed caption_data)
            # for caption generation; pass real coordinates here when running the test.
            object_depths = self.analyze_object_depths(image_path, depth_map, lat=None, lon=None)
# Store results
results.append({
"image_path": image_path,
"object_depths": object_depths
})
# Print some information for debugging
print(f"Analyzed {image_path}:")
for obj in object_depths:
print(f" - {obj['original_label']} (Zone: {obj['zone_description']})")
return results
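

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original entry points). It assumes
    # street-view .jpg images exist in the configured image directory, a CUDA
    # device is available, and that the placeholder LAT/LON below are replaced
    # with real coordinates before caption generation is attempted.
    LAT, LON = 0.0, 0.0  # placeholder coordinates, for illustration only
    mapper = SoundMapper()
    try:
        processed_maps = mapper.process_depth_maps()
        image_paths = [
            os.path.join(mapper.image_dir, f)
            for f in os.listdir(mapper.image_dir)
            if f.endswith(".jpg")
        ]
        for image_path, processed_map in zip(image_paths, processed_maps):
            objects = mapper.analyze_object_depths(
                image_path, processed_map["normalization"], LAT, LON
            )
            for obj in objects:
                print(f"{image_path}: {obj['original_label']} "
                      f"({obj['zone_description']}) -> {obj['sound_description']}")
    finally:
        mapper.cleanup()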