Skip to main content
Utilisez Dagster et W&B (W&B) pour orchestrer vos pipelines MLOps et gérer vos assets ML. L’intégration avec W&B facilite dans Dagster les opérations suivantes :
  • Créer et utiliser un W&B Artifact.
  • Utiliser et créer des Registered Models dans W&B Registry.
  • Exécuter des tâches d’entraînement sur une infrastructure dédiée à l’aide de W&B Launch.
  • Utiliser le client wandb dans les ops et les assets.
L’intégration W&B pour Dagster fournit une ressource Dagster spécifique à W&B et un IO Manager :
  • wandb_resource : une ressource Dagster utilisée pour s’authentifier auprès de l’API W&B et communiquer avec elle.
  • wandb_artifacts_io_manager : un IO Manager Dagster utilisé pour consommer des W&B Artifacts.
Le guide suivant explique comment remplir les prérequis pour utiliser W&B dans Dagster, comment créer et utiliser des W&B Artifacts dans les ops et les assets, comment utiliser W&B Launch, ainsi que les bonnes pratiques recommandées.

Avant de commencer

Vous aurez besoin des ressources suivantes pour utiliser Dagster dans W&B :
  1. Clé API W&B.
  2. entité W&B (utilisateur ou équipe) : une entité est un nom d’utilisateur ou un nom d’équipe auquel vous envoyez des Runs et des Artifacts W&B. Assurez-vous de créer votre compte ou votre entité d’équipe dans l’interface de l’application W&B avant d’enregistrer des runs. Si vous ne spécifiez pas d’entité, le run sera envoyé à votre entité par défaut, qui correspond généralement à votre nom d’utilisateur. Modifiez votre entité par défaut dans vos paramètres, sous Valeurs par défaut du projet.
  3. projet W&B : le nom du projet dans lequel les Runs W&B sont stockés.
Pour trouver votre entité W&B, consultez la page de profil de cet utilisateur ou de cette équipe dans l’application W&B. Vous pouvez utiliser un projet W&B existant ou en créer un nouveau. Vous pouvez créer de nouveaux projets sur la page d’accueil de l’application W&B ou sur la page de profil de l’utilisateur/de l’équipe. Si un projet n’existe pas, il sera automatiquement créé lorsque vous l’utiliserez pour la première fois.

Configurer votre clé API

  1. Connectez-vous à W&B. Remarque : si vous utilisez W&B Server, demandez à votre administrateur le nom d’hôte de l’instance.
  2. Créez une clé API dans les Paramètres utilisateur. Pour un environnement de production, nous vous recommandons d’utiliser un compte de service comme propriétaire de cette clé.
  3. Définissez une variable d’environnement pour cette clé API : export WANDB_API_KEY=YOUR_KEY.
Les exemples ci-dessous montrent où spécifier votre clé API dans votre code Dagster. Veillez à renseigner votre entité et le nom de votre projet dans le dictionnaire imbriqué wandb_config. Vous pouvez transmettre des valeurs wandb_config différentes à différentes ops/assets si vous souhaitez utiliser un autre projet W&B. Pour plus d’informations sur les clés que vous pouvez transmettre, voir la section Configuration ci-dessous.
Exemple : configuration pour @job
# ajoutez ceci à votre config.yaml
# vous pouvez également définir la configuration dans le Launchpad de Dagit ou JobDefinition.execute_in_process
# Référence : https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # remplacez ceci par votre entité W&B
     project: my_project # remplacez ceci par votre projet W&B


@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()

Configuration

