メインコンテンツへスキップ
W&B Weave Threads を使用すると、LLM アプリケーション内のマルチターン会話を追跡および分析できます。Threads は関連する呼び出しを共通の thread_id の下にグループ化し、セッション全体を可視化し、ターンをまたいだ会話レベルのメトリクスを追跡できるようにします。Threads はプログラムから作成でき、Weave UI で可視化できます。 Threads の利用を開始するには、次のことを行います。
  1. Threads の基本を把握します。
  2. 一般的な利用パターンと実際のユースケースを示すコードサンプルを試します。

ユースケース

スレッドは、次のようなものを整理・分析したいときに有用です:
  • マルチターンの会話
  • セッションベースのワークフロー
  • 関連する一連の処理
スレッドを使うと、呼び出しをコンテキストごとにグループ化でき、システムが複数のステップにわたってどのように応答しているかを理解しやすくなります。たとえば、単一のユーザーセッション、エージェントの一連の意思決定、インフラストラクチャ層とビジネスロジック層にまたがる複雑なリクエストを追跡できます。 アプリケーションをスレッドとターンで構造化することで、メトリクスがより整ったものになり、Weave UI での可視性も向上します。すべての低レベルな処理を追うのではなく、重要な高レベルのステップに集中できます。

定義

スレッド

スレッド は、共通の会話コンテキストを共有する関連する呼び出しを論理的にまとめたものです。スレッドには次の特徴があります:
  • 一意の thread_id を持つ
  • 1 つ以上の ターン を含む
  • 呼び出し間でコンテキストを維持する
  • 完全なユーザーセッションまたはインタラクションフローを表す

Turn

Turn は Thread 内の高レベルな操作であり、UI ではスレッドビューの個別の行として表示されます。各 Turn は次のようなものです。
  • 会話やワークフローにおける 1 つの論理ステップを表します
  • Turn は thread context の直接の子であり、ネストされた、より低レベルな Call を含む場合があります(これらは thread レベルの統計には表示されません)。

Call

Call は、アプリケーション内で実行される、@weave.op デコレータ付きの関数のことです。
  • Turn calls は、新しいターンを開始するトップレベルの処理です
  • Nested calls は、ターン内に含まれる下位レベルの処理です

Trace

Trace は、単一のオペレーションにおけるコールスタック全体をキャプチャします。スレッドは、同じ論理的な会話またはセッションに属する複数の Trace をまとめたものです。言い換えると、スレッドは複数のターンから構成されており、それぞれが会話の一部を表します。Trace の詳細については、Tracing の概要を参照してください。

UI の概要

Weave サイドバーで Threads を選択して、Threads リストビュー にアクセスします。
Weave のサイドバーにある Threads アイコン

スレッド一覧ビュー

  • プロジェクト内の最近のスレッドを一覧表示します
  • 列にはターン数、開始時刻、最終更新時刻が表示されます
  • 行をクリックすると、その詳細ドロワーが開きます
The Threads list view

スレッド詳細ドロワー

  • 任意の行をクリックすると、その行の詳細ドロワーが開きます。
  • スレッド内のすべてのターンが表示されます。
  • ターンは開始時刻順に一覧表示されます(継続時間や終了時刻ではなく、開始時刻に基づきます)。
  • 呼び出しレベルのメタデータ(レイテンシ、入力、出力)が含まれます。
  • ログされている場合は、メッセージ内容や構造化データをオプションとして表示できます。
  • ターンの実行全体を確認するには、スレッド詳細ドロワーからそのターンを開きます。これにより、その特定のターン中に発生したすべてのネストされたオペレーションを掘り下げて確認できます。
  • ターンに LLM コールから抽出されたメッセージが含まれている場合、それらは右側のチャットペインに表示されます。これらのメッセージは通常、対応するインテグレーション(例: openai.ChatCompletion.create)によって行われたコールから生成され、表示されるには特定の条件を満たす必要があります。詳細については、Chat view behavior を参照してください。

チャットビューの動作

チャットペインには、各ターンでの LLM 呼び出しから抽出された構造化メッセージデータが表示されます。このビューでは、やり取りの内容が会話形式で表示されます。
Chat view

メッセージとして扱われるものは何ですか?

メッセージは、1ターン内の呼び出しのうち、LLMプロバイダとの直接的なやり取り(例: プロンプト送信とレスポンス受信)にあたるものから抽出されます。他の呼び出しの内部にさらにネストされていない呼び出しだけがメッセージとして表示されます。これにより、中間ステップや集約された内部ロジックが重複して表示されることを防ぎます。 通常、メッセージは次のような、自動パッチ適用済みのサードパーティ SDK によって生成されます:
  • openai.ChatCompletion.create
  • anthropic.Anthropic.completion

メッセージが存在しない場合はどうなりますか?

あるターンでメッセージが一切出力されない場合、そのターンにはチャットペイン上で空のメッセージセクションが表示されます。ただし、同じスレッド内の別のターンからのメッセージは、引き続きチャットペインに表示される場合があります。

ターンとチャットの連動

  • ターンをクリックすると、そのターンのメッセージ位置までチャットペインがスクロールします(ピン留め動作)。
  • チャットペインをスクロールすると、左側のリストで対応するターンがハイライト表示されます。
ターンの完全なトレースを表示するには、そのターンをクリックします。 スレッド詳細ビューに戻るための戻るボタンが左上に表示されます。UI の状態(スクロール位置など)は、この画面遷移を行っても保持されません。
Threads ドロワービュー

SDK の使用方法

このセクションの各サンプルは、アプリケーション内でターンやスレッドを構成するためのさまざまな戦略を示します。ほとんどのサンプルでは、スタブ関数内に独自の LLM 呼び出しやシステム動作を実装してください。
  • セッションや会話をトラッキングするには、weave.thread() コンテキストマネージャーを使用します。
  • 論理処理を @weave.op でデコレートして、ターンやネストされた呼び出しとしてトラッキングします。
  • thread_id を渡した場合、Weave はそのブロック内のすべての操作を同一スレッドとしてまとめます。thread_id を省略した場合、Weave は一意な ID を自動生成します。
weave.thread() の戻り値は thread_id プロパティを持つ ThreadContext オブジェクトです。これをログに記録したり、再利用したり、他のシステムへ渡したりできます。 ネストされた weave.thread() コンテキストは、同じ thread_id が再利用されない限り、常に新しいスレッドを開始します。子コンテキストの終了は、親コンテキストを中断したり上書きしたりしません。これにより、アプリのロジックに応じて、フォークしたスレッド構造や多層的なスレッドオーケストレーションを実現できます。

基本的なスレッドの作成

次のコードサンプルは、weave.thread() を使用して、1 つ以上の処理を共通の thread_id でグループ化する方法を示します。これは、アプリケーションで Threads を使い始めるための最も簡単な方法です。
import weave

@weave.op
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

# 新しいスレッドコンテキストを開始する
with weave.thread() as thread_ctx:
    print(f"Thread ID: {thread_ctx.thread_id}")
    say_hello("Bill Nye the Science Guy")

手動エージェントループの実装

この例では、@weave.op デコレーターと weave.thread() コンテキストマネージャーを使って、会話型エージェントを手動で定義する方法を示します。process_user_message を呼び出すたびに、スレッド内に新しいターンが作成されます。独自のエージェントループを構築し、コンテキストやネスト構造の扱いを完全に制御したい場合に、このパターンを利用できます。 短時間のやり取りには自動生成されたスレッド ID を使用するか、セッション間でスレッドコンテキストを保持するために、user_session_123 のようなカスタムセッション ID を渡してください。
import weave

