import os
import json
import asyncio

import httpx

# Load provider endpoints and the model -> provider mapping at import time.
with open("llm/model_config.json", "r") as f:
    CONFIG = json.load(f)

PROVIDERS = CONFIG["providers"]
MODELS = CONFIG["models"]

# === SYSTEM PROMPTS ===

STRUCTURED_ASSISTANT_PROMPT = """You are a helpful AI assistant.
- Respond to the user's message in a structured and professional way.
- Match the length and complexity of your response to the user's input.
- If the user's input is simple (e.g., "Hi"), reply politely without overexplaining.
- If the user's input is complex, give a complete and organized answer.
- Do not repeat the user's prompt.
- Be direct, helpful, and clear.
"""

AGGREGATOR_PROMPT = """You are an AI responsible for combining the outputs of multiple AI assistants.
- Read their answers carefully.
- Identify the best parts from each.
- Write a single, coherent, and helpful reply.
- Do not simply merge texts or repeat everything.
- Match the depth and tone to the user's original input.
- Keep it natural and conversational.
"""

# === CORE FUNCTIONS ===

async def query_llm(model_name: str, user_input: str, role_prompt: str) -> str:
    """Send one chat completion request to the provider that hosts model_name."""
    provider_key = MODELS.get(model_name)
    if not provider_key:
        return f"Model '{model_name}' is not supported."

    provider = PROVIDERS.get(provider_key)
    if not provider:
        return f"Provider '{provider_key}' is not configured."

    endpoint = provider["url"]
    api_key = os.getenv(provider["key_env"])
    if not api_key:
        return f"API key for provider '{provider_key}' not found."

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model_name,
        "messages": [
            {"role": "system", "content": role_prompt},
            {"role": "user", "content": user_input},
        ],
    }

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(endpoint, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()
            # Assumes an OpenAI-compatible chat completions response schema.
            return data["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"


async def query_moa_chain(user_input: str, settings: dict) -> str:
    """Fan the user input out to LLM-A, LLM-B, and LLM-C in parallel,
    then have the aggregator model merge their answers into one reply."""
    llm_a = settings["models"].get("LLM-A")
    llm_b = settings["models"].get("LLM-B")
    llm_c = settings["models"].get("LLM-C")
    aggregator = settings.get("aggregator")

    # Query LLM-A, LLM-B, and LLM-C concurrently; query_llm() returns an
    # error string rather than raising, so gather() never propagates here.
    results = await asyncio.gather(
        query_llm(llm_a, user_input, STRUCTURED_ASSISTANT_PROMPT),
        query_llm(llm_b, user_input, STRUCTURED_ASSISTANT_PROMPT),
        query_llm(llm_c, user_input, STRUCTURED_ASSISTANT_PROMPT),
    )

    # Label each answer and include the original user message so the
    # aggregator can match its depth and tone, as its prompt instructs.
    combined_content = (
        f"[User message] {user_input}\n\n"
        f"[LLM-A] {results[0]}\n\n"
        f"[LLM-B] {results[1]}\n\n"
        f"[LLM-C] {results[2]}"
    )

    # Single query to the aggregator (LLM-D).
    final_response = await query_llm(aggregator, combined_content, AGGREGATOR_PROMPT)
    return final_response
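

# --- Usage sketch (illustrative only) ---
# A minimal example of driving query_moa_chain() from a script. The settings
# dict keys mirror what query_moa_chain() reads; the model names below are
# assumptions for illustration and must match entries in your actual
# llm/model_config.json, whose expected shape (inferred from the code above)
# looks roughly like:
#
#   {
#     "providers": {
#       "openai": {
#         "url": "https://api.openai.com/v1/chat/completions",
#         "key_env": "OPENAI_API_KEY"
#       }
#     },
#     "models": {"gpt-4o-mini": "openai"}
#   }

if __name__ == "__main__":
    # Hypothetical settings; replace the model names with ones from your config.
    demo_settings = {
        "models": {
            "LLM-A": "gpt-4o-mini",
            "LLM-B": "gpt-4o-mini",
            "LLM-C": "gpt-4o-mini",
        },
        "aggregator": "gpt-4o-mini",
    }
    answer = asyncio.run(query_moa_chain("What is mixture-of-agents?", demo_settings))
    print(answer)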