AgentChat), 핵심 멀티 에이전트 기능(Core), 외부 서비스와의 인테그레이션(Extensions)을 위한 컴포넌트를 제공합니다. AutoGen은 또한 코드 작성 없이 에이전트를 프로토타이핑할 수 있는 Studio를 제공합니다. 자세한 내용은 공식 AutoGen 문서를 참조하세요.
이 가이드는 독자가 AutoGen에 대해 기본적인 이해를 갖추고 있다고 가정합니다.
autogen_agentchat, autogen_core, autogen_ext 내의 상호작용을 자동으로 추적할 수 있습니다. 이 가이드는 AutoGen과 함께 Weave를 사용하는 다양한 예제를 통해 이를 단계별로 안내합니다.
사전 준비 사항
잘못된 코드 신고
복사
AI에게 묻기
pip install autogen_agentchat "autogen_ext[openai,anthropic]" weave
잘못된 코드 신고
복사
AI에게 묻기
import os
os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"
os.environ["ANTHROPIC_API_KEY"] = "<your-anthropic-api-key>"
기본 설정
잘못된 코드 신고
복사
AI에게 묻기
import weave
weave.init("autogen-demo")
간단한 모델 클라이언트 트레이싱
클라이언트 생성 호출 트레이싱
OpenAIChatCompletionClient에 대한 한 번의 호출을 트레이싱하는 방법을 보여줍니다.
잘못된 코드 신고
복사
AI에게 묻기
import asyncio
from autogen_core.models import UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
# from autogen_ext.models.anthropic import AnthropicChatCompletionClient
async def simple_client_call(model_name = "gpt-4o"):
model_client = OpenAIChatCompletionClient(
model=model_name,
)
# 또는 Anthropic이나 다른 모델 클라이언트를 사용할 수 있습니다
# model_client = AnthropicChatCompletionClient(
# model="claude-3-haiku-20240307"
# )
response = await model_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response)
asyncio.run(simple_client_call())
스트리밍을 사용하는 클라이언트 생성 호출 트레이싱
잘못된 코드 신고
복사
AI에게 묻기
async def simple_client_call_stream(model_name = "gpt-4o"):
openai_model_client = OpenAIChatCompletionClient(model=model_name)
async for item in openai_model_client.create_stream(
[UserMessage(content="Hello, how are you?", source="user")]
):
print(item, flush=True, end="")
asyncio.run(simple_client_call_stream())
Weave는 캐시된 호출을 기록합니다
ChatCompletionCache를 사용할 수 있으며, Weave는 이러한 상호작용을 추적하여 응답이 캐시에서 반환된 것인지 새 호출에서 생성된 것인지 보여줍니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_ext.models.cache import ChatCompletionCache
async def run_cache_client(model_name = "gpt-4o"):
openai_model_client = OpenAIChatCompletionClient(model=model_name)
cache_client = ChatCompletionCache(openai_model_client,)
response = await cache_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response) # OpenAI의 응답을 출력해야 함
response = await cache_client.create(
[UserMessage(content="Hello, how are you?", source="user")]
)
print(response) # 캐시된 응답을 출력해야 함
asyncio.run(run_cache_client())
도구 호출이 있는 에이전트 추적
잘못된 코드 신고
복사
AI에게 묻기
from autogen_agentchat.agents import AssistantAgent
async def get_weather(city: str) -> str:
return f"The weather in {city} is 73 degrees and Sunny."
async def run_agent_with_tools(model_name = "gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
agent = AssistantAgent(
name="weather_agent",
model_client=model_client,
tools=[get_weather],
system_message="You are a helpful assistant.",
reflect_on_tool_use=True,
)
# 콘솔에 스트리밍 출력하려면:
# await Console(agent.run_stream(task="What is the weather in New York?"))
res = await agent.run(task="What is the weather in New York?")
print(res)
await model_client.close()
asyncio.run(run_agent_with_tools())
GroupChat 추적 - RoundRobin
RoundRobinGroupChat와 같은 그룹 채팅 내 상호작용은 Weave에서 트레이싱되므로, 에이전트 간 대화 흐름을 추적할 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
# 전체 그룹 채팅을 추적하기 위해 여기에 weave op를 추가합니다
# 완전히 선택 사항이지만 사용하는 것을 강력히 권장합니다
@weave.op
async def run_round_robin_group_chat(model_name="gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
primary_agent = AssistantAgent(
"primary",
model_client=model_client,
system_message="You are a helpful AI assistant.",
)
critic_agent = AssistantAgent(
"critic",
model_client=model_client,
system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)
text_termination = TextMentionTermination("APPROVE")
team = RoundRobinGroupChat(
[primary_agent, critic_agent], termination_condition=text_termination
)
await team.reset()
# 콘솔에 스트리밍 출력하려면:
# await Console(team.run_stream(task="Write a short poem about the fall season."))
result = await team.run(task="Write a short poem about the fall season.")
print(result)
await model_client.close()
asyncio.run(run_round_robin_group_chat())
메모리 트레이싱
@weave.op()을 사용해 메모리 연산을 하나의 트레이스로 묶어서 가독성을 높일 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
# 단일 트레이스 아래에서 memory add 호출과 memory get 호출을 함께 추적하기 위해
# 여기에 weave op를 추가합니다
# 완전히 선택 사항이지만 사용하는 것을 강력히 권장합니다
@weave.op
async def run_memory_agent(model_name="gpt-4o"):
user_memory = ListMemory()
await user_memory.add(
MemoryContent(
content="The weather should be in metric units",
mime_type=MemoryMimeType.TEXT,
)
)
await user_memory.add(
MemoryContent(
content="Meal recipe must be vegan", mime_type=MemoryMimeType.TEXT
)
)
async def get_weather(city: str, units: str = "imperial") -> str:
if units == "imperial":
return f"The weather in {city} is 73 °F and Sunny."
elif units == "metric":
return f"The weather in {city} is 23 °C and Sunny."
else:
return f"Sorry, I don't know the weather in {city}."
model_client = OpenAIChatCompletionClient(model=model_name)
assistant_agent = AssistantAgent(
name="assistant_agent",
model_client=model_client,
tools=[get_weather],
memory=[user_memory],
)
# 콘솔에 스트리밍 출력하려면:
# stream = assistant_agent.run_stream(task="What is the weather in New York?")
# await Console(stream)
result = await assistant_agent.run(task="What is the weather in New York?")
print(result)
await model_client.close()
asyncio.run(run_memory_agent())
RAG 워크플로 추적
ChromaDBVectorMemory 같은 메모리 시스템을 활용한 검색 포함)는 추적 가능합니다. RAG 프로세스에 @weave.op() 데코레이터를 적용하면 전체 흐름을 시각화하는 데 도움이 됩니다.
이 RAG 예시는
chromadb가 필요합니다. pip install chromadb로 설치하세요.잘못된 코드 신고
복사
AI에게 묻기
# !pip install -q chromadb
# 환경에 chromadb가 설치되어 있는지 확인하세요: `pip install chromadb`
import re
from typing import List
import os
from pathlib import Path
import aiofiles
import aiohttp
from autogen_core.memory import Memory, MemoryContent, MemoryMimeType
from autogen_ext.memory.chromadb import (
ChromaDBVectorMemory,
PersistentChromaDBVectorMemoryConfig,
)
class SimpleDocumentIndexer:
def __init__(self, memory: Memory, chunk_size: int = 1500) -> None:
self.memory = memory
self.chunk_size = chunk_size
async def _fetch_content(self, source: str) -> str:
if source.startswith(("http://", "https://")):
async with aiohttp.ClientSession() as session:
async with session.get(source) as response:
return await response.text()
else:
async with aiofiles.open(source, "r", encoding="utf-8") as f:
return await f.read()
def _strip_html(self, text: str) -> str:
text = re.sub(r"<[^>]*>", " ", text)
text = re.sub(r"\\s+", " ", text)
return text.strip()
def _split_text(self, text: str) -> List[str]:
chunks: list[str] = []
for i in range(0, len(text), self.chunk_size):
chunk = text[i : i + self.chunk_size]
chunks.append(chunk.strip())
return chunks
async def index_documents(self, sources: List[str]) -> int:
total_chunks = 0
for source in sources:
try:
content = await self._fetch_content(source)
if "<" in content and ">" in content:
content = self._strip_html(content)
chunks = self._split_text(content)
for i, chunk in enumerate(chunks):
await self.memory.add(
MemoryContent(
content=chunk,
mime_type=MemoryMimeType.TEXT,
metadata={"source": source, "chunk_index": i},
)
)
total_chunks += len(chunks)
except Exception as e:
print(f"{source} 인덱싱 오류: {str(e)}")
return total_chunks
@weave.op
async def run_rag_agent(model_name="gpt-4o"):
rag_memory = ChromaDBVectorMemory(
config=PersistentChromaDBVectorMemoryConfig(
collection_name="autogen_docs",
persistence_path=os.path.join(str(Path.home()), ".chromadb_autogen_weave"),
k=3,
score_threshold=0.4,
)
)
# await rag_memory.clear() # 기존 메모리를 초기화하려면 주석을 해제하세요
async def index_autogen_docs() -> None:
indexer = SimpleDocumentIndexer(memory=rag_memory)
sources = [
"https://raw.githubusercontent.com/microsoft/autogen/main/README.md",
"https://microsoft.github.io/autogen/dev/user-guide/agentchat-user-guide/tutorial/agents.html",
]
chunks: int = await indexer.index_documents(sources)
print(f"{len(sources)}개의 AutoGen 문서에서 {chunks}개의 청크를 인덱싱했습니다")
# 컬렉션이 비어 있거나 재인덱싱이 필요한 경우에만 인덱싱하세요
# 데모 목적으로 매번 인덱싱하거나 이미 인덱싱되었는지 확인할 수 있습니다.
# 이 예제는 매 run마다 인덱싱을 시도합니다. 확인 로직 추가를 고려하세요.
await index_autogen_docs()
model_client = OpenAIChatCompletionClient(model=model_name)
rag_assistant = AssistantAgent(
name="rag_assistant",
model_client=model_client,
memory=[rag_memory],
)
# 콘솔에 스트리밍 출력하려면:
# stream = rag_assistant.run_stream(task="What is AgentChat?")
# await Console(stream)
result = await rag_assistant.run(task="What is AgentChat?")
print(result)
await rag_memory.close()
await model_client.close()
asyncio.run(run_rag_agent())
에이전트 런타임 트레이싱
SingleThreadedAgentRuntime와 같은 AutoGen 에이전트 런타임 내에서 수행되는 작업을 트레이싱할 수 있습니다. 런타임 실행 함수를 @weave.op()으로 감싸면 관련 트레이스를 하나로 묶을 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from dataclasses import dataclass
from typing import Callable
from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
default_subscription,
message_handler,
AgentId,
SingleThreadedAgentRuntime
)
@dataclass
class Message:
content: int
@default_subscription
class Modifier(RoutedAgent):
def __init__(self, modify_val: Callable[[int], int]) -> None:
super().__init__("A modifier agent.")
self._modify_val = modify_val
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
val = self._modify_val(message.content)
print(f"{'-'*80}\\nModifier:\\nModified {message.content} to {val}")
await self.publish_message(Message(content=val), DefaultTopicId())
@default_subscription
class Checker(RoutedAgent):
def __init__(self, run_until: Callable[[int], bool]) -> None:
super().__init__("A checker agent.")
self._run_until = run_until
@message_handler
async def handle_message(self, message: Message, ctx: MessageContext) -> None:
if not self._run_until(message.content):
print(f"{'-'*80}\\nChecker:\\n{message.content} passed the check, continue.")
await self.publish_message(
Message(content=message.content), DefaultTopicId()
)
else:
print(f"{'-'*80}\\nChecker:\\n{message.content} failed the check, stopping.")
# 전체 에이전트 런타임 호출을 단일 트레이스로 추적하기 위해
# 여기에 weave op를 추가합니다
# 선택 사항이지만 사용하는 것을 강력히 권장합니다
@weave.op
async def run_agent_runtime() -> None:
runtime = SingleThreadedAgentRuntime()
await Modifier.register(
runtime,
"modifier",
lambda: Modifier(modify_val=lambda x: x - 1),
)
await Checker.register(
runtime,
"checker",
lambda: Checker(run_until=lambda x: x <= 1),
)
runtime.start()
await runtime.send_message(Message(content=3), AgentId("checker", "default"))
await runtime.stop_when_idle()
asyncio.run(run_agent_runtime())
트레이싱 워크플로(순차형)
@weave.op()을(를) 사용할 수 있습니다.
잘못된 코드 신고
복사
AI에게 묻기
from autogen_core import TopicId, type_subscription
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage
@dataclass
class WorkflowMessage:
content: str
concept_extractor_topic_type = "ConceptExtractorAgent"
writer_topic_type = "WriterAgent"
format_proof_topic_type = "FormatProofAgent"
user_topic_type = "User"
@type_subscription(topic_type=concept_extractor_topic_type)
class ConceptExtractorAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("개념 추출 에이전트입니다.")
self._system_message = SystemMessage(
content=(
"You are a marketing analyst. Given a product description, identify:\n"
"- 주요 기능\n"
"- 타겟 고객\n"
"- 고유한 판매 포인트\n\n"
)
)
self._model_client = model_client
@message_handler
async def handle_user_description(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"제품 설명: {message.content}"
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(writer_topic_type, source=self.id.key)
)
@type_subscription(topic_type=writer_topic_type)
class WriterAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("작성 에이전트입니다.")
self._system_message = SystemMessage(
content=(
"You are a marketing copywriter. Given a block of text describing features, audience, and USPs, "
"이러한 포인트를 강조하는 설득력 있는 마케팅 카피(뉴스레터 섹션 형식)를 작성하세요. "
"출력은 짧아야 하며(약 150단어), 단일 텍스트 블록으로 카피만 출력하세요."
)
)
self._model_client = model_client
@message_handler
async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"아래는 제품에 대한 정보입니다:\\n\\n{message.content}"
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(format_proof_topic_type, source=self.id.key)
)
@type_subscription(topic_type=format_proof_topic_type)
class FormatProofAgent(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("형식 교정 에이전트입니다.")
self._system_message = SystemMessage(
content=(
"You are an editor. Given the draft copy, correct grammar, improve clarity, ensure consistent tone, "
"형식을 갖추고 완성도 있게 다듬으세요. 최종적으로 개선된 카피를 단일 텍스트 블록으로 출력하세요."
)
)
self._model_client = model_client
@message_handler
async def handle_intermediate_text(self, message: WorkflowMessage, ctx: MessageContext) -> None:
prompt = f"초안 카피:\\n{message.content}."
llm_result = await self._model_client.create(
messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
cancellation_token=ctx.cancellation_token,
)
response = llm_result.content
assert isinstance(response, str)
print(f"{'-'*80}\\n{self.id.type}:\\n{response}")
await self.publish_message(
WorkflowMessage(response), topic_id=TopicId(user_topic_type, source=self.id.key)
)
@type_subscription(topic_type=user_topic_type)
class UserAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("최종 카피를 사용자에게 출력하는 사용자 에이전트입니다.")
@message_handler
async def handle_final_copy(self, message: WorkflowMessage, ctx: MessageContext) -> None:
print(f"\\n{'-'*80}\\n{self.id.type} 최종 카피 수신:\\n{message.content}")
# 전체 에이전트 워크플로우를 단일 트레이스로 추적하기 위해
# 여기에 weave op를 추가합니다.
# 완전히 선택 사항이지만 사용하는 것을 강력히 권장합니다.
@weave.op(call_display_name="Sequential Agent Workflow")
async def run_agent_workflow(model_name="gpt-4o"):
model_client = OpenAIChatCompletionClient(model=model_name)
runtime = SingleThreadedAgentRuntime()
await ConceptExtractorAgent.register(runtime, type=concept_extractor_topic_type, factory=lambda: ConceptExtractorAgent(model_client=model_client))
await WriterAgent.register(runtime, type=writer_topic_type, factory=lambda: WriterAgent(model_client=model_client))
await FormatProofAgent.register(runtime, type=format_proof_topic_type, factory=lambda: FormatProofAgent(model_client=model_client))
await UserAgent.register(runtime, type=user_topic_type, factory=lambda: UserAgent())
runtime.start()
await runtime.publish_message(
WorkflowMessage(
content="음료를 24시간 차갑게 유지하는 친환경 스테인리스 스틸 물병"
),
topic_id=TopicId(concept_extractor_topic_type, source="default"),
)
await runtime.stop_when_idle()
await model_client.close()
asyncio.run(run_agent_workflow())
코드 실행 추적기
Docker 필요
이 예제는 Docker를 사용해 코드를 실행하며, 모든 환경(예: Colab에서 직접 실행)에서 동작하지 않을 수 있습니다. 이 예제를 시도하려면 로컬에서 Docker가 실행 중인지 확인하세요.
잘못된 코드 신고
복사
AI에게 묻기
import tempfile
from autogen_core import DefaultTopicId
from autogen_core.code_executor import CodeBlock, CodeExecutor
from autogen_core.models import (
AssistantMessage,
ChatCompletionClient,
LLMMessage,
SystemMessage,
UserMessage,
)
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
@dataclass
class CodeGenMessage:
content: str
@default_subscription
class Assistant(RoutedAgent):
def __init__(self, model_client: ChatCompletionClient) -> None:
super().__init__("An assistant agent.")
self._model_client = model_client
self._chat_history: List[LLMMessage] = [
SystemMessage(
content="""Write Python script in markdown block, and it will be executed.
Always save figures to file in the current directory. Do not use plt.show(). All code required to complete this task must be contained within a single response.""",
)
]
@message_handler
async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
self._chat_history.append(UserMessage(content=message.content, source="user"))
result = await self._model_client.create(self._chat_history)
print(f"\\n{'-'*80}\\nAssistant:\\n{result.content}")
self._chat_history.append(AssistantMessage(content=result.content, source="assistant"))
await self.publish_message(CodeGenMessage(content=result.content), DefaultTopicId())
def extract_markdown_code_blocks(markdown_text: str) -> List[CodeBlock]:
pattern = re.compile(r"```(?:\\s*([\\w\\+\\-]+))?\\n([\\s\\S]*?)```")
matches = pattern.findall(markdown_text)
code_blocks: List[CodeBlock] = []
for match in matches:
language = match[0].strip() if match[0] else ""
code_content = match[1]
code_blocks.append(CodeBlock(code=code_content, language=language))
return code_blocks
@default_subscription
class Executor(RoutedAgent):
def __init__(self, code_executor: CodeExecutor) -> None:
super().__init__("An executor agent.")
self._code_executor = code_executor
@message_handler
async def handle_message(self, message: CodeGenMessage, ctx: MessageContext) -> None:
code_blocks = extract_markdown_code_blocks(message.content)
if code_blocks:
result = await self._code_executor.execute_code_blocks(
code_blocks, cancellation_token=ctx.cancellation_token
)
print(f"\\n{'-'*80}\\nExecutor:\\n{result.output}")
await self.publish_message(CodeGenMessage(content=result.output), DefaultTopicId())
# 전체 코드 생성 워크플로우를 단일 트레이스로 추적하기 위해
# 여기에 weave op를 추가합니다
# 완전히 선택 사항이지만 사용하는 것을 강력히 권장합니다
@weave.op(call_display_name="CodeGen Agent Workflow")
async def run_codegen(model_name="gpt-4o"): # 업데이트된 모델
work_dir = tempfile.mkdtemp()
runtime = SingleThreadedAgentRuntime()
# 이 예제를 실행하려면 Docker가 실행 중인지 확인하세요
try:
async with DockerCommandLineCodeExecutor(work_dir=work_dir) as executor:
model_client = OpenAIChatCompletionClient(model=model_name)
await Assistant.register(runtime, "assistant", lambda: Assistant(model_client=model_client))
await Executor.register(runtime, "executor", lambda: Executor(executor))
runtime.start()
await runtime.publish_message(
CodeGenMessage(content="Create a plot of NVDA vs TSLA stock returns YTD from 2024-01-01."),
DefaultTopicId(),
)
await runtime.stop_when_idle()
await model_client.close()
except Exception as e:
print(f"Could not run Docker code executor example: {e}")
print("Please ensure Docker is installed and running.")
finally:
import shutil
shutil.rmtree(work_dir)
asyncio.run(run_codegen())
더 알아보기
- Weave:
- AutoGen: