Google Cloud Composer Operators

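Cloud Composer is a fully managed workflow orchestration service that enables you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.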

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

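By using Cloud Composer instead of a local instance of Apache Airflow, you get the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.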

For more information about the service, visit the Cloud Composer product documentation.

Create an environment

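Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.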

A simple environment configuration can look as follows:

tests/system/google/cloud/composer/example_cloud_composer.py


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}
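The example builds the environment ID from the DAG and run identifiers and replaces underscores with hyphens, presumably because Composer environment IDs do not allow underscores. The sketch below wraps that step in a hypothetical helper, `make_environment_id`, and validates the result against an assumed naming rule (lowercase letters, digits and hyphens, starting with a letter, not ending with a hyphen, at most 64 characters). Confirm the exact rule in the Cloud Composer documentation before relying on it.

```python
import re

# Assumed naming rule for Composer environment IDs (verify against the docs):
# a lowercase letter, then up to 63 lowercase letters, digits or hyphens,
# not ending with a hyphen.
_ENV_ID_PATTERN = re.compile(r"[a-z](?:[a-z0-9-]{0,62}[a-z0-9])?")


def make_environment_id(prefix: str, dag_id: str, env_id: str) -> str:
    """Hypothetical helper: mirrors f"{prefix}-{DAG_ID}-{ENV_ID}".replace("_", "-")
    from the example, additionally lowercases, then validates the result."""
    candidate = f"{prefix}-{dag_id}-{env_id}".replace("_", "-").lower()
    if not _ENV_ID_PATTERN.fullmatch(candidate):
        raise ValueError(f"invalid Composer environment ID: {candidate!r}")
    return candidate


print(make_environment_id("test", "composer_dag", "my_env"))  # test-composer-dag-my-env
```

Failing early on an invalid ID surfaces the problem while the DAG is being parsed, instead of as an API error when the create request is sent.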

With this configuration we can create the environment: CloudComposerCreateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

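from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerCreateEnvironmentOperator,
)

# PROJECT_ID, REGION, ENVIRONMENT_ID and ENVIRONMENT are defined earlier in the example DAG file
create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)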

Or you can define the same operator in deferrable mode: CloudComposerCreateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)
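In deferrable mode the task hands the wait for the long-running operation over to the Airflow triggerer, so it does not hold a worker slot while the environment is being created. The sketch below is a generic asyncio illustration of that non-blocking poll pattern, not the provider's actual trigger code. `poll_until_done` and `make_fake_operation` are hypothetical names, and the fake operation simply reports completion after a fixed number of checks.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_done(
    check_done: Callable[[], Awaitable[bool]],
    poll_interval: float = 0.01,
    timeout: float = 1.0,
) -> int:
    """Await check_done() repeatedly, sleeping between polls; return the poll count."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    polls = 0
    while True:
        polls += 1
        if await check_done():
            return polls
        if loop.time() >= deadline:
            raise TimeoutError("operation did not finish in time")
        # asyncio.sleep yields control, so other coroutines keep running meanwhile
        await asyncio.sleep(poll_interval)


def make_fake_operation(done_after: int) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a status check on a long-running operation."""
    calls = 0

    async def check_done() -> bool:
        nonlocal calls
        calls += 1
        return calls >= done_after

    return check_done


print(asyncio.run(poll_until_done(make_fake_operation(done_after=3))))  # 3
```

The important design point is `await asyncio.sleep(...)` rather than `time.sleep(...)`: a single event loop can then wait on many operations at once, which is what lets a triggerer replace many blocked workers.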

Get an environment

To get an environment, you can use:

CloudComposerGetEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

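from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerGetEnvironmentOperator,
)

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)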
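The operators identify an environment by `project_id`, `region` and `environment_id`. In the Composer API these combine into a resource name of the form `projects/{project}/locations/{location}/environments/{environment}`. The hypothetical helpers below build and parse that name. They can be useful when reading the `name` field of an environment returned by the get or list operators, assuming the returned dictionary carries the full resource name in that field.

```python
from typing import NamedTuple


class EnvironmentRef(NamedTuple):
    """Hypothetical container for the three values the operators take."""

    project_id: str
    region: str
    environment_id: str


def environment_name(ref: EnvironmentRef) -> str:
    """Build the full resource name of a Composer environment."""
    return (
        f"projects/{ref.project_id}/locations/{ref.region}"
        f"/environments/{ref.environment_id}"
    )


def parse_environment_name(name: str) -> EnvironmentRef:
    """Split a resource name back into project, region and environment ID."""
    parts = name.split("/")
    # Expect exactly: projects/<p>/locations/<l>/environments/<e>
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "environments"]:
        raise ValueError(f"not an environment resource name: {name!r}")
    return EnvironmentRef(parts[1], parts[3], parts[5])


ref = EnvironmentRef("my-project", "us-central1", "test-env")
print(environment_name(ref))
# projects/my-project/locations/us-central1/environments/test-env
```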

List environments

To list environments, you can use:

CloudComposerListEnvironmentsOperator

tests/system/google/cloud/composer/example_cloud_composer.py

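from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerListEnvironmentsOperator,
)

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs",
    project_id=PROJECT_ID,
    region=REGION,
)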

Update an environment

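You can update an environment by providing an environment configuration and an updateMask. In the updateMask argument, you specify the path, relative to the environment, of each field to update. For more information on updateMask and other parameters, see the Cloud Composer update environment API.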

An example of a new environment configuration and updateMask:

tests/system/google/cloud/composer/example_cloud_composer.py

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
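Each update-mask path is a dotted route to a field in the environment. Here `labels.label` points at the `label` key inside `labels`. The hypothetical helper below derives such paths from a nested update dictionary by walking it down to its leaf values. It is only a convenience sketch: the Composer API accepts a restricted set of mask paths, so check the update environment API reference before relying on a generated mask.

```python
from typing import Any


def mask_paths(update: dict[str, Any], prefix: str = "") -> list[str]:
    """Return dotted field paths for every leaf value in a nested dict."""
    paths: list[str] = []
    for key, value in update.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Descend into non-empty nested mappings
            paths.extend(mask_paths(value, path))
        else:
            paths.append(path)
    return paths


UPDATED_ENVIRONMENT = {"labels": {"label": "testing"}}
UPDATE_MASK = {"paths": mask_paths(UPDATED_ENVIRONMENT)}
print(UPDATE_MASK)  # {'paths': ['labels.label']}
```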

To update an environment, you can use: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

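from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerUpdateEnvironmentOperator,
)

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)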

Or you can define the same operator in deferrable mode: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment, you can use:

CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

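from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerDeleteEnvironmentOperator,
)

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)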

Or you can define the same operator in deferrable mode: CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List Composer images

You can also list all supported Cloud Composer images:

CloudComposerListImageVersionsOperator

tests/system/google/cloud/composer/example_cloud_composer.py

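from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerListImageVersionsOperator,
)

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)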
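Image version IDs encode both the Composer and the Airflow version, for example `composer-2.9.7-airflow-2.9.3`. The sketch below picks the newest fully numbered ID from a list of such strings. It assumes you have already extracted the ID strings from the operator's result. The sample list is made up for illustration, and IDs that do not match the assumed `composer-X.Y.Z-airflow-X.Y.Z` layout are skipped. This includes aliases such as the `composer-2-airflow-2` value used in the environment configuration above.

```python
import re
from typing import Iterable, Optional

# Assumed ID layout: composer-<major.minor.patch>-airflow-<major.minor.patch>
_VERSION_RE = re.compile(r"composer-(\d+)\.(\d+)\.(\d+)-airflow-(\d+)\.(\d+)\.(\d+)")


def latest_image_version(version_ids: Iterable[str]) -> Optional[str]:
    """Return the ID with the highest Composer (then Airflow) version, or None."""
    best_key: Optional[tuple] = None
    best_id: Optional[str] = None
    for version_id in version_ids:
        match = _VERSION_RE.fullmatch(version_id)
        if match is None:
            continue  # skip aliases and unrecognised formats
        # Compare numerically so that 2.10.0 ranks above 2.9.7
        key = tuple(int(part) for part in match.groups())
        if best_key is None or key > best_key:
            best_key, best_id = key, version_id
    return best_id


# Made-up sample IDs for illustration only
sample = [
    "composer-2-airflow-2",
    "composer-2.9.7-airflow-2.9.3",
    "composer-2.10.0-airflow-2.9.3",
    "composer-2.9.7-airflow-2.7.3",
]
print(latest_image_version(sample))  # composer-2.10.0-airflow-2.9.3
```

Comparing integer tuples rather than raw strings avoids the usual pitfall where `"2.10"` sorts before `"2.9"` lexicographically.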

Run Airflow CLI commands

You can run Airflow CLI commands in your environment with: CloudComposerRunAirflowCLICommandOperator

tests/system/google/cloud/composer/example_cloud_composer.py

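from airflow.providers.google.cloud.operators.cloud_composer import (
    CloudComposerRunAirflowCLICommandOperator,
)

# COMMAND is the Airflow CLI command string, defined elsewhere in the example DAG file
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)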
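The `command` argument is a single CLI string without the leading `airflow` executable name. As we understand the Composer ExecuteAirflowCommand API, it takes the command, subcommand and parameters as separate fields, so the string must be split before the request is sent. The hypothetical helper below shows one way to do that split with `shlex`. It is not the operator's internal code, and it assumes the second token is a subcommand unless it starts with `-`.

```python
import shlex
from typing import TypedDict


class AirflowCommandParts(TypedDict):
    command: str
    subcommand: str
    parameters: list[str]


def split_airflow_command(cmd: str) -> AirflowCommandParts:
    """Split e.g. 'dags list -o json' into command, subcommand and parameters."""
    tokens = shlex.split(cmd)  # respects quoted arguments
    if not tokens:
        raise ValueError("empty Airflow CLI command")
    has_sub = len(tokens) > 1 and not tokens[1].startswith("-")
    return {
        "command": tokens[0],
        "subcommand": tokens[1] if has_sub else "",
        "parameters": tokens[2:] if has_sub else tokens[1:],
    }


print(split_airflow_command("dags list -o json"))
# {'command': 'dags', 'subcommand': 'list', 'parameters': ['-o', 'json']}
```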

Or you can define the same operator in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)

Check if a DAG run has completed

You can use a sensor to check whether a DAG run in your environment has completed: CloudComposerDAGRunSensor

tests/system/google/cloud/composer/example_cloud_composer.py

from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
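The sensor succeeds once the DAG runs it inspects are in one of the `allowed_states`. The sketch below shows that kind of check on a list of run records. The record layout (`state` and `execution_date` keys), the time-window filter and the rule that at least one run must fall inside the window are all assumptions made for illustration. This is not the sensor's exact implementation.

```python
from datetime import datetime, timezone
from typing import Iterable, Mapping


def runs_in_allowed_states(
    dag_runs: Iterable[Mapping[str, str]],
    allowed_states: Iterable[str],
    start: datetime,
    end: datetime,
) -> bool:
    """True if at least one run falls in [start, end] and all such runs have an allowed state."""
    allowed = set(allowed_states)
    found = False
    for run in dag_runs:
        run_date = datetime.fromisoformat(run["execution_date"])
        if start <= run_date <= end:
            found = True
            if run["state"] not in allowed:
                return False  # a run in the window is not (yet) in an allowed state
    return found


# Hypothetical run records for illustration only
runs = [
    {"state": "success", "execution_date": "2024-01-01T00:00:00+00:00"},
    {"state": "running", "execution_date": "2024-01-02T00:00:00+00:00"},
]
window_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(runs_in_allowed_states(runs, ["success"], window_start, datetime(2024, 1, 1, 12, tzinfo=timezone.utc)))  # True
```

A sensor would call a check like this on every poke and keep waiting while it returns `False`.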

Or you can define the same sensor in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)
