Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊享受早鳥票優惠!

除錯 Airflow DAGs

使用 dag.test() 測試 DAGs

要在 IDE 中除錯 DAGs,您可以設定 dag.test 命令到您的 DAG 檔案中,並在一個序列化的 Python 程序中執行您的 DAG。

這種方法可以與任何受支援的資料庫(包括本地 SQLite 資料庫)一起使用,並且會 快速失敗,因為所有任務都在一個程序中執行。

要設定 dag.test,將這兩行新增到您的 DAG 檔案底部

if __name__ == "__main__":
    dag.test()

就是這樣!您可以新增可選引數來微調測試,否則您可以根據需要執行或除錯 DAGs。以下是一些引數示例

  • execution_date 如果您想測試特定引數的 DAG 執行

  • use_executor 如果您想使用 executor 測試 DAG。預設情況下,dag.test 在沒有 executor 的情況下執行 DAG,它只在本地執行所有任務。透過提供此引數,DAG 將使用 Airflow 環境中配置的 executor 執行。

有條件地跳過任務

如果您不想在本地環境中執行某些任務子集(例如依賴檢查 sensors 或清理步驟),您可以自動將它們標記為成功,方法是在 mark_success_pattern 引數中提供一個匹配它們 task_id 的模式。

在以下示例中,測試該 DAG 不會等待任何上游 DAG 完成。而是手動注入測試資料。清理步驟也被跳過,使得中間生成的 CSV 可供檢查。

with DAG("example_dag", default_args=default_args) as dag:
    sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
    sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
    collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
    # ... run other tasks
    cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])

    [sensor, sensor2] >> collect_stats >> cleanup

if __name__ == "__main__":
    ingest_testing_data()
    run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
    print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")

在命令列中除錯 Airflow DAGs

透過與上面提到的相同的兩行新增,您現在也可以輕鬆地使用 pdb 除錯 DAG。執行 python -m pdb <path to dag file>.py 以獲得命令列上的互動式除錯體驗。

root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>

IDE 設定步驟

  1. 新增 main 塊到您的 DAG 檔案末尾,使其可執行。

if __name__ == "__main__":
    dag.test()
  1. 執行 / 除錯 DAG 檔案。

此條目有幫助嗎?