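OpenLineage Macros
Macros included in the OpenLineage plugin are integrated into Airflow's main macro collection and become available for use.
They can be invoked from Jinja templates, as the examples in the following sections show.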
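Lineage job and run macros
The following macros inject pieces of an Airflow task's run information into the arguments sent to a remote processing job:
- lineage_job_namespace()
- lineage_job_name(task_instance)
- lineage_run_id(task_instance)
For example, SparkSubmitOperator can be configured like this: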
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

SparkSubmitOperator(
    task_id="my_task",
    application="/script.py",
    conf={
        # separated components, each rendered by its own macro
        "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
        "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
        "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
    },
)
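If several operators need the same three settings, the template strings can be kept in one place. The following is a minimal sketch, not part of the provider: the helper name `openlineage_parent_conf` and the constant `MACRO_NAMESPACE` are hypothetical, and the function only returns the unrendered template strings shown in the example above.

```python
# Hypothetical helper; neither name below is defined by Airflow or OpenLineage.
MACRO_NAMESPACE = "macros.OpenLineageProviderPlugin"


def openlineage_parent_conf():
    """Return Spark conf entries whose values are unrendered Jinja templates."""
    return {
        "spark.openlineage.parentJobNamespace": "{{ %s.lineage_job_namespace() }}" % MACRO_NAMESPACE,
        "spark.openlineage.parentJobName": "{{ %s.lineage_job_name(task_instance) }}" % MACRO_NAMESPACE,
        "spark.openlineage.parentRunId": "{{ %s.lineage_run_id(task_instance) }}" % MACRO_NAMESPACE,
    }
```

The returned dictionary could then be merged into an operator's settings, for instance `conf={**openlineage_parent_conf(), "spark.executor.memory": "2g"}`, where the extra Spark setting is only an illustration. The values remain plain strings until Airflow renders the operator's templated fields.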
Lineage parent ID
The same information, compacted into a single string, can be passed using the lineage_parent_id(task_instance) macro:
from airflow.operators.python import PythonOperator


def my_task_function(templates_dict, **kwargs):
    # the rendered value has the form `<namespace>/<name>/<run_id>`
    parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/")
    ...


PythonOperator(
    task_id="render_template",
    python_callable=my_task_function,
    templates_dict={
        # joined components as one string `<namespace>/<name>/<run_id>`
        "parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
    },
    provide_context=False,
    dag=dag,
)
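A plain `split("/")` raises an unpacking error if any component contains a slash. The sketch below shows one more defensive way to parse the joined string. It assumes that only the namespace may contain `/` and that the job name and run ID never do; that assumption is not stated in the text above. The helper name `split_parent_id` is hypothetical.

```python
def split_parent_id(parent_id):
    """Split `<namespace>/<name>/<run_id>` into its three components.

    Assumption: only the namespace may itself contain "/", so the string
    is split from the right at most twice.
    """
    parts = parent_id.rsplit("/", 2)
    if len(parts) != 3:
        raise ValueError(
            "expected '<namespace>/<name>/<run_id>', got %r" % parent_id
        )
    namespace, name, run_id = parts
    return namespace, name, run_id
```

Inside `my_task_function`, this could be called as `split_parent_id(templates_dict["parentRun"])` in place of the direct `split("/")`.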