Les options de configuration suivantes servent de paramètres pour la ressource Dagster spécifique à W&B et l’IO Manager fourni par l’intégration.
  • wandb_resource : ressource Dagster utilisée pour communiquer avec l’API W&B. Elle s’authentifie automatiquement à l’aide de la clé API fournie. Propriétés :
    • api_key : (str, requis) : une clé API W&B nécessaire pour communiquer avec l’API W&B.
    • host : (str, facultatif) : le serveur hôte de l’API que vous souhaitez utiliser. Requis uniquement si vous utilisez W&B Server. Par défaut, l’hôte Public Cloud https://api.wandb.ai est utilisé.
  • wandb_artifacts_io_manager : IO Manager Dagster permettant de consommer des W&B Artifacts. Propriétés :
    • base_dir : (int, facultatif) Répertoire de base utilisé pour le stockage local et la mise en cache. Les W&B Artifacts et les journaux des Runs W&B seront écrits dans ce répertoire et lus depuis celui-ci. Par défaut, le répertoire DAGSTER_HOME est utilisé.
    • cache_duration_in_minutes : (int, facultatif) permet de définir la durée pendant laquelle les W&B Artifacts et les journaux des Runs W&B doivent être conservés dans le stockage local. Seuls les fichiers et répertoires qui n’ont pas été ouverts pendant cette durée sont supprimés du cache. La purge du cache a lieu à la fin de l’exécution de l’IO Manager. Vous pouvez définir cette valeur sur 0 si vous souhaitez désactiver complètement la mise en cache. La mise en cache accélère l’exécution lorsqu’un Artifact est réutilisé entre des jobs exécutés sur la même machine. La valeur par défaut est de 30 jours.
    • run_id : (str, facultatif) : un ID unique pour ce run, utilisé pour la reprise. Il doit être unique dans le projet, et si vous supprimez un run, vous ne pouvez pas réutiliser cet ID. Utilisez le champ name pour un nom descriptif court, ou config pour enregistrer des hyperparamètres à comparer entre les runs. L’ID ne peut pas contenir les caractères spéciaux suivants : /\#?%:.. Vous devez définir le Run ID lorsque vous effectuez un suivi des expériences dans Dagster afin de permettre à l’IO Manager de reprendre le run. Par défaut, il est défini sur le Dagster Run ID, par exemple 7e4df022-1bf2-44b5-a383-bb852df4077e.
    • run_name : (str, facultatif) Un nom d’affichage court pour ce run afin de vous aider à l’identifier dans l’UI. Par défaut, il s’agit d’une chaîne au format suivant : dagster-run-[8 premiers caractères du Dagster Run ID]. Par exemple, dagster-run-7e4df022.
    • run_tags : (list[str], facultatif) : une liste de chaînes qui alimentera la liste des tags de ce run dans l’UI. Les tags sont utiles pour regrouper des runs ou appliquer des étiquettes temporaires comme baseline ou production. Il est facile d’ajouter et de supprimer des tags dans l’UI, ou de filtrer pour n’afficher que les runs ayant un tag spécifique. Tout W&B Run utilisé par l’intégration aura le tag dagster_wandb.

Utiliser les W&B Artifacts

L’intégration avec un W&B Artifact repose sur un Dagster IO Manager. Les IO Managers sont des objets fournis par l’utilisateur, chargés de stocker la sortie d’un asset ou d’une op, puis de la charger comme entrée pour les assets ou ops en aval. Par exemple, un IO Manager peut stocker et charger des objets à partir de fichiers sur un système de fichiers. L’intégration fournit un IO Manager pour les W&B Artifacts. Cela permet à n’importe quel @op ou @asset Dagster de créer et de consommer des W&B Artifacts de manière native. Voici un exemple simple d’un @asset qui produit un W&B Artifact de type jeu de données contenant une liste Python.
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # ceci sera stocké dans un Artifact
Vous pouvez annoter vos @op, @asset et @multi_asset avec une configuration de métadonnées pour écrire des Artifacts. De même, vous pouvez également consommer des W&B Artifacts même s’ils ont été créés en dehors de Dagster.

Écrire des W&B Artifacts

Avant de continuer, nous vous recommandons de bien comprendre comment utiliser les W&B Artifacts. Nous vous conseillons de lire le Guide sur les Artifacts. Pour écrire un W&B Artifact, renvoyez un objet depuis une fonction Python. W&B prend en charge les objets suivants :
  • Objets Python (int, dict, list…)
  • Objets W&B (Table, Image, Graph…)
  • Objets W&B Artifact
