import weave
from contextlib import contextmanager
from typing import Dict
# Global thread context used to coordinate nested threads
class ThreadContext:
    """Hold the thread IDs for the application, infrastructure and logic layers.

    All three IDs start as ``None`` and are filled in by
    ``setup_for_request``.
    """

    def __init__(self):
        # No request has been set up yet, so no thread ID exists.
        self.app_thread_id = self.infra_thread_id = self.logic_thread_id = None

    def setup_for_request(self, request_id: str):
        """Derive the three thread IDs from ``request_id``.

        The application ID is ``app_<request_id>``; the infrastructure and
        logic IDs append ``_infra`` and ``_logic`` to it.
        """
        base_id = f"app_{request_id}"
        self.app_thread_id = base_id
        self.infra_thread_id = f"{base_id}_infra"
        self.logic_thread_id = f"{base_id}_logic"
# Module-level instance; the layer classes and handle_order_request read
# their thread IDs from it instead of receiving them as parameters.
thread_ctx = ThreadContext()
class InfrastructureLayer:
    """Handle all infrastructure operations inside a dedicated thread."""

    @weave.op
    def authenticate_user(self, user_id: str) -> Dict:
        """Return an authentication record for ``user_id`` (placeholder logic)."""
        # Authentication logic goes here...
        return dict(user_id=user_id, authenticated=True)

    @weave.op
    def call_payment_gateway(self, amount: float) -> Dict:
        """Return an approved payment record for ``amount`` (placeholder logic)."""
        # Payment processing goes here...
        return dict(status="approved", amount=amount)

    @weave.op
    def update_inventory(self, product_id: str, quantity: int) -> Dict:
        """Return an inventory-update record for ``product_id`` (placeholder logic).

        ``quantity`` is accepted but not used by the current placeholder body.
        """
        # Stock management goes here...
        return dict(product_id=product_id, updated=True)

    def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
        """Run every infrastructure operation in the dedicated thread context.

        Reads ``amount``, ``product_id`` and ``quantity`` from ``order_data``
        and returns the three results under ``auth``, ``payment`` and
        ``inventory``.
        """
        # The thread ID comes from the module-level context, not a parameter.
        with weave.thread(thread_ctx.infra_thread_id):
            # Dict entries are evaluated top to bottom, so the operations run
            # in the order: authentication, payment, inventory.
            outcome = {
                "auth": self.authenticate_user(user_id),
                "payment": self.call_payment_gateway(order_data["amount"]),
                "inventory": self.update_inventory(
                    order_data["product_id"], order_data["quantity"]
                ),
            }
        return outcome
class BusinessLogicLayer:
    """Handle business logic inside a dedicated thread."""

    @weave.op
    def validate_order(self, order_data: Dict) -> Dict:
        """Return a validation record (placeholder: always valid)."""
        # Validation logic goes here...
        return dict(valid=True)

    @weave.op
    def calculate_pricing(self, order_data: Dict) -> Dict:
        """Return the order total and a tax computed as ``amount * 0.08``."""
        # Pricing calculations go here...
        amount = order_data["amount"]
        return dict(total=amount, tax=amount * 0.08)

    @weave.op
    def apply_business_rules(self, order_data: Dict) -> Dict:
        """Return the applied rules and priority (placeholder: fixed values)."""
        # Business rules go here...
        return dict(rules_applied=["standard_processing"], priority="normal")

    def execute_logic(self, order_data: Dict) -> Dict:
        """Run all business logic in the dedicated thread context.

        Returns the three results under ``validation``, ``pricing`` and
        ``rules``.
        """
        # The thread ID comes from the module-level context, not a parameter.
        with weave.thread(thread_ctx.logic_thread_id):
            # Dict entries are evaluated top to bottom, so the steps run in
            # the order: validation, pricing, rules.
            outcome = {
                "validation": self.validate_order(order_data),
                "pricing": self.calculate_pricing(order_data),
                "rules": self.apply_business_rules(order_data),
            }
        return outcome
class OrderProcessingApp:
    """Main application orchestrator.

    Owns one ``InfrastructureLayer`` and one ``BusinessLogicLayer`` and
    combines their results in ``process_order``.
    """

    def __init__(self):
        self.infra = InfrastructureLayer()
        self.business = BusinessLogicLayer()

    @weave.op
    def process_order(self, user_id: str, order_data: Dict) -> Dict:
        """Process an order; intended to be a turn in the application thread.

        Args:
            user_id: Identifier forwarded to the infrastructure layer.
            order_data: Order payload forwarded to both layers.

        Returns:
            A dict with ``order_id``, ``status``, ``infra_results`` and
            ``logic_results``.
        """
        # Run the nested operations; each layer opens its own thread context.
        infra_results = self.infra.execute_operations(user_id, order_data)
        logic_results = self.business.execute_logic(order_data)

        # Final orchestration
        return {
            # Fixed placeholder ID; a plain literal because there is nothing
            # to interpolate (an f-string without placeholders is a lint error).
            "order_id": "order_12345",
            "status": "completed",
            "infra_results": infra_results,
            "logic_results": logic_results,
        }
# Usage with global thread-context coordination
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
    """Handle one order request and return the result of ``process_order``.

    Sets up the module-level ``thread_ctx`` for ``request_id``, then runs a
    fresh ``OrderProcessingApp`` inside the application thread context.
    """
    # Configure the thread context for this request before any layer reads it.
    thread_ctx.setup_for_request(request_id)

    # Execute within the application thread context.
    with weave.thread(thread_ctx.app_thread_id):
        return OrderProcessingApp().process_order(user_id, order_data)
# Example usage
order_result = handle_order_request(
    "req_789",
    "user_001",
    dict(product_id="laptop", quantity=1, amount=1299.99),
)
# Structure de threads attendue :
#
# Thread applicatif : app_req_789
# └── Tour : process_order() ← Orchestration principale
#
# Thread d'infrastructure : app_req_789_infra
# ├── Tour : authenticate_user() ← Opération d'infrastructure 1
# ├── Tour : call_payment_gateway() ← Opération d'infrastructure 2
# └── Tour : update_inventory() ← Opération d'infrastructure 3
#
# Thread de logique : app_req_789_logic
# ├── Tour : validate_order() ← Opération de logique métier 1
# ├── Tour : calculate_pricing() ← Opération de logique métier 2
# └── Tour : apply_business_rules() ← Opération de logique métier 3
#
# Avantages :
# - Séparation claire des responsabilités entre les threads
# - Pas de passage des IDs de thread en paramètre
# - Surveillance indépendante des couches application/infrastructure/logique
# - Coordination globale via le contexte de thread