Documentation Index
Fetch the complete documentation index at: https://wb-21fd5541-john-wbdocs-2044-rename-serverless-products.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
W&B Weave の Threads を使用すると、LLM アプリケーション内の複数ターンにわたる会話をトラッキングして分析できます。Threads では、関連する call を共通の thread_id の下にグループ化するため、セッション全体を可視化し、ターンをまたいで会話レベルのメトリクスをトラッキングできます。スレッドはプログラムで作成でき、Weave UI で可視化できます。
Threads を使い始めるには、次の手順に従ってください。
- まず、Threads の基本を確認します。
- コードサンプルを試します。一般的な使用パターンと実際のユースケースを示しています。
Threads は、次のような対象を整理・分析したい場合に役立ちます。
- 複数ターンにわたる会話
- セッションベースのワークフロー
- 関連する一連の操作
Threads を使用すると、コンテキストごとに Calls をグループ化できるため、複数の step にまたがってシステムがどのように応答するかを把握しやすくなります。たとえば、単一のユーザーセッション、agent の一連の意思決定、あるいはインフラストラクチャー層とビジネスロジック層にまたがる複雑なリクエストをトラッキングできます。
application を threads と turns で構造化することで、より整理されたメトリクスを得られ、Weave UI での可視性も向上します。低レベルの Op を個別にすべて見るのではなく、重要な高レベルの step に集中できます。
Thread は、共通の会話コンテキストを共有する関連する Calls を、論理的にまとめたものです。Thread には次の特徴があります。
- 一意の
thread_id を持つ
- 1 つ以上の turn を含む
- Calls をまたいでコンテキストを維持する
- 完結したユーザーセッションまたはインタラクションフローを表す
Turn は Thread 内の高レベルな操作で、UI ではスレッドビュー内の個々の行として表示されます。各 Turn には、次の特徴があります。
- 会話またはワークフローにおける 1 つの論理的な step を表します
- Turn はスレッドコンテキストの直接の子要素であり、ネストされた下位レベルの call を含む場合があります (スレッドレベルの統計には表示されません) 。
Call とは、アプリケーション内で実行される @weave.op でデコレートされた関数呼び出しのことです。
- Turn Calls は、新しいターンを開始するトップレベルの操作です
- Nested Calls は、ターン内のより下位の操作です
トレース には、単一のoperationにおける完全なCallスタックが記録されます。スレッドは、同じ論理的な会話またはセッションに属するトレースをまとめるものです。言い換えると、スレッドは複数のターンで構成され、それぞれが会話の一部を表します。トレースの詳細については、トレースの概要を参照してください。
Weave サイドバーで Threads を選択して、Threads list view を開きます。
- プロジェクト内の最近のThreadsを一覧表示します
- 列には、ターン数、開始時刻、最終更新時刻があります
- 行をクリックすると、詳細ドロワーが開きます
- 任意の行をクリックすると、その行の詳細ドロワーが開きます。
- スレッド内のすべてのターンを表示します。
- ターンは開始順に表示されます (継続時間や終了時刻ではなく、開始時刻に基づきます) 。
- Callレベルのメタデータ (レイテンシ、inputs、outputs) が含まれます。
- ログされていれば、メッセージ内容や構造化データを表示することもできます。
- ターンの実行全体を表示するには、スレッド詳細ドロワーからそのターンを開きます。これにより、そのターン中に発生したすべてのネストされた操作を詳しく確認できます。
- ターンに LLM calls から抽出されたメッセージが含まれている場合、それらは右側のチャットペインに表示されます。これらのメッセージは通常、サポート対象のインテグレーションによる calls (たとえば
openai.ChatCompletion.create) に由来し、表示するには特定の条件を満たす必要があります。詳細は チャットビューの挙動 を参照してください。
チャットペインには、各ターンで行われた LLM Calls から抽出された構造化メッセージデータが表示されます。このビューでは、やり取りを会話形式で表示できます。
Weave は、1 つのターン内にある Calls のうち、LLM プロバイダーとの直接的なやり取り (たとえば、プロンプトを送信してレスポンスを受け取る処理) を表すものからメッセージを抽出します。ほかの Calls の内側にさらにネストされていない Calls だけが、メッセージとして表示されます。これにより、中間 step や集約された内部ロジックが重複して表示されるのを防げます。
通常、メッセージを生成するのは、自動的にパッチが適用されたサードパーティ製 SDK です。たとえば、次のようなものがあります。
openai.ChatCompletion.create
anthropic.Anthropic.completion
ターンでメッセージが1件も出力されない場合、チャットペイン にはそのターンの空のメッセージセクションが表示されます。同じスレッド内の別のターンのメッセージは、引き続き チャットペイン に表示される場合があります。
- ターンをクリックすると、チャットペインがそのターンのメッセージ位置までスクロールします (ピン留めの動作) 。
- チャットペインをスクロールすると、左側のリストで対応するターンがハイライト表示されます。
項目をクリックすると、そのターンのトレース全体を開けます。
左上に戻るボタンが表示され、スレッド詳細ビューに戻れます。Weave は、この画面遷移の前後で UI の状態 (スクロール位置など) を保持しません。
このセクションの各例では、アプリケーション内でターンとスレッドを整理するための異なる方法を示します。ほとんどの例では、スタブ関数内に独自の LLM call やシステムの挙動を実装する必要があります。
- セッションまたは会話をトラッキングするには、
weave.thread() コンテキストマネージャを使用します。
- 論理的な処理に
@weave.op を付けて、ターンまたはネストされた call としてトラッキングします。
thread_id を渡すと、Weave はそのブロック内のすべての処理を同じスレッドにまとめます。thread_id を省略すると、Weave が一意の ID を自動生成します。
weave.thread() の戻り値は、thread_id プロパティを持つ ThreadContext オブジェクトです。これをログしたり、再利用したり、他のシステムに渡したりできます。
ネストされた weave.thread() コンテキストは、同じ thread_id を再利用しない限り、常に新しいスレッドを開始します。子コンテキストを終了しても、親コンテキストが中断されたり上書きされたりすることはありません。これにより、アプリのロジックに応じて、分岐したスレッド構造や階層的なスレッドオーケストレーションが可能になります。
次のコード例では、weave.thread() を使用して、1 つ以上の処理を共通の thread_id でグループ化する方法を示します。これは、アプリケーションでスレッドを使い始める最も簡単な方法です。
import weave
@weave.op
def say_hello(name: str) -> str:
return f"Hello, {name}!"
# 新しいスレッドコンテキストを開始する
with weave.thread() as thread_ctx:
print(f"Thread ID: {thread_ctx.thread_id}")
say_hello("Bill Nye the Science Guy")
この例では、@weave.op デコレーターと weave.thread() のコンテキスト管理を使って、会話型エージェントを手動で定義する方法を示します。process_user_message を呼び出すたびに、スレッドに新しいターンが作成されます。独自のエージェントループを構築する際に、コンテキストやネストの扱いを完全に制御したい場合は、このパターンを使用できます。
短時間のやり取りでは自動生成されたスレッド ID を使用し、セッションをまたいでスレッドコンテキストを保持したい場合は、カスタムのセッション ID (user_session_123 など) を渡します。
import weave
class ConversationAgent:
@weave.op
def process_user_message(self, message: str) -> str:
"""
ターンレベルの操作: これは1つの会話ターンを表します。
スレッド統計にカウントされるのはこの関数のみです。
"""
# ユーザーメッセージを保存
# ネストされたcallを通じてAIレスポンスを生成
response = self._generate_response(message)
# アシスタントのレスポンスを保存
return response
@weave.op
def _generate_response(self, message: str) -> str:
"""ネストされたcall: 実装の詳細。スレッド統計にはカウントされません。"""
context = self._retrieve_context(message) # 別のネストされたcall
intent = self._classify_intent(message) # 別のネストされたcall
response = self._call_llm(message, context) # LLM call(ネスト)
return self._format_response(response) # 最後のネストされたcall
@weave.op
def _retrieve_context(self, message: str) -> str:
# ベクターDBルックアップ、ナレッジベースクエリなど
return "retrieved_context"
@weave.op
def _classify_intent(self, message: str) -> str:
# 意図分類ロジック
return "general_inquiry"
@weave.op
def _call_llm(self, message: str, context: str) -> str:
# OpenAI/Anthropic/etc API呼び出し
return "llm_response"
@weave.op
def _format_response(self, response: str) -> str:
# レスポンスフォーマットロジック
return f"Formatted: {response}"
# 使用方法: スレッドコンテキストは自動的に確立されます
agent = ConversationAgent()
# スレッドコンテキストを確立 - process_user_messageの各callがターンになります
with weave.thread() as thread_ctx: # thread_idを自動生成
print(f"Thread ID: {thread_ctx.thread_id}")
# process_user_messageを呼び出すたびに1ターン + 複数のネストされたcallが生成されます
agent.process_user_message("Hello, help with setup") # ターン1
agent.process_user_message("What languages do you recommend?") # ターン2
agent.process_user_message("Explain Python vs JavaScript") # ターン3
# 結果: 3ターンのスレッド、合計約15〜20回のcall(ネストを含む)
# 代替方法: セッショントラッキングに明示的なthread_idを使用する
session_id = "user_session_123"
with weave.thread(session_id) as thread_ctx:
print(f"Session Thread ID: {thread_ctx.thread_id}") # "user_session_123"
agent.process_user_message("Continue our previous conversation") # このセッションのターン1
agent.process_user_message("Can you summarize what we discussed?") # このセッションのターン2
この例では、スレッドコンテキストの適用方法によって、ターンを call スタック内の異なる深度で定義できることを示します。サンプルでは 2 つのプロバイダ (OpenAI と Anthropic) を使用しており、それぞれターン境界に到達するまでの call 深度が異なります。
すべてのターンは同じ thread_id を共有しますが、ターン境界はプロバイダのロジックに応じてスタック内の異なるレベルに現れます。これは、バックエンドごとに call を異なる方法でトレースする必要がありつつ、それらを同じスレッドにまとめたい場合に便利です。
import weave
import random
import asyncio
class OpenAIProvider:
"""OpenAIブランチ: ターン境界まで2レベルのcallチェーン"""
@weave.op
def route_to_openai(self, user_input: str, thread_id: str) -> str:
"""Level 1: OpenAIリクエストのルーティングと準備"""
# 入力検証、ルーティングロジック、基本的な前処理
print(f" L1: Routing to OpenAI for: {user_input}")
# ここがターン境界 - スレッドコンテキストでラップする
with weave.thread(thread_id):
# Level 2を直接呼び出す - callチェーンの深さが生まれる
return self.execute_openai_call(user_input)
@weave.op
def execute_openai_call(self, user_input: str) -> str:
"""Level 2: ターン境界 - OpenAI API callの実行"""
print(f" L2: Executing OpenAI API call")
response = f"OpenAI GPT-4 response: {user_input}"
return response
class AnthropicProvider:
"""Anthropicブランチ: ターン境界まで3レベルのcallチェーン"""
@weave.op
def route_to_anthropic(self, user_input: str, thread_id: str) -> str:
"""Level 1: Anthropicリクエストのルーティングと準備"""
# 入力検証、ルーティングロジック、プロバイダー選択
print(f" L1: Routing to Anthropic for: {user_input}")
# Level 2を呼び出す - callチェーンの深さが生まれる
return self.authenticate_anthropic(user_input, thread_id)
@weave.op
def authenticate_anthropic(self, user_input: str, thread_id: str) -> str:
"""Level 2: Anthropic認証とセットアップの処理"""
print(f" L2: Authenticating with Anthropic")
# 認証、レート制限、セッション管理
auth_token = "anthropic_key_xyz_authenticated"
# ここがターン境界 - Level 3でスレッドコンテキストでラップする
with weave.thread(thread_id):
# Level 3を呼び出す - callチェーンをさらにネストする
return self.execute_anthropic_call(user_input, auth_token)
@weave.op
def execute_anthropic_call(self, user_input: str, auth_token: str) -> str:
"""Level 3: ターン境界 - Anthropic API callの実行"""
print(f" L3: Executing Anthropic API call with auth")
response = f"Anthropic Claude response (auth: {auth_token[:15]}...): {user_input}"
return response
class MultiProviderAgent:
"""異なるcallチェーンの深さを持つプロバイダー間でルーティングするメインエージェント"""
def __init__(self):
self.openai_provider = OpenAIProvider()
self.anthropic_provider = AnthropicProvider()
def handle_conversation_turn(self, user_input: str, thread_id: str) -> str:
"""
callチェーンの深さが不均一な各プロバイダーにルーティングする。
スレッドコンテキストは各チェーンの異なるネストレベルで適用される。
"""
# デモ用にプロバイダーをランダムに選択する
use_openai = random.choice([True, False])
if use_openai:
print(f"Choosing OpenAI (2-level call chain)")
# OpenAI: Level 1 → Level 2(ターン境界)
response = self.openai_provider.route_to_openai(user_input, thread_id)
return f"[OpenAI Branch] {response}"
else:
print(f"Choosing Anthropic (3-level call chain)")
# Anthropic: Level 1 → Level 2 → Level 3(ターン境界)
response = self.anthropic_provider.route_to_anthropic(user_input, thread_id)
return f"[Anthropic Branch] {response}"
async def main():
agent = MultiProviderAgent()
conversation_id = "nested_depth_conversation_999"
# 異なるcallチェーンの深さを持つマルチターン会話
conversation_turns = [
"What's deep learning?",
"Explain neural network backpropagation",
"How do attention mechanisms work?",
"What's the transformer architecture?",
"Compare CNNs vs RNNs"
]
print(f"Starting conversation: {conversation_id}")
for i, user_input in enumerate(conversation_turns, 1):
print(f"\\n--- Turn {i} ---")
print(f"User: {user_input}")
# 異なるcallチェーンの深さ全体で同じthread_idを使用する
response = agent.handle_conversation_turn(user_input, conversation_id)
print(f"Agent: {response}")
if __name__ == "__main__":
asyncio.run(main())
# 期待される結果: 5ターンを持つ単一スレッド
# - OpenAIターン: callチェーンのLevel 2にスレッドコンテキスト
# callスタック: route_to_openai() → execute_openai_call() ← ここにスレッドコンテキスト
# - Anthropicターン: callチェーンのLevel 3にスレッドコンテキスト
# callスタック: route_to_anthropic() → authenticate_anthropic() → execute_anthropic_call() ← ここにスレッドコンテキスト
# - 全ターンがthread_idを共有: "nested_depth_conversation_999"
# - ターン境界は異なるcallスタックの深さでマークされる
# - callチェーン内のサポート操作はターンではなくネストされたcallsとしてトラッキングされる
以前に開始したセッションを再開し、同じスレッドに Calls を追加し続ける必要がある場合があります。一方で、既存のセッションを再開できず、代わりに新しいスレッドを開始しなければならない場合もあります。
スレッド再開をオプションとして実装する場合は、thread_id パラメーターを None のままに絶対にしないでください。そうすると、スレッドのグループ化が完全に無効になります。代わりに、必ず有効なスレッド ID を指定してください。新しいスレッドを作成する必要がある場合は、generate_id() のような関数を使用して一意の ID を生成してください。
thread_id が指定されていない場合、Weave の内部実装によってランダムな UUID v7 が自動的に生成されます。この動作は独自の generate_id() 関数で再現することも、任意の一意な文字列値を使用することもできます。
import weave
import uuidv7
import argparse
def generate_id():
"""UUID v7 を使用して一意のスレッド ID を生成する。"""
return str(uuidv7.uuidv7())
@weave.op
def load_history(session_id):
"""指定されたセッションの会話履歴を読み込む。"""
# ここに実装を記述する
return []
# セッション再開のためのコマンドライン引数を解析する
parser = argparse.ArgumentParser()
parser.add_argument("--session-id", help="再開する既存のセッション ID")
args = parser.parse_args()
# スレッド ID を決定する: 既存のセッションを再開するか新しいセッションを作成する
if args.session_id:
thread_id = args.session_id
print(f"セッションを再開中: {thread_id}")
else:
thread_id = generate_id()
print(f"新しいセッションを開始中: {thread_id}")
# calls をトラッキングするためのスレッドコンテキストを確立する
with weave.thread(thread_id) as thread_ctx:
# 会話履歴を読み込むか初期化する
history = load_history(thread_id)
print(f"アクティブなスレッド ID: {thread_ctx.thread_id}")
# ここにアプリケーションのロジックを記述する...
この例では、複数の連携したスレッドを使って複雑なアプリケーションをどのように構成するかを示します。
各レイヤーはそれぞれ独自のスレッドコンテキストで実行されるため、責務を明確に分離できます。親アプリケーションのスレッドは、共有の ThreadContext を使ってスレッド ID を設定し、これらのレイヤーを連携させます。システムの異なる部分を個別に分析または監視しつつ、同じセッションにひも付けたい場合は、このパターンを使用してください。
import weave
from contextlib import contextmanager
from typing import Dict
# ネストされたスレッドを調整するためのグローバルスレッドコンテキスト
class ThreadContext:
def __init__(self):
self.app_thread_id = None
self.infra_thread_id = None
self.logic_thread_id = None
def setup_for_request(self, request_id: str):
self.app_thread_id = f"app_{request_id}"
self.infra_thread_id = f"{self.app_thread_id}_infra"
self.logic_thread_id = f"{self.app_thread_id}_logic"
# Global instance
thread_ctx = ThreadContext()
class InfrastructureLayer:
"""専用スレッドですべてのインフラストラクチャー操作を処理する"""
@weave.op
def authenticate_user(self, user_id: str) -> Dict:
# 認証ロジック...
return {"user_id": user_id, "authenticated": True}
@weave.op
def call_payment_gateway(self, amount: float) -> Dict:
# 決済処理...
return {"status": "approved", "amount": amount}
@weave.op
def update_inventory(self, product_id: str, quantity: int) -> Dict:
# 在庫管理...
return {"product_id": product_id, "updated": True}
def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
"""専用スレッドコンテキストですべてのインフラストラクチャー操作を実行する"""
with weave.thread(thread_ctx.infra_thread_id):
auth_result = self.authenticate_user(user_id)
payment_result = self.call_payment_gateway(order_data["amount"])
inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])
return {
"auth": auth_result,
"payment": payment_result,
"inventory": inventory_result
}
class BusinessLogicLayer:
"""専用スレッドでビジネスロジックを処理する"""
@weave.op
def validate_order(self, order_data: Dict) -> Dict:
# 検証ロジック...
return {"valid": True}
@weave.op
def calculate_pricing(self, order_data: Dict) -> Dict:
# 価格計算...
return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}
@weave.op
def apply_business_rules(self, order_data: Dict) -> Dict:
# ビジネスルール...
return {"rules_applied": ["standard_processing"], "priority": "normal"}
def execute_logic(self, order_data: Dict) -> Dict:
"""専用スレッドコンテキストですべてのビジネスロジックを実行する"""
with weave.thread(thread_ctx.logic_thread_id):
validation = self.validate_order(order_data)
pricing = self.calculate_pricing(order_data)
rules = self.apply_business_rules(order_data)
return {"validation": validation, "pricing": pricing, "rules": rules}
class OrderProcessingApp:
"""メインアプリケーションオーケストレーター"""
def __init__(self):
self.infra = InfrastructureLayer()
self.business = BusinessLogicLayer()
@weave.op
def process_order(self, user_id: str, order_data: Dict) -> Dict:
"""メインの注文処理 - アプリスレッドのターンになる"""
# ネストされた操作を専用スレッドで実行する
infra_results = self.infra.execute_operations(user_id, order_data)
logic_results = self.business.execute_logic(order_data)
# 最終オーケストレーション
return {
"order_id": f"order_12345",
"status": "completed",
"infra_results": infra_results,
"logic_results": logic_results
}
# グローバルスレッドコンテキストを使った調整の利用例
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
# このリクエスト用のスレッドコンテキストをセットアップする
thread_ctx.setup_for_request(request_id)
# アプリスレッドコンテキストで実行する
with weave.thread(thread_ctx.app_thread_id):
app = OrderProcessingApp()
result = app.process_order(user_id, order_data)
return result
# 使用例
order_result = handle_order_request(
request_id="req_789",
user_id="user_001",
order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)
# 想定されるスレッド構造:
#
# App Thread: app_req_789
# └── Turn: process_order() ← メインオーケストレーション
#
# Infra Thread: app_req_789_infra
# ├── Turn: authenticate_user() ← インフラストラクチャー操作 1
# ├── Turn: call_payment_gateway() ← インフラストラクチャー操作 2
# └── Turn: update_inventory() ← インフラストラクチャー操作 3
#
# Logic Thread: app_req_789_logic
# ├── Turn: validate_order() ← ビジネスロジック操作 1
# ├── Turn: calculate_pricing() ← ビジネスロジック操作 2
# └── Turn: apply_business_rules() ← ビジネスロジック操作 3
#
# メリット:
# - スレッド間での関心事の明確な分離
# - スレッド ID をパラメーターとして引き回す必要がない
# - app/infra/logic レイヤーの独立した監視
# - スレッドコンテキストによるグローバルな調整
エンドポイント: POST /threads/query
class ThreadsQueryReq:
project_id: str
limit: Optional[int] = None
offset: Optional[int] = None
sort_by: Optional[list[SortBy]] = None # サポートされるフィールド: thread_id, turn_count, start_time, last_updated
sortable_datetime_after: Optional[datetime] = None # グラニュール最適化によるスレッドのフィルター
sortable_datetime_before: Optional[datetime] = None # グラニュール最適化によるスレッドのフィルター
class ThreadSchema:
thread_id: str # スレッドの一意の ID
turn_count: int # このスレッド内のターン calls の数
start_time: datetime # このスレッド内のターン calls の最も早い開始時刻
last_updated: datetime # このスレッド内のターン calls の最も遅い終了時刻
class ThreadsQueryRes:
threads: List[ThreadSchema]
この例では、直近で更新されたスレッドを50件取得します。my-project は実際のプロジェクト ID に置き換えてください。
# 最近アクティブなスレッドを取得する
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sort_by=[SortBy(field="last_updated", direction="desc")],
limit=50
))
for thread in response.threads:
print(f"Thread {thread.thread_id}: {thread.turn_count} turns, last active {thread.last_updated}")
この例では、ターン数が多い順に、最もアクティブなスレッド20件を取得します。
# 最もアクティブなスレッドを取得する(ターン数が最多)
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sort_by=[SortBy(field="turn_count", direction="desc")],
limit=20
))
この例では、過去24時間以内に開始されたスレッドを返します。timedelta の days の値を調整すると、期間を変更できます。
from datetime import datetime, timedelta
# 過去24時間に開始されたスレッドを取得する
yesterday = datetime.now() - timedelta(days=1)
response = client.threads_query(ThreadsQueryReq(
project_id="my-project",
sortable_datetime_after=yesterday,
sort_by=[SortBy(field="start_time", direction="desc")]
))