Dagster와 W&B를 사용해 MLOps 파이프라인을 오케스트레이션하고 ML asset을 관리하세요. W&B 인테그레이션을 사용하면 Dagster 내에서 다음 작업을 쉽게 수행할 수 있습니다:
W&B Dagster 인테그레이션은 W&B 전용 Dagster 리소스와 IO Manager를 제공합니다:
wandb_resource: W&B API에 인증하고 통신하는 데 사용되는 Dagster 리소스입니다.
wandb_artifacts_io_manager: W&B Artifacts를 가져와 사용하는 데 쓰이는 Dagster IO Manager입니다.
다음 가이드에서는 Dagster에서 W&B를 사용하기 위한 사전 요구 사항을 충족하는 방법, ops와 asset에서 W&B Artifacts를 생성하고 사용하는 방법, W&B Launch를 사용하는 방법, 그리고 권장되는 모범 사례를 설명합니다.
W&B에서 Dagster를 사용하려면 다음 리소스가 필요합니다:
- W&B API 키.
- W&B entity(사용자 또는 팀): entity는 W&B Runs 및 Artifacts를 전송하는 사용자 이름 또는 팀 이름입니다. run을 기록하기 전에 W&B App UI에서 계정 또는 팀 entity를 만들어야 합니다. entity를 지정하지 않으면 run은 기본 entity로 전송되며, 이는 보통 사용자 이름입니다. 기본 entity는 Settings의 Project Defaults에서 변경할 수 있습니다.
- W&B 프로젝트: W&B Runs가 저장되는 프로젝트 이름입니다.
W&B App에서 해당 사용자 또는 팀의 프로필 페이지를 확인해 W&B entity를 찾으세요. 기존 W&B 프로젝트를 사용하거나 새로 만들 수 있습니다. 새 프로젝트는 W&B App 홈페이지 또는 사용자/팀 프로필 페이지에서 만들 수 있습니다. 프로젝트가 없으면 처음 사용할 때 자동으로 생성됩니다.
- W&B에 로그인합니다. 참고: W&B Server를 사용하는 경우 관리자에게 인스턴스 호스트 이름을 문의하세요.
- User Settings에서 API 키를 생성합니다. 프로덕션 환경에서는 해당 키를 service account에서 소유하도록 설정하는 것을 권장합니다.
- 해당 API 키에 대한 환경 변수를 설정합니다:
export WANDB_API_KEY=YOUR_KEY.
다음 예제에서는 Dagster 코드에서 API 키를 어디에 지정해야 하는지 보여줍니다. wandb_config 중첩 딕셔너리 안에 entity와 프로젝트 이름을 반드시 지정하세요. 다른 W&B 프로젝트를 사용하려면 op/asset마다 서로 다른 wandb_config 값을 전달할 수 있습니다. 전달할 수 있는 키에 대한 자세한 내용은 아래 설정 섹션을 참조하세요.
예: @job용 설정# 이 내용을 config.yaml에 추가합니다
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # 여기에 W&B entity를 입력하세요
project: my_project # 여기에 W&B 프로젝트를 입력하세요
@job(
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
"io_manager": wandb_artifacts_io_manager,
}
)
def simple_job_example():
my_op()
예: assets를 사용하는 @repository용 설정from dagster_wandb import wandb_artifacts_io_manager, wandb_resource
from dagster import (
load_assets_from_package_module,
make_values_resource,
repository,
with_resources,
)
from . import assets
@repository
def my_repository():
return [
*with_resources(
load_assets_from_package_module(assets),
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
"wandb_artifacts_manager": wandb_artifacts_io_manager.configured(
{"cache_duration_in_minutes": 60} # 파일은 한 시간 동안만 캐시합니다
),
},
resource_config_by_key={
"wandb_config": {
"config": {
"entity": "my_entity", # 여기에 W&B entity를 입력하세요
"project": "my_project", # 여기에 W&B 프로젝트를 입력하세요
}
}
},
),
]
이 예제에서는 @job 예제와 달리 IO Manager 캐시 기간을 설정합니다.
다음 설정 옵션은 인테그레이션이 제공하는 W&B 전용 Dagster 리소스와 IO Manager의 설정으로 사용됩니다.
wandb_resource: W&B API와 통신하는 데 사용하는 Dagster resource입니다. 제공된 API 키로 자동 인증됩니다. 속성:
api_key: (str, 필수): W&B API와 통신하는 데 필요한 W&B API 키입니다.
host: (str, 선택): 사용할 API 호스트 서버입니다. W&B Server를 사용하는 경우에만 필요합니다. 기본값은 Public Cloud 호스트인 https://api.wandb.ai입니다.
wandb_artifacts_io_manager: W&B Artifacts를 사용하기 위한 Dagster IO Manager입니다. 속성:
base_dir: (int, 선택) 로컬 저장소와 캐싱에 사용하는 기본 디렉터리입니다. W&B Artifacts와 W&B Run 로그는 해당 디렉터리에 기록되고, 해당 디렉터리에서 읽어 옵니다. 기본적으로 DAGSTER_HOME 디렉터리를 사용합니다.
cache_duration_in_minutes: (int, 선택) W&B Artifacts와 W&B Run 로그를 로컬 저장소에 유지할 시간을 정의합니다. 해당 시간 동안 열리지 않은 파일과 디렉터리만 캐시에서 제거됩니다. 캐시 정리는 IO Manager 실행이 끝날 때 수행됩니다. 캐싱을 완전히 끄려면 0으로 설정할 수 있습니다. 캐싱은 동일한 머신에서 실행되는 작업 간에 Artifact를 재사용할 때 속도를 높여 줍니다. 기본값은 30일입니다.
run_id: (str, 선택): 재개에 사용하는 이 run의 고유 ID입니다. 프로젝트 내에서 고유해야 하며, run을 삭제하면 해당 ID를 다시 사용할 수 없습니다. 짧은 설명용 이름에는 name 필드를 사용하고, runs 간 비교를 위해 하이퍼파라미터를 저장하려면 config를 사용하세요. ID에는 다음 특수 문자를 포함할 수 없습니다: /\#?%:.. Dagster 내에서 실험 추적을 수행할 때는 IO Manager가 run을 재개할 수 있도록 Run ID를 설정해야 합니다. 기본적으로 Dagster Run ID(예: 7e4df022-1bf2-44b5-a383-bb852df4077e)로 설정됩니다.
run_name: (str, 선택) UI에서 이 run을 쉽게 파악할 수 있도록 지정하는 짧은 표시 이름입니다. 기본적으로 다음 형식의 문자열입니다: dagster-run-[8 first characters of the Dagster Run ID]. 예: dagster-run-7e4df022.
run_tags: (list[str], 선택): UI에서 이 run의 태그 목록을 채우는 문자열 목록입니다. 태그는 runs를 함께 구성하거나 baseline 또는 production 같은 임시 레이블을 적용하는 데 유용합니다. UI에서 태그를 쉽게 추가하거나 제거할 수 있고, 특정 태그가 있는 runs만 필터링할 수도 있습니다. 인테그레이션에서 사용하는 모든 W&B Run에는 dagster_wandb 태그가 포함됩니다.
W&B Artifact 인테그레이션은 Dagster IO Manager를 기반으로 합니다.
IO Managers는 사용자가 제공하는 객체로, asset 또는 op의 출력을 저장하고 이를 다운스트림 asset 또는 op의 입력으로 로드하는 역할을 합니다. 예를 들어 IO Manager는 파일 시스템의 파일에 객체를 저장하거나, 파일에서 객체를 로드할 수 있습니다.
이 인테그레이션은 W&B Artifacts용 IO Manager를 제공합니다. 이를 통해 모든 Dagster @op 또는 @asset에서 W&B Artifacts를 기본적으로 생성하고 사용할 수 있습니다. 다음은 Python 목록을 포함하는 데이터셋 유형의 W&B Artifact를 생성하는 @asset의 간단한 예입니다.
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3] # 이 값은 Artifact에 저장됩니다
@op, @asset, @multi_asset에 메타데이터 설정을 지정하여 Artifacts를 기록할 수 있습니다. 마찬가지로 Dagster 외부에서 생성된 W&B Artifacts도 사용할 수 있습니다.
계속하기 전에 W&B Artifacts 사용 방법을 충분히 이해하는 것이 좋습니다. Artifacts 가이드를 읽어보는 것을 권장합니다.
W&B Artifact를 작성하려면 Python 함수에서 객체를 반환하세요. W&B는 다음 객체를 지원합니다.
- Python 객체 (int, dict, list…)
- W&B 객체 (Table, Image, Graph…)
- W&B Artifact 객체
다음 예제에서는 Dagster asset(@asset)을 사용해 W&B Artifacts를 작성하는 방법을 보여줍니다.
Python 객체
W&B 객체
W&B Artifact
pickle 모듈로 직렬화할 수 있는 모든 항목은 피클링되어 인테그레이션이 생성한 Artifact에 추가됩니다. Dagster 내에서 해당 Artifact를 조회하면 콘텐츠가 언피클링됩니다(자세한 내용은 Read artifacts를 참조하세요).@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
W&B는 여러 Pickle 기반 직렬화 모듈(pickle, dill, cloudpickle, joblib)을 지원합니다. ONNX나 PMML 같은 더 고급 직렬화 방식도 사용할 수 있습니다. 자세한 내용은 Serialization 섹션을 참조하세요. Table이나 Image 같은 모든 W&B 객체는 인테그레이션이 생성한 Artifact에 추가됩니다. 이 예제에서는 Table을 Artifact에 추가합니다.import wandb
@asset(
name="my_artifact",
metadata={
"wandb_artifact_arguments": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset_in_table():
return wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
더 복잡한 사용 사례에서는 직접 Artifact 객체를 만들어야 할 수 있습니다. 이 경우에도 인테그레이션은 통합 양쪽의 메타데이터를 보강하는 등 유용한 추가 기능을 제공합니다.import wandb
MY_ASSET = "my_asset"
@asset(
name=MY_ASSET,
io_manager_key="wandb_artifacts_manager",
)
def create_artifact():
artifact = wandb.Artifact(MY_ASSET, "dataset")
table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
artifact.add(table, "my_table")
return artifact
wandb_artifact_configuration라는 설정 딕셔너리를 @op, @asset, @multi_asset에 설정할 수 있습니다. 이 딕셔너리는 metadata로 데코레이터 인자에 전달해야 합니다. 이 설정은 W&B Artifacts에 대한 IO Manager의 읽기 및 쓰기를 제어하는 데 필요합니다.
@op의 경우 Out의 metadata 인자를 통해 출력 metadata에 위치합니다.
@asset의 경우 asset의 metadata 인자에 위치합니다.
@multi_asset의 경우 AssetOut의 metadata 인자를 통해 각 출력 metadata에 위치합니다.
다음 코드 예제는 @op, @asset, @multi_asset 컴퓨팅에 딕셔너리를 설정하는 방법을 보여줍니다.
@op 예제
@asset 예제
@multi_asset 예제
@op 예제:@op(
out=Out(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
"type": "dataset",
}
}
)
)
def create_dataset():
return [1, 2, 3]
@asset 예제:@asset(
name="my_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
return [1, 2, 3]
@asset에는 이미 name이 있으므로 설정을 통해 name을 전달할 필요가 없습니다. 인테그레이션이 Artifact name을 asset name으로 설정합니다.@multi_asset 예제:@multi_asset(
name="create_datasets",
outs={
"first_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "training_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
"second_table": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "validation_dataset",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="my_multi_asset_group",
)
def create_datasets():
first_table = wandb.Table(columns=["a", "b", "c"], data=[[1, 2, 3]])
second_table = wandb.Table(columns=["d", "e"], data=[[4, 5]])
return first_table, second_table
지원되는 속성:
name: (str) 이 artifact를 사람이 읽기 쉬운 이름으로, UI에서 이 artifact를 식별하거나 use_artifact calls에서 참조할 때 사용합니다. 이름에는 문자, 숫자, 밑줄, 하이픈, 점을 포함할 수 있습니다. 이름은 프로젝트 전체에서 고유해야 합니다. @op에 필요합니다.
type: (str) artifact를 구성하고 구분하는 데 사용하는 artifact의 유형입니다. 일반적인 유형으로는 dataset 또는 model이 있지만, 문자, 숫자, 밑줄, 하이픈, 점을 포함하는 임의의 문자열을 사용할 수 있습니다. 출력이 이미 Artifact가 아닌 경우 필요합니다.
description: (str) artifact에 대한 설명을 제공하는 자유 텍스트입니다. 설명은 UI에서 마크다운으로 렌더링되므로 표, 링크 등을 넣기에 적합합니다.
aliases: (list[str]) Artifact에 적용할 하나 이상의 alias를 포함하는 배열입니다. 설정 여부와 관계없이 인테그레이션은 이 목록에 “latest” 태그도 추가합니다. 이는 모델과 데이터셋의 versioning을 관리하는 효과적인 방법입니다.
add_dirs: (list[dict[str, Any]]): Artifact에 포함할 각 로컬 디렉터리에 대한 설정을 담은 배열입니다.
add_files: (list[dict[str, Any]]): Artifact에 포함할 각 로컬 파일에 대한 설정을 담은 배열입니다.
add_references: (list[dict[str, Any]]): Artifact에 포함할 각 외부 Reference에 대한 설정을 담은 배열입니다.
serialization_module: (dict) 사용할 직렬화 모듈의 설정입니다. 자세한 내용은 Serialization 섹션을 참고하세요.
name: (str) 직렬화 모듈의 이름입니다. 허용되는 값: pickle, dill, cloudpickle, joblib. 이 모듈은 로컬에서 사용할 수 있어야 합니다.
parameters: (dict[str, Any]) 직렬화 함수에 전달되는 선택 인수입니다. 이 모듈의 dump method와 동일한 parameters를 받습니다. 예를 들어 {"compress": 3, "protocol": 4}입니다.
고급 예시:
@asset(
name="my_advanced_artifact",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
"description": "My *Markdown* description",
"aliases": ["my_first_alias", "my_second_alias"],
"add_dirs": [
{
"name": "My directory",
"local_path": "path/to/directory",
}
],
"add_files": [
{
"name": "validation_dataset",
"local_path": "path/to/data.json",
},
{
"is_tmp": True,
"local_path": "path/to/temp",
},
],
"add_references": [
{
"uri": "https://picsum.photos/200/300",
"name": "External HTTP reference to an image",
},
{
"uri": "s3://my-bucket/datasets/mnist",
"name": "External S3 reference",
},
],
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
return [1, 2, 3]
asset는 인테그레이션 양쪽에서 유용한 메타데이터와 함께 머티리얼라이즈됩니다:
- W&B 측: 소스 인테그레이션 이름과 버전, 사용된 Python 버전, pickle 프로토콜 버전 등
- Dagster 측:
- Dagster Run ID
- W&B Run: ID, 이름, 경로, URL
- W&B Artifact: ID, 이름, 유형, 버전, 크기, URL
- W&B Entity
- W&B 프로젝트
다음 이미지는 Dagster asset에 추가된 W&B 메타데이터를 보여줍니다. 이 정보는 인테그레이션을 통해 Dagster로 전파됩니다.
다음 이미지는 제공된 설정이 W&B Artifact에서 유용한 메타데이터로 어떻게 보강되었는지 보여줍니다. 이 정보는 재현성과 유지보수에 도움이 됩니다. 인테그레이션이 없으면 이 정보는 사용할 수 없습니다.
mypy와 같은 정적 유형 검사기를 사용하는 경우, 설정 유형 정의 객체를 다음과 같이 임포트하세요:from dagster_wandb import WandbArtifactConfiguration
이 인테그레이션은 Dagster 파티션을 기본으로 지원합니다.
다음은 DailyPartitionsDefinition을 사용해 파티션을 나눈 예시입니다.
@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
name="my_daily_partitioned_asset",
compute_kind="wandb",
metadata={
"wandb_artifact_configuration": {
"type": "dataset",
}
},
)
def create_my_daily_partitioned_asset(context):
partition_key = context.asset_partition_key_for_output()
context.log.info(f"Creating partitioned asset for {partition_key}")
return random.randint(0, 100)
이 코드는 각 파티션에 대해 W&B Artifact를 하나씩 생성합니다. 파티션 키가 덧붙은 asset 이름 아래의 Artifact 패널(UI)에서 artifact를 확인할 수 있습니다. 예를 들어 my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02, 또는 my_daily_partitioned_asset.2023-01-03입니다. 여러 차원으로 파티션된 asset은 각 차원을 점(.)으로 구분된 형식으로 표시합니다. 예를 들어 my_asset.car.blue입니다.
이 인테그레이션에서는 하나의 run 내에서 여러 파티션을 머티리얼라이즈할 수 없습니다. asset을 머티리얼라이즈하려면 여러 run을 수행해야 합니다. asset을 머티리얼라이즈할 때 Dagit에서 이를 실행할 수 있습니다.
W&B Artifacts를 조회하는 방법은 작성하는 방법과 비슷합니다. wandb_artifact_configuration이라는 설정 딕셔너리를 @op 또는 @asset에 설정할 수 있습니다. 차이점은 출력을 기준으로 설정하는 것이 아니라 입력에 설정해야 한다는 점뿐입니다.
@op의 경우 In metadata argument를 통해 입력 메타데이터에 설정합니다. Artifact 이름을
명시적으로 전달해야 합니다.
@asset의 경우 Asset In metadata argument를 통해 입력 메타데이터에 설정합니다. 상위 asset의 이름이 Artifact 이름과 일치해야 하므로 Artifact 이름을 전달하면 안 됩니다.
인테그레이션 외부에서 생성된 Artifact에 대한 종속성을 두려면 SourceAsset을 사용해야 합니다. 이 경우 항상 해당 asset의 최신 버전을 조회합니다.
다음 예제에서는 여러 op에서 Artifact를 조회하는 방법을 보여줍니다.
@op에서 artifact 조회@op(
ins={
"artifact": In(
metadata={
"wandb_artifact_configuration": {
"name": "my_artifact",
}
}
)
},
io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
context.log.info(artifact)
다른 @asset이 생성한 artifact 조회@asset(
name="my_asset",
ins={
"artifact": AssetIn(
# 입력 argument의 이름을 바꾸고 싶지 않다면 'key'를 제거하면 됩니다
key="parent_dagster_asset_name",
input_manager_key="wandb_artifacts_manager",
)
},
)
def read_artifact(context, artifact):
context.log.info(artifact)
Dagster 외부에서 생성된 Artifact 조회:my_artifact = SourceAsset(
key=AssetKey("my_artifact"), # W&B Artifact의 이름
description="Artifact created outside Dagster",
io_manager_key="wandb_artifacts_manager",
)
@asset
def read_artifact(context, my_artifact):
context.log.info(my_artifact)
다음 설정은 IO Manager가 데코레이트된 함수의 입력으로 무엇을 수집하고 제공할지 지정하는 데 사용됩니다. 다음 조회 패턴을 지원합니다.
- Artifact에 포함된 이름이 지정된 객체를 조회하려면
get을 사용합니다:
@asset(
ins={
"table": AssetIn(
key="my_artifact_with_table",
metadata={
"wandb_artifact_configuration": {
"get": "my_table",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_table(context, table):
context.log.info(table.get_column("a"))
- Artifact에 포함된 다운로드한 파일의 로컬 경로를 조회하려면 get_path를 사용합니다:
@asset(
ins={
"path": AssetIn(
key="my_artifact_with_file",
metadata={
"wandb_artifact_configuration": {
"get_path": "name_of_file",
}
},
input_manager_key="wandb_artifacts_manager",
)
}
)
def get_path(context, path):
context.log.info(path)
- 콘텐츠가 로컬에 다운로드된 전체 Artifact 객체를 조회하려면:
@asset(
ins={
"artifact": AssetIn(
key="my_artifact",
input_manager_key="wandb_artifacts_manager",
)
},
)
def get_artifact(context, artifact):
context.log.info(artifact.name)
지원되는 속성
get: (str) artifact의 상대 이름에 있는 W&B 객체를 조회합니다.
get_path: (str) artifact의 상대 이름에 있는 파일의 경로를 조회합니다.
기본적으로 이 인테그레이션은 표준 pickle 모듈을 사용하지만, 일부 객체는 이 모듈과 호환되지 않습니다. 예를 들어 yield를 사용하는 함수는 pickle하려고 하면 오류가 발생합니다.
추가적인 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)도 지원합니다. 직렬화된 문자열을 반환하거나 Artifact를 직접 생성해 ONNX나 PMML 같은 더 고급 직렬화 방식도 사용할 수 있습니다. 어떤 방식을 선택할지는 사용 사례에 따라 다르므로, 이 주제에 관한 관련 문헌을 참고하세요.
Pickle은 보안에 취약한 것으로 알려져 있습니다. 보안이 중요하다면 W&B 객체만 사용하세요. 데이터에 서명하고 해시 키는 자체 시스템에 저장하는 것을 권장합니다. 더 복잡한 사용 사례가 있다면 언제든지 문의해 주세요. 기꺼이 도와드리겠습니다.
wandb_artifact_configuration의 serialization_module 딕셔너리를 통해 사용할 직렬화 방식을 설정할 수 있습니다. 해당 모듈을 Dagster를 실행하는 머신에서 사용할 수 있는지 반드시 확인하세요.
이 인테그레이션은 해당 Artifact를 조회할 때 어떤 직렬화 모듈을 사용해야 하는지 자동으로 판단합니다.
현재 지원되는 모듈은 pickle, dill, cloudpickle, joblib입니다.
다음은 joblib으로 직렬화한 “모델”을 만든 다음 이를 Inference에 사용하는 간단한 예시입니다.
@asset(
name="my_joblib_serialized_model",
compute_kind="Python",
metadata={
"wandb_artifact_configuration": {
"type": "model",
"serialization_module": {
"name": "joblib"
},
}
},
io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
# 실제 ML 모델은 아니지만 pickle 모듈로는 이 작업이 불가능합니다
return lambda x, y: x + y
@asset(
name="inference_result_from_joblib_serialized_model",
compute_kind="Python",
ins={
"my_joblib_serialized_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
)
},
metadata={
"wandb_artifact_configuration": {
"type": "results",
}
},
io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
context: OpExecutionContext, my_joblib_serialized_model
):
inference_result = my_joblib_serialized_model(1, 2)
context.log.info(inference_result) # 출력: 3
return inference_result
ONNX 및 PMML 같은 교환용 파일 형식을 사용하는 경우가 많습니다. 이 인테그레이션은 이러한 형식을 지원하지만, Pickle 기반 직렬화보다 약간 더 많은 작업이 필요합니다.
이러한 형식을 사용하는 방법은 두 가지입니다.
- 모델을 선택한 형식으로 변환한 다음, 일반적인 Python 객체인 것처럼 해당 형식의 문자열 표현을 반환합니다. 그러면 인테그레이션이 해당 문자열을 pickle합니다. 그런 다음 그 문자열을 사용해 모델을 다시 구성할 수 있습니다.
- 직렬화된 모델이 들어 있는 새 로컬 파일을 만든 다음,
add_file 설정을 사용해 해당 파일로 맞춤형 Artifact를 생성합니다.
다음은 ONNX를 사용해 Scikit-learn 모델을 직렬화하는 예입니다.
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from dagster import AssetIn, AssetOut, asset, multi_asset
@multi_asset(
compute_kind="Python",
outs={
"my_onnx_model": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "model",
}
},
io_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetOut(
metadata={
"wandb_artifact_configuration": {
"type": "test_set",
}
},
io_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def create_onnx_model():
# Inspired from https://onnx.ai/sklearn-onnx/
# Train a model.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y)
clr = RandomForestClassifier()
clr.fit(X_train, y_train)
# Convert into ONNX format
initial_type = [("float_input", FloatTensorType([None, 4]))]
onx = convert_sklearn(clr, initial_types=initial_type)
# Write artifacts (model + test_set)
return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}
@asset(
name="experiment_results",
compute_kind="Python",
ins={
"my_onnx_model": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
"my_test_set": AssetIn(
input_manager_key="wandb_artifacts_manager",
),
},
group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
# https://onnx.ai/sklearn-onnx/ 참고
# ONNX Runtime으로 예측 계산
sess = rt.InferenceSession(my_onnx_model)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onx = sess.run(
[label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
)[0]
context.log.info(pred_onx)
return pred_onx
이 인테그레이션은 Dagster partitions을 기본 지원합니다.
asset의 파티션 하나만, 여러 개, 또는 전체를 선택적으로 조회할 수 있습니다.
모든 파티션은 딕셔너리로 제공되며, 여기서 키와 값은 각각 파티션 키와 Artifact 콘텐츠를 나타냅니다.
업스트림 @asset의 모든 파티션을 조회하며, 이는 딕셔너리 형태로 제공됩니다. 이 딕셔너리에서 키와 값은 각각 파티션 키와 Artifact 콘텐츠에 해당합니다.@asset(
compute_kind="wandb",
ins={"my_daily_partitioned_asset": AssetIn()},
output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
AssetIn의 partition_mapping 설정을 사용하면 특정 파티션을 선택할 수 있습니다. 이 경우에는 TimeWindowPartitionMapping을 사용합니다.@asset(
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
compute_kind="wandb",
ins={
"my_daily_partitioned_asset": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
)
},
output_required=False,
)
def read_specific_partitions(context, my_daily_partitioned_asset):
for partition, content in my_daily_partitioned_asset.items():
context.log.info(f"partition={partition}, content={content}")
설정 객체 metadata는 프로젝트에서 W&B가 서로 다른 artifact 파티션과 상호작용하는 방식을 정의합니다.
metadata 객체에는 wandb_artifact_configuration라는 키가 있으며, 그 안에 중첩된 partitions 객체가 들어 있습니다.
partitions 객체는 각 파티션 이름을 해당 설정에 매핑합니다. 각 파티션의 설정에서는 해당 파티션에서 데이터를 어떻게 조회할지 지정할 수 있습니다. 이러한 설정에는 각 파티션의 요구 사항에 따라 get, version, alias 등의 키가 포함될 수 있습니다.
설정 키
get:
get 키는 데이터를 가져올 W&B 객체(Table, Image…)의 이름을 지정합니다.
version:
version 키는 Artifact의 특정 버전을 가져오려는 경우에 사용합니다.
alias:
alias 키를 사용하면 별칭으로 Artifact를 조회할 수 있습니다.
와일드카드 설정
와일드카드 "*"는 설정되지 않은 모든 파티션을 의미합니다. 즉, partitions 객체에 명시적으로 지정되지 않은 파티션에 대한 기본 설정을 제공합니다.
예를 들어,
"*": {
"get": "default_table_name",
},
이 설정은 명시적으로 설정되지 않은 모든 파티션의 데이터를 default_table_name이라는 테이블에서 가져오도록 한다는 뜻입니다.
특정 파티션 설정
각 파티션의 키를 사용해 해당 파티션별 설정을 지정하면, 특정 파티션에 대해서는 와일드카드 설정을 재정의할 수 있습니다.
예를 들어,
"yellow": {
"get": "custom_table_name",
},
이 설정은 yellow라는 이름의 파티션에 대해, 와일드카드 설정 대신 custom_table_name라는 이름의 테이블에서 데이터를 가져오도록 한다는 의미입니다.
버전 관리 및 별칭 지정
버전 관리 및 별칭 지정을 위해 설정에 특정 version 및 alias 키를 지정할 수 있습니다.
버전의 경우,
"orange": {
"version": "v0",
},
이 설정은 orange Artifact 파티션의 v0 버전에서 데이터를 가져옵니다.
별칭의 경우,
"blue": {
"alias": "special_alias",
},
이 설정은 special_alias라는 별칭을 가진 Artifact 파티션의 테이블 default_table_name에서 데이터를 가져옵니다 (설정에서는 blue로 지칭됨).
고급 활용
인테그레이션의 고급 활용 방법은 다음 전체 코드 예제를 참고하세요:
활발히 개발 중인 베타 제품
Launch에 관심이 있으신가요? W&B Launch 고객 파일럿 프로그램 참여에 대해 담당 account team에 문의하세요.
파일럿 고객은 베타 프로그램 참여 자격을 충족하려면 AWS EKS 또는 SageMaker를 사용해야 합니다. 향후에는 추가 플랫폼도 지원할 계획입니다.
계속하기 전에 W&B Launch 사용 방법을 충분히 이해하는 것이 좋습니다. Launch 가이드를 읽어보세요.
Dagster 인테그레이션은 다음을 지원합니다:
- Dagster 인스턴스에서 하나 이상의 Launch 에이전트를 실행합니다.
- Dagster 인스턴스 내에서 로컬 Launch 작업을 실행합니다.
- 온프레미스 또는 클라우드에서 원격 Launch 작업을 실행합니다.
이 인테그레이션은 임포트 가능한 @op인 run_launch_agent를 제공합니다. 이 @op는 Launch 에이전트를 시작하고, 수동으로 중지할 때까지 장기 실행 프로세스로 실행합니다.
에이전트는 Launch 큐를 폴링하며, 작업을 순서대로 실행하는 프로세스입니다(또는 외부 서비스가 실행하도록 작업을 디스패치합니다).
Launch 페이지를 참고하세요.
Launchpad에서는 모든 속성에 대한 유용한 설명도 확인할 수 있습니다.
간단한 예
# config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # 본인의 W&B entity로 교체하세요
project: my_project # 본인의 W&B 프로젝트로 교체하세요
ops:
run_launch_agent:
config:
max_jobs: -1
queues:
- my_dagster_queue
from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(
resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_agent_example():
run_launch_agent()
이 인테그레이션은 run_launch_job이라는 임포트할 수 있는 @op를 제공합니다. 이 @op는 Launch 작업을 실행합니다.
Launch 작업을 실행하려면 큐에 할당되어 있어야 합니다. 큐를 생성하거나 기본 큐를 사용할 수 있습니다. 해당 큐를 모니터링하는 활성 에이전트가 있는지 확인하세요. Dagster 인스턴스 내에서 에이전트를 실행할 수 있으며, Kubernetes에 배포 가능한 agent를 사용하는 방법도 고려할 수 있습니다.
Launch 페이지를 참고하세요.
Launchpad에서는 모든 속성에 대한 유용한 설명도 확인할 수 있습니다.
단순한 예시
# config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
wandb_config:
config:
entity: my_entity # W&B entity로 교체하세요
project: my_project # W&B 프로젝트로 교체하세요
ops:
my_launched_job:
config:
entry_point:
- python
- train.py
queue: my_dagster_queue
uri: https://github.com/wandb/example-dagster-integration-with-launch
from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource
from dagster import job, make_values_resource
@job(resource_defs={
"wandb_config": make_values_resource(
entity=str,
project=str,
),
"wandb_resource": wandb_resource.configured(
{"api_key": {"env": "WANDB_API_KEY"}}
),
},
)
def run_launch_job_example():
run_launch_job.alias("my_launched_job")() # alias로 작업 이름을 변경합니다
-
IO Manager를 사용해 Artifacts를 읽고 쓰세요.
Artifact.download() 또는 Run.log_artifact()를 직접 사용하지 마세요. 이러한 방법은 인테그레이션이 처리합니다. 대신 Artifact에 저장할 데이터를 반환하고, 나머지는 인테그레이션이 처리하도록 하세요. 이렇게 하면 Artifact의 리니지를 더 잘 추적할 수 있습니다.
-
복잡한 사용 사례에서만 Artifact 객체를 직접 빌드하세요.
Python 객체와 W&B 객체는 ops/assets에서 반환해야 합니다. Artifact 번들링은 인테그레이션이 처리합니다.
복잡한 사용 사례에서는 Dagster 작업에서 Artifact를 직접 빌드할 수 있습니다. 소스 인테그레이션 이름 및 버전, 사용한 Python 버전, pickle 프로토콜 버전 등의 메타데이터를 보강할 수 있도록 Artifact 객체를 인테그레이션에 전달하는 것을 권장합니다.
-
메타데이터를 통해 Artifacts에 파일, 디렉터리, 외부 참조를 추가하세요.
파일, 디렉터리 또는 외부 참조(Amazon S3, GCS, HTTP…)를 추가하려면 인테그레이션
wandb_artifact_configuration 객체를 사용하세요. 자세한 내용은 Artifact configuration section의 고급 예제를 참조하세요.
-
Artifact가 생성될 때는 @op 대신 @asset을 사용하세요.
Artifacts는 asset입니다. Dagster가 해당 asset을 관리하는 경우 asset을 사용하는 것이 좋습니다. 그러면 Dagit Asset Catalog에서 더 나은 가시성을 확보할 수 있습니다.
-
Dagster 외부에서 생성된 Artifact를 사용하려면 SourceAsset을 사용하세요.
이렇게 하면 외부에서 생성된 Artifacts를 읽을 때도 인테그레이션을 활용할 수 있습니다. 그렇지 않으면 인테그레이션이 생성한 Artifacts만 사용할 수 있습니다.
-
대규모 모델 트레이닝을 위해 전용 컴퓨팅에서 오케스트레이션하려면 W&B Launch를 사용하세요.
Dagster cluster 내에서 소규모 모델을 트레이닝할 수 있고, GPU 노드가 있는 Kubernetes cluster에서 Dagster를 실행할 수도 있습니다. 대규모 모델 트레이닝에는 W&B Launch를 사용하는 것을 권장합니다. 이렇게 하면 인스턴스 과부하를 방지하고 더 적합한 컴퓨팅에 액세스할 수 있습니다.
-
Dagster 내에서 실험 추적을 할 때는 W&B Run ID를 Dagster Run ID 값으로 설정하세요.
Run resumable을 설정하고, W&B Run ID를 Dagster Run ID 또는 원하는 문자열로 지정하는 두 가지를 모두 권장합니다. 이 권장 사항을 따르면 Dagster 내에서 모델을 트레이닝할 때 W&B 메트릭과 W&B Artifacts가 동일한 W&B run에 저장됩니다.
W&B Run ID를 Dagster Run ID로 설정하세요.
wandb.init(
id=context.run_id,
resume="allow",
...
)
또는 직접 W&B Run ID를 선택해 IO Manager 설정에 전달합니다.
wandb.init(
id="my_resumable_run_id",
resume="allow",
...
)
@job(
resource_defs={
"io_manager": wandb_artifacts_io_manager.configured(
{"wandb_run_id": "my_resumable_run_id"}
),
}
)
-
대용량 W&B Artifacts에서는 get 또는 get_path를 사용해 필요한 데이터만 가져오세요.
기본적으로 인테그레이션은 Artifact 전체를 다운로드합니다. 매우 큰 Artifact를 사용하는 경우 필요한 특정 파일이나 객체만 가져오는 것이 좋습니다. 이렇게 하면 속도와 리소스 사용량이 개선됩니다.
-
Python 객체의 경우 사용 사례에 맞게 피클링 모듈을 조정하세요.
기본적으로 W&B 인테그레이션은 표준 pickle 모듈을 사용합니다. 하지만 일부 객체는 이 모듈과 호환되지 않습니다. 예를 들어, yield를 사용하는 함수는 pickle하려고 하면 오류가 발생합니다. W&B는 다른 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)도 지원합니다.
직렬화된 문자열을 반환하거나 Artifact를 직접 생성해 ONNX나 PMML 같은 더 고급 직렬화 방식을 사용할 수도 있습니다. 어떤 방식을 선택할지는 사용 사례에 따라 달라지므로, 이 주제에 관한 관련 문헌을 참고하세요.