# Hugging Face Space (status when captured: Sleeping)
# Standard library
import gc
import os
import shutil
from glob import glob
from io import BytesIO

# Numerics, imaging and UI
import numpy as np
import pandas as pd
import PIL
import PIL.Image  # load the submodule explicitly; `import PIL` alone does not guarantee it
import streamlit as st
#import matplotlib.pyplot as plt
#from tqdm.notebook import tqdm

# PyTorch
import torch
import torch.nn.functional as F
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
# For data augmentation
from torchvision import transforms, datasets

# Transformers and its models
import transformers
# For Image Processing
from transformers import ViTImageProcessor
# For Model
from transformers import ViTModel, ViTConfig, pipeline
# For GPU
from transformers import set_seed
from accelerate import Accelerator, notebook_launcher
# For Data Loaders
# NOTE: keep this AFTER the torchvision import above. It rebinds the name
# `datasets` to the Hugging Face package, exactly as the original order did.
import datasets
# Set the device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialise global variables
MODEL_TRANSFORMER = 'google/vit-base-patch16-224'
BATCH_SIZE = 8

# Set paths
data_path = 'employees'              # folder holding one *.jpg per employee
model_path = 'vit_pytorch_GPU_1.pt'  # serialised embedding model
webcam_path = 'captured_image.jpg'   # where the camera snapshot is written

# Set title
st.title("Employee Attendance System")

# Define the image processor.
# Only the checkpoint name is passed: `attn_implementation` and `torch_dtype`
# are model-loading options and do not configure an image processor.
image_processor_prod = ViTImageProcessor.from_pretrained(MODEL_TRANSFORMER)
# Define ML Model
class FaceEmbeddingModel(torch.nn.Module):
    """ViT backbone plus a linear head that emits L2-normalised embeddings.

    Args:
        model_name: Checkpoint name or path handed to ``ViTConfig`` and
            ``ViTModel``.
        embedding_size: Number of output features of the projection layer.
        id2label: Optional mapping stored in the config. When omitted the
            checkpoint's own value is kept.
        label2id: Optional inverse mapping, handled like ``id2label``.
    """

    def __init__(self, model_name, embedding_size, id2label=None, label2id=None):
        super().__init__()
        # The label maps used to be read from module globals that this file
        # never defines, so they are now explicit, optional arguments.
        config_kwargs = {"return_dict": True}
        if id2label is not None:
            config_kwargs["id2label"] = id2label
        if label2id is not None:
            config_kwargs["label2id"] = label2id
        self.config = ViTConfig.from_pretrained(model_name, **config_kwargs)
        # Load the ViT model
        self.backbone = ViTModel.from_pretrained(model_name, config=self.config)
        # Project the backbone's hidden size down/up to the embedding size
        self.fc = torch.nn.Linear(self.backbone.config.hidden_size, embedding_size)

    def forward(self, images):
        """Return one unit-length embedding per image in ``images``."""
        # Token 0 of the last hidden state is used as the image representation
        first_token = self.backbone(images).last_hidden_state[:, 0]
        projected = self.fc(first_token)
        # Normalise along the feature axis so distances are comparable
        return torch.nn.functional.normalize(projected, dim=1)
# Load the model
# NOTE(review): weights_only=False unpickles arbitrary Python objects, so the
# checkpoint at `model_path` must come from a trusted source.
model_pretrained = torch.load(
    model_path,
    weights_only=False,
    map_location=device,
)
# Define the ML model - Evaluation function
def _pixel_values_of(batch):
    """Return the pixel tensor of a batch given as a dict or as a bare tensor."""
    return batch["pixel_values"] if isinstance(batch, dict) else batch


def prod_function(transformer_model, prod_dl, prod_data):
    """Compute the distance between a query image and every reference image.

    Args:
        transformer_model: Model called with a batch of pixel values and
            returning one embedding row per image.
        prod_dl: DataLoader over the reference images. Batches are either
            dicts with a ``'pixel_values'`` entry or bare tensors.
        prod_data: The query image, given either as a DataLoader yielding the
            same kind of batches or directly as a pixel tensor. Only the
            first image is used.

    Returns:
        A list with one 1-D tensor of distances per batch of ``prod_dl``.

    Raises:
        ValueError: If ``prod_data`` is a loader that yields no batch.
    """
    # Initialize accelerator
    accelerator = Accelerator()

    # Verbose library logging on the main process only
    if accelerator.is_main_process:
        datasets.utils.logging.set_verbosity_warning()
        transformers.utils.logging.set_verbosity_info()
    else:
        datasets.utils.logging.set_verbosity_error()
        transformers.utils.logging.set_verbosity_error()

    set_seed(42)

    # Objects are unpacked in the same order they are given to prepare()
    accelerated_model, accelerated_prod_dl, accelerated_prod_data = accelerator.prepare(
        transformer_model, prod_dl, prod_data
    )
    accelerated_model.eval()

    prod_preds = []
    # Inference only: no gradients for the query or the reference embeddings
    with torch.no_grad():
        if torch.is_tensor(accelerated_prod_data):
            query_pixels = accelerated_prod_data.to(accelerator.device)
        else:
            try:
                first_batch = next(iter(accelerated_prod_data))
            except StopIteration:
                raise ValueError("prod_data does not contain any image") from None
            query_pixels = _pixel_values_of(first_batch)
        if query_pixels.dim() == 3:
            # A single un-batched image: add the batch axis
            query_pixels = query_pixels.unsqueeze(0)

        # Embedding of the image to be evaluated, shape (1, embedding_size)
        emb_prod = accelerated_model(query_pixels[:1])

        for batch in accelerated_prod_dl:
            # forward() takes the pixel tensor positionally, so the batch
            # dict is not unpacked as keyword arguments
            emb = accelerated_model(_pixel_values_of(batch))
            distance = F.pairwise_distance(emb, emb_prod.expand_as(emb))
            prod_preds.append(distance)
    return prod_preds
