ISE / engines /image.py
fikird
Complete rewrite of ISE with advanced RAG and OSINT capabilities
48922fa
raw
history blame
6.19 kB
"""
Image analysis engine for processing and analyzing images.
"""
from typing import Dict, Any, List, Optional
import io
from PIL import Image
import torch
from torchvision import transforms
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
import face_recognition
import numpy as np
from tenacity import retry, stop_after_attempt, wait_exponential
class ImageEngine:
def __init__(self):
# Initialize image classification model
self.feature_extractor = AutoFeatureExtractor.from_pretrained(
"microsoft/resnet-50"
)
self.model = AutoModelForImageClassification.from_pretrained(
"microsoft/resnet-50"
)
# Set up image transforms
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def analyze_image(self, image_data: bytes) -> Dict[str, Any]:
"""Analyze image content and detect objects/faces."""
try:
# Load image
image = Image.open(io.BytesIO(image_data)).convert('RGB')
# Prepare image for model
inputs = self.feature_extractor(images=image, return_tensors="pt")
# Get model predictions
with torch.no_grad():
outputs = self.model(**inputs)
probs = outputs.logits.softmax(-1)
# Get top predictions
top_probs, top_indices = torch.topk(probs, k=5)
# Convert predictions to list
predictions = [
{
"label": self.model.config.id2label[idx.item()],
"confidence": prob.item()
}
for prob, idx in zip(top_probs[0], top_indices[0])
]
# Analyze faces
np_image = np.array(image)
face_locations = face_recognition.face_locations(np_image)
face_encodings = face_recognition.face_encodings(np_image, face_locations)
faces = []
for i, (face_encoding, face_location) in enumerate(zip(face_encodings, face_locations)):
face = {
"id": i + 1,
"location": {
"top": face_location[0],
"right": face_location[1],
"bottom": face_location[2],
"left": face_location[3]
},
"encoding": face_encoding.tolist()
}
faces.append(face)
# Get image metadata
metadata = {
"format": image.format,
"mode": image.mode,
"size": image.size,
"width": image.width,
"height": image.height
}
return {
"predictions": predictions,
"faces": faces,
"metadata": metadata
}
except Exception as e:
return {"error": str(e)}
async def compare_faces(self, face1_data: bytes, face2_data: bytes) -> Dict[str, Any]:
"""Compare two faces and determine if they are the same person."""
try:
# Load and process first image
image1 = face_recognition.load_image_file(io.BytesIO(face1_data))
face1_encoding = face_recognition.face_encodings(image1)
if not face1_encoding:
return {"error": "No face found in first image"}
# Load and process second image
image2 = face_recognition.load_image_file(io.BytesIO(face2_data))
face2_encoding = face_recognition.face_encodings(image2)
if not face2_encoding:
return {"error": "No face found in second image"}
# Compare faces
results = face_recognition.compare_faces(
[face1_encoding[0]], face2_encoding[0]
)
# Calculate face distance (lower means more similar)
face_distance = face_recognition.face_distance(
[face1_encoding[0]], face2_encoding[0]
)
return {
"match": bool(results[0]),
"confidence": float(1 - face_distance[0]),
"distance": float(face_distance[0])
}
except Exception as e:
return {"error": str(e)}
async def search_similar_faces(self,
target_encoding: List[float],
face_database: List[Dict[str, Any]],
threshold: float = 0.6) -> List[Dict[str, Any]]:
"""Search for similar faces in a database of face encodings."""
try:
matches = []
target_encoding = np.array(target_encoding)
for face_data in face_database:
if "encoding" not in face_data:
continue
current_encoding = np.array(face_data["encoding"])
distance = face_recognition.face_distance([target_encoding], current_encoding)[0]
if distance < threshold:
matches.append({
"face_id": face_data.get("id"),
"confidence": float(1 - distance),
"metadata": face_data.get("metadata", {})
})
# Sort matches by confidence
matches.sort(key=lambda x: x["confidence"], reverse=True)
return matches
except Exception as e:
return [{"error": str(e)}]