import os
import time
import threading
import queue
import multiprocessing
from pathlib import Path
import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import numpy as np

# Set up environment variables for CPU optimization
os.environ["OMP_NUM_THREADS"] = str(max(1, multiprocessing.cpu_count() - 1))  # Optimal OpenMP threads
os.environ["MKL_NUM_THREADS"] = str(max(1, multiprocessing.cpu_count() - 1))  # Optimal MKL threads
os.environ["LLAMA_AVX"] = "1"
os.environ["LLAMA_AVX2"] = "1"
os.environ["LLAMA_F16"] = "1"

# Cache directories
CACHE_DIR = Path.home() / ".cache" / "fast_translate"
MODEL_CACHE = CACHE_DIR / "models"
QUANTIZED_CACHE = CACHE_DIR / "quantized"
os.makedirs(MODEL_CACHE, exist_ok=True)
os.makedirs(QUANTIZED_CACHE, exist_ok=True)

# Check whether a GPU is available (this Space targets CPU-only execution)
has_gpu = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0) if has_gpu else "No GPU"
print(f"GPU available: {has_gpu} - {gpu_name}")

# Configure CPU settings
cpu_count = multiprocessing.cpu_count()
optimal_threads = max(4, cpu_count - 1)  # Leave one core free
print(f"Using {optimal_threads} of {cpu_count} CPU cores")

# Download model files
def get_model_path(repo_id, filename):
    print(f"Obtaining {filename}...")
    # Download to our custom cache location
    return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=MODEL_CACHE)

# Function to quantize model to int4 or int8
def quantize_model(input_model_path, output_model_path, quantization_type="q4_0"):
    """Quantize model to lower precision for faster inference on CPU"""
    try:
        from llama_cpp import llama_model_quantize

        # Check if quantized model already exists
        if os.path.exists(output_model_path):
            print(f"Using existing quantized model: {output_model_path}")
            return output_model_path

        print(f"Quantizing model to {quantization_type}...")
        start_time = time.time()

        # Quantize using llama-cpp-python built-in quantization
        llama_model_quantize(
            input_model_path,
            output_model_path,
            quantization_type
        )
        print(f"Quantization completed in {time.time() - start_time:.2f}s")
        return output_model_path
    except Exception as e:
        print(f"Quantization failed: {e}, using original model")
        return input_model_path
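
# Note: the call above passes plain string arguments, but depending on the
# installed llama-cpp-python version the low-level llama_model_quantize binding
# may expect a llama_model_quantize_params struct instead. If the signature does
# not match, the exception handler above simply falls back to the original
# (un-quantized) GGUF file.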

# Download models
base_model_path = get_model_path(
    "johnpaulbin/articulate-11-expspanish-base-merged-Q8_0-GGUF",
    "articulate-11-expspanish-base-merged-q8_0.gguf"
)
adapter_path = get_model_path(
    "johnpaulbin/articulate-V1-Q8_0-GGUF",
    "articulate-V1-q8_0.gguf"
)

# Quantize models (creates int4 versions for faster CPU inference)
quantized_base_path = str(QUANTIZED_CACHE / "articulate-base-q4_0.gguf")
quantized_adapter_path = str(QUANTIZED_CACHE / "articulate-adapter-q4_0.gguf")
base_model_path = quantize_model(base_model_path, quantized_base_path, "q4_0")
adapter_path = quantize_model(adapter_path, quantized_adapter_path, "q4_0")

# Import after setting environment variables
from llama_cpp import Llama

# Translation cache
translation_cache = {}
MAX_CACHE_SIZE = 1000
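# The cache is a plain dict keyed by "direction:text". When it reaches
# MAX_CACHE_SIZE, _process_translation evicts the oldest entry (dicts preserve
# insertion order in Python 3.7+), i.e. simple FIFO eviction.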

# Model worker with batching support
class ModelWorker:
    def __init__(self):
        self.model = None
        self.model_ready = threading.Event()  # Set once the model has finished loading
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.batch_queue = []
        self.batch_event = threading.Event()
        self.batch_size = 4       # Process up to 4 requests at once
        self.batch_timeout = 0.1  # Wait 100 ms at most to collect a batch
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.batch_thread = threading.Thread(target=self._batch_loop, daemon=True)
        self.worker_thread.start()
        self.batch_thread.start()

    def _batch_loop(self):
        """Collect requests into batches for more efficient processing"""
        while True:
            try:
                # Get a request
                request = self.request_queue.get()
                if request is None:
                    break
                # Add to batch
                self.batch_queue.append(request)
                # Try to collect more requests for the batch
                batch_start = time.time()
                while (len(self.batch_queue) < self.batch_size and
                       time.time() - batch_start < self.batch_timeout):
                    try:
                        req = self.request_queue.get_nowait()
                        if req is None:
                            break
                        self.batch_queue.append(req)
                    except queue.Empty:
                        time.sleep(0.01)
                # Process the collected batch (requests are handled sequentially on this thread)
                current_batch = self.batch_queue.copy()
                self.batch_queue = []
                for req in current_batch:
                    self._process_request(req)
            except Exception as e:
                print(f"Error in batch thread: {e}")

    def _worker_loop(self):
        """Load the model and warm it up; requests are handled by the batch thread"""
        try:
            # Initialize model with optimized settings
            print("Initializing model in background thread...")
            start_time = time.time()
            # Create the model context with CPU-optimized settings
            self.model = Llama(
                model_path=base_model_path,
                lora_path=adapter_path,
                n_ctx=256,                  # Small context for speed
                n_threads=optimal_threads,  # Use all but one CPU core
                n_batch=512,                # Prompt-processing batch size
                use_mmap=True,              # Memory mapping (more efficient)
                n_gpu_layers=0,             # Force CPU only
                seed=42,                    # Consistent results
                rope_freq_base=10000,       # Default RoPE settings
                rope_freq_scale=1.0,
                verbose=False               # Reduce overhead
            )
            print(f"Model loaded in {time.time() - start_time:.2f} seconds")
            # Pre-warm the model by running a simple inference
            print("Pre-warming model...")
            self.model.create_completion("[ENGLISH]hello[SPANISH]", max_tokens=8)
            print("Model ready for translation")
        except Exception as e:
            print(f"Failed to initialize model: {e}")
        finally:
            # Unblock any queued requests even if loading failed
            self.model_ready.set()

    def _process_request(self, request):
        """Process a single translation request"""
        direction, text, callback_id = request
        try:
            # Wait until the model has finished loading (see _worker_loop)
            self.model_ready.wait()
            result = self._process_translation(direction, text)
            self.response_queue.put((callback_id, result))
        except Exception as e:
            print(f"Error processing request: {e}")
            self.response_queue.put((callback_id, f"Error: {str(e)}"))

    def _process_translation(self, direction, text):
        """Translate text with optimized settings"""
        if not text or not text.strip():
            return ""

        # Check cache first for faster response
        cache_key = f"{direction}:{text}"
        if cache_key in translation_cache:
            print("Cache hit!")
            return translation_cache[cache_key]

        # Start timing for performance tracking
        start_time = time.time()

        # Map language directions
        lang_map = {
            "English to Spanish": ("ENGLISH", "SPANISH"),
            "Spanish to English": ("SPANISH", "ENGLISH"),
            "Korean to English": ("KOREAN", "ENGLISH"),
            "English to Korean": ("ENGLISH", "KOREAN")
        }
        if direction not in lang_map:
            return "Invalid direction"
        source_lang, target_lang = lang_map[direction]

        # Efficient prompt format
        prompt = f"[{source_lang}]{text.strip()}[{target_lang}]"

        # Estimate an appropriate output length based on the input
        input_tokens = min(100, max(10, len(text.split())))
        max_tokens = min(100, max(25, int(input_tokens * 1.3)))

        # Generate translation with aggressively optimized settings for speed
        response = self.model.create_completion(
            prompt,
            max_tokens=max_tokens,
            temperature=0.0,     # Deterministic
            top_k=1,             # Most likely token only
            top_p=1.0,           # No nucleus sampling
            repeat_penalty=1.0,  # No penalty
            stream=False         # Get complete response
        )
        translation = response['choices'][0]['text'].strip()

        # Cache result (remove the oldest entry once the cache is full)
        if len(translation_cache) >= MAX_CACHE_SIZE:
            translation_cache.pop(next(iter(translation_cache)))
        translation_cache[cache_key] = translation

        # Log performance
        inference_time = time.time() - start_time
        tokens_per_second = (input_tokens + len(translation.split())) / inference_time
        print(f"Translation: {inference_time:.3f}s ({tokens_per_second:.1f} tokens/sec)")
        return translation

    def request_translation(self, direction, text, callback_id):
        """Queue a translation request"""
        self.request_queue.put((direction, text, callback_id))

# Preload and cache translations for common phrases in a background thread
def preload_common_phrases(worker):
    # Dictionary of common phrases that will benefit from caching
    common_phrases = {
        "English to Spanish": [
            "Hello", "Thank you", "Good morning", "How are you?", "What's your name?",
            "I don't understand", "Please", "Sorry", "Yes", "No", "Where is",
            "How much does it cost?", "What time is it?", "I don't speak Spanish",
            "Where is the bathroom?", "I need help", "Can you help me?"
        ],
        "Spanish to English": [
            "Hola", "Gracias", "Buenos días", "¿Cómo estás?", "¿Cómo te llamas?",
            "No entiendo", "Por favor", "Lo siento", "Sí", "No", "Dónde está",
            "¿Cuánto cuesta?", "¿Qué hora es?", "No hablo español", "¿Dónde está el baño?",
            "Necesito ayuda", "¿Puedes ayudarme?"
        ],
        "English to Korean": [
            "Hello", "Thank you", "Good morning", "How are you?", "What's your name?",
            "I don't understand", "Please", "Sorry", "Yes", "No", "Where is",
            "How much is this?", "What time is it?", "I don't speak Korean"
        ],
        "Korean to English": [
            "안녕하세요", "감사합니다", "좋은 아침입니다", "어떻게 지내세요?", "이름이 뭐예요?",
            "이해가 안 돼요", "제발", "죄송합니다", "네", "아니요", "어디에 있어요",
            "이거 얼마예요?", "지금 몇 시예요?", "한국어를 못해요"
        ]
    }

    preload_requests = []
    for direction, phrases in common_phrases.items():
        for phrase in phrases:
            preload_requests.append((direction, phrase, f"preload_{len(preload_requests)}"))

    # Process preloading in a separate thread
    def preloader():
        print(f"Preloading {len(preload_requests)} common phrases in background...")
        for request in preload_requests:
            worker.request_translation(*request)
            # Small sleep to avoid overwhelming the queue
            time.sleep(0.1)
        print("Preloading complete")

    thread = threading.Thread(target=preloader, daemon=True)
    thread.start()
    return thread

# Create worker instance
worker = ModelWorker()

# Start preloading common phrases in the background
preload_thread = preload_common_phrases(worker)
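# Note: preloaded phrases go through the same request queue as user requests,
# so translations submitted right after startup may briefly queue behind the
# warm-up phrases until the cache is populated.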

# Counter for request IDs
next_request_id = 0
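# Note: incrementing this module-level counter is not synchronised; if Gradio
# ever serves concurrent translate() calls, guarding it with a threading.Lock
# would be safer.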

# Faster sentence splitter used to chunk long inputs for translation
def split_sentences(text, max_length=50):
    """Split text into manageable chunks for faster translation"""
    if len(text) <= max_length:
        return [text]

    # Sentence-ending punctuation used as natural break points
    sentence_endings = ('.', '!', '?')
    chunks = []
    current_chunk = ""

    lines = text.split('\n')
    for line in lines:
        if not line.strip():
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
            continue
        words = line.split(' ')
        for word in words:
            test_chunk = f"{current_chunk} {word}".strip()
            if len(test_chunk) > max_length:
                chunks.append(current_chunk)
                current_chunk = word
            else:
                current_chunk = test_chunk
            # Flush the chunk at natural sentence boundaries
            if current_chunk.endswith(sentence_endings):
                chunks.append(current_chunk)
                current_chunk = ""
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
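
# Illustrative example (hypothetical input): with the sentence-boundary flush above,
#   split_sentences("This is the first sentence. And here is a second one that is a bit longer.")
# yields roughly ["This is the first sentence.",
#                 "And here is a second one that is a bit longer."],
# since chunks are flushed at '.', '!' or '?' and whenever they exceed ~50 characters.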

# Gradio interface functions
def translate(direction, text, progress=gr.Progress()):
    """Fast translation with batching and caching"""
    global next_request_id

    # Skip empty inputs
    if not text or not text.strip():
        return ""

    # Check for an exact cache hit
    cache_key = f"{direction}:{text}"
    if cache_key in translation_cache:
        return translation_cache[cache_key]

    # For longer texts, split into sentences for faster processing
    if len(text) > 50:
        progress(0.1, desc="Processing text...")
        chunks = split_sentences(text)
        if len(chunks) > 1:
            results = []
            for i, chunk in enumerate(chunks):
                # Check if this chunk is already cached
                chunk_key = f"{direction}:{chunk}"
                if chunk_key in translation_cache:
                    results.append(translation_cache[chunk_key])
                    continue

                # Request translation for this chunk
                chunk_id = next_request_id
                next_request_id += 1
                worker.request_translation(direction, chunk, chunk_id)

                # Wait for the response (10 second timeout per chunk)
                chunk_start = time.time()
                chunk_found = False
                while not chunk_found and time.time() - chunk_start < 10:
                    progress((i + 0.5) / len(chunks), desc=f"Translating part {i+1}/{len(chunks)}")
                    try:
                        while not worker.response_queue.empty():
                            resp_id, result = worker.response_queue.get_nowait()
                            if resp_id == chunk_id:
                                results.append(result)
                                chunk_found = True
                                break
                    except queue.Empty:
                        pass
                    time.sleep(0.05)
                if not chunk_found:
                    results.append(f"[Translation failed for part {i+1}]")

            combined = " ".join(results)
            translation_cache[cache_key] = combined
            progress(1.0)
            return combined
    # For single sentences
    request_id = next_request_id
    next_request_id += 1

    # Queue the request
    worker.request_translation(direction, text, request_id)

    # Wait for the response
    progress(0.2, desc="Translating...")
    start_time = time.time()
    max_wait = 20  # Maximum wait time in seconds
    while time.time() - start_time < max_wait:
        progress(0.2 + 0.8 * ((time.time() - start_time) / max_wait), desc="Translating...")
        # Check for our response
        try:
            while not worker.response_queue.empty():
                resp_id, result = worker.response_queue.get_nowait()
                if resp_id == request_id:
                    progress(1.0)
                    return result
        except queue.Empty:
            pass
        # Small sleep to prevent CPU hogging
        time.sleep(0.05)

    progress(1.0)
    return "Translation timed out. Please try again with a shorter text."

# Create Gradio interface
with gr.Blocks(title="Ultra-Fast Translation App (CPU Optimized)") as iface:
    gr.Markdown(f"""
    ## Ultra-Fast Translation App (CPU Optimized)
    Running on: {'GPU: ' + gpu_name if has_gpu else 'CPU optimized with int4 quantization'}
    """)

    with gr.Row():
        direction = gr.Dropdown(
            choices=["English to Spanish", "Spanish to English", "Korean to English", "English to Korean"],
            label="Translation Direction",
            value="English to Spanish"
        )

    with gr.Row():
        input_text = gr.Textbox(lines=5, label="Input Text", placeholder="Enter text to translate...")
        output_text = gr.Textbox(lines=5, label="Translation")

    # Add translate button
    translate_btn = gr.Button("Translate")
    translate_btn.click(fn=translate, inputs=[direction, input_text], outputs=output_text)

    # Optimization options
    with gr.Accordion("Performance Tips", open=True):
        gr.Markdown("""
        ### Speed Optimization Tips
        - The model has been quantized to int4 for faster CPU execution
        - Common phrases are pre-cached for instant results
        - Long text is automatically split into smaller chunks
        - The first translation will be slower while the model warms up
        - Short sentences (< 50 chars) translate much faster
        """)

    # Add examples with preloaded common phrases
    gr.Examples(
        examples=[
            ["English to Spanish", "Hello, how are you today?"],
            ["Spanish to English", "Hola, ¿cómo estás hoy?"],
            ["English to Korean", "The weather is nice today."],
            ["Korean to English", "안녕하세요, 만나서 반갑습니다."]
        ],
        inputs=[direction, input_text],
        fn=translate,
        outputs=output_text
    )

# Launch with optimized settings
if __name__ == "__main__":
    iface.launch(
        debug=False,
        show_error=True,
        share=False,
        quiet=True,
        server_name="0.0.0.0",
        server_port=7860
    )
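
# Running locally (assuming this file is the Space's app.py and the imports above
# are installed, e.g. `pip install llama-cpp-python gradio torch huggingface_hub numpy`):
#   python app.py
# then open http://localhost:7860 in a browser.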