# Creation of Dataloader
class CustomDatasetProd(Dataset):
    """Dataset over pre-computed pixel tensors.

    Each item is a dict with a single ``'pixel_values'`` entry holding the
    indexed element with ``squeeze(0)`` applied.
    """

    def __init__(self, pixel_values):
        self.pixel_values = pixel_values

    def __len__(self):
        return len(self.pixel_values)

    def __getitem__(self, idx):
        sample = self.pixel_values[idx]
        return {'pixel_values': sample.squeeze(0)}
# Creation of Dataset
class CreateDatasetProd():
    """Turn image files into a ``CustomDatasetProd`` of processed pixel tensors.

    Args:
        image_processor: Callable invoked as
            ``image_processor(img, return_tensors='pt')`` whose result exposes
            a ``'pixel_values'`` tensor.
    """

    def __init__(self, image_processor):
        super().__init__()
        self.image_processor = image_processor
        # Convert the PIL image to a uint8 image tensor without rescaling
        self.transform_prod = transforms.v2.Compose([
            transforms.v2.ToImage(),
            transforms.v2.ToDtype(torch.uint8, scale=False),
        ])

    def get_pixels(self, img_paths):
        """Return one processed pixel tensor per image path.

        Args:
            img_paths: An iterable of paths, or a single path given as
                ``str`` / ``os.PathLike``.

        Returns:
            A list of tensors, in the order of ``img_paths``.
        """
        # A bare string would otherwise be iterated character by character
        if isinstance(img_paths, (str, os.PathLike)):
            img_paths = [img_paths]

        pixel_values = []
        for path in img_paths:
            # Close the file promptly; convert() loads the data and forces
            # three channels for greyscale / RGBA / palette images
            with PIL.Image.open(path) as raw_img:
                rgb_img = raw_img.convert('RGB')
            tensor_img = self.transform_prod(rgb_img)
            # Scale the image to the ML model's desired format
            encoded = self.image_processor(tensor_img, return_tensors='pt')
            pixel_values.append(encoded['pixel_values'].squeeze(0))

        # One collection after the loop instead of one per image
        gc.collect()
        return pixel_values

    def create_dataset(self, image_paths):
        """Build a ``CustomDatasetProd`` from one or more image paths.

        ``torch.stack`` raises ``RuntimeError`` when no path is given.
        """
        pixel_values = torch.stack(self.get_pixels(image_paths))
        return CustomDatasetProd(pixel_values=pixel_values)
# Read images from directory
# Sorted so the index -> employee mapping is the same on every run
image_paths = sorted(glob(os.path.join(data_path, '*.jpg')))
if not image_paths:
    # Without reference images there is nothing to stack or compare against
    st.error(f"No employee images (*.jpg) found in '{data_path}'.")
    st.stop()

# Create DataLoader for Employees image
dataset_prod_obj = CreateDatasetProd(image_processor_prod)
prod_ds = dataset_prod_obj.create_dataset(image_paths)
prod_dl = DataLoader(prod_ds, batch_size=BATCH_SIZE)

# Largest embedding distance that is still accepted as a match
MATCH_THRESHOLD = 0.3

# Read image from Camera
enable = st.checkbox("Enable camera")
picture = st.camera_input("Take a picture", disabled=not enable)
if picture is not None:
    # getvalue() yields raw bytes; PIL needs a file-like object, not bytes.
    # JPEG cannot store an alpha channel, hence the RGB conversion.
    img = PIL.Image.open(BytesIO(picture.getvalue())).convert('RGB')
    img.save(webcam_path, "JPEG")
    st.write('Image saved as:', webcam_path)

    # Create DataLoader for Webcam Image (a list, not a bare string)
    webcam_ds = dataset_prod_obj.create_dataset([webcam_path])
    webcam_dl = DataLoader(webcam_ds, batch_size=BATCH_SIZE)

    # Run the predictions
    prediction = prod_function(model_pretrained, prod_dl, webcam_dl)
    predictions = torch.cat(prediction, 0).to('cpu')
    match_idx = int(torch.argmin(predictions))

    # Display the results
    if predictions[match_idx] <= MATCH_THRESHOLD:
        # File name without directory and extension is the employee's name
        employee_name = os.path.splitext(os.path.basename(image_paths[match_idx]))[0]
        st.write('Welcome: ', employee_name)
    else:
        st.write("Match not found")