|
from flask import Flask, request, jsonify |
|
import os |
|
import time |
|
import tempfile |
|
import cv2 |
|
import insightface |
|
import onnxruntime |
|
import gfpgan |
|
import io |
|
import concurrent.futures |
|
import numpy as np |
|
from PIL import Image |
|
|
|
app = Flask(__name__) |
|
|
|
class Predictor: |
|
def __init__(self): |
|
self.setup() |
|
|
|
def setup(self): |
|
os.makedirs('models', exist_ok=True) |
|
os.chdir('models') |
|
if not os.path.exists('GFPGANv1.4.pth'): |
|
os.system( |
|
'wget https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth' |
|
) |
|
if not os.path.exists('inswapper_128.onnx'): |
|
os.system( |
|
'wget https://huggingface.co./ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx' |
|
) |
|
os.chdir('..') |
|
|
|
"""Load the model into memory to make running multiple predictions efficient""" |
|
self.face_swapper = insightface.model_zoo.get_model('models/inswapper_128.onnx', |
|
providers=onnxruntime.get_available_providers()) |
|
self.face_enhancer = gfpgan.GFPGANer(model_path='models/GFPGANv1.4.pth', upscale=1) |
|
self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l') |
|
self.face_analyser.prepare(ctx_id=0, det_size=(640, 640)) |
|
|
|
def get_face(self, img_data): |
|
analysed = self.face_analyser.get(img_data) |
|
try: |
|
largest = max(analysed, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1])) |
|
return largest |
|
except: |
|
print("No face found") |
|
return None |
|
|
|
def process_images(self, input_image, swap_image): |
|
"""Process a pair of images: target image and swap image""" |
|
try: |
|
|
|
input_image_data = np.asarray(bytearray(input_image.read()), dtype=np.uint8) |
|
swap_image_data = np.asarray(bytearray(swap_image.read()), dtype=np.uint8) |
|
|
|
frame = cv2.imdecode(input_image_data, cv2.IMREAD_COLOR) |
|
swap_frame = cv2.imdecode(swap_image_data, cv2.IMREAD_COLOR) |
|
|
|
face = self.get_face(frame) |
|
source_face = self.get_face(swap_frame) |
|
|
|
if face is None or source_face is None: |
|
return None |
|
|
|
result = self.face_swapper.get(frame, face, source_face, paste_back=True) |
|
_, _, result = self.face_enhancer.enhance(result, paste_back=True) |
|
|
|
|
|
_, result_image = cv2.imencode('.jpg', result) |
|
return result_image.tobytes() |
|
|
|
except Exception as e: |
|
print(f"Error in processing images: {e}") |
|
return None |
|
|
|
|
|
predictor = Predictor() |
|
|
|
@app.route('/predict', methods=['POST']) |
|
def predict(): |
|
if 'target_images' not in request.files or 'swap_images' not in request.files: |
|
return jsonify({'error': 'No image files provided'}), 400 |
|
|
|
target_images = request.files.getlist('target_images') |
|
swap_images = request.files.getlist('swap_images') |
|
|
|
if len(target_images) != len(swap_images): |
|
return jsonify({'error': 'Number of target images must match number of swap images'}), 400 |
|
|
|
results = [] |
|
|
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
future_to_pair = { |
|
executor.submit(predictor.process_images, target_images[i], swap_images[i]): i |
|
for i in range(len(target_images)) |
|
} |
|
|
|
for future in concurrent.futures.as_completed(future_to_pair): |
|
idx = future_to_pair[future] |
|
result = future.result() |
|
if result: |
|
results.append({ |
|
'index': idx, |
|
'result_image': result |
|
}) |
|
else: |
|
results.append({ |
|
'index': idx, |
|
'error': 'Face swap failed' |
|
}) |
|
|
|
return jsonify({'results': results}) |
|
|
|
if __name__ == "__main__": |
|
app.run(debug=True, threaded=True) |
|
|