class ConversationAgent:
    @weave.op
    def process_user_message(self, message: str) -> str:
        """
        ターンレベルの操作: これは1つの会話ターンを表します。
        スレッド統計でカウントされるのはこの関数のみです。
        """
        # ユーザーメッセージを保存
        # ネストされた呼び出しを通じてAIレスポンスを生成
        response = self._generate_response(message)
        # アシスタントのレスポンスを保存
        return response

    @weave.op
    def _generate_response(self, message: str) -> str:
        """ネストされた呼び出し: 実装の詳細。スレッド統計にはカウントされません。"""
        context = self._retrieve_context(message)     # 別のネストされた呼び出し
        intent = self._classify_intent(message)       # 別のネストされた呼び出し
        response = self._call_llm(message, context)   # LLM呼び出し(ネスト)
        return self._format_response(response)        # 最後のネストされた呼び出し

    @weave.op
    def _retrieve_context(self, message: str) -> str:
        # ベクターDBルックアップ、ナレッジベースクエリなど
        return "retrieved_context"

    @weave.op
    def _classify_intent(self, message: str) -> str:
        # 意図分類ロジック
        return "general_inquiry"

    @weave.op
    def _call_llm(self, message: str, context: str) -> str:
        # OpenAI/Anthropic/等のAPI呼び出し
        return "llm_response"

    @weave.op
    def _format_response(self, response: str) -> str:
        # レスポンスフォーマットロジック
        return f"Formatted: {response}"

# 使用方法: スレッドコンテキストは自動的に確立されます
agent = ConversationAgent()

# スレッドコンテキストを確立 - process_user_messageの各呼び出しがターンになります
with weave.thread() as thread_ctx:  # thread_idを自動生成
    print(f"Thread ID: {thread_ctx.thread_id}")

    # process_user_messageの各呼び出しで1ターン + 複数のネストされた呼び出しが作成されます
    agent.process_user_message("Hello, help with setup")           # ターン1
    agent.process_user_message("What languages do you recommend?") # ターン2
    agent.process_user_message("Explain Python vs JavaScript")     # ターン3

# 結果: 3ターンのスレッド、合計約15〜20回の呼び出し(ネストを含む)

# 代替方法: セッション追跡に明示的なthread_idを使用
session_id = "user_session_123"
with weave.thread(session_id) as thread_ctx:
    print(f"Session Thread ID: {thread_ctx.thread_id}")  # "user_session_123"

    agent.process_user_message("Continue our previous conversation")  # このセッションのターン1
    agent.process_user_message("Can you summarize what we discussed?") # このセッションのターン2

手動エージェント(コールスタックの深さが不均一な場合)

この例では、スレッドコンテキストの適用方法に応じて、コールスタック内の異なる深さでターンを定義できることを示す例です。サンプルでは 2 つのプロバイダ(OpenAI と Anthropic)を使用しており、それぞれターン境界に到達するまでのコールの深さが異なります。 すべてのターンは同じ thread_id を共有しますが、プロバイダのロジックによって、ターン境界がスタック内の異なるレベルに現れます。これは、同じスレッドにまとめつつも、バックエンドごとに呼び出しを異なる形でトレースする必要がある場合に有用です。
import weave
import random
import asyncio

class OpenAIProvider:
    """OpenAIブランチ: ターン境界までの2段階コールチェーン"""

    @weave.op
    def route_to_openai(self, user_input: str, thread_id: str) -> str:
        """レベル1: OpenAIリクエストのルーティングと準備"""
        # 入力検証、ルーティングロジック、基本的な前処理
        print(f"  L1: Routing to OpenAI for: {user_input}")

        # ここがターン境界 - スレッドコンテキストでラップする
        with weave.thread(thread_id):
            # レベル2を直接呼び出す - コールチェーンの深さを生成する
            return self.execute_openai_call(user_input)

    @weave.op
    def execute_openai_call(self, user_input: str) -> str:
        """レベル2: ターン境界 - OpenAI API呼び出しの実行"""
        print(f"    L2: Executing OpenAI API call")
        response = f"OpenAI GPT-4 response: {user_input}"
        return response


class AnthropicProvider:
    """Anthropicブランチ: ターン境界までの3段階コールチェーン"""

    @weave.op
    def route_to_anthropic(self, user_input: str, thread_id: str) -> str:
        """レベル1: Anthropicリクエストのルーティングと準備"""
        # 入力検証、ルーティングロジック、プロバイダー選択
        print(f"  L1: Routing to Anthropic for: {user_input}")

        # レベル2を呼び出す - コールチェーンの深さを生成する
        return self.authenticate_anthropic(user_input, thread_id)

    @weave.op
    def authenticate_anthropic(self, user_input: str, thread_id: str) -> str:
        """レベル2: Anthropic認証とセットアップの処理"""
        print(f"    L2: Authenticating with Anthropic")

        # 認証、レート制限、セッション管理
        auth_token = "anthropic_key_xyz_authenticated"

         # ここがターン境界 - レベル3でスレッドコンテキストでラップする
        with weave.thread(thread_id):
            # レベル3を呼び出す - コールチェーンをさらにネストする
            return self.execute_anthropic_call(user_input, auth_token)

    @weave.op
    def execute_anthropic_call(self, user_input: str, auth_token: str) -> str:
        """レベル3: ターン境界 - Anthropic API呼び出しの実行"""
        print(f"      L3: Executing Anthropic API call with auth")
        response = f"Anthropic Claude response (auth: {auth_token[:15]}...): {user_input}"
        return response


