Spaces:
Paused
Paused
File size: 5,042 Bytes
42c727a 37475a8 42c727a 37475a8 42c727a 37475a8 42c727a 37475a8 42c727a 37475a8 42c727a 37475a8 42c727a 37475a8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
from fastapi import FastAPI, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
import io
import os
import logging
from pathlib import Path
# Import our modules
from app.agent import process_text, clear_memory
from app.speech_to_text import transcribe_audio
from app.text_to_speech import synthesize_speech
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Create the FastAPI app
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Create static directory if it doesn't exist
static_dir = Path("static")
static_dir.mkdir(exist_ok=True)
# Mount static files if directory exists
if static_dir.exists():
app.mount("/static", StaticFiles(directory="static"), name="static")
# Routes for the API
@app.get("/")
async def root():
"""Serve the main UI if available."""
index_path = static_dir / "index.html"
if index_path.exists():
return FileResponse(index_path)
return {"message": "AGI Telecom POC API - UI not available"}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "ok"}
@app.post("/transcribe")
async def transcribe(file: UploadFile = File(...)):
"""
Transcribe audio to text.
Args:
file: Audio file upload
Returns:
JSON with transcription
"""
try:
audio_bytes = await file.read()
text = transcribe_audio(audio_bytes)
logger.info(f"Transcribed: {text[:30]}...")
return {"transcription": text}
except Exception as e:
logger.error(f"Transcription error: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to transcribe audio: {str(e)}"}
)
@app.post("/query")
async def query_agent(request: Request):
"""
Process a text query with the agent.
Args:
request: Request with input_text in JSON body
Returns:
JSON with agent response
"""
try:
data = await request.json()
input_text = data.get("input_text", "")
if not input_text:
return JSONResponse(
status_code=400,
content={"error": "No input_text provided"}
)
response = process_text(input_text)
logger.info(f"Query: {input_text[:30]}... Response: {response[:30]}...")
return {"response": response}
except Exception as e:
logger.error(f"Query error: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to process query: {str(e)}"}
)
@app.get("/speak")
async def speak(text: str):
"""
Convert text to speech.
Args:
text: Text to convert to speech
Returns:
Audio stream
"""
try:
audio = synthesize_speech(text)
return StreamingResponse(io.BytesIO(audio), media_type="audio/mpeg")
except Exception as e:
logger.error(f"Speech synthesis error: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to synthesize speech: {str(e)}"}
)
@app.post("/clear_memory")
async def reset_memory():
"""Clear the conversation memory."""
success = clear_memory()
return {"success": success}
@app.post("/complete_flow")
async def complete_flow(request: Request):
"""
Complete flow: audio to text to agent to speech.
Args:
request: Request with audio_base64 or text_input in JSON body
Returns:
JSON with results and audio URL
"""
try:
data = await request.json()
text_input = data.get("text_input")
# Process with agent
if not text_input:
return JSONResponse(
status_code=400,
content={"error": "No input provided"}
)
response = process_text(text_input)
logger.info(f"Agent response: {response[:30]}...")
# Return the response
return {
"input": text_input,
"response": response
}
except Exception as e:
logger.error(f"Complete flow error: {str(e)}")
return JSONResponse(
status_code=500,
content={"error": f"Failed to process: {str(e)}"}
)
# Run the app
if __name__ == "__main__":
import uvicorn
# Check if running on HF Spaces
if os.environ.get("SPACE_ID"):
# Running on HF Spaces - use their port
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)
else:
# Running locally
uvicorn.run(app, host="0.0.0.0", port=8000) |