Intégrer les traces en temps réel
import weave
from weave.integrations import patch_openai_realtime

# Initialize Weave for your team/project, then patch the OpenAI Realtime
# API so Realtime sessions are traced automatically.
weave.init("your-team-name/your-project-name")
patch_openai_realtime()
# Your application logic
Exécuter un assistant vocal en temps réel avec le SDK OpenAI Agents
Cet exemple crée un assistant vocal avec RealtimeAgent et RealtimeRunner, et active le tracing Weave en appliquant le patch patch_openai_realtime().
Pour exécuter l’exemple :
-
Démarrez votre environnement Python et installez les bibliothèques suivantes :
- uv
- pip
uv add weave openai-agents websockets pyaudio numpy
pip install weave openai-agents websockets pyaudio numpy
-
Créez un fichier nommé
weave_voice_assistant.py et ajoutez-y le code suivant. Les lignes surlignées indiquent l’intégration de Weave dans l’application. Le reste du code sert à créer l’application d’assistant vocal de base.
weave_voice_assistant.py
"""Realtime voice assistant built on the OpenAI Agents SDK, traced with Weave."""

import argparse
import asyncio
import queue
import sys
import termios
import threading
import tty

import weave
import pyaudio
import numpy as np
from weave.integrations import patch_openai_realtime
from agents.realtime import RealtimeAgent, RealtimeRunner

# Default Weave project; override with --weave-project.
DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"

# Audio capture/playback parameters (16-bit PCM mono).
FORMAT = pyaudio.paInt16
RATE = 24000  # Required by the OpenAI Realtime API.
CHUNK = 1024
MAX_INPUT_CHANNELS = 1
MAX_OUTPUT_CHANNELS = 1
INP_DEV_IDX = None
OUT_DEV_IDX = None


# Parse CLI args for Weave project name and audio device selection.
def parse_args():
    """Return parsed CLI arguments: Weave project and input/output device indices."""
    parser = argparse.ArgumentParser(description="Realtime agent with Weave logging")
    parser.add_argument(
        "--weave-project",
        default=DEFAULT_WEAVE_PROJECT,
        help=f"Weave project name (default: {DEFAULT_WEAVE_PROJECT})",
        dest="weave_project"
    )
    parser.add_argument(
        "--input-device",
        type=int,
        default=None,
        help="PyAudio input (mic) device index. Defaults to system default. Run mic_detect.py to list devices.",
        dest="input_device"
    )
    parser.add_argument(
        "--output-device",
        type=int,
        default=None,
        help="Index du périphérique de sortie PyAudio (haut-parleur). Par défaut, utilise le périphérique système par défaut. Exécutez mic_detect.py pour lister les périphériques.",
        dest="output_device"
    )
    return parser.parse_args()


# Initialize Weave and patch the OpenAI Realtime API for tracing.
def init_weave(project_name: str | None = None) -> None:
    """Start a Weave run for *project_name* and enable Realtime API tracing."""
    name = project_name or DEFAULT_WEAVE_PROJECT
    weave.init(name)
    patch_openai_realtime()  # Enables automatic tracing of Realtime API sessions.


# Shared mute flag, toggled by the key-listener thread and read by the mic loop.
mic_enabled = True


# Listen for 't' key to toggle mic on/off. Runs in a daemon thread.
def start_keylistener():
    """Read single keypresses from stdin; 't' toggles the mic, Ctrl-C exits."""
    global mic_enabled
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        # cbreak mode delivers keypresses immediately without Enter.
        tty.setcbreak(fd)
        while True:
            ch = sys.stdin.read(1)
            if ch.lower() == 't':
                mic_enabled = not mic_enabled
                state = "ON" if mic_enabled else "OFF"
                print(f"\n🎙 Mic {state} (press t to toggle)")
            elif ch == '\x03':  # Ctrl-C
                break
    finally:
        # Always restore the terminal settings we saved above.
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


# Drain the audio queue and write to the speaker in a background thread.
def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
    """Block on the queue and play chunks; a None item is the shutdown signal."""
    while True:
        data = audio_output_queue.get()
        if data is None:
            break
        output_stream.write(data)