Les exemples suivants montrent comment écrire des W&B Artifacts avec des assets Dagster (@asset) :
Tout ce qui peut être sérialisé avec le module pickle est sérialisé avec pickle et ajouté à un Artifact créé par l’intégration. Le contenu est désérialisé lorsque vous lisez cet Artifact dans Dagster (voir Lire les artifacts pour plus de détails).
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]
W&B prend en charge plusieurs modules de sérialisation basés sur Pickle (pickle, dill, cloudpickle, joblib). Vous pouvez également utiliser des formats de sérialisation plus avancés comme ONNX ou PMML. Pour en savoir plus, consultez la section Serialization.

Configuration

Un dictionnaire de configuration appelé wandb_artifact_configuration peut être défini pour @op, @asset et @multi_asset. Ce dictionnaire doit être transmis dans les arguments du décorateur en tant que métadonnées. Cette configuration est requise pour contrôler les lectures et écritures de l’IO Manager de W&B Artifacts. Pour @op, il se trouve dans les métadonnées de sortie via l’argument de métadonnées Out. Pour @asset, il se trouve dans l’argument metadata de l’asset. Pour @multi_asset, il se trouve dans les métadonnées de chaque sortie via les arguments de métadonnées AssetOut. Les exemples de code ci-dessous montrent comment configurer un dictionnaire pour des calculs @op, @asset et @multi_asset :
Exemple pour @op :
@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]
Propriétés prises en charge :
  • name: (str) nom lisible par l’humain pour cet artifact, ce qui vous permet d’identifier cet artifact dans l’UI ou d’y faire référence dans les appels use_artifact. Les noms peuvent contenir des lettres, des chiffres, des underscores, des traits d’union et des points. Le nom doit être unique à l’échelle d’un projet. Requis pour @op.
  • type: (str) type de l’artifact, utilisé pour organiser et différencier les artifacts. Les types courants incluent les jeux de données ou les modèles, mais vous pouvez utiliser n’importe quelle chaîne contenant des lettres, des chiffres, des underscores, des traits d’union et des points. Requis lorsque la sortie n’est pas déjà un Artifact.
  • description: (str) Texte libre qui décrit l’artifact. La description est interprétée en markdown dans l’UI, c’est donc un bon endroit pour ajouter des tableaux, des liens, etc.
  • aliases: (list[str]) Un tableau contenant un ou plusieurs alias que vous souhaitez appliquer à l’Artifact. L’intégration ajoutera également le tag “latest” à cette liste, qu’il soit défini ou non. C’est un moyen efficace de gérer les versions des modèles et des jeux de données.
  • add_dirs: (list[dict[str, Any]]): Un tableau contenant la configuration de chaque répertoire local à inclure dans l’Artifact.
  • add_files: (list[dict[str, Any]]): Un tableau contenant la configuration de chaque fichier local à inclure dans l’Artifact.
  • add_references: (list[dict[str, Any]]): Un tableau contenant la configuration de chaque référence externe à inclure dans l’Artifact.
  • serialization_module: (dict) Configuration du module de sérialisation à utiliser. Se référer à la section Serialization pour plus d’informations.
    • name: (str) Nom du module de sérialisation. Valeurs acceptées : pickle, dill, cloudpickle, joblib. Le module doit être disponible localement.
    • parameters: (dict[str, Any]) Arguments facultatifs transmis à la fonction de sérialisation. Elle accepte les mêmes paramètres que la méthode dump de ce module. Par exemple, {"compress": 3, "protocol": 4}.
Exemple avancé :
@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]
L’asset est matérialisé avec des métadonnées utiles de part et d’autre de l’intégration :
  • Côté W&B : le nom et la version de l’intégration source, la version de Python utilisée, la version du protocole pickle, entre autres.
  • Côté Dagster :
    • ID du run Dagster
    • Run W&B : ID, nom, chemin, URL
    • W&B Artifact : ID, nom, type, version, taille, URL
    • Entité W&B
    • Projet W&B
