模組管理¶
Airflow 允許您在 DAG 和 Airflow 配置中使用您自己的 Python 模組。 下面的文章將描述如何建立您自己的模組,以便 Airflow 可以正確載入它,以及在模組未正確載入時診斷問題。
通常,您希望在 Airflow 部署中使用您自己的 python 程式碼,例如通用程式碼、庫,您可能希望使用共享 python 程式碼生成 dag,並擁有多個 DAG python 檔案。
您可以透過以下方式之一來完成它
將您的模組新增到 Airflow 自動新增到
PYTHONPATH的資料夾之一將您儲存程式碼的額外資料夾新增到
PYTHONPATH將您的程式碼打包到 Python 包中,並與 Airflow 一起安裝。
下一章將概述 Python 如何載入包和模組,並深入探討上述三種可能性的具體細節。
Python 中包/模組載入的工作原理¶
Python 嘗試從中載入模組的目錄列表由變數 sys.path 給出。 Python 確實嘗試 智慧地確定 此變數的內容,具體取決於作業系統以及 Python 的安裝方式和使用的 Python 版本。
您可以透過執行互動式終端來檢查當前 Python 環境中此變數的內容,如下例所示
>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
'/home/arch/.pyenv/versions/3.9.4/lib/python37.zip',
'/home/arch/.pyenv/versions/3.9.4/lib/python3.9',
'/home/arch/.pyenv/versions/3.9.4/lib/python3.9/lib-dynload',
'/home/arch/venvs/airflow/lib/python3.9/site-packages']
sys.path 在程式啟動期間初始化。 第一個優先順序被賦予當前目錄,即 path[0] 是包含用於呼叫的當前指令碼的目錄,或者如果它是互動式 shell,則為空字串。 第二個優先順序被賦予 PYTHONPATH(如果已提供),然後是由 site 模組管理的、依賴於安裝的預設路徑。
也可以在 Python 會話期間透過簡單地使用 append 來修改 sys.path(例如,sys.path.append("/path/to/custom/package"))。 Python 將在新增新路徑後開始在較新的路徑中搜索包。 Airflow 利用此功能,如 將目錄新增到 PYTHONPATH 一節中所述。
在變數 sys.path 中,有一個目錄 site-packages,其中包含已安裝的 **外部包**,這意味著您可以使用 pip 或 anaconda 安裝包,並且可以在 Airflow 中使用它們。 在下一節中,您將學習如何建立自己的簡單可安裝包,以及如何使用環境變數 PYTHONPATH 指定要新增到 sys.path 的其他目錄。
另請確保 將 init 檔案新增到您的資料夾。
包的典型結構¶
這是您在 dags 資料夾中可能擁有的示例結構
<DIRECTORY ON PYTHONPATH>
| .airflowignore -- only needed in ``dags`` folder, see below
| -- my_company
| __init__.py
| common_package
| | __init__.py
| | common_module.py
| | subpackage
| | __init__.py
| | subpackaged_util_module.py
|
| my_custom_dags
| __init__.py
| my_dag1.py
| my_dag2.py
| base_dag.py
在上面的例子中,這些是您可以匯入 python 檔案的方式
from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag
您可以在資料夾的根目錄中看到 .airflowignore 檔案。 這是一個您可以放在 dags 資料夾中的檔案,用於告訴 Airflow 在 Airflow 排程程式查詢 dags 時應忽略資料夾中的哪些檔案。 它應包含正則表示式(預設)或 glob 表示式,用於要忽略的路徑。 您不需要在 PYTHONPATH 中的任何其他資料夾中擁有該檔案(並且您只能將共享程式碼保留在其他資料夾中,而不是實際的 dags)。
在上面的例子中,dags 僅位於 my_custom_dags 資料夾中,當排程程式搜尋 DAGS 時,不應掃描 common_package,因此我們應該忽略 common_package 資料夾。 如果您在此處保留一個基本 DAG,my_dag1.py 和 my_dag2.py 派生自該基本 DAG,您還需要忽略 base_dag.py。 您的 .airflowignore 應如下所示(使用預設的 glob 語法)
my_company/common_package/
my_company/my_custom_dags/base_dag.py
Airflow 中內建的 PYTHONPATH 條目¶
Airflow 在動態執行時,會將三個目錄新增到 sys.path
dags資料夾:它在[core]部分使用選項dags_folder配置。config資料夾:它透過設定AIRFLOW_HOME變數(預設為{AIRFLOW_HOME}/config)進行配置。plugins資料夾:它在[core]部分使用選項plugins_folder配置。
注意
Airflow 2 中的 DAGS 資料夾不應與 webserver 共享。 雖然您可以這樣做,但與 Airflow 1.10 不同,Airflow 並不期望 webserver 中存在 DAGS 資料夾。 實際上,與 webserver 共享 dags 資料夾存在一定的安全風險,因為這意味著編寫 DAGS 的人可以編寫 webserver 能夠執行的程式碼(理想情況下,webserver 永遠不應執行可以由編寫 dags 的使用者修改的程式碼)。 因此,如果您需要與 webserver 共享一些程式碼,強烈建議您透過 config 或 plugins 資料夾或透過已安裝的 Airflow 包(見下文)共享它。 這些資料夾通常由與 DAG 資料夾(通常是資料科學家)不同的使用者(管理員/DevOps)管理和訪問,因此它們被認為是安全的,因為它們是 Airflow 安裝配置的一部分,並且由管理安裝的人員控制。
程式碼命名的最佳實踐¶
匯入程式碼時,需要注意一些陷阱。
有時,您可能會看到 Airflow 或您使用的其他庫程式碼引發異常 module 'X' has no attribute 'Y'。 這通常是由於您在頂級 PYTHONPATH 中有一個名為“X”的模組或包,並且它被匯入而不是原始程式碼期望的模組。
您應該始終為您的包和模組使用唯一的名稱,並且有多種方法可以確保強制執行唯一性,如下所述。
使用唯一的頂級包名¶
最重要的是,避免對直接新增到 PYTHONPATH 頂層的任何內容使用通用名稱。 例如,如果您將包含 __init__.py 的 airflow 資料夾新增到您的 DAGS_FOLDER,它將與 Airflow 包衝突,您將無法從 Airflow 包中匯入任何內容。 類似地,不要直接在此處新增 airflow.py 檔案。 也不應將標準庫包使用的通用名稱(例如 multiprocessing 或 logging 等)用作頂級 - 無論是作為包(即包含 __init__.py 的資料夾)還是作為模組(即 .py 檔案)。
這同樣適用於 config 和 plugins 資料夾,它們也位於 PYTHONPATH,以及您手動新增到 PYTHONPATH 的任何內容(請參閱以下章節中的詳細資訊)。
建議您始終將 DAG 和通用檔案放在特定於您部署的子包中(以下示例中的 my_company)。 為資料夾使用通用名稱很容易,這會與系統中已存在的其他包衝突。 例如,如果您建立 airflow/operators 子資料夾,則它將無法訪問,因為 Airflow 已經有一個名為 airflow.operators 的包,並且在匯入 from airflow.operators 時它會在此處查詢。
不要使用相對匯入¶
永遠不要使用 Python 3 中新增的相對匯入(以 . 開頭)。
在 my_dag1.py 中這樣做是很誘人的。
from .base_dag import BaseDag # NEVER DO THAT!!!!
您應該使用完整路徑(從新增到 PYTHONPATH 的目錄開始)匯入此類共享 DAG。
from my_company.my_custom_dags.base_dag import BaseDag # This is cool
相對匯入是違反直覺的,並且取決於您啟動 Python 程式碼的方式,它們的行為可能會有所不同。 在 Airflow 中,同一個 DAG 檔案可能會在不同的上下文中被解析(由排程程式、worker 或在測試期間),在這些情況下,相對匯入的行為可能會有所不同。 在 Airflow DAG 中匯入任何內容時,始終使用完整的 Python 包路徑,這將為您節省很多麻煩。 您可以在 此 Stack Overflow 執行緒 中閱讀更多關於相對匯入注意事項的資訊。
在包資料夾中新增 __init__.py¶
建立資料夾時,應將 __init__.py 檔案作為空檔案新增到資料夾中。 雖然在 Python 3 中有一個隱式名稱空間的概念,您不必將這些檔案新增到資料夾中,但 Airflow 希望將這些檔案新增到您新增的所有包中。
檢查您的 PYTHONPATH 載入配置¶
您還可以使用 airflow info 命令檢視確切的路徑,並像使用環境變數 PYTHONPATH 指定的目錄一樣使用它們。 此命令指定的 sys.path 變數的內容示例如下
Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.9/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]
以下是 airflow info 命令的示例輸出
另請參閱
Apache Airflow: 2.0.0b3
System info
OS | Linux
architecture | x86_64
uname | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale | ('en_US', 'UTF-8')
python_version | 3.9.6 (default, Nov 25 2020, 02:47:44) [GCC 8.3.0]
python_location | /usr/local/bin/python
Tools info
git | git version 2.20.1
ssh | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d 10 Sep 2019
kubectl | NOT AVAILABLE
gcloud | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql | mysql Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3 | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)
Paths info
airflow_home | /root/airflow
system_path | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.9:/usr/
| local/lib/python3.9/lib-dynload:/usr/local/lib/python3.9/site-packages:/files/dags:/root/airflow/conf
| ig:/root/airflow/plugins
airflow_on_path | True
Config info
executor | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder | /files/dags
plugins_folder | /root/airflow/plugins
base_log_folder | /root/airflow/logs
Providers info
apache-airflow-providers-amazon | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid | 1.0.0b2
apache-airflow-providers-apache-hdfs | 1.0.0b2
apache-airflow-providers-apache-hive | 1.0.0b2
將目錄新增到 PYTHONPATH¶
您可以使用環境變數 PYTHONPATH 指定要新增到 sys.path 的其他目錄。 透過使用以下命令提供專案根目錄的路徑來啟動 python shell
PYTHONPATH=/home/arch/projects/airflow_operators python
sys.path 變數將如下所示
>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
'/home/arch/projects/airflow_operators'
'/home/arch/.pyenv/versions/3.9.4/lib/python37.zip',
'/home/arch/.pyenv/versions/3.9.4/lib/python3.9',
'/home/arch/.pyenv/versions/3.9.4/lib/python3.9/lib-dynload',
'/home/arch/venvs/airflow/lib/python3.9/site-packages']
我們可以看到我們提供的目錄現在已新增到路徑中,讓我們嘗試現在匯入包
>>> import airflow_operators
Hello from airflow_operators
>>>
我們還可以將 PYTHONPATH 變數與 airflow 命令一起使用。 例如,如果我們執行以下 Airflow 命令
PYTHONPATH=/home/arch/projects/airflow_operators airflow info
我們將看到 Python PATH 使用我們提到的 PYTHONPATH 值進行更新,如下所示
Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/home/arch/venv/lib/python3.9/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]
在 Python 中建立包¶
這是新增自定義程式碼的最有組織的方式。 感謝使用包,您可以組織您的版本控制方法,控制安裝共享程式碼的哪些版本,並透過受控的方式將程式碼部署到您的所有例項和容器 - 全部由系統管理員/DevOps 而不是 DAG 編寫者完成。 當您有一個單獨的團隊管理此共享程式碼時,它通常是合適的,但是如果您瞭解您的 Python 方式,您也可以在較小的部署中以這種方式分發您的程式碼。 您還可以將您的 外掛 和 Providers 作為 Python 包安裝,因此學習如何構建您的包是很有用的。
以下是如何建立您的包
1. 在開始之前,選擇並安裝您將使用的構建/打包工具,理想情況下它應該符合 PEP-621,以便能夠輕鬆切換到其他工具。 流行的選擇是 setuptools、poetry、hatch、flit。
決定何時建立您自己的包。 建立包目錄 - 在我們的例子中,我們將其稱為
airflow_operators。
mkdir airflow_operators
在包內建立檔案
__init__.py並新增以下程式碼
print("Hello from airflow_operators")
當我們匯入這個包時,它應該列印上面的訊息。
4. 建立 pyproject.toml 並使用您選擇的構建工具配置填充它。 請參閱 pyproject.toml 規範
使用您選擇的工具構建您的專案。 例如,對於 hatch,它可以是
hatch build -t wheel
這將在您的 dist 資料夾中建立 .whl 檔案
使用 pip 安裝 .whl 檔案
pip install dist/airflow_operators-0.0.0-py3-none-any.whl
該包現在可以使用了!
>>> import airflow_operators
Hello from airflow_operators
>>>
可以使用 pip 命令刪除該包
pip uninstall airflow_operators
有關如何建立和釋出 Python 包的更多詳細資訊,請參閱 打包 Python 專案。