File size: 4,084 Bytes
5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e c3adbf1 5856e1e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
import base64
import concurrent.futures
import io
import os
import subprocess
import tempfile
import time

import cv2
import gfpgan
import insightface
import numpy as np
import onnxruntime
from flask import Flask, request, jsonify
from PIL import Image
# Flask application object; the /predict route below is registered on it.
app = Flask(__name__)
class Predictor:
    """Face-swap pipeline backed by insightface and GFPGAN models.

    Constructing an instance calls :meth:`setup`, which downloads any missing
    model files into ``models/`` and loads the swapper, enhancer and face
    analyser into memory so that repeated predictions reuse them.
    """

    MODEL_DIR = 'models'
    GFPGAN_PATH = 'models/GFPGANv1.4.pth'
    GFPGAN_URL = 'https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth'
    INSWAPPER_PATH = 'models/inswapper_128.onnx'
    # NOTE(review): this host looks like a mirror/proxy rather than the
    # canonical Hugging Face domain -- confirm it is the intended source.
    INSWAPPER_URL = 'https://huggingface.co./ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx'

    def __init__(self):
        self.setup()

    @staticmethod
    def _download_if_missing(path, url):
        """Fetch ``url`` to ``path`` with wget unless ``path`` already exists.

        The file is downloaded under a temporary ``.part`` name and renamed
        only after wget succeeds, so an interrupted download is never mistaken
        for a complete model file on the next start.

        Raises:
            RuntimeError: if wget cannot be started or exits with an error.
        """
        if os.path.exists(path):
            return
        partial_path = path + '.part'
        try:
            completed = subprocess.run(['wget', '-O', partial_path, url], check=False)
        except OSError as err:
            raise RuntimeError(f"Could not run wget to download {url}") from err
        if completed.returncode != 0:
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise RuntimeError(
                f"Download of {url} failed (wget exit code {completed.returncode})"
            )
        os.replace(partial_path, path)

    def setup(self):
        """Load the model into memory to make running multiple predictions efficient"""
        # Work with explicit paths instead of os.chdir(): the working
        # directory is process-wide state and would stay changed if a
        # download step raised half-way through.
        os.makedirs(self.MODEL_DIR, exist_ok=True)
        self._download_if_missing(self.GFPGAN_PATH, self.GFPGAN_URL)
        self._download_if_missing(self.INSWAPPER_PATH, self.INSWAPPER_URL)

        self.face_swapper = insightface.model_zoo.get_model(
            self.INSWAPPER_PATH,
            providers=onnxruntime.get_available_providers(),
        )
        self.face_enhancer = gfpgan.GFPGANer(model_path=self.GFPGAN_PATH, upscale=1)
        self.face_analyser = insightface.app.FaceAnalysis(name='buffalo_l')
        self.face_analyser.prepare(ctx_id=0, det_size=(640, 640))

    @staticmethod
    def _bbox_area(face):
        """Return the area of ``face.bbox``, read as (x1, y1, x2, y2)."""
        left, top, right, bottom = face.bbox[:4]
        return (right - left) * (bottom - top)

    def get_face(self, img_data):
        """Return the detected face with the largest bounding box.

        Args:
            img_data: image passed straight to ``self.face_analyser.get``.

        Returns:
            The face whose bounding box has the greatest area, or ``None``
            (after printing a message) when no face was detected.
        """
        faces = self.face_analyser.get(img_data)
        # Test for "no faces" explicitly rather than catching whatever max()
        # raises; a bare except would also hide unrelated errors.
        if not faces:
            print("No face found")
            return None
        return max(faces, key=self._bbox_area)

    @staticmethod
    def _decode_upload(image_file):
        """Read a file-like object and decode it to a BGR image.

        Returns ``None`` when the upload is empty or cannot be decoded.
        """
        raw = np.frombuffer(image_file.read(), dtype=np.uint8)
        if raw.size == 0:
            return None
        return cv2.imdecode(raw, cv2.IMREAD_COLOR)

    def process_images(self, input_image, swap_image):
        """Process a pair of images: target image and swap image

        Args:
            input_image: file-like object holding the target image bytes.
            swap_image: file-like object holding the image that supplies the
                face to swap in.

        Returns:
            JPEG-encoded bytes of the swapped and enhanced image, or ``None``
            when an image cannot be decoded, no face is found in either
            image, or any processing step fails.
        """
        try:
            # Read the input images directly from memory
            frame = self._decode_upload(input_image)
            swap_frame = self._decode_upload(swap_image)
            if frame is None or swap_frame is None:
                print("Error in processing images: could not decode image data")
                return None

            face = self.get_face(frame)
            source_face = self.get_face(swap_frame)
            if face is None or source_face is None:
                return None

            result = self.face_swapper.get(frame, face, source_face, paste_back=True)
            _, _, result = self.face_enhancer.enhance(result, paste_back=True)

            # Create a result image in memory
            encoded_ok, result_image = cv2.imencode('.jpg', result)
            if not encoded_ok:
                print("Error in processing images: JPEG encoding failed")
                return None
            return result_image.tobytes()
        except Exception as e:
            # Boundary for worker threads: report the failure and signal it
            # with None so one bad pair does not abort the whole request.
            print(f"Error in processing images: {e}")
            return None
# Module-level Predictor instance used by the /predict route. Constructing it
# runs Predictor.setup(), so model download and loading happen at import time.
predictor = Predictor()
@app.route('/predict', methods=['POST'])
def predict():
    """Swap faces for each (target, swap) pair of uploaded images.

    Expects multipart form uploads under ``target_images`` and
    ``swap_images``; the two lists are paired by position and must have the
    same length.

    Returns:
        A JSON object ``{"results": [...]}`` with one entry per pair, in
        input order. Each entry carries ``index`` and either ``result_image``
        (the JPEG output, base64-encoded as ASCII text) or ``error``.
        Responds with HTTP 400 when a field is missing or the counts differ.
    """
    if 'target_images' not in request.files or 'swap_images' not in request.files:
        return jsonify({'error': 'No image files provided'}), 400

    target_images = request.files.getlist('target_images')
    swap_images = request.files.getlist('swap_images')
    if len(target_images) != len(swap_images):
        return jsonify({'error': 'Number of target images must match number of swap images'}), 400

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Keep the futures in submission order so results line up with the
        # input pairs regardless of which worker finishes first.
        futures = [
            executor.submit(predictor.process_images, target, swap)
            for target, swap in zip(target_images, swap_images)
        ]
        results = []
        for idx, future in enumerate(futures):
            image_bytes = future.result()
            if image_bytes:
                results.append({
                    'index': idx,
                    # Raw bytes are not JSON serializable; send base64 text.
                    'result_image': base64.b64encode(image_bytes).decode('ascii'),
                })
            else:
                results.append({
                    'index': idx,
                    'error': 'Face swap failed',
                })
    return jsonify({'results': results})
if __name__ == "__main__":
    # Debug mode turns on Werkzeug's interactive debugger (which can execute
    # arbitrary code) and the auto-reloader, so it is opt-in through the
    # FLASK_DEBUG environment variable instead of being always enabled.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, threaded=True)
|