L’image suivante montre les métadonnées issues de W&B qui ont été ajoutées à l’asset Dagster. Ces informations sont transmises à Dagster par l’intégration.
UI de Dagster avec une vue détaillée d'un asset et des métadonnées W&B associées, y compris des références à un projet et à un run W&B
L’image suivante montre comment la configuration fournie a été enrichie avec des métadonnées utiles dans le W&B Artifact. Ces informations devraient faciliter la reproductibilité et la maintenance. Elles ne seraient pas disponibles sans l’intégration.
Page de W&B Artifact avec des métadonnées de configuration enrichies provenant de Dagster
Panneau de métadonnées du W&B Artifact avec des détails de configuration supplémentaires provenant de Dagster
Vue du W&B Artifact avec des champs de métadonnées de configuration supplémentaires enrichis depuis Dagster
Si vous utilisez un vérificateur de types statique comme mypy, importez l’objet de définition du type de configuration comme suit :
from dagster_wandb import WandbArtifactConfiguration

Utiliser les partitions

L’intégration prend nativement en charge les partitions Dagster. Voici un exemple de partitionnement avec DailyPartitionsDefinition.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"Creating partitioned asset for {partition_key}")
    return random.randint(0, 100)
Ce code produira un W&B Artifact pour chaque partition. Consultez les Artifacts dans le panneau Artifact (UI), sous le nom de l’asset, avec la clé de partition ajoutée. Par exemple, my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02 ou my_daily_partitioned_asset.2023-01-03. Les assets partitionnés sur plusieurs dimensions affichent chaque dimension dans un format délimité par des points. Par exemple, my_asset.car.blue.
L’intégration ne permet pas de matérialiser plusieurs partitions au sein d’un même run. Vous devrez effectuer plusieurs runs pour matérialiser vos assets. Vous pouvez le faire dans Dagit lorsque vous matérialisez vos assets.
Interface Dagster avec plusieurs runs pour des assets partitionnés, chaque partition correspondant à un run distinct

Utilisation avancée

Lire W&B Artifacts

La lecture des W&B Artifacts est similaire à leur écriture. Un dictionnaire de configuration appelé wandb_artifact_configuration peut être défini sur un @op ou un @asset. La seule différence est que la configuration doit être définie sur l’entrée plutôt que sur la sortie. Pour @op, elle se trouve dans les métadonnées d’entrée via l’argument de métadonnées In. Vous devez indiquer explicitement le nom de l’Artifact. Pour @asset, elle se trouve dans les métadonnées d’entrée via l’argument de métadonnées Asset In. Vous ne devez pas transmettre de nom d’Artifact, car le nom de l’asset parent doit correspondre. Si vous souhaitez créer une dépendance sur un Artifact créé en dehors de l’intégration, vous devrez utiliser SourceAsset. Il lira toujours la dernière version de cet asset. Les exemples suivants montrent comment lire un Artifact à partir de différents ops.
Lecture d’un artifact à partir d’un @op
@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)

Configuration

La configuration suivante sert à indiquer ce que l’IO Manager doit collecter et fournir en entrée aux fonctions décorées. Les modes de lecture suivants sont pris en charge.
  1. Pour obtenir un objet nommé contenu dans un Artifact, utilisez get :
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
  1. Pour obtenir le chemin local d’un fichier téléchargé présent dans un Artifact, utilisez get_path :
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
  1. Pour obtenir l’objet Artifact dans son intégralité (avec le contenu téléchargé localement) :
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)
Propriétés prises en charge
  • get: (str) Obtient l’objet W&B correspondant au nom relatif de l’artifact.
  • get_path: (str) Obtient le chemin du fichier correspondant au nom relatif de l’artifact.

Configuration de la sérialisation

