Airflow Summit 2025 即將於 10 月 07 日至 09 日舉行。立即註冊獲取早鳥票!

設定資料庫後端

Airflow 構建用於使用 SqlAlchemy 與其元資料進行互動。

以下文件描述了資料庫引擎配置,使用 Airflow 所需的配置更改,以及連線到這些資料庫的 Airflow 配置更改。

選擇資料庫後端

如果您想真正試用 Airflow,應考慮將資料庫後端設定為 PostgreSQLMySQL。預設情況下,Airflow 使用 SQLite,它僅用於開發目的。

Airflow 支援以下資料庫引擎版本,請確保您擁有哪個版本。舊版本可能不支援所有 SQL 語句。

如果您計劃執行多個排程器,則必須滿足額外要求。有關詳細資訊,請參閱排程器高可用性資料庫要求

警告

儘管 MariaDB 和 MySQL 有很大相似之處,我們 **不** 支援將 MariaDB 作為 Airflow 的後端。MariaDB 和 MySQL 之間存在已知問題(例如索引處理),我們也不在 MariaDB 上測試我們的遷移指令碼或應用程式執行。我們知道有些人曾將 MariaDB 用於 Airflow,這給他們帶來了很多操作上的麻煩,因此我們 **強烈不建議** 嘗試使用 MariaDB 作為後端,使用者也無法期待獲得任何社群支援,因為嘗試將 MariaDB 用於 Airflow 的使用者數量非常少。

資料庫 URI

Airflow 使用 SQLAlchemy 連線到資料庫,這需要您配置資料庫 URL。您可以在 [database] 部分的選項 sql_alchemy_conn 中進行配置。通常也可以使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 環境變數來配置此選項。

注意

有關設定配置的更多資訊,請參閱設定配置選項

如果您想檢查當前值,可以使用 airflow config get-value database sql_alchemy_conn 命令,如下例所示。

$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db

具體格式描述請參見 SQLAlchemy 文件,請參閱 資料庫 URL。我們還會在下方展示一些示例。

設定 SQLite 資料庫

SQLite 資料庫可用於開發目的的 Airflow 執行,因為它不需要任何資料庫伺服器(資料庫儲存在本地檔案中)。使用 SQLite 資料庫有很多限制,您可以輕鬆地線上找到,並且 **絕不** 應將其用於生產環境。

執行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本是 3.15.0。一些較舊的系統預設安裝了早期版本的 sqlite,對於這些系統,您需要手動升級 SQLite 以使用 3.15.0 以上的版本。請注意,這不是 python library 的版本,而是需要升級的 SQLite 系統級應用程式。SQLite 的安裝方式多種多樣,您可以在 SQLite 官方網站以及針對您作業系統發行版的文件中找到相關資訊。

故障排除

有時,即使您將 SQLite 升級到更高版本並且您的本地 python 報告了更高版本,Airflow 使用的 python 直譯器可能仍然使用用於啟動 Airflow 的 python 直譯器設定的 LD_LIBRARY_PATH 中提供的舊版本。

您可以透過執行此檢查來確定直譯器使用了哪個版本:

root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>

但請注意,為您的 Airflow 部署設定環境變數可能會改變首先找到的 SQLite 庫,因此您可能需要確保系統中安裝的 SQLite 版本足夠高,並且是唯一安裝的版本。

sqlite 資料庫的 URI 示例

sqlite:////home/airflow/airflow.db

在 AmazonLinux AMI 或容器映象上升級 SQLite

AmazonLinux SQLite 只能使用原始碼倉庫升級到 v3.7。Airflow 需要 v3.15 或更高版本。使用以下說明來設定具有最新 SQLite3 的基礎映象(或 AMI)

先決條件:您需要 wget, tar, gzip, gcc, makeexpect 來使升級過程正常工作。

yum -y install wget tar gzip gcc make expect

https://sqlite.org/ 下載原始碼,進行編譯和本地安裝。

wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
    -DSQLITE_ENABLE_FTS3_PARENTHESIS \
    -DSQLITE_ENABLE_FTS4 \
    -DSQLITE_ENABLE_FTS5 \
    -DSQLITE_ENABLE_JSON1 \
    -DSQLITE_ENABLE_LOAD_EXTENSION \
    -DSQLITE_ENABLE_RTREE \
    -DSQLITE_ENABLE_STAT4 \
    -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
    -DSQLITE_SOUNDEX \
    -DSQLITE_TEMP_STORE=3 \
    -DSQLITE_USE_URI \
    -O2 \
    -fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install

安裝後將 /usr/local/lib 新增到庫路徑

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

設定 PostgreSQL 資料庫

您需要建立一個數據庫以及 Airflow 將用於訪問此資料庫的資料庫使用者。在下面的示例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者:

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

-- PostgreSQL 15 requires additional privileges:
-- Note: Connect to the airflow_db database before running the following GRANT statement
-- You can do this in psql with: \c airflow_db
GRANT ALL ON SCHEMA public TO airflow_user;

注意

資料庫必須使用 UTF-8 字元集

您可能需要更新您的 Postgres pg_hba.conf 檔案,將 airflow 使用者新增到資料庫訪問控制列表;並重新載入資料庫配置以載入您的更改。有關更多資訊,請參閱 Postgres 文件中的pg_hba.conf 檔案

警告

當您使用 SQLAlchemy 1.4.0+ 時,需要在 sql_alchemy_conn 中使用 postgresql:// 作為資料庫。在早期版本的 SQLAlchemy 中可以使用 postgres://,但在 SQLAlchemy 1.4.0+ 中使用它會導致:

>       raise exc.NoSuchModuleError(
            "Can't load plugin: %s:%s" % (self.group, name)
        )
E       sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres

如果您無法立即更改 URL 字首,Airflow 仍可與 SQLAlchemy 1.3 一起工作,並且您可以降級 SQLAlchemy,但我們建議更新字首。

詳細資訊請參見 SQLAlchemy 更新日誌

我們建議使用 psycopg2 驅動並在您的 SqlAlchemy 連線字串中指定它。

postgresql+psycopg2://<user>:<password>@<host>/<db>

另請注意,由於 SqlAlchemy 未提供在資料庫 URI 中指定特定 schema 的方式,您需要確保 schema public 包含在您的 Postgres 使用者的 search_path 中。

如果您為 Airflow 建立了一個新的 Postgres 賬戶

  • 新 Postgres 使用者的預設 search_path 為:"$user", public,無需更改。

如果您使用具有自定義 search_path 的現有 Postgres 使用者,可以透過命令更改 search_path:

ALTER USER airflow_user SET search_path = public;

有關設定 PostgreSQL 連線的更多資訊,請參閱 SQLAlchemy 文件中的PostgreSQL 方言

注意

眾所周知,Airflow(尤其是在高效能設定中)會開啟許多連線到元資料資料庫。這可能會導致 Postgres 資源使用問題,因為在 Postgres 中,每個連線都會建立一個新程序,當開啟大量連線時,這會使 Postgres 資源消耗巨大。因此,我們建議在所有 Postgres 生產安裝中使用 PGBouncer 作為資料庫代理。PGBouncer 可以處理來自多個元件的連線池,而且如果您有一個連線可能不穩定的遠端資料庫,它將使您的資料庫連線對臨時網路問題更具彈性。PGBouncer 部署的示例實現可以在Apache Airflow 的 Helm Chart 中找到,您可以透過翻轉一個布林標誌來啟用預配置的 PGBouncer 例項。您可以參考我們在那裡採用的方法,並在準備自己的部署時將其作為參考,即使您不使用官方的 Helm Chart。

另請參閱Helm Chart 生產指南

注意

對於託管的 Postgres 服務,如 Azure Postgresql、CloudSQL、Amazon RDS,您應該在連線引數中使用 keepalives_idle 並將其設定為小於空閒時間,因為這些服務會在一段時間不活動後(通常是 300 秒)關閉空閒連線,從而導致錯誤 The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detected。可以透過 [database] 部分中的 sql_alchemy_connect_args 配置引數 配置參考 來更改 keepalive 設定。您可以在例如您的 local_settings.py 檔案中配置這些引數,sql_alchemy_connect_args 應該是儲存配置引數的字典的完整匯入路徑。您可以閱讀有關 Postgres Keepalives 的資訊。一個已被觀察到能解決問題的 keepalives 設定示例可能是:

