from fastapi import FastAPI, UploadFile, File, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import StreamingResponse, JSONResponse, FileResponse import io import os import logging from pathlib import Path # Import our modules from app.agent import process_text, clear_memory from app.speech_to_text import transcribe_audio from app.text_to_speech import synthesize_speech # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Create the FastAPI app app = FastAPI() # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # Create static directory if it doesn't exist static_dir = Path("static") static_dir.mkdir(exist_ok=True) # Mount static files if directory exists if static_dir.exists(): app.mount("/static", StaticFiles(directory="static"), name="static") # Routes for the API @app.get("/") async def root(): """Serve the main UI if available.""" index_path = static_dir / "index.html" if index_path.exists(): return FileResponse(index_path) return {"message": "AGI Telecom POC API - UI not available"} @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "ok"} @app.post("/transcribe") async def transcribe(file: UploadFile = File(...)): """ Transcribe audio to text. Args: file: Audio file upload Returns: JSON with transcription """ try: audio_bytes = await file.read() text = transcribe_audio(audio_bytes) logger.info(f"Transcribed: {text[:30]}...") return {"transcription": text} except Exception as e: logger.error(f"Transcription error: {str(e)}") return JSONResponse( status_code=500, content={"error": f"Failed to transcribe audio: {str(e)}"} ) @app.post("/query") async def query_agent(request: Request): """ Process a text query with the agent. Args: request: Request with input_text in JSON body Returns: JSON with agent response """ try: data = await request.json() input_text = data.get("input_text", "") if not input_text: return JSONResponse( status_code=400, content={"error": "No input_text provided"} ) response = process_text(input_text) logger.info(f"Query: {input_text[:30]}... Response: {response[:30]}...") return {"response": response} except Exception as e: logger.error(f"Query error: {str(e)}") return JSONResponse( status_code=500, content={"error": f"Failed to process query: {str(e)}"} ) @app.get("/speak") async def speak(text: str): """ Convert text to speech. Args: text: Text to convert to speech Returns: Audio stream """ try: audio = synthesize_speech(text) return StreamingResponse(io.BytesIO(audio), media_type="audio/mpeg") except Exception as e: logger.error(f"Speech synthesis error: {str(e)}") return JSONResponse( status_code=500, content={"error": f"Failed to synthesize speech: {str(e)}"} ) @app.post("/clear_memory") async def reset_memory(): """Clear the conversation memory.""" success = clear_memory() return {"success": success} @app.post("/complete_flow") async def complete_flow(request: Request): """ Complete flow: audio to text to agent to speech. Args: request: Request with audio_base64 or text_input in JSON body Returns: JSON with results and audio URL """ try: data = await request.json() text_input = data.get("text_input") # Process with agent if not text_input: return JSONResponse( status_code=400, content={"error": "No input provided"} ) response = process_text(text_input) logger.info(f"Agent response: {response[:30]}...") # Return the response return { "input": text_input, "response": response } except Exception as e: logger.error(f"Complete flow error: {str(e)}") return JSONResponse( status_code=500, content={"error": f"Failed to process: {str(e)}"} ) # Run the app if __name__ == "__main__": import uvicorn # Check if running on HF Spaces if os.environ.get("SPACE_ID"): # Running on HF Spaces - use their port port = int(os.environ.get("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port) else: # Running locally uvicorn.run(app, host="0.0.0.0", port=8000)