Spaces:
Paused
Paused
from fastapi import FastAPI, UploadFile, File, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse | |
import io | |
import os | |
import logging | |
from pathlib import Path | |
# Import our modules | |
from app.agent import process_text, clear_memory | |
from app.speech_to_text import transcribe_audio | |
from app.text_to_speech import synthesize_speech | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Create the FastAPI app | |
app = FastAPI() | |
# Add CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Create static directory if it doesn't exist | |
static_dir = Path("static") | |
static_dir.mkdir(exist_ok=True) | |
# Mount static files if directory exists | |
if static_dir.exists(): | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
# Routes for the API | |
async def root(): | |
"""Serve the main UI if available.""" | |
index_path = static_dir / "index.html" | |
if index_path.exists(): | |
return FileResponse(index_path) | |
return {"message": "AGI Telecom POC API - UI not available"} | |
async def health_check(): | |
"""Health check endpoint.""" | |
return {"status": "ok"} | |
async def transcribe(file: UploadFile = File(...)): | |
""" | |
Transcribe audio to text. | |
Args: | |
file: Audio file upload | |
Returns: | |
JSON with transcription | |
""" | |
try: | |
audio_bytes = await file.read() | |
text = transcribe_audio(audio_bytes) | |
logger.info(f"Transcribed: {text[:30]}...") | |
return {"transcription": text} | |
except Exception as e: | |
logger.error(f"Transcription error: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": f"Failed to transcribe audio: {str(e)}"} | |
) | |
async def query_agent(request: Request): | |
""" | |
Process a text query with the agent. | |
Args: | |
request: Request with input_text in JSON body | |
Returns: | |
JSON with agent response | |
""" | |
try: | |
data = await request.json() | |
input_text = data.get("input_text", "") | |
if not input_text: | |
return JSONResponse( | |
status_code=400, | |
content={"error": "No input_text provided"} | |
) | |
response = process_text(input_text) | |
logger.info(f"Query: {input_text[:30]}... Response: {response[:30]}...") | |
return {"response": response} | |
except Exception as e: | |
logger.error(f"Query error: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": f"Failed to process query: {str(e)}"} | |
) | |
async def speak(text: str): | |
""" | |
Convert text to speech. | |
Args: | |
text: Text to convert to speech | |
Returns: | |
Audio stream | |
""" | |
try: | |
audio = synthesize_speech(text) | |
return StreamingResponse(io.BytesIO(audio), media_type="audio/mpeg") | |
except Exception as e: | |
logger.error(f"Speech synthesis error: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": f"Failed to synthesize speech: {str(e)}"} | |
) | |
async def reset_memory(): | |
"""Clear the conversation memory.""" | |
success = clear_memory() | |
return {"success": success} | |
async def complete_flow(request: Request): | |
""" | |
Complete flow: audio to text to agent to speech. | |
Args: | |
request: Request with audio_base64 or text_input in JSON body | |
Returns: | |
JSON with results and audio URL | |
""" | |
try: | |
data = await request.json() | |
text_input = data.get("text_input") | |
# Process with agent | |
if not text_input: | |
return JSONResponse( | |
status_code=400, | |
content={"error": "No input provided"} | |
) | |
response = process_text(text_input) | |
logger.info(f"Agent response: {response[:30]}...") | |
# Return the response | |
return { | |
"input": text_input, | |
"response": response | |
} | |
except Exception as e: | |
logger.error(f"Complete flow error: {str(e)}") | |
return JSONResponse( | |
status_code=500, | |
content={"error": f"Failed to process: {str(e)}"} | |
) | |
# Run the app | |
if __name__ == "__main__": | |
import uvicorn | |
# Check if running on HF Spaces | |
if os.environ.get("SPACE_ID"): | |
# Running on HF Spaces - use their port | |
port = int(os.environ.get("PORT", 7860)) | |
uvicorn.run(app, host="0.0.0.0", port=port) | |
else: | |
# Running locally | |
uvicorn.run(app, host="0.0.0.0", port=8000) |