Par défaut, l’intégration utilise le module standard pickle, mais certains objets ne sont pas compatibles avec celui-ci. Par exemple, les fonctions contenant yield génèrent une erreur si vous essayez de les sérialiser avec pickle. Nous prenons également en charge d’autres modules de sérialisation basés sur Pickle (dill, cloudpickle, joblib). Vous pouvez aussi utiliser des méthodes de sérialisation plus avancées comme ONNX ou PMML, en renvoyant une chaîne sérialisée ou en créant directement un Artifact. Le choix le plus adapté dépend de votre cas d’usage ; veuillez vous référer à la documentation disponible sur ce sujet.

Modules de sérialisation basés sur pickle

Le pickling est réputé non sécurisé. Si la sécurité est un enjeu, utilisez uniquement des objets W&B. Nous vous recommandons de signer vos données et de stocker les clés de hachage dans vos propres systèmes. Pour des cas d’usage plus complexes, n’hésitez pas à nous contacter : nous serons ravis de vous aider.
Vous pouvez configurer la sérialisation utilisée via le dictionnaire serialization_module dans wandb_artifact_configuration. Assurez-vous que le module est disponible sur la machine exécutant Dagster. L’intégration saura automatiquement quel module de sérialisation utiliser lorsque vous lirez cet Artifact. Les modules actuellement pris en charge sont pickle, dill, cloudpickle et joblib. Voici un exemple simplifié dans lequel nous créons un « modèle » sérialisé avec joblib, puis l’utilisons pour l’inférence.
@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # Ce n'est pas un vrai modèle ML, mais cela ne serait pas possible avec le module pickle
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # Affiche : 3
    return inference_result

Formats de sérialisation avancés (ONNX, PMML)

Il est courant d’utiliser des formats de fichier d’échange comme ONNX et PMML. L’intégration prend en charge ces formats, mais cela demande un peu plus de travail qu’une sérialisation basée sur Pickle. Il existe deux méthodes pour utiliser ces formats.
  1. Convertissez votre modèle dans le format sélectionné, puis renvoyez sa représentation sous forme de chaîne comme s’il s’agissait d’un objet Python ordinaire. L’intégration sérialisera cette chaîne avec pickle. Vous pourrez ensuite reconstruire votre modèle à partir de cette chaîne.
  2. Créez un nouveau fichier local contenant votre modèle sérialisé, puis créez un Artifact personnalisé avec ce fichier à l’aide de la configuration add_file.
Voici un exemple de modèle Scikit-learn sérialisé avec ONNX.
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # Inspiré de https://onnx.ai/sklearn-onnx/

    # Entraîner un modèle.
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # Convertir au format ONNX
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # Écrire les artifacts (modèle + test_set)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # Inspiré de https://onnx.ai/sklearn-onnx/

    # Calculer la prédiction avec ONNX Runtime
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx

Utilisation des partitions

L’intégration prend nativement en charge les partitions Dagster. Vous pouvez lire une, plusieurs ou toutes les partitions d’un asset de manière sélective. Toutes les partitions sont fournies dans un dictionnaire, où la clé et la valeur représentent respectivement la clé de partition et le contenu de l’Artifact.
Cet exemple lit toutes les partitions de l’@asset amont, fournies sous forme de dictionnaire. Dans ce dictionnaire, la clé et la valeur correspondent respectivement à la clé de partition et au contenu de l’Artifact.
@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
L’objet de configuration metadata détermine comment W&B interagit avec les différentes partitions d’artifact dans votre projet. L’objet metadata contient une clé nommée wandb_artifact_configuration, qui contient elle-même un objet imbriqué partitions. L’objet partitions associe le nom de chaque partition à sa configuration. La configuration de chaque partition peut préciser comment récupérer ses données. Ces configurations peuvent contenir différentes clés, à savoir get, version et alias, selon les exigences de chaque partition. Clés de configuration
  1. get: La clé get spécifie le nom de l’objet W&B (Table, Image…) depuis lequel récupérer les données.
  2. version: La clé version est utilisée lorsque vous souhaitez récupérer une version spécifique de l’Artifact.
  3. alias: La clé alias vous permet d’obtenir l’Artifact via son alias.
