"""Flask service that swaps faces between pairs of uploaded images.

POST /predict accepts two parallel multipart file lists, ``target_images``
and ``swap_images``. For each pair, the largest face found in the swap image
is pasted onto the largest face found in the target image, the result is run
through GFPGAN, and the JPEG is returned base64-encoded in the JSON response.
"""

import base64
import concurrent.futures
import io
import os
import subprocess
import tempfile
import time

import cv2
import gfpgan
import insightface
import numpy as np
import onnxruntime
from flask import Flask, jsonify, request
from PIL import Image

app = Flask(__name__)

# Directory the model weights are stored in, relative to the working directory.
_MODEL_DIR = 'models'

# Weight file name -> URL it is fetched from when missing on disk.
# NOTE(review): the inswapper host looks like a proxy/mirror domain rather
# than the canonical hosting site; confirm it is the intended source.
_MODEL_URLS = {
    'GFPGANv1.4.pth':
        'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth',
    'inswapper_128.onnx':
        'https://huggingface.co./ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx',
}


def _bbox_area(face):
    """Return the area of a detected face's bounding box (x1, y1, x2, y2)."""
    return (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])


def _decode_upload(upload):
    """Decode an uploaded file object into a BGR image array.

    Returns None when the upload is empty or is not a decodable image.
    """
    raw = upload.read()
    if not raw:
        # cv2.imdecode raises on an empty buffer; treat it as "no image".
        return None
    return cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_COLOR)


class Predictor:
    """Holds the face-analysis, face-swap and face-enhancement models."""

    def __init__(self):
        self.setup()

    def setup(self):
        """Load the model into memory to make running multiple predictions efficient.

        Missing weight files are downloaded into ``models/`` first.

        Raises:
            subprocess.CalledProcessError: if a download fails.
        """
        os.makedirs(_MODEL_DIR, exist_ok=True)
        for filename, url in _MODEL_URLS.items():
            if not os.path.exists(os.path.join(_MODEL_DIR, filename)):
                # -P writes into the model directory without changing the
                # process-wide working directory; an argument list avoids
                # going through the shell.
                subprocess.run(['wget', '-P', _MODEL_DIR, url], check=True)

        self.face_swapper = insightface.model_zoo.get_model(
            'models/inswapper_128.onnx',
            providers=onnxruntime.get_available_providers(),
        )
        self.face_enhancer = gfpgan.GFPGANer(
            model_path='models/GFPGANv1.4.pth', upscale=1
        )
        self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l')
        self.face_analyser.prepare(ctx_id=0, det_size=(640, 640))

    def get_face(self, img_data):
        """Return the detected face with the largest bounding box, or None.

        Args:
            img_data: image array passed straight to the face analyser.
        """
        analysed = self.face_analyser.get(img_data)
        if not analysed:
            print("No face found")
            return None
        return max(analysed, key=_bbox_area)

    def process_images(self, input_image, swap_image):
        """Process a pair of images: target image and swap image.

        Args:
            input_image: readable file object holding the target image.
            swap_image: readable file object holding the source-face image.

        Returns:
            The swapped and enhanced image encoded as JPEG bytes, or None if
            either image cannot be decoded, either contains no face, or any
            processing step fails.
        """
        # Top-level boundary for worker threads: any failure is reported as
        # None so a single bad pair does not abort the whole request.
        try:
            # Read the input images directly from memory.
            frame = _decode_upload(input_image)
            swap_frame = _decode_upload(swap_image)
            if frame is None or swap_frame is None:
                print("Error in processing images: could not decode image")
                return None

            face = self.get_face(frame)
            source_face = self.get_face(swap_frame)
            if face is None or source_face is None:
                return None

            result = self.face_swapper.get(
                frame, face, source_face, paste_back=True
            )
            _, _, result = self.face_enhancer.enhance(result, paste_back=True)

            # Create the result image in memory.
            encoded_ok, result_image = cv2.imencode('.jpg', result)
            if not encoded_ok:
                print("Error in processing images: JPEG encoding failed")
                return None
            return result_image.tobytes()
        except Exception as e:
            print(f"Error in processing images: {e}")
            return None


# Instantiate the Predictor class once at import time so every request
# shares the already-loaded models.
predictor = Predictor()


@app.route('/predict', methods=['POST'])
def predict():
    """Swap faces for each (target, swap) pair of uploaded images.

    Expects multipart file lists ``target_images`` and ``swap_images`` of the
    same length. Responds with ``{'results': [...]}`` where each entry holds
    the pair's ``index`` and either ``result_image`` (base64-encoded JPEG) or
    ``error``. Entries are listed in completion order, not input order.
    Returns HTTP 400 when a list is missing or the lengths differ.
    """
    if 'target_images' not in request.files or 'swap_images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400

    target_images = request.files.getlist('target_images')
    swap_images = request.files.getlist('swap_images')

    if len(target_images) != len(swap_images):
        return jsonify({'error': 'Number of target images must match number of swap images'}), 400

    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_index = {
            executor.submit(predictor.process_images, target, swap): idx
            for idx, (target, swap) in enumerate(zip(target_images, swap_images))
        }

        for future in concurrent.futures.as_completed(future_to_index):
            idx = future_to_index[future]
            result = future.result()
            if result:
                results.append({
                    'index': idx,
                    # Raw bytes are not JSON-serializable; ship the JPEG as
                    # base64 text instead.
                    'result_image': base64.b64encode(result).decode('ascii'),
                })
            else:
                results.append({
                    'index': idx,
                    'error': 'Face swap failed',
                })

    return jsonify({'results': results})


if __name__ == "__main__":
    # NOTE(review): debug=True enables the interactive Werkzeug debugger and
    # the reloader; do not expose this entry point outside local development.
    app.run(debug=True, threaded=True)