Passer au contenu principal
Il s’agit d’un notebook interactif. Vous pouvez l’exécuter localement ou utiliser les liens ci-dessous :

Sorties structurées pour les systèmes multi-agents

OpenAI a publié les sorties structurées pour permettre aux utilisateurs de s’assurer que le modèle génère toujours des réponses conformes au schéma JSON fourni, sans avoir recours à des prompts formulés de manière insistante. Avec les sorties structurées, il n’est plus nécessaire de valider ni de retenter les réponses mal formatées. En utilisant le nouveau paramètre strict: true, nous pouvons garantir que la réponse respecte le schéma fourni. L’utilisation de sorties structurées dans un système multi-agents améliore la communication en garantissant des données cohérentes et faciles à traiter entre les agents. Elle renforce aussi la sécurité en autorisant des refus explicites et améliore les performances en supprimant le besoin de nouvelles tentatives ou de validations. Cela simplifie les interactions et augmente l’efficacité globale du système. Ce tutoriel montre comment utiliser les sorties structurées dans un système multi-agent et les tracer avec Weave.
Source : Ce cookbook est basé sur l’exemple de code d’OpenAI pour les sorties structurées, avec quelques modifications pour améliorer la visualisation avec Weave.

Installation des dépendances

Nous avons besoin des bibliothèques suivantes pour ce tutoriel :
  • OpenAI pour créer un système multi-agent.
  • Weave pour suivre notre flux de travail avec les LLM et évaluer nos stratégies de prompt.
!pip install -qU openai weave wandb
python
%%capture
# Solution temporaire pour corriger un bug dans openai :
# TypeError: Client.__init__() got an unexpected keyword argument 'proxies'
# Voir https://community.openai.com/t/error-with-openai-1-56-0-client-init-got-an-unexpected-keyword-argument-proxies/1040332/15
!pip install "httpx<0.28"
Nous définissons WANDB_API_KEY dans nos variables d’environnement afin de pouvoir nous connecter facilement avec wandb.login() (cela doit être fourni à Colab en tant que secret). Nous définissons le projet dans W&B dans lequel nous voulons consigner ces données dans name_of_wandb_project. REMARQUE : name_of_wandb_project peut aussi être au format {team_name}/{project_name} pour préciser l’équipe dans laquelle consigner les traces. Nous récupérons ensuite un client Weave en appelant weave.init(). Puisque nous utiliserons l’API OpenAI, nous aurons également besoin d’une clé API OpenAI. Vous pouvez vous inscrire sur la plateforme OpenAI pour obtenir votre propre clé API. (cela doit aussi être fourni à Colab en tant que secret.)
import base64
import json
import os
from io import BytesIO, StringIO

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import wandb
from google.colab import userdata
from openai import OpenAI

import weave
python
os.environ["WANDB_API_KEY"] = userdata.get("WANDB_API_KEY")
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")

wandb.login()
name_of_wandb_project = "multi-agent-structured-output"
weave.init(name_of_wandb_project)

client = OpenAI()
MODEL = "gpt-4o-2024-08-06"

Configuration des agents

Le cas d’usage que nous allons aborder est une tâche d’analyse de données. Commençons par configurer notre système de 4 agents :
  • Agent de triage : décide quel(s) agent(s) appeler
  • Agent de prétraitement des données : prépare les données pour l’analyse, par exemple en les nettoyant
  • Agent d’analyse de données : effectue l’analyse des données
  • Data Visualization Agent : visualise le résultat de l’analyse pour en extraire des enseignements Nous commencerons par définir les prompts système pour chacun de ces agents.
triaging_system_prompt = """You are a Triaging Agent. Your role is to assess the user's query and route it to the relevant agents. The agents available are:
- Data Processing Agent: Cleans, transforms, and aggregates data.
- Analysis Agent: Performs statistical, correlation, and regression analysis.
- Visualization Agent: Creates bar charts, line charts, and pie charts.

Use the send_query_to_agents tool to forward the user's query to the relevant agents. Also, use the speak_to_user tool to get more information from the user if needed."""

processing_system_prompt = """You are a Data Processing Agent. Your role is to clean, transform, and aggregate data using the following tools:
- clean_data
- transform_data
- aggregate_data"""

analysis_system_prompt = """You are an Analysis Agent. Your role is to perform statistical, correlation, and regression analysis using the following tools:
- stat_analysis
- correlation_analysis
- regression_analysis"""

