メインコンテンツへスキップ
これはインタラクティブなノートブックです。ローカルで実行するか、以下のリンクから開くことができます:

音声データで Weave を使う方法: OpenAI の例

このデモでは、OpenAI chat completions API と GPT 4o Audio Preview を使用して、テキストプロンプトに対する音声応答を生成し、それらを Weave でトラッキングします。 GPT 4o Audio Preview インテグレーションと音声応答生成ワークフローを備えた OpenAI chat completions API のインターフェース 高度なユースケースでは、OpenAI Realtime API を活用して音声をリアルタイムにストリーミングします。以下のサムネイルをクリックしてデモ動画を視聴するか、こちらをクリックしてください。 Everything Is AWESOME

セットアップ

まず、OpenAI(openai)と Weave(weave)、および APIキー管理用の依存パッケージ set-env をインストールします。
%%capture
!pip install openai
!pip install weave
!pip install set-env-colab-kaggle-dotenv -q # 環境変数用
python
%%capture
# openai のバグを修正するための一時的な回避策:
# TypeError: Client.__init__() got an unexpected keyword argument 'proxies'
# 参照: https://community.openai.com/t/error-with-openai-1-56-0-client-init-got-an-unexpected-keyword-argument-proxies/1040332/15
!pip install "httpx<0.28"
次に、OpenAI と Weave に必要な APIキー を読み込みます。ここでは、Google Colab のシークレットキー マネージャーと互換性のある set_env を使用します。これは、Colab 固有の google.colab.userdata の代替手段です。使い方についてはこちらを参照してください。
# 環境変数を設定する。
from set_env import set_env

_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")
最後に必要なライブラリをインポートします。
import base64
import os
import time
import wave

import numpy as np
from IPython.display import display
from openai import OpenAI

import weave

オーディオのストリーミングと保存の例

ここでは、音声モダリティを有効にして OpenAI の completions エンドポイントを呼び出せるように設定します。まず OpenAI クライアントを作成し、Weave プロジェクトを初期化します。
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
weave.init("openai-audio-chat")
ここでは、OpenAI の completions リクエストを定義し、Weave のデコレータ(op)を追加します。 関数 prompt_endpont_and_log_trace を定義します。この関数には主に 3 つのステップがあります。
  1. テキストと音声の入力・出力をサポートする GPT 4o Audio Preview モデルを使って completion オブジェクトを作成します。
    • モデルに対して、さまざまなアクセントで、ゆっくり 13 まで数えるようにプロンプトします。
    • completion を「stream」に設定します。
  2. ストリーミングされたデータがチャンクごとに書き込まれる新しい出力ファイルを開きます。
  3. Weave がトレース内で音声データをログできるように、音声ファイルへのオープンなファイルハンドラーを返します。
SAMPLE_RATE = 22050

@weave.op()
def prompt_endpoint_and_log_trace(system_prompt=None, user_prompt=None):
    if not system_prompt:
        system_prompt = "You're the fastest counter in the world"
    if not user_prompt:
        user_prompt = "Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc."
    # オーディオモダリティを使用してOpenAI APIにリクエスト
    completion = client.chat.completions.create(
        model="gpt-4o-audio-preview",
        modalities=["text", "audio"],
        audio={"voice": "fable", "format": "pcm16"},
        stream=True,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )

    # 書き込み用にwaveファイルを開く
    with wave.open("./output.wav", "wb") as wav_file:
        wav_file.setnchannels(1)  # モノラル
        wav_file.setsampwidth(2)  # 16ビット
        wav_file.setframerate(SAMPLE_RATE)  # サンプルレート(必要に応じて調整)

        # APIからストリーミングされたチャンクを書き込む
        for chunk in completion:
            if (
                hasattr(chunk, "choices")
                and chunk.choices is not None
                and len(chunk.choices) > 0
                and hasattr(chunk.choices[0].delta, "audio")
                and chunk.choices[0].delta.audio.get("data") is not None
            ):
                # base64オーディオデータをデコード
                audio_data = base64.b64decode(chunk.choices[0].delta.audio.get("data"))

                # 現在のチャンクをwaveファイルに書き込む
                wav_file.writeframes(audio_data)

    # Weave opにファイルを返す
    return wave.open("output.wav", "rb")

テスト

次のセルを実行してください。system プロンプトと user プロンプト、および出力音声は Weave のトレースに保存されます。 セルの実行後、「🍩」の絵文字の横にあるリンクをクリックしてトレースを確認します。
from IPython.display import Audio

# オーディオストリームを書き込む関数を呼び出す
prompt_endpoint_and_log_trace(
    system_prompt="You're the fastest counter in the world",
    user_prompt="Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc.",
)