keepalive_kwargs = {
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 5,
    "keepalives_count": 5,
}

然後,如果將其放置在 airflow_local_settings.py 中,配置匯入路徑將是:

sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs

有關如何配置本地設定的詳細資訊,請參閱配置本地設定

設定 MySQL 資料庫

您需要建立一個數據庫以及 Airflow 將用於訪問此資料庫的資料庫使用者。在下面的示例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者:

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

注意

資料庫必須使用 UTF-8 字元集。您必須注意的一個小問題是,較新版本 MySQL 中的 utf8 實際上是 utf8mb4,這會導致 Airflow 索引變得過大(請參閱https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此,自 Airflow 2.2 起,所有 MySQL 資料庫的 sql_engine_collation_for_ids 都自動設定為 utf8mb3_bin(除非您覆蓋它)。這可能導致 Airflow 資料庫中 ID 欄位的 collation id 混合,但由於 Airflow 中所有相關的 ID 僅使用 ASCII 字元,因此沒有負面影響。

為了擁有合理的預設設定,我們依賴於 MySQL 更嚴格的 ANSI SQL 設定。請確保在 my.cnf 檔案中的 [mysqld] 部分指定了 explicit_defaults_for_timestamp=1 選項。您還可以透過傳遞給 mysqld 可執行檔案的 --explicit-defaults-for-timestamp 開關來啟用這些選項:

我們建議使用 mysqlclient 驅動並在您的 SqlAlchemy 連線字串中指定它。

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

重要提示

MySQL 後端的整合僅在 Apache Airflow 的持續整合 (CI) 過程中使用 mysqlclient 驅動進行了驗證。

如果您想使用其他驅動,請訪問 SQLAlchemy 文件中的MySQL 方言,以獲取有關下載和設定 SqlAlchemy 連線的更多資訊。

此外,您還應特別注意 MySQL 的編碼。儘管 utf8mb4 字元集在 MySQL 中越來越流行(實際上,utf8mb4 在 MySQL 8.0 中成為了預設字元集),但在 Airflow 2+ 中使用 utf8mb4 編碼需要額外設定(詳情請參閱#7570)。如果您使用 utf8mb4 作為字元集,您還應設定 sql_engine_collation_for_ids=utf8mb3_bin

注意

在嚴格模式下,MySQL 不允許將 0000-00-00 作為有效日期。在這種情況下,您可能會遇到類似 "Invalid default value for 'end_date'" 的錯誤(某些 Airflow 表使用 0000-00-00 00:00:00 作為時間戳欄位的預設值)。為了避免此錯誤,您可以在您的 MySQL 伺服器上停用 NO_ZERO_DATE 模式。請閱讀https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field 瞭解如何停用它。有關更多資訊,請參閱SQL Mode - NO_ZERO_DATE

MsSQL 資料庫

警告

經過討論投票過程,Airflow 的 PMC 成員和提交者已達成決議,不再將 MsSQL 作為受支援的資料庫後端進行維護。

自 Airflow 2.9.0 起,已移除對 MsSQL 作為 Airflow 資料庫後端的支援。這不影響現有的 providers (operators 和 hooks),dags 仍然可以訪問和處理來自 MsSQL 的資料。但是,進一步使用可能會丟擲錯誤,導致 Airflow 的核心功能無法使用。

從 MsSQL Server 遷移

由於 Airflow 2.9.0 已終止對 MSSQL 的支援,因此可以使用遷移指令碼幫助 Airflow 2.7.x 或 2.8.x 版本從 SQL-Server 遷移出來。該遷移指令碼位於 Github 上的airflow-mssql-migration 倉庫中。

請注意,此遷移指令碼不提供支援和擔保。

其他配置選項

有更多配置選項可用於配置 SQLAlchemy 行為。有關詳細資訊,請參閱 [database] 部分中 sqlalchemy_* 選項的參考文件

例如,您可以指定 Airflow 建立所需表的資料庫 schema。如果您希望 Airflow 將其表安裝在 PostgreSQL 資料庫的 airflow schema 中,請指定以下環境變數:

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

請注意 SQL_ALCHEMY_CONN 資料庫 URL 末尾的 search_path

初始化資料庫

在 Airflow 配置中配置資料庫並連線到它之後,您應該建立資料庫 schema。

airflow db migrate

Airflow 中的資料庫監控和維護

Airflow 廣泛使用關係型元資料資料庫進行任務排程和執行。監控和正確配置此資料庫對於 Airflow 的最佳效能至關重要。

關鍵考量

  1. 效能影響:長或過多的查詢會顯著影響 Airflow 的功能。這可能由工作流特性、缺乏最佳化或程式碼錯誤引起。

  2. 資料庫統計資訊:資料庫引擎做出不正確的最佳化決策(通常是由於資料統計資訊過時)會降低效能。

職責

在 Airflow 環境中,資料庫監控和維護的職責因您是使用自管資料庫和 Airflow 例項還是選擇託管服務而異。

自管環境:

在資料庫和 Airflow 都是自管的設定中,部署經理負責設定、配置和維護資料庫。這包括監控其效能、管理備份、定期清理以及確保其與 Airflow 的最佳執行。

託管服務:

  • 託管資料庫服務:使用託管資料庫服務時,許多維護任務(如備份、補丁和基本監控)由服務提供商處理。但是,部署經理仍然需要監督 Airflow 的配置,最佳化針對其工作流的效能設定,管理定期清理,並監控其資料庫以確保與 Airflow 的最佳執行。

  • 託管 Airflow 服務:使用託管 Airflow 服務時,服務提供商負責 Airflow 及其資料庫的配置和維護。但是,部署經理需要與服務配置方協作,以確保大小和工作流要求與託管服務的大小和配置匹配。

監控方面

定期監控應包括:

  • CPU、I/O 和記憶體使用。

  • 查詢頻率和數量。

  • 識別和記錄慢查詢或長時間執行的查詢。

  • 檢測低效的查詢執行計劃。

  • 分析磁碟交換與記憶體使用以及快取交換頻率。

工具和策略

  • Airflow 不直接提供資料庫監控工具。

  • 使用伺服器端監控和日誌記錄來獲取指標。

  • 根據定義的閾值啟用長時間執行查詢的跟蹤。

  • 定期執行內部維護任務(如 ANALYZE SQL 命令)進行維護。

資料庫清理工具

  • Airflow DB Clean 命令:利用 airflow db clean 命令來幫助管理和清理您的資料庫。

  • ``airflow.utils.db_cleanup`` 中的 Python 方法:此模組提供了額外的 Python 方法用於資料庫清理和維護,為特定需求提供更細粒度的控制和自定義。

建議

  • 主動監控:在生產環境中實施監控和日誌記錄,同時不顯著影響效能。

  • 資料庫特定指南:查閱所選資料庫的文件,獲取具體的監控設定說明。

  • 託管資料庫服務:檢查您的資料庫提供商是否提供自動維護任務。

SQLAlchemy 日誌記錄

為了進行詳細的查詢分析,啟用 SQLAlchemy 客戶端日誌記錄(在 SQLAlchemy 引擎配置中設定 echo=True)。

  • 此方法更具侵擾性,可能影響 Airflow 的客戶端效能。

  • 它會生成大量日誌,尤其是在繁忙的 Airflow 環境中。

  • 適用於非生產環境,如預發系統。

您可以按照SQLAlchemy 日誌記錄文件中的說明,將 echo=True 作為 sqlalchemy 引擎配置進行設定。

使用 sql_alchemy_engine_args 配置引數將 echo 引數設定為 True。

注意

  • 啟用大量日誌記錄時,請注意對 Airflow 效能和系統資源的影響。

  • 對於生產環境,優先使用伺服器端監控而非客戶端日誌記錄,以最大程度地減少性能干擾。

下一步是什麼?

預設情況下,Airflow 使用 LocalExecutor。您應考慮配置不同的executor以獲得更好的效能。

此條目有幫助嗎?