# FastFlowWrapper / app.py
# An OpenAI-compatible HTTP wrapper around a Flowise chatflow.
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import os
from dotenv import load_dotenv
import requests
from typing import List, Optional
from pydantic import BaseModel
import time
import json
import asyncio

load_dotenv()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Read the Flowise connection settings from environment variables
FLOWISE_API_BASE_URL = os.getenv("FLOWISE_API_BASE_URL")
FLOWISE_CHATFLOW_ID = os.getenv("FLOWISE_CHATFLOW_ID")
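# Both variables must be set, e.g. via a local .env file. A minimal sketch
# (the values below are placeholders, not real endpoints):
#
#   FLOWISE_API_BASE_URL=http://localhost:3000/api/v1
#   FLOWISE_CHATFLOW_ID=00000000-0000-0000-0000-000000000000
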
class ChatMessage(BaseModel):
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    temperature: float = 0.7
    stream: bool = False
    seed: Optional[int] = None

def count_tokens(text: str) -> int:
    # Approximate token count: words plus punctuation marks,
    # matching the heuristic used by the direct API.
    words = text.split()
    punctuation = sum(1 for c in text if c in ".,!?;:()[]{}")
    return len(words) + punctuation
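# For example, count_tokens("Hello, world!") returns 4:
# two words ("Hello," and "world!") plus the comma and the exclamation mark.
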
def clean_assistant_response(text: str) -> str:
    # Strip a stray trailing code-fence marker and surrounding whitespace
    text = text.strip()
    if text.endswith("```"):
        text = text[:-3].strip()
    return text
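# For example, clean_assistant_response("42\n```") returns "42":
# the trailing code fence is dropped and the remaining whitespace is trimmed.
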
async def stream_response(response_text: str):
    # Split the text into small chunks to emulate token-by-token streaming
    words = response_text.split()
    for i in range(0, len(words), 2):
        chunk = " ".join(words[i:i + 2]) + " "
        chunk_data = {
            "id": f"chatcmpl-{os.urandom(12).hex()}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "phi4-r1",
            "choices": [{
                "index": 0,
                "delta": {"content": chunk},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
        await asyncio.sleep(0.1)  # small delay between chunks
    # Send the final chunk that closes the stream
    final_data = {
        "id": f"chatcmpl-{os.urandom(12).hex()}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "phi4-r1",
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": "stop"
        }]
    }
    yield f"data: {json.dumps(final_data, ensure_ascii=False)}\n\n"
    # OpenAI-style streams terminate with a [DONE] sentinel, which many clients expect
    yield "data: [DONE]\n\n"
@app.get("/")
async def root():
response = JSONResponse({"status": "FastFlowWrapper is running"})
response.headers["Content-Type"] = "application/json; charset=utf-8"
return response
@app.get("/v1/models")
async def get_models():
try:
# Запрашиваем список чатфлоу из Flowise
response = requests.get(f"{FLOWISE_API_BASE_URL}/chatflows")
response.raise_for_status()
chatflows = response.json()
# Преобразуем в формат OpenAI API
models = []
for chatflow in chatflows:
models.append({
"id": chatflow.get("id"),
"object": "model",
"created": int(time.time()),
"owned_by": "flowise",
"permission": [],
"root": "flowise",
"parent": None,
"system_fingerprint": "phi4-r1"
})
response = JSONResponse({"object": "list", "data": models})
response.headers["Content-Type"] = "application/json; charset=utf-8"
return response
except requests.RequestException as e:
raise HTTPException(status_code=500, detail=str(e))
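# Example call (host and port are placeholders for wherever the app is served):
#   curl http://localhost:8000/v1/models
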
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
try:
# Получаем последнее сообщение из диалога
last_message = request.messages[-1]
if last_message.role != "user":
raise HTTPException(status_code=400, detail="Last message must be from user")
# Объединяем system prompt с сообщением пользователя
system_prompt = ""
for msg in request.messages:
if msg.role == "system":
system_prompt = msg.content
break
user_message = last_message.content
if system_prompt:
combined_message = f"{system_prompt}\n\n{user_message}"
else:
combined_message = user_message
# Формируем историю диалога для Flowise
history = []
for i, msg in enumerate(request.messages[:-1]): # исключаем последнее сообщение
if msg.role == "user":
history.append({
"role": "userMessage",
"content": msg.content
})
elif msg.role == "assistant":
history.append({
"role": "apiMessage",
"content": msg.content
})
# Формируем запрос к Flowise
flowise_request = {
"question": combined_message
}
# Добавляем историю, если она есть
if history:
flowise_request["history"] = history
# Засекаем время начала запроса
start_time = time.time()
# Отправляем запрос к Flowise с таймаутом
response = requests.post(
f"{FLOWISE_API_BASE_URL}/prediction/{FLOWISE_CHATFLOW_ID}",
json=flowise_request,
timeout=10
)
response.raise_for_status()
# Получаем и очищаем ответ
flowise_response = response.json()
assistant_response = clean_assistant_response(flowise_response.get("text", ""))
# Если запрошен стриминг
if request.stream:
return StreamingResponse(
stream_response(assistant_response),
media_type="text/event-stream"
)
# Подсчитываем токены
prompt_tokens = count_tokens(combined_message)
completion_tokens = count_tokens(assistant_response)
# Создаем ID сессии, используя seed из запроса или генерируем новый
session_id = f"chatcmpl-{request.seed or os.urandom(12).hex()}"
response = JSONResponse({
"id": session_id,
"object": "chat.completion",
"created": int(start_time),
"model": "phi4-r1",
"choices": [
{
"index": 0,
"logprobs": None,
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": assistant_response
}
}
],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
},
"stats": {},
"system_fingerprint": "phi4-r1"
})
response.headers["Content-Type"] = "application/json; charset=utf-8"
return response
except requests.RequestException as e:
raise HTTPException(status_code=500, detail=str(e))
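
# A minimal local entry point, assuming uvicorn is installed; the hosting
# platform may start the app differently (e.g. `uvicorn app:app --port 8000`).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example request against the wrapper (placeholder host and port):
#   curl http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "phi4-r1", "messages": [{"role": "user", "content": "Hi"}]}'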