import weave
from contextlib import contextmanager
from typing import Dict
# ネストされたスレッドを調整するためのグローバルスレッドコンテキスト
class ThreadContext:
def __init__(self):
self.app_thread_id = None
self.infra_thread_id = None
self.logic_thread_id = None
def setup_for_request(self, request_id: str):
self.app_thread_id = f"app_{request_id}"
self.infra_thread_id = f"{self.app_thread_id}_infra"
self.logic_thread_id = f"{self.app_thread_id}_logic"
# グローバルインスタンス
thread_ctx = ThreadContext()
class InfrastructureLayer:
"""専用スレッドですべてのインフラストラクチャ操作を処理する"""
@weave.op
def authenticate_user(self, user_id: str) -> Dict:
# 認証ロジック...
return {"user_id": user_id, "authenticated": True}
@weave.op
def call_payment_gateway(self, amount: float) -> Dict:
# 支払い処理...
return {"status": "approved", "amount": amount}
@weave.op
def update_inventory(self, product_id: str, quantity: int) -> Dict:
# 在庫管理...
return {"product_id": product_id, "updated": True}
def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
"""専用スレッドコンテキストですべてのインフラストラクチャ操作を実行する"""
with weave.thread(thread_ctx.infra_thread_id):
auth_result = self.authenticate_user(user_id)
payment_result = self.call_payment_gateway(order_data["amount"])
inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])
return {
"auth": auth_result,
"payment": payment_result,
"inventory": inventory_result
}
class BusinessLogicLayer:
"""専用スレッドでビジネスロジックを処理する"""
@weave.op
def validate_order(self, order_data: Dict) -> Dict:
# 検証ロジック...
return {"valid": True}
@weave.op
def calculate_pricing(self, order_data: Dict) -> Dict:
# 価格計算...
return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}
@weave.op
def apply_business_rules(self, order_data: Dict) -> Dict:
# ビジネスルール...
return {"rules_applied": ["standard_processing"], "priority": "normal"}
def execute_logic(self, order_data: Dict) -> Dict:
"""専用スレッドコンテキストですべてのビジネスロジックを実行する"""
with weave.thread(thread_ctx.logic_thread_id):
validation = self.validate_order(order_data)
pricing = self.calculate_pricing(order_data)
rules = self.apply_business_rules(order_data)
return {"validation": validation, "pricing": pricing, "rules": rules}
class OrderProcessingApp:
"""メインアプリケーションオーケストレーター"""
def __init__(self):
self.infra = InfrastructureLayer()
self.business = BusinessLogicLayer()
@weave.op
def process_order(self, user_id: str, order_data: Dict) -> Dict:
"""メインの注文処理 - アプリスレッドのターンになる"""
# 専用スレッドでネストされた操作を実行する
infra_results = self.infra.execute_operations(user_id, order_data)
logic_results = self.business.execute_logic(order_data)
# 最終オーケストレーション
return {
"order_id": f"order_12345",
"status": "completed",
"infra_results": infra_results,
"logic_results": logic_results
}
# グローバルスレッドコンテキストの調整を使用した利用例
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
# このリクエスト用のスレッドコンテキストをセットアップする
thread_ctx.setup_for_request(request_id)
# アプリスレッドコンテキストで実行する
with weave.thread(thread_ctx.app_thread_id):
app = OrderProcessingApp()
result = app.process_order(user_id, order_data)
return result
# 使用例
order_result = handle_order_request(
request_id="req_789",
user_id="user_001",
order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)
# 想定されるスレッド構造:
#
# アプリスレッド: app_req_789
# └── ターン: process_order() ← メインオーケストレーション
#
# インフラスレッド: app_req_789_infra
# ├── ターン: authenticate_user() ← インフラストラクチャ操作 1
# ├── ターン: call_payment_gateway() ← インフラストラクチャ操作 2
# └── ターン: update_inventory() ← インフラストラクチャ操作 3
#
# ロジックスレッド: app_req_789_logic
# ├── ターン: validate_order() ← ビジネスロジック操作 1
# ├── ターン: calculate_pricing() ← ビジネスロジック操作 2
# └── ターン: apply_business_rules() ← ビジネスロジック操作 3
#
# メリット:
# - スレッド間での関心事の明確な分離
# - スレッド ID をパラメーターとして引き回す必要がない
# - アプリ/インフラ/ロジック層の独立した監視
# - スレッドコンテキストによるグローバルな調整