File size: 3,863 Bytes
9aaf513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import functools
import numpy as np
from fastapi import (
    File,
    UploadFile,
)
import gradio as gr
from fastapi import APIRouter, BackgroundTasks, Depends, Response, status
from fastapi.responses import FileResponse
from typing import List, Dict, Tuple
from datetime import datetime
import os

from modules.whisper.data_classes import *
from modules.uvr.music_separator import MusicSeparator
from modules.utils.paths import BACKEND_CACHE_DIR
from backend.common.audio import read_audio
from backend.common.models import QueueResponse
from backend.common.config_loader import load_server_config
from backend.common.compresser import get_file_hash, find_file_by_hash
from backend.db.task.models import TaskStatus, TaskType, ResultType
from backend.db.task.dao import add_task_to_db, update_task_status_in_db
from .models import BGMSeparationResult


bgm_separation_router = APIRouter(prefix="/bgm-separation", tags=["BGM Separation"])


@functools.lru_cache
def get_bgm_separation_inferencer() -> 'MusicSeparator':
    config = load_server_config()["bgm_separation"]
    inferencer = MusicSeparator(
        output_dir=os.path.join(BACKEND_CACHE_DIR, "UVR")
    )
    inferencer.update_model(
        model_name=config["model_size"],
        device=config["device"]
    )
    return inferencer


def run_bgm_separation(

    audio: np.ndarray,

    params: BGMSeparationParams,

    identifier: str,

) -> Tuple[np.ndarray, np.ndarray]:
    update_task_status_in_db(
        identifier=identifier,
        update_data={
            "uuid": identifier,
            "status": TaskStatus.IN_PROGRESS,
            "updated_at": datetime.utcnow()
        }
    )

    start_time = datetime.utcnow()
    instrumental, vocal, filepaths = get_bgm_separation_inferencer().separate(
        audio=audio,
        model_name=params.uvr_model_size,
        device=params.uvr_device,
        segment_size=params.segment_size,
        save_file=True,
        progress=gr.Progress()
    )
    instrumental_path, vocal_path = filepaths
    elapsed_time = (datetime.utcnow() - start_time).total_seconds()

    update_task_status_in_db(
        identifier=identifier,
        update_data={
            "uuid": identifier,
            "status": TaskStatus.COMPLETED,
            "result": BGMSeparationResult(
                instrumental_hash=get_file_hash(instrumental_path),
                vocal_hash=get_file_hash(vocal_path)
            ).model_dump(),
            "result_type": ResultType.FILEPATH,
            "updated_at": datetime.utcnow(),
            "duration": elapsed_time
        }
    )
    return instrumental, vocal


@bgm_separation_router.post(

    "/",

    response_model=QueueResponse,

    status_code=status.HTTP_201_CREATED,

    summary="Separate Background BGM abd vocal",

    description="Separate background music and vocal from an uploaded audio or video file.",

)
async def bgm_separation(

    background_tasks: BackgroundTasks,

    file: UploadFile = File(..., description="Audio or video file to separate background music."),

    params: BGMSeparationParams = Depends()

) -> QueueResponse:
    if not isinstance(file, np.ndarray):
        audio, info = await read_audio(file=file)
    else:
        audio, info = file, None

    identifier = add_task_to_db(
        status=TaskStatus.QUEUED,
        file_name=file.filename,
        audio_duration=info.duration if info else None,
        task_type=TaskType.BGM_SEPARATION,
        task_params=params.model_dump(),
    )

    background_tasks.add_task(
        run_bgm_separation,
        audio=audio,
        params=params,
        identifier=identifier
    )

    return QueueResponse(identifier=identifier, status=TaskStatus.QUEUED, message="BGM Separation task has queued")