visualization_system_prompt = """You are a Visualization Agent. Your role is to create bar charts, line charts, and pie charts using the following tools:
- create_bar_chart
- create_line_chart
- create_pie_chart"""
Nous définirons ensuite les outils de chaque agent. À l’exception de l’agent de triage, chaque agent disposera d’outils propres à son rôle : Agent de prétraitement des données : 1. Nettoyer les données, 2. Transformer les données, 3. Agréger les données Agent d’analyse des données : 1. Analyse statistique, 2. Analyse de corrélation, 3. Analyse de régression Agent de visualisation des données : 1. Créer un graphique en barres, 2. Créer un graphique linéaire, 3. Créer un diagramme circulaire
triage_tools = [
    {
        "type": "function",
        "function": {
            "name": "send_query_to_agents",
            "description": "Envoie la requête de l'utilisateur aux agents pertinents en fonction de leurs capacités.",
            "parameters": {
                "type": "object",
                "properties": {
                    "agents": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Un tableau de noms d'agents auxquels envoyer la requête.",
                    },
                    "query": {
                        "type": "string",
                        "description": "La requête de l'utilisateur à envoyer.",
                    },
                },
                "required": ["agents", "query"],
            },
        },
        "strict": True,
    }
]

preprocess_tools = [
    {
        "type": "function",
        "function": {
            "name": "clean_data",
            "description": "Nettoie les données fournies en supprimant les doublons et en gérant les valeurs manquantes.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Le jeu de données à nettoyer. Doit être dans un format approprié tel que JSON ou CSV.",
                    }
                },
                "required": ["data"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "transform_data",
            "description": "Transforme les données selon des règles spécifiées.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Les données à transformer. Doivent être dans un format approprié tel que JSON ou CSV.",
                    },
                    "rules": {
                        "type": "string",
                        "description": "Règles de transformation à appliquer, spécifiées dans un format structuré.",
                    },
                },
                "required": ["data", "rules"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "aggregate_data",
            "description": "Agrège les données selon les colonnes et opérations spécifiées.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Les données à agréger. Doivent être dans un format approprié tel que JSON ou CSV.",
                    },
                    "group_by": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Colonnes de regroupement.",
                    },
                    "operations": {
                        "type": "string",
                        "description": "Opérations d'agrégation à effectuer, spécifiées dans un format structuré.",
                    },
                },
                "required": ["data", "group_by", "operations"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

analysis_tools = [
    {
        "type": "function",
        "function": {
            "name": "stat_analysis",
            "description": "Effectue une analyse statistique sur le jeu de données fourni.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Le jeu de données à analyser. Doit être dans un format approprié tel que JSON ou CSV.",
                    }
                },
                "required": ["data"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "correlation_analysis",
            "description": "Calcule les coefficients de corrélation entre les variables du jeu de données.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Le jeu de données à analyser. Doit être dans un format approprié tel que JSON ou CSV.",
                    },
                    "variables": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Liste des variables pour lesquelles calculer les corrélations.",
                    },
                },
                "required": ["data", "variables"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "regression_analysis",
            "description": "Effectue une analyse de régression sur le jeu de données.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Le jeu de données à analyser. Doit être dans un format approprié tel que JSON ou CSV.",
                    },
                    "dependent_var": {
                        "type": "string",
                        "description": "La variable dépendante pour la régression.",
                    },
                    "independent_vars": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Liste des variables indépendantes.",
                    },
                },
                "required": ["data", "dependent_var", "independent_vars"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

visualization_tools = [
    {
        "type": "function",
        "function": {
            "name": "create_bar_chart",
            "description": "Crée un graphique à barres à partir des données fournies.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Les données pour le graphique à barres. Doivent être dans un format approprié tel que JSON ou CSV.",
                    },
                    "x": {"type": "string", "description": "Colonne pour l'axe des abscisses."},
                    "y": {"type": "string", "description": "Colonne pour l'axe des ordonnées."},
                },
                "required": ["data", "x", "y"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "create_line_chart",
            "description": "Crée un graphique en courbes à partir des données fournies.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Les données pour le graphique en courbes. Doivent être dans un format approprié tel que JSON ou CSV.",
                    },
                    "x": {"type": "string", "description": "Colonne pour l'axe des abscisses."},
                    "y": {"type": "string", "description": "Colonne pour l'axe des ordonnées."},
                },
                "required": ["data", "x", "y"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
    {
        "type": "function",
        "function": {
            "name": "create_pie_chart",
            "description": "Crée un graphique circulaire à partir des données fournies.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "Les données pour le graphique circulaire. Doivent être dans un format approprié tel que JSON ou CSV.",
                    },
                    "labels": {
                        "type": "string",
                        "description": "Colonne pour les étiquettes.",
                    },
                    "values": {
                        "type": "string",
                        "description": "Colonne pour les valeurs.",
                    },
                },
                "required": ["data", "labels", "values"],
                "additionalProperties": False,
            },
        },
        "strict": True,
    },
]

