Dagster と W&B (W&B) を使用して、MLOps パイプラインをオーケストレーションし、ML アセットを管理できます。W&B とのインテグレーションにより、Dagster 内で次のことを簡単に行えます。
W&B Dagster インテグレーションでは、W&B 専用の Dagster リソースと IO Manager を提供します。
wandb_resource: W&B API への認証と通信に使用する Dagster リソース。
wandb_artifacts_io_manager: W&B Artifacts を利用するための Dagster IO Manager。
次のガイドでは、Dagster で W&B を使用するための前提条件を満たす方法、ops と assets で W&B Artifacts を作成して使用する方法、W&B Launch の使い方、推奨されるベストプラクティスを紹介します。
W&B で Dagster を使用するには、次のリソースが必要です。
- W&B APIキー。
- W&B entity (ユーザーまたはチーム) : entity とは、W&B Runs と Artifacts の送信先となるユーザー名またはチーム名です。run をログする前に、W&B App UI でアカウントまたはチーム entity を作成しておいてください。entity を指定しない場合、run はデフォルトの entity (通常はユーザー名) に送信されます。デフォルトの entity は、Settings の Project Defaults で変更できます。
- W&B プロジェクト: W&B Runs が保存されるプロジェクトの名です。
W&B entity は、W&B App で該当するユーザーまたはチームのプロフィールページを確認すると見つけられます。W&B プロジェクトは既存のものを使用することも、新しく作成することもできます。新しいプロジェクトは、W&B App のホームページまたはユーザー/チームのプロフィールページで作成できます。プロジェクトが存在しない場合は、最初に使用したときに自動的に作成されます。
- W&Bにログインします。注: W&B Server を使用している場合は、インスタンスのホスト名を管理者に確認してください。
- User Settings でAPIキーを作成します。本番環境では、そのキーの所有にサービスアカウントを使用することを推奨します。
- そのAPIキー用の環境変数を設定します:
export WANDB_API_KEY=YOUR_KEY。
以下の例では、Dagster のコード内でAPIキーを指定する場所を示します。wandb_config のネストされた辞書内に、entity とプロジェクト名を必ず指定してください。別の W&B プロジェクトを使用したい場合は、op や アセット ごとに異なる wandb_config の値を渡せます。指定できるキーの詳細については、以下の設定セクションを参照してください。
例: アセット を使用した @repository の設定from dagster_wandb import wandb_artifacts_io_manager, wandb_resource
from dagster import (
load_assets_from_package_module,
make_values_resource,
repository,
with_resources,
)
from . import assets
@repository
def my_repository():
return [
*with_resources(
load_assets_from_package_module(assets),
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
"wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
{"cache_duration_in_minutes": 60} # ファイルは1時間だけキャッシュします
),
},
resource_config_by_key={
"wandb_config": {
"config": {
"entity": "my_entity", # これを自分のW&B entityに置き換えます
"project": "my_project", # これを自分のW&Bプロジェクトに置き換えます
}
}
},
),
]
この例では、@job の例とは異なり、IO Manager のキャッシュ時間を設定しています。
以下の設定オプションは、インテグレーションで提供される W&B 専用の Dagster リソース および IO Manager の設定として使用します。
wandb_resource: W&B API との通信に使用する Dagster リソース です。指定した APIキーを使って自動的に認証されます。プロパティ:
api_key: (str, required): W&B API との通信に必要な W&B APIキー。
host: (str, optional): 使用する API ホストサーバー。W&B Server を使用している場合にのみ必要です。デフォルトは Public Cloud ホスト https://api.wandb.ai です。
wandb_artifacts_io_manager: W&B Artifacts を利用するための Dagster IO Manager です。プロパティ:
base_dir: (int, optional) ローカルストレージおよびキャッシュに使用するベースディレクトリー。W&B Artifacts と W&B Run のログは、このディレクトリーに書き込まれ、ここから読み込まれます。デフォルトでは DAGSTER_HOME ディレクトリーを使用します。
cache_duration_in_minutes: (int, optional) W&B Artifacts と W&B Run のログをローカルストレージに保持する時間を定義します。その時間が経過するまで開かれなかったファイルとディレクトリーのみがキャッシュから削除されます。キャッシュのパージは IO Manager の実行終了時に行われます。キャッシュを完全に無効にしたい場合は、0 に設定できます。同じマシン上で実行されるジョブ間で Artifact が再利用される場合、キャッシュによって処理速度が向上します。デフォルトは 30 日です。
run_id: (str, optional): 再開に使用する、この run の一意の ID です。プロジェクト内で一意である必要があり、run を削除した場合、その ID は再利用できません。短い説明的な名前には name フィールドを使用し、run 間で比較するためにハイパーパラメーターを保存するには config を使用します。ID には次の特殊文字を含めることはできません: /\#?%:.. Dagster 内で実験管理を行う場合は、IO Manager が run を再開できるように Run ID を設定する必要があります。デフォルトでは Dagster Run ID (例: 7e4df022-1bf2-44b5-a383-bb852df4077e) が設定されます。
run_name: (str, optional) UI でこの run を識別しやすくするための短い表示名です。デフォルトでは、dagster-run-[8 first characters of the Dagster Run ID] 形式の文字列になります。たとえば dagster-run-7e4df022 です。
run_tags: (list[str], optional): UI でこの run のタグ一覧に表示される文字列のリストです。タグは、run をまとめて整理したり、baseline や production のような一時的なラベルを付けたりするのに便利です。UI ではタグの追加や削除、特定のタグが付いた run のみに絞り込むことを簡単に行えます。インテグレーションで使用されるすべての W&B Run には dagster_wandb タグが付きます。
W&B Artifact とのインテグレーションは、Dagster の IO Manager に依存しています。
IO Managers はユーザーが提供するオブジェクトで、アセット または op の出力を保存し、それを下流の アセット または op への入力として読み込む役割を持ちます。たとえば、IO Manager はファイルシステム上のファイルにオブジェクトを保存し、そこから読み込むことがあります。
このインテグレーションでは、W&B Artifacts 用の IO Manager を提供しています。これにより、任意の Dagster @op または @asset で W&B Artifacts をネイティブに作成したり利用したりできます。以下は、Python の list を含む dataset タイプの W&B Artifact を生成する @asset の簡単な例です。
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3] # これはArtifactに保存されます
@op、@asset、@multi_asset には、Artifacts に書き込むためのメタデータ設定を付与できます。同様に、Dagster の外部で作成された W&B Artifacts も利用できます。
先に進む前に、W&B Artifacts の使い方を十分に理解しておくことをおすすめします。Artifacts ガイドを参照してください。
W&B Artifact に書き込むには、Python 関数からオブジェクトを返します。W&B では、次のオブジェクトがサポートされます。
- Python オブジェクト (
int、dict、list など)
- W&B オブジェクト (
Table、Image、Graph など)
- W&B Artifact オブジェクト
以下の例は、Dagster アセット (@asset) を使用して W&B Artifacts に書き込む方法を示しています。
Pythonオブジェクト
W&Bオブジェクト
W&B Artifact
pickle モジュールでシリアライズ可能なものはすべて pickle 化され、インテグレーションによって作成された Artifact に追加されます。Dagster 内でその Artifact を読み取ると、内容はアンピクルされます (詳細は Artifactsを読み取る を参照してください) 。@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
W&B は複数の pickle ベースのシリアライズモジュール (pickle、dill、cloudpickle、joblib) をサポートします。ONNX や PMML のような、より高度なシリアライズを使用することもできます。詳細は Serialization セクションを参照してください。 Table や Image などの W&B オブジェクトは、インテグレーションによって作成された Artifact に追加されます。この例では、Artifact に Table を追加しています。import wandb
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset_in_table():
return wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
より複雑なユースケースでは、独自の Artifact オブジェクトを作成する必要がある場合があります。その場合でも、インテグレーションの両側でメタデータを補完するなどの便利な追加機能を利用できます。import wandb
MY_ASSET = "my_asset"
@asset(
name=MY_ASSET,
io_manager_key="wandb_artifacts_manager",
)
def create_artifact():
artifact = wandb.Artifact(MY_ASSET, "dataset")
table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
artifact.add(table, "my_table")
return artifact
wandb_artifact_configuration という設定用の辞書を、@op、@asset、@multi_asset に指定できます。この辞書は、デコレータの引数で metadata として渡す必要があります。この設定は、W&B Artifacts に対する IO Manager の読み取りと書き込みを制御するために必須です。
@op の場合、これは Out の metadata 引数を介して出力 metadata に指定します。
@asset の場合、これは アセット の metadata 引数に指定します。
@multi_asset の場合、これは AssetOut の metadata 引数を介して各出力の metadata に指定します。
以下のコード例は、@op、@asset、@multi_asset の計算に対して辞書を設定する方法を示しています。
@op の例
@asset の例
@multi_asset の例
@op の例:@op(
out=Out(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
"type": "dataset",
}
}
)
)
def create_dataset():
return [1, 2, 3]
@asset の例:@asset(
name="my_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
@asset にはすでに名があるため、設定で名を渡す必要はありません。インテグレーションにより、アセット の名が Artifact の名として設定されます。@multi_asset の例:@multi_asset(
name="create_datasets",
outs={
"first_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "training_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
"second_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "validation_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="my_multi_asset_group",
)
def create_datasets():
first_table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
second_table = wandb.Table(columns=["d", "e"], data=[[4, 5]])
return first_table, second_table
サポートされるプロパティ:
name: (str) この Artifact の人間が読める名前です。UI でこの Artifact を識別したり、use_artifact call で参照したりするために使います。名前には、英字、数字、アンダースコア、ハイフン、ドットを使用できます。名前はプロジェクト内で一意である必要があります。@op では必須です。
type: (str) Artifact のタイプです。Artifacts を整理して区別するために使用します。一般的なタイプにはデータセットやモデルがありますが、英字、数字、アンダースコア、ハイフン、ドットを含む任意の文字列を使用できます。出力がすでに Artifact でない場合は必須です。
description: (str) Artifact の説明を記述する自由形式のテキストです。説明は UI で markdown として表示されるため、表やリンクなどを記載するのに適しています。
aliases: (list[str]) Artifact に適用する 1 つ以上の alias を含む配列です。インテグレーションは、設定の有無にかかわらず、このリストに “latest” タグも追加します。これは、モデルやデータセットのバージョン管理を行う効果的な方法です。
add_dirs: (list[dict[str, Any]]): Artifact に含める各ローカルディレクトリーの設定を含む配列です。
add_files: (list[dict[str, Any]]): Artifact に含める各ローカルファイルの設定を含む配列です。
add_references: (list[dict[str, Any]]): Artifact に含める各外部参照の設定を含む配列です。
serialization_module: (dict) 使用するシリアライズモジュールの設定です。詳細は Serialization セクションを参照してください。
name: (str) シリアライズモジュールの名前です。使用可能な値: pickle, dill, cloudpickle, joblib。このモジュールはローカルで利用可能である必要があります。
parameters: (dict[str, Any]) シリアライズ関数に渡すオプションの引数です。このモジュールの dump method と同じパラメーターを受け付けます。たとえば、{"compress": 3, "protocol": 4}。
高度な例:
@asset(
name="my_advanced_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
"description": "My *Markdown* description",
"aliases": ["my_first_alias", "my_second_alias"],
"add_dirs": [
{
"name": "My directory",
"local_path": "path/to/directory",
}
],
"add_files": [
{
"name": "validation_dataset",
"local_path": "path/to/data.json",
},
{
"is_tmp": True,
"local_path": "path/to/temp",
},
],
"add_references": [
{
"uri": "https://picsum.photos/200/300",
"name": "External HTTP reference to an image",
},
{
"uri": "s3://my-bucket/datasets/mnist",
"name": "External S3 reference",
},
],
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
return [1, 2, 3]
この アセット は、インテグレーションの両側で有用なメタデータ付きでマテリアライズされます。
- W&B 側: ソース インテグレーションの名前とバージョン、使用した Python のバージョン、pickle プロトコルのバージョンなど。
- Dagster 側:
- Dagster Run ID
- W&B Run: ID、名、パス、URL
- W&B Artifact: ID、名、タイプ、バージョン、サイズ、URL
- W&B entity
- W&B プロジェクト
次の画像は、Dagster アセット に追加された W&B のメタデータを示しています。この情報は、インテグレーションによって Dagster に伝播されます。
次の画像は、指定した設定が W&B Artifact 上でどのように有用なメタデータで拡張されるかを示しています。この情報は、再現性とメンテナンスに役立ちます。インテグレーションがなければ利用できません。
mypy のような静的型チェッカーを使用している場合は、以下を使用して設定の型定義オブジェクトをインポートします。from dagster_wandb import WandbArtifactConfiguration
このインテグレーションは、Dagster partitions をネイティブでサポートします。
以下は、DailyPartitionsDefinition を使用したパーティション分割の例です。
@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
name="my_daily_partitioned_asset",
compute_kind="wandb",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
)
def create_my_daily_partitioned_asset(context):
partition_key = context.asset_partition_key_for_output()
context.log.info(f"Creating partitioned asset for {partition_key}")
return random.randint(0, 100)
このコードは、パーティションごとに 1 つの W&B Artifact を生成します。Artifact パネル (UI) では、パーティションキーが末尾に付いたアセット名の下に Artifact が表示されます。たとえば、my_daily_partitioned_asset.2023-01-01、my_daily_partitioned_asset.2023-01-02、my_daily_partitioned_asset.2023-01-03 です。複数のディメンションにまたがってパーティション化されたアセットでは、各ディメンションがドット区切り形式で表示されます。たとえば、my_asset.car.blue です。
このインテグレーションでは、1 つの run 内で複数のパーティションをマテリアライズできません。アセットをマテリアライズするには、複数の run を実行する必要があります。これは、アセットのマテリアライズ時に Dagit で実行できます。
W&B Artifacts の読み込みは、書き込みと似ています。wandb_artifact_configuration という設定用の辞書を @op または @asset に設定できます。唯一の違いは、出力ではなく入力に設定する必要があることです。
@op の場合、これは In の metadata 引数を通じて入力メタデータ内にあります。Artifact の名を
明示的に渡す必要があります。
@asset の場合、これは Asset In metadata 引数を通じて入力メタデータ内にあります。親 asset の名がそれと一致するはずなので、Artifact の名は渡さないでください。
インテグレーションの外部で作成された Artifact に依存させたい場合は、SourceAsset を使用する必要があります。これにより、その asset の最新バージョンが常に読み込まれます。
以下の例は、さまざまな op から Artifact を読み込む方法を示しています。
@op から artifact を読み込む@op(
ins={
"artifact": In(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
}
}
)
},
io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
context.log.info(artifact)
別の @asset によって作成された artifact を読み込む@asset(
name="my_asset",
ins={
"artifact": AssetIn(
# 入力引数の名前を変更したくない場合は、'key' を削除できます
key="parent_dagster_asset_name",
input_manager_key="wandb_artifacts_manager",
)
},
)
def read_artifact(context, artifact):
context.log.info(artifact)
Dagster の外部で作成された Artifact を読み込む:my_artifact = SourceAsset(
key=AssetKey("my_artifact"), # W&B Artifact の名前
description="Artifact created outside Dagster",
io_manager_key="wandb_artifacts_manager",
)
@asset
def read_artifact(context, my_artifact):
context.log.info(my_artifact)
上記の設定は、IO Manager が収集し、デコレートされた関数への入力として渡す内容を指定するために使用します。以下の読み取りパターンがサポートされます。
- Artifact 内の名前付きオブジェクトを取得するには、
get を使用します。
@asset(
ins={
"table": AssetIn(
key="my_artifact_with_table",
metadata={
"wandb_artifact_configuration": {
"get": "my_table",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_table(context, table):
context.log.info(table.get_column("a"))
- Artifact に含まれるダウンロード済みファイルのローカルパスを取得するには、get_path を使用します:
@asset(
ins={
"path": AssetIn(
key="my_artifact_with_file",
metadata={
"wandb_artifact_configuration": {
"get_path": "name_of_file",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_path(context, path):
context.log.info(path)
- コンテンツをローカルにダウンロードした状態で、Artifact オブジェクト全体を取得するには:
@asset(
ins={
"artifact": AssetIn(
key="my_artifact",
input_manager_key="wandb_artifacts_manager",
)
},
)
def get_artifact(context, artifact):
context.log.info(artifact.name)
サポートされているプロパティ
get: (str) artifact の相対名で指定された W&B オブジェクトを取得します。
get_path: (str) artifact の相対名で指定されたファイルへのパスを取得します。
デフォルトでは、このインテグレーションは標準のpickleモジュールを使用しますが、互換性のないオブジェクトもあります。たとえば、yieldを含む関数を pickle 化しようとするとエラーになります。
dill、cloudpickle、joblib など、他の Pickle ベースのシリアライズモジュールもサポートしています。また、シリアライズ済みの文字列を返すか、Artifact を直接作成することで、ONNX や PMML のような、より高度なシリアライズ方式を使用することもできます。どの方法が適しているかはユースケースによって異なるため、このテーマに関する関連文献を参照してください。
Pickle は安全ではないことが知られています。セキュリティが重要な場合は、W&B オブジェクトのみを使用してください。データに署名し、ハッシュキーはご自身のシステムに保存することを推奨します。より複雑なユースケースについては、お気軽にお問い合わせください。喜んでサポートします。
使用するシリアライズは、wandb_artifact_configuration の serialization_module 辞書で設定できます。Dagster を実行しているマシンで、そのモジュールが利用可能であることを確認してください。
その Artifact を読み込む際、どのシリアライズモジュールを使うかはインテグレーションが自動的に判別します。
現在サポートされるモジュールは、pickle、dill、cloudpickle、joblib です。
以下は、joblib でシリアライズした「モデル」を作成し、それを推論に使用する簡略化した例です。
@asset(
name="my_joblib_serialized_model",
compute_kind="Python",
metadata={
"wandb_artifact_configuration": {
"type": "model",
"serialization_module": {
"name": "joblib"
},
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
# これは実際のMLモデルではありませんが、pickleモジュールでは不可能な処理です
return lambda x, y: x + y
@asset(
name="inference_result_from_joblib_serialized_model",
compute_kind="Python",
ins={
"my_joblib_serialized_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
)
},
metadata={
"wandb_artifact_configuration": {
"type": "results",
}
},
io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
context: OpExecutionContext, my_joblib_serialized_model
):
inference_result = my_joblib_serialized_model(1, 2)
context.log.info(inference_result) # 出力: 3
return inference_result
ONNX や PMML のような交換用ファイル形式を使用するのは一般的です。インテグレーションはこれらの形式をサポートしていますが、Pickle ベースのシリアライズよりも少し手間がかかります。
これらの形式を使用するには、2 つの異なる method があります。
- モデルを選択した形式に変換し、その形式の文字列表現を通常の Python オブジェクトであるかのように返します。インテグレーションはその文字列を pickle 化します。後でその文字列を使ってモデルを再構築できます。
- シリアライズしたモデルを含む新しいローカルファイルを作成し、
add_file 設定を使用して、そのファイルを含む custom Artifact を build します。
以下は、Scikit-learn モデルを ONNX を使用してシリアライズする例です。
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from dagster import AssetIn, AssetOut, asset, multi_asset
@multi_asset(
compute_kind="Python",
outs={
"my_onnx_model": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "model",
}
},
io_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "test_set",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def create_onnx_model():
# Inspired from https://onnx.ai/sklearn-onnx/
# Train a model.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y)
clr = RandomForestClassifier()
clr.fit(X_train, y_train)
# Convert into ONNX format
initial_type = [("float_input", FloatTensorType([None, 4]))]
onx = convert_sklearn(clr, initial_types=initial_type)
# Write artifacts (model + test_set)
return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}
@asset(
name="experiment_results",
compute_kind="Python",
ins={
"my_onnx_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
# https://onnx.ai/sklearn-onnx/ を参考
# ONNX Runtimeで予測を計算する
sess = rt.InferenceSession(my_onnx_model)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onx = sess.run(
[label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
)[0]
context.log.info(pred_onx)
return pred_onx
パーティションの使用
このインテグレーションは、Dagster partitions をネイティブでサポートしています。
asset の 1 つ、複数、またはすべてのパーティションを選択して読み取ることができます。
すべてのパーティションは辞書として提供され、キーと値はそれぞれパーティションキーと Artifact の内容を表します。
すべてのパーティションを読み取る
特定のパーティションを読み取る
アップストリームの @asset のすべてのパーティションを読み取ります。これらは辞書として渡されます。この辞書では、キーと値がそれぞれパーティションキーと Artifact の内容に対応します。@asset(
compute_kind="wandb",
ins={"my_daily_partitioned_asset": AssetIn()},
output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
AssetIn の partition_mapping 設定を使用すると、特定のパーティションを選択できます。この場合は TimeWindowPartitionMapping を使用しています。@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
compute_kind="wandb",
ins={
"my_daily_partitioned_asset": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
)
},
output_required=False,
)
def read_specific_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
設定オブジェクト metadata は、W&B がプロジェクト内の異なる artifact パーティションとどのようにやり取りするかを設定します。
オブジェクト metadata には wandb_artifact_configuration というキーが含まれており、その中にネストされた partitions オブジェクトがあります。
partitions オブジェクトは、各パーティションの名をその設定にマッピングします。各パーティションの設定では、そのパーティションからデータをどのように取得するかを指定できます。これらの設定には、各パーティションの要件に応じて get、version、alias などのキーを含めることができます。
設定キー
get:
get キーは、データの取得元となる W&B Object (Table、Image など) の名を指定します。
version:
version キーは、Artifact の特定のバージョンを取得したい場合に使用します。
alias:
alias キーを使用すると、Artifact をその alias で取得できます。
ワイルドカード設定
ワイルドカード "*" は、設定されていないすべてのパーティションを表します。これにより、partitions オブジェクトで明示的に指定されていないパーティションに対するデフォルト設定を提供できます。
たとえば、
"*": {
"get": "default_table_name",
},
この設定では、明示的に設定されていないすべてのパーティションのデータは、default_table_name という名前の表から取得されます。
特定のパーティションの設定
特定のパーティションについては、キーを使ってそのパーティション固有の設定を指定することで、ワイルドカード設定を上書きできます。
たとえば、
"yellow": {
"get": "custom_table_name",
},
この設定では、yellow という名前のパーティションについては、ワイルドカード設定を上書きし、custom_table_name という名前の表からデータを取得します。
バージョン管理とエイリアス
バージョン管理とエイリアスのために、設定で version キーと alias キーを個別に指定できます。
バージョンについては、
"orange": {
"version": "v0",
},
この設定では、orange Artifact パーティションのバージョン v0 からデータを取得します。
エイリアスについては、
"blue": {
"alias": "special_alias",
},
この設定では、エイリアス special_alias を持つ Artifact パーティションの default_table_name 表 (設定内では blue と呼びます) からデータを取得します。
高度な使い方
インテグレーションの高度な使い方については、以下のコード例全体を参照してください。
現在も開発中のベータ版プロダクト
Launch にご関心がある場合は、W&B Launch のカスタマーパイロットプログラムへの参加について、担当のアカウントチームにお問い合わせください。
ベータプログラムの対象となるには、パイロット顧客は AWS EKS または SageMaker を使用する必要があります。今後は追加のプラットフォームにも対応する予定です。
続行する前に、W&B Launch の使い方を十分に理解しておくことをおすすめします。Launch ガイドを読むこともご検討ください。
Dagster インテグレーションは、次のことに役立ちます。
- Dagster インスタンスで 1 つ以上の Launch エージェントを実行する。
- Dagster インスタンス内でローカルの Launch ジョブを実行する。
- オンプレミスまたはクラウドでリモートの Launch ジョブを実行する。
このインテグレーションでは、インポートして使える run_launch_agent という @op が提供されています。これにより Launch エージェントが起動し、手動で停止するまで長時間実行されるプロセスとして動作します。
エージェントは、Launch キューをポーリングし、ジョブを順番に実行するプロセスです (または、外部サービスにディスパッチして実行します) 。
Launch ページを参照してください。
また、Launchpad では、すべてのプロパティについて参考になる説明を確認できます。
シンプルな例
# これを config.yaml に追加してください
# または、Dagit の Launchpad か JobDefinition.execute_in_process で設定することもできます
# 参考: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # ここを自分の W&B entity に置き換えてください
project: my_project # ここを自分の W&B プロジェクトに置き換えてください
ops:
run_launch_agent:
config:
max_jobs: -1
queues:
- my_dagster_queue
from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_agent_example():
run_launch_agent()
このインテグレーションは、インポート可能な @op である run_launch_job を提供します。これにより、Launch ジョブを実行できます。
Launch ジョブは、実行するためにキューに割り当てる必要があります。キューは作成することも、デフォルトのキューを使用することもできます。そのキューを監視するアクティブな エージェント があることを確認してください。エージェント は Dagster インスタンス内で実行できますが、Kubernetes 上でデプロイ可能な エージェント を使用することも検討できます。
Launch ページを参照してください。
また、Launchpad ではすべてのプロパティに関する有用な説明を確認できます。
簡単な例
# これをconfig.yamlに追加してください
# または、DagitのLaunchpadまたはJobDefinition.execute_in_processで設定することもできます
# 参照: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # W&B entityに置き換えてください
project: my_project # W&Bプロジェクトに置き換えてください
ops:
my_launched_job:
config:
entry_point:
- python
- train.py
queue: my_dagster_queue
uri: https://github.com/wandb/example-dagster-integration-with-launch
from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_job_example():
run_launch_job.alias("my_launched_job")() # エイリアスでジョブ名を変更します
-
IO Manager を使用して Artifacts を読み書きしてください。
Artifact.download() や Run.log_artifact() を直接使用することは避けてください。これらの method はインテグレーションによって処理されます。代わりに、Artifact に保存したいデータを返し、残りの処理はインテグレーションに任せてください。この方法により、Artifact のリネージがより適切になります。
-
複雑なユースケースでのみ、自分で Artifact object を build してください。
Python object と W&B object は、ops/assets から返すようにしてください。Artifact のバンドルはインテグレーションが処理します。
複雑なユースケースでは、Dagster ジョブ内で直接 Artifact を build できます。ソース インテグレーション名とバージョン、使用した python バージョン、pickle protocol バージョンなどのメタデータを拡充するため、Artifact object をインテグレーションに渡すことを推奨します。
-
ファイル、ディレクトリ、外部参照はメタデータ経由で Artifacts に追加してください。
インテグレーションの
wandb_artifact_configuration object を使用して、任意のファイル、ディレクトリ、または外部参照 (Amazon S3、GCS、HTTP…) を追加します。詳細については、Artifact 設定セクション の高度な例を参照してください。
-
Artifact が生成される場合は、@op ではなく @asset を使用してください。
Artifacts は asset です。Dagster がその asset を管理する場合は、asset を使用することを推奨します。これにより、Dagit Asset Catalog での可観測性が向上します。
-
Dagster の外部で作成された Artifact を利用するには SourceAsset を使用してください。
これにより、インテグレーションを活用して外部で作成された Artifacts を読み取れます。そうしない場合は、インテグレーションによって作成された Artifacts しか使用できません。
-
大規模モデルのトレーニングを専用の計算環境でオーケストレーションするには W&B Launch を使用してください。
小規模なモデルは Dagster cluster 内でトレーニングでき、GPU ノードを備えた Kubernetes cluster 上で Dagster を実行することもできます。大規模なモデル トレーニングには W&B Launch の使用を推奨します。これにより、インスタンスの過負荷を防ぎ、より適切な計算環境を利用できます。
-
Dagster 内で実験管理を行う場合は、W&B Run ID を Dagster Run ID の値に設定してください。
Run を再開可能にする ことと、W&B Run ID を Dagster Run ID または任意の文字列に設定することの両方を推奨します。この推奨に従うことで、Dagster 内でモデルをトレーニングするときに、W&B メトリクスと W&B Artifacts が同じ W&B Run に保存されるようになります。
または、W&B Run ID を Dagster Run ID に設定してください。
wandb.init(
id=context.run_id,
resume="allow",
...
)
または、任意の W&B Run ID を選択し、それを IO Manager の設定に渡します。
wandb.init(
id="my_resumable_run_id",
resume="allow",
...
)
@job(
resource_defs={
"io_manager": wandb_artifacts_io_manager.configured(
{"wandb_run_id": "my_resumable_run_id"}
),
}
)
-
大規模な W&B Artifacts では、
get または get_path を使って必要なデータだけを取得してください。
デフォルトでは、このインテグレーションは Artifact 全体をダウンロードします。非常に大きな Artifact を使用している場合は、必要な特定のファイルやオブジェクトだけを取得するとよいでしょう。これにより、速度とリソース利用効率が向上します。
-
Python オブジェクトでは、ユースケースに合わせて pickling モジュールを使い分けてください。
デフォルトでは、W&B インテグレーションは標準の pickle モジュールを使用します。ただし、一部のオブジェクトはこれに対応していません。たとえば、yield を含む関数は pickle 化しようとするとエラーになります。W&B は、他の Pickle ベースのシリアライズモジュール (dill、cloudpickle、joblib) もサポートしています。
また、シリアライズ済みの文字列を返す、または Artifact を直接作成することで、ONNX や PMML のような、より高度なシリアライズを使用することもできます。どれを選ぶべきかはユースケースによって異なるため、このテーマに関する関連文献を参照してください。