File size: 4,084 Bytes
5856e1e
c3adbf1
 
5856e1e
c3adbf1
 
5856e1e
 
 
 
c3adbf1
 
 
5856e1e
c3adbf1
5856e1e
c3adbf1
5856e1e
 
 
 
 
 
 
 
c3adbf1
5856e1e
 
 
 
 
c3adbf1
5856e1e
 
 
 
 
 
c3adbf1
5856e1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
 
c3adbf1
5856e1e
 
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
 
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
 
c3adbf1
5856e1e
c3adbf1
5856e1e
 
 
 
c3adbf1
 
5856e1e
 
 
 
 
 
 
 
 
 
 
 
 
c3adbf1
5856e1e
c3adbf1
5856e1e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import base64
import concurrent.futures
import io
import os
import subprocess
import tempfile
import time

from flask import Flask, request, jsonify
import cv2
import insightface
import onnxruntime
import gfpgan
import numpy as np
from PIL import Image

app = Flask(__name__)

class Predictor:
    """Face-swap pipeline: find faces, swap them, then enhance the result.

    Constructing an instance calls :meth:`setup`, which downloads any missing
    model files into ``MODEL_DIR`` and loads the models into memory so that
    repeated calls to :meth:`process_images` reuse them.
    """

    # Directory (relative to the current working directory) holding the models.
    MODEL_DIR = 'models'
    GFPGAN_FILENAME = 'GFPGANv1.4.pth'
    INSWAPPER_FILENAME = 'inswapper_128.onnx'
    # Maps each required model file name to the URL it is downloaded from.
    MODEL_URLS = {
        GFPGAN_FILENAME:
            'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth',
        INSWAPPER_FILENAME:
            'https://huggingface.co./ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
    }

    def __init__(self):
        self.setup()

    def setup(self):
        """Download missing model files and load the models into memory.

        Loading once here makes running multiple predictions efficient.

        Raises:
            FileNotFoundError: If the ``wget`` executable is not available.
            subprocess.CalledProcessError: If a model download fails.
        """
        os.makedirs(self.MODEL_DIR, exist_ok=True)
        for filename, url in self.MODEL_URLS.items():
            self._ensure_model(filename, url)

        swapper_path = os.path.join(self.MODEL_DIR, self.INSWAPPER_FILENAME)
        enhancer_path = os.path.join(self.MODEL_DIR, self.GFPGAN_FILENAME)

        self.face_swapper = insightface.model_zoo.get_model(
            swapper_path,
            providers=onnxruntime.get_available_providers(),
        )
        self.face_enhancer = gfpgan.GFPGANer(model_path=enhancer_path, upscale=1)
        self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l')
        self.face_analyser.prepare(ctx_id=0, det_size=(640, 640))

    def _ensure_model(self, filename, url):
        """Download ``url`` to ``MODEL_DIR/filename`` unless it already exists.

        The download is written to a ``.part`` file and renamed only after
        ``wget`` succeeds, so an interrupted download is never mistaken for a
        complete model on the next start. The process working directory is
        not changed.

        Returns:
            The path of the model file.
        """
        path = os.path.join(self.MODEL_DIR, filename)
        if os.path.exists(path):
            return path

        partial_path = path + '.part'
        # Argument list (no shell) and check=True: a failed download raises
        # here instead of surfacing later as an obscure model-loading error.
        subprocess.run(['wget', '-O', partial_path, url], check=True)
        os.replace(partial_path, path)
        return path

    @staticmethod
    def _decode_image(image_file):
        """Read ``image_file`` and decode it into a colour image array.

        Returns:
            The decoded image, or None when the data is empty or cannot be
            decoded.
        """
        raw = image_file.read()
        if not raw:
            return None
        buffer = np.frombuffer(raw, dtype=np.uint8)
        return cv2.imdecode(buffer, cv2.IMREAD_COLOR)

    def get_face(self, img_data):
        """Return the detected face with the largest bounding-box area.

        Args:
            img_data: Image passed unchanged to ``self.face_analyser.get``.

        Returns:
            The face whose ``bbox`` (x1, y1, x2, y2) covers the largest area,
            or None when no face is detected.
        """
        faces = self.face_analyser.get(img_data)
        if not faces:
            print("No face found")
            return None
        return max(
            faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )

    def process_images(self, input_image, swap_image):
        """Process a pair of images: target image and swap image.

        The largest face found in ``swap_image`` is swapped onto the largest
        face found in ``input_image``, and the result is passed through the
        face enhancer.

        Args:
            input_image: Binary file-like object (must provide ``read()``)
                containing the encoded target image.
            swap_image: Binary file-like object containing the encoded image
                that supplies the source face.

        Returns:
            The result as JPEG-encoded bytes, or None when an image cannot be
            decoded, a face is missing in either image, encoding fails, or any
            processing step raises.
        """
        try:
            # Decode both uploads directly from memory.
            frame = self._decode_image(input_image)
            swap_frame = self._decode_image(swap_image)
            if frame is None or swap_frame is None:
                print("Error in processing images: could not decode image data")
                return None

            face = self.get_face(frame)
            source_face = self.get_face(swap_frame)
            if face is None or source_face is None:
                return None

            result = self.face_swapper.get(frame, face, source_face, paste_back=True)
            _, _, result = self.face_enhancer.enhance(result, paste_back=True)

            # Encode the result image in memory.
            encoded_ok, result_image = cv2.imencode('.jpg', result)
            if not encoded_ok:
                print("Error in processing images: could not encode result image")
                return None
            return result_image.tobytes()

        except Exception as e:
            # Boundary handler: callers rely on None to signal a failed swap.
            print(f"Error in processing images: {e}")
            return None

# Create the module-level Predictor used by the /predict route.
# Construction calls Predictor.setup(), so importing this module downloads any
# missing model files and loads the models before the first request is served.
predictor = Predictor()

@app.route('/predict', methods=['POST'])
def predict():
    """Face-swap each uploaded target image with its paired swap image.

    Expects a multipart POST with the same number of files under
    ``target_images`` and ``swap_images``; files are paired by position.

    Returns:
        A JSON object ``{"results": [...]}`` with one entry per pair, in input
        order. Each entry has ``index`` plus either ``result_image`` (the JPEG
        result, base64-encoded as an ASCII string) or ``error``.
        Responds with HTTP 400 and an ``error`` message when either field is
        missing or the counts differ.
    """
    if 'target_images' not in request.files or 'swap_images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400

    target_images = request.files.getlist('target_images')
    swap_images = request.files.getlist('swap_images')

    if len(target_images) != len(swap_images):
        return jsonify({'error': 'Number of target images must match number of swap images'}), 400

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(predictor.process_images, target, swap)
            for target, swap in zip(target_images, swap_images)
        ]

    # Leaving the ``with`` block waits for every task, so all futures are done.
    # Walking them in submission order keeps the response order deterministic.
    results = []
    for idx, future in enumerate(futures):
        result = future.result()
        if result:
            results.append({
                'index': idx,
                # Raw bytes are not JSON serializable; send them as base64 text.
                'result_image': base64.b64encode(result).decode('ascii')
            })
        else:
            results.append({
                'index': idx,
                'error': 'Face swap failed'
            })

    return jsonify({'results': results})

if __name__ == "__main__":
    # Debug mode enables the Werkzeug interactive debugger (which can execute
    # arbitrary code) and the auto-reloader, so it is opt-in rather than
    # hard-coded: set FLASK_DEBUG=1 (or "true") to turn it on for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debug_enabled, threaded=True)