# 更新されたオーディオストリームを表示する
display(Audio("output.wav", rate=SAMPLE_RATE, autoplay=True))

高度な使い方: Weave を使った Realtime Audio API

Weave とストリーミング音声会話インターフェースによる Realtime Audio API インテグレーション OpenAI の Realtime API は、リアルタイム音声およびテキストアシスタントを構築するための、高機能で信頼性の高い会話 API です。 次の点を確認してください:
  • Microphone Configuration のセルを確認する
  • Google Colab の実行環境の制約により、これはブラウザ上では実行できず、ホストマシン上の Jupyter Notebook として実行する必要があります。
    • MacOS では、Pyaudio を動作させるために、Brew 経由で portaudio をインストールする必要があります(こちら を参照)。
  • OpenAI の Python SDK は、まだ Realtime API をサポートしていません。可読性を高めるために、Pydantic で OAI Realtime API スキーマを完全に実装しており、公式サポートがリリースされた際には非推奨にする可能性があります。
  • enable_audio_playback トグルを有効にすると、アシスタントが出力した音声を再生します。これを有効にした場合はヘッドフォンの着用が必須であり、エコー検出には非常に複雑な実装が必要になる点に注意してください。

前提条件の設定

%%capture
!pip install numpy==2.0
!pip install weave
!pip install pyaudio # Macの場合、先に`brew install portaudio`でportaudioをインストールする必要があります
!pip install websocket-client
!pip install set-env-colab-kaggle-dotenv -q # 環境変数用
!pip install resampy
python
import io
import json
import os
import threading
from typing import Optional

import pyaudio
import resampy
import websocket
from set_env import set_env

import weave
python
# 環境変数を設定します。
# 使用方法については https://pypi.org/project/set-env-colab-kaggle-dotenv/ を参照してください。
_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")

マイク設定

利用可能なすべてのオーディオデバイスを列挙するには、次のセルを実行します。次に、表示されたデバイス一覧に基づいて INPUT_DEVICE_INDEXOUTPUT_DEVICE_INDEX を設定します。入力デバイスには少なくとも 1 つの入力チャンネルがあり、出力デバイスには少なくとも 1 つの出力チャンネルがあります。
# 次のセルを設定するためにpyaudioからデバイスリストを取得する
p = pyaudio.PyAudio()
devices_data = {i: p.get_device_info_by_index(i) for i in range(p.get_device_count())}
for i, device in devices_data.items():
    print(
        f"Found device @{i}: {device['name']} with sample rate: {device['defaultSampleRate']} and input channels: {device['maxInputChannels']} and output channels: {device['maxOutputChannels']}"
    )
python
INPUT_DEVICE_INDEX = 3  # @param                                                 # 上記のデバイスリストに基づいて選択する。デバイスの入力チャンネルが1以上であることを確認すること。
OUTPUT_DEVICE_INDEX = 12  # @param                                                # 上記のデバイスリストに基づいて選択する。デバイスの出力チャンネルが1以上であることを確認すること。
enable_audio_playback = True  # @param {type:"boolean"}                           # アシスタントの音声再生をオンにする。ヘッドフォンが必要。

# 音声録音およびストリーミングのパラメータ
INPUT_DEVICE_CHANNELS = devices_data[INPUT_DEVICE_INDEX][
    "maxInputChannels"
]  # 上記のデバイスリストより
SAMPLE_RATE = int(
    devices_data[INPUT_DEVICE_INDEX]["defaultSampleRate"]
)  # 上記のデバイスリストより
CHUNK = int(SAMPLE_RATE / 10)  # フレームあたりのサンプル数
SAMPLE_WIDTH = p.get_sample_size(pyaudio.paInt16)  # フォーマットのフレームあたりのサンプル幅
CHUNK_DURATION = 0.3  # OAI APIに送信する1チャンクあたりの音声時間(秒)
OAI_SAMPLE_RATE = (
    24000  # OAIのサンプルレートは24kHz。アシスタントの音声を再生または保存するために必要
)
OUTPUT_DEVICE_CHANNELS = 1  # モノラル出力の場合は1に設定

OpenAI Realtime API スキーマの実装

OpenAI Python SDK はまだ Realtime API をサポートしていません。公式サポートが提供されるまでの間、より読みやすくするために Pydantic で OAI Realtime API スキーマ全体を実装しており、公式サポートの提供後はこの実装自体を非推奨とする可能性があります。

OpenAI Realtime API 向けの Pydantic スキーマ

from enum import Enum
from typing import Any, Literal, Union

from pydantic import BaseModel, Field, ValidationError

