File size: 5,466 Bytes
97226b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor
from sentence_transformers import SentenceTransformer
import torch
import torch.nn.functional as F
from PIL import Image
import requests
import os
import json
import math
import re
import pandas as pd
import numpy as np
from omeka_s_api_client import OmekaSClient,OmekaSClientError
from typing import List, Dict, Any, Union
import io
from dotenv import load_dotenv

# env var
load_dotenv(os.path.join(os.getcwd(), ".env"))
HF_TOKEN = os.environ.get("HF_TOKEN")

# Nomic vison model
processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True)

# Nomic text model
text_model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True, token=HF_TOKEN)

def image_url_to_pil(url: str, max_size=(512, 512)) -> Image:
    """

    Ex usage : image_blobs = df["image_url"].apply(image_url_to_pil).tolist()

    """
    response = requests.get(url, stream=True, timeout=5)
    response.raise_for_status()
    image = Image.open(io.BytesIO(response.content)).convert("RGB")
    image.thumbnail(max_size, Image.Resampling.LANCZOS)
    return image

def generate_img_embed(images_urls, batch_size=20):
    """Generate image embeddings in batches to manage memory usage.

    

    Args:

        images_urls (list): List of image URLs

        batch_size (int): Number of images to process at once

    """
    all_embeddings = []
    
    for i in range(0, len(images_urls), batch_size):
        batch_urls = images_urls[i:i + batch_size]
        images = [image_url_to_pil(image_url) for image_url in batch_urls]
        inputs = processor(images, return_tensors="pt")
        img_emb = vision_model(**inputs).last_hidden_state
        img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1)
        all_embeddings.append(img_embeddings.detach().numpy())
    
    return np.vstack(all_embeddings)

def generate_text_embed(sentences: List, batch_size=64):
    """Generate text embeddings in batches to manage memory usage.

    

    Args:

        sentences (List): List of text strings to encode

        batch_size (int): Number of sentences to process at once

    """
    all_embeddings = []
    
    for i in range(0, len(sentences), batch_size):
        batch_sentences = sentences[i:i + batch_size]
        embeddings = text_model.encode(batch_sentences)
        all_embeddings.append(embeddings)
    
    return np.vstack(all_embeddings)
        
def add_concatenated_text_field_exclude_keys(item_dict, keys_to_exclude=None, text_field_key="text", pair_separator=" - "):
    if not isinstance(item_dict, dict):
        raise TypeError("Input must be a dictionary.")
    if keys_to_exclude is None:
        keys_to_exclude = set() # Default to empty set
    else:
        keys_to_exclude = set(keys_to_exclude) # Ensure it's a set for efficient lookup

    # Add the target text key to the exclusion set automatically
    keys_to_exclude.add(text_field_key)

    formatted_pairs = []
    for key, value in item_dict.items():
        # 1. Skip any key in the exclusion set
        if key in keys_to_exclude:
            continue

        # 2. Check for empty/invalid values (same logic as before)
        is_empty_or_invalid = False
        if value is None: is_empty_or_invalid = True
        elif isinstance(value, float) and math.isnan(value): is_empty_or_invalid = True
        elif isinstance(value, (str, list, tuple, dict)) and len(value) == 0: is_empty_or_invalid = True

        # 3. Format and add if valid
        if not is_empty_or_invalid:
            formatted_pairs.append(f"{str(key)}: {str(value)}")
            
    concatenated_text = f"search_document: {pair_separator.join(formatted_pairs)}"
    item_dict[text_field_key] = concatenated_text
    return item_dict

def prepare_df_atlas(df: pd.DataFrame, id_col='id', images_col='images_urls'):

    # Drop completely empty columns
    #df = df.dropna(axis=1, how='all')

    # Fill remaining nulls with empty strings
    #df = df.fillna('')

    # Ensure ID column exists
    if id_col not in df.columns:
        df[id_col] = [f'{i}' for i in range(len(df))]

    # Ensure indexed field exists and is not empty
    #if indexed_col not in df.columns:
    #    df[indexed_col] = ''

    #df[images_col] = df[images_col].apply(lambda x: [x[0]] if isinstance(x, list) and len(x) > 1 else x if isinstance(x, list) else [x])
    df[images_col] = df[images_col].apply(lambda x: x[0] if isinstance(x, list) else x)

    # Optional: force all to string (can help with weird dtypes)
    for col in df.columns:
        df[col] = df[col].astype(str)

    return df

def remove_key_value_from_dict(list_of_dict, key_to_remove):
    new_list = []
    for dictionary in list_of_dict:
        new_dict = dictionary.copy()  # Create a copy to avoid modifying the original list
        if key_to_remove in new_dict:
            del new_dict[key_to_remove]
        new_list.append(new_dict)
    return new_list

def remove_key_value_from_dict(input_dict, key_to_remove='text'):
    if not isinstance(input_dict, dict):
        raise TypeError("Input must be a dictionary.")

    if key_to_remove in input_dict:
        del input_dict[key_to_remove]

    return input_dict