メインコンテンツへスキップ
Weave の OpenAI Realtime API とのインテグレーションにより、アプリケーションで発生する音声対音声のやり取りをリアルタイムで自動的にトレースできます。これを使用すると、エージェントとユーザーの会話をキャプチャして、エージェントのパフォーマンスを確認・評価できます。

リアルタイムトレースを統合する

Weave は OpenAI Realtime API に自動的にパッチを適用するため、アプリケーションでのオーディオインタラクションのキャプチャを開始するには、数行のコードを追加するだけです。以下のコードでは、Weave と Realtime API のインテグレーションをインポートします。
import weave
from weave.integrations import patch_openai_realtime

weave.init("your-team-name/your-project-name")
patch_openai_realtime()

# アプリケーションのロジック
コードにimportして実行すると、Weave はユーザーと OpenAI Realtime API 間のやり取りを自動的にトレースします。

OpenAI Agents SDK を使用してリアルタイム音声アシスタントを実行する

この例では、マイクのオーディオを OpenAI の Realtime API にストリーミングし、AI の音声による応答をローカルマシンのスピーカーで再生するリアルタイム音声アシスタントを実行します。このアプリケーションは、RealtimeAgentRealtimeRunner を使用する OpenAI Agents SDK を利用し、patch_openai_realtime() を適用して Weave トレースを有効にします。 例を実行するには:
  1. Python環境を起動し、以下のライブラリをインストールします:
    uv add weave openai-agents websockets pyaudio numpy
    
  2. weave_voice_assistant.py という名前のファイルを作成し、以下のコードを追加します。 ハイライトされた行は、アプリケーション内でのWeaveのインテグレーションを示しています。残りのコードは、基本的な音声アシスタントアプリを作成するものです。
    import argparse
    import asyncio
    import queue
    import sys
    import termios
    import threading
    import tty
    import weave
    import pyaudio
    import numpy as np
    from weave.integrations import patch_openai_realtime
    from agents.realtime import RealtimeAgent, RealtimeRunner
    
    DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"
    
    FORMAT = pyaudio.paInt16
    RATE = 24000  # OpenAI Realtime API で必須。
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 1
    MAX_OUTPUT_CHANNELS = 1
    
    INP_DEV_IDX = None
    OUT_DEV_IDX = None
    
    # Weave プロジェクト名とオーディオデバイス選択のための CLI 引数を解析する。
    def parse_args():
        parser = argparse.ArgumentParser(description="Weave ロギングを使用したリアルタイムエージェント")
        parser.add_argument(
            "--weave-project",
            default=DEFAULT_WEAVE_PROJECT,
            help=f"Weave プロジェクト名 (デフォルト: {DEFAULT_WEAVE_PROJECT})",
            dest="weave_project"
        )
        parser.add_argument(
            "--input-device",
            type=int,
            default=None,
            help="PyAudio 入力(マイク)デバイスのインデックス。デフォルトはシステムのデフォルト。デバイス一覧を確認するには mic_detect.py を実行してください。",
            dest="input_device"
        )
        parser.add_argument(
            "--output-device",
            type=int,
            default=None,
            help="PyAudio 出力(スピーカー)デバイスのインデックス。デフォルトはシステムのデフォルト。デバイス一覧を確認するには mic_detect.py を実行してください。",
            dest="output_device"
        )
        return parser.parse_args()
    
    
    # Weave を初期化し、トレース用に OpenAI Realtime API にパッチを適用する。
    def init_weave(project_name: str | None = None) -> None:
        name = project_name or DEFAULT_WEAVE_PROJECT
        weave.init(name)
        patch_openai_realtime()  # Realtime API セッションの自動トレースを有効化する。
    
    
    mic_enabled = True
    
    # 't' キーを監視してマイクのオン/オフを切り替える。デーモンスレッドで動作する。
    def start_keylistener():
        global mic_enabled
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)
            while True:
                ch = sys.stdin.read(1)
                if ch.lower() == 't':
                    mic_enabled = not mic_enabled
                    state = "ON" if mic_enabled else "OFF"
                    print(f"\n🎙  マイク {state} (t キーで切り替え)")
                elif ch == '\x03':  # Ctrl-C
                    break
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
    
    # バックグラウンドスレッドでオーディオキューを消費し、スピーカーに書き込む。
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    # オーディオストリームを開き、Realtime セッションを開始して送受信ループを実行する。
    async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
        p = pyaudio.PyAudio()
    
        if input_device_index is None:
            input_device_index = int(p.get_default_input_device_info()['index'])
        if output_device_index is None:
            output_device_index = int(p.get_default_output_device_info()['index'])
    
        # pyaudio エラーを防ぐため、チャンネル数をデバイスの上限に制限する。
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
    
        input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
        output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # 割り込み時に再生中のオーディオをフラッシュできるよう、キュー経由でバッファリングする。
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        s_agent = RealtimeAgent(
            name="Speech Assistant",
            instructions="あなたはツールを使用する AI です。可能な限りツールを使ってタスクを達成してください"
        )
    
        s_runner = RealtimeRunner(s_agent, config={
            "model_settings": {
                "model_name": "gpt-realtime",
                "modalities": ["audio"],
                "output_modalities": ["audio"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "speed": 1.2,
                "turn_detection": {
                    "prefix_padding_ms": 100,
                    "silence_duration_ms": 100,
                    "type": "server_vad",
                    "interrupt_response": True,
                    "create_response": True,
                },
            }
        })
        print("--- セッション開始(マイクに向かって話してください)---")
        print("🎙  マイク ON (t キーで切り替え)")
    
        threading.Thread(target=start_keylistener, daemon=True).start()
    
        async with await s_runner.run() as session:
            # マイク入力を Realtime API にストリーミングし、ミュート中は無音を送信する。
            async def send_mic_audio():
                silence = b'\x00' * CHUNK * 2  # サンプルあたり 2 バイト(16 ビット PCM)。
                try:
                    while True:
                        raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                        if mic_enabled:
                            audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                            rms = np.sqrt(np.mean(audio_data**2))
                            meter = int(min(rms / 50, 50))
                            print(f"マイクレベル: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                            await session.send_audio(raw_data)
                        else:
                            print(f"マイクレベル: {' ' * 50} | 🎙 OFF", end="\r")
                            await session.send_audio(silence)
    
                        await asyncio.sleep(0)  # 読み取りの合間にイベントループへ制御を返す。
                except Exception:
                    pass
    
            # セッションからイベントを受信し、オーディオをスピーカーへ転送する。
            async def handle_events():
                async for event in session:
                    if event.type == "audio":
                        audio_output_queue.put(event.audio.data)
                    elif event.type == "audio_interrupted":
                        # キューに溜まった AI のオーディオをフラッシュして、ユーザーの発話に被らないようにする。
                        while not audio_output_queue.empty():
                            try:
                                audio_output_queue.get_nowait()
                            except queue.Empty:
                                break
    
            mic_task = asyncio.create_task(send_mic_audio())
            try:
                await handle_events()
            finally:
                mic_task.cancel()
    
        # クリーンアップ
        audio_output_queue.put(None)  # 再生スレッドに終了を通知する。
        mic.close()
        speaker.close()
        p.terminate()
    
    if __name__ == "__main__":
        args = parse_args()
        init_weave(args.weave_project)
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
  3. DEFAULT_WEAVE_PROJECT の値を、ご自身のチーム名とプロジェクト名に置き換えます。
  4. OPENAI_API_KEY 環境変数を設定してください。
  5. コードを実行します。
    python weave_voice_assistant.py
    
起動したら、キーボードの T を押してマイクのミュートを切り替えます。アシスタントは、発話のターン管理や割り込みに対応するために、サーバー側の音声アクティビティ検出を使用します。 アシスタントに話しかけると、Weave はセッションのオーディオを含むトレースをキャプチャし、その内容を Weave UI で確認できます。

WebSockets を使用してリアルタイム音声アシスタントを実行する

次の例では、WebSockets 経由で OpenAI Realtime API に直接接続します。マイクのオーディオを API にストリーミングし、音声応答を再生し、ツール呼び出し (天気の検索、数式の評価、コードの実行、ファイルの書き込み) をサポートします。Weave は weave.init()patch_openai_realtime() を使用してセッションをトレースします。 例を実行するには:
  1. Python 環境を起動し、次のライブラリをインストールしてください。
    uv add weave websockets pyaudio numpy
    
  2. tool_definitions.py という名前のファイルを作成し、次のツール定義を追加します。メインのアプリケーションはこのモジュールをインポートします。
    import json
    import subprocess
    import tempfile
    from pathlib import Path
    import weave
    
    
    # @function_tool
    @weave.op
    def get_weather(city: str) -> str:
        """都市の現在の天気を取得します。
    
        引数:
            city: 天気を取得する都市名。
        """
        return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})
    
    
    @weave.op
    def calculate(expression: str) -> str:
        """数式を評価して、その結果を返します。
    
        引数:
            expression: 評価する数式。例: '2 + 2'。
        """
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"Error: {e}"
    
    
    @weave.op
    def run_python_code(code: str) -> str:
        """Python スクリプトを書き出して実行し、その stdout/stderr を返します。
    
        引数:
            code: 実行する Python のソースコード。
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
        ) as f:
            f.write(code)
            script_path = Path(f.name)
    
        try:
            result = subprocess.run(
                ["python", str(script_path)],
                capture_output=True,
                text=True,
                timeout=30,
            )
            output = result.stdout
            if result.stderr:
                output += f"\nSTDERR:\n{result.stderr}"
            if result.returncode != 0:
                output += f"\n(exit code {result.returncode})"
            return output or "(no output)"
        except subprocess.TimeoutExpired:
            return "Error: script timed out after 30 seconds."
        finally:
            script_path.unlink(missing_ok=True)
    
    
    @weave.op
    async def write_file(file_path: str, content: str) -> str:
        """ディスク上のファイルに内容を書き込みます。
    
        引数:
            file_path: ファイルの書き込み先パス。
            content: ファイルに書き込む内容。
        """
        try:
            path = Path(file_path)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content)
            return f"Wrote {len(content)} bytes to {file_path}"
        except Exception as e:
            return f"Error writing file: {e}"
    
  3. 同じディレクトリに weave_ws_voice_assistant.py というファイルを作成し、次のコードを追加します。
    import asyncio
    import base64
    import json
    import os
    import queue
    import threading
    from typing import Any, Callable
    
    import numpy as np
    import pyaudio
    import websockets
    
    import weave
    weave.init("<your-team-name/your-project-name>")
    from weave.integrations import patch_openai_realtime
    patch_openai_realtime()
    
    from tool_definitions import (
        calculate,
        get_weather,
        run_python_code,
        write_file,
    )
    
    # オーディオフォーマット(Realtime API では PCM16 を使用する必要があります)。
    FORMAT = pyaudio.paInt16
    RATE = 24000
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 2
    MAX_OUTPUT_CHANNELS = 2
    
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
    REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
    
    DEBUG_WRITE_LOG = False
    
    # ツール名 -> 関数呼び出しディスパッチ用の callable のマッピング。
    TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
        "get_weather": get_weather,
        "calculate": calculate,
        "run_python_code": run_python_code,
        "write_file": write_file,
    }
    
    # Realtime API セッション設定用の生のツール定義。
    TOOL_DEFINITIONS = [
        {
            "type": "function",
            "name": "get_weather",
            "description": "指定した都市の現在の天気を取得します。",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "天気を取得する都市名。",
                    }
                },
                "required": ["city"],
            },
        },
        {
            "type": "function",
            "name": "calculate",
            "description": "数式を評価して結果を返します。",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "評価する数式(例: '2 + 2')。",
                    }
                },
                "required": ["expression"],
            },
        },
        {
            "type": "function",
            "name": "run_python_code",
            "description": "Python スクリプトを記述して実行し、stdout/stderr を返します。",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "実行する Python ソースコード。",
                    }
                },
                "required": ["code"],
            },
        },
        {
            "type": "function",
            "name": "write_file",
            "description": "ディスク上のファイルにコンテンツを書き込みます。",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "ファイルの書き込み先パス。",
                    },
                    "content": {
                        "type": "string",
                        "description": "ファイルに書き込むコンテンツ。",
                    },
                },
                "required": ["file_path", "content"],
            },
        },
    ]
    
    
    async def send_event(ws, event: dict) -> None:
        await ws.send(json.dumps(event))
    
    
    async def configure_session(ws) -> None:
        event = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "model": "gpt-realtime",
                "output_modalities": ["audio"],
                "instructions": (
                    "あなたはツールを利用できる有能な AI アシスタントです。"
                    "可能な限りツールを使用してタスクを達成してください。"
                    "明確かつ簡潔に話してください。"
                ),
                "tools": TOOL_DEFINITIONS,
                "tool_choice": "auto",
                "audio": {
                    "input": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                        "transcription": {"model": "gpt-4o-transcribe"},
                        "turn_detection": {
                            "type": "server_vad",
                            "threshold": 0.5,
                            "prefix_padding_ms": 300,
                            "silence_duration_ms": 500,
                        },
                    },
                    "output": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                    },
                },
            },
        }
        await send_event(ws, event)
        print("セッションが設定されました。")
    
    
    async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
        if not name:
            raise Exception("関数名が取得できませんでした")
    
        print(f"\n[関数呼び出し] {name}({arguments})")
        tool_fn = TOOL_REGISTRY.get(name)
        if tool_fn is None:
            result = json.dumps({"error": f"不明な関数: {name}"})
        else:
            try:
                args = json.loads(arguments)
                result = tool_fn(**args)
                if asyncio.iscoroutine(result):
                    result = await result
            except Exception as e:
                result = json.dumps({"error": str(e)})
    
        print(f"[関数の結果] {result}")
    
        # 関数呼び出しの出力をモデルに送り返す。
        await send_event(ws, {
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": result if isinstance(result, str) else json.dumps(result),
            },
        })
    
        # モデルが関数の結果を反映できるよう、新しいレスポンスをトリガーする。
        await send_event(ws, {"type": "response.create"})
    
    
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        """pyaudio の write() はサウンドカードがサンプルを消費するまでブロックするため、
        別スレッドで実行します。再生を非同期イベントループから切り離すことで、
        処理中の書き込みの完了を待たずに割り込み時にキューをフラッシュできます。"""
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    async def send_mic_audio(ws, mic) -> None:
        try:
            while True:
                raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                # ビジュアルボリュームメーター。
                audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                rms = np.sqrt(np.mean(audio_data**2))
                meter = int(min(rms / 50, 50))
                print(f"マイクレベル: {'█' * meter}{' ' * (50 - meter)} |", end="\r")
    
                # Base64 エンコードしてオーディオチャンクを送信する。
                b64_audio = base64.b64encode(raw_data).decode("utf-8")
                await send_event(ws, {
                    "type": "input_audio_buffer.append",
                    "audio": b64_audio,
                })
    
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            pass
    
    
    async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
        # デルタイベントをまたいで関数呼び出しの引数を蓄積する。
        pending_calls: dict[str, dict] = {}
        async for raw_message in ws:
            if DEBUG_WRITE_LOG:
                with open("data.jsonl", "a", encoding="utf-8") as f:
                    f.write(json.dumps(raw_message) + "\n")
    
            event = json.loads(raw_message)
            event_type = event.get("type", "")
    
            if event_type == "session.created":
                print(raw_message)
    
            elif event_type == "session.updated":
                print(raw_message)
    
            elif event_type == "error":
                print(f"\n[エラー] {event}")
    
            elif event_type == "input_audio_buffer.speech_started":
                # キューに溜まった AI オーディオをフラッシュして、ユーザーの発話に被らないようにする。
                while not audio_output_queue.empty():
                    try:
                        audio_output_queue.get_nowait()
                    except queue.Empty:
                        break
    
            elif event_type == "input_audio_buffer.speech_stopped":
                pass
    
            elif event_type == "input_audio_buffer.committed":
                pass
    
            elif event_type == "response.created":
                pass
    
            elif event_type == "response.output_text.delta":
                pass
    
            elif event_type == "response.output_text.done":
                pass
    
            # オーディオ出力デルタ - 再生用にキューに追加する。
            elif event_type == "response.output_audio.delta":
                audio_bytes = base64.b64decode(event.get("delta", ""))
                audio_output_queue.put(audio_bytes)
    
            elif event_type == "response.output_audio_transcript.delta":
                pass
    
            elif event_type == "response.output_audio_transcript.done":
                pass
    
            # 関数呼び出し開始 - 保留中の呼び出しを初期化する。
            elif event_type == "response.output_item.added":
                item = event.get("item", {})
                if item.get("type") == "function_call" and item.get("status") == "in_progress":
                    item_id = item.get("id", "")
                    pending_calls[item_id] = {
                        "call_id": item.get("call_id", ""),
                        "name": item.get("name", ""),
                        "arguments": "",
                    }
                    print(f"\n[関数呼び出し開始] {item.get('name', '')}")
    
            # 関数呼び出しの引数デルタ - 蓄積する。
            elif event_type == "response.function_call_arguments.delta":
                item_id = event.get("item_id", "")
                if item_id in pending_calls:
                    pending_calls[item_id]["arguments"] += event.get("delta", "")
    
            elif event_type == "response.function_call_arguments.done":
                item_id = event.get("item_id", "")
                call_info = pending_calls.pop(item_id, None)
                if call_info is None:
                    # Fallback: use data directly from the done event.
                    call_info = {
                        "call_id": event.get("call_id"),
                        "name": event.get("name"),
                        "arguments": event.get("arguments"),
                    }
                try:
                    await handle_function_call(
                        ws,
                        call_info["call_id"],
                        call_info["name"],
                        call_info["arguments"],
                    )
                except Exception as e:
                    print(f"Failed to call function for message {call_info}: error - {e}")
    
            elif event_type == "response.done":
                pass
    
            elif event_type == "rate_limits.updated":
                pass
    
            else:
                print(f"\n[Event: {event_type}]")
    
    
    async def main():
        if not OPENAI_API_KEY:
            print("Error: OPENAI_API_KEY environment variable not set")
            return
    
        p = pyaudio.PyAudio()
    
        input_device_index = int(p.get_default_input_device_info()['index'])
        output_device_index = int(p.get_default_output_device_info()['index'])
    
        # Channel count must match the device's capabilities or pyaudio errors upon open.
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
        input_channels = min(int(input_info['maxInputChannels']), 1)
        output_channels = min(int(output_info['maxOutputChannels']), 1)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # オーディオをキュー経由にすることで、ユーザーが割り込んだときにフラッシュできます。
        # スピーカーに直接書き込むと、再生中のオーディオをキャンセルできなくなります。
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        }
    
        print("Connecting to OpenAI Realtime API...")
    
        async with websockets.connect(
            REALTIME_URL,
            additional_headers=headers,
        ) as ws:
            print("Connected! Configuring session...")
            await configure_session(ws)
    
            print("--- Session Active (Speak into mic) ---")
    
            mic_task = asyncio.create_task(send_mic_audio(ws, mic))
            try:
                await receive_events(ws, audio_output_queue)
            finally:
                mic_task.cancel()
                try:
                    await mic_task
                except asyncio.CancelledError:
                    pass
    
        # Cleanup
        audio_output_queue.put(None)  # Signal playback thread to exit.
        mic.close()
        speaker.close()
        p.terminate()
        print("\nSession ended.")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
  4. weave.init() の呼び出しを、チーム名とプロジェクト名に合わせて更新します。
  5. OPENAI_API_KEY 環境変数を設定してください。
  6. 次のコードを実行します:
    python weave_ws_voice_assistant.py
    
起動したら、キーボードの T キーを押してマイクのミュートを切り替えます。アシスタントは、発話のターン管理と割り込みの処理に、サーバー側の音声アクティビティ検出を使用します。 アシスタントに話しかけると、Weave がセッションのオーディオを含むトレースを記録し、それを Weave UI で確認できます。