#!/usr/bin/env python3
"""
streetsoundtext.py - A pipeline that downloads Google Street View panoramas,
extracts perspective views, and analyzes them for sound information.
"""
import os
import requests
import argparse
import numpy as np
import torch
import time
from PIL import Image
from io import BytesIO
from config import LOGS_DIR
import torchvision.transforms as T
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoModel, AutoTokenizer
from utils import sample_perspective_img
import cv2

log_dir = LOGS_DIR
os.makedirs(log_dir, exist_ok=True)  # Create the directory if it doesn't exist
# soundscape_query = "<image>\nWhat can we expect to hear from the location captured in this image? Name around five nouns. Avoid speculation and provide a concise response including sound sources visible in the image."
soundscape_query = """<image>
Identify 5 potential sound sources visible in this image. For each source, provide both the noun and a brief description of its typical sound.
Format your response exactly like these examples (do not include the word "Noun:" in your response):
Car: engine humming with occasional honking.
River: gentle flowing water with subtle splashing sounds.
Trees: rustling leaves moved by the wind.
"""

# Constants
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Model Leaderboard Paths
MODEL_LEADERBOARD = {
    "intern_2_5-8B": "OpenGVLab/InternVL2_5-8B-MPO",
    "intern_2_5-4B": "OpenGVLab/InternVL2_5-4B-MPO",
}


class StreetViewDownloader:
    """Downloads panoramic images from Google Street View"""

    def __init__(self):
        # URLs for API requests
        # https://www.google.ca/maps/rpc/photo/listentityphotos?authuser=0&hl=en&gl=us&pb=!1e3!5m45!2m2!1i203!2i100!3m3!2i4!3sCAEIBAgFCAYgAQ!5b1!7m33!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!1m3!1e9!2b1!3e2!2b1!8m0!9b0!11m1!4b1!6m3!1sI63QZ8b4BcSli-gPvPHf-Qc!7e81!15i11021!9m2!2d-90.30324219145255!3d38.636242944711036!10d91.37627840655999
        # self.panoid_req = 'https://www.google.com/maps/preview/reveal?authuser=0&hl=en&gl=us&pb=!2m9!1m3!1d82597.14038230096!2d{}!3d{}!2m0!3m2!1i1523!2i1272!4f13.1!3m2!2d{}!3d{}!4m2!1syPETZOjwLvCIptQPiJum-AQ!7e81!5m5!2m4!1i96!2i64!3i1!4i8'
        self.panoid_req = 'https://www.google.ca/maps/rpc/photo/listentityphotos?authuser=0&hl=en&gl=us&pb=!1e3!5m45!2m2!1i203!2i100!3m3!2i4!3sCAEIBAgFCAYgAQ!5b1!7m33!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!1m3!1e9!2b1!3e2!2b1!8m0!9b0!11m1!4b1!6m3!1sI63QZ8b4BcSli-gPvPHf-Qc!7e81!15i11021!9m2!2d{}!3d{}!10d25'
        # https://www.google.com/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m3!1m2!1e2!2s{}!4m61!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!1e17!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3!11m2!3m1!4b1 # vmSzE7zkK2eETwAP_r8UdQ
        # https://www.google.ca/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m3!1m2!1e2!2s{}!4m61!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!1e17!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3!11m2!3m1!4b1 # -9HfuNFUDOw_IP5SA5IspA
        self.photometa_req = 'https://www.google.com/maps/photometa/v1?authuser=0&hl=en&gl=us&pb=!1m4!1smaps_sv.tactile!11m2!2m1!1b1!2m2!1sen!2sus!3m5!1m2!1e2!2s{}!2m1!5s0x87d8b49f53fc92e9:0x6ecb6e520c6f4d9f!4m57!1e1!1e2!1e3!1e4!1e5!1e6!1e8!1e12!2m1!1e1!4m1!1i48!5m1!1e1!5m1!1e2!6m1!1e1!6m1!1e2!9m36!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e3!2b1!3e2!1m3!1e3!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e1!2b0!3e3!1m3!1e4!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e3'
        self.panimg_req = 'https://streetviewpixels-pa.googleapis.com/v1/tile?cb_client=maps_sv.tactile&panoid={}&x={}&y={}&zoom={}'

    def get_image_id(self, lat, lon):
        """Get Street View panorama ID for given coordinates"""
        null = None  # the response body may contain JavaScript-style `null` literals
        pr_response = requests.get(self.panoid_req.format(lon, lat, lon, lat))
        if pr_response.status_code != 200:
            error_message = f"Error fetching panorama ID: HTTP {pr_response.status_code}"
            if pr_response.status_code == 400:
                error_message += " - Bad request. Check coordinates format."
            elif pr_response.status_code in (401, 403):
                error_message += " - Authentication error. Check API key and permissions."
            elif pr_response.status_code == 404:
                error_message += " - No panorama found at these coordinates."
            elif pr_response.status_code == 429:
                error_message += " - Rate limit exceeded. Try again later."
            elif pr_response.status_code >= 500:
                error_message += " - Server error. Try again later."
            print(error_message)
            return None
        pr = BytesIO(pr_response.content).getvalue().decode('utf-8')
        pr = eval(pr[pr.index('\n'):])  # skip the anti-hijacking prefix line, then parse
        try:
            panoid = pr[0][0][0]
        except (IndexError, TypeError):
            return None
        return panoid

    def download_image(self, lat, lon, zoom=1):
        """Download Street View panorama and metadata"""
        null = None  # needed when eval-ing the response, which may contain `null`
        panoid = self.get_image_id(lat, lon)
        if panoid is None:
            raise ValueError(f"get_image_id() failed at coordinates: {lat}, {lon}")

        # Get metadata
        pm_response = requests.get(self.photometa_req.format(panoid))
        pm = BytesIO(pm_response.content).getvalue().decode('utf-8')
        pm = eval(pm[pm.index('\n'):])
        pan_list = pm[1][0][5][0][3][0]

        # Extract relevant info
        pid = pan_list[0][0][1]
        plat = pan_list[0][2][0][2]
        plon = pan_list[0][2][0][3]
        p_orient = pan_list[0][2][2][0]

        # Download image tiles and assemble panorama
        img_part_inds = [(x, y) for x in range(2**zoom) for y in range(2**(zoom-1))]
        img = np.zeros((512*(2**(zoom-1)), 512*(2**zoom), 3), dtype=np.uint8)
        for x, y in img_part_inds:
            sub_img_response = requests.get(self.panimg_req.format(pid, x, y, zoom))
            sub_img = np.array(Image.open(BytesIO(sub_img_response.content)))
            img[512*y:512*(y+1), 512*x:512*(x+1)] = sub_img
        if (img[-1] == 0).all():
            # raise ValueError("Failed to download complete panorama")
            print("Failed to download complete panorama")
        return img, pid, plat, plon, p_orient
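

# Illustrative standalone use of the downloader (a sketch; it is not exercised by the
# pipeline below, and the coordinates are the ones appearing in the example request
# URL noted in __init__ above):
#
#   downloader = StreetViewDownloader()
#   pano, pid, plat, plon, heading = downloader.download_image(
#       38.636242944711036, -90.30324219145255, zoom=2)
#   # zoom z assembles a (512 * 2**(z-1)) x (512 * 2**z) panorama from 512-px tiles
#   Image.fromarray(pano).save("panorama.jpg")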


class PerspectiveExtractor:
    """Extracts perspective views from panoramic images"""

    def __init__(self, output_shape=(256, 256), fov=(90, 90)):
        self.output_shape = output_shape
        self.fov = fov

    def extract_views(self, pano_img, face_size=512):
        """
        Extract front, back, left, and right views by converting the equirectangular
        (ERP) panorama into four cubic faces.

        Args:
            pano_img (numpy.ndarray): The input equirectangular image.
            face_size (int): The size of each cubic face.
        Returns:
            dict: A dictionary with the four cube faces.
        """
        # Earlier orientation-based approach, kept for reference:
        # orientations = {
        #     "front": (0, p_orient, 0),       # Align front with real orientation
        #     "back": (0, p_orient + 180, 0),  # Behind
        #     "left": (0, p_orient - 90, 0),   # Left side
        #     "right": (0, p_orient + 90, 0),  # Right side
        # }
        # cutouts = {}
        # for view, rot in orientations.items():
        #     cutout, fov, applied_rot = sample_perspective_img(
        #         pano_img, self.output_shape, fov=self.fov, rot=rot
        #     )
        #     cutouts[view] = cutout
        # return cutouts

        # Get ERP dimensions
        h_erp, w_erp, _ = pano_img.shape

        # Define cube face directions (yaw, pitch)
        cube_faces = {
            "front": (0, 0),
            "left": (90, 0),
            "back": (180, 0),
            "right": (-90, 0),
        }

        # Output faces
        faces = {}

        # Generate each face
        for face_name, (yaw, pitch) in cube_faces.items():
            # Perspective (pinhole) intrinsics; kept for reference only, since the
            # mapping below works directly in normalized coordinates
            fov = 90  # Field of view
            K = np.array([
                [face_size / (2 * np.tan(np.radians(fov / 2))), 0, face_size / 2],
                [0, face_size / (2 * np.tan(np.radians(fov / 2))), face_size / 2],
                [0, 0, 1]
            ])

            # Generate 3D world coordinates for the cube face
            x, y = np.meshgrid(np.linspace(-1, 1, face_size), np.linspace(-1, 1, face_size))
            z = np.ones_like(x)

            # Normalize 3D points
            points_3d = np.stack((x, y, z), axis=-1)  # Shape: (H, W, 3)
            points_3d /= np.linalg.norm(points_3d, axis=-1, keepdims=True)

            # Apply rotation to align with the cube face
            yaw_rad, pitch_rad = np.radians(yaw), np.radians(pitch)
            Ry = np.array([[np.cos(yaw_rad), 0, np.sin(yaw_rad)], [0, 1, 0], [-np.sin(yaw_rad), 0, np.cos(yaw_rad)]])
            Rx = np.array([[1, 0, 0], [0, np.cos(pitch_rad), -np.sin(pitch_rad)], [0, np.sin(pitch_rad), np.cos(pitch_rad)]])
            R = Ry @ Rx

            # Rotate points
            points_3d_rot = np.einsum('ij,hwj->hwi', R, points_3d)

            # Convert 3D to spherical coordinates
            lon = np.arctan2(points_3d_rot[..., 0], points_3d_rot[..., 2])
            lat = np.arcsin(points_3d_rot[..., 1])

            # Map spherical coordinates to ERP image coordinates
            x_erp = (w_erp * (lon / (2 * np.pi) + 0.5)).astype(np.float32)
            y_erp = (h_erp * (0.5 - lat / np.pi)).astype(np.float32)

            # Sample pixels from ERP image
            face_img = cv2.remap(pano_img, x_erp, y_erp, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_WRAP)
            face_img = cv2.rotate(face_img, cv2.ROTATE_180)
            faces[face_name] = face_img

        return faces
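

# Informal sanity check for the ERP mapping above: for the "left" face (yaw = 90 deg)
# the central ray (0, 0, 1) rotates to (1, 0, 0), giving lon = pi/2 and lat = 0, which
# maps to x_erp = 0.75 * w_erp and y_erp = 0.5 * h_erp -- three quarters of the way
# across the panorama, on the horizon.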


class ImageAnalyzer:
    """Analyzes images using Vision-Language Models"""

    def __init__(self, model_name="intern_2_5-4B", use_cuda=True):
        self.model_name = model_name
        self.use_cuda = use_cuda and torch.cuda.is_available()
        self.model, self.tokenizer, self.device = self._load_model()

    def _load_model(self):
        """Load selected Vision-Language Model"""
        if self.model_name not in MODEL_LEADERBOARD:
            raise ValueError(f"Model '{self.model_name}' not found. Choose from: {list(MODEL_LEADERBOARD.keys())}")
        model_path = MODEL_LEADERBOARD[self.model_name]

        # Configure device and parameters
        if self.use_cuda:
            device = torch.device("cuda")
            torch_dtype = torch.bfloat16
            use_flash_attn = True
        else:
            device = torch.device("cpu")
            torch_dtype = torch.float32
            use_flash_attn = False

        # Load model and tokenizer
        model = AutoModel.from_pretrained(
            model_path,
            torch_dtype=torch_dtype,
            load_in_8bit=False,
            low_cpu_mem_usage=True,
            use_flash_attn=use_flash_attn,
            trust_remote_code=True,
        ).eval().to(device)
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
            use_fast=False
        )
        return model, tokenizer, device

    def _build_transform(self, input_size=448):
        """Create image transformation pipeline"""
        transform = T.Compose([
            T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
            T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC),
            T.ToTensor(),
            T.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
        ])
        return transform

    def _find_closest_aspect_ratio(self, aspect_ratio, target_ratios, width, height, image_size):
        """Find closest aspect ratio for image tiling"""
        best_ratio_diff = float('inf')
        best_ratio = (1, 1)
        area = width * height
        for ratio in target_ratios:
            target_aspect_ratio = ratio[0] / ratio[1]
            ratio_diff = abs(aspect_ratio - target_aspect_ratio)
            if ratio_diff < best_ratio_diff:
                best_ratio_diff = ratio_diff
                best_ratio = ratio
            elif ratio_diff == best_ratio_diff:
                if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]:
                    best_ratio = ratio
        return best_ratio

    def _preprocess_image(self, image, min_num=1, max_num=12, image_size=448, use_thumbnail=False):
        """Preprocess image for model input"""
        orig_width, orig_height = image.size
        aspect_ratio = orig_width / orig_height

        # Calculate possible image aspect ratios
        target_ratios = set(
            (i, j) for n in range(min_num, max_num + 1)
            for i in range(1, n + 1)
            for j in range(1, n + 1)
            if i * j <= max_num and i * j >= min_num
        )
        target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1])

        # Find closest aspect ratio
        target_aspect_ratio = self._find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height, image_size
        )

        # Calculate target dimensions
        target_width = image_size * target_aspect_ratio[0]
        target_height = image_size * target_aspect_ratio[1]
        blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

        # Resize and split image
        resized_img = image.resize((target_width, target_height))
        processed_images = []
        for i in range(blocks):
            box = (
                (i % (target_width // image_size)) * image_size,
                (i // (target_width // image_size)) * image_size,
                ((i % (target_width // image_size)) + 1) * image_size,
                ((i // (target_width // image_size)) + 1) * image_size
            )
            split_img = resized_img.crop(box)
            processed_images.append(split_img)
        assert len(processed_images) == blocks
        if use_thumbnail and len(processed_images) != 1:
            thumbnail_img = image.resize((image_size, image_size))
            processed_images.append(thumbnail_img)
        return processed_images
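
    # Worked example of the tiling logic above (comments only): a 1024x512 input has
    # aspect ratio 2.0, whose closest entry in the ratio grid is (2, 1). The image is
    # resized to 896x448 and split into two 448x448 tiles; with use_thumbnail=True a
    # third, whole-image 448x448 thumbnail is appended, so load_image() below stacks
    # three tensors.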

    def load_image(self, image_path, input_size=448, max_num=12):
        """Load and process image for analysis"""
        image = Image.open(image_path).convert('RGB')
        transform = self._build_transform(input_size)
        images = self._preprocess_image(image, image_size=input_size, use_thumbnail=True, max_num=max_num)
        pixel_values = [transform(image) for image in images]
        pixel_values = torch.stack(pixel_values)
        return pixel_values

    def analyze_image(self, image_path, max_num=12):
        """Analyze image for expected sounds"""
        # Load and process image
        pixel_values = self.load_image(image_path, max_num=max_num)

        # Move to device with appropriate dtype
        if self.device.type == "cuda":
            pixel_values = pixel_values.to(torch.bfloat16).to(self.device)
        else:
            pixel_values = pixel_values.to(torch.float32).to(self.device)

        # Create sound-focused query
        query = soundscape_query

        # Generate response
        generation_config = dict(max_new_tokens=1024, do_sample=True)
        response = self.model.chat(self.tokenizer, pixel_values, query, generation_config)
        return response
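

# Illustrative standalone use of the analyzer (a sketch; "front.jpg" stands in for
# any of the view images written by the pipeline below):
#
#   analyzer = ImageAnalyzer(model_name="intern_2_5-4B", use_cuda=True)
#   caption = analyzer.analyze_image("front.jpg")
#   # Per soundscape_query, the response should resemble:
#   #   Car: engine humming with occasional honking.
#   #   Trees: rustling leaves moved by the wind.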


class StreetSoundTextPipeline:
    """Complete pipeline for Street View sound analysis"""

    def __init__(self, log_dir="logs", model_name="intern_2_5-4B", use_cuda=True):
        # Create log directory if it doesn't exist
        self.log_dir = log_dir
        os.makedirs(log_dir, exist_ok=True)

        # Initialize components
        self.downloader = StreetViewDownloader()
        self.extractor = PerspectiveExtractor()
        # The analyzer is loaded lazily so the model only occupies memory while needed
        # self.analyzer = ImageAnalyzer(model_name=model_name, use_cuda=use_cuda)
        self.analyzer = None
        self.model_name = model_name
        self.use_cuda = use_cuda

    def _load_analyzer(self):
        if self.analyzer is None:
            self.analyzer = ImageAnalyzer(model_name=self.model_name, use_cuda=self.use_cuda)

    def _unload_analyzer(self):
        if self.analyzer is not None:
            if hasattr(self.analyzer, 'model') and self.analyzer.model is not None:
                self.analyzer.model = self.analyzer.model.to("cpu")
                del self.analyzer.model
                self.analyzer.model = None
            torch.cuda.empty_cache()
            self.analyzer = None

    def process(self, lat, lon, view, panoramic=False):
        """
        Process a location to generate sound description for specified view or all views

        Args:
            lat (float): Latitude
            lon (float): Longitude
            view (str): Perspective view ('front', 'back', 'left', 'right')
            panoramic (bool): If True, process all views instead of just the specified one
        Returns:
            dict or list: Results including panorama info and sound description(s)
        """
        if view not in ["front", "back", "left", "right"]:
            raise ValueError(f"Invalid view: {view}. Choose from: front, back, left, right")

        # Step 1: Download panoramic image
        print(f"Downloading Street View panorama for coordinates: {lat}, {lon}")
        pano_path = os.path.join(self.log_dir, "panorama.jpg")
        pano_img, pid, plat, plon, p_orient = self.downloader.download_image(lat, lon)
        Image.fromarray(pano_img).save(pano_path)

        # Step 2: Extract perspective views
        print(f"Extracting perspective views with orientation: {p_orient}°")
        cutouts = self.extractor.extract_views(pano_img, 512)

        # Save all views
        for v, img in cutouts.items():
            view_path = os.path.join(self.log_dir, f"{v}.jpg")
            Image.fromarray(img).save(view_path)

        self._load_analyzer()
        print("\n[DEBUG] Current soundscape query:")
        print(soundscape_query)
        print("-" * 50)

        if panoramic:
            # Process all views
            print("Analyzing all views for sound information")
            results = []
            for current_view in ["front", "back", "left", "right"]:
                view_path = os.path.join(self.log_dir, f"{current_view}.jpg")
                sound_description = self.analyzer.analyze_image(view_path)
                view_result = {
                    "panorama_id": pid,
                    "coordinates": {"lat": plat, "lon": plon},
                    "orientation": p_orient,
                    "view": current_view,
                    "sound_description": sound_description,
                    "files": {
                        "panorama": pano_path,
                        "view_path": view_path
                    }
                }
                results.append(view_result)
            self._unload_analyzer()
            return results
        else:
            # Process only the selected view
            view_path = os.path.join(self.log_dir, f"{view}.jpg")
            print(f"Analyzing {view} view for sound information")
            sound_description = self.analyzer.analyze_image(view_path)
            self._unload_analyzer()

            # Prepare results
            results = {
                "panorama_id": pid,
                "coordinates": {"lat": plat, "lon": plon},
                "orientation": p_orient,
                "view": view,
                "sound_description": sound_description,
                "files": {
                    "panorama": pano_path,
                    "views": {v: os.path.join(self.log_dir, f"{v}.jpg") for v in cutouts.keys()}
                }
            }
            return results
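

# Typical single-view use of the pipeline (a sketch; the coordinates are the same
# illustrative ones as in the downloader example above):
#
#   pipeline = StreetSoundTextPipeline(log_dir=log_dir, model_name="intern_2_5-4B")
#   result = pipeline.process(38.636242944711036, -90.30324219145255, view="front")
#   print(result["sound_description"])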


def parse_location(location_str):
    """Parse location string in format 'lat,lon' into a float tuple"""
    try:
        lat, lon = map(float, location_str.split(','))
        return lat, lon
    except ValueError:
        raise argparse.ArgumentTypeError("Location must be in format 'latitude,longitude'")


def generate_caption(lat, lon, view="front", model="intern_2_5-4B", cpu_only=False, panoramic=False):
    """
    Generate sound captions for one or all views of a Street View location

    Args:
        lat (float/str): Latitude
        lon (float/str): Longitude
        view (str): Perspective view ('front', 'back', 'left', 'right')
        model (str): Model name to use for analysis
        cpu_only (bool): Whether to force CPU usage
        panoramic (bool): If True, process all views instead of just the specified one
    Returns:
        dict or list: Results with sound descriptions
    """
    pipeline = StreetSoundTextPipeline(
        log_dir=log_dir,
        model_name=model,
        use_cuda=not cpu_only
    )
    try:
        results = pipeline.process(lat, lon, view, panoramic=panoramic)
        if panoramic:
            # Report results for all views
            print(f"Generated captions for all views at location: {lat}, {lon}")
        else:
            print(f"Generated caption for {view} view at location: {lat}, {lon}")
        return results
    except Exception as e:
        print(f"Error: {str(e)}")
        return None
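

# A minimal command-line entry point is sketched below; it is an assumption rather than
# part of the original pipeline, but it only wires together parse_location() and
# generate_caption(), which the script already defines, e.g.
#   python streetsoundtext.py --location "38.6362,-90.3032" --view front --panoramic
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate sound captions for a Street View location")
    parser.add_argument("--location", type=parse_location, required=True,
                        help="Coordinates in the format 'latitude,longitude'")
    parser.add_argument("--view", default="front", choices=["front", "back", "left", "right"],
                        help="Perspective view to analyze")
    parser.add_argument("--model", default="intern_2_5-4B", choices=list(MODEL_LEADERBOARD.keys()),
                        help="Vision-language model to use")
    parser.add_argument("--cpu-only", action="store_true", help="Force CPU inference")
    parser.add_argument("--panoramic", action="store_true", help="Analyze all four views")
    args = parser.parse_args()

    lat, lon = args.location
    output = generate_caption(lat, lon, view=args.view, model=args.model,
                              cpu_only=args.cpu_only, panoramic=args.panoramic)
    if output is not None:
        print(output)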