Activer le suivi d’un système multi-agent avec Weave

Nous devons écrire la logique nécessaire pour :
  • gérer la transmission de la requête utilisateur au système multi-agent
  • gérer le fonctionnement interne du système multi-agent
  • exécuter les appels d’outils
# Exemple de requête

user_query = """
Below is some data. I want you to first remove the duplicates then analyze the statistics of the data as well as plot a line chart.

house_size (m3), house_price ($)
90, 100
80, 90
100, 120
90, 100
"""
À partir de la requête de l’utilisateur, nous pouvons déduire que les outils à appeler sont clean_data, start_analysis et use_line_chart. Nous allons commencer par définir la fonction d’exécution chargée de lancer les appels d’outils. En décorant les fonctions Python avec @weave.op(), nous pouvons consigner dans le journal et déboguer les entrées, les sorties et les traces du modèle de langage. Lors de la création d’un système multi-agent, de nombreuses fonctions entreront en jeu, mais il suffit d’ajouter @weave.op() au-dessus de chacune d’elles.
@weave.op()
def clean_data(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    df_deduplicated = df.drop_duplicates()
    return df_deduplicated

@weave.op()
def stat_analysis(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    return df.describe()

@weave.op()
def plot_line_chart(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")

    x = df.iloc[:, 0]
    y = df.iloc[:, 1]

    coefficients = np.polyfit(x, y, 1)
    polynomial = np.poly1d(coefficients)
    y_fit = polynomial(x)

    plt.figure(figsize=(10, 6))
    plt.plot(x, y, "o", label="Data Points")
    plt.plot(x, y_fit, "-", label="Best Fit Line")
    plt.title("Line Chart with Best Fit Line")
    plt.xlabel(df.columns[0])
    plt.ylabel(df.columns[1])
    plt.legend()
    plt.grid(True)

    # Enregistrer le graphique dans un tampon BytesIO avant de l'afficher
    buf = BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)

    # Afficher le graphique
    plt.show()

    # Encoder l'image en base64 pour l'URL de données
    image_data = buf.getvalue()
    base64_encoded_data = base64.b64encode(image_data)
    base64_string = base64_encoded_data.decode("utf-8")
    data_url = f"data:image/png;base64,{base64_string}"

    return data_url

# Définir la fonction pour exécuter les outils
@weave.op()
def execute_tool(tool_calls, messages):
    for tool_call in tool_calls:
        tool_name = tool_call.function.name
        tool_arguments = json.loads(tool_call.function.arguments)

        if tool_name == "clean_data":
            # Simuler le nettoyage des données
            cleaned_df = clean_data(tool_arguments["data"])
            cleaned_data = {"cleaned_data": cleaned_df.to_dict()}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(cleaned_data)}
            )
            print("Cleaned data: ", cleaned_df)
        elif tool_name == "transform_data":
            # Simuler la transformation des données
            transformed_data = {"transformed_data": "sample_transformed_data"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(transformed_data),
                }
            )
        elif tool_name == "aggregate_data":
            # Simuler l'agrégation des données
            aggregated_data = {"aggregated_data": "sample_aggregated_data"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(aggregated_data),
                }
            )
        elif tool_name == "stat_analysis":
            # Simuler l'analyse statistique
            stats_df = stat_analysis(tool_arguments["data"])
            stats = {"stats": stats_df.to_dict()}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(stats)}
            )
            print("Statistical Analysis: ", stats_df)
        elif tool_name == "correlation_analysis":
            # Simuler l'analyse de corrélation
            correlations = {"correlations": "sample_correlations"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(correlations)}
            )
        elif tool_name == "regression_analysis":
            # Simuler l'analyse de régression
            regression_results = {"regression_results": "sample_regression_results"}
            messages.append(
                {
                    "role": "tool",
                    "name": tool_name,
                    "content": json.dumps(regression_results),
                }
            )
        elif tool_name == "create_bar_chart":
            # Simuler la création d'un graphique en barres
            bar_chart = {"bar_chart": "sample_bar_chart"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(bar_chart)}
            )
        elif tool_name == "create_line_chart":
            # Simuler la création d'un graphique linéaire
            line_chart = {"line_chart": plot_line_chart(tool_arguments["data"])}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(line_chart)}
            )
        elif tool_name == "create_pie_chart":
            # Simuler la création d'un diagramme circulaire
            pie_chart = {"pie_chart": "sample_pie_chart"}
            messages.append(
                {"role": "tool", "name": tool_name, "content": json.dumps(pie_chart)}
            )
    return messages
