在 Operator 中實現 OpenLineage¶
OpenLineage 透過支援直接修改 Airflow Operator,使得向資料管道新增血緣資訊變得容易。當可以修改 Operator 時,新增血緣資訊提取就像向其新增一個方法一樣簡單。有關更多詳細資訊,請參閱 OpenLineage 方法。
可能存在一些您無法修改的 Operator(例如第三方提供商),但您仍然希望從中提取血緣資訊。為了處理這種情況,OpenLineage 允許您為任何 Operator 提供自定義 Extractor。有關更多詳細資訊,請參閱 自定義 Extractor。
提取優先順序¶
由於有多種實現 OpenLineage 對 Operator 支援的方式,因此請務必牢記 OpenLineage 查詢血緣資料的順序。
Extractor - 檢查是否為 Operator 類名指定了自定義 Extractor。使用者註冊的任何自定義 Extractor 都將優先於 Airflow Provider 原始碼中定義的預設 Extractor(例如 BashExtractor)。
OpenLineage 方法 - 如果未為 Operator 類名顯式指定 Extractor,則使用 DefaultExtractor,它會在 Operator 中查詢 OpenLineage 方法。
輸入和輸出 - 如果 Operator 中沒有定義 OpenLineage 方法,則檢查輸入和輸出。
如果以上所有選項都缺失,則不會從 Operator 中提取任何血緣資料。您仍然會收到 OpenLineage 事件,其中包含通用的 Airflow facets、正確的事件時間/型別等資訊,但輸入/輸出將為空,並且 Operator 特定的 facets 將會缺失。
OpenLineage 方法¶
當處理您自己的 Operator 並可以直接實現 OpenLineage 方法時,推薦使用此方法。當處理您無法修改(例如第三方提供商)但仍然希望從中提取血緣資訊的 Operator 時,請參閱 自定義 Extractor。
OpenLineage 定義了一些用於在 Operator 中實現的方法。這些方法被稱為 OpenLineage 方法。
def get_openlineage_facets_on_start() -> OperatorLineage: ...
def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage: ...
def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage: ...
當任務例項狀態變為以下時,OpenLineage 方法會被相應地呼叫:
RUNNING ->
get_openlineage_facets_on_start()SUCCESS ->
get_openlineage_facets_on_complete()FAILED ->
get_openlineage_facets_on_failure()
以下方法中至少必須實現一個:get_openlineage_facets_on_start() 或 get_openlineage_facets_on_complete()。有關在其他方法缺失時會呼叫哪些方法的更多詳細資訊,請參閱 如何正確實現 OpenLineage 方法?。
Provider 定義了 OperatorLineage 結構,供 Operator 返回,而不是返回完整的 OpenLineage 事件
@define
class OperatorLineage:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
OpenLineage 整合本身負責用通用的 Airflow facets、正確的事件時間/型別等資訊對其進行豐富,從而建立正確的 OpenLineage RunEvent。
如何正確實現 OpenLineage 方法?¶
在 Operator 中實現 OpenLineage 時,有幾點值得注意。
首先,不要在頂層匯入 OpenLineage 相關的物件,而是在 OL 方法本身內部匯入。這使得使用者即使沒有安裝 OpenLineage provider 也能使用您的 provider。
第二點重要之處是確保您的 provider 返回符合 OpenLineage 規範的資料集名稱。這使得 OpenLineage 消費者能夠正確匹配從不同來源收集的資料集資訊。命名約定在 OpenLineage 命名文件 中描述。
第三,OpenLineage 實現不應該浪費不使用它的使用者的時間。這意味著不要在 execute 方法中進行不需要其結果的繁重處理或網路呼叫。更好的選擇是將相關資訊儲存在 Operator 屬性中 - 然後在 OpenLineage 方法中使用它。一個很好的例子是 BigQueryExecuteQueryOperator。它儲存了已執行查詢的 job_ids。get_openlineage_facets_on_complete 然後可以呼叫 BigQuery API,查詢這些表的血緣資訊,並將其轉換為 OpenLineage 格式。
第四,並非必須實現所有方法。如果在呼叫 execute 之前已知所有資料集,並且沒有相關的執行時資料,則可能沒有必要實現 get_openlineage_facets_on_complete - get_openlineage_facets_on_start 方法可以提供所有資料。反之,如果在執行之前一切都未知,則可能沒有必要編寫 _on_start 方法。類似地,如果沒有相關的失敗資料 - 或失敗條件未知,則實現 get_openlineage_facets_on_failure 可能不值得。通常:如果不存在 on_failure 方法,則會呼叫 on_complete 方法。如果不存在 on_failure 和 on_complete 方法,則會呼叫 on_start 方法(在任務開始和任務完成時都會呼叫)。如果不存在 on_start 方法,則血緣資訊將不包含在 START 事件中,並且在任務完成時會呼叫 on_complete 方法。
如何測試 OpenLineage 方法?¶
在 Operator 中對 OpenLineage 整合進行單元測試與測試 Operator 本身非常相似。這些測試的目的是確保 get_openlineage_* 方法返回正確的 OperatorLineage 資料結構並填充相關欄位。建議模擬任何外部呼叫。測試作者需要記住呼叫不同 OL 方法的條件是不同的。get_openlineage_facets_on_start 在 execute 呼叫之前被呼叫,因此不能依賴於在那裡設定的值。
有關如何在本地排除 OpenLineage 故障的詳細資訊,請參閱 故障排除。
目前沒有用於系統測試 OpenLineage 整合的現有框架,但最簡單的方法是將發出的事件(例如使用 FileTransport)與預期的事件進行比較。OpenLineage 系統測試作者的目標是提供預期事件鍵的字典。事件鍵標識從特定 Operator 和方法傳送的事件:它們的結構為 <dag_id>.<task_id>.event.<event_type>;透過這種方式總是可以標識從特定任務傳送的特定事件。提供的事件結構不必包含最終事件中的所有欄位。只能比較測試作者提供的欄位;這允許只檢查特定測試關注的欄位。它還允許跳過(半)隨機生成的欄位,如 runId 或 eventTime,或者在 Airflow 中的 OpenLineage 上下文中始終相同的欄位,如 producer。
示例¶
以下是為 GcsToGcsOperator 正確實現的 get_openlineage_facets_on_complete 方法的示例。由於在 execute 方法中進行了一些處理,並且沒有相關的失敗資料,因此只實現這一個方法就足夠了。
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on internals.
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[
Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(self.resolved_source_objects)
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
for target in sorted(self.resolved_target_objects)
],
)
有關已實現的 OpenLineage 方法的更多示例,請檢視 支援的類 的原始碼。
自定義 Extractor¶
當處理您無法修改(例如第三方提供商)但仍希望從中提取血緣資訊的 Operator 時,推薦使用此方法。如果您想從您自己的 Operator 中提取血緣資訊,您可能更傾向於直接實現 OpenLineage 方法 中描述的 OpenLineage 方法。
此方法的工作原理是檢測您的 DAG 使用了哪些 Airflow Operator,並使用相應的 Extractor 類從它們中提取血緣資料。
介面¶
自定義 Extractor 必須派生自 BaseExtractor 並至少實現兩個方法:_execute_extraction 和 get_operator_classnames。
BaseExtractor 還定義了三個方法:extract、extract_on_complete 和 extract_on_failure,這些方法被呼叫並用於提供實際的血緣資料。區別在於 extract 在 Operator 的 execute 方法之前被呼叫,而 extract_on_complete 和 extract_on_failure 在任務成功或失敗後分別被呼叫。預設情況下,extract 呼叫自定義 Extractor 中實現的 _execute_extraction 方法。當任務成功時,extract_on_complete 被呼叫,如果未被覆蓋,預設情況下會委託給 extract。當任務失敗時,extract_on_failure 被呼叫,如果未被覆蓋,預設情況下會委託給 extract_on_complete。如果您想提供任務執行後的一些附加資訊,可以覆蓋 extract_on_complete 和 extract_on_failure 方法。這對於提取 Operator 在執行期間或之後設定為自身屬性的資料非常有用。一個很好的例子是 SQL Operator,它在執行後設置 query_ids。
的 get_operator_classnames 是一個 classmethod,用於提供您的 Extractor 可以從中獲取血緣資訊的 Operator 列表。
例如
@classmethod
def get_operator_classnames(cls) -> List[str]:
return ['CustomPostgresOperator']
如果 Operator 的名稱與列表中的名稱之一匹配,Extractor 將被例項化 - 並在 Extractor 的 self.operator 屬性中提供 Operator - 並且會呼叫 extract 以及 extract_on_complete/extract_on_failure 方法。
這兩個方法都返回 OperatorLineage 結構
@define
class OperatorLineage:
"""Structure returned from lineage extraction."""
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
輸入和輸出是普通 OpenLineage 資料集的列表 (openlineage.client.event_v2.Dataset)。
run_facets 和 job_facets 是可選的 RunFacets 和 JobFacets 的字典,這些 facet 將附加到作業 - 例如,如果您的 Operator 正在執行 SQL,您可能希望附加 SqlJobFacet。
要了解有關 OpenLineage 中 facets 的更多資訊,請參閱 自定義 Facets。
註冊自定義 Extractor¶
除非您註冊 Extractor,否則 OpenLineage 整合不會知道您已提供了一個 Extractor。
可以透過在 Airflow 配置中使用 extractors 選項來完成。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS 環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
您可以選擇使用空格分隔它們。如果您將它們作為某些 YAML 檔案的一部分提供,這將非常有用。
AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
full.path.to.FirstExtractor;
full.path.to.SecondExtractor
請記住確保該路徑對於 scheduler 和 worker 來說是可匯入的。
除錯自定義 Extractor¶
自定義 Extractor 通常存在兩個問題。
首先,是提供給 Airflow 配置中 extractors 選項的路徑錯誤。該路徑需要與您在程式碼中使用的路徑完全相同。如果路徑錯誤或無法從 worker 匯入,外掛將無法載入 Extractor,並且不會為該 Operator 發出正確的 OpenLineage 事件。
第二個問題,可能更隱蔽,是來自 Airflow 的匯入。由於 OpenLineage 程式碼在 Airflow worker 本身啟動時被例項化,任何來自 Airflow 的匯入都可能悄無聲息地形成迴圈引用。這會導致 OpenLineage 提取失敗。
為了避免此問題,請僅在本地匯入 Airflow - 在 _execute_extraction 或 extract_on_complete/extract_on_failure 方法中。如果您需要用於型別檢查的匯入,請將其放在 typing.TYPE_CHECKING 之後。
測試自定義 Extractor¶
與所有程式碼一樣,自定義 Extractor 也應該進行測試。本節將提供有關編寫測試的最重要資料結構的某些資訊以及有關故障排除的一些注意事項。我們假設您對編寫自定義 Extractor 有先前的瞭解。要詳細瞭解 Operator 和 Extractor 在底層如何協同工作,請檢視 自定義 Extractor。
測試 Extractor 時,我們首先要驗證是否正在建立 OperatorLineage 物件,特別是驗證物件是否使用正確的輸入和輸出資料集以及相關 facets 構建。這在 OpenLineage 中透過 pytest 完成,並對連線和物件進行了適當的模擬和補丁。檢視 示例測試。
測試每個 facet 也非常重要,因為如果 facet 不正確,UI 中的資料或圖表可能會錯誤地渲染。例如,如果在 Extractor 中 facet 名稱建立不正確,則 Operator 的任務將不會顯示在血緣圖譜中,從而在管道可觀測性方面產生空白。
即使進行了單元測試,Extractor 可能仍未按預期執行。判斷資料是否未正確傳輸的最簡單方法是檢查 UI 元素是否未在 Lineage 選項卡中正確顯示。
有關如何在本地排除 OpenLineage 故障的詳細資訊,請參閱 故障排除。
示例¶
這是一個簡單的 Extractor 示例,用於一個 Operator,該 Operator 在 BigQuery 中執行匯出查詢並將結果儲存到 S3 檔案。在呼叫 Operator 的 execute 方法之前已知一些資訊,我們可以在 _execute_extraction 方法中提取一些血緣資訊。在呼叫 Operator 的 execute 方法之後,在 extract_on_complete 中,我們可以簡單地將一些額外的 Facets(例如包含 Bigquery 作業 ID)附加到我們之前準備的資訊中。如果僅在任務失敗時需要包含某些資訊,我們還可以實現 extract_on_failure 方法。這樣,我們可以從 Operator 獲取所有可能的資訊。
請注意,這只是一個示例。OpenLineage 提供了一些內建功能,可以方便地處理不同的過程,例如使用 SQL 解析器從 SQL 查詢中提取列級血緣資訊和輸入/輸出。
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
ExternalQueryRunFacet,
ErrorMessageRunFacet,
SQLJobFacet,
)
class ExampleOperator(BaseOperator):
def __init__(self, query, bq_table_reference, s3_path) -> None:
self.bq_table_reference = bq_table_reference
self.s3_path = s3_path
self.s3_file_name = s3_file_name
self._job_id = None
def execute(self, context) -> Any:
self._job_id, self._error_message = run_query(query=self.query)
class ExampleExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls):
return ["ExampleOperator"]
def _execute_extraction(self) -> OperatorLineage:
"""Define what we know before Operator's extract is called."""
return OperatorLineage(
inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
job_facets={
"sql": SQLJobFacet(
query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
)
},
)
def extract_on_complete(self, task_instance) -> OperatorLineage:
"""Add what we received after Operator's extract call."""
lineage_metadata = self.extract()
lineage_metadata.run_facets = {
"parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
}
return lineage_metadata
def extract_on_failure(self, task_instance) -> OperatorLineage:
"""Add any failure-specific information."""
lineage_metadata = self.extract_on_complete(task_instance)
lineage_metadata.run_facets = {
"error": ErrorMessageRunFacet(
message=task_instance.task._error_message, programmingLanguage="python"
)
}
return lineage_metadata
有關 OpenLineage Extractor 的更多示例,請檢視 BashExtractor 或 PythonExtractor 的原始碼。
自定義 Facets¶
要了解有關 OpenLineage 中 facets 的更多資訊,請參閱 facet 文件。另請檢視 可用 facets 以及關於 使用 facets 進行擴充套件 的部落格文章。
OpenLineage 規範可能不包含編寫 Extractor 所需的所有 facets,在這種情況下,您將不得不建立自己的 自定義 facets。
您還可以使用 custom_run_facets Airflow 配置將自己的自定義 facets 注入到血緣事件的 run facet 中。
需要採取的步驟,
編寫一個返回自定義 facets 的函式。您可以根據需要編寫任意數量的自定義 facet 函式。
使用
custom_run_facetsAirflow 配置註冊函式。
Airflow OpenLineage 監聽器將在生成血緣事件期間自動執行這些函式,並將其返回值附加到血緣事件中的 run facet。
編寫自定義 facet 函式¶
輸入引數: 函式應接受兩個輸入引數:
TaskInstance和TaskInstanceState。函式體: 執行生成自定義 facets 所需的邏輯。自定義 facets 必須繼承自
RunFacet,以便自動為 facet 新增_producer和_schemaURL。返回值: 要新增到血緣事件的自定義 facets。返回型別應為
dict[str, RunFacet]或None。如果您不想為特定條件新增自定義 facets,可以選擇返回None。
自定義 facet 函式示例
import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet
@attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
name: str
jobState: str
uniqueName: str
displayName: str
dagId: str
taskId: str
cluster: str
custom_metadata: dict
def get_my_custom_facet(
task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
operator_name = task_instance.task.operator_name
custom_metadata = {}
if operator_name == "BashOperator":
return None
if ti_state == TaskInstanceState.FAILED:
custom_metadata["custom_key_failed"] = "custom_value"
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
custom_metadata=custom_metadata,
)
}
註冊自定義 facet 函式¶
使用 custom_run_facets Airflow 配置來註冊自定義 run facet 函式,方法是傳遞一個以分號分隔的函式完整匯入路徑字串。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 環境變數是等效的。
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
注意
自定義 facet 函式在 TaskInstance 的 START 和 COMPLETE/FAIL 時都會執行,並新增到相應的 OpenLineage 事件中。
在建立 TaskInstance 狀態的條件時,您應該使用提供的第二個引數 (
TaskInstanceState),該引數將包含任務應處於的狀態。這可能與 ti.current_state() 不同,因為 OpenLineage 監聽器可能在 TaskInstance 的狀態在 Airflow 資料庫中更新之前被呼叫。當單個函式的路徑註冊多次時,它仍然只會執行一次。
當多個註冊的函式返回重複的自定義 facet 鍵時,血緣事件中將新增隨機函式的結果。請避免使用重複的 facet 鍵,因為這可能導致意外行為。
作業層級¶
Apache Airflow 具有固有的作業層級結構:DAGs 是大型且可獨立排程的單元,它們包含更小、可執行的任務。
OpenLineage 在其 Job Hierarchy 模型中反映了這種結構。
在 DAG 排程時,會發出一個 START 事件。
隨後,按照 Airflow 的任務順序,每個任務觸發
在 TaskInstance 啟動時的 START 事件。
在 TaskInstance 完成時的 COMPLETE/FAILED 事件。
最後,在 DAG 終止時,會發出一個完成事件(COMPLETE 或 FAILED)。
TaskInstance 事件的 ParentRunFacet 引用了原始的 DAG 執行。
故障排除¶
在本地測試程式碼時,可以使用 Marquez 來檢查正在發出或未發出的資料。使用 Marquez 可以幫助您確定錯誤是由 Extractor 還是 API 引起的。如果資料按預期從 Extractor 發出但未到達 UI,則 Extractor 是正常的,應在 OpenLineage 中提一個 issue。但是,如果資料未正確發出,則可能需要更多單元測試來覆蓋 Extractor 的行為。Marquez 可以幫助您查明哪些 facets 未正確形成,以便您知道在哪裡新增測試覆蓋。
除錯設定¶
為了除錯目的,請確保 Airflow 日誌級別 和 OpenLineage 客戶端日誌級別 都設定為 DEBUG。最新的 provider 會自動將 Airflow 的日誌級別與 OpenLineage 客戶端同步,無需手動配置。
要將包含附加資訊(例如,所有已安裝包的列表)的 DebugFacet 附加到所有 OL 事件,請為 OpenLineage 整合啟用 除錯模式。
請記住,啟用這些設定會增加 Airflow 日誌的詳細程度(這將增加日誌大小)並向 OpenLineage 事件新增額外資訊。建議暫時使用它們,主要用於除錯目的。
在尋求除錯幫助時,請始終嘗試提供以下資訊
將日誌級別設定為 DEBUG 的 Airflow scheduler 日誌
將日誌級別設定為 DEBUG 的 Airflow worker 日誌(任務日誌)
啟用 debug_mode 的 OpenLineage 事件
有關 Airflow 版本和 OpenLineage provider 版本的資訊
有關 Airflow 執行的部署環境所做的任何自定義修改的資訊
我可以在哪裡瞭解更多?¶
檢視 OpenLineage 網站。
訪問我們的 GitHub 倉庫。
觀看多個關於 OpenLineage 的 講座。
反饋¶
您可以在 slack 上聯絡我們並給我們留下反饋!
如何貢獻¶
我們歡迎您的貢獻!OpenLineage 是一個正在積極開發的開源專案,我們非常歡迎您的幫助!
聽起來很有趣?檢視我們的 新貢獻者指南 開始吧。