# https://binkhoale1812-interview-ai.hf.space/
# Interview Q&A – FastAPI backend
import base64, io, json, logging, os, re, tempfile
from pathlib import Path
from typing import Dict
import psutil
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
# AI / LLM
from google import genai
from google.genai import types
# ASR
import numpy as np
from pydub import AudioSegment
from transformers import WhisperProcessor, WhisperForConditionalGeneration
# Misc
from PIL import Image
##############################################################################
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise RuntimeError("❌ GEMINI_API_KEY must be set as env var")
ASR_MODEL_ID = "openai/whisper-small.en"
ASR_LANGUAGE = "en"
SAMPLE_RATE = 16_000
##############################################################################
app = FastAPI(title="Interview Q&A Assistant", docs_url="/docs")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
)
app.mount("/statics", StaticFiles(directory="statics"), name="statics")
# Enable logging for debugging
# Set up app-specific logger
logger = logging.getLogger("triage-response")
logger.setLevel(logging.INFO)  # Set to DEBUG only when needed
# Set log format
formatter = logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Suppress noisy libraries like pymongo, urllib3, etc.
for noisy in ["pymongo", "urllib3", "httpx", "uvicorn", "uvicorn.error", "uvicorn.access"]:
    logging.getLogger(noisy).setLevel(logging.WARNING)
# Monitor resources before startup
def check_system_resources():
    memory = psutil.virtual_memory()
    cpu = psutil.cpu_percent(interval=1)
    disk = psutil.disk_usage("/")
    # Log a snapshot and warn on high utilisation
    logger.info(f"🔍 System Resources - RAM: {memory.percent}%, CPU: {cpu}%, Disk: {disk.percent}%")
    if memory.percent > 85:
        logger.warning("⚠️ High RAM usage detected!")
    if cpu > 90:
        logger.warning("⚠️ High CPU usage detected!")
    if disk.percent > 90:
        logger.warning("⚠️ High Disk usage detected!")
check_system_resources()
##############################################################################
# Global ASR handles (populated once at startup)
processor = model = None
def build_prompt(question: str) -> str:
    return (
        "You are a helpful career-coach AI. Answer the following interview "
        "question clearly and concisely (≤200 words). Use markdown when helpful.\n\n"
        f"Interview question: \"{question.strip()}\""
    )
def memory_mb() -> float:
    return round(psutil.Process().memory_info().rss / 1_048_576, 1)
@app.on_event("startup")  # decorator restored so Whisper loads before requests arrive
async def load_models():
    global processor, model
    cache = Path("model_cache")
    cache.mkdir(exist_ok=True)
    processor = WhisperProcessor.from_pretrained(ASR_MODEL_ID, cache_dir=cache)
    model = WhisperForConditionalGeneration.from_pretrained(ASR_MODEL_ID, cache_dir=cache)
    forced = processor.get_decoder_prompt_ids(task="transcribe", language="english")
    model.config.forced_decoder_ids = forced
    model.to("cpu").eval()
    logger.info("[MODEL] 🔊 Whisper loaded ✔")
@app.get("/")  # route restored so the SPA is reachable at the site root
async def root() -> FileResponse:  # serve SPA
    logger.info("[STATIC] Serving frontend")
    return FileResponse(Path("statics/index.html"))
##############################################################################
# ── MAIN ENDPOINTS ──────────────────────────────────────────────────────────
def call_gemini(prompt: str, vision_parts=None) -> str:
    client = genai.Client(api_key=GEMINI_API_KEY)
    kwargs: Dict = {}
    if vision_parts:  # multimodal call
        kwargs["contents"] = vision_parts + [{"text": prompt}]
    else:
        kwargs["contents"] = prompt
    try:
        resp = client.models.generate_content(
            model="gemini-2.5-flash-preview-04-17", **kwargs
        )
        # Check for at least one valid candidate
        if not resp.candidates:
            raise RuntimeError("No candidates returned from Gemini")
        # Inspect the first candidate
        candidate = resp.candidates[0]
        if candidate.content is None or not hasattr(candidate.content, "parts"):
            raise RuntimeError("Gemini candidate missing content parts")
        # Join all .text fields in case Gemini responds in multiple parts
        text = "".join(part.text for part in candidate.content.parts if hasattr(part, "text") and part.text)
        if not text.strip():
            raise RuntimeError("Gemini response contained empty text")
        # Success
        logger.info(f"[LLM] ✅ Response received: {text[:100]}...")
        return text.strip()
    # Fail
    except Exception as e:
        logger.error(f"[LLM] ❌ Gemini API error: {e}")
        raise RuntimeError("Gemini API response format error") from e
# Route answering a spoken question (WAV/MP3 upload)
@app.post("/voice-transcribe")  # route path assumed; decorator restored so the endpoint is registered
async def voice_transcribe(file: UploadFile = File(...)):
    if file.content_type not in {"audio/wav", "audio/x-wav", "audio/mpeg"}:
        raise HTTPException(415, "Unsupported audio type")
    # Write temporary audio file
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
        tmp.write(await file.read())
        tmp_path = tmp.name
    # Audio processing and transcription
    try:
        seg = AudioSegment.from_file(tmp_path).set_frame_rate(SAMPLE_RATE).set_channels(1)
        audio = np.array(seg.get_array_of_samples()).astype(np.float32) / (2 ** 15)
        inputs = processor(audio, sampling_rate=SAMPLE_RATE, return_tensors="pt")
        ids = model.generate(inputs.input_features.to(model.device))
        question = processor.decode(ids[0], skip_special_tokens=True).strip()
        if not question:
            raise ValueError("No speech detected")
        logger.info(f"[MODEL] Transcribed text: {question}")
        answer = call_gemini(build_prompt(question))
        return JSONResponse({"question": question, "answer": answer, "memory_mb": memory_mb()})
    finally:
        os.remove(tmp_path)
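# Example client call for the endpoint above (a sketch; host, port, and the
# assumed route path are illustrative, not confirmed by the original source):
#   curl -X POST http://localhost:7860/voice-transcribe \
#        -F "file=@question.wav;type=audio/wav"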
# Route sending question as image (PNG/JPEG)
@app.post("/image-question")  # route path assumed; decorator restored so the endpoint is registered
async def image_question(file: UploadFile = File(...)):
    if file.content_type not in {"image/png", "image/jpeg"}:
        raise HTTPException(415, "Unsupported image type")
    # Read file and base64-encode it for the Gemini inline_data part
    raw = await file.read()
    b64 = base64.b64encode(raw).decode()
    # Image payload
    vision_part = [{
        "inline_data": {
            "mime_type": file.content_type,
            "data": b64
        }
    }]
    # Ask Gemini to return JSON splitting Q&A
    prompt = (
        "From the screenshot, extract all English interview question(s). "
        "There may be multiple questions. For each, provide a concise answer (≤200 words).\n\n"
        "Return only valid JSON as a list of objects:\n"
        "[\n"
        "  {\"question\": \"...\", \"answer\": \"...\"},\n"
        "  {\"question\": \"...\", \"answer\": \"...\"},\n"
        "  ...\n"
        "]\n\n"
        "Do not include explanations or additional formatting — only output raw JSON."
    )
    # Send prompt and image
    text = call_gemini(prompt, vision_part)
    try:
        # Strip markdown code fences before parsing as JSON
        cleaned = re.sub(r"^```json\s*|\s*```$", "", text.strip(), flags=re.IGNORECASE | re.MULTILINE)
        parsed = json.loads(cleaned)
        # Expected shape: a list of Q&A objects
        if isinstance(parsed, list):
            return JSONResponse(parsed)
        # Fallback: a single object
        if isinstance(parsed, dict):
            question = str(parsed.get("question", "")).strip()
            answer = str(parsed.get("answer", "")).strip()
            # Reject accidentally double-wrapped JSON inside fields
            if question.startswith("{") or answer.startswith("{"):
                raise ValueError("Wrapped JSON detected inside field")
            return JSONResponse([{"question": question, "answer": answer}])
        raise ValueError("Unexpected JSON format from Gemini")
    except Exception as e:
        logger.warning(f"[PARSE] Failed to cleanly extract JSON fields: {e}")
        return JSONResponse([{
            "question": "[Extracted from screenshot]",
            "answer": text.strip()
        }])
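# Example client call for the endpoint above (illustrative; the route path is
# an assumption):
#   curl -X POST http://localhost:7860/image-question -F "file=@screenshot.png;type=image/png"
# On success it returns a JSON list (illustrative shape only):
#   [{"question": "...", "answer": "..."}]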
# Text-based question (from voice transcription or a manually edited question)
@app.post("/text-question")  # route path assumed; decorator restored so the endpoint is registered
async def text_question(payload: Dict):
    question = (payload.get("question") or "").strip()
    if not question:
        raise HTTPException(400, "question is required")
    answer = call_gemini(build_prompt(question))
    return JSONResponse({"question": question, "answer": answer, "memory_mb": memory_mb()})