Spaces:
Running
Running
File size: 3,863 Bytes
9aaf513 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
import functools
import numpy as np
from fastapi import (
File,
UploadFile,
)
import gradio as gr
from fastapi import APIRouter, BackgroundTasks, Depends, Response, status
from fastapi.responses import FileResponse
from typing import List, Dict, Tuple
from datetime import datetime
import os
from modules.whisper.data_classes import *
from modules.uvr.music_separator import MusicSeparator
from modules.utils.paths import BACKEND_CACHE_DIR
from backend.common.audio import read_audio
from backend.common.models import QueueResponse
from backend.common.config_loader import load_server_config
from backend.common.compresser import get_file_hash, find_file_by_hash
from backend.db.task.models import TaskStatus, TaskType, ResultType
from backend.db.task.dao import add_task_to_db, update_task_status_in_db
from .models import BGMSeparationResult
# Router collecting the BGM-separation endpoints under the "/bgm-separation" prefix.
bgm_separation_router = APIRouter(
    prefix="/bgm-separation",
    tags=["BGM Separation"],
)
@functools.lru_cache
def get_bgm_separation_inferencer() -> 'MusicSeparator':
    """Build the ``MusicSeparator`` used by the BGM-separation endpoints.

    The function takes no arguments and is wrapped in ``functools.lru_cache``,
    so the separator is constructed on the first call and the same instance is
    returned on later calls.

    Returns:
        A ``MusicSeparator`` writing to ``<BACKEND_CACHE_DIR>/UVR`` whose model
        has been set from the ``"bgm_separation"`` section of the server config
        (``model_size`` and ``device`` keys).
    """
    bgm_config = load_server_config()["bgm_separation"]

    uvr_output_dir = os.path.join(BACKEND_CACHE_DIR, "UVR")
    separator = MusicSeparator(output_dir=uvr_output_dir)

    # Select the configured model/device once, before the instance is cached.
    separator.update_model(
        model_name=bgm_config["model_size"],
        device=bgm_config["device"],
    )
    return separator
def run_bgm_separation(
    audio: np.ndarray,
    params: BGMSeparationParams,
    identifier: str,
) -> Tuple[np.ndarray, np.ndarray]:
    """Run BGM separation for one task and record its progress in the task DB.

    The task row for ``identifier`` is set to ``IN_PROGRESS`` before the
    separator is invoked and to ``COMPLETED`` afterwards. The completion update
    stores a ``BGMSeparationResult`` built from ``get_file_hash`` of the two
    output file paths, together with the elapsed time in seconds.

    Args:
        audio: Audio data passed to the separator as ``audio``.
        params: Provides ``uvr_model_size``, ``uvr_device`` and ``segment_size``
            for the separation call.
        identifier: Task identifier used for both DB status updates.

    Returns:
        The ``(instrumental, vocal)`` pair returned by the separator.
    """
    in_progress_update = {
        "uuid": identifier,
        "status": TaskStatus.IN_PROGRESS,
        "updated_at": datetime.utcnow(),
    }
    update_task_status_in_db(identifier=identifier, update_data=in_progress_update)

    # The clock starts before the separator is fetched, so the recorded
    # duration also covers whatever the (cached) factory call costs.
    started_at = datetime.utcnow()
    separator = get_bgm_separation_inferencer()
    instrumental, vocal, filepaths = separator.separate(
        audio=audio,
        model_name=params.uvr_model_size,
        device=params.uvr_device,
        segment_size=params.segment_size,
        save_file=True,
        progress=gr.Progress(),
    )
    instrumental_path, vocal_path = filepaths
    elapsed_seconds = (datetime.utcnow() - started_at).total_seconds()

    separation_result = BGMSeparationResult(
        instrumental_hash=get_file_hash(instrumental_path),
        vocal_hash=get_file_hash(vocal_path),
    )
    completed_update = {
        "uuid": identifier,
        "status": TaskStatus.COMPLETED,
        "result": separation_result.model_dump(),
        "result_type": ResultType.FILEPATH,
        "updated_at": datetime.utcnow(),
        "duration": elapsed_seconds,
    }
    update_task_status_in_db(identifier=identifier, update_data=completed_update)

    return instrumental, vocal
@bgm_separation_router.post(
    "/",
    response_model=QueueResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Separate Background BGM and vocal",
    description="Separate background music and vocal from an uploaded audio or video file.",
)
async def bgm_separation(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(..., description="Audio or video file to separate background music."),
    params: BGMSeparationParams = Depends()
) -> QueueResponse:
    """Queue a BGM-separation task for an uploaded file.

    The upload is read with ``read_audio``, a task row is created with status
    ``QUEUED``, and ``run_bgm_separation`` is scheduled on ``background_tasks``.
    The response is returned without waiting for the separation to run.

    Args:
        background_tasks: FastAPI background-task collector that receives the
            separation job.
        file: Uploaded audio or video file. An ``np.ndarray`` is also accepted
            when the function is called directly; it is used as the audio
            as-is, with no duration or file name recorded.
        params: Separation parameters; stored on the task via ``model_dump()``
            and forwarded to ``run_bgm_separation``.

    Returns:
        A ``QueueResponse`` carrying the new task identifier and ``QUEUED`` status.
    """
    if isinstance(file, np.ndarray):
        # Already-decoded samples: nothing to read and no metadata available.
        audio, info = file, None
    else:
        audio, info = await read_audio(file=file)

    identifier = add_task_to_db(
        status=TaskStatus.QUEUED,
        # An ndarray has no ``filename`` attribute; fall back to None instead of
        # raising AttributeError on the array path handled above.
        file_name=getattr(file, "filename", None),
        audio_duration=info.duration if info else None,
        task_type=TaskType.BGM_SEPARATION,
        task_params=params.model_dump(),
    )

    background_tasks.add_task(
        run_bgm_separation,
        audio=audio,
        params=params,
        identifier=identifier
    )

    return QueueResponse(identifier=identifier, status=TaskStatus.QUEUED, message="BGM Separation task has queued")
|