API / app.py
ar08's picture
Update app.py
5856e1e verified
from flask import Flask, request, jsonify
import os
import time
import tempfile
import cv2
import insightface
import onnxruntime
import gfpgan
import io
import concurrent.futures
import numpy as np
from PIL import Image
app = Flask(__name__)
class Predictor:
def __init__(self):
self.setup()
def setup(self):
os.makedirs('models', exist_ok=True)
os.chdir('models')
if not os.path.exists('GFPGANv1.4.pth'):
os.system(
'wget https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth'
)
if not os.path.exists('inswapper_128.onnx'):
os.system(
'wget https://huggingface.co./ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx'
)
os.chdir('..')
"""Load the model into memory to make running multiple predictions efficient"""
self.face_swapper = insightface.model_zoo.get_model('models/inswapper_128.onnx',
providers=onnxruntime.get_available_providers())
self.face_enhancer = gfpgan.GFPGANer(model_path='models/GFPGANv1.4.pth', upscale=1)
self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l')
self.face_analyser.prepare(ctx_id=0, det_size=(640, 640))
def get_face(self, img_data):
analysed = self.face_analyser.get(img_data)
try:
largest = max(analysed, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
return largest
except:
print("No face found")
return None
def process_images(self, input_image, swap_image):
"""Process a pair of images: target image and swap image"""
try:
# Read the input images directly from memory
input_image_data = np.asarray(bytearray(input_image.read()), dtype=np.uint8)
swap_image_data = np.asarray(bytearray(swap_image.read()), dtype=np.uint8)
frame = cv2.imdecode(input_image_data, cv2.IMREAD_COLOR)
swap_frame = cv2.imdecode(swap_image_data, cv2.IMREAD_COLOR)
face = self.get_face(frame)
source_face = self.get_face(swap_frame)
if face is None or source_face is None:
return None
result = self.face_swapper.get(frame, face, source_face, paste_back=True)
_, _, result = self.face_enhancer.enhance(result, paste_back=True)
# Create a result image in memory
_, result_image = cv2.imencode('.jpg', result)
return result_image.tobytes()
except Exception as e:
print(f"Error in processing images: {e}")
return None
# Instantiate the Predictor class
predictor = Predictor()
@app.route('/predict', methods=['POST'])
def predict():
if 'target_images' not in request.files or 'swap_images' not in request.files:
return jsonify({'error': 'No image files provided'}), 400
target_images = request.files.getlist('target_images')
swap_images = request.files.getlist('swap_images')
if len(target_images) != len(swap_images):
return jsonify({'error': 'Number of target images must match number of swap images'}), 400
results = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_pair = {
executor.submit(predictor.process_images, target_images[i], swap_images[i]): i
for i in range(len(target_images))
}
for future in concurrent.futures.as_completed(future_to_pair):
idx = future_to_pair[future]
result = future.result()
if result:
results.append({
'index': idx,
'result_image': result
})
else:
results.append({
'index': idx,
'error': 'Face swap failed'
})
return jsonify({'results': results})
if __name__ == "__main__":
app.run(debug=True, threaded=True)