設定資料庫後端¶
Airflow 構建用於使用 SqlAlchemy 與其元資料進行互動。
以下文件描述了資料庫引擎配置,使用 Airflow 所需的配置更改,以及連線到這些資料庫的 Airflow 配置更改。
選擇資料庫後端¶
如果您想真正試用 Airflow,應考慮將資料庫後端設定為 PostgreSQL 或 MySQL。預設情況下,Airflow 使用 SQLite,它僅用於開發目的。
Airflow 支援以下資料庫引擎版本,請確保您擁有哪個版本。舊版本可能不支援所有 SQL 語句。
PostgreSQL: 12, 13, 14, 15, 16
MySQL: 8.0, 創新版 (Innovation)
SQLite: 3.15.0+
如果您計劃執行多個排程器,則必須滿足額外要求。有關詳細資訊,請參閱排程器高可用性資料庫要求。
警告
儘管 MariaDB 和 MySQL 有很大相似之處,我們 **不** 支援將 MariaDB 作為 Airflow 的後端。MariaDB 和 MySQL 之間存在已知問題(例如索引處理),我們也不在 MariaDB 上測試我們的遷移指令碼或應用程式執行。我們知道有些人曾將 MariaDB 用於 Airflow,這給他們帶來了很多操作上的麻煩,因此我們 **強烈不建議** 嘗試使用 MariaDB 作為後端,使用者也無法期待獲得任何社群支援,因為嘗試將 MariaDB 用於 Airflow 的使用者數量非常少。
資料庫 URI¶
Airflow 使用 SQLAlchemy 連線到資料庫,這需要您配置資料庫 URL。您可以在 [database] 部分的選項 sql_alchemy_conn 中進行配置。通常也可以使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 環境變數來配置此選項。
注意
有關設定配置的更多資訊,請參閱設定配置選項。
如果您想檢查當前值,可以使用 airflow config get-value database sql_alchemy_conn 命令,如下例所示。
$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db
具體格式描述請參見 SQLAlchemy 文件,請參閱 資料庫 URL。我們還會在下方展示一些示例。
設定 SQLite 資料庫¶
SQLite 資料庫可用於開發目的的 Airflow 執行,因為它不需要任何資料庫伺服器(資料庫儲存在本地檔案中)。使用 SQLite 資料庫有很多限制,您可以輕鬆地線上找到,並且 **絕不** 應將其用於生產環境。
執行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本是 3.15.0。一些較舊的系統預設安裝了早期版本的 sqlite,對於這些系統,您需要手動升級 SQLite 以使用 3.15.0 以上的版本。請注意,這不是 python library 的版本,而是需要升級的 SQLite 系統級應用程式。SQLite 的安裝方式多種多樣,您可以在 SQLite 官方網站以及針對您作業系統發行版的文件中找到相關資訊。
故障排除
有時,即使您將 SQLite 升級到更高版本並且您的本地 python 報告了更高版本,Airflow 使用的 python 直譯器可能仍然使用用於啟動 Airflow 的 python 直譯器設定的 LD_LIBRARY_PATH 中提供的舊版本。
您可以透過執行此檢查來確定直譯器使用了哪個版本:
root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>
但請注意,為您的 Airflow 部署設定環境變數可能會改變首先找到的 SQLite 庫,因此您可能需要確保系統中安裝的 SQLite 版本足夠高,並且是唯一安裝的版本。
sqlite 資料庫的 URI 示例
sqlite:////home/airflow/airflow.db
在 AmazonLinux AMI 或容器映象上升級 SQLite
AmazonLinux SQLite 只能使用原始碼倉庫升級到 v3.7。Airflow 需要 v3.15 或更高版本。使用以下說明來設定具有最新 SQLite3 的基礎映象(或 AMI)
先決條件:您需要 wget, tar, gzip, gcc, make 和 expect 來使升級過程正常工作。
yum -y install wget tar gzip gcc make expect
從 https://sqlite.org/ 下載原始碼,進行編譯和本地安裝。
wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
-DSQLITE_ENABLE_FTS3_PARENTHESIS \
-DSQLITE_ENABLE_FTS4 \
-DSQLITE_ENABLE_FTS5 \
-DSQLITE_ENABLE_JSON1 \
-DSQLITE_ENABLE_LOAD_EXTENSION \
-DSQLITE_ENABLE_RTREE \
-DSQLITE_ENABLE_STAT4 \
-DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
-DSQLITE_SOUNDEX \
-DSQLITE_TEMP_STORE=3 \
-DSQLITE_USE_URI \
-O2 \
-fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install
安裝後將 /usr/local/lib 新增到庫路徑
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
設定 PostgreSQL 資料庫¶
您需要建立一個數據庫以及 Airflow 將用於訪問此資料庫的資料庫使用者。在下面的示例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者:
CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
-- Note: Connect to the airflow_db database before running the following GRANT statement
-- You can do this in psql with: \c airflow_db
GRANT ALL ON SCHEMA public TO airflow_user;
注意
資料庫必須使用 UTF-8 字元集
您可能需要更新您的 Postgres pg_hba.conf 檔案,將 airflow 使用者新增到資料庫訪問控制列表;並重新載入資料庫配置以載入您的更改。有關更多資訊,請參閱 Postgres 文件中的pg_hba.conf 檔案。
警告
當您使用 SQLAlchemy 1.4.0+ 時,需要在 sql_alchemy_conn 中使用 postgresql:// 作為資料庫。在早期版本的 SQLAlchemy 中可以使用 postgres://,但在 SQLAlchemy 1.4.0+ 中使用它會導致:
> raise exc.NoSuchModuleError(
"Can't load plugin: %s:%s" % (self.group, name)
)
E sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres
如果您無法立即更改 URL 字首,Airflow 仍可與 SQLAlchemy 1.3 一起工作,並且您可以降級 SQLAlchemy,但我們建議更新字首。
詳細資訊請參見 SQLAlchemy 更新日誌。
我們建議使用 psycopg2 驅動並在您的 SqlAlchemy 連線字串中指定它。
postgresql+psycopg2://<user>:<password>@<host>/<db>
另請注意,由於 SqlAlchemy 未提供在資料庫 URI 中指定特定 schema 的方式,您需要確保 schema public 包含在您的 Postgres 使用者的 search_path 中。
如果您為 Airflow 建立了一個新的 Postgres 賬戶
新 Postgres 使用者的預設 search_path 為:
"$user", public,無需更改。
如果您使用具有自定義 search_path 的現有 Postgres 使用者,可以透過命令更改 search_path:
ALTER USER airflow_user SET search_path = public;
有關設定 PostgreSQL 連線的更多資訊,請參閱 SQLAlchemy 文件中的PostgreSQL 方言。
注意
眾所周知,Airflow(尤其是在高效能設定中)會開啟許多連線到元資料資料庫。這可能會導致 Postgres 資源使用問題,因為在 Postgres 中,每個連線都會建立一個新程序,當開啟大量連線時,這會使 Postgres 資源消耗巨大。因此,我們建議在所有 Postgres 生產安裝中使用 PGBouncer 作為資料庫代理。PGBouncer 可以處理來自多個元件的連線池,而且如果您有一個連線可能不穩定的遠端資料庫,它將使您的資料庫連線對臨時網路問題更具彈性。PGBouncer 部署的示例實現可以在Apache Airflow 的 Helm Chart 中找到,您可以透過翻轉一個布林標誌來啟用預配置的 PGBouncer 例項。您可以參考我們在那裡採用的方法,並在準備自己的部署時將其作為參考,即使您不使用官方的 Helm Chart。
另請參閱Helm Chart 生產指南
注意
對於託管的 Postgres 服務,如 Azure Postgresql、CloudSQL、Amazon RDS,您應該在連線引數中使用 keepalives_idle 並將其設定為小於空閒時間,因為這些服務會在一段時間不活動後(通常是 300 秒)關閉空閒連線,從而導致錯誤 The error: psycopg2.operationalerror: SSL SYSCALL error: EOF detected。可以透過 [database] 部分中的 sql_alchemy_connect_args 配置引數 配置參考 來更改 keepalive 設定。您可以在例如您的 local_settings.py 檔案中配置這些引數,sql_alchemy_connect_args 應該是儲存配置引數的字典的完整匯入路徑。您可以閱讀有關 Postgres Keepalives 的資訊。一個已被觀察到能解決問題的 keepalives 設定示例可能是:
keepalive_kwargs = {
"keepalives": 1,
"keepalives_idle": 30,
"keepalives_interval": 5,
"keepalives_count": 5,
}
然後,如果將其放置在 airflow_local_settings.py 中,配置匯入路徑將是:
sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs
有關如何配置本地設定的詳細資訊,請參閱配置本地設定。
設定 MySQL 資料庫¶
您需要建立一個數據庫以及 Airflow 將用於訪問此資料庫的資料庫使用者。在下面的示例中,將建立資料庫 airflow_db 和使用者名稱為 airflow_user、密碼為 airflow_pass 的使用者:
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
注意
資料庫必須使用 UTF-8 字元集。您必須注意的一個小問題是,較新版本 MySQL 中的 utf8 實際上是 utf8mb4,這會導致 Airflow 索引變得過大(請參閱https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此,自 Airflow 2.2 起,所有 MySQL 資料庫的 sql_engine_collation_for_ids 都自動設定為 utf8mb3_bin(除非您覆蓋它)。這可能導致 Airflow 資料庫中 ID 欄位的 collation id 混合,但由於 Airflow 中所有相關的 ID 僅使用 ASCII 字元,因此沒有負面影響。
為了擁有合理的預設設定,我們依賴於 MySQL 更嚴格的 ANSI SQL 設定。請確保在 my.cnf 檔案中的 [mysqld] 部分指定了 explicit_defaults_for_timestamp=1 選項。您還可以透過傳遞給 mysqld 可執行檔案的 --explicit-defaults-for-timestamp 開關來啟用這些選項:
我們建議使用 mysqlclient 驅動並在您的 SqlAlchemy 連線字串中指定它。
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
重要提示
MySQL 後端的整合僅在 Apache Airflow 的持續整合 (CI) 過程中使用 mysqlclient 驅動進行了驗證。
如果您想使用其他驅動,請訪問 SQLAlchemy 文件中的MySQL 方言,以獲取有關下載和設定 SqlAlchemy 連線的更多資訊。
此外,您還應特別注意 MySQL 的編碼。儘管 utf8mb4 字元集在 MySQL 中越來越流行(實際上,utf8mb4 在 MySQL 8.0 中成為了預設字元集),但在 Airflow 2+ 中使用 utf8mb4 編碼需要額外設定(詳情請參閱#7570)。如果您使用 utf8mb4 作為字元集,您還應設定 sql_engine_collation_for_ids=utf8mb3_bin。
注意
在嚴格模式下,MySQL 不允許將 0000-00-00 作為有效日期。在這種情況下,您可能會遇到類似 "Invalid default value for 'end_date'" 的錯誤(某些 Airflow 表使用 0000-00-00 00:00:00 作為時間戳欄位的預設值)。為了避免此錯誤,您可以在您的 MySQL 伺服器上停用 NO_ZERO_DATE 模式。請閱讀https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field 瞭解如何停用它。有關更多資訊,請參閱SQL Mode - NO_ZERO_DATE。
MsSQL 資料庫¶
警告
經過討論和投票過程,Airflow 的 PMC 成員和提交者已達成決議,不再將 MsSQL 作為受支援的資料庫後端進行維護。
自 Airflow 2.9.0 起,已移除對 MsSQL 作為 Airflow 資料庫後端的支援。這不影響現有的 providers (operators 和 hooks),dags 仍然可以訪問和處理來自 MsSQL 的資料。但是,進一步使用可能會丟擲錯誤,導致 Airflow 的核心功能無法使用。
從 MsSQL Server 遷移¶
由於 Airflow 2.9.0 已終止對 MSSQL 的支援,因此可以使用遷移指令碼幫助 Airflow 2.7.x 或 2.8.x 版本從 SQL-Server 遷移出來。該遷移指令碼位於 Github 上的airflow-mssql-migration 倉庫中。
請注意,此遷移指令碼不提供支援和擔保。
其他配置選項¶
有更多配置選項可用於配置 SQLAlchemy 行為。有關詳細資訊,請參閱 [database] 部分中 sqlalchemy_* 選項的參考文件。
例如,您可以指定 Airflow 建立所需表的資料庫 schema。如果您希望 Airflow 將其表安裝在 PostgreSQL 資料庫的 airflow schema 中,請指定以下環境變數:
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"
請注意 SQL_ALCHEMY_CONN 資料庫 URL 末尾的 search_path。
初始化資料庫¶
在 Airflow 配置中配置資料庫並連線到它之後,您應該建立資料庫 schema。
airflow db migrate
Airflow 中的資料庫監控和維護¶
Airflow 廣泛使用關係型元資料資料庫進行任務排程和執行。監控和正確配置此資料庫對於 Airflow 的最佳效能至關重要。
關鍵考量¶
效能影響:長或過多的查詢會顯著影響 Airflow 的功能。這可能由工作流特性、缺乏最佳化或程式碼錯誤引起。
資料庫統計資訊:資料庫引擎做出不正確的最佳化決策(通常是由於資料統計資訊過時)會降低效能。
職責¶
在 Airflow 環境中,資料庫監控和維護的職責因您是使用自管資料庫和 Airflow 例項還是選擇託管服務而異。
自管環境:
在資料庫和 Airflow 都是自管的設定中,部署經理負責設定、配置和維護資料庫。這包括監控其效能、管理備份、定期清理以及確保其與 Airflow 的最佳執行。
託管服務:
託管資料庫服務:使用託管資料庫服務時,許多維護任務(如備份、補丁和基本監控)由服務提供商處理。但是,部署經理仍然需要監督 Airflow 的配置,最佳化針對其工作流的效能設定,管理定期清理,並監控其資料庫以確保與 Airflow 的最佳執行。
託管 Airflow 服務:使用託管 Airflow 服務時,服務提供商負責 Airflow 及其資料庫的配置和維護。但是,部署經理需要與服務配置方協作,以確保大小和工作流要求與託管服務的大小和配置匹配。
監控方面¶
定期監控應包括:
CPU、I/O 和記憶體使用。
查詢頻率和數量。
識別和記錄慢查詢或長時間執行的查詢。
檢測低效的查詢執行計劃。
分析磁碟交換與記憶體使用以及快取交換頻率。
工具和策略¶
Airflow 不直接提供資料庫監控工具。
使用伺服器端監控和日誌記錄來獲取指標。
根據定義的閾值啟用長時間執行查詢的跟蹤。
定期執行內部維護任務(如
ANALYZESQL 命令)進行維護。
資料庫清理工具¶
Airflow DB Clean 命令:利用
airflow db clean命令來幫助管理和清理您的資料庫。``airflow.utils.db_cleanup`` 中的 Python 方法:此模組提供了額外的 Python 方法用於資料庫清理和維護,為特定需求提供更細粒度的控制和自定義。
建議¶
主動監控:在生產環境中實施監控和日誌記錄,同時不顯著影響效能。
資料庫特定指南:查閱所選資料庫的文件,獲取具體的監控設定說明。
託管資料庫服務:檢查您的資料庫提供商是否提供自動維護任務。
SQLAlchemy 日誌記錄¶
為了進行詳細的查詢分析,啟用 SQLAlchemy 客戶端日誌記錄(在 SQLAlchemy 引擎配置中設定 echo=True)。
此方法更具侵擾性,可能影響 Airflow 的客戶端效能。
它會生成大量日誌,尤其是在繁忙的 Airflow 環境中。
適用於非生產環境,如預發系統。
您可以按照SQLAlchemy 日誌記錄文件中的說明,將 echo=True 作為 sqlalchemy 引擎配置進行設定。
使用 sql_alchemy_engine_args 配置引數將 echo 引數設定為 True。
注意¶
啟用大量日誌記錄時,請注意對 Airflow 效能和系統資源的影響。
對於生產環境,優先使用伺服器端監控而非客戶端日誌記錄,以最大程度地減少性能干擾。
下一步是什麼?¶
預設情況下,Airflow 使用 LocalExecutor。您應考慮配置不同的executor以獲得更好的效能。