import spaces
import cv2
import torch
import numpy as np
from PIL import Image
import gradio as gr
from transformers import CLIPProcessor, CLIPModel
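# Assumed runtime dependencies (e.g. a Space's requirements.txt): spaces,
# gradio, transformers, torch, numpy, opencv-python, pillow.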

# Model checkpoint. Replace "openai/clip-vit-base-patch32" with your LightCLIP
# checkpoint if one is available; the model itself is loaded inside the
# GPU-decorated function below.
MODEL_NAME = "openai/clip-vit-base-patch32"

# Define text prompts for fall and non-fall.
fall_prompt = "A person falling on the ground."
nofall_prompt = "A person standing or walking."
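# The two prompts act as a zero-shot binary classifier: each frame is scored by
# how much more similar it is to the fall prompt than to the non-fall prompt.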

# Note: moving the model and text inputs to the GPU is handled inside
# process_window, on the GPU worker allocated by @spaces.GPU.

def extract_frames(video_path, target_size=(224, 224)):
    """
    Extract all frames from the uploaded video, resize them to target_size,
    and return them as RGB PIL Images.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Convert frame from BGR to RGB and resize.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame = cv2.resize(frame, target_size)
        frames.append(Image.fromarray(frame))
    cap.release()
    return frames

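# @spaces.GPU requests GPU hardware on Hugging Face Spaces (ZeroGPU) for each
# call to the decorated function, for at most `duration` seconds.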
@spaces.GPU(duration=600)
def process_window(frames_window):
    """
    Process a window of frames and compute the average fall score.
    """

    processor = CLIPProcessor.from_pretrained(MODEL_NAME)
    model = CLIPModel.from_pretrained(MODEL_NAME)
    text_inputs = processor(text=[fall_prompt, nofall_prompt], return_tensors="pt", padding=True)


    inputs = processor(images=frames_window, return_tensors="pt", padding=True)
    if torch.cuda.is_available():
        text_inputs = text_inputs.to(torch.device("cuda"))
        model = model.to(torch.device("cuda"))
        inputs = {k: v.cuda() for k, v in inputs.items()}
    with torch.no_grad():
        image_features = model.get_image_features(**inputs)
    # Normalize embeddings.
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)

    with torch.no_grad():
        text_features = model.get_text_features(**text_inputs)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)

    # Compute cosine similarity.
    sims = (image_features @ text_features.T).cpu().numpy()  # shape: (num_frames, 2)
    # We assume index 0 is for the fall prompt.
    fall_scores = sims[:, 0]
    window_score = np.mean(fall_scores)
    return window_score, fall_scores

def detect_fall(video_path, window_size=16, stride=8, threshold=0.8, fps=15):
    """
    Process the video file using a sliding window over frames.
    Returns a list of timestamps where a fall is detected.
    """
    frames = extract_frames(video_path)
    if len(frames) < window_size:
        return "Video too short for inference.", None

    window_scores = []
    window_indices = []
    for start in range(0, len(frames) - window_size + 1, stride):
        window = frames[start:start + window_size]
        score, _ = process_window(window)
        window_scores.append(score)
        window_indices.append(start)

    detected_events = []
    for idx, score in zip(window_indices, window_scores):
        if score > threshold:
            time_sec = idx / fps  # approximate timestamp
            detected_events.append(time_sec)

    result_text = ""
    if detected_events:
        result_text = "Fall events detected at (sec): " + ", ".join([f"{t:.1f}" for t in detected_events])
    else:
        result_text = "No fall detected."
    # Return result and a representative frame for visual reference.
    rep_frame = frames[len(frames) // 2]
    return result_text, rep_frame

def process_video(video_file):
    result_text, rep_frame = detect_fall(video_file)
    return result_text, rep_frame

# Gradio interface definition.
demo = gr.Interface(
    fn=process_video,
    inputs=gr.Video(label="Upload Video Clip"),  # the video is passed to process_video as a filepath
    outputs=[gr.Textbox(label="Detection Results"), gr.Image(label="Representative Frame")],
    title="LightCLIP Fall Detection Demo",
    description=(
        "This demo detects human falls in video clips using a lightweight transformer-based model (LightCLIP). "
        "A sliding window approach aggregates results over multiple frames to improve precision in complex scenes."
    )
)

if __name__ == "__main__":
    demo.launch()