Configuration générique Le caractère générique "*" désigne toutes les partitions non configurées. Il fournit une configuration par défaut pour les partitions qui ne sont pas explicitement mentionnées dans l’objet partitions. Par exemple,
"*": {
    "get": "default_table_name",
},
Cette configuration signifie que, pour toutes les partitions non configurées explicitement, les données sont récupérées depuis le tableau nommé default_table_name. Configuration spécifique de partition Vous pouvez remplacer la configuration générique pour certaines partitions en fournissant leur propre configuration à l’aide de leur clé. Par exemple,
"yellow": {
    "get": "custom_table_name",
},
Cette configuration signifie que, pour la partition nommée yellow, les données seront récupérées depuis la table nommée custom_table_name, en remplaçant la configuration générique. Gestion des versions et des alias À des fins de gestion des versions et des alias, vous pouvez spécifier des clés version et alias spécifiques dans votre configuration. Pour les versions,
"orange": {
    "version": "v0",
},
Cette configuration récupérera les données de la partition de l’Artifact orange en version v0. Pour les alias,
"blue": {
    "alias": "special_alias",
},
Cette configuration récupérera des données depuis le tableau default_table_name de la partition Artifact avec l’alias special_alias (désigné par blue dans la configuration).

Utilisation avancée

Pour consulter des cas d’utilisation avancée de l’intégration, se référer aux exemples de code complets suivants :

Utiliser W&B Launch

Produit bêta en cours de développement actif Launch vous intéresse ? Contactez l’équipe en charge de votre compte pour discuter de votre participation au programme pilote client de W&B Launch. Les clients pilotes doivent utiliser AWS EKS ou SageMaker pour être éligibles au programme bêta. À terme, nous prévoyons de prendre en charge d’autres plateformes.
Avant de continuer, nous vous recommandons de bien comprendre comment utiliser W&B Launch. Nous vous conseillons de lire le Guide de Launch. L’intégration Dagster permet de :
  • Exécuter un ou plusieurs agents Launch dans votre instance Dagster.
  • Exécuter des jobs Launch locaux dans votre instance Dagster.
  • Exécuter des jobs Launch à distance, sur site ou dans le cloud.

agent Launch

L’intégration fournit un @op importable appelé run_launch_agent. Il démarre un agent Launch et l’exécute comme un processus de longue durée jusqu’à ce qu’il soit arrêté manuellement. Les agents sont des processus qui interrogent les files d’attente de Launch et exécutent les jobs dans l’ordre (ou les transmettent à des services externes pour exécution). Référez-vous à la page Launch. Vous pouvez également consulter des descriptions utiles pour toutes les propriétés dans Launchpad.
Interface W&B Launchpad avec les options de configuration de l’agent et les descriptions pour l’intégration Dagster
Exemple simple
# ajoutez ceci à votre config.yaml
# vous pouvez également définir la configuration dans le Launchpad de Dagit ou via JobDefinition.execute_in_process
# Référence : https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # remplacez par votre entité W&B
     project: my_project # remplacez par votre projet W&B
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues: 
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()

Jobs Launch

L’intégration fournit un @op importable appelé run_launch_job. Il exécute votre job Launch. Pour être exécuté, un job Launch doit être assigné à une file d’attente. Vous pouvez créer une file d’attente ou utiliser celle par défaut. Assurez-vous qu’un agent actif écoute cette file d’attente. Vous pouvez exécuter un agent dans votre instance Dagster, mais vous pouvez aussi envisager d’utiliser un agent déployable sur Kubernetes. Consultez la page Launch. Vous pouvez également consulter dans Launchpad des descriptions utiles pour toutes les propriétés.
Interface W&B Launchpad avec des options de configuration de job et des descriptions pour l’intégration Dagster
Exemple simple
# ajoutez ceci à votre config.yaml
# vous pouvez également définir la configuration dans le Launchpad de Dagit ou via JobDefinition.execute_in_process
# Référence : https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # remplacez par votre entité W&B
     project: my_project # remplacez par votre projet W&B
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch


from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # on renomme le job avec un alias

