실시간 트레이스 통합하기
import weave
from weave.integrations import patch_openai_realtime
weave.init("your-team-name/your-project-name")
patch_openai_realtime()
# 애플리케이션 로직
OpenAI Agents SDK로 실시간 음성 어시스턴트 실행하기
RealtimeAgent와 RealtimeRunner를 사용하는 OpenAI Agents SDK를 사용하며, patch_openai_realtime()로 패치해 Weave tracing을 활성화합니다.
예제를 실행하려면:
-
Python 환경을 실행하고 다음 라이브러리를 설치하세요:
- uv
- pip
uv add weave openai-agents websockets pyaudio numpypip install weave openai-agents websockets pyaudio numpy -
weave_voice_assistant.py라는 이름의 파일을 만들고 여기에 다음 코드를 추가합니다. 강조 표시된 줄은 애플리케이션에 Weave를 인테그레이션한 부분을 나타냅니다. 나머지 코드는 기본적인 음성 비서 앱을 만듭니다.weave_voice_assistant.py
import argparse import asyncio import queue import sys import termios import threading import tty import weave import pyaudio import numpy as np from weave.integrations import patch_openai_realtime from agents.realtime import RealtimeAgent, RealtimeRunner DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>" FORMAT = pyaudio.paInt16 RATE = 24000 # Required by the OpenAI Realtime API. CHUNK = 1024 MAX_INPUT_CHANNELS = 1 MAX_OUTPUT_CHANNELS = 1 INP_DEV_IDX = None OUT_DEV_IDX = None # Weave 프로젝트 이름 및 오디오 장치 선택을 위한 CLI 인수를 파싱합니다. def parse_args(): parser = argparse.ArgumentParser(description="Weave 로깅을 사용하는 Realtime 에이전트") parser.add_argument( "--weave-project", default=DEFAULT_WEAVE_PROJECT, help=f"Weave 프로젝트 이름 (기본값: {DEFAULT_WEAVE_PROJECT})", dest="weave_project" ) parser.add_argument( "--input-device", type=int, default=None, help="PyAudio 입력(마이크) 장치 인덱스. 기본값은 시스템 기본값입니다. 장치 목록을 확인하려면 mic_detect.py를 실행하세요.", dest="input_device" ) parser.add_argument( "--output-device", type=int, default=None, help="PyAudio 출력(스피커) 장치 인덱스. 기본값은 시스템 기본값입니다. 장치 목록을 확인하려면 mic_detect.py를 실행하세요.", dest="output_device" ) return parser.parse_args() # Weave를 초기화하고 트레이싱을 위해 OpenAI Realtime API를 패치합니다. def init_weave(project_name: str | None = None) -> None: name = project_name or DEFAULT_WEAVE_PROJECT weave.init(name) patch_openai_realtime() # Realtime API 세션의 자동 트레이싱을 활성화합니다. mic_enabled = True # 마이크 켜기/끄기 전환을 위해 't' 키 입력을 감지합니다. 데몬 스레드에서 실행됩니다. def start_keylistener(): global mic_enabled fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setcbreak(fd) while True: ch = sys.stdin.read(1) if ch.lower() == 't': mic_enabled = not mic_enabled state = "ON" if mic_enabled else "OFF" print(f"\n🎙 마이크 {state} (t를 눌러 전환)") elif ch == '\x03': # Ctrl-C break finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) # 백그라운드 스레드에서 오디오 큐를 비우고 스피커에 출력합니다. def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue): while True: data = audio_output_queue.get() if data is None: break output_stream.write(data) # 오디오 스트림을 열고, Realtime 세션을 시작하며, 송수신 루프를 실행합니다. async def main(*, input_device_index: int | None = None, output_device_index: int | None = None): p = pyaudio.PyAudio() if input_device_index is None: input_device_index = int(p.get_default_input_device_info()['index']) if output_device_index is None: output_device_index = int(p.get_default_output_device_info()['index']) # pyaudio 오류를 방지하기 위해 채널 수를 장치 지원 범위 내로 제한합니다. input_info = p.get_device_info_by_index(input_device_index) output_info = p.get_device_info_by_index(output_device_index) input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS) output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS) mic = p.open( format=FORMAT, channels=input_channels, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK, input_device_index=input_device_index, start=False, ) speaker = p.open( format=FORMAT, channels=output_channels, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK, output_device_index=output_device_index, start=False, ) mic.start_stream() speaker.start_stream() # 인터럽트 발생 시 재생 중인 오디오를 플러시할 수 있도록 큐를 통해 오디오를 버퍼링합니다. audio_output_queue = queue.Queue() threading.Thread( target=play_audio, args=(speaker, audio_output_queue), daemon=True ).start() s_agent = RealtimeAgent( name="Speech Assistant", instructions="당신은 도구를 활용하는 AI입니다. 가능한 한 도구를 사용하여 작업을 수행하세요." ) s_runner = RealtimeRunner(s_agent, config={ "model_settings": { "model_name": "gpt-realtime", "modalities": ["audio"], "output_modalities": ["audio"], "input_audio_format": "pcm16", "output_audio_format": "pcm16", "speed": 1.2, "turn_detection": { "prefix_padding_ms": 100, "silence_duration_ms": 100, "type": "server_vad", "interrupt_response": True, "create_response": True, }, } }) print("--- 세션 활성화됨 (마이크에 대고 말하세요) ---") print("🎙 마이크 ON (t를 눌러 전환)") threading.Thread(target=start_keylistener, daemon=True).start() async with await s_runner.run() as session: # 마이크 입력을 Realtime API로 스트리밍하며, 음소거 시 무음을 전송합니다. async def send_mic_audio(): silence = b'\x00' * CHUNK * 2 # 샘플당 2바이트 (16비트 PCM). try: while True: raw_data = mic.read(CHUNK, exception_on_overflow=False) if mic_enabled: audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) rms = np.sqrt(np.mean(audio_data**2)) meter = int(min(rms / 50, 50)) print(f"마이크 레벨: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r") await session.send_audio(raw_data) else: print(f"마이크 레벨: {' ' * 50} | 🎙 OFF", end="\r") await session.send_audio(silence) await asyncio.sleep(0) # 읽기 사이에 이벤트 루프에 제어권을 양보합니다. except Exception: pass # 세션에서 이벤트를 수신하고 오디오를 스피커로 전달합니다. async def handle_events(): async for event in session: if event.type == "audio": audio_output_queue.put(event.audio.data) elif event.type == "audio_interrupted": # 사용자 발화를 덮어쓰지 않도록 큐에 쌓인 AI 오디오를 플러시합니다. while not audio_output_queue.empty(): try: audio_output_queue.get_nowait() except queue.Empty: break mic_task = asyncio.create_task(send_mic_audio()) try: await handle_events() finally: mic_task.cancel() # 정리 audio_output_queue.put(None) # 재생 스레드에 종료 신호를 보냅니다. mic.close() speaker.close() p.terminate() if __name__ == "__main__": args = parse_args() init_weave(args.weave_project) fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device)) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) -
DEFAULT_WEAVE_PROJECT값을 팀 이름과 프로젝트 이름으로 설정하세요. -
OPENAI_API_KEY환경 변수를 설정하세요. -
코드를 실행하세요:
python weave_voice_assistant.py
WebSockets를 사용해 실시간 음성 어시스턴트 실행
weave.init()와 patch_openai_realtime()을 사용해 세션을 트레이스합니다.
예제를 실행하려면:
-
Python 환경을 실행한 다음 다음 라이브러리를 설치합니다:
- uv
- pip
uv add weave websockets pyaudio numpypip install weave websockets pyaudio numpy -
tool_definitions.py파일을 만들고 여기에 다음 도구 정의를 추가하세요. 메인 애플리케이션은 이 모듈을 임포트합니다.tool_definitions.py
import json import subprocess import tempfile from pathlib import Path import weave # @function_tool @weave.op def get_weather(city: str) -> str: """도시의 현재 날씨를 조회합니다. 매개변수: city: 날씨를 조회할 도시 이름입니다. """ return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"}) @weave.op def calculate(expression: str) -> str: """수학 식을 평가하고 결과를 반환합니다. 매개변수: expression: 평가할 수학 식입니다(예: '2 + 2'). """ try: result = eval(expression) return str(result) except Exception as e: return f"오류: {e}" @weave.op def run_python_code(code: str) -> str: """Python 스크립트를 작성하고 실행한 뒤 stdout/stderr를 반환합니다. 매개변수: code: 실행할 Python 소스 코드입니다. """ with tempfile.NamedTemporaryFile( mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False ) as f: f.write(code) script_path = Path(f.name) try: result = subprocess.run( ["python", str(script_path)], capture_output=True, text=True, timeout=30, ) output = result.stdout if result.stderr: output += f"\nSTDERR:\n{result.stderr}" if result.returncode != 0: output += f"\n(종료 코드 {result.returncode})" return output or "(출력 없음)" except subprocess.TimeoutExpired: return "오류: 스크립트 실행 시간이 30초를 초과했습니다." finally: script_path.unlink(missing_ok=True) @weave.op async def write_file(file_path: str, content: str) -> str: """디스크의 파일에 내용을 씁니다. 매개변수: file_path: 파일을 쓸 경로입니다. content: 파일에 쓸 내용입니다. """ try: path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) return f"{file_path}에 {len(content)}바이트를 썼습니다" except Exception as e: return f"파일 쓰기 오류: {e}" -
같은 디렉터리에
weave_ws_voice_assistant.py라는 파일을 만들고, 그 파일에 다음 코드를 추가합니다.weave_ws_voice_assistant.py
import asyncio import base64 import json import os import queue import threading from typing import Any, Callable import numpy as np import pyaudio import websockets import weave weave.init("<your-team-name/your-project-name>") from weave.integrations import patch_openai_realtime patch_openai_realtime() from tool_definitions import ( calculate, get_weather, run_python_code, write_file, ) # 오디오 형식 (Realtime API에서는 PCM16이어야 합니다). FORMAT = pyaudio.paInt16 RATE = 24000 CHUNK = 1024 MAX_INPUT_CHANNELS = 2 MAX_OUTPUT_CHANNELS = 2 OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime" DEBUG_WRITE_LOG = False # 함수 호출 디스패치를 위한 도구 이름 -> callable 매핑. TOOL_REGISTRY: dict[str, Callable[..., Any]] = { "get_weather": get_weather, "calculate": calculate, "run_python_code": run_python_code, "write_file": write_file, } # Realtime API 세션 설정을 위한 원시 도구 정의. TOOL_DEFINITIONS = [ { "type": "function", "name": "get_weather", "description": "도시의 현재 날씨를 조회합니다.", "parameters": { "type": "object", "properties": { "city": { "type": "string", "description": "날씨를 조회할 도시 이름.", } }, "required": ["city"], }, }, { "type": "function", "name": "calculate", "description": "수식을 계산하고 결과를 반환합니다.", "parameters": { "type": "object", "properties": { "expression": { "type": "string", "description": "계산할 수식 (예: '2 + 2').", } }, "required": ["expression"], }, }, { "type": "function", "name": "run_python_code", "description": "Python 스크립트를 작성하고 실행하여 stdout/stderr를 반환합니다.", "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "실행할 Python 소스 코드.", } }, "required": ["code"], }, }, { "type": "function", "name": "write_file", "description": "디스크의 파일에 내용을 씁니다.", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "파일을 쓸 경로.", }, "content": { "type": "string", "description": "파일에 쓸 내용.", }, }, "required": ["file_path", "content"], }, }, ] async def send_event(ws, event: dict) -> None: await ws.send(json.dumps(event)) async def configure_session(ws) -> None: event = { "type": "session.update", "session": { "type": "realtime", "model": "gpt-realtime", "output_modalities": ["audio"], "instructions": ( "당신은 도구를 사용할 수 있는 유용한 AI 어시스턴트입니다. " "가능한 한 도구를 활용하여 작업을 수행하세요. " "명확하고 간결하게 말하세요." ), "tools": TOOL_DEFINITIONS, "tool_choice": "auto", "audio": { "input": { "format": {"type": "audio/pcm", "rate": 24000}, "transcription": {"model": "gpt-4o-transcribe"}, "turn_detection": { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500, }, }, "output": { "format": {"type": "audio/pcm", "rate": 24000}, }, }, }, } await send_event(ws, event) print("세션이 설정되었습니다.") async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None: if not name: raise Exception("함수 이름을 받지 못했습니다") print(f"\n[함수 호출] {name}({arguments})") tool_fn = TOOL_REGISTRY.get(name) if tool_fn is None: result = json.dumps({"error": f"알 수 없는 함수: {name}"}) else: try: args = json.loads(arguments) result = tool_fn(**args) if asyncio.iscoroutine(result): result = await result except Exception as e: result = json.dumps({"error": str(e)}) print(f"[함수 결과] {result}") # 함수 호출 결과를 모델에 다시 전송합니다. await send_event(ws, { "type": "conversation.item.create", "item": { "type": "function_call_output", "call_id": call_id, "output": result if isinstance(result, str) else json.dumps(result), }, }) # 모델이 함수 결과를 반영할 수 있도록 새 응답을 트리거합니다. await send_event(ws, {"type": "response.create"}) def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue): """pyaudio의 write()는 사운드 카드가 샘플을 소비할 때까지 블로킹되므로 별도의 스레드에서 실행됩니다. 재생을 비동기 이벤트 루프에서 분리하면 진행 중인 쓰기 완료를 기다리지 않고 인터럽트 시 큐를 즉시 비울 수 있습니다.""" while True: data = audio_output_queue.get() if data is None: break output_stream.write(data) async def send_mic_audio(ws, mic) -> None: try: while True: raw_data = mic.read(CHUNK, exception_on_overflow=False) # 시각적 볼륨 미터. audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64) rms = np.sqrt(np.mean(audio_data**2)) meter = int(min(rms / 50, 50)) print(f"마이크 레벨: {'█' * meter}{' ' * (50 - meter)} |", end="\r") # Base64로 인코딩하여 오디오 청크를 전송합니다. b64_audio = base64.b64encode(raw_data).decode("utf-8") await send_event(ws, { "type": "input_audio_buffer.append", "audio": b64_audio, }) await asyncio.sleep(0) except asyncio.CancelledError: pass async def receive_events(ws, audio_output_queue: queue.Queue) -> None: # 델타 이벤트에 걸쳐 함수 호출 인수를 누적합니다. pending_calls: dict[str, dict] = {} async for raw_message in ws: if DEBUG_WRITE_LOG: with open("data.jsonl", "a", encoding="utf-8") as f: f.write(json.dumps(raw_message) + "\n") event = json.loads(raw_message) event_type = event.get("type", "") if event_type == "session.created": print(raw_message) elif event_type == "session.updated": print(raw_message) elif event_type == "error": print(f"\n[오류] {event}") elif event_type == "input_audio_buffer.speech_started": # 사용자 발화와 겹치지 않도록 큐에 쌓인 AI 오디오를 비웁니다. while not audio_output_queue.empty(): try: audio_output_queue.get_nowait() except queue.Empty: break elif event_type == "input_audio_buffer.speech_stopped": pass elif event_type == "input_audio_buffer.committed": pass elif event_type == "response.created": pass elif event_type == "response.output_text.delta": pass elif event_type == "response.output_text.done": pass # 오디오 출력 델타 - 재생을 위해 큐에 추가합니다. elif event_type == "response.output_audio.delta": audio_bytes = base64.b64decode(event.get("delta", "")) audio_output_queue.put(audio_bytes) elif event_type == "response.output_audio_transcript.delta": pass elif event_type == "response.output_audio_transcript.done": pass # 함수 호출 시작 - 대기 중인 호출을 초기화합니다. elif event_type == "response.output_item.added": item = event.get("item", {}) if item.get("type") == "function_call" and item.get("status") == "in_progress": item_id = item.get("id", "") pending_calls[item_id] = { "call_id": item.get("call_id", ""), "name": item.get("name", ""), "arguments": "", } print(f"\n[함수 호출 시작] {item.get('name', '')}") # 함수 호출 인수 델타 - 누적합니다. elif event_type == "response.function_call_arguments.delta": item_id = event.get("item_id", "") if item_id in pending_calls: pending_calls[item_id]["arguments"] += event.get("delta", "") elif event_type == "response.function_call_arguments.done": item_id = event.get("item_id", "") call_info = pending_calls.pop(item_id, None) if call_info is None: # 대체 처리: done 이벤트에서 직접 데이터를 사용합니다. call_info = { "call_id": event.get("call_id"), "name": event.get("name"), "arguments": event.get("arguments"), } try: await handle_function_call( ws, call_info["call_id"], call_info["name"], call_info["arguments"], ) except Exception as e: print(f"메시지 {call_info}에 대한 함수 호출 실패: 오류 - {e}") elif event_type == "response.done": pass elif event_type == "rate_limits.updated": pass else: print(f"\n[Event: {event_type}]") async def main(): if not OPENAI_API_KEY: print("오류: OPENAI_API_KEY 환경 변수가 설정되지 않았습니다") return p = pyaudio.PyAudio() input_device_index = int(p.get_default_input_device_info()['index']) output_device_index = int(p.get_default_output_device_info()['index']) # 채널 수는 장치의 지원 범위와 일치해야 합니다. 그렇지 않으면 pyaudio가 열기 시 오류를 발생시킵니다. input_info = p.get_device_info_by_index(input_device_index) output_info = p.get_device_info_by_index(output_device_index) input_channels = min(int(input_info['maxInputChannels']), 1) output_channels = min(int(output_info['maxOutputChannels']), 1) mic = p.open( format=FORMAT, channels=input_channels, rate=RATE, input=True, output=False, frames_per_buffer=CHUNK, input_device_index=input_device_index, start=False, ) speaker = p.open( format=FORMAT, channels=output_channels, rate=RATE, input=False, output=True, frames_per_buffer=CHUNK, output_device_index=output_device_index, start=False, ) mic.start_stream() speaker.start_stream() # 오디오는 큐를 통해 전달되므로 사용자가 중단할 때 비울 수 있습니다. # 스피커에 직접 쓰면 전송 중인 오디오를 취소할 수 없습니다. audio_output_queue = queue.Queue() threading.Thread( target=play_audio, args=(speaker, audio_output_queue), daemon=True ).start() headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", } print("OpenAI Realtime API에 연결 중...") async with websockets.connect( REALTIME_URL, additional_headers=headers, ) as ws: print("연결되었습니다! 세션을 구성하는 중...") await configure_session(ws) print("--- 세션 활성화됨 (마이크에 말하세요) ---") mic_task = asyncio.create_task(send_mic_audio(ws, mic)) try: await receive_events(ws, audio_output_queue) finally: mic_task.cancel() try: await mic_task except asyncio.CancelledError: pass # 정리 audio_output_queue.put(None) # 재생 스레드에 종료 신호를 보냅니다. mic.close() speaker.close() p.terminate() print("\n세션이 종료되었습니다.") if __name__ == "__main__": asyncio.run(main()) -
weave.init()호출에서 팀 이름과 프로젝트 이름을 업데이트하세요. -
OPENAI_API_KEY환경 변수를 설정하세요. -
다음 코드를 실행하세요:
python weave_ws_voice_assistant.py