血緣¶
注意
血緣支援處於非常實驗性的階段,可能會發生變化。
Airflow 提供了一個強大的功能,用於跟蹤任務之間以及任務中使用的 Hook 的資料血緣。此功能可幫助您瞭解資料在整個 Airflow 管道中的流向。
HookLineageCollector 的全域性例項充當收集血緣資訊的中央樞紐。Hook 可以將它們互動的資產的詳細資訊傳送到此收集器。然後,收集器使用這些資料來構建符合 AIP-60 的資產,這是一種描述資產的標準格式。
from airflow.lineage.hook import get_hook_lineage_collector
class CustomHook(BaseHook):
def run(self):
# run actual code
collector = get_hook_lineage_collector()
collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})
透過 HookLineageCollector 收集的血緣資料可以使用在 Airflow 外掛中註冊的 HookLineageReader 例項訪問。
from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin
class CustomHookLineageReader(HookLineageReader):
def get_inputs(self):
return self.lineage_collector.collected_assets.inputs
class HookLineageCollectionPlugin(AirflowPlugin):
name = "HookLineageCollectionPlugin"
hook_lineage_readers = [CustomHookLineageReader]
如果 Airflow 中未註冊 HookLineageReader,則會改用預設的 NoOpCollector。此收集器不會建立符合 AIP-60 的資產或收集血緣資訊。