class BaseEvent(BaseModel):
    type: Union["ClientEventTypes", "ServerEventTypes"]
    event_id: Optional[str] = None  # Add event_id as an optional field for all events

    # def model_dump_json(self, *args, **kwargs):
    #     # Only include non-None fields
    #     return super().model_dump_json(*args, exclude_none=True, **kwargs)

class ChatMessage(BaseModel):
    role: Literal["user", "assistant"]
    content: str
    timestamp: float

""" CLIENT EVENTS """

class ClientEventTypes(str, Enum):
    SESSION_UPDATE = "session.update"
    CONVERSATION_ITEM_CREATE = "conversation.item.create"
    CONVERSATION_ITEM_TRUNCATE = "conversation.item.truncate"
    CONVERSATION_ITEM_DELETE = "conversation.item.delete"
    RESPONSE_CREATE = "response.create"
    RESPONSE_CANCEL = "response.cancel"
    INPUT_AUDIO_BUFFER_APPEND = "input_audio_buffer.append"
    INPUT_AUDIO_BUFFER_COMMIT = "input_audio_buffer.commit"
    INPUT_AUDIO_BUFFER_CLEAR = "input_audio_buffer.clear"
    ERROR = "error"

#### Session Update
class TurnDetection(BaseModel):
    type: Literal["server_vad"]
    threshold: float = Field(..., ge=0.0, le=1.0)
    prefix_padding_ms: int
    silence_duration_ms: int

class InputAudioTranscription(BaseModel):
    model: Optional[str] = None

class ToolParameterProperty(BaseModel):
    type: str

class ToolParameter(BaseModel):
    type: str
    properties: dict[str, ToolParameterProperty]
    required: list[str]

class Tool(BaseModel):
    type: Literal["function", "code_interpreter", "file_search"]
    name: Optional[str] = None
    description: Optional[str] = None
    parameters: Optional[ToolParameter] = None

class Session(BaseModel):
    modalities: Optional[list[str]] = None
    instructions: Optional[str] = None
    voice: Optional[str] = None
    input_audio_format: Optional[str] = None
    output_audio_format: Optional[str] = None
    input_audio_transcription: Optional[InputAudioTranscription] = None
    turn_detection: Optional[TurnDetection] = None
    tools: Optional[list[Tool]] = None
    tool_choice: Optional[str] = None
    temperature: Optional[float] = None
    max_output_tokens: Optional[int] = None

class SessionUpdate(BaseEvent):
    type: Literal[ClientEventTypes.SESSION_UPDATE] = ClientEventTypes.SESSION_UPDATE
    session: Session

#### Audio Buffers
class InputAudioBufferAppend(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND
    )
    audio: str

class InputAudioBufferCommit(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT
    )

class InputAudioBufferClear(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR
    )

#### Messages
class MessageContent(BaseModel):
    type: Literal["input_audio"]
    audio: str

class ConversationItemContent(BaseModel):
    type: Literal["input_text", "input_audio", "text", "audio"]
    text: Optional[str] = None
    audio: Optional[str] = None
    transcript: Optional[str] = None

class FunctionCallContent(BaseModel):
    call_id: str
    name: str
    arguments: str

class FunctionCallOutputContent(BaseModel):
    output: str

class ConversationItem(BaseModel):
    id: Optional[str] = None
    type: Literal["message", "function_call", "function_call_output"]
    status: Optional[Literal["completed", "in_progress", "incomplete"]] = None
    role: Literal["user", "assistant", "system"]
    content: list[
        Union[ConversationItemContent, FunctionCallContent, FunctionCallOutputContent]
    ]
    call_id: Optional[str] = None
    name: Optional[str] = None
    arguments: Optional[str] = None
    output: Optional[str] = None

class ConversationItemCreate(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_CREATE] = (
        ClientEventTypes.CONVERSATION_ITEM_CREATE
    )
    item: ConversationItem

class ConversationItemTruncate(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_TRUNCATE] = (
        ClientEventTypes.CONVERSATION_ITEM_TRUNCATE
    )
    item_id: str
    content_index: int
    audio_end_ms: int

class ConversationItemDelete(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_DELETE] = (
        ClientEventTypes.CONVERSATION_ITEM_DELETE
    )
    item_id: str

#### Responses
class ResponseCreate(BaseEvent):
    type: Literal[ClientEventTypes.RESPONSE_CREATE] = ClientEventTypes.RESPONSE_CREATE

class ResponseCancel(BaseEvent):
    type: Literal[ClientEventTypes.RESPONSE_CANCEL] = ClientEventTypes.RESPONSE_CANCEL

