dbt Cloud 運算子¶
這些運算子可以執行 dbt Cloud 作業,輪詢當前正在執行的作業狀態,並在本地下載執行工件。
每個運算子都可以透過兩種方式繫結到特定的 dbt Cloud 賬戶
顯式提供賬戶 ID(透過
account_id引數)給運算子。或者,在 Airflow 連線中指定 dbt Cloud 賬戶。如果未將賬戶 ID 傳遞給運算子,則運算子將自動回退使用此配置。
觸發 dbt Cloud 作業¶
使用 DbtCloudRunJobOperator 觸發 dbt Cloud 作業執行。預設情況下,運算子將每隔 check_interval 秒定期檢查執行作業的狀態,直到作業達到 timeout 執行時長或以成功狀態終止。此功能由 wait_for_termination 引數控制。或者,可以將 wait_for_termination 設定為 False 以執行非同步等待(通常與 DbtCloudJobRunSensor 結合使用)。將 wait_for_termination 設定為 False 是處理長時間執行的 dbt Cloud 作業的好方法。
deferrable 引數與 wait_for_termination 引數一起控制是在 worker 上輪詢作業狀態還是使用 Triggerer 進行延遲的功能。當 wait_for_termination 為 True 且 deferrable 為 False 時,我們提交作業並在 worker 上 輪詢 其狀態。這將保持 worker 插槽被佔用直到作業執行完成。當 wait_for_termination 為 True 且 deferrable 為 True 時,我們提交作業並使用 Triggerer 進行 延遲。這將釋放 worker 插槽,從而在作業執行時節省資源利用。
當 wait_for_termination 為 False 且 deferrable 為 False 時,我們只提交作業,並且只能使用 DbtCloudJobRunSensor 跟蹤作業狀態。
當 retry_from_failure 為 True 時,如果執行失敗,我們將從失敗點重試該作業的執行。否則,我們將觸發新的執行。有關重試邏輯的更多資訊,請參閱 API 文件。
雖然 schema_override 和 steps_override 是 DbtCloudRunJobOperator 的顯式可選引數,但也可以使用 additional_run_config 字典將自定義執行配置傳遞給運算子。此引數可用於初始化作業執行的其他執行時配置或覆蓋,例如 threads_override、generate_docs_override、git_branch 等。有關可以在執行時使用的其他配置的完整列表,請參閱 API 文件。
以下示例分別演示瞭如何例項化 DbtCloudRunJobOperator 任務,使用同步和非同步等待執行終止。請注意,運算子的 account_id 在示例 DAG 的 default_args 中引用。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run1 = DbtCloudRunJobOperator(
task_id="trigger_job_run1",
job_id=48617,
check_interval=10,
timeout=300,
)
下一個示例還展示瞭如何透過 additional_run_config 字典傳入自定義執行時配置(在本例中用於 threads_override)。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run2 = DbtCloudRunJobOperator(
task_id="trigger_job_run2",
job_id=48617,
wait_for_termination=False,
additional_run_config={"threads_override": 8},
)
您也可以在不提供 job_id 的情況下觸發 dbt Cloud 作業。作為替代,您可以透過提供 project_name、environment_name 和 job_name 來標識作業。請注意,這僅在上述三個引數能夠唯一標識您賬戶中的作業時才有效(即,您不能在同一專案和環境中擁有兩個同名的作業)。
tests/system/dbt/cloud/example_dbt_cloud.py
trigger_job_run3 = DbtCloudRunJobOperator(
task_id="trigger_job_run3",
project_name="my_dbt_project",
environment_name="prod",
job_name="my_dbt_job",
check_interval=10,
timeout=300,
)
輪詢 dbt Cloud 作業執行狀態¶
使用 DbtCloudJobRunSensor 定期檢索 dbt Cloud 作業執行的狀態並檢查執行是否成功。此 Sensor 提供了 BaseSensorOperator 可用的所有相同功能。
在下面的示例中,run_id 值來自先前 DbtCloudRunJobOperator 任務的輸出,利用了所有運算子都公開的 .output 屬性。另外請注意,該任務的 account_id 在示例 DAG 的 default_args 中引用。
tests/system/dbt/cloud/example_dbt_cloud.py
job_run_sensor = DbtCloudJobRunSensor(
task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
此外,您還可以使用 延遲 模式非同步輪詢作業執行狀態。在此模式下,Sensor 執行時會釋放 worker 插槽。
tests/system/dbt/cloud/example_dbt_cloud.py
job_run_sensor_deferred = DbtCloudJobRunSensor(
task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)
下載執行工件¶
使用 DbtCloudGetJobRunArtifactOperator 下載 dbt Cloud 作業執行生成的工件。指定的 path 值應以 target/ 目錄為根目錄。典型工件包括 manifest.json、catalog.json 和 run_results.json,但也可以下載其他工件,例如模型的原始 SQL 或 sources.json。
有關 dbt Cloud 工件的更多資訊,請參閱此文件。
tests/system/dbt/cloud/example_dbt_cloud.py
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
列出作業¶
使用 DbtCloudListJobsOperator 列出與指定 dbt Cloud 賬戶關聯的所有作業。account_id 必須透過連線提供或作為引數提供給任務。
如果提供了 project_id,將只檢索與此專案 ID 相關的作業。
有關列出 dbt Cloud 作業的更多資訊,請參閱此文件。
tests/system/dbt/cloud/example_dbt_cloud.py
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)