Passer au contenu principal
Il s’agit d’un notebook interactif. Vous pouvez l’exécuter localement ou utiliser les liens ci-dessous :

Comment utiliser Weave avec des données Audio : un exemple OpenAI

Cette démo utilise l’API Chat Completions d’OpenAI avec GPT 4o Audio Preview pour générer des réponses audio à partir de prompts textuels et les suivre dans Weave. Interface de l’API Chat Completions d’OpenAI avec intégration de GPT 4o Audio Preview et flux de travail de génération de réponses audio Pour ce cas d’utilisation avancé, nous utilisons l’OpenAI Realtime API pour diffuser de l’audio en temps réel. Cliquez sur la vignette suivante pour voir la démonstration vidéo, ou cliquez ici. Everything Is AWESOME

Configuration

Commencez par installer les dépendances OpenAI (openai) et Weave (weave), ainsi que la dépendance set-env pour la gestion des clés API.
%%capture
!pip install openai
!pip install weave
!pip install set-env-colab-kaggle-dotenv -q # pour les variables d'environnement
python
%%capture
# Solution temporaire pour corriger un bug dans openai :
# TypeError: Client.__init__() got an unexpected keyword argument 'proxies'
# Voir https://community.openai.com/t/error-with-openai-1-56-0-client-init-got-an-unexpected-keyword-argument-proxies/1040332/15
!pip install "httpx<0.28"
Ensuite, chargez les clés API requises pour OpenAI et Weave. Ici, nous utilisons set_env, compatible avec le gestionnaire de clés secrètes de Google Colab, comme alternative à google.colab.userdata, spécifique à Colab. Voir : ici pour les instructions d’utilisation.
# Définir les variables d'environnement.
from set_env import set_env

_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")
Enfin, importez les bibliothèques requises.
import base64
import os
import time
import wave

import numpy as np
from IPython.display import display
from openai import OpenAI

import weave

Exemple de streaming et de stockage audio

Nous allons maintenant configurer un appel vers l’endpoint Completions d’OpenAI avec la modalité audio activée. Créez d’abord le client OpenAI et initialisez un projet Weave.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
weave.init("openai-audio-chat")
Nous allons maintenant définir notre requête de complétion OpenAI et ajouter notre décorateur Weave (op). Ici, nous définissons la fonction prompt_endpont_and_log_trace. Cette fonction comporte trois étapes principales :
  1. Nous créons un objet de complétion à l’aide du modèle GPT 4o Audio Preview, qui prend en charge les entrées et sorties texte et audio.
    • Nous demandons au modèle de compter lentement jusqu’à 13 avec des accents variés.
    • Nous définissons la complétion sur “stream”.
  2. Nous ouvrons un nouveau fichier de sortie, dans lequel les données diffusées en continu sont écrites fragment par fragment.
  3. Nous renvoyons un descripteur de fichier ouvert vers le fichier audio afin que Weave consigne les données audio dans la trace.
SAMPLE_RATE = 22050

@weave.op()
def prompt_endpoint_and_log_trace(system_prompt=None, user_prompt=None):
    if not system_prompt:
        system_prompt = "You're the fastest counter in the world"
    if not user_prompt:
        user_prompt = "Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc."
    # Requête à l'API OpenAI avec modalité audio
    completion = client.chat.completions.create(
        model="gpt-4o-audio-preview",
        modalities=["text", "audio"],
        audio={"voice": "fable", "format": "pcm16"},
        stream=True,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )

    # Ouvrir un fichier wave en écriture
    with wave.open("./output.wav", "wb") as wav_file:
        wav_file.setnchannels(1)  # Mono
        wav_file.setsampwidth(2)  # 16 bits
        wav_file.setframerate(SAMPLE_RATE)  # Fréquence d'échantillonnage (à ajuster si nécessaire)

        # Écrire les fragments au fur et à mesure de leur réception depuis l'API
        for chunk in completion:
            if (
                hasattr(chunk, "choices")
                and chunk.choices is not None
                and len(chunk.choices) > 0
                and hasattr(chunk.choices[0].delta, "audio")
                and chunk.choices[0].delta.audio.get("data") is not None
            ):
                # Décoder les données audio encodées en base64
                audio_data = base64.b64decode(chunk.choices[0].delta.audio.get("data"))

                # Écrire le fragment courant dans le fichier wave
                wav_file.writeframes(audio_data)

    # Retourner le fichier à l'op Weave
    return wave.open("output.wav", "rb")

Test

Exécutez la cellule suivante. Le prompt système, le prompt utilisateur et l’audio de sortie seront enregistrés dans une trace Weave. Après avoir exécuté la cellule, cliquez sur le lien à côté de l’emoji ”🍩” pour afficher votre trace.
from IPython.display import Audio

