메인 콘텐츠로 건너뛰기
Weave의 OpenAI Realtime API 인테그레이션을 사용하면 애플리케이션의 음성-대-음성 상호작용을 발생하는 즉시 자동으로 트레이스할 수 있습니다. 이를 통해 에이전트와 사용자 간의 대화를 캡처하여 에이전트 성능을 검토하고 평가할 수 있습니다.

실시간 트레이스 통합하기

Weave는 OpenAI Realtime API에 자동으로 패치되므로, 애플리케이션의 오디오 상호작용을 캡처하기 시작하려면 몇 줄의 코드만 추가하면 됩니다. 다음 코드는 Weave와 Realtime API 인테그레이션을 임포트합니다:
import weave
from weave.integrations import patch_openai_realtime

weave.init("your-team-name/your-project-name")
patch_openai_realtime()

# 애플리케이션 로직
코드에 임포트해 실행하면 Weave가 사용자와 OpenAI Realtime API 간의 상호작용을 자동으로 트레이스합니다.

OpenAI Agents SDK로 실시간 음성 어시스턴트 실행하기

이 예제는 마이크 오디오를 OpenAI의 Realtime API로 스트리밍하고, AI의 음성 응답을 로컬 머신의 스피커를 통해 재생하는 실시간 음성 어시스턴트를 실행합니다. 이 애플리케이션은 RealtimeAgentRealtimeRunner를 사용하는 OpenAI Agents SDK를 사용하며, patch_openai_realtime()로 패치해 Weave tracing을 활성화합니다. 예제를 실행하려면:
  1. Python 환경을 실행하고 다음 라이브러리를 설치하세요:
    uv add weave openai-agents websockets pyaudio numpy
    
  2. weave_voice_assistant.py라는 이름의 파일을 만들고 여기에 다음 코드를 추가합니다. 강조 표시된 줄은 애플리케이션에 Weave를 인테그레이션한 부분을 나타냅니다. 나머지 코드는 기본적인 음성 비서 앱을 만듭니다.
    import argparse
    import asyncio
    import queue
    import sys
    import termios
    import threading
    import tty
    import weave
    import pyaudio
    import numpy as np
    from weave.integrations import patch_openai_realtime
    from agents.realtime import RealtimeAgent, RealtimeRunner
    
    DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"
    
    FORMAT = pyaudio.paInt16
    RATE = 24000  # Required by the OpenAI Realtime API.
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 1
    MAX_OUTPUT_CHANNELS = 1
    
    INP_DEV_IDX = None
    OUT_DEV_IDX = None
    
    # Weave 프로젝트 이름 및 오디오 장치 선택을 위한 CLI 인수를 파싱합니다.
    def parse_args():
        parser = argparse.ArgumentParser(description="Weave 로깅을 사용하는 Realtime 에이전트")
        parser.add_argument(
            "--weave-project",
            default=DEFAULT_WEAVE_PROJECT,
            help=f"Weave 프로젝트 이름 (기본값: {DEFAULT_WEAVE_PROJECT})",
            dest="weave_project"
        )
        parser.add_argument(
            "--input-device",
            type=int,
            default=None,
            help="PyAudio 입력(마이크) 장치 인덱스. 기본값은 시스템 기본값입니다. 장치 목록을 확인하려면 mic_detect.py를 실행하세요.",
            dest="input_device"
        )
        parser.add_argument(
            "--output-device",
            type=int,
            default=None,
            help="PyAudio 출력(스피커) 장치 인덱스. 기본값은 시스템 기본값입니다. 장치 목록을 확인하려면 mic_detect.py를 실행하세요.",
            dest="output_device"
        )
        return parser.parse_args()
    
    
    # Weave를 초기화하고 트레이싱을 위해 OpenAI Realtime API를 패치합니다.
    def init_weave(project_name: str | None = None) -> None:
        name = project_name or DEFAULT_WEAVE_PROJECT
        weave.init(name)
        patch_openai_realtime()  # Realtime API 세션의 자동 트레이싱을 활성화합니다.
    
    
    mic_enabled = True
    
    # 마이크 켜기/끄기 전환을 위해 't' 키 입력을 감지합니다. 데몬 스레드에서 실행됩니다.
    def start_keylistener():
        global mic_enabled
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setcbreak(fd)
            while True:
                ch = sys.stdin.read(1)
                if ch.lower() == 't':
                    mic_enabled = not mic_enabled
                    state = "ON" if mic_enabled else "OFF"
                    print(f"\n🎙  마이크 {state} (t를 눌러 전환)")
                elif ch == '\x03':  # Ctrl-C
                    break
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
    
    # 백그라운드 스레드에서 오디오 큐를 비우고 스피커에 출력합니다.
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    # 오디오 스트림을 열고, Realtime 세션을 시작하며, 송수신 루프를 실행합니다.
    async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
        p = pyaudio.PyAudio()
    
        if input_device_index is None:
            input_device_index = int(p.get_default_input_device_info()['index'])
        if output_device_index is None:
            output_device_index = int(p.get_default_output_device_info()['index'])
    
        # pyaudio 오류를 방지하기 위해 채널 수를 장치 지원 범위 내로 제한합니다.
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
    
        input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
        output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # 인터럽트 발생 시 재생 중인 오디오를 플러시할 수 있도록 큐를 통해 오디오를 버퍼링합니다.
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        s_agent = RealtimeAgent(
            name="Speech Assistant",
            instructions="당신은 도구를 활용하는 AI입니다. 가능한 한 도구를 사용하여 작업을 수행하세요."
        )
    
        s_runner = RealtimeRunner(s_agent, config={
            "model_settings": {
                "model_name": "gpt-realtime",
                "modalities": ["audio"],
                "output_modalities": ["audio"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "speed": 1.2,
                "turn_detection": {
                    "prefix_padding_ms": 100,
                    "silence_duration_ms": 100,
                    "type": "server_vad",
                    "interrupt_response": True,
                    "create_response": True,
                },
            }
        })
        print("--- 세션 활성화됨 (마이크에 대고 말하세요) ---")
        print("🎙  마이크 ON (t를 눌러 전환)")
    
        threading.Thread(target=start_keylistener, daemon=True).start()
    
        async with await s_runner.run() as session:
            # 마이크 입력을 Realtime API로 스트리밍하며, 음소거 시 무음을 전송합니다.
            async def send_mic_audio():
                silence = b'\x00' * CHUNK * 2  # 샘플당 2바이트 (16비트 PCM).
                try:
                    while True:
                        raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                        if mic_enabled:
                            audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                            rms = np.sqrt(np.mean(audio_data**2))
                            meter = int(min(rms / 50, 50))
                            print(f"마이크 레벨: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                            await session.send_audio(raw_data)
                        else:
                            print(f"마이크 레벨: {' ' * 50} | 🎙 OFF", end="\r")
                            await session.send_audio(silence)
    
                        await asyncio.sleep(0)  # 읽기 사이에 이벤트 루프에 제어권을 양보합니다.
                except Exception:
                    pass
    
            # 세션에서 이벤트를 수신하고 오디오를 스피커로 전달합니다.
            async def handle_events():
                async for event in session:
                    if event.type == "audio":
                        audio_output_queue.put(event.audio.data)
                    elif event.type == "audio_interrupted":
                        # 사용자 발화를 덮어쓰지 않도록 큐에 쌓인 AI 오디오를 플러시합니다.
                        while not audio_output_queue.empty():
                            try:
                                audio_output_queue.get_nowait()
                            except queue.Empty:
                                break
    
            mic_task = asyncio.create_task(send_mic_audio())
            try:
                await handle_events()
            finally:
                mic_task.cancel()
    
        # 정리
        audio_output_queue.put(None)  # 재생 스레드에 종료 신호를 보냅니다.
        mic.close()
        speaker.close()
        p.terminate()
    
    if __name__ == "__main__":
        args = parse_args()
        init_weave(args.weave_project)
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    
  3. DEFAULT_WEAVE_PROJECT 값을 팀 이름과 프로젝트 이름으로 설정하세요.
  4. OPENAI_API_KEY 환경 변수를 설정하세요.
  5. 코드를 실행하세요:
    python weave_voice_assistant.py
    
실행되면 키보드에서 T를 눌러 마이크를 음소거하거나 음소거를 해제하세요. 어시스턴트는 서버 측 음성 활동 감지를 사용해 차례 전환과 끼어들기를 처리합니다. 어시스턴트와 대화하는 동안 Weave는 세션 오디오를 포함한 트레이스를 수집하며, 이를 Weave UI에서 탐색할 수 있습니다.

WebSockets를 사용해 실시간 음성 어시스턴트 실행

다음 예제는 WebSockets를 통해 OpenAI Realtime API에 직접 연결합니다. 마이크 오디오를 API로 스트리밍하고, 음성 응답을 재생하며, 도구 호출 기능(날씨 조회, 수식 계산, 코드 실행, 파일 쓰기)을 지원합니다. Weave는 weave.init()patch_openai_realtime()을 사용해 세션을 트레이스합니다. 예제를 실행하려면:
  1. Python 환경을 실행한 다음 다음 라이브러리를 설치합니다:
    uv add weave websockets pyaudio numpy
    
  2. tool_definitions.py 파일을 만들고 여기에 다음 도구 정의를 추가하세요. 메인 애플리케이션은 이 모듈을 임포트합니다.
    import json
    import subprocess
    import tempfile
    from pathlib import Path
    import weave
    
    
    # @function_tool
    @weave.op
    def get_weather(city: str) -> str:
        """도시의 현재 날씨를 조회합니다.
    
        매개변수:
            city: 날씨를 조회할 도시 이름입니다.
        """
        return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})
    
    
    @weave.op
    def calculate(expression: str) -> str:
        """수학 식을 평가하고 결과를 반환합니다.
    
        매개변수:
            expression: 평가할 수학 식입니다(예: '2 + 2').
        """
        try:
            result = eval(expression)
            return str(result)
        except Exception as e:
            return f"오류: {e}"
    
    
    @weave.op
    def run_python_code(code: str) -> str:
        """Python 스크립트를 작성하고 실행한 뒤 stdout/stderr를 반환합니다.
    
        매개변수:
            code: 실행할 Python 소스 코드입니다.
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
        ) as f:
            f.write(code)
            script_path = Path(f.name)
    
        try:
            result = subprocess.run(
                ["python", str(script_path)],
                capture_output=True,
                text=True,
                timeout=30,
            )
            output = result.stdout
            if result.stderr:
                output += f"\nSTDERR:\n{result.stderr}"
            if result.returncode != 0:
                output += f"\n(종료 코드 {result.returncode})"
            return output or "(출력 없음)"
        except subprocess.TimeoutExpired:
            return "오류: 스크립트 실행 시간이 30초를 초과했습니다."
        finally:
            script_path.unlink(missing_ok=True)
    
    
    @weave.op
    async def write_file(file_path: str, content: str) -> str:
        """디스크의 파일에 내용을 씁니다.
    
        매개변수:
            file_path: 파일을 쓸 경로입니다.
            content: 파일에 쓸 내용입니다.
        """
        try:
            path = Path(file_path)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content)
            return f"{file_path}{len(content)}바이트를 썼습니다"
        except Exception as e:
            return f"파일 쓰기 오류: {e}"
    
  3. 같은 디렉터리에 weave_ws_voice_assistant.py라는 파일을 만들고, 그 파일에 다음 코드를 추가합니다.
    import asyncio
    import base64
    import json
    import os
    import queue
    import threading
    from typing import Any, Callable
    
    import numpy as np
    import pyaudio
    import websockets
    
    import weave
    weave.init("<your-team-name/your-project-name>")
    from weave.integrations import patch_openai_realtime
    patch_openai_realtime()
    
    from tool_definitions import (
        calculate,
        get_weather,
        run_python_code,
        write_file,
    )
    
    # 오디오 형식 (Realtime API에서는 PCM16이어야 합니다).
    FORMAT = pyaudio.paInt16
    RATE = 24000
    CHUNK = 1024
    MAX_INPUT_CHANNELS = 2
    MAX_OUTPUT_CHANNELS = 2
    
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
    REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
    
    DEBUG_WRITE_LOG = False
    
    # 함수 호출 디스패치를 위한 도구 이름 -> callable 매핑.
    TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
        "get_weather": get_weather,
        "calculate": calculate,
        "run_python_code": run_python_code,
        "write_file": write_file,
    }
    
    # Realtime API 세션 설정을 위한 원시 도구 정의.
    TOOL_DEFINITIONS = [
        {
            "type": "function",
            "name": "get_weather",
            "description": "도시의 현재 날씨를 조회합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "날씨를 조회할 도시 이름.",
                    }
                },
                "required": ["city"],
            },
        },
        {
            "type": "function",
            "name": "calculate",
            "description": "수식을 계산하고 결과를 반환합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "계산할 수식 (예: '2 + 2').",
                    }
                },
                "required": ["expression"],
            },
        },
        {
            "type": "function",
            "name": "run_python_code",
            "description": "Python 스크립트를 작성하고 실행하여 stdout/stderr를 반환합니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "실행할 Python 소스 코드.",
                    }
                },
                "required": ["code"],
            },
        },
        {
            "type": "function",
            "name": "write_file",
            "description": "디스크의 파일에 내용을 씁니다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "파일을 쓸 경로.",
                    },
                    "content": {
                        "type": "string",
                        "description": "파일에 쓸 내용.",
                    },
                },
                "required": ["file_path", "content"],
            },
        },
    ]
    
    
    async def send_event(ws, event: dict) -> None:
        await ws.send(json.dumps(event))
    
    
    async def configure_session(ws) -> None:
        event = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "model": "gpt-realtime",
                "output_modalities": ["audio"],
                "instructions": (
                    "당신은 도구를 사용할 수 있는 유용한 AI 어시스턴트입니다. "
                    "가능한 한 도구를 활용하여 작업을 수행하세요. "
                    "명확하고 간결하게 말하세요."
                ),
                "tools": TOOL_DEFINITIONS,
                "tool_choice": "auto",
                "audio": {
                    "input": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                        "transcription": {"model": "gpt-4o-transcribe"},
                        "turn_detection": {
                            "type": "server_vad",
                            "threshold": 0.5,
                            "prefix_padding_ms": 300,
                            "silence_duration_ms": 500,
                        },
                    },
                    "output": {
                        "format": {"type": "audio/pcm", "rate": 24000},
                    },
                },
            },
        }
        await send_event(ws, event)
        print("세션이 설정되었습니다.")
    
    
    async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
        if not name:
            raise Exception("함수 이름을 받지 못했습니다")
    
        print(f"\n[함수 호출] {name}({arguments})")
        tool_fn = TOOL_REGISTRY.get(name)
        if tool_fn is None:
            result = json.dumps({"error": f"알 수 없는 함수: {name}"})
        else:
            try:
                args = json.loads(arguments)
                result = tool_fn(**args)
                if asyncio.iscoroutine(result):
                    result = await result
            except Exception as e:
                result = json.dumps({"error": str(e)})
    
        print(f"[함수 결과] {result}")
    
        # 함수 호출 결과를 모델에 다시 전송합니다.
        await send_event(ws, {
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": result if isinstance(result, str) else json.dumps(result),
            },
        })
    
        # 모델이 함수 결과를 반영할 수 있도록 새 응답을 트리거합니다.
        await send_event(ws, {"type": "response.create"})
    
    
    def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
        """pyaudio의 write()는 사운드 카드가 샘플을 소비할 때까지 블로킹되므로
        별도의 스레드에서 실행됩니다. 재생을 비동기 이벤트 루프에서 분리하면
        진행 중인 쓰기 완료를 기다리지 않고 인터럽트 시 큐를 즉시 비울 수 있습니다."""
        while True:
            data = audio_output_queue.get()
            if data is None:
                break
            output_stream.write(data)
    
    
    async def send_mic_audio(ws, mic) -> None:
        try:
            while True:
                raw_data = mic.read(CHUNK, exception_on_overflow=False)
    
                # 시각적 볼륨 미터.
                audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                rms = np.sqrt(np.mean(audio_data**2))
                meter = int(min(rms / 50, 50))
                print(f"마이크 레벨: {'█' * meter}{' ' * (50 - meter)} |", end="\r")
    
                # Base64로 인코딩하여 오디오 청크를 전송합니다.
                b64_audio = base64.b64encode(raw_data).decode("utf-8")
                await send_event(ws, {
                    "type": "input_audio_buffer.append",
                    "audio": b64_audio,
                })
    
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            pass
    
    
    async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
        # 델타 이벤트에 걸쳐 함수 호출 인수를 누적합니다.
        pending_calls: dict[str, dict] = {}
        async for raw_message in ws:
            if DEBUG_WRITE_LOG:
                with open("data.jsonl", "a", encoding="utf-8") as f:
                    f.write(json.dumps(raw_message) + "\n")
    
            event = json.loads(raw_message)
            event_type = event.get("type", "")
    
            if event_type == "session.created":
                print(raw_message)
    
            elif event_type == "session.updated":
                print(raw_message)
    
            elif event_type == "error":
                print(f"\n[오류] {event}")
    
            elif event_type == "input_audio_buffer.speech_started":
                # 사용자 발화와 겹치지 않도록 큐에 쌓인 AI 오디오를 비웁니다.
                while not audio_output_queue.empty():
                    try:
                        audio_output_queue.get_nowait()
                    except queue.Empty:
                        break
    
            elif event_type == "input_audio_buffer.speech_stopped":
                pass
    
            elif event_type == "input_audio_buffer.committed":
                pass
    
            elif event_type == "response.created":
                pass
    
            elif event_type == "response.output_text.delta":
                pass
    
            elif event_type == "response.output_text.done":
                pass
    
            # 오디오 출력 델타 - 재생을 위해 큐에 추가합니다.
            elif event_type == "response.output_audio.delta":
                audio_bytes = base64.b64decode(event.get("delta", ""))
                audio_output_queue.put(audio_bytes)
    
            elif event_type == "response.output_audio_transcript.delta":
                pass
    
            elif event_type == "response.output_audio_transcript.done":
                pass
    
            # 함수 호출 시작 - 대기 중인 호출을 초기화합니다.
            elif event_type == "response.output_item.added":
                item = event.get("item", {})
                if item.get("type") == "function_call" and item.get("status") == "in_progress":
                    item_id = item.get("id", "")
                    pending_calls[item_id] = {
                        "call_id": item.get("call_id", ""),
                        "name": item.get("name", ""),
                        "arguments": "",
                    }
                    print(f"\n[함수 호출 시작] {item.get('name', '')}")
    
            # 함수 호출 인수 델타 - 누적합니다.
            elif event_type == "response.function_call_arguments.delta":
                item_id = event.get("item_id", "")
                if item_id in pending_calls:
                    pending_calls[item_id]["arguments"] += event.get("delta", "")
    
            elif event_type == "response.function_call_arguments.done":
                item_id = event.get("item_id", "")
                call_info = pending_calls.pop(item_id, None)
                if call_info is None:
                    # 대체 처리: done 이벤트에서 직접 데이터를 사용합니다.
                    call_info = {
                        "call_id": event.get("call_id"),
                        "name": event.get("name"),
                        "arguments": event.get("arguments"),
                    }
                try:
                    await handle_function_call(
                        ws,
                        call_info["call_id"],
                        call_info["name"],
                        call_info["arguments"],
                    )
                except Exception as e:
                    print(f"메시지 {call_info}에 대한 함수 호출 실패: 오류 - {e}")
    
            elif event_type == "response.done":
                pass
    
            elif event_type == "rate_limits.updated":
                pass
    
            else:
                print(f"\n[Event: {event_type}]")
    
    
    async def main():
        if not OPENAI_API_KEY:
            print("오류: OPENAI_API_KEY 환경 변수가 설정되지 않았습니다")
            return
    
        p = pyaudio.PyAudio()
    
        input_device_index = int(p.get_default_input_device_info()['index'])
        output_device_index = int(p.get_default_output_device_info()['index'])
    
        # 채널 수는 장치의 지원 범위와 일치해야 합니다. 그렇지 않으면 pyaudio가 열기 시 오류를 발생시킵니다.
        input_info = p.get_device_info_by_index(input_device_index)
        output_info = p.get_device_info_by_index(output_device_index)
        input_channels = min(int(input_info['maxInputChannels']), 1)
        output_channels = min(int(output_info['maxOutputChannels']), 1)
    
        mic = p.open(
            format=FORMAT,
            channels=input_channels,
            rate=RATE,
            input=True,
            output=False,
            frames_per_buffer=CHUNK,
            input_device_index=input_device_index,
            start=False,
        )
        speaker = p.open(
            format=FORMAT,
            channels=output_channels,
            rate=RATE,
            input=False,
            output=True,
            frames_per_buffer=CHUNK,
            output_device_index=output_device_index,
            start=False,
        )
        mic.start_stream()
        speaker.start_stream()
    
        # 오디오는 큐를 통해 전달되므로 사용자가 중단할 때 비울 수 있습니다.
        # 스피커에 직접 쓰면 전송 중인 오디오를 취소할 수 없습니다.
        audio_output_queue = queue.Queue()
        threading.Thread(
            target=play_audio, args=(speaker, audio_output_queue), daemon=True
        ).start()
    
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        }
    
        print("OpenAI Realtime API에 연결 중...")
    
        async with websockets.connect(
            REALTIME_URL,
            additional_headers=headers,
        ) as ws:
            print("연결되었습니다! 세션을 구성하는 중...")
            await configure_session(ws)
    
            print("--- 세션 활성화됨 (마이크에 말하세요) ---")
    
            mic_task = asyncio.create_task(send_mic_audio(ws, mic))
            try:
                await receive_events(ws, audio_output_queue)
            finally:
                mic_task.cancel()
                try:
                    await mic_task
                except asyncio.CancelledError:
                    pass
    
        # 정리
        audio_output_queue.put(None)  # 재생 스레드에 종료 신호를 보냅니다.
        mic.close()
        speaker.close()
        p.terminate()
        print("\n세션이 종료되었습니다.")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
  4. weave.init() 호출에서 팀 이름과 프로젝트 이름을 업데이트하세요.
  5. OPENAI_API_KEY 환경 변수를 설정하세요.
  6. 다음 코드를 실행하세요:
    python weave_ws_voice_assistant.py
    
실행되면 키보드에서 T 키를 눌러 마이크를 음소거하거나 음소거 해제할 수 있습니다. 어시스턴트는 서버 측 음성 활동 감지를 사용해 차례 전환과 중간 끼어들기를 처리합니다. 어시스턴트에게 말을 하는 동안 Weave는 세션의 오디오를 포함한 트레이스를 캡처하며, 이를 Weave UI에서 탐색할 수 있습니다.