# Update the Event union to include all event types
ClientEvent = Union[
    SessionUpdate,
    InputAudioBufferAppend,
    InputAudioBufferCommit,
    InputAudioBufferClear,
    ConversationItemCreate,
    ConversationItemTruncate,
    ConversationItemDelete,
    ResponseCreate,
    ResponseCancel,
]

""" SERVER EVENTS """

class ServerEventTypes(str, Enum):
    ERROR = "error"
    RESPONSE_AUDIO_TRANSCRIPT_DONE = "response.audio_transcript.done"
    RESPONSE_AUDIO_TRANSCRIPT_DELTA = "response.audio_transcript.delta"
    RESPONSE_AUDIO_DELTA = "response.audio.delta"
    SESSION_CREATED = "session.created"
    SESSION_UPDATED = "session.updated"
    CONVERSATION_CREATED = "conversation.created"
    INPUT_AUDIO_BUFFER_COMMITTED = "input_audio_buffer.committed"
    INPUT_AUDIO_BUFFER_CLEARED = "input_audio_buffer.cleared"
    INPUT_AUDIO_BUFFER_SPEECH_STARTED = "input_audio_buffer.speech_started"
    INPUT_AUDIO_BUFFER_SPEECH_STOPPED = "input_audio_buffer.speech_stopped"
    CONVERSATION_ITEM_CREATED = "conversation.item.created"
    CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED = (
        "conversation.item.input_audio_transcription.completed"
    )
    CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED = (
        "conversation.item.input_audio_transcription.failed"
    )
    CONVERSATION_ITEM_TRUNCATED = "conversation.item.truncated"
    CONVERSATION_ITEM_DELETED = "conversation.item.deleted"
    RESPONSE_CREATED = "response.created"
    RESPONSE_DONE = "response.done"
    RESPONSE_OUTPUT_ITEM_ADDED = "response.output_item.added"
    RESPONSE_OUTPUT_ITEM_DONE = "response.output_item.done"
    RESPONSE_CONTENT_PART_ADDED = "response.content_part.added"
    RESPONSE_CONTENT_PART_DONE = "response.content_part.done"
    RESPONSE_TEXT_DELTA = "response.text.delta"
    RESPONSE_TEXT_DONE = "response.text.done"
    RESPONSE_AUDIO_DONE = "response.audio.done"
    RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA = "response.function_call_arguments.delta"
    RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE = "response.function_call_arguments.done"
    RATE_LIMITS_UPDATED = "rate_limits.updated"

#### Errors
class ErrorDetails(BaseModel):
    type: Optional[str] = None
    code: Optional[str] = None
    message: Optional[str] = None
    param: Optional[str] = None

class ErrorEvent(BaseEvent):
    type: Literal[ServerEventTypes.ERROR] = ServerEventTypes.ERROR
    error: ErrorDetails

#### Session
class SessionCreated(BaseEvent):
    type: Literal[ServerEventTypes.SESSION_CREATED] = ServerEventTypes.SESSION_CREATED
    session: Session

class SessionUpdated(BaseEvent):
    type: Literal[ServerEventTypes.SESSION_UPDATED] = ServerEventTypes.SESSION_UPDATED
    session: Session

#### Conversation
class Conversation(BaseModel):
    id: str
    object: Literal["realtime.conversation"]

class ConversationCreated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_CREATED] = (
        ServerEventTypes.CONVERSATION_CREATED
    )
    conversation: Conversation

class ConversationItemCreated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_CREATED] = (
        ServerEventTypes.CONVERSATION_ITEM_CREATED
    )
    previous_item_id: Optional[str] = None
    item: ConversationItem

class ConversationItemInputAudioTranscriptionCompleted(BaseEvent):
    type: Literal[
        ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
    ] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
    item_id: str
    content_index: int
    transcript: str

class ConversationItemInputAudioTranscriptionFailed(BaseEvent):
    type: Literal[
        ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
    ] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
    item_id: str
    content_index: int
    error: dict[str, Any]

class ConversationItemTruncated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_TRUNCATED] = (
        ServerEventTypes.CONVERSATION_ITEM_TRUNCATED
    )
    item_id: str
    content_index: int
    audio_end_ms: int

class ConversationItemDeleted(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_DELETED] = (
        ServerEventTypes.CONVERSATION_ITEM_DELETED
    )
    item_id: str

#### レスポンス
class ResponseUsage(BaseModel):
    total_tokens: int
    input_tokens: int
    output_tokens: int
    input_token_details: Optional[dict[str, int]] = None
    output_token_details: Optional[dict[str, int]] = None

class ResponseOutput(BaseModel):
    id: str
    object: Literal["realtime.item"]
    type: str
    status: str
    role: str
    content: list[dict[str, Any]]

