除錯 Airflow DAGs¶
使用 dag.test() 測試 DAGs¶
要在 IDE 中除錯 DAGs,您可以設定 dag.test 命令到您的 DAG 檔案中,並在一個序列化的 Python 程序中執行您的 DAG。
這種方法可以與任何受支援的資料庫(包括本地 SQLite 資料庫)一起使用,並且會 快速失敗,因為所有任務都在一個程序中執行。
要設定 dag.test,將這兩行新增到您的 DAG 檔案底部
if __name__ == "__main__":
dag.test()
就是這樣!您可以新增可選引數來微調測試,否則您可以根據需要執行或除錯 DAGs。以下是一些引數示例
execution_date如果您想測試特定引數的 DAG 執行use_executor如果您想使用 executor 測試 DAG。預設情況下,dag.test在沒有 executor 的情況下執行 DAG,它只在本地執行所有任務。透過提供此引數,DAG 將使用 Airflow 環境中配置的 executor 執行。
有條件地跳過任務¶
如果您不想在本地環境中執行某些任務子集(例如依賴檢查 sensors 或清理步驟),您可以自動將它們標記為成功,方法是在 mark_success_pattern 引數中提供一個匹配它們 task_id 的模式。
在以下示例中,測試該 DAG 不會等待任何上游 DAG 完成。而是手動注入測試資料。清理步驟也被跳過,使得中間生成的 CSV 可供檢查。
with DAG("example_dag", default_args=default_args) as dag:
sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
# ... run other tasks
cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])
[sensor, sensor2] >> collect_stats >> cleanup
if __name__ == "__main__":
ingest_testing_data()
run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
在命令列中除錯 Airflow DAGs¶
透過與上面提到的相同的兩行新增,您現在也可以輕鬆地使用 pdb 除錯 DAG。執行 python -m pdb <path to dag file>.py 以獲得命令列上的互動式除錯體驗。
root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>
IDE 設定步驟
新增
main塊到您的 DAG 檔案末尾,使其可執行。
if __name__ == "__main__":
dag.test()
執行 / 除錯 DAG 檔案。