dbt Cloud 運算子

這些運算子可以執行 dbt Cloud 作業,輪詢當前正在執行的作業狀態,並在本地下載執行工件。

每個運算子都可以透過兩種方式繫結到特定的 dbt Cloud 賬戶

  • 顯式提供賬戶 ID(透過 account_id 引數)給運算子。

  • 或者,在 Airflow 連線中指定 dbt Cloud 賬戶。如果未將賬戶 ID 傳遞給運算子,則運算子將自動回退使用此配置。

觸發 dbt Cloud 作業

使用 DbtCloudRunJobOperator 觸發 dbt Cloud 作業執行。預設情況下,運算子將每隔 check_interval 秒定期檢查執行作業的狀態,直到作業達到 timeout 執行時長或以成功狀態終止。此功能由 wait_for_termination 引數控制。或者,可以將 wait_for_termination 設定為 False 以執行非同步等待(通常與 DbtCloudJobRunSensor 結合使用)。將 wait_for_termination 設定為 False 是處理長時間執行的 dbt Cloud 作業的好方法。

deferrable 引數與 wait_for_termination 引數一起控制是在 worker 上輪詢作業狀態還是使用 Triggerer 進行延遲的功能。當 wait_for_termination 為 True 且 deferrable 為 False 時,我們提交作業並在 worker 上 輪詢 其狀態。這將保持 worker 插槽被佔用直到作業執行完成。當 wait_for_termination 為 True 且 deferrable 為 True 時,我們提交作業並使用 Triggerer 進行 延遲。這將釋放 worker 插槽,從而在作業執行時節省資源利用。

wait_for_termination 為 False 且 deferrable 為 False 時,我們只提交作業,並且只能使用 DbtCloudJobRunSensor 跟蹤作業狀態。

retry_from_failure 為 True 時,如果執行失敗,我們將從失敗點重試該作業的執行。否則,我們將觸發新的執行。有關重試邏輯的更多資訊,請參閱 API 文件

雖然 schema_overridesteps_overrideDbtCloudRunJobOperator 的顯式可選引數,但也可以使用 additional_run_config 字典將自定義執行配置傳遞給運算子。此引數可用於初始化作業執行的其他執行時配置或覆蓋,例如 threads_overridegenerate_docs_overridegit_branch 等。有關可以在執行時使用的其他配置的完整列表,請參閱 API 文件

以下示例分別演示瞭如何例項化 DbtCloudRunJobOperator 任務,使用同步和非同步等待執行終止。請注意,運算子的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)

下一個示例還展示瞭如何透過 additional_run_config 字典傳入自定義執行時配置(在本例中用於 threads_override)。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)

您也可以在不提供 job_id 的情況下觸發 dbt Cloud 作業。作為替代,您可以透過提供 project_nameenvironment_namejob_name 來標識作業。請注意,這僅在上述三個引數能夠唯一標識您賬戶中的作業時才有效(即,您不能在同一專案和環境中擁有兩個同名的作業)。

tests/system/dbt/cloud/example_dbt_cloud.py

trigger_job_run3 = DbtCloudRunJobOperator(
    task_id="trigger_job_run3",
    project_name="my_dbt_project",
    environment_name="prod",
    job_name="my_dbt_job",
    check_interval=10,
    timeout=300,
)

輪詢 dbt Cloud 作業執行狀態

使用 DbtCloudJobRunSensor 定期檢索 dbt Cloud 作業執行的狀態並檢查執行是否成功。此 Sensor 提供了 BaseSensorOperator 可用的所有相同功能。

在下面的示例中,run_id 值來自先前 DbtCloudRunJobOperator 任務的輸出,利用了所有運算子都公開的 .output 屬性。另外請注意,該任務的 account_id 在示例 DAG 的 default_args 中引用。

tests/system/dbt/cloud/example_dbt_cloud.py

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)

此外,您還可以使用 延遲 模式非同步輪詢作業執行狀態。在此模式下,Sensor 執行時會釋放 worker 插槽。

tests/system/dbt/cloud/example_dbt_cloud.py

job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)

下載執行工件

使用 DbtCloudGetJobRunArtifactOperator 下載 dbt Cloud 作業執行生成的工件。指定的 path 值應以 target/ 目錄為根目錄。典型工件包括 manifest.jsoncatalog.jsonrun_results.json,但也可以下載其他工件,例如模型的原始 SQL 或 sources.json

有關 dbt Cloud 工件的更多資訊,請參閱此文件

tests/system/dbt/cloud/example_dbt_cloud.py

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)

列出作業

使用 DbtCloudListJobsOperator 列出與指定 dbt Cloud 賬戶關聯的所有作業。account_id 必須透過連線提供或作為引數提供給任務。

如果提供了 project_id,將只檢索與此專案 ID 相關的作業。

有關列出 dbt Cloud 作業的更多資訊,請參閱此文件

tests/system/dbt/cloud/example_dbt_cloud.py

list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)

此條目有幫助嗎?