Ensuite, nous allons créer les gestionnaires d’outils pour chacun des sous-agents. Chacun reçoit un prompt et un ensemble d’outils qui lui sont propres, transmis au modèle. Le résultat est ensuite transmis à une fonction d’exécution qui lance les appels d’outils.
# Définir les fonctions pour gérer le traitement de chaque agent
@weave.op()
def handle_data_processing_agent(query, conversation_messages):
    messages = [{"role": "system", "content": processing_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=preprocess_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

@weave.op()
def handle_analysis_agent(query, conversation_messages):
    messages = [{"role": "system", "content": analysis_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=analysis_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

@weave.op()
def handle_visualization_agent(query, conversation_messages):
    messages = [{"role": "system", "content": visualization_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=visualization_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)
Enfin, nous créons l’outil principal chargé de traiter la requête de l’utilisateur. Cette fonction prend la requête de l’utilisateur, obtient une réponse du modèle et la transmet aux autres agents pour exécution.
# Fonction pour gérer les entrées utilisateur et le triage
@weave.op()
def handle_user_message(user_query, conversation_messages=None):
    if conversation_messages is None:
        conversation_messages = []
    user_message = {"role": "user", "content": user_query}
    conversation_messages.append(user_message)

    messages = [{"role": "system", "content": triaging_system_prompt}]
    messages.extend(conversation_messages)

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=triage_tools,
    )

    conversation_messages.append(
        [tool_call.function for tool_call in response.choices[0].message.tool_calls]
    )

    for tool_call in response.choices[0].message.tool_calls:
        if tool_call.function.name == "send_query_to_agents":
            agents = json.loads(tool_call.function.arguments)["agents"]
            query = json.loads(tool_call.function.arguments)["query"]
            for agent in agents:
                if agent == "Data Processing Agent":
                    handle_data_processing_agent(query, conversation_messages)
                elif agent == "Analysis Agent":
                    handle_analysis_agent(query, conversation_messages)
                elif agent == "Visualization Agent":
                    handle_visualization_agent(query, conversation_messages)

    outputs = extract_tool_contents(conversation_messages)

    return outputs

functions = [
    "clean_data",
    "transform_data",
    "stat_analysis",
    "aggregate_data",
    "correlation_analysis",
    "regression_analysis",
    "create_bar_chart",
    "create_line_chart",
    "create_pie_chart",
]

@weave.op()
def extract_tool_contents(data):
    contents = {}
    contents["all"] = data
    for element in data:
        if (
            isinstance(element, dict)
            and element.get("role") == "tool"
            and element.get("name") in functions
        ):
            name = element["name"]
            content_str = element["content"]
            try:
                content_json = json.loads(content_str)
                if "chart" not in element.get("name"):
                    contents[name] = [content_json]
                else:
                    first_key = next(iter(content_json))
                    second_level = content_json[first_key]
                    if isinstance(second_level, dict):
                        second_key = next(iter(second_level))
                        contents[name] = second_level[second_key]
                    else:
                        contents[name] = second_level
            except json.JSONDecodeError:
                print(f"Erreur de décodage JSON pour {name}")
                contents[name] = None

    return contents

Exécuter des systèmes multi-agents et leur visualisation dans Weave

Enfin, nous exécutons la fonction principale handle_user_message à partir de la saisie de l’utilisateur et observons les résultats.
handle_user_message(user_query)
Lorsque nous cliquons sur l’URL de Weave, nous pouvons voir le Tracing de l’exécution, comme ci-dessous. Sur la page Traces, nous pouvons vérifier l’entrée et la sortie. Par souci de clarté, des captures d’écran des résultats affichés lorsque l’on clique sur chaque sortie ont été ajoutées au diagramme. Weave fournit une intégration avec l’API d’OpenAI, ce qui permet de calculer automatiquement les coûts. Nous pouvons donc également confirmer que le coût et la latence s’affichent tout à droite. 1-1.png En cliquant sur une ligne, nous pouvons voir les processus intermédiaires exécutés au sein du système multi-agent. Par exemple, en examinant l’entrée et la sortie de analysis_agent, nous pouvons voir qu’elles sont au format de sortie structuré. La sortie structurée d’OpenAI facilite la collaboration entre agents, mais à mesure que le système devient plus complexe, il devient plus difficile de comprendre le format dans lequel ces interactions se déroulent. L’utilisation de Weave nous permet de comprendre ces processus intermédiaires ainsi que leurs entrées et sorties, comme si nous les avions sous les yeux.
3.png
Examinons de plus près comment le Tracing est géré dans Weave !

Conclusion

Dans ce tutoriel, nous avons vu comment développer facilement un système multi-agent à l’aide des sorties structurées et de Weave d’OpenAI pour suivre les entrées, les sorties finales et les formats de sortie intermédiaires.