Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

使用命令列介面

本文件旨在概述使用 CLI 時的所有常見任務。

注意

有關 CLI 命令的更多資訊,請參閱 命令列介面和環境變數參考

設定 Bash/Zsh 自動補全

當使用 bash(或 zsh)作為 shell 時,airflow 可以使用 argcomplete 進行自動補全。

對於所有啟用 argcomplete 的 Python 應用程式的全域性啟用,執行

sudo activate-global-python-argcomplete

對於永久(非全域性)Airflow 啟用,使用

register-python-argcomplete airflow >> ~/.bashrc

對於僅 Airflow 的 argcomplete 一次性啟用,使用

eval "$(register-python-argcomplete airflow)"
../_images/cli_completion.gif

如果你使用 zsh,將以下內容新增到你的 .zshrc

autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"

建立連線

有關使用 CLI 建立連線的資訊,請參閱 從 CLI 建立連線

將 DAG 結構匯出為圖片

Airflow 允許你將 DAG 結構列印或儲存為圖片。這對於記錄或分享你的 DAG 結構很有用。你需要安裝 Graphviz

例如,將 example_complex DAG 列印到終端

airflow dags show example_complex

這將把渲染後的 DAG 結構以 DOT 格式列印到螢幕上。

支援多種檔案格式。要使用它們,請新增引數 --save [filename].[format]

example_complex DAG 儲存為 PNG 檔案

airflow dags show example_complex --save example_complex.png

這將把以下圖片儲存為一個檔案

../_images/usage_cli_export.png

DAG 示例表示

支援以下檔案格式

  • bmp

  • canon, dot, gv, xdot, xdot1.2, xdot1.4

  • cgimage

  • cmap

  • eps

  • exr

  • fig

  • gd, gd2

  • gif

  • gtk

  • ico

  • imap, cmapx

  • imap_np, cmapx_np

  • ismap

  • jp2

  • jpg, jpeg, jpe

  • json, json0, dot_json, xdot_json

  • pct, pict

  • pdf

  • pic

  • plain, plain-ext

  • png

  • pov

  • ps

  • ps2

  • psd

  • sgi

  • svg, svgz

  • tga

  • tif, tiff

  • tk

  • vml, vmlz

  • vrml

  • wbmp

  • webp

  • xlib

  • x11

預設情況下,Airflow 在 airflow.cfg 檔案 [core] 部分的 dags_folder 選項指定的目錄中查詢 DAG。可以使用 --subdir 引數選擇新目錄。

顯示 DAG 結構

有時你會處理包含複雜依賴關係的 DAG。此時預覽 DAG 以檢查其是否正確會很有幫助。

如果你使用 macOS,可以結合 imgcat 指令碼使用 iTerm2 在控制檯中顯示 DAG 結構。你還需要安裝 Graphviz

其他終端不支援顯示高質量圖形。你可以將圖片轉換為文字形式,但其解析度會讓你無法閱讀。

為此,你應該在 airflow dags show 命令中使用 --imgcat 開關。例如,如果你想顯示 example_bash_operator DAG,可以使用以下命令

airflow dags show example_bash_operator --imgcat

你將看到與下方截圖類似的結果。

../_images/usage_cli_imgcat.png

在 iTerm2 中預覽 DAG

格式化命令輸出

一些 Airflow 命令,例如 airflow dags listairflow tasks states-for-dag-run 支援 --output 標誌,允許使用者更改命令輸出的格式。可能的選項有

  • table - 將資訊渲染為純文字表格

  • simple - 將資訊渲染為簡單表格,可由標準 Linux 工具解析

  • json - 將資訊渲染為 JSON 字串形式

  • yaml - 將資訊渲染為有效的 YAML 形式

jsonyaml 格式都使得使用命令列工具(如 jqyq)處理資料更加容易。例如

airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
  "sd": "2020-11-29T14:53:46.811030+00:00",
  "ed": "2020-11-29T14:53:46.974545+00:00"
}
{
  "sd": "2020-11-29T14:53:56.926441+00:00",
  "ed": "2020-11-29T14:53:57.118781+00:00"
}
{
  "sd": "2020-11-29T14:53:56.915802+00:00",
  "ed": "2020-11-29T14:53:57.125230+00:00"
}
{
  "sd": "2020-11-29T14:53:56.922131+00:00",
  "ed": "2020-11-29T14:53:57.129091+00:00"
}
{
  "sd": "2020-11-29T14:53:56.931243+00:00",
  "ed": "2020-11-29T14:53:57.126306+00:00"
}

清除元資料資料庫中的歷史記錄

注意

強烈建議你在執行 db clean 命令之前備份元資料資料庫。

db clean 命令透過從每個表中刪除早於提供的 --clean-before-timestamp 的記錄來工作。

你可以選擇提供一個要執行刪除操作的表列表。如果未提供表列表,將包含所有表。

可以使用 --dry-run 選項列印要清理的主表中的行數。

預設情況下,db clean 會將清除的行存檔到形如 _airflow_deleted__<table>__<timestamp> 的表中。如果你不想以這種方式保留資料,可以提供引數 --skip-archive

當你在未使用 --skip-archive 的情況下遇到錯誤時,_airflow_deleted__<table>__<timestamp> 仍會存在於資料庫中。可以使用 db drop-archived 命令手動刪除這些表。

從存檔表中匯出已清除的記錄

db export-archived 命令將 db clean 命令建立的存檔表內容匯出為指定格式,預設為 CSV 檔案。匯出的檔案將包含在 db clean 過程中從主表中清除的記錄。

可以使用 --export-format 選項指定匯出格式。預設格式為 csv,目前也是唯一支援的格式。

還必須使用 --output-path 選項指定要匯出資料到的路徑位置。該位置必須存在。

其他選項包括:使用 --tables 指定要匯出的表,使用 --drop-archives 在匯出後刪除存檔表。

刪除存檔表

如果在 db clean 過程中未使用刪除存檔表的 --skip-archive 選項,仍然可以使用 db drop-archived 命令刪除存檔表。此操作不可逆,建議在刪除表之前使用 db export-archived 命令將表備份到磁碟。

可以使用 --tables 選項指定要刪除的表。如果未指定表,將刪除所有存檔表。

警惕級聯刪除

請記住,有些表定義了帶有 ON DELETE CASCADE 的外部索引鍵關係,因此在一個表中的刪除可能會觸發其他表中的刪除。例如,task_instance 表與 dag_run 表關聯,因此如果一個 DagRun 記錄被刪除,所有關聯的任務例項也將被刪除。

DAG 執行的特殊處理

通常,Airflow 透過查詢最新的 DagRun 來確定下一個要執行的 DagRun。如果你刪除所有 DAG 執行,Airflow 可能會排程一個已完成的舊 DagRun,例如如果你設定了 catchup=True。因此,db clean 命令將保留最新的非手動觸發的 DAG 執行,以保持排程的連續性。

可回填 DAG 的注意事項

並非所有 DAG 都設計用於 Airflow 的回填命令。但對於那些適用的 DAG,需要特別注意。如果你刪除 DAG 執行,並且在包含已刪除 DAG 執行的日期範圍內執行回填,這些執行將被重新建立並再次執行。因此,如果你的 DAG 屬於此類,你可能希望避免刪除 DAG 執行,而只清理其他大型表,例如任務例項和日誌等。

升級 Airflow

執行 airflow db migrate --help 獲取使用詳情。

手動執行遷移

如果需要,可以為升級生成 SQL 語句,並手動逐個應用每個升級遷移。為此,可以在 db migrate 命令中使用 --range(針對 Airflow 版本)或 --revision-range(針對 Alembic revision)選項。切勿跳過執行 Alembic revision ID 更新命令;Airflow 透過此資訊知道下次需要從何處升級。有關 revision 和版本之間的對映,請參閱 資料庫遷移參考

降級 Airflow

注意