class MultiProviderAgent:
    """コールチェーンの深さが異なるプロバイダー間でルーティングするメインエージェント"""

    def __init__(self):
        self.openai_provider = OpenAIProvider()
        self.anthropic_provider = AnthropicProvider()

    def handle_conversation_turn(self, user_input: str, thread_id: str) -> str:
        """
        コールチェーンの深さが不均一な異なるプロバイダーへルーティングする。
        スレッドコンテキストは各チェーンの異なるネストレベルで適用される。
        """
        # デモ用にプロバイダーをランダムに選択する
        use_openai = random.choice([True, False])

        if use_openai:
            print(f"Choosing OpenAI (2-level call chain)")
            # OpenAI: レベル1 → レベル2(ターン境界)
            response = self.openai_provider.route_to_openai(user_input, thread_id)
            return f"[OpenAI Branch] {response}"
        else:
            print(f"Choosing Anthropic (3-level call chain)")
            # Anthropic: レベル1 → レベル2 → レベル3(ターン境界)
            response = self.anthropic_provider.route_to_anthropic(user_input, thread_id)
            return f"[Anthropic Branch] {response}"


async def main():
    agent = MultiProviderAgent()
    conversation_id = "nested_depth_conversation_999"

    # コールチェーンの深さが異なるマルチターン会話
    conversation_turns = [
        "What's deep learning?",
        "Explain neural network backpropagation",
        "How do attention mechanisms work?",
        "What's the transformer architecture?",
        "Compare CNNs vs RNNs"
    ]

    print(f"Starting conversation: {conversation_id}")

    for i, user_input in enumerate(conversation_turns, 1):
        print(f"\\n--- Turn {i} ---")
        print(f"User: {user_input}")

        # 異なるコールチェーンの深さ全体で同じthread_idを使用する
        response = agent.handle_conversation_turn(user_input, conversation_id)
        print(f"Agent: {response}")

if __name__ == "__main__":
    asyncio.run(main())

# 期待される結果: 5ターンを持つ単一スレッド
# - OpenAIターン: コールチェーンのレベル2でスレッドコンテキスト
#   コールスタック: route_to_openai() → execute_openai_call() ← ここにスレッドコンテキスト
# - Anthropicターン: コールチェーンのレベル3でスレッドコンテキスト
#   コールスタック: route_to_anthropic() → authenticate_anthropic() → execute_anthropic_call() ← ここにスレッドコンテキスト
# - 全ターンがthread_idを共有: "nested_depth_conversation_999"
# - ターン境界は異なるコールスタックの深さでマークされる
# - コールチェーン内のサポート操作はターンではなくネストされた呼び出しとして追跡される

以前のセッションを再開する

以前に開始したセッションを再開し、同じスレッドに呼び出しを追加し続ける必要がある場合があります。場合によっては、既存のセッションを再開できず、新しいスレッドを開始しなければならないこともあります。 オプションのスレッド再開機能を実装する際は、thread_id パラメータを 決して None のままにしないでください。そうするとスレッドのグルーピングが完全に無効になります。代わりに、常に有効なスレッド ID を指定してください。新しいスレッドを作成する必要がある場合は、generate_id() のような関数を使って一意の識別子を生成します。 thread_id が指定されていない場合、Weave の内部実装は自動的にランダムな UUID v7 を生成します。自前の generate_id() 関数でこの挙動を再現するか、任意の一意な文字列値を使用できます。
import weave
import uuidv7
import argparse

