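# googoo / app.py
# Hugging Face file-viewer residue kept as a comment so the module parses:
# "johnpaulbin's picture", "Update app.py", commit 3ca3b8c (verified),
# "raw", "history blame", 18.6 kB.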
import os
import time
import threading
import queue
import multiprocessing
from pathlib import Path
import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import numpy as np
# Configure CPU settings.
# Leave one core free for the OS / web server, but never request more threads
# than that: asking for 4 threads on a 2-core host oversubscribes the CPU.
cpu_count = multiprocessing.cpu_count()
optimal_threads = max(1, cpu_count - 1)

# Set up environment variables for CPU optimization.
# NOTE(review): torch is imported above, before these are set — confirm the
# OpenMP/MKL runtimes still pick the values up at this point.
os.environ["OMP_NUM_THREADS"] = str(optimal_threads)  # OpenMP threads
os.environ["MKL_NUM_THREADS"] = str(optimal_threads)  # MKL threads
# NOTE(review): presumably meant to enable llama.cpp CPU features — confirm
# that anything actually reads these variables at runtime.
os.environ["LLAMA_AVX"] = "1"
os.environ["LLAMA_AVX2"] = "1"
os.environ["LLAMA_F16"] = "1"

# Cache directories (created eagerly so later downloads/quantization can write).
CACHE_DIR = Path.home() / ".cache" / "fast_translate"
MODEL_CACHE = CACHE_DIR / "models"
QUANTIZED_CACHE = CACHE_DIR / "quantized"
MODEL_CACHE.mkdir(parents=True, exist_ok=True)
QUANTIZED_CACHE.mkdir(parents=True, exist_ok=True)

# Check if we're running on CPU
has_gpu = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0) if has_gpu else "No GPU"
print(f"GPU available: {has_gpu} - {gpu_name}")
print(f"Using {optimal_threads} of {cpu_count} CPU cores")
# Download model files
def get_model_path(repo_id, filename):
    """Fetch ``filename`` from the Hub repository ``repo_id``.

    The file is stored under the app's own MODEL_CACHE directory instead of
    the default Hugging Face cache. Returns whatever ``hf_hub_download``
    returns for the downloaded file.
    """
    print(f"Obtaining {filename}...")
    download_options = {
        "repo_id": repo_id,
        "filename": filename,
        "cache_dir": MODEL_CACHE,  # our custom cache location
    }
    local_path = hf_hub_download(**download_options)
    return local_path
# Function to quantize model to int4 or int8
def quantize_model(input_model_path, output_model_path, quantization_type="q4_0"):
    """Quantize model to lower precision for faster inference on CPU.

    Args:
        input_model_path: Path of the model file to quantize.
        output_model_path: Path where the quantized model is (or will be) stored.
        quantization_type: Quantization scheme name passed to the quantizer.

    Returns:
        ``output_model_path`` when a quantized file already exists or was
        produced successfully; otherwise ``input_model_path`` (best-effort
        fallback to the unquantized model).
    """
    # Reuse an earlier result first: this needs neither llama_cpp nor any work.
    if os.path.exists(output_model_path):
        print(f"Using existing quantized model: {output_model_path}")
        return output_model_path

    # Write to a scratch file and rename it into place only on success, so an
    # interrupted or failed run can never leave a truncated file that the
    # existence check above would later mistake for a valid cache entry.
    partial_path = f"{output_model_path}.tmp"
    try:
        from llama_cpp import llama_model_quantize

        print(f"Quantizing model to {quantization_type}...")
        start_time = time.time()
        # NOTE(review): confirm this call matches the installed llama-cpp-python
        # version; its low-level binding may expect byte paths and a params struct.
        llama_model_quantize(
            input_model_path,
            partial_path,
            quantization_type
        )
        # Raises if the quantizer produced no file, which sends us to the fallback.
        os.replace(partial_path, output_model_path)
        print(f"Quantization completed in {time.time() - start_time:.2f}s")
        return output_model_path
    except Exception as e:
        # Deliberate best-effort: any failure falls back to the original model.
        print(f"Quantization failed: {e}, using original model")
        if os.path.exists(partial_path):
            try:
                os.remove(partial_path)
            except OSError:
                pass
        return input_model_path
# Download models
# Fetch the base model and the adapter from the Hub.
base_model_path = get_model_path(
    repo_id="johnpaulbin/articulate-11-expspanish-base-merged-Q8_0-GGUF",
    filename="articulate-11-expspanish-base-merged-q8_0.gguf",
)
adapter_path = get_model_path(
    repo_id="johnpaulbin/articulate-V1-Q8_0-GGUF",
    filename="articulate-V1-q8_0.gguf",
)

# Target locations for the q4_0 copies inside the quantized cache.
quantized_base_path = os.fspath(QUANTIZED_CACHE.joinpath("articulate-base-q4_0.gguf"))
quantized_adapter_path = os.fspath(QUANTIZED_CACHE.joinpath("articulate-adapter-q4_0.gguf"))

# Each call returns the quantized path on success, or the original path when
# quantization fails, so the names below always point at a usable file.
base_model_path = quantize_model(
    base_model_path, quantized_base_path, quantization_type="q4_0"
)
adapter_path = quantize_model(
    adapter_path, quantized_adapter_path, quantization_type="q4_0"
)
# Import after setting environment variables
from llama_cpp import Llama
# Translation cache
# Module-level dict keyed by "<direction>:<source text>" holding translated
# strings. It is read and written by both ModelWorker and translate().
translation_cache = {}
# Entry count at which ModelWorker evicts the oldest key before inserting.
MAX_CACHE_SIZE = 1000
# Model worker with batching support
class ModelWorker:
    """Background translation worker with simple request batching.

    Two daemon threads are started on construction:

    * the worker thread loads the Llama model and pre-warms it, then signals
      ``model_ready``;
    * the batch thread pulls ``(direction, text, callback_id)`` tuples from
      ``request_queue``, groups up to ``batch_size`` of them, translates them
      one by one and puts ``(callback_id, result)`` on ``response_queue``.

    Putting ``None`` on ``request_queue`` stops the batch thread.
    """

    # Maps the UI direction label to the (source, target) tags used in prompts.
    _LANG_MAP = {
        "English to Spanish": ("ENGLISH", "SPANISH"),
        "Spanish to English": ("SPANISH", "ENGLISH"),
        "Korean to English": ("KOREAN", "ENGLISH"),
        "English to Korean": ("ENGLISH", "KOREAN"),
    }

    def __init__(self):
        self.model = None
        self.request_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.batch_queue = []
        # Not used by any method of this class; kept so existing attribute
        # access from outside keeps working.
        self.batch_event = threading.Event()
        # Set once model initialisation has finished (successfully or not), so
        # requests queued during start-up wait instead of hitting model=None.
        self.model_ready = threading.Event()
        self.batch_size = 4  # Process up to 4 requests at once
        self.batch_timeout = 0.1  # Wait 100ms max to collect batch
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.batch_thread = threading.Thread(target=self._batch_loop, daemon=True)
        self.worker_thread.start()
        self.batch_thread.start()

    def _batch_loop(self):
        """Collect requests into batches and process them until a ``None`` sentinel."""
        shutdown = False
        while not shutdown:
            try:
                # Block until at least one request is available.
                request = self.request_queue.get()
                if request is None:
                    break
                self.batch_queue.append(request)

                # Try to collect more requests for the batch.
                deadline = time.monotonic() + self.batch_timeout
                while (len(self.batch_queue) < self.batch_size
                       and time.monotonic() < deadline):
                    try:
                        req = self.request_queue.get_nowait()
                    except queue.Empty:
                        time.sleep(0.01)
                        continue
                    if req is None:
                        # Remember the sentinel: finish this batch, then stop.
                        shutdown = True
                        break
                    self.batch_queue.append(req)

                # Detach the batch before processing so the list starts clean.
                current_batch, self.batch_queue = self.batch_queue, []
                for req in current_batch:
                    self._process_request(req)
            except Exception as e:
                print(f"Error in batch thread: {e}")

    def _worker_loop(self):
        """Initialize the model, pre-warm it, then mark the worker as ready."""
        try:
            # Initialize model with optimized settings
            print("Initializing model in background thread...")
            start_time = time.time()
            # Create model context with very optimized settings for CPU
            self.model = Llama(
                model_path=base_model_path,
                lora_path=adapter_path,
                n_ctx=256,  # Smaller context for speed
                n_threads=optimal_threads,  # Use all but one CPU core
                n_batch=512,  # Smaller batch for CPU
                use_mmap=True,  # Memory mapping (more efficient)
                n_gpu_layers=0,  # Force CPU only
                seed=42,  # Consistent results
                rope_freq_base=10000,  # Default RoPE settings
                rope_freq_scale=1.0,
                verbose=False  # Reduce overhead
            )
            print(f"Model loaded in {time.time() - start_time:.2f} seconds")
            # Pre-warm the model by running a simple inference
            print("Pre-warming model...")
            self.model.create_completion("[ENGLISH]hello[SPANISH]", max_tokens=8)
            print("Model ready for translation")
        except Exception as e:
            print(f"Failed to initialize model: {e}")
        finally:
            # Always release waiters; a failed load is reported per request.
            self.model_ready.set()

    def _process_request(self, request):
        """Translate one queued request and publish ``(callback_id, result)``.

        Failures (including a malformed request tuple) are reported as an
        ``"Error: ..."`` result rather than raised, so the batch thread keeps
        running. ``callback_id`` is ``None`` when it could not be unpacked.
        """
        callback_id = None
        try:
            direction, text, callback_id = request
            result = self._process_translation(direction, text)
        except Exception as e:
            print(f"Error processing request: {e}")
            result = f"Error: {str(e)}"
        self.response_queue.put((callback_id, result))

    def _process_translation(self, direction, text):
        """Translate ``text`` for ``direction`` and return the translated string.

        Returns ``""`` for blank input, the cached value on a cache hit, and
        ``"Invalid direction"`` for an unknown direction label.

        Raises:
            RuntimeError: if model initialisation finished without a model.
        """
        if not text or not text.strip():
            return ""

        # Check cache first for faster response
        cache_key = f"{direction}:{text}"
        if cache_key in translation_cache:
            print("Cache hit!")
            return translation_cache[cache_key]

        # Start timing for performance tracking
        start_time = time.time()

        if direction not in self._LANG_MAP:
            return "Invalid direction"
        source_lang, target_lang = self._LANG_MAP[direction]

        # Requests can arrive while the model is still loading; wait for it.
        self.model_ready.wait()
        if self.model is None:
            raise RuntimeError("Translation model failed to load")

        # Efficient prompt format
        prompt = f"[{source_lang}]{text.strip()}[{target_lang}]"

        # Estimate appropriate token length based on input (word count as proxy)
        input_tokens = min(100, max(10, len(text.split())))
        max_tokens = min(100, max(25, int(input_tokens * 1.3)))

        # Greedy decoding: deterministic, no sampling, no repetition penalty.
        response = self.model.create_completion(
            prompt,
            max_tokens=max_tokens,
            temperature=0.0,  # Deterministic
            top_k=1,  # Most likely token
            top_p=1.0,  # No sampling
            repeat_penalty=1.0,  # No penalty
            stream=False  # Get complete response
        )
        translation = response['choices'][0]['text'].strip()

        # Cache result, evicting the oldest inserted key when full.
        if len(translation_cache) >= MAX_CACHE_SIZE:
            translation_cache.pop(next(iter(translation_cache)))
        translation_cache[cache_key] = translation

        # Log performance; clamp to avoid dividing by a zero-length interval.
        inference_time = max(time.time() - start_time, 1e-9)
        tokens_per_second = (input_tokens + len(translation.split())) / inference_time
        print(f"Translation: {inference_time:.3f}s ({tokens_per_second:.1f} tokens/sec)")
        return translation

    def request_translation(self, direction, text, callback_id):
        """Queue a translation request; the result is published on ``response_queue``."""
        self.request_queue.put((direction, text, callback_id))
# Model preloading thread that preloads and pre-computes common translations
def preload_common_phrases(worker, *, delay=0.1):
    """Queue common phrases on ``worker`` from a background thread.

    Each phrase is submitted through ``worker.request_translation`` with an id
    of the form ``"preload_<n>"``.

    Args:
        worker: Object exposing ``request_translation(direction, text, id)``.
        delay: Seconds to pause between submissions so the queue is not
            flooded; values <= 0 disable the pause.

    Returns:
        The started daemon ``threading.Thread`` doing the submissions.
    """
    # Phrases that benefit from caching. The non-ASCII literals are written as
    # proper Unicode so they match what a user would actually type.
    common_phrases = {
        "English to Spanish": [
            "Hello", "Thank you", "Good morning", "How are you?", "What's your name?",
            "I don't understand", "Please", "Sorry", "Yes", "No", "Where is",
            "How much does it cost?", "What time is it?", "I don't speak Spanish",
            "Where is the bathroom?", "I need help", "Can you help me?"
        ],
        "Spanish to English": [
            "Hola", "Gracias", "Buenos días", "¿Cómo estás?", "¿Cómo te llamas?",
            "No entiendo", "Por favor", "Lo siento", "Sí", "No", "Dónde está",
            "¿Cuánto cuesta?", "¿Qué hora es?", "No hablo español", "¿Dónde está el baño?",
            "Necesito ayuda", "¿Puedes ayudarme?"
        ],
        "English to Korean": [
            "Hello", "Thank you", "Good morning", "How are you?", "What's your name?",
            "I don't understand", "Please", "Sorry", "Yes", "No", "Where is",
            "How much is this?", "What time is it?", "I don't speak Korean"
        ],
        "Korean to English": [
            "안녕하세요", "감사합니다", "좋은 아침입니다", "어떻게 지내세요?", "이름이 뭐예요?",
            "이해가 안 돼요", "제발", "죄송합니다", "네", "아니요", "어디에 있어요",
            "이거 얼마예요?", "지금 몇 시예요?", "한국어를 못해요"
        ]
    }

    # Flatten to (direction, phrase) pairs, then number them in order.
    pairs = [
        (direction, phrase)
        for direction, phrases in common_phrases.items()
        for phrase in phrases
    ]
    preload_requests = [
        (direction, phrase, f"preload_{index}")
        for index, (direction, phrase) in enumerate(pairs)
    ]

    # Process preloading in a separate thread
    def preloader():
        print(f"Preloading {len(preload_requests)} common phrases in background...")
        for request in preload_requests:
            worker.request_translation(*request)
            # Small sleep to avoid overwhelming the queue
            if delay > 0:
                time.sleep(delay)
        print("Preloading complete")

    thread = threading.Thread(target=preloader, daemon=True)
    thread.start()
    return thread
# Create worker instance
# Constructing ModelWorker starts its background threads immediately.
worker = ModelWorker()
# Start preloading common phrases in background
# The returned thread handle is kept at module level.
preload_thread = preload_common_phrases(worker)
# Counter for request IDs
# Incremented by translate() each time it queues a request on the worker.
next_request_id = 0
# Implementation of a faster sentence splitter for batching
def split_sentences(text, max_length=50):
    """Split text into manageable chunks for faster translation.

    Text no longer than ``max_length`` is returned unchanged as a single
    chunk. Otherwise words are accumulated into chunks that are closed when:

    * adding the next word would exceed ``max_length``,
    * the chunk ends with a sentence terminator (``.``, ``!`` or ``?``), or
    * a blank line (paragraph break) is reached.

    A single word longer than ``max_length`` becomes its own chunk. Empty
    chunks are never emitted.

    Args:
        text: The text to split.
        max_length: Soft upper bound on chunk length, in characters.

    Returns:
        List of chunk strings in original order.
    """
    if len(text) <= max_length:
        return [text]

    # Chunks never carry trailing whitespace (words come from splitting on
    # spaces/newlines), so sentence ends are detected on the terminator alone.
    sentence_endings = ('.', '!', '?')
    chunks = []
    current_chunk = ""

    for line in text.split('\n'):
        if not line.strip():
            # Blank line: paragraph boundary closes the running chunk.
            if current_chunk:
                chunks.append(current_chunk)
                current_chunk = ""
            continue

        for word in line.split(' '):
            candidate = f"{current_chunk} {word}".strip()
            if len(candidate) > max_length:
                # Only flush real content; avoids an empty chunk when the
                # very first word is already longer than max_length.
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = word
            else:
                current_chunk = candidate

            # Natural break: close the chunk at the end of a sentence.
            if current_chunk.endswith(sentence_endings):
                chunks.append(current_chunk)
                current_chunk = ""

    if current_chunk:
        chunks.append(current_chunk)
    return chunks
# Gradio interface functions
def _await_response(request_id, timeout, on_tick):
    """Poll the worker's response queue for ``request_id``.

    ``on_tick(elapsed_seconds)`` is called once per polling round so the caller
    can update a progress indicator. Responses addressed to other ids that are
    read while polling are discarded.

    Returns the result string, or ``None`` if ``timeout`` seconds pass first.
    """
    started = time.time()
    while time.time() - started < timeout:
        on_tick(time.time() - started)
        try:
            while True:
                resp_id, result = worker.response_queue.get_nowait()
                if resp_id == request_id:
                    return result
        except queue.Empty:
            pass
        # Small sleep to prevent CPU hogging
        time.sleep(0.05)
    return None


def translate(direction, text, progress=gr.Progress()):
    """Fast translation with batching and caching.

    Args:
        direction: Direction label, e.g. ``"English to Spanish"``.
        text: Text to translate.
        progress: Gradio progress tracker used to report status.

    Returns:
        The translated text; ``""`` for blank input; a timeout message when a
        single-request translation does not finish in 20 seconds. For text
        split into several chunks, parts that time out (10 seconds each) are
        replaced by a ``"[Translation failed for part N]"`` marker.
    """
    global next_request_id

    # Skip empty inputs
    if not text or not text.strip():
        return ""

    # Check exact cache hit
    cache_key = f"{direction}:{text}"
    if cache_key in translation_cache:
        return translation_cache[cache_key]

    # For longer texts, split into sentences for faster processing
    if len(text) > 50:
        progress(0.1, desc="Processing text...")
        chunks = split_sentences(text)
        if len(chunks) > 1:
            total = len(chunks)
            results = []
            all_parts_ok = True
            for i, chunk in enumerate(chunks):
                # Check if this chunk is in cache
                chunk_key = f"{direction}:{chunk}"
                if chunk_key in translation_cache:
                    results.append(translation_cache[chunk_key])
                    continue

                # Request translation for this chunk
                chunk_id = next_request_id
                next_request_id += 1
                worker.request_translation(direction, chunk, chunk_id)

                def report(_elapsed, part=i):
                    progress((part + 0.5) / total,
                             desc=f"Translating part {part+1}/{total}")

                # Returns as soon as the answer arrives instead of always
                # sitting out the full 10 second window.
                result = _await_response(chunk_id, 10, report)
                if result is None:
                    all_parts_ok = False
                    result = f"[Translation failed for part {i+1}]"
                results.append(result)

            combined = " ".join(results)
            # Do not cache output containing timeout markers, otherwise a
            # retry of the same text would return the failure forever.
            if all_parts_ok:
                translation_cache[cache_key] = combined
            progress(1.0)
            return combined

    # For single sentences
    request_id = next_request_id
    next_request_id += 1

    # Queue the request
    worker.request_translation(direction, text, request_id)

    # Wait for the response
    progress(0.2, desc="Translating...")
    max_wait = 20  # Maximum wait time in seconds
    result = _await_response(
        request_id,
        max_wait,
        lambda elapsed: progress(0.2 + 0.8 * (elapsed / max_wait), desc="Translating..."),
    )
    progress(1.0)
    if result is not None:
        return result
    return "Translation timed out. Please try again with a shorter text."
# Create Gradio interface
# Build the UI. Nesting of the layout below is reconstructed from the call
# structure; non-ASCII text is written as proper Unicode.
with gr.Blocks(title="Ultra-Fast Translation App (CPU Optimized)") as iface:
    gr.Markdown(f"""
## Ultra-Fast Translation App (CPU Optimized)
Running on: {'GPU: ' + gpu_name if has_gpu else 'CPU optimized with int4 quantization'}
""")
    with gr.Row():
        direction = gr.Dropdown(
            choices=["English to Spanish", "Spanish to English", "Korean to English", "English to Korean"],
            label="Translation Direction",
            value="English to Spanish"
        )
    with gr.Row():
        input_text = gr.Textbox(lines=5, label="Input Text", placeholder="Enter text to translate...")
        output_text = gr.Textbox(lines=5, label="Translation")
    # Add translate button; clicking it runs translate(direction, input_text).
    translate_btn = gr.Button("Translate")
    translate_btn.click(fn=translate, inputs=[direction, input_text], outputs=output_text)
    # Optimization options
    with gr.Accordion("Performance Tips", open=True):
        gr.Markdown("""
### Speed Optimization Tips
- ✅ The model has been quantized to int4 for faster CPU execution
- ✅ Common phrases are pre-cached for instant results
- ✅ Long text is automatically split into smaller chunks
- ✅ First translation will be slower as the model warms up
- ✅ Short sentences (< 50 chars) translate much faster
""")
    # Add examples; each row is (direction, input text).
    gr.Examples(
        examples=[
            ["English to Spanish", "Hello, how are you today?"],
            ["Spanish to English", "Hola, ¿cómo estás hoy?"],
            ["English to Korean", "The weather is nice today."],
            ["Korean to English", "안녕하세요, 만나서 반갑습니다."]
        ],
        inputs=[direction, input_text],
        fn=translate,
        outputs=output_text
    )
# Launch with optimized settings
if __name__ == "__main__":
    # Server options gathered in one place, then handed to Gradio.
    launch_options = {
        "debug": False,
        "show_error": True,
        "share": False,
        "quiet": True,
        "server_name": "0.0.0.0",  # listen on all interfaces
        "server_port": 7860,
    }
    iface.launch(**launch_options)