メインコンテンツへスキップ
Open In Colab CrewAI は、完全にゼロから構築された軽量かつ高速な Python フレームワークであり、LangChain やその他のエージェントフレームワークから完全に独立しています。CrewAI は、高レベルのシンプルさ(Crews)と、きめ細かな低レベル制御(Flows)の両方を開発者に提供し、あらゆるシナリオに合わせた自律型 AI エージェントを構築するのに最適です。詳細は CrewAI のドキュメントをご覧ください。 AI エージェントを扱う際には、エージェント同士のやり取りをデバッグおよびモニタリングすることが重要です。CrewAI アプリケーションは複数のエージェントが協調して動作することが多く、エージェントがどのように連携し、コミュニケーションを取っているかを把握することが不可欠です。Weave は CrewAI アプリケーションのトレースを自動的に取得することで、このプロセスを簡素化し、エージェントのパフォーマンスややり取りをモニタリングおよび解析できるようにします。 このインテグレーションは Crews と Flows の両方をサポートします。

Crew の利用を開始する

この例を実行するには、CrewAI(インストール手順)と Weave をインストールする必要があります。
pip install crewai weave
ここでは CrewAI の Crew を作成し、Weave を使用して実行をトレースします。まずはスクリプトの先頭で weave.init() を呼び出してください。weave.init() の引数には、トレースを記録するプロジェクト名を指定します。
import weave
from crewai import Agent, Task, Crew, LLM, Process

# プロジェクト名でWeaveを初期化する
weave.init(project_name="crewai_demo")

# 決定論的な出力を保証するためtemperatureを0に設定してLLMを作成する
llm = LLM(model="gpt-4o-mini", temperature=0)

# エージェントを作成する
researcher = Agent(
    role='Research Analyst',
    goal='Find and analyze the best investment opportunities',
    backstory='Expert in financial analysis and market research',
    llm=llm,
    verbose=True,
    allow_delegation=False,
)

writer = Agent(
    role='Report Writer',
    goal='Write clear and concise investment reports',
    backstory='Experienced in creating detailed financial reports',
    llm=llm,
    verbose=True,
    allow_delegation=False,
)

# タスクを作成する
research_task = Task(
    description='Deep research on the {topic}',
    expected_output='Comprehensive market data including key players, market size, and growth trends.',
    agent=researcher
)

writing_task = Task(
    description='Write a detailed report based on the research',
    expected_output='The report should be easy to read and understand. Use bullet points where applicable.',
    agent=writer
)

# クルーを作成する
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True,
    process=Process.sequential,
)

# クルーを実行する
result = crew.kickoff(inputs={"topic": "AI in material science"})
print(result)
Weave は CrewAI ライブラリ経由で行われるすべての呼び出し(エージェントのインタラクション、タスクの実行、LLM 呼び出しを含む)を追跡してログに記録します。これらのトレースは Weave の Web インターフェース上で確認できます。 crew_trace.png
CrewAI は、kickoff プロセスをより細かく制御するために、kickoff(), kickoff_for_each(), kickoff_async(), kickoff_for_each_async() など複数のメソッドを提供しています。このインテグレーションは、これらすべてのメソッドからのトレースのログ記録をサポートします。

ツールのトラッキング

CrewAI のツールは、ウェブ検索やデータ分析から、コラボレーションや他のエージェントへのタスクの委任まで、幅広い機能をエージェントに提供します。インテグレーションは、それらの実行をトレースすることもできます。 先ほどの例で生成されるレポートの品質を向上させるために、インターネットを検索して最も関連性の高い結果を返せるツールにアクセスできるようにします。 まずは追加の依存関係をインストールします。
pip install 'crewai[tools]'
この例では、SerperDevTool を使用して「Research Analyst」エージェントがインターネット上の関連情報を検索できるようにしています。このツールおよび API の要件についての詳細は、こちらを参照してください。
# .... 既存のインポート ....
from crewai_tools import SerperDevTool

# エージェントにツールを提供します。
researcher = Agent(
    role='Research Analyst',
    goal='Find and analyze the best investment opportunities',
    backstory='Expert in financial analysis and market research',
    llm=llm,
    verbose=True,
    allow_delegation=False,
    tools=[SerperDevTool()],
)

# .... 既存のコード ....
インターネットにアクセス可能なエージェントを使ってこの Crew を実行すると、より高品質で、より関連性の高い結果が得られます。以下の画像に示すように、ツールの利用状況は自動的にトレースされます。 crew_with_tool_trace.png
このインテグレーションでは、crewAI-tools リポジトリで利用可能なすべてのツールに対して自動的にパッチを適用します。

Flow を使い始める

