omeka-s-computer-vision / lancedb_client.py
Geraldine's picture
Upload 6 files
97226b8 verified
raw
history blame
3.73 kB
import numpy as np
import lancedb
import pyarrow as pa
import logging
from dotenv import load_dotenv
import os
import ast
# Load env vars
load_dotenv(os.path.join(os.getcwd(), ".env"),override = True)
metadata_keys_raw = os.getenv("_DEFAULT_PARSE_METADATA", "").split(",")
metadata_keys = [key.replace(" ", "").replace(")", "").strip("'") for key in metadata_keys_raw]
# Setup logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LanceDBManager:
def __init__(self, db_uri="lancedb", embedding_dim=1536):
self.db = lancedb.connect(db_uri)
self.embedding_dim = embedding_dim
logger.info(f"Connected to LanceDB at {db_uri}")
def _build_schema(self):
"""Build LanceDB schema with dynamic metadata fields and embedding vector."""
fields = [
pa.field("id", pa.int64()),
pa.field("item_id", pa.string()),
pa.field("images_urls", pa.string()),
pa.field("text", pa.string()),
pa.field("Cluster", pa.string()),
pa.field("Topic", pa.string()),
pa.field("embeddings", pa.list_(pa.float32(), self.embedding_dim)),
pa.field("umap_embeddings", pa.list_(pa.float32(), 2)),
]
# Add fields from metadata
for key in metadata_keys:
sanitized_key = key.split(":")[1].strip().capitalize() # remove the vocabulary prefix in key label and capitalize
fields.append(pa.field(sanitized_key, pa.string()))
return pa.schema(fields)
def create_table(self, table_name):
"""Create table using dynamic schema."""
try:
schema = self._build_schema()
table = self.db.create_table(table_name, schema=schema)
logger.info(f"Created LanceDB table '{table_name}'")
return table
except Exception as e:
logger.error(f"Failed to create table '{table_name}': {e}")
raise
def retrieve_table(self, table_name):
try:
table = self.db.open_table(table_name)
logger.info(f"Opened existing LanceDB table '{table_name}'")
return table
except Exception as e:
logger.error(f"Failed to open table '{table_name}': {e}")
raise
def initialize_table(self, table_name):
try:
return self.retrieve_table(table_name)
except Exception:
logger.info(f"Table '{table_name}' not found. Creating new.")
return self.create_table(table_name)
def add_entry(self, table_name, items):
table = self.initialize_table(table_name)
table.add(items)
logger.info(f"Added items to table '{table_name}'")
def list_tables(self):
"""List all existing tables in the LanceDB instance."""
try:
tables = self.db.table_names()
logger.info("Retrieved list of tables.")
return tables
except Exception as e:
logger.error(f"Failed to list tables: {e}")
raise
def get_content_table(self, table_name):
table = self.initialize_table(table_name)
return table.to_pandas()
def drop_table(self, table_name):
"""remove an existing table by name."""
try:
table = self.db.drop_table(table_name)
logger.info(f"Remove existing LanceDB table '{table_name}' successfully.")
return table
except Exception as e:
logger.error(f"Failed to remove existing table '{table_name}': {e}")
raise