Bonnes pratiques

  1. Utilisez l’IO Manager pour lire et écrire des Artifacts. Évitez d’utiliser directement Artifact.download() ou Run.log_artifact(). Ces méthodes sont prises en charge par l’intégration. Renvoyez plutôt les données que vous souhaitez stocker dans l’Artifact et laissez l’intégration faire le reste. Cette approche offre une meilleure traçabilité pour l’Artifact.
  2. Ne créez vous-même un objet Artifact que pour des cas d’usage complexes. Les objets Python et les objets W&B doivent être renvoyés depuis vos ops/assets. L’intégration se charge d’assembler l’Artifact. Pour les cas d’usage complexes, vous pouvez créer un Artifact directement dans un job Dagster. Nous vous recommandons de transmettre un objet Artifact à l’intégration pour enrichir les métadonnées, par exemple avec le nom et la version de l’intégration source, la version de Python utilisée, la version du protocole pickle, etc.
  3. Ajoutez des fichiers, des répertoires et des références externes à vos Artifacts via les métadonnées. Utilisez l’objet d’intégration wandb_artifact_configuration pour ajouter des fichiers, des répertoires ou des références externes (Amazon S3, GCS, HTTP…). Voir l’exemple avancé dans la section de configuration de l’Artifact pour plus d’informations.
  4. Utilisez un @asset plutôt qu’un @op lorsqu’un Artifact est produit. Les Artifacts sont des assets. Il est recommandé d’utiliser un asset lorsque Dagster gère cet asset. Cela offre une meilleure observabilité dans le catalogue d’assets de Dagit.
  5. Utilisez un SourceAsset pour consommer un Artifact créé en dehors de Dagster. Cela vous permet de tirer parti de l’intégration pour lire des Artifacts créés hors de Dagster. Sinon, vous ne pouvez utiliser que les Artifacts créés par l’intégration.
  6. Utilisez W&B Launch pour orchestrer l’entraînement sur des ressources de calcul dédiées pour les grands modèles. Vous pouvez entraîner de petits modèles dans votre cluster Dagster, et exécuter Dagster dans un cluster Kubernetes avec des nœuds GPU. Nous vous recommandons d’utiliser W&B Launch pour l’entraînement de grands modèles. Cela évite de surcharger votre instance et donne accès à des ressources de calcul plus adaptées.
  7. Lors du suivi des expériences dans Dagster, définissez votre ID de run W&B sur la valeur de votre ID de run Dagster. Nous vous recommandons de faire les deux : rendre le Run reprenable et définir l’ID de run W&B sur l’ID de run Dagster ou sur la chaîne de votre choix. En suivant cette recommandation, vous vous assurez que vos métriques W&B et vos Artifacts W&B sont stockés dans le même Run W&B lorsque vous entraînez des modèles dans Dagster.
Ou définissez l’ID de run W&B sur l’ID de run Dagster.
wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)
Ou choisissez votre propre ID de run W&B et renseignez-le dans la configuration de l’IO Manager.
wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
  1. Ne collectez que les données dont vous avez besoin avec get ou get_path pour les W&B Artifacts volumineux. Par défaut, l’intégration télécharge l’intégralité d’un Artifact. Si vous utilisez des artifacts très volumineux, vous pouvez ne collecter que les fichiers ou objets spécifiques dont vous avez besoin. Cela améliorera la vitesse et l’utilisation des ressources.
  2. Pour les objets Python, adaptez le module de pickling à votre cas d’utilisation. Par défaut, l’intégration W&B utilise le module standard pickle. Mais certains objets ne sont pas compatibles avec celui-ci. Par exemple, les fonctions qui utilisent yield génèrent une erreur si vous essayez de les sérialiser avec pickle. W&B prend en charge d’autres modules de sérialisation basés sur Pickle (dill, cloudpickle, joblib).
Vous pouvez également utiliser des formats de sérialisation plus avancés comme ONNX ou PMML en renvoyant une chaîne sérialisée ou en créant directement un Artifact. Le bon choix dépend de votre cas d’utilisation ; reportez-vous à la littérature disponible sur ce sujet.