建議在執行 db downgrade 或任何其他資料庫操作之前備份資料庫。

可以使用 db downgrade 命令降級到特定的 Airflow 版本。或者,可以提供一個 Alembic revision ID 進行降級。

如果只想預覽命令而不執行它們,請使用 --show-sql-only 選項。

選項 --from-revision--from-version 只能與 --show-sql-only 選項結合使用,因為在實際執行遷移時,我們應該總是從當前 revision 降級。

有關 Airflow 版本和 Alembic revision 之間的對映,請參閱 資料庫遷移參考

注意

強烈建議你在完成 Airflow 環境降級後(即在你降級了 Python 環境中安裝的 Airflow 版本後,而不是立即在降級資料庫後)使用 dags reserialize 命令重新序列化你的 DAG。這是為了確保序列化的 DAG 與降級後的 Airflow 版本相容。

匯出連線

可以使用 CLI 從資料庫匯出連線。支援的檔案格式有 jsonyamlenv

可以將目標檔案指定為引數

airflow connections export connections.json

或者可以指定 file-format 引數來覆蓋檔案格式

airflow connections export /tmp/connections --file-format yaml

也可以指定 - 表示輸出到標準輸出 (STDOUT)

airflow connections export -

JSON 格式包含一個物件,其中鍵是連線 ID,值是連線的定義。在這種格式中,連線被定義為一個 JSON 物件。以下是一個示例 JSON 檔案。

{
  "airflow_db": {
    "conn_type": "mysql",
    "host": "mysql",
    "login": "root",
    "password": "plainpassword",
    "schema": "airflow",
    "port": null,
    "extra": null
  },
  "druid_broker_default": {
    "conn_type": "druid",
    "host": "druid-broker",
    "login": null,
    "password": null,
    "schema": null,
    "port": 8082,
    "extra": "{\"endpoint\": \"druid/v2/sql\"}"
  }
}

YAML 檔案結構與 JSON 類似。它是一個由連線 ID 和一個或多個連線定義組成的鍵值對。在這種格式中,連線被定義為一個 YAML 物件。以下是一個示例 YAML 檔案。

airflow_db:
  conn_type: mysql
  extra: null
  host: mysql
  login: root
  password: plainpassword
  port: null
  schema: airflow
druid_broker_default:
  conn_type: druid
  extra: '{"endpoint": "druid/v2/sql"}'
  host: druid-broker
  login: null
  password: null
  port: 8082
  schema: null

還可以以 .env 格式匯出連線。鍵是連線 ID,值是連線的序列化表示,可以使用 Airflow 的 Connection URI 格式或 JSON 格式。要使用 JSON,請提供選項 --serialization-format=json,否則將使用 Airflow Connection URI 格式。以下是使用這兩種格式的示例 .env 檔案。

URI 示例

airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql

JSON 示例輸出

airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}

測試 DAG 匯入錯誤

CLI 可用於透過 list-import-errors 子命令檢查發現的任何 DAG 是否存在匯入錯誤。可以建立一個自動化步驟,透過檢查命令輸出(特別是在與 --output 結合使用生成標準檔案格式時),如果任何 DAG 無法匯入則使其失敗。例如,沒有錯誤時的預設輸出是 No data found,而 json 輸出是 []。然後可以在 CI 或 pre-commit 中執行此檢查,以加快評審和測試過程。

使用 jq 解析輸出,當存在任何錯誤時會失敗的示例命令

airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'

該行可以直接新增到自動化流程中,或者如果你想列印輸出,可以使用 tee 命令

airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt

Jenkins pipeline 中的示例

stage('All dags are loadable') {
    steps {
        sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
    }
}

注意

為了使其準確工作,必須確保 Airflow 不會向標準輸出 (stdout) 記錄任何額外文字。例如,你可能需要修復任何棄用警告,向命令新增 2>/dev/null,或者如果你有在載入時生成日誌的外掛,則在 Airflow 配置中設定 lazy_load_plugins = True

本條目有幫助嗎?