Google Cloud VertexAI Operators

Google Cloud VertexAI 將 AutoML 和 AI Platform 整合到一個統一的 API、客戶端庫和使用者介面中。AutoML 讓您無需編寫程式碼即可在影像、表格、文字和影片資料集上訓練模型,而 AI Platform 中的訓練允許您執行自定義訓練程式碼。藉助 Vertex AI,AutoML 訓練和自定義訓練都是可用的選項。無論您選擇哪種訓練選項,都可以使用 Vertex AI 儲存模型、部署模型並請求預測。

建立資料集

要建立 Google VertexAI 資料集,您可以使用 CreateDatasetOperator。該 operator 在 XCom 中以 dataset_id 鍵返回資料集 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

create_image_dataset_job = CreateDatasetOperator(
    task_id="image_dataset",
    dataset=IMAGE_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_tabular_dataset_job = CreateDatasetOperator(
    task_id="tabular_dataset",
    dataset=TABULAR_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_text_dataset_job = CreateDatasetOperator(
    task_id="text_dataset",
    dataset=TEXT_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_video_dataset_job = CreateDatasetOperator(
    task_id="video_dataset",
    dataset=VIDEO_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)
create_time_series_dataset_job = CreateDatasetOperator(
    task_id="time_series_dataset",
    dataset=TIME_SERIES_DATASET,
    region=REGION,
    project_id=PROJECT_ID,
)

建立資料集後,您可以使用 ImportDataOperator 匯入一些資料。

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

import_data_job = ImportDataOperator(
    task_id="import_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    import_configs=TEST_IMPORT_CONFIG,
)

要匯出資料集,您可以使用 ExportDataOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

export_data_job = ExportDataOperator(
    task_id="export_data",
    dataset_id=create_image_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
    export_config=TEST_EXPORT_CONFIG,
)

要刪除資料集,您可以使用 DeleteDatasetOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

delete_dataset_job = DeleteDatasetOperator(
    task_id="delete_dataset",
    dataset_id=create_text_dataset_job.output["dataset_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要獲取資料集,您可以使用 GetDatasetOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

get_dataset = GetDatasetOperator(
    task_id="get_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_tabular_dataset_job.output["dataset_id"],
)

要獲取資料集列表,您可以使用 ListDatasetsOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

list_dataset_job = ListDatasetsOperator(
    task_id="list_dataset",
    region=REGION,
    project_id=PROJECT_ID,
)

要更新資料集,您可以使用 UpdateDatasetOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_dataset.py

update_dataset_job = UpdateDatasetOperator(
    task_id="update_dataset",
    project_id=PROJECT_ID,
    region=REGION,
    dataset_id=create_video_dataset_job.output["dataset_id"],
    dataset=DATASET_TO_UPDATE,
    update_mask=TEST_UPDATE_MASK,
)

建立訓練作業

要建立 Google Vertex AI 訓練作業,您可以使用三個 operator:CreateCustomContainerTrainingJobOperatorCreateCustomPythonPackageTrainingJobOperatorCreateCustomTrainingJobOperator。它們都會等待操作完成。每個 operator 的結果將是使用者使用這些 operator 訓練的模型。

準備步驟

對於每個 operator,您必須準備並建立資料集。然後將資料集 ID 放入 operator 中的 dataset_id 引數。

如何執行自定義容器訓練作業 CreateCustomContainerTrainingJobOperator

在開始執行此作業之前,您應該建立一個包含訓練指令碼的 docker 映象。如何建立映象的文件可以在此連結找到:https://cloud.google.com/vertex-ai/docs/training/create-custom-container。之後,您應該將映象連結放入 container_uri 引數。此外,您可以在 command 引數中鍵入將從該映象建立的容器的執行命令。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py

create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=CONTAINER_DISPLAY_NAME,
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateCustomContainerTrainingJobOperator 也提供可延遲模式。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_container.py

create_custom_container_training_job_deferrable = CreateCustomContainerTrainingJobOperator(
    task_id="custom_container_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_CONTAINER_GCS_BUCKET_NAME}",
    display_name=f"{CONTAINER_DISPLAY_NAME}-def",
    container_uri=CUSTOM_CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    command=["python3", "task.py"],
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

如何執行 Python 包訓練作業 CreateCustomPythonPackageTrainingJobOperator

在開始執行此作業之前,您應該建立一個包含訓練指令碼的 python 包。如何建立的文件可以在此連結找到:https://cloud.google.com/vertex-ai/docs/training/create-python-pre-built-container。接下來,您應該將包連結放入 python_package_gcs_uri 引數,此外,python_module_name 引數應包含將執行您的訓練任務的指令碼名稱。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py

create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=PACKAGE_DISPLAY_NAME,
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateCustomPythonPackageTrainingJobOperator 也提供可延遲模式。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py

create_custom_python_package_training_job_deferrable = CreateCustomPythonPackageTrainingJobOperator(
    task_id="python_package_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_PYTHON_GCS_BUCKET_NAME}",
    display_name=f"{PACKAGE_DISPLAY_NAME}-def",
    python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
    python_module_name=PYTHON_MODULE_NAME,
    container_uri=CONTAINER_URI,
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_type=ACCELERATOR_TYPE,
    accelerator_count=ACCELERATOR_COUNT,
    training_fraction_split=TRAINING_FRACTION_SPLIT,
    validation_fraction_split=VALIDATION_FRACTION_SPLIT,
    test_fraction_split=TEST_FRACTION_SPLIT,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

如何執行自定義訓練作業 CreateCustomTrainingJobOperator

要建立和執行自定義訓練作業,您應該將本地訓練指令碼的路徑放入 script_path 引數中。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

create_custom_training_job = CreateCustomTrainingJobOperator(
    task_id="custom_task",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

model_id_v1 = create_custom_training_job.output["model_id"]

相同的操作可以在可延遲模式下執行。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

create_custom_training_job_deferrable = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)
model_id_deferrable_v1 = create_custom_training_job_deferrable.output["model_id"]

此外,您可以建立現有自定義訓練作業的新版本。它將用另一個版本替換現有模型,而不是在模型登錄檔中建立新模型。這可以透過在執行自定義訓練作業時指定 parent_model 引數來完成。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

create_custom_training_job_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=CUSTOM_DISPLAY_NAME,
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

相同的操作可以在可延遲模式下執行。

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

create_custom_training_job_deferrable_v2 = CreateCustomTrainingJobOperator(
    task_id="custom_task_deferrable_v2",
    staging_bucket=f"gs://{CUSTOM_GCS_BUCKET_NAME}",
    display_name=f"{CUSTOM_DISPLAY_NAME}-def",
    script_path=LOCAL_TRAINING_SCRIPT_PATH,
    container_uri=CONTAINER_URI,
    requirements=["gcsfs==0.7.1"],
    model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
    parent_model=model_id_deferrable_v1,
    # run params
    dataset_id=tabular_dataset_id,
    replica_count=REPLICA_COUNT,
    model_display_name=f"{MODEL_DISPLAY_NAME}-def",
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

您可以使用 ListCustomTrainingJobOperator 獲取訓練作業列表。

tests/system/google/cloud/vertex_ai/example_vertex_ai_list_custom_jobs.py

list_custom_training_job = ListCustomTrainingJobOperator(
    task_id="list_custom_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

如果您希望刪除自定義訓練作業,可以使用 DeleteCustomTrainingJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

delete_custom_training_job = DeleteCustomTrainingJobOperator(
    task_id="delete_custom_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
    custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

建立 AutoML 訓練作業

要建立 Google Vertex AI Auto ML 訓練作業,您可以使用五個 operator:CreateAutoMLForecastingTrainingJobOperator CreateAutoMLImageTrainingJobOperator CreateAutoMLTabularTrainingJobOperator SupervisedFineTuningTrainOperator CreateAutoMLVideoTrainingJobOperator。它們都會等待操作完成。每個 operator 的結果將是使用者使用這些 operator 訓練的模型。

如何執行 AutoML 預測訓練作業 CreateAutoMLForecastingTrainingJobOperator

在開始執行此作業之前,您必須準備並建立 TimeSeries 資料集。之後,您應該將資料集 ID 放入 operator 中的 dataset_id 引數。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py

create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
    task_id="auto_ml_forecasting_task",
    display_name=FORECASTING_DISPLAY_NAME,
    optimization_objective="minimize-rmse",
    column_specs=COLUMN_SPECS,
    # run params
    dataset_id=forecast_dataset_id,
    target_column=TEST_TARGET_COLUMN,
    time_column=TEST_TIME_COLUMN,
    time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
    available_at_forecast_columns=[TEST_TIME_COLUMN],
    unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
    time_series_attribute_columns=["city", "zip_code", "county"],
    forecast_horizon=30,
    context_window=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    weight_column=None,
    budget_milli_node_hours=1000,
    model_display_name=MODEL_DISPLAY_NAME,
    predefined_split_column_name=None,
    region=REGION,
    project_id=PROJECT_ID,
)

如何執行 AutoML 影像訓練作業 CreateAutoMLImageTrainingJobOperator

在開始執行此作業之前,您必須準備並建立 Image 資料集。之後,您應該將資料集 ID 放入 operator 中的 dataset_id 引數。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_training.py

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="classification",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=8000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

要執行 AutoML 影像檢測訓練作業

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_image_object_detection.py

create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
    task_id="auto_ml_image_task",
    display_name=IMAGE_DISPLAY_NAME,
    dataset_id=image_dataset_id,
    prediction_type="object_detection",
    multi_label=False,
    model_type="CLOUD",
    training_fraction_split=0.6,
    validation_fraction_split=0.2,
    test_fraction_split=0.2,
    budget_milli_node_hours=20000,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

如何執行 AutoML 表格訓練作業 CreateAutoMLTabularTrainingJobOperator

在開始執行此作業之前,您必須準備並建立 Tabular 資料集。之後,您應該將資料集 ID 放入 operator 中的 dataset_id 引數。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_tabular_training.py

create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
    task_id="auto_ml_tabular_task",
    display_name=TABULAR_DISPLAY_NAME,
    optimization_prediction_type="classification",
    column_transformations=COLUMN_TRANSFORMATIONS,
    dataset_id=tabular_dataset_id,
    target_column="Adopted",
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    model_display_name=MODEL_DISPLAY_NAME,
    disable_early_stopping=False,
    region=REGION,
    project_id=PROJECT_ID,
)

如何執行 AutoML 影片訓練作業 CreateAutoMLVideoTrainingJobOperator

在開始執行此作業之前,您必須準備並建立 Video 資料集。之後,您應該將資料集 ID 放入 operator 中的 dataset_id 引數。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)
model_id_v1 = create_auto_ml_video_training_job.output["model_id"]

此外,您可以建立現有 AutoML 影片訓練作業的新版本。在這種情況下,結果將是現有模型的新版本,而不是在模型登錄檔中建立的新模型。這可以透過在執行 AutoML 影片訓練作業時指定 parent_model 引數來完成。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_training.py

create_auto_ml_video_training_job_v2 = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_v2_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="classification",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    parent_model=model_id_v1,
    region=REGION,
    project_id=PROJECT_ID,
)

您還可以使用 vertex_ai AutoML 模型進行影片跟蹤。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_video_tracking.py

create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
    task_id="auto_ml_video_task",
    display_name=VIDEO_DISPLAY_NAME,
    prediction_type="object_tracking",
    model_type="CLOUD",
    dataset_id=video_dataset_id,
    model_display_name=MODEL_DISPLAY_NAME,
    region=REGION,
    project_id=PROJECT_ID,
)

您可以使用 ListAutoMLTrainingJobOperator 獲取 AutoML 訓練作業列表。

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_list_training.py

list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
    task_id="list_auto_ml_training_job",
    region=REGION,
    project_id=PROJECT_ID,
)

如果您希望刪除 Auto ML 訓練作業,可以使用 DeleteAutoMLTrainingJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_auto_ml_forecasting_training.py

delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_forecasting_training_job",
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
    "key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)

建立批次預測作業

要建立 Google VertexAI 批次預測作業,您可以使用 CreateBatchPredictionJobOperator。該 operator 在 XCom 中以 batch_prediction_job_id 鍵返回批次預測作業 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
)

CreateBatchPredictionJobOperator 也提供可延遲模式。

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py

create_batch_prediction_job_def = CreateBatchPredictionJobOperator(
    task_id="create_batch_prediction_job_def",
    job_display_name=JOB_DISPLAY_NAME,
    model_name="{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}",
    predictions_format="csv",
    bigquery_source=BIGQUERY_SOURCE,
    gcs_destination_prefix=GCS_DESTINATION_PREFIX,
    model_parameters=MODEL_PARAMETERS,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)

要刪除批次預測作業,您可以使用 DeleteBatchPredictionJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py

delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
    task_id="delete_batch_prediction_job",
    batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
    region=REGION,
    project_id=PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

要獲取批次預測作業列表,您可以使用 ListBatchPredictionJobsOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py

list_batch_prediction_job = ListBatchPredictionJobsOperator(
    task_id="list_batch_prediction_jobs",
    region=REGION,
    project_id=PROJECT_ID,
)

建立端點服務

要建立 Google VertexAI 端點,您可以使用 CreateEndpointOperator。該 operator 在 XCom 中以 endpoint_id 鍵返回端點 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py

create_endpoint = CreateEndpointOperator(
    task_id="create_endpoint",
    endpoint=ENDPOINT_CONF,
    region=REGION,
    project_id=PROJECT_ID,
)

建立端點後,您可以使用 DeployModelOperator 部署一些模型。

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py

deploy_model = DeployModelOperator(
    task_id="deploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model=DEPLOYED_MODEL,
    traffic_split={"0": 100},
    region=REGION,
    project_id=PROJECT_ID,
)

要取消部署模型,您可以使用 UndeployModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py

undeploy_model = UndeployModelOperator(
    task_id="undeploy_model",
    endpoint_id=create_endpoint.output["endpoint_id"],
    deployed_model_id=deploy_model.output["deployed_model_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要刪除端點,您可以使用 DeleteEndpointOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py

delete_endpoint = DeleteEndpointOperator(
    task_id="delete_endpoint",
    endpoint_id=create_endpoint.output["endpoint_id"],
    region=REGION,
    project_id=PROJECT_ID,
)

要獲取端點列表,您可以使用 ListEndpointsOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_endpoint.py

list_endpoints = ListEndpointsOperator(
    task_id="list_endpoints",
    region=REGION,
    project_id=PROJECT_ID,
)

建立超引數調優作業

要建立 Google VertexAI 超引數調優作業,您可以使用 CreateHyperparameterTuningJobOperator。該 operator 在 XCom 中以 hyperparameter_tuning_job_id 鍵返回超引數調優作業 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py

create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
)

CreateHyperparameterTuningJobOperator 也支援可延遲模式。

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py

create_hyperparameter_tuning_job_def = CreateHyperparameterTuningJobOperator(
    task_id="create_hyperparameter_tuning_job_def",
    staging_bucket=STAGING_BUCKET,
    display_name=DISPLAY_NAME,
    worker_pool_specs=WORKER_POOL_SPECS,
    region=REGION,
    project_id=PROJECT_ID,
    parameter_spec=PARAM_SPECS,
    metric_spec=METRIC_SPEC,
    max_trial_count=15,
    parallel_trial_count=3,
    deferrable=True,
)

要刪除超引數調優作業,您可以使用 DeleteHyperparameterTuningJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py

delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
    task_id="delete_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

要獲取超引數調優作業,您可以使用 GetHyperparameterTuningJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py

get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
    task_id="get_hyperparameter_tuning_job",
    project_id=PROJECT_ID,
    region=REGION,
    hyperparameter_tuning_job_id="{{ task_instance.xcom_pull("
    "task_ids='create_hyperparameter_tuning_job', key='hyperparameter_tuning_job_id') }}",
)

要獲取超引數調優作業列表,您可以使用 ListHyperparameterTuningJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_hyperparameter_tuning_job.py

list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
    task_id="list_hyperparameter_tuning_job",
    region=REGION,
    project_id=PROJECT_ID,
)

建立模型服務

要上傳 Google VertexAI 模型,您可以使用 UploadModelOperator。該 operator 在 XCom 中以 model_id 鍵返回模型 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

upload_model = UploadModelOperator(
    task_id="upload_model",
    region=REGION,
    project_id=PROJECT_ID,
    model=MODEL_OBJ,
)
upload_model_v1 = upload_model.output["model_id"]

要匯出模型,您可以使用 ExportModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

export_model = ExportModelOperator(
    task_id="export_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    output_config=MODEL_OUTPUT_CONFIG,
)

要刪除模型,您可以使用 DeleteModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

delete_model = DeleteModelOperator(
    task_id="delete_model",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=upload_model.output["model_id"],
    trigger_rule=TriggerRule.ALL_DONE,
)

要獲取模型列表,您可以使用 ListModelsOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

list_models = ListModelsOperator(
    task_id="list_models",
    region=REGION,
    project_id=PROJECT_ID,
)

要透過其 ID 檢索模型,您可以使用 GetModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

get_model = GetModelOperator(
    task_id="get_model", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

要列出所有模型版本,您可以使用 ListModelVersionsOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

list_model_versions = ListModelVersionsOperator(
    task_id="list_model_versions", region=REGION, project_id=PROJECT_ID, model_id=model_id_v1
)

要將模型的特定版本設定為預設版本,您可以使用 SetDefaultVersionOnModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

set_default_version = SetDefaultVersionOnModelOperator(
    task_id="set_default_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v2,
)

要向模型的特定版本新增別名,您可以使用 AddVersionAliasesOnModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

add_version_alias = AddVersionAliasesOnModelOperator(
    task_id="add_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version", "beta"],
    model_id=model_id_v2,
)

要從模型的特定版本刪除別名,您可以使用 DeleteVersionAliasesOnModelOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

delete_version_alias = DeleteVersionAliasesOnModelOperator(
    task_id="delete_version_alias",
    project_id=PROJECT_ID,
    region=REGION,
    version_aliases=["new-version"],
    model_id=model_id_v2,
)

要刪除模型的特定版本,您可以使用 DeleteModelVersionOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_model_service.py

delete_model_version = DeleteModelVersionOperator(
    task_id="delete_model_version",
    project_id=PROJECT_ID,
    region=REGION,
    model_id=model_id_v1,
    trigger_rule=TriggerRule.ALL_DONE,
)

執行管道作業

要執行 Google VertexAI 管道作業,您可以使用 RunPipelineJobOperator。該 operator 在 XCom 中以 pipeline_job_id 鍵返回管道作業 ID。

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py

run_pipeline_job = RunPipelineJobOperator(
    task_id="run_pipeline_job",
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values=PARAMETER_VALUES,
    region=REGION,
    project_id=PROJECT_ID,
)

要刪除管道作業,您可以使用 DeletePipelineJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py

delete_pipeline_job = DeletePipelineJobOperator(
    task_id="delete_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull(task_ids='run_pipeline_job', key='pipeline_job_id') }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

要獲取管道作業,您可以使用 GetPipelineJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py

get_pipeline_job = GetPipelineJobOperator(
    task_id="get_pipeline_job",
    project_id=PROJECT_ID,
    region=REGION,
    pipeline_job_id="{{ task_instance.xcom_pull(task_ids='run_pipeline_job', key='pipeline_job_id') }}",
)

要獲取管道作業列表,您可以使用 ListPipelineJobOperator

tests/system/google/cloud/vertex_ai/example_vertex_ai_pipeline_job.py

list_pipeline_job = ListPipelineJobOperator(
    task_id="list_pipeline_job",
    region=REGION,
    project_id=PROJECT_ID,
)

與生成式 AI 互動

要生成文字嵌入,您可以使用 TextEmbeddingModelGetEmbeddingsOperator。該 operator 在 XCom 中以 model_response 鍵返回模型的響應。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id=PROJECT_ID,
    location=REGION,
    prompt=PROMPT,
    pretrained_model=TEXT_EMBEDDING_MODEL,
)

要使用生成式模型生成內容,您可以使用 GenerativeModelGenerateContentOperator。該 operator 在 XCom 中以 model_response 鍵返回模型的響應。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

generate_content_task = GenerativeModelGenerateContentOperator(
    task_id="generate_content_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    tools=TOOLS,
    location=REGION,
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
    pretrained_model=MULTIMODAL_MODEL,
)

要執行監督式微調作業,您可以使用 SupervisedFineTuningTrainOperator。該 operator 在 XCom 中以 tuned_model_endpoint_name 鍵返回調優模型的端點名稱。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model_tuning.py

sft_train_task = SupervisedFineTuningTrainOperator(
    task_id="sft_train_task",
    project_id=PROJECT_ID,
    location=REGION,
    source_model=SOURCE_MODEL,
    train_dataset=TRAIN_DATASET,
    tuned_model_display_name=TUNED_MODEL_DISPLAY_NAME,
)

在向 Gemini API 傳送請求之前,要計算輸入令牌的數量,您可以使用:CountTokensOperator。該 operator 在 XCom 中以 total_tokens 鍵返回總令牌數。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

count_tokens_task = CountTokensOperator(
    task_id="count_tokens_task",
    project_id=PROJECT_ID,
    contents=CONTENTS,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
)

要評估模型,您可以使用 RunEvaluationOperator。該 operator 在 XCom 中以 summary_metrics 鍵返回評估摘要指標。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

run_evaluation_task = RunEvaluationOperator(
    task_id="run_evaluation_task",
    project_id=PROJECT_ID,
    location=REGION,
    pretrained_model=MULTIMODAL_MODEL,
    eval_dataset=EVAL_DATASET,
    metrics=METRICS,
    experiment_name=EXPERIMENT_NAME,
    experiment_run_name=EXPERIMENT_RUN_NAME,
    prompt_template=PROMPT_TEMPLATE,
)

要建立快取內容,您可以使用 CreateCachedContentOperator。該 operator 在 XCom 中以 return_value 鍵返回快取內容資源名稱。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

create_cached_content_task = CreateCachedContentOperator(
    task_id="create_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    model_name=CACHED_MODEL,
    system_instruction=CACHED_SYSTEM_INSTRUCTION,
    contents=CACHED_CONTENTS,
    ttl_hours=1,
    display_name="example-cache",
)

要從快取內容生成響應,您可以使用 GenerateFromCachedContentOperator。該 operator 在 XCom 中以 return_value 鍵返回快取內容響應。

tests/system/google/cloud/vertex_ai/example_vertex_ai_generative_model.py

generate_from_cached_content_task = GenerateFromCachedContentOperator(
    task_id="generate_from_cached_content_task",
    project_id=PROJECT_ID,
    location=REGION,
    cached_content_name="{{ task_instance.xcom_pull(task_ids='create_cached_content_task', key='return_value') }}",
    contents=["What are the papers about?"],
    generation_config=GENERATION_CONFIG,
    safety_settings=SAFETY_SETTINGS,
)

與 Vertex AI Feature Store 互動

獲取特徵檢視同步作業,可以使用 GetFeatureViewSyncOperator。該 Operator 會將同步作業結果透過 XComreturn_value 鍵返回。

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py

get_task = GetFeatureViewSyncOperator(
    task_id="get_task",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
)

同步特徵檢視,可以使用 SyncFeatureViewOperator。該 Operator 會將同步作業名稱透過 XComreturn_value 鍵返回。

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py

sync_task = SyncFeatureViewOperator(
    task_id="sync_task",
    project_id=PROJECT_ID,
    location=REGION,
    feature_online_store_id=FEATURE_ONLINE_STORE_ID,
    feature_view_id=FEATURE_VIEW_ID,
)

檢查特徵檢視同步是否成功,可以使用 FeatureViewSyncSensor

tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py

wait_for_sync = FeatureViewSyncSensor(
    task_id="wait_for_sync",
    location=REGION,
    feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}",
    poke_interval=60,  # Check every minute
    timeout=600,  # Timeout after 10 minutes
    mode="reschedule",
)

參考

有關詳細資訊,請參閱

本條目有幫助嗎?