使用 OpenLineage 整合¶
OpenLineage 是一個用於資料血緣收集和分析的開放框架。其核心是一個可擴充套件的規範,系統可以使用它來實現與血緣元資料的互操作性。檢視 OpenLineage 文件。
使用 OpenLineage 不需要更改使用者 DAG 檔案。需要進行基本配置,以便 OpenLineage 知道將事件傳送到何處。
快速入門¶
注意
OpenLineage Provider 提供多種資料傳輸選項(http、kafka、file 等),包括建立自定義解決方案的靈活性。配置可以透過多種方式管理,並提供大量設定供使用者微調和增強 OpenLineage 的使用。有關這些功能的全面說明,請參閱本文件的後續部分。
此示例是 OpenLineage 設定的基本演示。
安裝 provider 包或將其新增到
requirements.txt檔案中。pip install apache-airflow-providers-openlineage提供
Transport配置,以便 OpenLineage 知道將事件傳送到何處。在airflow.cfg檔案中[openlineage] transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
或使用
AIRFLOW__OPENLINEAGE__TRANSPORT環境變數AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
就是這樣!執行 DAG 時,OpenLineage 事件應傳送到配置的後端。
使用¶
啟用並配置後,該整合不需要使用者進一步操作。它將自動
收集任務輸入/輸出元資料(來源、模式等)。
收集任務執行級元資料(執行時間、狀態、引數等)
收集任務作業級元資料(所有者、型別、描述等)
收集任務特定元資料(bigquery 作業 ID、python 原始碼等)- 取決於 Operator
所有這些資料將作為 OpenLineage 事件傳送到配置的後端,如 作業層次結構 中所述。
傳輸設定¶
配置 OpenLineage Airflow Provider 的主要和推薦方法是 Airflow 配置(airflow.cfg 檔案)。所有可能的配置選項以及示例值都可以在配置部分中找到。
至少,在每種情況下都需要設定的一件事是 Transport - 您希望事件最終傳送到哪裡 - 例如 Marquez。
以 JSON 字串形式傳輸¶
Airflow 配置中的 transport 選項用於此目的。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數是等效的。
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
如果您想檢視 OpenLineage 事件而不將其傳送到任何地方,您可以設定 ConsoleTransport - 事件將出現在任務日誌中。
[openlineage]
transport = {"type": "console"}
注意
有關內建傳輸型別的完整列表、特定傳輸的選項或如何實現自定義傳輸的說明,請參閱 Python 客戶端文件。
以配置檔案形式傳輸¶
您也可以使用 YAML 檔案(例如 openlineage.yml)配置 OpenLineage Transport。在 Airflow 配置中將 YAML 檔案的路徑作為 config_path 選項提供。
[openlineage]
config_path = '/path/to/openlineage.yml'
AIRFLOW__OPENLINEAGE__CONFIG_PATH 環境變數是等效的。
AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
配置 YAML 檔案的示例內容
transport:
type: http
url: https://backend:5000
endpoint: events/receive
auth:
type: api_key
apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
注意
該配置方法的詳細說明以及示例配置檔案可以在 Python 客戶端文件中找到。
配置優先順序¶
由於配置 OpenLineage 有多種可能的方法,因此務必牢記不同配置的優先順序。OpenLineage Airflow Provider 按以下順序查詢配置
在
airflow.cfg的openlineage部分下檢查config_path(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 環境變數)在
airflow.cfg的openlineage部分下檢查transport(或 AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數)如果以上所有選項均缺失,則底層使用的 OpenLineage Python 客戶端將按照此處文件中描述的順序查詢配置。請注意,**鼓勵使用 Airflow 配置**,這是唯一面向未來的解決方案。
向後相容性¶
警告
以下變數**不應**使用,將來可能會被移除。請考慮使用 Airflow 配置(如上所述)以獲得面向未來的解決方案。
為了與 openlineage-airflow 包向後相容,一些環境變數仍然可用
OPENLINEAGE_DISABLED等效於AIRFLOW__OPENLINEAGE__DISABLED。OPENLINEAGE_CONFIG等效於AIRFLOW__OPENLINEAGE__CONFIG_PATH。OPENLINEAGE_NAMESPACE等效於AIRFLOW__OPENLINEAGE__NAMESPACE。OPENLINEAGE_EXTRACTORS等效於設定AIRFLOW__OPENLINEAGE__EXTRACTORS。OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE等效於AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE。OPENLINEAGE_URL可用於設定簡單的 http 傳輸。此方法有一些限制,可能需要使用其他環境變數來達到期望的輸出。請參閱文件。
附加選項¶
名稱空間¶
為這個特定例項設定 OpenLineage 名稱空間非常有用。這樣,如果您使用多個 OpenLineage 生產者,來自它們的事件將在邏輯上分開。如果未設定,則使用 default 名稱空間。在 Airflow 配置中將名稱空間的名稱作為 namespace 選項提供。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'
AIRFLOW__OPENLINEAGE__NAMESPACE 環境變數是等效的。
AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
超時¶
為了在任務執行和 OpenLineage 之間增加一層隔離,確保 OpenLineage 執行不會以佔用時間以外的方式干擾任務執行,OpenLineage 方法在單獨的程序中執行。程式碼以預設的 10 秒超時執行。您可以透過設定 execution_timeout 值來增加此時間。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT 環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
停用¶
您可以透過在 Airflow 配置中將 disabled 選項設定為 true 來停用傳送 OpenLineage 事件,而無需解除安裝 OpenLineage provider。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true
AIRFLOW__OPENLINEAGE__DISABLED 環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLED=true
停用原始碼¶
預設情況下,一些 Operators(例如 Python、Bash)會在其 OpenLineage 事件中包含其原始碼。為防止這種情況,請在 Airflow 配置中將 disable_source_code 選項設定為 true。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE 環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true
對 Operator 停用¶
您可以透過在 Airflow 配置中將要停用的 Airflow Operators 的完整匯入路徑字串(用分號分隔)作為 disabled_for_operators 欄位傳遞,輕鬆排除某些 Operator 傳送 OpenLineage 事件。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS 環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
完整任務資訊¶
預設情況下,OpenLineage 整合的 AirflowRunFacet(在每個任務例項事件的 START 事件時附加)不包含完整的序列化任務資訊(給定 operator 的引數),而只包含選定的引數。
但是,我們允許使用者設定 OpenLineage 整合以包含完整的任務資訊。透過這樣做,我們不是僅序列化一些已知屬性,而是排除某些不可序列化的元素併發送其餘所有內容。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO 環境變數是等效的。
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
警告
透過將此變數設定為 true,OpenLineage 整合不控制您傳送的事件大小。它可能包含大小為兆位元組或更大的元素,具體取決於您傳遞給任務的資料大小。
自定義 Extractors¶
要使用 自定義 Extractors 功能,請在 Airflow 配置中將 Airflow Operators 的完整匯入路徑字串(用分號分隔)作為 extractors 選項傳遞來註冊 extractors。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS 環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
自定義 Run Facets¶
要注入 自定義 run facets,請在 Airflow 配置中將自定義 run facet 函式的完整匯入路徑字串(用分號分隔)作為 custom_run_facets 選項傳遞來註冊函式。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 環境變數是等效的。
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
除錯模式¶
您可以透過在 Airflow 配置中將 debug_mode 選項設定為 true 來啟用在 OpenLineage 事件中傳送附加資訊,這對於除錯和重現您的環境設定非常有用。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true
AIRFLOW__OPENLINEAGE__DEBUG_MODE 環境變數是等效的。
AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
警告
透過將此變數設定為 true,OpenLineage 整合可能會記錄和傳送大量詳細資訊。僅應臨時啟用此功能用於除錯目的。
在 DAG/任務級別啟用 OpenLineage¶
可以使用 selective_enable 策略選擇性地為特定的 DAG 和任務啟用 OpenLineage。要啟用此策略,請在 Airflow 配置檔案的 [openlineage] 部分中將 selective_enable 選項設定為 True
[openlineage]
selective_enable = True
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE 環境變數是等效的。
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
雖然 selective_enable 支援選擇性控制,但 disabled 選項 仍然具有優先順序。如果您在配置中將 disabled 設定為 True,無論 selective_enable 設定如何,OpenLineage 都將對所有 DAG 和任務停用。
啟用 selective_enable 策略後,您可以使用 enable_lineage 和 disable_lineage 函式選擇為單個 DAG 和任務啟用 OpenLineage。
在 DAG 上啟用 Lineage
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with enable_lineage(DAG(...)):
# Tasks within this DAG will have lineage tracking enabled
MyOperator(...)
AnotherOperator(...)
在 Task 上啟用 Lineage
雖然在 DAG 上啟用 lineage 會隱式地為該 DAG 內的所有任務啟用 lineage,但您仍然可以為特定任務選擇性地停用它
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with DAG(...) as dag:
t1 = MyOperator(...)
t2 = AnotherOperator(...)
# Enable lineage for the entire DAG
enable_lineage(dag)
# Disable lineage for task t1
disable_lineage(t1)
在 DAG 級別啟用 lineage 會自動為該 DAG 內的所有任務啟用 lineage,除非針對每個任務明確停用。
在任務級別啟用 lineage 會隱式地在其 DAG 上啟用 lineage。這是因為每個傳送事件的任務都會發送一個 ParentRunFacet,這要求在某些 OpenLineage 後端系統中啟用 DAG 級別的 lineage。在啟用任務級別 lineage 的同時停用 DAG 級別 lineage 可能會導致錯誤或不一致。
將父作業資訊傳遞給 Spark 作業¶
OpenLineage 整合可以自動將 Airflow 的資訊(namespace、job name、run id)作為父作業資訊(spark.openlineage.parentJobNamespace、spark.openlineage.parentJobName、spark.openlineage.parentRunId)注入到 Spark 應用程式屬性中,針對支援的 Operator。這使得 Spark 整合能夠自動在應用程式級別的 OpenLineage 事件中包含 parentRunFacet,從而在不同整合的任務之間建立父子關係。請參閱 從 Airflow 排程。
此配置作為支援自動注入 Spark 屬性的所有 Operator 的預設行為,除非在 Operator 級別明確覆蓋。為了防止某個特定 Operator 注入父作業資訊,同時允許所有其他受支援的 Operator 預設執行此操作,可以明確為該特定 Operator 提供 openlineage_inject_parent_job_info=False。
注意
如果在 Spark 作業配置中手動指定了任何 spark.openlineage.parent* 屬性,整合將避免注入父作業屬性,以確保保留手動提供的值。
您可以透過在 Airflow 配置中將 spark_inject_parent_job_info 選項設定為 true 來啟用此自動化。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_parent_job_info = true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO 環境變數是等效的。
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
將傳輸資訊傳遞給 Spark 作業¶
OpenLineage 整合可以自動將 Airflow 的傳輸資訊注入到 Spark 應用程式屬性中,針對支援的 Operator。這使得 Spark 整合無需手動配置即可將事件傳送到與 Airflow 整合相同的後端。請參閱 從 Airflow 排程。
注意
如果在 Spark 作業配置中手動指定了任何 spark.openlineage.transport* 屬性,整合將避免注入傳輸屬性,以確保保留手動提供的值。
您可以透過在 Airflow 配置中將 spark_inject_transport_info 選項設定為 true 來啟用此自動化。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_transport_info = true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO 環境變數是等效的。
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
故障排除¶
有關如何排除 OpenLineage 故障的詳細資訊,請參閱故障排除。
新增對自定義 Operator 的支援¶
如果您想為特定 Operator 新增 OpenLineage 支援,請檢視 在 Operator 中實現 OpenLineage
在哪裡可以瞭解更多?¶
檢視 OpenLineage 網站。
訪問我們的 GitHub 倉庫。
觀看多個關於 OpenLineage 的講座。
反饋¶
您可以在 slack 上聯絡我們並留下反饋!
如何貢獻¶
我們歡迎您的貢獻!OpenLineage 是一個活躍開發的開源專案,我們非常歡迎您的幫助!
聽起來很有趣?檢視我們的新貢獻者指南以開始。