# FluentQ / app/main.py
# tommytracx's picture
# Update app/main.py
# 37475a8 verified
from fastapi import FastAPI, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
import io
import os
import logging
from pathlib import Path
# Import our modules
from app.agent import process_text, clear_memory
from app.speech_to_text import transcribe_audio
from app.text_to_speech import synthesize_speech
# Logging: timestamped records at INFO level and above, plus a module logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# The FastAPI application object that every route below registers on.
app = FastAPI()

# CORS: requests from any origin, with any method and any header, are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Ensure a "static" directory exists (the path is relative to the working
# directory), then expose its contents under the /static URL prefix.
static_dir = Path("static")
static_dir.mkdir(exist_ok=True)
if static_dir.exists():
    app.mount(
        "/static",
        StaticFiles(directory="static"),
        name="static",
    )
# Routes for the API
@app.get("/")
async def root():
    """Return static/index.html when it exists, else a JSON placeholder."""
    ui_page = static_dir / "index.html"
    if not ui_page.exists():
        # No UI file on disk: answer with a plain JSON message instead.
        return {"message": "AGI Telecom POC API - UI not available"}
    return FileResponse(ui_page)
@app.get("/health")
async def health_check():
    """Report a constant "ok" status."""
    payload = dict(status="ok")
    return payload
@app.post("/transcribe")
async def transcribe(file: UploadFile = File(...)):
    """
    Transcribe audio to text.

    Args:
        file: Audio file upload.

    Returns:
        ``{"transcription": <text>}`` on success. A JSON ``{"error": ...}``
        body is returned with status 400 when the upload contains no bytes,
        and with status 500 when reading or transcribing raises.
    """
    try:
        audio_bytes = await file.read()
        if not audio_bytes:
            # An empty upload is a client mistake; report it as 400 instead of
            # handing zero bytes to the transcriber and surfacing a 500.
            return JSONResponse(
                status_code=400,
                content={"error": "No audio data provided"}
            )
        text = transcribe_audio(audio_bytes)
        # Only a short prefix is logged to keep log lines compact.
        logger.info("Transcribed: %s...", text[:30])
        return {"transcription": text}
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Transcription error: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to transcribe audio: {str(e)}"}
        )
@app.post("/query")
async def query_agent(request: Request):
    """
    Process a text query with the agent.

    Args:
        request: Request whose JSON body is an object carrying a string
            ``input_text`` field.

    Returns:
        ``{"response": <agent reply>}`` on success. A JSON ``{"error": ...}``
        body is returned with status 400 when the body is not valid JSON, is
        not a JSON object, or has no non-empty string ``input_text``; and with
        status 500 when processing raises.
    """
    try:
        try:
            data = await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError. A malformed body is the client's fault, so answer
            # 400 rather than falling through to the generic 500 handler.
            return JSONResponse(
                status_code=400,
                content={"error": "Request body must be valid JSON"}
            )
        # A JSON array/string/number has no .get(); treat it as missing input.
        input_text = data.get("input_text", "") if isinstance(data, dict) else ""
        if not input_text:
            return JSONResponse(
                status_code=400,
                content={"error": "No input_text provided"}
            )
        if not isinstance(input_text, str):
            # Non-string values used to fail later (e.g. when sliced for the
            # log line) and were misreported as a server error.
            return JSONResponse(
                status_code=400,
                content={"error": "input_text must be a string"}
            )
        response = process_text(input_text)
        # Only short prefixes are logged to keep log lines compact.
        logger.info("Query: %s... Response: %s...", input_text[:30], response[:30])
        return {"response": response}
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Query error: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to process query: {str(e)}"}
        )
@app.get("/speak")
async def speak(text: str):
    """
    Convert text to speech.

    Args:
        text: Text to convert to speech (query parameter).

    Returns:
        The synthesized audio streamed with media type ``audio/mpeg``. A JSON
        ``{"error": ...}`` body is returned with status 400 when ``text`` is
        empty or whitespace-only, and with status 500 when synthesis raises.
    """
    if not text.strip():
        # Nothing to synthesize: report a client error up front instead of
        # passing blank input to the synthesizer.
        return JSONResponse(
            status_code=400,
            content={"error": "No text provided"}
        )
    try:
        audio = synthesize_speech(text)
        # The audio is wrapped in an in-memory buffer so it can be streamed.
        return StreamingResponse(io.BytesIO(audio), media_type="audio/mpeg")
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Speech synthesis error: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to synthesize speech: {str(e)}"}
        )
@app.post("/clear_memory")
async def reset_memory():
    """Call clear_memory() and return its result under the "success" key."""
    return {"success": clear_memory()}
@app.post("/complete_flow")
async def complete_flow(request: Request):
    """
    Run the text-to-agent flow for a JSON request.

    Only ``text_input`` is read from the body; no audio is decoded or
    synthesized by this endpoint.

    Args:
        request: Request whose JSON body is an object carrying a string
            ``text_input`` field.

    Returns:
        ``{"input": <text_input>, "response": <agent reply>}`` on success. A
        JSON ``{"error": ...}`` body is returned with status 400 when the body
        is not valid JSON, is not a JSON object, or has no non-empty string
        ``text_input``; and with status 500 when processing raises.
    """
    try:
        try:
            data = await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError. A malformed body is the client's fault, so answer
            # 400 rather than falling through to the generic 500 handler.
            return JSONResponse(
                status_code=400,
                content={"error": "Request body must be valid JSON"}
            )
        # A JSON array/string/number has no .get(); treat it as missing input.
        text_input = data.get("text_input") if isinstance(data, dict) else None
        if not text_input:
            return JSONResponse(
                status_code=400,
                content={"error": "No input provided"}
            )
        if not isinstance(text_input, str):
            # Non-string values used to fail later (e.g. when the reply is
            # computed or logged) and were misreported as a server error.
            return JSONResponse(
                status_code=400,
                content={"error": "text_input must be a string"}
            )
        # Process with agent
        response = process_text(text_input)
        # Only a short prefix is logged to keep log lines compact.
        logger.info("Agent response: %s...", response[:30])
        return {
            "input": text_input,
            "response": response
        }
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Complete flow error: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to process: {str(e)}"}
        )
# Script entry point: start a uvicorn server for the app.
if __name__ == "__main__":
    import uvicorn

    # SPACE_ID being set marks a Hugging Face Spaces run; there the port is
    # read from PORT (default 7860). A local run always uses port 8000.
    on_hf_spaces = bool(os.environ.get("SPACE_ID"))
    listen_port = int(os.environ.get("PORT", 7860)) if on_hf_spaces else 8000
    uvicorn.run(app, host="0.0.0.0", port=listen_port)