class ResponseContentPart(BaseModel):
    type: str
    text: Optional[str] = None

class ResponseOutputItemContent(BaseModel):
    type: str
    text: Optional[str] = None

class ResponseStatusDetails(BaseModel):
    type: str
    reason: str

class ResponseOutputItem(BaseModel):
    id: str
    object: Literal["realtime.item"]
    type: str
    status: str
    role: str
    content: list[ResponseOutputItemContent]

class Response(BaseModel):
    id: str
    object: Literal["realtime.response"]
    status: str
    status_details: Optional[ResponseStatusDetails] = None
    output: list[ResponseOutput]
    usage: Optional[ResponseUsage]

class ResponseCreated(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CREATED] = ServerEventTypes.RESPONSE_CREATED
    response: Response

class ResponseDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_DONE] = ServerEventTypes.RESPONSE_DONE
    response: Response

class ResponseOutputItemAdded(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED] = (
        ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED
    )
    response_id: str
    output_index: int
    item: ResponseOutputItem

class ResponseOutputItemDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE] = (
        ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE
    )
    response_id: str
    output_index: int
    item: ResponseOutputItem

class ResponseContentPartAdded(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_ADDED] = (
        ServerEventTypes.RESPONSE_CONTENT_PART_ADDED
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    part: ResponseContentPart

class ResponseContentPartDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_DONE] = (
        ServerEventTypes.RESPONSE_CONTENT_PART_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    part: ResponseContentPart

#### レスポンステキスト
class ResponseTextDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_TEXT_DELTA] = (
        ServerEventTypes.RESPONSE_TEXT_DELTA
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    delta: str

class ResponseTextDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_TEXT_DONE] = (
        ServerEventTypes.RESPONSE_TEXT_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    text: str

#### レスポンスオーディオ
class ResponseAudioTranscriptDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE] = (
        ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE
    )
    transcript: str

class ResponseAudioTranscriptDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA] = (
        ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA
    )
    delta: str

class ResponseAudioDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_DELTA] = (
        ServerEventTypes.RESPONSE_AUDIO_DELTA
    )
    response_id: str
    item_id: str
    delta: str

class ResponseAudioDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_DONE] = (
        ServerEventTypes.RESPONSE_AUDIO_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int

class InputAudioBufferCommitted(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED
    )
    previous_item_id: Optional[str] = None
    item_id: Optional[str] = None
    event_id: Optional[str] = None

class InputAudioBufferCleared(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED
    )

class InputAudioBufferSpeechStarted(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED
    )
    audio_start_ms: int
    item_id: str

class InputAudioBufferSpeechStopped(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED
    )
    audio_end_ms: int
    item_id: str

#### 関数呼び出し
class ResponseFunctionCallArgumentsDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA] = (
        ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA
    )
    response_id: str
    item_id: str
    output_index: int
    call_id: str
    delta: str

class ResponseFunctionCallArgumentsDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE] = (
        ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    call_id: str
    arguments: str

#### レート制限
class RateLimit(BaseModel):
    name: str
    limit: int
    remaining: int
    reset_seconds: float

class RateLimitsUpdated(BaseEvent):
    type: Literal[ServerEventTypes.RATE_LIMITS_UPDATED] = (
        ServerEventTypes.RATE_LIMITS_UPDATED
    )
    rate_limits: list[RateLimit]

ServerEvent = Union[
    ErrorEvent,
    ConversationCreated,
    ResponseAudioTranscriptDone,
    ResponseAudioTranscriptDelta,
    ResponseAudioDelta,
    ResponseCreated,
    ResponseDone,
    ResponseOutputItemAdded,
    ResponseOutputItemDone,
    ResponseContentPartAdded,
    ResponseContentPartDone,
    ResponseTextDelta,
    ResponseTextDone,
    ResponseAudioDone,
    ConversationItemInputAudioTranscriptionCompleted,
    SessionCreated,
    SessionUpdated,
    InputAudioBufferCleared,
    InputAudioBufferSpeechStarted,
    InputAudioBufferSpeechStopped,
    ConversationItemCreated,
    ConversationItemInputAudioTranscriptionFailed,
    ConversationItemTruncated,
    ConversationItemDeleted,
    RateLimitsUpdated,
]

EVENT_TYPE_TO_MODEL = {
    ServerEventTypes.ERROR: ErrorEvent,
    ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE: ResponseAudioTranscriptDone,
    ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA: ResponseAudioTranscriptDelta,
    ServerEventTypes.RESPONSE_AUDIO_DELTA: ResponseAudioDelta,
    ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: ConversationItemInputAudioTranscriptionCompleted,
    ServerEventTypes.SESSION_CREATED: SessionCreated,
    ServerEventTypes.SESSION_UPDATED: SessionUpdated,
    ServerEventTypes.CONVERSATION_CREATED: ConversationCreated,
    ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED: InputAudioBufferCommitted,
    ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED: InputAudioBufferCleared,
    ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED: InputAudioBufferSpeechStarted,
    ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: InputAudioBufferSpeechStopped,
    ServerEventTypes.CONVERSATION_ITEM_CREATED: ConversationItemCreated,
    ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED: ConversationItemInputAudioTranscriptionFailed,
    ServerEventTypes.CONVERSATION_ITEM_TRUNCATED: ConversationItemTruncated,
    ServerEventTypes.CONVERSATION_ITEM_DELETED: ConversationItemDeleted,
    ServerEventTypes.RESPONSE_CREATED: ResponseCreated,
    ServerEventTypes.RESPONSE_DONE: ResponseDone,
    ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED: ResponseOutputItemAdded,
    ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE: ResponseOutputItemDone,
    ServerEventTypes.RESPONSE_CONTENT_PART_ADDED: ResponseContentPartAdded,
    ServerEventTypes.RESPONSE_CONTENT_PART_DONE: ResponseContentPartDone,
    ServerEventTypes.RESPONSE_TEXT_DELTA: ResponseTextDelta,
    ServerEventTypes.RESPONSE_TEXT_DONE: ResponseTextDone,
    ServerEventTypes.RESPONSE_AUDIO_DONE: ResponseAudioDone,
    ServerEventTypes.RATE_LIMITS_UPDATED: RateLimitsUpdated,
}

def parse_server_event(event_data: dict) -> ServerEvent:
    event_type = event_data.get("type")
    if not event_type:
        raise ValueError("イベントデータに 'type' フィールドがありません")

    model_class = EVENT_TYPE_TO_MODEL.get(event_type)
    if not model_class:
        raise ValueError(f"不明なイベントタイプ: {event_type}")

    try:
        return model_class(**event_data)
    except ValidationError as e:
        raise ValueError(f"タイプ {event_type} のイベントの解析に失敗しました: {str(e)}") from e

オーディオストリームライター(ディスクおよびメモリ上)

class StreamingWavWriter:
    """音声の整数またはバイト配列チャンクをWAVファイルに書き込む。"""

    wav_file = None
    buffer = None
    in_memory = False

    def __init__(
        self,
        filename=None,
        channels=INPUT_DEVICE_CHANNELS,
        sample_width=SAMPLE_WIDTH,
        framerate=SAMPLE_RATE,
    ):
        self.in_memory = filename is None
        if self.in_memory:
            self.buffer = io.BytesIO()
            self.wav_file = wave.open(self.buffer, "wb")
        else:
            self.wav_file = wave.open(filename, "wb")

        self.wav_file.setnchannels(channels)
        self.wav_file.setsampwidth(sample_width)
        self.wav_file.setframerate(framerate)

    def append_int16_chunk(self, int16_data):
        if int16_data is not None:
            self.wav_file.writeframes(
                int16_data.tobytes()
                if isinstance(int16_data, np.ndarray)
                else int16_data
            )

    def close(self):
        self.wav_file.close()

    def get_wav_buffer(self):
        assert self.in_memory, "バッファはストリームがメモリ上にある場合のみ利用可能です。"
        return self.buffer

Realtime Audio Model

リアルタイム (RT) オーディオモデルは、WebSocket を使って OpenAI の Realtime audio API にイベントを送信します。動作は次のとおりです。
  1. init: ローカルバッファ(入力オーディオ)とストリーム(アシスタントの再生ストリーム、ユーザーオーディオのディスク書き込みストリーム)を初期化し、Realtime API への接続を開きます。
  2. receive_messages_thread: API からのメッセージ受信を処理するスレッドです。4 つの主要なイベントタイプを処理します。- RESPONSE_AUDIO_TRANSCRIPT_DONE: サーバーはアシスタントのレスポンスが完了したことを示し、文字起こし結果を返します。
    • CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: サーバーはユーザーのオーディオが文字起こしされたことを示し、ユーザーオーディオの文字起こしを送信します。文字起こしを Weave にログとして記録し、ユーザー向けに表示します。
    • RESPONSE_AUDIO_DELTA: サーバーはアシスタントのレスポンスオーディオの新しいチャンクを送信します。レスポンス ID を使ってこのチャンクを進行中のレスポンスデータに追加し、再生のために出力ストリームに追加します。
    • RESPONSE_DONE: サーバーはアシスタントのレスポンスが完了したことを示します。レスポンスに関連するすべてのオーディオチャンクと文字起こしを取得し、これらを Weave にログとして記録します。
    3.send_audio: ハンドラーはユーザーのオーディオチャンクをバッファに追加し、バッファが一定サイズに達したときにオーディオチャンクを送信します。
class RTAudioModel(weave.Model):
    """ログ記録用のWhisperユーザー文字起こしを使用したリアルタイムe2eオーディオOpenAIモデルインタラクションのモデルクラス。"""

    realtime_model_name: str = "gpt-4o-realtime-preview-2024-10-01"  # リアルタイムe2eオーディオ専用モデルインタラクション

    stop_event: Optional[threading.Event] = threading.Event()  # モデルを停止するイベント
    ws: Optional[websocket.WebSocket] = None  # OpenAI通信用WebSocket

    user_wav_writer: Optional[StreamingWavWriter] = (
        None  # ユーザー出力をファイルに書き込むストリーム
    )
    input_audio_buffer: Optional[np.ndarray] = None  # ユーザーオーディオチャンク用バッファ
    assistant_outputs: dict[str, StreamingWavWriter] = (
        None  # Weaveに送信するためにまとめたアシスタント出力
    )
    playback_stream: Optional[pyaudio.Stream] = (
        None  # アシスタントの応答を再生するための再生ストリーム
    )

    def __init__(self):
        super().__init__()
        self.stop_event.clear()
        self.user_wav_writer = StreamingWavWriter(
            filename="user_audio.wav", framerate=SAMPLE_RATE
        )
        self.input_audio_buffer = np.array([], dtype=np.int16)
        self.ws = websocket.WebSocket()
        self.assistant_outputs = {}

        # 有効な場合、アシスタントオーディオ再生ストリームを開く
        if enable_audio_playback:
            self.playback_stream = pyaudio.PyAudio().open(
                format=pyaudio.paInt16,
                channels=OUTPUT_DEVICE_CHANNELS,
                rate=OAI_SAMPLE_RATE,
                output=True,
                output_device_index=OUTPUT_DEVICE_INDEX,
            )

        # WebSocketに接続
        try:
            self.ws.connect(
                f"wss://api.openai.com/v1/realtime?model={self.realtime_model_name}",
                header={
                    "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
                    "OpenAI-Beta": "realtime=v1",
                },
            )

            # 設定メッセージを送信
            config_event = SessionUpdate(
                session=Session(
                    modalities=["text", "audio"],  # 使用するモダリティ
                    input_audio_transcription=InputAudioTranscription(
                        model="whisper-1"
                    ),  # 文字起こし用whisper-1
                    turn_detection=TurnDetection(
                        type="server_vad",
                        threshold=0.3,
                        prefix_padding_ms=300,
                        silence_duration_ms=600,
                    ),  # 無音検出用サーバーVAD
                )
            )
            self.ws.send(config_event.model_dump_json(exclude_none=True))
            self.log_ws_message(config_event.model_dump_json(exclude_none=True), "Sent")

            # リスナーを起動
            websocket_thread = threading.Thread(target=self.receive_messages_thread)
            websocket_thread.daemon = True
            websocket_thread.start()

        except Exception as e:
            print(f"WebSocketへの接続エラー: {e}")

    ##### Weaveインテグレーションとメッセージハンドラー #####
    def handle_assistant_response_audio_delta(self, data: ResponseAudioDelta):
        if data.response_id not in self.assistant_outputs:
            self.assistant_outputs[data.response_id] = StreamingWavWriter(
                framerate=OAI_SAMPLE_RATE
            )

        data_bytes = base64.b64decode(data.delta)
        self.assistant_outputs[data.response_id].append_int16_chunk(data_bytes)

        if enable_audio_playback:
            self.playback_stream.write(data_bytes)

        return {"assistant_audio": data_bytes}

    @weave.op()
    def handle_assistant_response_done(self, data: ResponseDone):
        wave_file_stream = self.assistant_outputs[data.response.id]
        wave_file_stream.close()
        wave_file_stream.buffer.seek(0)
        weave_payload = {
            "assistant_audio": wave.open(wave_file_stream.get_wav_buffer(), "rb"),
            "assistant_transcript": data.response.output[0]
            .content[0]
            .get("transcript", "文字起こしを利用できません。"),
        }
        return weave_payload

    @weave.op()
    def handle_user_transcription_done(
        self, data: ConversationItemInputAudioTranscriptionCompleted
    ):
        return {"user_transcript": data.transcript}

    ##### メッセージ受信と送信 #####
    def receive_messages_thread(self):
        while not self.stop_event.is_set():
            try:
                data = json.loads(self.ws.recv())
                self.log_ws_message(json.dumps(data, indent=2))

                parsed_event = parse_server_event(data)

                if parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE:
                    print("アシスタント: ", parsed_event.transcript)
                elif (
                    parsed_event.type
                    == ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
                ):
                    print("ユーザー: ", parsed_event.transcript)
                    self.handle_user_transcription_done(parsed_event)
                elif parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_DELTA:
                    self.handle_assistant_response_audio_delta(parsed_event)
                elif parsed_event.type == ServerEventTypes.RESPONSE_DONE:
                    self.handle_assistant_response_done(parsed_event)
                elif parsed_event.type == ServerEventTypes.ERROR:
                    print(
                        f"\nサーバーからのエラー: {parsed_event.error.model_dump_json(exclude_none=True)}"
                    )
            except websocket.WebSocketConnectionClosedException:
                print("\nWebSocket接続が切断されました")
                break
            except json.JSONDecodeError:
                continue
            except Exception as e:
                print(f"\nreceive_messagesでエラーが発生しました: {e}")
                break

    def send_audio(self, audio_chunk):
        if self.ws and self.ws.connected:
            self.input_audio_buffer = np.append(
                self.input_audio_buffer, np.frombuffer(audio_chunk, dtype=np.int16)
            )
            if len(self.input_audio_buffer) >= SAMPLE_RATE * CHUNK_DURATION:
                try:
                    # OAIサンプルレートにオーディオをリサンプリング
                    resampled_audio = (
                        resampy.resample(
                            self.input_audio_buffer, SAMPLE_RATE, OAI_SAMPLE_RATE
                        )
                        if SAMPLE_RATE != OAI_SAMPLE_RATE
                        else self.input_audio_buffer
                    )

                    # OAI APIにオーディオチャンクを送信
                    audio_event = InputAudioBufferAppend(
                        audio=base64.b64encode(
                            resampled_audio.astype(np.int16).tobytes()
                        ).decode("utf-8")  # オーディオ配列をb64バイトに変換
                    )
                    self.ws.send(audio_event.model_dump_json(exclude_none=True))
                    self.log_ws_message(
                        audio_event.model_dump_json(exclude_none=True), "Sent"
                    )
                finally:
                    self.user_wav_writer.append_int16_chunk(self.input_audio_buffer)

                    # オーディオバッファをクリア
                    self.input_audio_buffer = np.array([], dtype=np.int16)
        else:
            print("オーディオ送信エラー: WebSocketが初期化されていません。")

    ##### 汎用ユーティリティ関数 #####
    def log_ws_message(self, message, direction="受信"):
        with open("websocket_log.txt", "a") as log_file:
            log_file.write(
                f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {direction}: {message}\n"
            )

    def stop(self):
        self.stop_event.set()

        if self.ws:
            self.ws.close()

        self.user_wav_writer.close()

音声レコーダー

pyaudio の入力ストリームを使用し、RTAudio モデルの send_audio メソッドに紐づいたハンドラーを設定します。ストリームはメインスレッドに返されるため、プログラム終了時に安全に終了できます。
# 音声キャプチャストリーム
def record_audio(realtime_model: RTAudioModel) -> pyaudio.Stream:
    """Pyaudio入力ストリームをセットアップし、RTAudioModelをストリーミングデータのコールバックとして使用する。"""

    def audio_callback(in_data, frame_count, time_info, status):
        realtime_model.send_audio(in_data)
        return (None, pyaudio.paContinue)

    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=INPUT_DEVICE_CHANNELS,
        rate=SAMPLE_RATE,
        input=True,
        input_device_index=INPUT_DEVICE_INDEX,
        frames_per_buffer=CHUNK,
        stream_callback=audio_callback,
    )
    stream.start_stream()

    print("Recording started. Please begin speaking to your personal assistant...")
    return stream

メインスレッド(実行してみよう!)

メインスレッドでは、Weave を統合した Realtime Audio Model を初期化します。次に録音を開始し、ユーザーによるキーボード割り込みを待機します。
weave.init(project_name="realtime-oai-audio-testing")

realtime_model = RTAudioModel()

if realtime_model.ws and realtime_model.ws.connected:
    recording_stream: pyaudio.Stream = record_audio(realtime_model)

    try:
        while not realtime_model.stop_event.is_set():
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error in main loop: {e}")
        import traceback

        traceback.print_exc()
    finally:
        print("Exiting...")
        realtime_model.stop()
        if recording_stream and recording_stream.is_active():
            recording_stream.stop_stream()
            recording_stream.close()
else:
    print(
        "WebSocket connection failed. Please check your API key and internet connection."
    )