# Appeler la fonction pour écrire le flux audio
prompt_endpoint_and_log_trace(
    system_prompt="You're the fastest counter in the world",
    user_prompt="Count to 13 super super slow, enunciate each number with a dramatic flair, changing up accents as you go along. British, French, German, Spanish, etc.",
)

# Afficher le flux audio mis à jour
display(Audio("output.wav", rate=SAMPLE_RATE, autoplay=True))

Utilisation avancée : Realtime Audio API avec Weave

Intégration de Realtime Audio API avec Weave et interface conversationnelle audio en streaming L’API temps réel d’OpenAI est une API conversationnelle très performante et fiable pour créer des assistants audio et textuels en temps réel. Veuillez noter :
  • Consultez les cellules de Configuration du microphone
  • En raison des limitations de l’environnement d’exécution de Google Colab, vous devez l’exécuter sur votre machine hôte dans un notebook Jupyter. Il n’est pas possible de l’exécuter dans le navigateur.
    • Sous MacOS, vous devrez installer portaudio via Brew (voir ici) pour que Pyaudio fonctionne.
  • Le bouton bascule enable_audio_playback active la lecture de l’audio généré par l’assistant. Veuillez noter qu’un casque est requis si cette option est activée, car la détection d’écho nécessite une implémentation très complexe.

Configuration requise

%%capture
!pip install numpy==2.0
!pip install weave
!pip install pyaudio # Sur Mac, vous devrez peut-être d'abord installer portaudio avec `brew install portaudio`
!pip install websocket-client
!pip install set-env-colab-kaggle-dotenv -q # pour les variables d'environnement
!pip install resampy
python
import io
import json
import os
import threading
from typing import Optional

import pyaudio
import resampy
import websocket
from set_env import set_env

import weave
python
# Définir les variables d'environnement.
# Voir : https://pypi.org/project/set-env-colab-kaggle-dotenv/ pour les instructions d'utilisation.
_ = set_env("OPENAI_API_KEY")
_ = set_env("WANDB_API_KEY")

Configuration du microphone

Exécutez la cellule suivante pour trouver tous les périphériques audio disponibles. Ensuite, renseignez INPUT_DEVICE_INDEX et OUTPUT_DEVICE_INDEX en fonction des périphériques répertoriés. Votre périphérique d’entrée aura au moins 1 canal d’entrée, et votre périphérique de sortie aura au moins 1 canal de sortie.
# Obtenir la liste des périphériques depuis pyaudio pour configurer la cellule suivante
p = pyaudio.PyAudio()
devices_data = {i: p.get_device_info_by_index(i) for i in range(p.get_device_count())}
for i, device in devices_data.items():
    print(
        f"Found device @{i}: {device['name']} with sample rate: {device['defaultSampleRate']} and input channels: {device['maxInputChannels']} and output channels: {device['maxOutputChannels']}"
    )
python
INPUT_DEVICE_INDEX = 3  # @param                                                 # Choisir en fonction de la liste des périphériques ci-dessus. S'assurer que le périphérique a > 0 canaux d'entrée.
OUTPUT_DEVICE_INDEX = 12  # @param                                                # Choisir en fonction de la liste des périphériques ci-dessus. S'assurer que le périphérique a > 0 canaux de sortie.
enable_audio_playback = True  # @param {type:"boolean"}                           # Activer la lecture audio de l'assistant. Nécessite un casque.

# Paramètres d'enregistrement et de streaming audio
INPUT_DEVICE_CHANNELS = devices_data[INPUT_DEVICE_INDEX][
    "maxInputChannels"
]  # D'après la liste des périphériques ci-dessus
SAMPLE_RATE = int(
    devices_data[INPUT_DEVICE_INDEX]["defaultSampleRate"]
)  # D'après la liste des périphériques ci-dessus
CHUNK = int(SAMPLE_RATE / 10)  # Échantillons par trame
SAMPLE_WIDTH = p.get_sample_size(pyaudio.paInt16)  # Échantillons par trame pour le format
CHUNK_DURATION = 0.3  # Secondes d'audio par fragment envoyé à l'API OAI
OAI_SAMPLE_RATE = (
    24000  # La fréquence d'échantillonnage OAI est de 24 kHz, nécessaire pour lire ou enregistrer l'audio de l'assistant
)
OUTPUT_DEVICE_CHANNELS = 1  # Définir à 1 pour une sortie mono

Implémentation du schéma de l’OpenAI Realtime API

Le SDK Python d’OpenAI ne prend pas encore en charge la Realtime API. Nous implémentons l’intégralité du schéma de l’OAI Realtime API dans Pydantic pour en améliorer la lisibilité, et pourrons le déprécier lorsque la prise en charge officielle sera disponible.

Schéma Pydantic pour l’OpenAI Realtime API

from enum import Enum
from typing import Any, Literal, Union

from pydantic import BaseModel, Field, ValidationError