import weave
# プロジェクト名でWeaveを初期化する
weave.init("crewai_demo")

from crewai.flow.flow import Flow, listen, router, start
from litellm import completion


class CustomerFeedbackFlow(Flow):
    model = "gpt-4o-mini"

    @start()
    def fetch_feedback(self):
        print("Fetching customer feedback")
        # 実際のシナリオでは、これをAPIコールに置き換えることができる。
        # この例では、顧客フィードバックをシミュレートする。
        feedback = (
            "I had a terrible experience with the product. "
            "It broke after one use and customer service was unhelpful."
        )
        self.state["feedback"] = feedback
        return feedback

    @router(fetch_feedback)
    def analyze_feedback(self, feedback):
        # 言語モデルを使用してセンチメントを分析する
        prompt = (
            f"Analyze the sentiment of this customer feedback and "
            "return only 'positive' or 'negative':\n\n"
            f"Feedback: {feedback}"
        )
        response = completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
        )
        sentiment = response["choices"][0]["message"]["content"].strip().lower()
        # レスポンスが曖昧な場合はnegativeをデフォルトとする
        if sentiment not in ["positive", "negative"]:
            sentiment = "negative"
        return sentiment

    @listen("positive")
    def handle_positive_feedback(self):
        # ポジティブなフィードバックに対するお礼メッセージを生成する
        prompt = "Generate a thank you message for a customer who provided positive feedback."
        response = completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
        )
        thank_you_message = response["choices"][0]["message"]["content"].strip()
        self.state["response"] = thank_you_message
        return thank_you_message

    @listen("negative")
    def handle_negative_feedback(self):
        # ネガティブなフィードバックに対してサービス改善を約束するお詫びメッセージを生成する
        prompt = (
            "Generate an apology message to a customer who provided negative feedback and offer assistance or a solution."
        )
        response = completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
        )
        apology_message = response["choices"][0]["message"]["content"].strip()
        self.state["response"] = apology_message
        return apology_message

# フローをインスタンス化してキックオフする
flow = CustomerFeedbackFlow()
result = flow.kickoff()
print(result)
flow.png
このインテグレーションは、Flow.kickoff エントリポイントと、利用可能なすべてのデコレータ(@start@listen@router@or_@and_)に自動的にパッチを当てます。

Crew ガードレール - 自前のオペレーションを追跡する

タスクガードレールは、タスクの出力を次のタスクに渡す前に検証および変換するための手段を提供します。シンプルな Python 関数を使って、エージェントの実行をオンザフライで検証できます。 この関数を @weave.op でラップすると、入力・出力・アプリケーションロジックが自動的に記録されるため、データがエージェントを通じてどのように検証されているかをデバッグできます。さらに、実験中のコードも自動的にバージョン管理され、まだ git にコミットしていないアドホックな変更も含めて履歴を残せます。 リサーチアナリスト兼ライターの例を考えてみましょう。生成されたレポートの長さを検証するためにガードレールを追加します。
# .... 既存のインポートとweaveの初期化 ....

# ガードレール関数を `@weave.op()` でデコレートする
@weave.op(name="guardrail-validate_blog_content")
def validate_blog_content(result: TaskOutput) -> Tuple[bool, Any]:
    # 生の文字列結果を取得する
    result = result.raw

    """ブログコンテンツが要件を満たしているか検証する。"""
    try:
        # 単語数を確認する
        word_count = len(result.split())

        if word_count > 200:
            return (False, {
                "error": "ブログコンテンツが200語を超えています",
                "code": "WORD_COUNT_ERROR",
                "context": {"word_count": word_count}
            })

        # 追加の検証ロジックをここに記述
        return (True, result.strip())
    except Exception as e:
        return (False, {
            "error": "検証中に予期しないエラーが発生しました",
            "code": "SYSTEM_ERROR"
        })


# .... 既存のエージェントとリサーチアナリストのタスク ....

writing_task = Task(
    description='調査に基づいて200語以内の詳細なレポートを作成する',
    expected_output='レポートは読みやすく理解しやすいものにすること。必要に応じて箇条書きを使用すること。',
    agent=writer,
    guardrail=validate_blog_content,
)

# .... クルーを実行する既存のコード ....
@weave.op で guardrail 関数をデコレートするだけで、この関数の入出力に加えて、実行時間、内部で LLM が動作している場合のトークン情報、コードバージョンなどを追跡できるようになります。 guardrail.png

結論

このインテグレーションについて改善できる点があれば、ぜひお知らせください。問題が発生した場合は、こちらから issue を作成してください。 CrewAI の豊富なサンプルドキュメントを通じて、強力なマルチエージェントシステムの構築方法についてさらに学ぶことができます。