import asyncio
import base64
import io
import json
import os
from typing import Optional

import numpy as np
import soundfile as sf
from dotenv import load_dotenv
from pydub import AudioSegment
from websockets import ClientConnection, Data, connect

# Load the OpenAI API key from the environment (load_dotenv() also reads a
# local .env file if one exists).
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY must be set in environment")

WEBSOCKET_URI = "wss://api.openai.com/v1/realtime?intent=transcription"
WEBSOCKET_HEADERS = {
    "Authorization": f"Bearer {OPENAI_API_KEY}",
    "OpenAI-Beta": "realtime=v1",
}

# Sample rate (Hz) every chunk is resampled to before it is sent.
_TARGET_SAMPLE_RATE = 24000

# Shared client registry
connections = {}


class WebSocketClient:
    """Stream audio chunks to a transcription websocket and collect the text.

    ``run()`` drives a private event loop (intended to be called from a
    dedicated thread); ``enqueue_audio_chunk()`` is the hand-off point for
    producers running on other threads.  Transcript deltas received from the
    server are appended to ``self.transcript``.
    """

    def __init__(self, uri: str, headers: dict, client_id: str):
        """Store connection parameters; no network I/O happens here.

        Args:
            uri: Websocket endpoint to connect to.
            headers: Extra HTTP headers sent with the websocket handshake.
            client_id: Caller-chosen identifier, stored as ``self.client_id``.
        """
        self.uri = uri
        self.headers = headers
        self.websocket: Optional[ClientConnection] = None
        # Bounded so a stalled sender cannot accumulate audio without limit.
        self.queue = asyncio.Queue(maxsize=10)
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.client_id = client_id
        self.transcript = ""

    async def connect(self):
        """Open the websocket, send the session settings, then pump messages.

        The contents of ``openai_transcription_settings.json`` (resolved
        against the current working directory) are sent verbatim as the first
        message.  Afterwards the receive and send loops run concurrently until
        one of them fails.  Errors are printed rather than raised, and the
        websocket is always closed on the way out.
        """
        try:
            self.websocket = await connect(self.uri, additional_headers=self.headers)
            print("✅ Connected to OpenAI WebSocket")

            # Send transcription session settings
            with open("openai_transcription_settings.json", "r", encoding="utf-8") as f:
                settings = f.read()
            await self.websocket.send(settings)

            await asyncio.gather(self.receive_messages(), self.send_audio_chunks())
        except Exception as e:
            print(f"❌ WebSocket Error: {e}")
        finally:
            # Closing also ends whichever loop is still running after the
            # other one failed (gather() does not cancel its siblings).
            await self._close_websocket()

    async def _close_websocket(self):
        """Close the websocket if one was opened; never raises ``Exception``."""
        if self.websocket is None:
            return
        try:
            await self.websocket.close()
        except Exception as e:
            print(f"⚠️ Error closing WebSocket: {e}")

    def run(self):
        """Create a new event loop for this thread and run ``connect()`` on it.

        Blocks until ``connect()`` returns.  The loop is published as
        ``self.loop`` so that ``enqueue_audio_chunk()`` can schedule work on
        it from other threads.
        """
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.connect())

    def process_websocket_message(self, message: Data):
        """Fold one server message into ``self.transcript``.

        ``...input_audio_transcription.delta`` events append their ``delta``
        text; ``...input_audio_transcription.completed`` events append a
        single separating space unless the transcript is empty or already
        ends with one.  Every other event type is ignored.  Malformed
        messages are reported with a printed warning and otherwise skipped so
        that one bad frame cannot stop the receive loop.
        """
        try:
            message_object = json.loads(message)
            message_type = message_object["type"]
            if message_type == "conversation.item.input_audio_transcription.delta":
                self.transcript += message_object["delta"]
            elif message_type == "conversation.item.input_audio_transcription.completed":
                if self.transcript and not self.transcript.endswith(" "):
                    self.transcript += " "
        except Exception as e:
            print(f"⚠️ Error processing message: {e}")

    @staticmethod
    def _encode_chunk(sample_rate: int, audio_array: np.ndarray) -> Optional[str]:
        """Convert one audio chunk to base64 mono 16-bit PCM at 24 kHz.

        Returns ``None`` for an empty chunk, which has no peak to normalise
        against and nothing worth sending.
        """
        audio_array = np.asarray(audio_array)
        if audio_array.size == 0:
            return None

        if audio_array.ndim > 1:
            # Down-mix by averaging axis 1.
            # NOTE(review): assumes a (frames, channels) layout - confirm
            # against the producer.
            audio_array = audio_array.mean(axis=1)

        # astype() copies, so the in-place scaling below never touches the
        # caller's array.
        samples = audio_array.astype(np.float32)
        peak = np.max(np.abs(samples))
        if peak > 0:
            samples /= peak  # peak-normalise; all-zero chunks are left alone
        int_audio = (samples * 32767).astype(np.int16)

        # Round-trip through an in-memory WAV so pydub can do the resampling.
        wav_buffer = io.BytesIO()
        sf.write(wav_buffer, int_audio, sample_rate, format="WAV", subtype="PCM_16")
        wav_buffer.seek(0)
        segment = AudioSegment.from_file(wav_buffer, format="wav")
        resampled = segment.set_frame_rate(_TARGET_SAMPLE_RATE)

        # Send the bare PCM frames, not a WAV file: the Realtime API's pcm16
        # input format is headerless, so an embedded RIFF header would be
        # decoded as audio samples at the start of every chunk.  (Assumes the
        # session settings keep the API's default pcm16 input format.)
        return base64.b64encode(resampled.raw_data).decode("utf-8")

    async def send_audio_chunks(self):
        """Forever: take a chunk off the queue, encode it, and send it.

        Each queue item is a ``(sample_rate, audio_array)`` tuple.  Chunks
        are discarded while no websocket is available, and empty chunks are
        skipped.
        """
        while True:
            sample_rate, audio_array = await self.queue.get()
            if self.websocket is None:
                continue

            b64_audio = self._encode_chunk(sample_rate, audio_array)
            if b64_audio is None:
                continue

            await self.websocket.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": b64_audio,
            }))

    async def receive_messages(self):
        """Pass every incoming websocket message to the message handler."""
        async for message in self.websocket:
            self.process_websocket_message(message)

    def _put_or_drop(self, item):
        """Queue ``item`` unless the queue is full.  Runs on the loop thread."""
        if not self.queue.full():
            self.queue.put_nowait(item)

    def enqueue_audio_chunk(self, sample_rate: int, chunk_array: np.ndarray):
        """Hand an audio chunk to the sender loop from any thread.

        Delivery is best-effort: the chunk is dropped when the event loop has
        not been started (or has stopped), and when the queue is already
        full.

        Args:
            sample_rate: Sample rate of ``chunk_array`` in Hz.
            chunk_array: Audio samples for this chunk.
        """
        loop = self.loop
        if loop is None or not loop.is_running():
            # Nothing would ever consume the chunk; scheduling it anyway
            # would only pile callbacks up on a loop that is not running.
            return
        # The full-check and the insert both happen on the loop thread, so
        # the queue bound is actually enforced even with a fast producer.
        loop.call_soon_threadsafe(self._put_or_drop, (sample_rate, chunk_array))