class BaseEvent(BaseModel):
    type: Union["ClientEventTypes", "ServerEventTypes"]
    event_id: Optional[str] = None  # Add event_id as an optional field for all events

    # def model_dump_json(self, *args, **kwargs):
    #     # Only include non-None fields
    #     return super().model_dump_json(*args, exclude_none=True, **kwargs)

class ChatMessage(BaseModel):
    role: Literal["user", "assistant"]
    content: str
    timestamp: float

""" CLIENT EVENTS """

class ClientEventTypes(str, Enum):
    SESSION_UPDATE = "session.update"
    CONVERSATION_ITEM_CREATE = "conversation.item.create"
    CONVERSATION_ITEM_TRUNCATE = "conversation.item.truncate"
    CONVERSATION_ITEM_DELETE = "conversation.item.delete"
    RESPONSE_CREATE = "response.create"
    RESPONSE_CANCEL = "response.cancel"
    INPUT_AUDIO_BUFFER_APPEND = "input_audio_buffer.append"
    INPUT_AUDIO_BUFFER_COMMIT = "input_audio_buffer.commit"
    INPUT_AUDIO_BUFFER_CLEAR = "input_audio_buffer.clear"
    ERROR = "error"

#### Session Update
class TurnDetection(BaseModel):
    type: Literal["server_vad"]
    threshold: float = Field(..., ge=0.0, le=1.0)
    prefix_padding_ms: int
    silence_duration_ms: int

class InputAudioTranscription(BaseModel):
    model: Optional[str] = None

class ToolParameterProperty(BaseModel):
    type: str

class ToolParameter(BaseModel):
    type: str
    properties: dict[str, ToolParameterProperty]
    required: list[str]

class Tool(BaseModel):
    type: Literal["function", "code_interpreter", "file_search"]
    name: Optional[str] = None
    description: Optional[str] = None
    parameters: Optional[ToolParameter] = None

class Session(BaseModel):
    modalities: Optional[list[str]] = None
    instructions: Optional[str] = None
    voice: Optional[str] = None
    input_audio_format: Optional[str] = None
    output_audio_format: Optional[str] = None
    input_audio_transcription: Optional[InputAudioTranscription] = None
    turn_detection: Optional[TurnDetection] = None
    tools: Optional[list[Tool]] = None
    tool_choice: Optional[str] = None
    temperature: Optional[float] = None
    max_output_tokens: Optional[int] = None

class SessionUpdate(BaseEvent):
    type: Literal[ClientEventTypes.SESSION_UPDATE] = ClientEventTypes.SESSION_UPDATE
    session: Session

#### Audio Buffers
class InputAudioBufferAppend(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_APPEND
    )
    audio: str

class InputAudioBufferCommit(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_COMMIT
    )

class InputAudioBufferClear(BaseEvent):
    type: Literal[ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR] = (
        ClientEventTypes.INPUT_AUDIO_BUFFER_CLEAR
    )

#### Messages
class MessageContent(BaseModel):
    type: Literal["input_audio"]
    audio: str

class ConversationItemContent(BaseModel):
    type: Literal["input_text", "input_audio", "text", "audio"]
    text: Optional[str] = None
    audio: Optional[str] = None
    transcript: Optional[str] = None

class FunctionCallContent(BaseModel):
    call_id: str
    name: str
    arguments: str

class FunctionCallOutputContent(BaseModel):
    output: str

class ConversationItem(BaseModel):
    id: Optional[str] = None
    type: Literal["message", "function_call", "function_call_output"]
    status: Optional[Literal["completed", "in_progress", "incomplete"]] = None
    role: Literal["user", "assistant", "system"]
    content: list[
        Union[ConversationItemContent, FunctionCallContent, FunctionCallOutputContent]
    ]
    call_id: Optional[str] = None
    name: Optional[str] = None
    arguments: Optional[str] = None
    output: Optional[str] = None

class ConversationItemCreate(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_CREATE] = (
        ClientEventTypes.CONVERSATION_ITEM_CREATE
    )
    item: ConversationItem

class ConversationItemTruncate(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_TRUNCATE] = (
        ClientEventTypes.CONVERSATION_ITEM_TRUNCATE
    )
    item_id: str
    content_index: int
    audio_end_ms: int

class ConversationItemDelete(BaseEvent):
    type: Literal[ClientEventTypes.CONVERSATION_ITEM_DELETE] = (
        ClientEventTypes.CONVERSATION_ITEM_DELETE
    )
    item_id: str

#### Responses
class ResponseCreate(BaseEvent):
    type: Literal[ClientEventTypes.RESPONSE_CREATE] = ClientEventTypes.RESPONSE_CREATE

class ResponseCancel(BaseEvent):
    type: Literal[ClientEventTypes.RESPONSE_CANCEL] = ClientEventTypes.RESPONSE_CANCEL