def generate_id():
    """UUID v7を使用して一意のスレッドIDを生成する。"""
    return str(uuidv7.uuidv7())

@weave.op
def load_history(session_id):
    """指定されたセッションの会話履歴を読み込む。"""
    # ここに実装を記述する
    return []

# セッション再開のためのコマンドライン引数を解析する
parser = argparse.ArgumentParser()
parser.add_argument("--session-id", help="再開する既存のセッションID")
args = parser.parse_args()

# スレッドIDを決定する: 既存のセッションを再開するか新規作成する
if args.session_id:
    thread_id = args.session_id
    print(f"Resuming session: {thread_id}")
else:
    thread_id = generate_id()
    print(f"Starting new session: {thread_id}")

# 呼び出しを追跡するためのスレッドコンテキストを確立する
with weave.thread(thread_id) as thread_ctx:
    # 会話履歴を読み込むか初期化する
    history = load_history(thread_id)
    print(f"Active thread ID: {thread_ctx.thread_id}")
    
    # ここにアプリケーションのロジックを記述する...

ネストされたスレッド

この例では、複数の連携したスレッドを使用して複雑なアプリケーションを設計する方法を示します。 各レイヤーはそれぞれ独自のスレッドコンテキスト内で実行されるため、関心事を明確に分離できます。親アプリケーションスレッドは、共有の ThreadContext を使ってスレッド ID を設定することで、これらのレイヤーを調整します。このパターンは、システムの異なる部分を独立して分析または監視しつつ、それらを共有セッションに結び付けておきたい場合に使用してください。
import weave
from contextlib import contextmanager
from typing import Dict

# ネストされたスレッドを調整するためのグローバルスレッドコンテキスト
class ThreadContext:
    def __init__(self):
        self.app_thread_id = None
        self.infra_thread_id = None
        self.logic_thread_id = None

    def setup_for_request(self, request_id: str):
        self.app_thread_id = f"app_{request_id}"
        self.infra_thread_id = f"{self.app_thread_id}_infra"
        self.logic_thread_id = f"{self.app_thread_id}_logic"

# グローバルインスタンス
thread_ctx = ThreadContext()

class InfrastructureLayer:
    """専用スレッドですべてのインフラストラクチャ操作を処理する"""

    @weave.op
    def authenticate_user(self, user_id: str) -> Dict:
        # 認証ロジック...
        return {"user_id": user_id, "authenticated": True}

    @weave.op
    def call_payment_gateway(self, amount: float) -> Dict:
        # 支払い処理...
        return {"status": "approved", "amount": amount}

    @weave.op
    def update_inventory(self, product_id: str, quantity: int) -> Dict:
        # 在庫管理...
        return {"product_id": product_id, "updated": True}

    def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
        """専用スレッドコンテキストですべてのインフラストラクチャ操作を実行する"""
        with weave.thread(thread_ctx.infra_thread_id):
            auth_result = self.authenticate_user(user_id)
            payment_result = self.call_payment_gateway(order_data["amount"])
            inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])

            return {
                "auth": auth_result,
                "payment": payment_result,
                "inventory": inventory_result
            }


class BusinessLogicLayer:
    """専用スレッドでビジネスロジックを処理する"""

    @weave.op
    def validate_order(self, order_data: Dict) -> Dict:
        # 検証ロジック...
        return {"valid": True}

    @weave.op
    def calculate_pricing(self, order_data: Dict) -> Dict:
        # 価格計算...
        return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}

    @weave.op
    def apply_business_rules(self, order_data: Dict) -> Dict:
        # ビジネスルール...
        return {"rules_applied": ["standard_processing"], "priority": "normal"}

    def execute_logic(self, order_data: Dict) -> Dict:
        """専用スレッドコンテキストですべてのビジネスロジックを実行する"""
        with weave.thread(thread_ctx.logic_thread_id):
            validation = self.validate_order(order_data)
            pricing = self.calculate_pricing(order_data)
            rules = self.apply_business_rules(order_data)

            return {"validation": validation, "pricing": pricing, "rules": rules}