# Open audio streams, start a Realtime session, and run the send/receive loop.
async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
    """Run the voice-assistant session until the event stream ends.

    Args:
        input_device_index: PyAudio mic device index; system default if None.
        output_device_index: PyAudio speaker device index; system default if None.
    """
    p = pyaudio.PyAudio()
    if input_device_index is None:
        input_device_index = int(p.get_default_input_device_info()['index'])
    if output_device_index is None:
        output_device_index = int(p.get_default_output_device_info()['index'])
    # Clamp channel count to device capabilities to avoid pyaudio errors.
    input_info = p.get_device_info_by_index(input_device_index)
    output_info = p.get_device_info_by_index(output_device_index)
    input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
    output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)
    mic = p.open(
        format=FORMAT,
        channels=input_channels,
        rate=RATE,
        input=True,
        output=False,
        frames_per_buffer=CHUNK,
        input_device_index=input_device_index,
        start=False,
    )
    speaker = p.open(
        format=FORMAT,
        channels=output_channels,
        rate=RATE,
        input=False,
        output=True,
        frames_per_buffer=CHUNK,
        output_device_index=output_device_index,
        start=False,
    )
    mic.start_stream()
    speaker.start_stream()
    # Buffer audio through a queue so in-flight playback can be flushed on interrupt.
    audio_output_queue = queue.Queue()
    threading.Thread(
        target=play_audio,
        args=(speaker, audio_output_queue),
        daemon=True
    ).start()
    s_agent = RealtimeAgent(
        name="Speech Assistant",
        instructions="You are a tool using AI. Use tools to accomplish a task whenever possible"
    )
    s_runner = RealtimeRunner(s_agent, config={
        "model_settings": {
            "model_name": "gpt-realtime",
            "modalities": ["audio"],
            "output_modalities": ["audio"],
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "speed": 1.2,
            # Server-side voice activity detection; short silences end a turn.
            "turn_detection": {
                "prefix_padding_ms": 100,
                "silence_duration_ms": 100,
                "type": "server_vad",
                "interrupt_response": True,
                "create_response": True,
            },
        }
    })
    print("--- Session Active (Speak into mic) ---")
    print("🎙 Mic ON (press t to toggle)")
    threading.Thread(target=start_keylistener, daemon=True).start()
    async with await s_runner.run() as session:

        # Stream mic input to the Realtime API, sending silence when muted.
        async def send_mic_audio():
            silence = b'\x00' * CHUNK * 2  # 2 bytes per sample (16-bit PCM).
            try:
                while True:
                    raw_data = mic.read(CHUNK, exception_on_overflow=False)
                    if mic_enabled:
                        # RMS of the chunk drives a simple console level meter.
                        audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                        rms = np.sqrt(np.mean(audio_data**2))
                        meter = int(min(rms / 50, 50))
                        print(f"Mic Level: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                        await session.send_audio(raw_data)
                    else:
                        print(f"Mic Level: {' ' * 50} | 🎙 OFF", end="\r")
                        await session.send_audio(silence)
                    await asyncio.sleep(0)  # Yield to the event loop between reads.
            except Exception:
                # Best-effort sender: exits quietly when the session closes.
                pass

        # Receive events from the session and route audio to the speaker.
        async def handle_events():
            async for event in session:
                if event.type == "audio":
                    audio_output_queue.put(event.audio.data)
                elif event.type == "audio_interrupted":
                    # Flush queued AI audio so it doesn't talk over the user.
                    while not audio_output_queue.empty():
                        try:
                            audio_output_queue.get_nowait()
                        except queue.Empty:
                            break

        mic_task = asyncio.create_task(send_mic_audio())
        try:
            await handle_events()
        finally:
            mic_task.cancel()

    # Cleanup
    audio_output_queue.put(None)  # Signal playback thread to exit.
    mic.close()
    speaker.close()
    p.terminate()


if __name__ == "__main__":
    args = parse_args()
    init_weave(args.weave_project)
    # Save terminal state before the key listener flips it to cbreak mode,
    # and restore it no matter how main() exits.
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
Mettez à jour la valeur
DEFAULT_WEAVE_PROJECT avec les noms de votre équipe et de votre projet. -
Définissez la variable d’environnement
OPENAI_API_KEY. -
Exécutez le code :
python weave_voice_assistant.py
Exécuter un assistant vocal en temps réel avec WebSockets
Cet exemple utilise directement weave.init() et patch_openai_realtime().
Pour exécuter l’exemple :
-
Lancez votre environnement Python et installez les bibliothèques suivantes :
- uv
- pip
uv add weave websockets pyaudio numpy
pip install weave websockets pyaudio numpy
-
Créez un fichier intitulé
tool_definitions.py et ajoutez-y les définitions d’outil suivantes. L’application principale importe ce module.
tool_definitions.py
"""Tool implementations exposed to the Realtime model, each traced as a Weave op."""

import json
import subprocess
import sys
import tempfile
from pathlib import Path

import weave


@weave.op
def get_weather(city: str) -> str:
    """Get the current weather for a city.

    Args:
        city: The name of the city to get the weather for.

    Returns:
        A JSON string with city/temperature/condition fields (mock data).
    """
    # Mock implementation: always reports the same conditions.
    return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})


@weave.op
def calculate(expression: str) -> str:
    """Evaluate a math expression and return the result.

    Args:
        expression: A math expression to evaluate, e.g. '2 + 2'.

    Returns:
        The result as a string, or an error message on failure.
    """
    try:
        # SECURITY: eval() executes arbitrary Python and the expression comes
        # from the model. Acceptable for a local demo only; do not expose this
        # to untrusted input without a restricted evaluator.
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Erreur: {e}"


@weave.op
def run_python_code(code: str) -> str:
    """Write and execute a Python script, then return its stdout/stderr.

    Args:
        code: The Python source code to execute.

    Returns:
        Combined stdout/stderr (plus exit code when nonzero), or a
        timeout/error message.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
    ) as f:
        f.write(code)
        script_path = Path(f.name)
    try:
        # Run with the current interpreter instead of whatever "python"
        # resolves to on PATH (which may be absent or a different version).
        result = subprocess.run(
            [sys.executable, str(script_path)],
            capture_output=True,
            text=True,
            timeout=30,
        )
        output = result.stdout
        if result.stderr:
            output += f"\nSTDERR:\n{result.stderr}"
        if result.returncode != 0:
            output += f"\n(code de sortie {result.returncode})"
        return output or "(aucune sortie)"
    except subprocess.TimeoutExpired:
        return "Erreur: le script a dépassé le délai de 30 secondes."
    finally:
        # Always remove the temporary script, even on timeout.
        script_path.unlink(missing_ok=True)


@weave.op
async def write_file(file_path: str, content: str) -> str:
    """Write content to a file on disk.

    Args:
        file_path: The path of the file to write.
        content: The content to write into the file.

    Returns:
        A confirmation message, or an error message on failure.
    """
    try:
        path = Path(file_path)
        # Create missing parent directories so writes to new paths succeed.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content)
        return f"{len(content)} octets écrits dans {file_path}"
    except Exception as e:
        return f"Erreur lors de l'écriture du fichier: {e}"
Créez un fichier nommé
weave_ws_voice_assistant.py dans le même répertoire, puis ajoutez-y le code suivant.
weave_ws_voice_assistant.py
"""Realtime voice assistant over a raw WebSocket connection, traced with Weave."""

import asyncio
import base64
import json
import os
import queue
import threading
from typing import Any, Callable

import numpy as np
import pyaudio
import websockets

import weave

# Initialize Weave before patching so subsequent Realtime traffic is traced.
weave.init("<your-team-name/your-project-name>")
from weave.integrations import patch_openai_realtime
patch_openai_realtime()

from tool_definitions import (
    calculate,
    get_weather,
    run_python_code,
    write_file,
)

# Audio format (must be PCM16 for the Realtime API).
FORMAT = pyaudio.paInt16
RATE = 24000
CHUNK = 1024
MAX_INPUT_CHANNELS = 2
MAX_OUTPUT_CHANNELS = 2

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
# When True, every raw server message is appended to data.jsonl.
DEBUG_WRITE_LOG = False

# Map tool name -> callable for function call dispatch.
TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
    "get_weather": get_weather,
    "calculate": calculate,
    "run_python_code": run_python_code,
    "write_file": write_file,
}

# Raw tool definitions for the Realtime API session config.
TOOL_DEFINITIONS = [
    {
        "type": "function",
        "name": "get_weather",
        "description": "Get the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "The city name to get weather for.",
                }
            },
            "required": ["city"],
        },
    },
    {
        "type": "function",
        "name": "calculate",
        "description": "Evaluate a math expression and return the result.",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "A math expression to evaluate, e.g. '2 + 2'.",
                }
            },
            "required": ["expression"],
        },
    },
    {
        "type": "function",
        "name": "run_python_code",
        "description": "Write and execute a Python script, returning its stdout/stderr.",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "The Python source code to execute.",
                }
            },
            "required": ["code"],
        },
    },
    {
        "type": "function",
        "name": "write_file",
        "description": "Write content to a file on disk.",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "The path to write the file to.",
                },
                "content": {
                    "type": "string",
                    "description": "The content to write into the file.",
                },
            },
            "required": ["file_path", "content"],
        },
    },
]


async def send_event(ws, event: dict) -> None:
    """Serialize *event* to JSON and send it over the WebSocket."""
    await ws.send(json.dumps(event))


async def configure_session(ws) -> None:
    """Send session.update with instructions, tools, and audio/VAD settings."""
    event = {
        "type": "session.update",
        "session": {
            "type": "realtime",
            "model": "gpt-realtime",
            "output_modalities": ["audio"],
            "instructions": (
                "You are a helpful AI assistant with access to tools. "
                "Use tools to accomplish tasks whenever possible. "
                "Speak clearly and briefly."
            ),
            "tools": TOOL_DEFINITIONS,
            "tool_choice": "auto",
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "transcription": {"model": "gpt-4o-transcribe"},
                    # Server-side voice activity detection ends turns on silence.
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.5,
                        "prefix_padding_ms": 300,
                        "silence_duration_ms": 500,
                    },
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                },
            },
        },
    }
    await send_event(ws, event)
    print("Session configured.")


async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
    """Dispatch a model function call to the local tool and send its result back.

    Args:
        ws: Open Realtime WebSocket connection.
        call_id: Identifier correlating the call with its output item.
        name: Tool name; must exist in TOOL_REGISTRY.
        arguments: JSON-encoded keyword arguments for the tool.

    Raises:
        Exception: If *name* is empty.
    """
    if not name:
        raise Exception("Did not get a function name")
    print(f"\n[Function Call] {name}({arguments})")
    tool_fn = TOOL_REGISTRY.get(name)
    if tool_fn is None:
        result = json.dumps({"error": f"Unknown function: {name}"})
    else:
        try:
            args = json.loads(arguments)
            result = tool_fn(**args)
            # Tools may be sync or async (e.g. write_file); await coroutines.
            if asyncio.iscoroutine(result):
                result = await result
        except Exception as e:
            # Report tool failures to the model instead of crashing the loop.
            result = json.dumps({"error": str(e)})
    print(f"[Function Result] {result}")
    # Send the function call output back to the model.
    await send_event(ws, {
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": call_id,
            "output": result if isinstance(result, str) else json.dumps(result),
        },
    })
    # Trigger a new response so the model incorporates the function result.
    await send_event(ws, {"type": "response.create"})


def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
    """Runs in a separate thread because pyaudio's write() blocks until the
    sound card consumes the samples. Decoupling playback from the async event
    loop lets us flush the queue on interrupt without waiting for in-flight
    writes to finish."""
    while True:
        data = audio_output_queue.get()
        if data is None:
            break
        output_stream.write(data)


async def send_mic_audio(ws, mic) -> None:
    """Continuously read mic chunks and stream them to the API until cancelled."""
    try:
        while True:
            raw_data = mic.read(CHUNK, exception_on_overflow=False)
            # Visual volume meter.
            audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
            rms = np.sqrt(np.mean(audio_data**2))
            meter = int(min(rms / 50, 50))
            print(f"Mic Level: {'█' * meter}{' ' * (50 - meter)} |", end="\r")
            # Base64-encode and send audio chunk.
            b64_audio = base64.b64encode(raw_data).decode("utf-8")
            await send_event(ws, {
                "type": "input_audio_buffer.append",
                "audio": b64_audio,
            })
            await asyncio.sleep(0)
    except asyncio.CancelledError:
        pass


async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
    """Consume server events: play audio, flush on barge-in, dispatch tool calls."""
    # Accumulate function call arguments across delta events.
    pending_calls: dict[str, dict] = {}
    async for raw_message in ws:
        if DEBUG_WRITE_LOG:
            # NOTE(review): raw_message is already JSON text (json.loads below),
            # so dumps() logs it as a quoted/escaped string rather than raw JSON.
            with open("data.jsonl", "a", encoding="utf-8") as f:
                f.write(json.dumps(raw_message) + "\n")
        event = json.loads(raw_message)
        event_type = event.get("type", "")
        if event_type == "session.created":
            print(raw_message)
        elif event_type == "session.updated":
            print(raw_message)
        elif event_type == "error":
            print(f"\n[Error] {event}")
        elif event_type == "input_audio_buffer.speech_started":
            # Flush queued AI audio so it doesn't talk over the user.
            while not audio_output_queue.empty():
                try:
                    audio_output_queue.get_nowait()
                except queue.Empty:
                    break
        elif event_type == "input_audio_buffer.speech_stopped":
            pass
        elif event_type == "input_audio_buffer.committed":
            pass
        elif event_type == "response.created":
            pass
        elif event_type == "response.output_text.delta":
            pass
        elif event_type == "response.output_text.done":
            pass
        # Audio output deltas - queue for playback.
        elif event_type == "response.output_audio.delta":
            audio_bytes = base64.b64decode(event.get("delta", ""))
            audio_output_queue.put(audio_bytes)
        elif event_type == "response.output_audio_transcript.delta":
            pass
        elif event_type == "response.output_audio_transcript.done":
            pass
        # Function call started - initialize pending call.
        elif event_type == "response.output_item.added":
            item = event.get("item", {})
            if item.get("type") == "function_call" and item.get("status") == "in_progress":
                item_id = item.get("id", "")
                pending_calls[item_id] = {
                    "call_id": item.get("call_id", ""),
                    "name": item.get("name", ""),
                    "arguments": "",
                }
                print(f"\n[Function Call Started] {item.get('name', '')}")
        # Function call argument deltas - accumulate.
        elif event_type == "response.function_call_arguments.delta":
            item_id = event.get("item_id", "")
            if item_id in pending_calls:
                pending_calls[item_id]["arguments"] += event.get("delta", "")
        elif event_type == "response.function_call_arguments.done":
            item_id = event.get("item_id", "")
            call_info = pending_calls.pop(item_id, None)
            if call_info is None:
                # Fallback: use the data directly from the done event.
                call_info = {
                    "call_id": event.get("call_id"),
                    "name": event.get("name"),
                    "arguments": event.get("arguments"),
                }
            try:
                await handle_function_call(
                    ws,
                    call_info["call_id"],
                    call_info["name"],
                    call_info["arguments"],
                )
            except Exception as e:
                print(f"Échec de l'appel de fonction pour le message {call_info} : erreur - {e}")
        elif event_type == "response.done":
            pass
        elif event_type == "rate_limits.updated":
            pass
        else:
            # Surface event types we don't explicitly handle.
            print(f"\n[Event: {event_type}]")


async def main():
    """Open audio devices, connect to the Realtime API, and run until the server closes."""
    if not OPENAI_API_KEY:
        print("Erreur : la variable d'environnement OPENAI_API_KEY n'est pas définie")
        return
    p = pyaudio.PyAudio()
    input_device_index = int(p.get_default_input_device_info()['index'])
    output_device_index = int(p.get_default_output_device_info()['index'])
    # The channel count must match device capabilities, otherwise pyaudio
    # raises an error on open.
    input_info = p.get_device_info_by_index(input_device_index)
    output_info = p.get_device_info_by_index(output_device_index)
    input_channels = min(int(input_info['maxInputChannels']), 1)
    output_channels = min(int(output_info['maxOutputChannels']), 1)
    mic = p.open(
        format=FORMAT,
        channels=input_channels,
        rate=RATE,
        input=True,
        output=False,
        frames_per_buffer=CHUNK,
        input_device_index=input_device_index,
        start=False,
    )
    speaker = p.open(
        format=FORMAT,
        channels=output_channels,
        rate=RATE,
        input=False,
        output=True,
        frames_per_buffer=CHUNK,
        output_device_index=output_device_index,
        start=False,
    )
    mic.start_stream()
    speaker.start_stream()
    # Audio goes through a queue so it can be flushed when the user interrupts
    # playback. Writing directly to the speaker would make it impossible to
    # cancel audio that is already playing.
    audio_output_queue = queue.Queue()
    threading.Thread(
        target=play_audio,
        args=(speaker, audio_output_queue),
        daemon=True
    ).start()
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }
    print("Connexion à l'OpenAI Realtime API...")
    async with websockets.connect(
        REALTIME_URL,
        additional_headers=headers,
    ) as ws:
        print("Connecté ! Configuration de la session...")
        await configure_session(ws)
        print("--- Session active (parlez dans le micro) ---")
        mic_task = asyncio.create_task(send_mic_audio(ws, mic))
        try:
            await receive_events(ws, audio_output_queue)
        finally:
            mic_task.cancel()
            try:
                await mic_task
            except asyncio.CancelledError:
                pass
    # Cleanup
    audio_output_queue.put(None)  # Signal the playback thread to exit.
    mic.close()
    speaker.close()
    p.terminate()
    print("\nSession terminée.")


if __name__ == "__main__":
    asyncio.run(main())
Mettez à jour l’appel à
weave.init() avec le nom de votre équipe et de votre projet. -
Définissez la variable d’environnement
OPENAI_API_KEY. -
Exécutez le code :
python weave_ws_voice_assistant.py