# Update the Event union to include all event types
ClientEvent = Union[
    SessionUpdate,
    InputAudioBufferAppend,
    InputAudioBufferCommit,
    InputAudioBufferClear,
    ConversationItemCreate,
    ConversationItemTruncate,
    ConversationItemDelete,
    ResponseCreate,
    ResponseCancel,
]

""" SERVER EVENTS """

class ServerEventTypes(str, Enum):
    ERROR = "error"
    RESPONSE_AUDIO_TRANSCRIPT_DONE = "response.audio_transcript.done"
    RESPONSE_AUDIO_TRANSCRIPT_DELTA = "response.audio_transcript.delta"
    RESPONSE_AUDIO_DELTA = "response.audio.delta"
    SESSION_CREATED = "session.created"
    SESSION_UPDATED = "session.updated"
    CONVERSATION_CREATED = "conversation.created"
    INPUT_AUDIO_BUFFER_COMMITTED = "input_audio_buffer.committed"
    INPUT_AUDIO_BUFFER_CLEARED = "input_audio_buffer.cleared"
    INPUT_AUDIO_BUFFER_SPEECH_STARTED = "input_audio_buffer.speech_started"
    INPUT_AUDIO_BUFFER_SPEECH_STOPPED = "input_audio_buffer.speech_stopped"
    CONVERSATION_ITEM_CREATED = "conversation.item.created"
    CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED = (
        "conversation.item.input_audio_transcription.completed"
    )
    CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED = (
        "conversation.item.input_audio_transcription.failed"
    )
    CONVERSATION_ITEM_TRUNCATED = "conversation.item.truncated"
    CONVERSATION_ITEM_DELETED = "conversation.item.deleted"
    RESPONSE_CREATED = "response.created"
    RESPONSE_DONE = "response.done"
    RESPONSE_OUTPUT_ITEM_ADDED = "response.output_item.added"
    RESPONSE_OUTPUT_ITEM_DONE = "response.output_item.done"
    RESPONSE_CONTENT_PART_ADDED = "response.content_part.added"
    RESPONSE_CONTENT_PART_DONE = "response.content_part.done"
    RESPONSE_TEXT_DELTA = "response.text.delta"
    RESPONSE_TEXT_DONE = "response.text.done"
    RESPONSE_AUDIO_DONE = "response.audio.done"
    RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA = "response.function_call_arguments.delta"
    RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE = "response.function_call_arguments.done"
    RATE_LIMITS_UPDATED = "rate_limits.updated"

#### Errors
class ErrorDetails(BaseModel):
    type: Optional[str] = None
    code: Optional[str] = None
    message: Optional[str] = None
    param: Optional[str] = None

class ErrorEvent(BaseEvent):
    type: Literal[ServerEventTypes.ERROR] = ServerEventTypes.ERROR
    error: ErrorDetails

#### Session
class SessionCreated(BaseEvent):
    type: Literal[ServerEventTypes.SESSION_CREATED] = ServerEventTypes.SESSION_CREATED
    session: Session

class SessionUpdated(BaseEvent):
    type: Literal[ServerEventTypes.SESSION_UPDATED] = ServerEventTypes.SESSION_UPDATED
    session: Session

#### Conversation
class Conversation(BaseModel):
    id: str
    object: Literal["realtime.conversation"]

class ConversationCreated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_CREATED] = (
        ServerEventTypes.CONVERSATION_CREATED
    )
    conversation: Conversation

class ConversationItemCreated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_CREATED] = (
        ServerEventTypes.CONVERSATION_ITEM_CREATED
    )
    previous_item_id: Optional[str] = None
    item: ConversationItem

class ConversationItemInputAudioTranscriptionCompleted(BaseEvent):
    type: Literal[
        ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
    ] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
    item_id: str
    content_index: int
    transcript: str

class ConversationItemInputAudioTranscriptionFailed(BaseEvent):
    type: Literal[
        ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
    ] = ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED
    item_id: str
    content_index: int
    error: dict[str, Any]

class ConversationItemTruncated(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_TRUNCATED] = (
        ServerEventTypes.CONVERSATION_ITEM_TRUNCATED
    )
    item_id: str
    content_index: int
    audio_end_ms: int

class ConversationItemDeleted(BaseEvent):
    type: Literal[ServerEventTypes.CONVERSATION_ITEM_DELETED] = (
        ServerEventTypes.CONVERSATION_ITEM_DELETED
    )
    item_id: str

#### Response
class ResponseUsage(BaseModel):
    total_tokens: int
    input_tokens: int
    output_tokens: int
    input_token_details: Optional[dict[str, int]] = None
    output_token_details: Optional[dict[str, int]] = None

class ResponseOutput(BaseModel):
    id: str
    object: Literal["realtime.item"]
    type: str
    status: str
    role: str
    content: list[dict[str, Any]]