class OrderProcessingApp:
    """メインアプリケーションオーケストレーター"""

    def __init__(self):
        self.infra = InfrastructureLayer()
        self.business = BusinessLogicLayer()

    @weave.op
    def process_order(self, user_id: str, order_data: Dict) -> Dict:
        """メインの注文処理 - アプリスレッドのターンになる"""

        # 専用スレッドでネストされた操作を実行する
        infra_results = self.infra.execute_operations(user_id, order_data)
        logic_results = self.business.execute_logic(order_data)

        # 最終オーケストレーション
        return {
            "order_id": f"order_12345",
            "status": "completed",
            "infra_results": infra_results,
            "logic_results": logic_results
        }


# グローバルスレッドコンテキストの調整を使用した利用例
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
    # このリクエスト用のスレッドコンテキストをセットアップする
    thread_ctx.setup_for_request(request_id)

    # アプリスレッドコンテキストで実行する
    with weave.thread(thread_ctx.app_thread_id):
        app = OrderProcessingApp()
        result = app.process_order(user_id, order_data)
        return result

# 使用例
order_result = handle_order_request(
    request_id="req_789",
    user_id="user_001",
    order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)

# 想定されるスレッド構造:
#
# アプリスレッド: app_req_789
# └── ターン: process_order() ← メインオーケストレーション
#
# インフラスレッド: app_req_789_infra
# ├── ターン: authenticate_user() ← インフラストラクチャ操作 1
# ├── ターン: call_payment_gateway() ← インフラストラクチャ操作 2
# └── ターン: update_inventory() ← インフラストラクチャ操作 3
#
# ロジックスレッド: app_req_789_logic
# ├── ターン: validate_order() ← ビジネスロジック操作 1
# ├── ターン: calculate_pricing() ← ビジネスロジック操作 2
# └── ターン: apply_business_rules() ← ビジネスロジック操作 3
#
# メリット:
# - スレッド間での関心事の明確な分離
# - スレッド ID をパラメーターとして引き回す必要がない
# - アプリ/インフラ/ロジック層の独立した監視
# - スレッドコンテキストによるグローバルな調整

API 仕様

エンドポイント

エンドポイント: POST /threads/query

リクエストスキーマ

class ThreadsQueryReq:
    project_id: str
    limit: Optional[int] = None
    offset: Optional[int] = None
    sort_by: Optional[list[SortBy]] = None  # サポートされているフィールド: thread_id, turn_count, start_time, last_updated
    sortable_datetime_after: Optional[datetime] = None   # グラニュール最適化によるスレッドのフィルタリング
    sortable_datetime_before: Optional[datetime] = None  # グラニュール最適化によるスレッドのフィルタリング

レスポンススキーマ

class ThreadSchema:
    thread_id: str           # スレッドの一意識別子
    turn_count: int          # このスレッド内のターン呼び出し数
    start_time: datetime     # このスレッド内のターン呼び出しの最も早い開始時刻
    last_updated: datetime   # このスレッド内のターン呼び出しの最も遅い終了時刻

class ThreadsQueryRes:
    threads: List[ThreadSchema]

最近アクティブなスレッドをクエリする

この例では、最新の更新から順にスレッドを 50 件取得します。my-project は実際のプロジェクト ID に置き換えてください。
# 最近アクティブなスレッドを取得する
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sort_by=[SortBy(field="last_updated", direction="desc")],
    limit=50
))

for thread in response.threads:
    print(f"Thread {thread.thread_id}: {thread.turn_count} turns, last active {thread.last_updated}")

アクティビティレベルでスレッドを検索する

この例では、ターン数の多い順に並べた、最もアクティブなスレッド上位 20 件を取得します。
# 最もアクティブなスレッドを取得(ターン数が最多)
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sort_by=[SortBy(field="turn_count", direction="desc")],
    limit=20
))

直近のスレッドのみをクエリする

この例では、過去24時間以内に開始されたスレッドを返します。timedelta 内の days の値を調整することで、対象期間を変更できます。
from datetime import datetime, timedelta

# 過去24時間に開始されたスレッドを取得する
yesterday = datetime.now() - timedelta(days=1)
response = client.threads_query(ThreadsQueryReq(
    project_id="my-project",
    sortable_datetime_after=yesterday,
    sort_by=[SortBy(field="start_time", direction="desc")]
))