リアルタイムトレースを統合する
import weave
from weave.integrations import patch_openai_realtime
weave.init("your-team-name/your-project-name")
patch_openai_realtime()
# アプリケーションのロジック
OpenAI Agents SDK を使用してリアルタイム音声アシスタントを実行する
RealtimeAgent と RealtimeRunner を使用する OpenAI Agents SDK を利用し、patch_openai_realtime() を適用して Weave トレースを有効にします。
例を実行するには:
-
Python環境を起動し、以下のライブラリをインストールします:
- uv
- pip
uv add weave openai-agents websockets pyaudio numpypip install weave openai-agents websockets pyaudio numpy -
weave_voice_assistant.pyという名前のファイルを作成し、以下のコードを追加します。 ハイライトされた行は、アプリケーション内でのWeaveのインテグレーションを示しています。残りのコードは、基本的な音声アシスタントアプリを作成するものです。weave_voice_assistant.py
import argparse import asyncio import queue import sys import termios import threading import tty import weave import pyaudio import numpy as np from weave.integrations import patch_openai_realtime from agents.realtime import RealtimeAgent, RealtimeRunner DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>" FORMAT = pyaudio.paInt16 RATE = 24000 # OpenAI Realtime API で必須。 CHUNK = 1024 MAX_INPUT_CHANNELS = 1 MAX_OUTPUT_CHANNELS = 1 INP_DEV_IDX = None OUT_DEV_IDX = None # Weave プロジェクト名とオーディオデバイス選択のための CLI 引数を解析する。 def parse_args(): parser = argparse.ArgumentParser(description="Weave ロギングを使用したリアルタイムエージェント") parser.add_argument( "--weave-project", default=DEFAULT_WEAVE_PROJECT, help=f"Weave プロジェクト名 (デフォルト: {DEFAULT_WEAVE_PROJECT})", dest="weave_project" ) parser.add_argument( "--input-device", type=int, default=None, help="PyAudio 入力(マイク)デバイスのインデックス。デフォルトはシステムのデフォルト。デバイス一覧を確認するには mic_detect.py を実行してください。", dest="input_device" ) parser.add_argument( "--output-device", type=int, default=None, help="PyAudio 出力(スピーカー)デバイスのインデックス。デフォルトはシステムのデフォルト。デバイス一覧を確認するには mic_detect.py を実行してください。", dest="output_device" ) return parser.parse_args() # Weave を初期化し、トレース用に OpenAI Realtime API にパッチを適用する。 def init_weave(project_name: str | None = None) -> None: name = project_name or DEFAULT_WEAVE_PROJECT weave.init(name) patch_openai_realtime() # Realtime API セッションの自動トレースを有効化する。 mic_enabled = True # 't' キーを監視してマイクのオン/オフを切り替える。デーモンスレッドで動作する。 def start_keylistener(): global mic_enabled fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setcbreak(fd) while True: ch = sys.stdin.read(1) if ch.lower() == 't': mic_enabled = not mic_enabled state = "ON" if mic_enabled else "OFF" print(f"\n🎙 マイク {state} (t キーで切り替え)") elif ch == '\x03': # Ctrl-C break finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) # バックグラウンドスレッドでオーディオキューを消費し、スピーカーに書き込む。 def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue): while True: data = audio_output_queue.get() if data is None: break output_stream.write(data) # オーディオストリームを開き、Realtime セッションを開始して送受信ループを実行する。 async def main(*, input_device_index: int | None = None, output_device_index: int | None = None): p = pyaudio.PyAudio() if input_device_index is None: input_device_index = int(p.get_default_input_device_info()['index']) if output_device_index is None: output_device_index = int(p.get_default_output_device_info()['index']) # pyaudio エラーを防ぐため、チャンネル数をデバイスの上限に制限する。 input_info = p.get_device_info_by_index(input_device_index) output_info = p.get_device_info_by_index(output_device_index) input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS) output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS) mic = p.open( format=FORMAT, channels=input_channels, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK, input_device_index=input_device_index, start=False, ) speaker = p.open( format=FORMAT, channels=output_channels, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK, output_device_index=output_device_index, start=False, ) mic.start_stream() speaker.start_stream() # 割り込み時に再生中のオーディオをフラッシュできるよう、キュー経由でバッファリングする。 audio_output_queue = queue.Queue() threading.Thread( target=play_audio, args=(speaker, audio_output_queue), daemon=True ).start() s_agent = RealtimeAgent( name="Speech Assistant", instructions="あなたはツールを使用する AI です。可能な限りツールを使ってタスクを達成してください" ) s_runner = RealtimeRunner(s_agent, config={ "model_settings": { "model_name": "gpt-realtime", "modalities": ["audio"], "output_modalities": ["audio"], "input_audio_format": "pcm16", "output_audio_format": "pcm16", "speed": 1.2, "turn_detection": { "prefix_padding_ms": 100, "silence_duration_ms": 100, "type": "server_vad", "interrupt_response": True, "create_response": True, }, } }) print("--- セッション開始(マイクに向かって話してください)---") print("🎙 マイク ON (t キーで切り替え)") threading.Thread(target=start_keylistener, daemon=True).start() async with await s_runner.run() as session: # マイク入力を Realtime API にストリーミングし、ミュート中は無音を送信する。 async def send_mic_audio(): silence = b'\x00' * CHUNK * 2 # サンプルあたり 2 バイト(16 ビット PCM)。 try: while True: raw_data = mic.read(CHUNK, exception_on_overflow=False) if mic_enabled: audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) rms = np.sqrt(np.mean(audio_data**2)) meter = int(min(rms / 50, 50)) print(f"マイクレベル: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r") await session.send_audio(raw_data) else: print(f"マイクレベル: {' ' * 50} | 🎙 OFF", end="\r") await session.send_audio(silence) await asyncio.sleep(0) # 読み取りの合間にイベントループへ制御を返す。 except Exception: pass # セッションからイベントを受信し、オーディオをスピーカーへ転送する。 async def handle_events(): async for event in session: if event.type == "audio": audio_output_queue.put(event.audio.data) elif event.type == "audio_interrupted": # キューに溜まった AI のオーディオをフラッシュして、ユーザーの発話に被らないようにする。 while not audio_output_queue.empty(): try: audio_output_queue.get_nowait() except queue.Empty: break mic_task = asyncio.create_task(send_mic_audio()) try: await handle_events() finally: mic_task.cancel() # クリーンアップ audio_output_queue.put(None) # 再生スレッドに終了を通知する。 mic.close() speaker.close() p.terminate() if __name__ == "__main__": args = parse_args() init_weave(args.weave_project) fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device)) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) -
DEFAULT_WEAVE_PROJECTの値を、ご自身のチーム名とプロジェクト名に置き換えます。 -
OPENAI_API_KEY環境変数を設定してください。 -
コードを実行します。
python weave_voice_assistant.py
WebSockets を使用してリアルタイム音声アシスタントを実行する
weave.init() と patch_openai_realtime() を使用してセッションをトレースします。
例を実行するには:
-
Python 環境を起動し、次のライブラリをインストールしてください。
- uv
- pip
uv add weave websockets pyaudio numpypip install weave websockets pyaudio numpy -
tool_definitions.pyという名前のファイルを作成し、次のツール定義を追加します。メインのアプリケーションはこのモジュールをインポートします。tool_definitions.py
import json import subprocess import tempfile from pathlib import Path import weave # @function_tool @weave.op def get_weather(city: str) -> str: """都市の現在の天気を取得します。 引数: city: 天気を取得する都市名。 """ return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"}) @weave.op def calculate(expression: str) -> str: """数式を評価して、その結果を返します。 引数: expression: 評価する数式。例: '2 + 2'。 """ try: result = eval(expression) return str(result) except Exception as e: return f"Error: {e}" @weave.op def run_python_code(code: str) -> str: """Python スクリプトを書き出して実行し、その stdout/stderr を返します。 引数: code: 実行する Python のソースコード。 """ with tempfile.NamedTemporaryFile( mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False ) as f: f.write(code) script_path = Path(f.name) try: result = subprocess.run( ["python", str(script_path)], capture_output=True, text=True, timeout=30, ) output = result.stdout if result.stderr: output += f"\nSTDERR:\n{result.stderr}" if result.returncode != 0: output += f"\n(exit code {result.returncode})" return output or "(no output)" except subprocess.TimeoutExpired: return "Error: script timed out after 30 seconds." finally: script_path.unlink(missing_ok=True) @weave.op async def write_file(file_path: str, content: str) -> str: """ディスク上のファイルに内容を書き込みます。 引数: file_path: ファイルの書き込み先パス。 content: ファイルに書き込む内容。 """ try: path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) return f"Wrote {len(content)} bytes to {file_path}" except Exception as e: return f"Error writing file: {e}" -
同じディレクトリに
weave_ws_voice_assistant.pyというファイルを作成し、次のコードを追加します。weave_ws_voice_assistant.py
import asyncio import base64 import json import os import queue import threading from typing import Any, Callable import numpy as np import pyaudio import websockets import weave weave.init("<your-team-name/your-project-name>") from weave.integrations import patch_openai_realtime patch_openai_realtime() from tool_definitions import ( calculate, get_weather, run_python_code, write_file, ) # オーディオフォーマット(Realtime API では PCM16 を使用する必要があります)。 FORMAT = pyaudio.paInt16 RATE = 24000 CHUNK = 1024 MAX_INPUT_CHANNELS = 2 MAX_OUTPUT_CHANNELS = 2 OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime" DEBUG_WRITE_LOG = False # ツール名 -> 関数呼び出しディスパッチ用の callable のマッピング。 TOOL_REGISTRY: dict[str, Callable[..., Any]] = { "get_weather": get_weather, "calculate": calculate, "run_python_code": run_python_code, "write_file": write_file, } # Realtime API セッション設定用の生のツール定義。 TOOL_DEFINITIONS = [ { "type": "function", "name": "get_weather", "description": "指定した都市の現在の天気を取得します。", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "天気を取得する都市名。", } }, "required": ["city"], }, }, { "type": "function", "name": "calculate", "description": "数式を評価して結果を返します。", "parameters": { "type": "object", "properties": { "expression": { "type": "string", "description": "評価する数式(例: '2 + 2')。", } }, "required": ["expression"], }, }, { "type": "function", "name": "run_python_code", "description": "Python スクリプトを記述して実行し、stdout/stderr を返します。", "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "実行する Python ソースコード。", } }, "required": ["code"], }, }, { "type": "function", "name": "write_file", "description": "ディスク上のファイルにコンテンツを書き込みます。", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "ファイルの書き込み先パス。", }, "content": { "type": "string", "description": "ファイルに書き込むコンテンツ。", }, }, "required": ["file_path", "content"], }, }, ] async def send_event(ws, event: dict) -> None: await ws.send(json.dumps(event)) async def configure_session(ws) -> None: event = { "type": "session.update", "session": { "type": "realtime", "model": "gpt-realtime", "output_modalities": ["audio"], "instructions": ( "あなたはツールを利用できる有能な AI アシスタントです。" "可能な限りツールを使用してタスクを達成してください。" "明確かつ簡潔に話してください。" ), "tools": TOOL_DEFINITIONS, "tool_choice": "auto", "audio": { "input": { "format": {"type": "audio/pcm", "rate": 24000}, "transcription": {"model": "gpt-4o-transcribe"}, "turn_detection": { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500, }, }, "output": { "format": {"type": "audio/pcm", "rate": 24000}, }, }, }, } await send_event(ws, event) print("セッションが設定されました。") async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None: if not name: raise Exception("関数名が取得できませんでした") print(f"\n[関数呼び出し] {name}({arguments})") tool_fn = TOOL_REGISTRY.get(name) if tool_fn is None: result = json.dumps({"error": f"不明な関数: {name}"}) else: try: args = json.loads(arguments) result = tool_fn(**args) if asyncio.iscoroutine(result): result = await result except Exception as e: result = json.dumps({"error": str(e)}) print(f"[関数の結果] {result}") # 関数呼び出しの出力をモデルに送り返す。 await send_event(ws, { "type": "conversation.item.create", "item": { "type": "function_call_output", "call_id": call_id, "output": result if isinstance(result, str) else json.dumps(result), }, }) # モデルが関数の結果を反映できるよう、新しいレスポンスをトリガーする。 await send_event(ws, {"type": "response.create"}) def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue): """pyaudio の write() はサウンドカードがサンプルを消費するまでブロックするため、 別スレッドで実行します。再生を非同期イベントループから切り離すことで、 処理中の書き込みの完了を待たずに割り込み時にキューをフラッシュできます。""" while True: data = audio_output_queue.get() if data is None: break output_stream.write(data) async def send_mic_audio(ws, mic) -> None: try: while True: raw_data = mic.read(CHUNK, exception_on_overflow=False) # ビジュアルボリュームメーター。 audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) rms = np.sqrt(np.mean(audio_data**2)) meter = int(min(rms / 50, 50)) print(f"マイクレベル: {'█' * meter}{' ' * (50 - meter)} |", end="\r") # Base64 エンコードしてオーディオチャンクを送信する。 b64_audio = base64.b64encode(raw_data).decode("utf-8") await send_event(ws, { "type": "input_audio_buffer.append", "audio": b64_audio, }) await asyncio.sleep(0) except asyncio.CancelledError: pass async def receive_events(ws, audio_output_queue: queue.Queue) -> None: # デルタイベントをまたいで関数呼び出しの引数を蓄積する。 pending_calls: dict[str, dict] = {} async for raw_message in ws: if DEBUG_WRITE_LOG: with open("data.jsonl", "a", encoding="utf-8") as f: f.write(json.dumps(raw_message) + "\n") event = json.loads(raw_message) event_type = event.get("type", "") if event_type == "session.created": print(raw_message) elif event_type == "session.updated": print(raw_message) elif event_type == "error": print(f"\n[エラー] {event}") elif event_type == "input_audio_buffer.speech_started": # キューに溜まった AI オーディオをフラッシュして、ユーザーの発話に被らないようにする。 while not audio_output_queue.empty(): try: audio_output_queue.get_nowait() except queue.Empty: break elif event_type == "input_audio_buffer.speech_stopped": pass elif event_type == "input_audio_buffer.committed": pass elif event_type == "response.created": pass elif event_type == "response.output_text.delta": pass elif event_type == "response.output_text.done": pass # オーディオ出力デルタ - 再生用にキューに追加する。 elif event_type == "response.output_audio.delta": audio_bytes = base64.b64decode(event.get("delta", "")) audio_output_queue.put(audio_bytes) elif event_type == "response.output_audio_transcript.delta": pass elif event_type == "response.output_audio_transcript.done": pass # 関数呼び出し開始 - 保留中の呼び出しを初期化する。 elif event_type == "response.output_item.added": item = event.get("item", {}) if item.get("type") == "function_call" and item.get("status") == "in_progress": item_id = item.get("id", "") pending_calls[item_id] = { "call_id": item.get("call_id", ""), "name": item.get("name", ""), "arguments": "", } print(f"\n[関数呼び出し開始] {item.get('name', '')}") # 関数呼び出しの引数デルタ - 蓄積する。 elif event_type == "response.function_call_arguments.delta": item_id = event.get("item_id", "") if item_id in pending_calls: pending_calls[item_id]["arguments"] += event.get("delta", "") elif event_type == "response.function_call_arguments.done": item_id = event.get("item_id", "") call_info = pending_calls.pop(item_id, None) if call_info is None: # Fallback: use data directly from the done event. call_info = { "call_id": event.get("call_id"), "name": event.get("name"), "arguments": event.get("arguments"), } try: await handle_function_call( ws, call_info["call_id"], call_info["name"], call_info["arguments"], ) except Exception as e: print(f"Failed to call function for message {call_info}: error - {e}") elif event_type == "response.done": pass elif event_type == "rate_limits.updated": pass else: print(f"\n[Event: {event_type}]") async def main(): if not OPENAI_API_KEY: print("Error: OPENAI_API_KEY environment variable not set") return p = pyaudio.PyAudio() input_device_index = int(p.get_default_input_device_info()['index']) output_device_index = int(p.get_default_output_device_info()['index']) # Channel count must match the device's capabilities or pyaudio errors upon open. input_info = p.get_device_info_by_index(input_device_index) output_info = p.get_device_info_by_index(output_device_index) input_channels = min(int(input_info['maxInputChannels']), 1) output_channels = min(int(output_info['maxOutputChannels']), 1) mic = p.open( format=FORMAT, channels=input_channels, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK, input_device_index=input_device_index, start=False, ) speaker = p.open( format=FORMAT, channels=output_channels, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK, output_device_index=output_device_index, start=False, ) mic.start_stream() speaker.start_stream() # オーディオをキュー経由にすることで、ユーザーが割り込んだときにフラッシュできます。 # スピーカーに直接書き込むと、再生中のオーディオをキャンセルできなくなります。 audio_output_queue = queue.Queue() threading.Thread( target=play_audio, args=(speaker, audio_output_queue), daemon=True ).start() headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", } print("Connecting to OpenAI Realtime API...") async with websockets.connect( REALTIME_URL, additional_headers=headers, ) as ws: print("Connected! Configuring session...") await configure_session(ws) print("--- Session Active (Speak into mic) ---") mic_task = asyncio.create_task(send_mic_audio(ws, mic)) try: await receive_events(ws, audio_output_queue) finally: mic_task.cancel() try: await mic_task except asyncio.CancelledError: pass # Cleanup audio_output_queue.put(None) # Signal playback thread to exit. mic.close() speaker.close() p.terminate() print("\nSession ended.") if __name__ == "__main__": asyncio.run(main()) -
weave.init()の呼び出しを、チーム名とプロジェクト名に合わせて更新します。 -
OPENAI_API_KEY環境変数を設定してください。 -
次のコードを実行します:
python weave_ws_voice_assistant.py