class ResponseContentPart(BaseModel):
    type: str
    text: Optional[str] = None

class ResponseOutputItemContent(BaseModel):
    type: str
    text: Optional[str] = None

class ResponseStatusDetails(BaseModel):
    type: str
    reason: str

class ResponseOutputItem(BaseModel):
    id: str
    object: Literal["realtime.item"]
    type: str
    status: str
    role: str
    content: list[ResponseOutputItemContent]

class Response(BaseModel):
    id: str
    object: Literal["realtime.response"]
    status: str
    status_details: Optional[ResponseStatusDetails] = None
    output: list[ResponseOutput]
    usage: Optional[ResponseUsage]

class ResponseCreated(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CREATED] = ServerEventTypes.RESPONSE_CREATED
    response: Response

class ResponseDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_DONE] = ServerEventTypes.RESPONSE_DONE
    response: Response

class ResponseOutputItemAdded(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED] = (
        ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED
    )
    response_id: str
    output_index: int
    item: ResponseOutputItem

class ResponseOutputItemDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE] = (
        ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE
    )
    response_id: str
    output_index: int
    item: ResponseOutputItem

class ResponseContentPartAdded(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_ADDED] = (
        ServerEventTypes.RESPONSE_CONTENT_PART_ADDED
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    part: ResponseContentPart

class ResponseContentPartDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_CONTENT_PART_DONE] = (
        ServerEventTypes.RESPONSE_CONTENT_PART_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    part: ResponseContentPart

#### Response Text
class ResponseTextDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_TEXT_DELTA] = (
        ServerEventTypes.RESPONSE_TEXT_DELTA
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    delta: str

class ResponseTextDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_TEXT_DONE] = (
        ServerEventTypes.RESPONSE_TEXT_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int
    text: str

#### Response Audio
class ResponseAudioTranscriptDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE] = (
        ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE
    )
    transcript: str

class ResponseAudioTranscriptDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA] = (
        ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA
    )
    delta: str

class ResponseAudioDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_DELTA] = (
        ServerEventTypes.RESPONSE_AUDIO_DELTA
    )
    response_id: str
    item_id: str
    delta: str

class ResponseAudioDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_AUDIO_DONE] = (
        ServerEventTypes.RESPONSE_AUDIO_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    content_index: int

class InputAudioBufferCommitted(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED
    )
    previous_item_id: Optional[str] = None
    item_id: Optional[str] = None
    event_id: Optional[str] = None

class InputAudioBufferCleared(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED
    )

class InputAudioBufferSpeechStarted(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED
    )
    audio_start_ms: int
    item_id: str

class InputAudioBufferSpeechStopped(BaseEvent):
    type: Literal[ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED] = (
        ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED
    )
    audio_end_ms: int
    item_id: str

#### Function Calls
class ResponseFunctionCallArgumentsDelta(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA] = (
        ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA
    )
    response_id: str
    item_id: str
    output_index: int
    call_id: str
    delta: str

class ResponseFunctionCallArgumentsDone(BaseEvent):
    type: Literal[ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE] = (
        ServerEventTypes.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE
    )
    response_id: str
    item_id: str
    output_index: int
    call_id: str
    arguments: str

#### Rate Limits
class RateLimit(BaseModel):
    name: str
    limit: int
    remaining: int
    reset_seconds: float

class RateLimitsUpdated(BaseEvent):
    type: Literal[ServerEventTypes.RATE_LIMITS_UPDATED] = (
        ServerEventTypes.RATE_LIMITS_UPDATED
    )
    rate_limits: list[RateLimit]

ServerEvent = Union[
    ErrorEvent,
    ConversationCreated,
    ResponseAudioTranscriptDone,
    ResponseAudioTranscriptDelta,
    ResponseAudioDelta,
    ResponseCreated,
    ResponseDone,
    ResponseOutputItemAdded,
    ResponseOutputItemDone,
    ResponseContentPartAdded,
    ResponseContentPartDone,
    ResponseTextDelta,
    ResponseTextDone,
    ResponseAudioDone,
    ConversationItemInputAudioTranscriptionCompleted,
    SessionCreated,
    SessionUpdated,
    InputAudioBufferCleared,
    InputAudioBufferSpeechStarted,
    InputAudioBufferSpeechStopped,
    ConversationItemCreated,
    ConversationItemInputAudioTranscriptionFailed,
    ConversationItemTruncated,
    ConversationItemDeleted,
    RateLimitsUpdated,
]

EVENT_TYPE_TO_MODEL = {
    ServerEventTypes.ERROR: ErrorEvent,
    ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE: ResponseAudioTranscriptDone,
    ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DELTA: ResponseAudioTranscriptDelta,
    ServerEventTypes.RESPONSE_AUDIO_DELTA: ResponseAudioDelta,
    ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: ConversationItemInputAudioTranscriptionCompleted,
    ServerEventTypes.SESSION_CREATED: SessionCreated,
    ServerEventTypes.SESSION_UPDATED: SessionUpdated,
    ServerEventTypes.CONVERSATION_CREATED: ConversationCreated,
    ServerEventTypes.INPUT_AUDIO_BUFFER_COMMITTED: InputAudioBufferCommitted,
    ServerEventTypes.INPUT_AUDIO_BUFFER_CLEARED: InputAudioBufferCleared,
    ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STARTED: InputAudioBufferSpeechStarted,
    ServerEventTypes.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: InputAudioBufferSpeechStopped,
    ServerEventTypes.CONVERSATION_ITEM_CREATED: ConversationItemCreated,
    ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_FAILED: ConversationItemInputAudioTranscriptionFailed,
    ServerEventTypes.CONVERSATION_ITEM_TRUNCATED: ConversationItemTruncated,
    ServerEventTypes.CONVERSATION_ITEM_DELETED: ConversationItemDeleted,
    ServerEventTypes.RESPONSE_CREATED: ResponseCreated,
    ServerEventTypes.RESPONSE_DONE: ResponseDone,
    ServerEventTypes.RESPONSE_OUTPUT_ITEM_ADDED: ResponseOutputItemAdded,
    ServerEventTypes.RESPONSE_OUTPUT_ITEM_DONE: ResponseOutputItemDone,
    ServerEventTypes.RESPONSE_CONTENT_PART_ADDED: ResponseContentPartAdded,
    ServerEventTypes.RESPONSE_CONTENT_PART_DONE: ResponseContentPartDone,
    ServerEventTypes.RESPONSE_TEXT_DELTA: ResponseTextDelta,
    ServerEventTypes.RESPONSE_TEXT_DONE: ResponseTextDone,
    ServerEventTypes.RESPONSE_AUDIO_DONE: ResponseAudioDone,
    ServerEventTypes.RATE_LIMITS_UPDATED: RateLimitsUpdated,
}

def parse_server_event(event_data: dict) -> ServerEvent:
    event_type = event_data.get("type")
    if not event_type:
        raise ValueError("Event data is missing 'type' field")

    model_class = EVENT_TYPE_TO_MODEL.get(event_type)
    if not model_class:
        raise ValueError(f"Unknown event type: {event_type}")

    try:
        return model_class(**event_data)
    except ValidationError as e:
        raise ValueError(f"Failed to parse event of type {event_type}: {str(e)}") from e

Enregistreur de flux audio (sur disque et en mémoire)

class StreamingWavWriter:
    """Écrit des fragments audio entiers ou tableaux d'octets dans un fichier WAV."""

    wav_file = None
    buffer = None
    in_memory = False

    def __init__(
        self,
        filename=None,
        channels=INPUT_DEVICE_CHANNELS,
        sample_width=SAMPLE_WIDTH,
        framerate=SAMPLE_RATE,
    ):
        self.in_memory = filename is None
        if self.in_memory:
            self.buffer = io.BytesIO()
            self.wav_file = wave.open(self.buffer, "wb")
        else:
            self.wav_file = wave.open(filename, "wb")

        self.wav_file.setnchannels(channels)
        self.wav_file.setsampwidth(sample_width)
        self.wav_file.setframerate(framerate)

    def append_int16_chunk(self, int16_data):
        if int16_data is not None:
            self.wav_file.writeframes(
                int16_data.tobytes()
                if isinstance(int16_data, np.ndarray)
                else int16_data
            )

    def close(self):
        self.wav_file.close()

    def get_wav_buffer(self):
        assert self.in_memory, "Le tampon n'est disponible que si le flux est en mémoire."
        return self.buffer

Modèle Audio en temps réel

Le modèle audio en temps réel (RT) utilise un WebSocket pour envoyer des événements à l’API audio Realtime d’OpenAI. Le fonctionnement est le suivant :
  1. init: Nous initialisons les tampons locaux (audio d’entrée) et les flux (flux de lecture de l’assistant, flux d’écriture sur disque de l’audio utilisateur), puis ouvrons une connexion à l’API Realtime.
  2. receive_messages_thread: Un thread gère la réception des messages depuis l’API. Quatre types d’événements principaux sont traités : - RESPONSE_AUDIO_TRANSCRIPT_DONE: Le serveur indique que la réponse de l’assistant est terminée et fournit la transcription.
    • CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: Le serveur indique que l’audio de l’utilisateur a été transcrit et envoie sa transcription. Nous journalisons la transcription dans Weave et l’affichons à l’utilisateur.
    • RESPONSE_AUDIO_DELTA: Le serveur envoie un nouveau fragment de l’audio de réponse de l’assistant. Nous l’ajoutons aux données de la réponse en cours via l’ID de réponse, puis au flux de sortie pour la lecture.
    • RESPONSE_DONE: Le serveur indique qu’une réponse de l’assistant est terminée. Nous obtenons tous les fragments audio associés à la réponse, ainsi que la transcription, et les journalisons dans Weave.
    3.send_audio: Un gestionnaire ajoute des fragments d’audio utilisateur à un tampon et envoie des fragments audio lorsque le tampon audio atteint une certaine taille.
class RTAudioModel(weave.Model):
    """Classe de modèle pour l'interaction avec le modèle audio e2e temps réel d'OpenAI avec transcription utilisateur Whisper pour la journalisation."""

    realtime_model_name: str = "gpt-4o-realtime-preview-2024-10-01"  # interaction avec le modèle audio e2e temps réel uniquement

    stop_event: Optional[threading.Event] = threading.Event()  # Événement pour arrêter le modèle
    ws: Optional[websocket.WebSocket] = None  # Websocket pour les communications OpenAI

    user_wav_writer: Optional[StreamingWavWriter] = (
        None  # Flux pour l'écriture de la sortie utilisateur dans un fichier
    )
    input_audio_buffer: Optional[np.ndarray] = None  # Tampon pour les fragments audio utilisateur
    assistant_outputs: dict[str, StreamingWavWriter] = (
        None  # Sorties de l'assistant agrégées à envoyer à Weave
    )
    playback_stream: Optional[pyaudio.Stream] = (
        None  # Flux de lecture pour jouer les réponses de l'assistant
    )

    def __init__(self):
        super().__init__()
        self.stop_event.clear()
        self.user_wav_writer = StreamingWavWriter(
            filename="user_audio.wav", framerate=SAMPLE_RATE
        )
        self.input_audio_buffer = np.array([], dtype=np.int16)
        self.ws = websocket.WebSocket()
        self.assistant_outputs = {}

        # Ouvrir le flux de lecture audio de l'assistant si activé
        if enable_audio_playback:
            self.playback_stream = pyaudio.PyAudio().open(
                format=pyaudio.paInt16,
                channels=OUTPUT_DEVICE_CHANNELS,
                rate=OAI_SAMPLE_RATE,
                output=True,
                output_device_index=OUTPUT_DEVICE_INDEX,
            )

        # Connexion au Websocket
        try:
            self.ws.connect(
                f"wss://api.openai.com/v1/realtime?model={self.realtime_model_name}",
                header={
                    "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
                    "OpenAI-Beta": "realtime=v1",
                },
            )

            # Envoi du message de configuration
            config_event = SessionUpdate(
                session=Session(
                    modalities=["text", "audio"],  # modalités à utiliser
                    input_audio_transcription=InputAudioTranscription(
                        model="whisper-1"
                    ),  # whisper-1 pour la transcription
                    turn_detection=TurnDetection(
                        type="server_vad",
                        threshold=0.3,
                        prefix_padding_ms=300,
                        silence_duration_ms=600,
                    ),  # VAD serveur pour la détection du silence
                )
            )
            self.ws.send(config_event.model_dump_json(exclude_none=True))
            self.log_ws_message(config_event.model_dump_json(exclude_none=True), "Sent")

            # Démarrage de l'écouteur
            websocket_thread = threading.Thread(target=self.receive_messages_thread)
            websocket_thread.daemon = True
            websocket_thread.start()

        except Exception as e:
            print(f"Erreur de connexion au WebSocket : {e}")

    ##### Intégration Weave et gestionnaires de messages #####
    def handle_assistant_response_audio_delta(self, data: ResponseAudioDelta):
        if data.response_id not in self.assistant_outputs:
            self.assistant_outputs[data.response_id] = StreamingWavWriter(
                framerate=OAI_SAMPLE_RATE
            )

        data_bytes = base64.b64decode(data.delta)
        self.assistant_outputs[data.response_id].append_int16_chunk(data_bytes)

        if enable_audio_playback:
            self.playback_stream.write(data_bytes)

        return {"assistant_audio": data_bytes}

    @weave.op()
    def handle_assistant_response_done(self, data: ResponseDone):
        wave_file_stream = self.assistant_outputs[data.response.id]
        wave_file_stream.close()
        wave_file_stream.buffer.seek(0)
        weave_payload = {
            "assistant_audio": wave.open(wave_file_stream.get_wav_buffer(), "rb"),
            "assistant_transcript": data.response.output[0]
            .content[0]
            .get("transcript", "Transcription indisponible."),
        }
        return weave_payload

    @weave.op()
    def handle_user_transcription_done(
        self, data: ConversationItemInputAudioTranscriptionCompleted
    ):
        return {"user_transcript": data.transcript}

    ##### Réception et envoi de messages #####
    def receive_messages_thread(self):
        while not self.stop_event.is_set():
            try:
                data = json.loads(self.ws.recv())
                self.log_ws_message(json.dumps(data, indent=2))

                parsed_event = parse_server_event(data)

                if parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_TRANSCRIPT_DONE:
                    print("Assistant : ", parsed_event.transcript)
                elif (
                    parsed_event.type
                    == ServerEventTypes.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED
                ):
                    print("Utilisateur : ", parsed_event.transcript)
                    self.handle_user_transcription_done(parsed_event)
                elif parsed_event.type == ServerEventTypes.RESPONSE_AUDIO_DELTA:
                    self.handle_assistant_response_audio_delta(parsed_event)
                elif parsed_event.type == ServerEventTypes.RESPONSE_DONE:
                    self.handle_assistant_response_done(parsed_event)
                elif parsed_event.type == ServerEventTypes.ERROR:
                    print(
                        f"\nErreur du serveur : {parsed_event.error.model_dump_json(exclude_none=True)}"
                    )
            except websocket.WebSocketConnectionClosedException:
                print("\nConnexion WebSocket fermée")
                break
            except json.JSONDecodeError:
                continue
            except Exception as e:
                print(f"\nErreur dans receive_messages : {e}")
                break

    def send_audio(self, audio_chunk):
        if self.ws and self.ws.connected:
            self.input_audio_buffer = np.append(
                self.input_audio_buffer, np.frombuffer(audio_chunk, dtype=np.int16)
            )
            if len(self.input_audio_buffer) >= SAMPLE_RATE * CHUNK_DURATION:
                try:
                    # Rééchantillonnage de l'audio au taux d'échantillonnage OAI
                    resampled_audio = (
                        resampy.resample(
                            self.input_audio_buffer, SAMPLE_RATE, OAI_SAMPLE_RATE
                        )
                        if SAMPLE_RATE != OAI_SAMPLE_RATE
                        else self.input_audio_buffer
                    )

                    # Envoi du fragment audio à l'API OAI
                    audio_event = InputAudioBufferAppend(
                        audio=base64.b64encode(
                            resampled_audio.astype(np.int16).tobytes()
                        ).decode("utf-8")  # Conversion du tableau audio en octets b64
                    )
                    self.ws.send(audio_event.model_dump_json(exclude_none=True))
                    self.log_ws_message(
                        audio_event.model_dump_json(exclude_none=True), "Sent"
                    )
                finally:
                    self.user_wav_writer.append_int16_chunk(self.input_audio_buffer)

                    # Effacement du tampon audio
                    self.input_audio_buffer = np.array([], dtype=np.int16)
        else:
            print("Erreur lors de l'envoi de l'audio : websocket non initialisé.")

    ##### Fonctions utilitaires générales #####
    def log_ws_message(self, message, direction="Received"):
        with open("websocket_log.txt", "a") as log_file:
            log_file.write(
                f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {direction}: {message}\n"
            )

    def stop(self):
        self.stop_event.set()

        if self.ws:
            self.ws.close()

        self.user_wav_writer.close()

Enregistreur audio

Nous utilisons un flux d’entrée pyaudio avec un gestionnaire associé à la méthode send_audio du modèle RTAudio. Le flux est renvoyé au thread principal afin de pouvoir être arrêté en toute sécurité à la fin du programme.
# Flux de capture audio
def record_audio(realtime_model: RTAudioModel) -> pyaudio.Stream:
    """Configure un flux d'entrée Pyaudio et utilise le RTAudioModel comme rappel pour la diffusion des données."""

    def audio_callback(in_data, frame_count, time_info, status):
        realtime_model.send_audio(in_data)
        return (None, pyaudio.paContinue)

    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=INPUT_DEVICE_CHANNELS,
        rate=SAMPLE_RATE,
        input=True,
        input_device_index=INPUT_DEVICE_INDEX,
        frames_per_buffer=CHUNK,
        stream_callback=audio_callback,
    )
    stream.start_stream()

    print("Enregistrement démarré. Veuillez commencer à parler à votre assistant personnel...")
    return stream

Thread principal (Lancez-moi !)

Le thread principal lance un modèle Audio en temps réel intégrant Weave. Ensuite, on ouvre un enregistrement et on attend une interruption clavier de l’utilisateur.
weave.init(project_name="realtime-oai-audio-testing")

realtime_model = RTAudioModel()

if realtime_model.ws and realtime_model.ws.connected:
    recording_stream: pyaudio.Stream = record_audio(realtime_model)

    try:
        while not realtime_model.stop_event.is_set():
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error in main loop: {e}")
        import traceback

        traceback.print_exc()
    finally:
        print("Exiting...")
        realtime_model.stop()
        if recording_stream and recording_stream.is_active():
            recording_stream.stop_stream()
            recording_stream.close()
else:
    print(
        "WebSocket connection failed. Please check your API key and internet connection."
    )