airflow.providers.google.cloud.openlineage.utils¶
屬性¶
函式¶
|
將多個列血緣關係 Facet 合併為一個統一的 Facet。 |
從給定路徑中提取和處理資料集名稱。 |
|
|
從 BigQuery 表物件獲取 Facet。 |
|
|
|
獲取身份轉換的列血緣關係 Facet。 |
|
從巢狀物件結構中獲取物件,其中不保證巢狀結構中的所有鍵都存在。 |
將 OpenLineage 屬性注入到 Spark 作業定義中。 |
|
將 OpenLineage 屬性注入到 Dataproc 批處理定義中。 |
|
|
將 OpenLineage 屬性注入到工作流模板中的所有 Spark 作業中。 |
模組內容¶
- airflow.providers.google.cloud.openlineage.utils.merge_column_lineage_facets(facets)[原始碼]¶
將多個列血緣關係 Facet 合併為一個統一的 Facet。
具體來說,它會彙總所有提供的 Facet 中每個欄位的輸入欄位和轉換。
- 引數
facets: 要合併的列血緣關係 Facet。
- 返回
一個新的列血緣關係 Facet,包含所有欄位、它們各自的輸入欄位和轉換。
- 注意
輸入欄位透過其 (namespace, name, field) 元組唯一標識。
如果多個 Facet 包含具有相同輸入欄位的同一欄位,則這些輸入欄位將被合併而不會重複。
與輸入欄位關聯的轉換也會被合併。如果 InputField 類版本不支援轉換,則會省略它們。
轉換合併依賴於欄位名和輸入欄位元組的複合鍵來跟蹤和整合轉換。
- 示例
案例 1: 具有相同輸入欄位的兩個 Facet
` >>> facet1 = ColumnLineageDatasetFacet( ... fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> facet2 = ColumnLineageDatasetFacet( ... fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields [InputField("namespace1", "dataset1", "field1")] `案例 2: 具有相同輸入欄位但轉換不同的兩個 Facet
` >>> facet1 = ColumnLineageDatasetFacet( ... fields={ ... "columnA": Fields( ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t1"])] ... ) ... } ... ) >>> facet2 = ColumnLineageDatasetFacet( ... fields={ ... "columnA": Fields( ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t2"])] ... ) ... } ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields[0].transformations ["t1", "t2"] `
- airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[原始碼]¶
從給定路徑中提取和處理資料集名稱。
- 引數
path: 要處理的路徑,例如 GCS 檔案路徑。
- 返回
處理後的資料集名稱。
- 示例
>>> extract_ds_name_from_gcs_path("/dir/file.*") 'dir' >>> extract_ds_name_from_gcs_path("/dir/pre_") 'dir' >>> extract_ds_name_from_gcs_path("/dir/file.txt") 'dir/file.txt' >>> extract_ds_name_from_gcs_path("/dir/file.") 'dir' >>> extract_ds_name_from_gcs_path("/dir/") 'dir' >>> extract_ds_name_from_gcs_path("") '/' >>> extract_ds_name_from_gcs_path("/") '/' >>> extract_ds_name_from_gcs_path(".") '/'
- airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[原始碼]¶
從 BigQuery 表物件獲取 Facet。
- airflow.providers.google.cloud.openlineage.utils.get_namespace_name_from_source_uris(source_uris)[原始碼]¶
- airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[原始碼]¶
獲取身份轉換的列血緣關係 Facet。
此函式生成一個簡單的列血緣關係 Facet,其中每個目標列包含來自所有具有該列的輸入資料集的同名源列。該血緣關係假設沒有應用任何轉換,這意味著列在源資料集和目標資料集之間保留其身份。
- 引數
dest_field_names: 一個目標列名列表,需要確定其血緣關係。 input_datasets: 具有 schema Facet 的輸入資料集列表。
- 返回
- 一個字典,包含一個鍵 columnLineage,對映到一個 ColumnLineageDatasetFacet。
如果無法確定列血緣關係,則返回一個空字典 - 詳見下方“注意”部分。
- 注意
如果任何輸入資料集缺少 schema Facet,函式會立即返回一個空字典。
如果源資料集 schema 中的任何欄位不在目標表中,函式將返回一個空字典。目標表可以包含額外的欄位,但所有源列都應存在於目標表中。
如果沒有任何目標列可以與輸入資料集列匹配,則返回一個空字典。
目標表中不存在於輸入資料集中的額外列在血緣關係 Facet 中會被忽略和跳過,因為它們無法追溯到源列。
此函式假設沒有應用任何轉換,這意味著列在源資料集和目標資料集之間保留其身份。
- airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[原始碼]¶
從巢狀物件結構中獲取物件,其中不保證巢狀結構中的所有鍵都存在。
旨在取代一系列 dict.get() 語句。
示例用法
if ( not job._properties.get("statistics") or not job._properties.get("statistics").get("query") or not job._properties.get("statistics").get("query").get("referencedTables") ): return None result = job._properties.get("statistics").get("query").get("referencedTables")
變為
result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"]) if not result: return None
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info, inject_transport_info)[原始碼]¶
將 OpenLineage 屬性注入到 Spark 作業定義中。
- 此函式不會刪除現有配置或以任何方式修改作業定義,
只會新增必需的 OpenLineage 屬性,如果它們尚未存在。
- 如果滿足以下任一條件,整個屬性注入過程將被跳過
OpenLineage provider 不可訪問。
作業型別不受支援。
inject_parent_job_info 和 inject_transport_info 都設定為 False。
此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。
- 如果滿足以下條件,則不會注入父作業資訊
存在任何以 spark.openlineage.parent 為字首的屬性。
inject_parent_job_info 為 False。
- 如果滿足以下條件,則不會注入傳輸資訊
存在任何以 spark.openlineage.transport 為字首的屬性。
inject_transport_info 為 False。
- 引數
job: 原始 Dataproc 作業定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。
- 返回
注入 OpenLineage 屬性後的修改版作業定義(如適用)。
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_batch(batch, context, inject_parent_job_info, inject_transport_info)[原始碼]¶
將 OpenLineage 屬性注入到 Dataproc 批處理定義中。
- 此函式不會刪除現有配置或以任何方式修改批處理定義,
只會新增必需的 OpenLineage 屬性,如果它們尚未存在。
- 如果滿足以下任一條件,整個屬性注入過程將被跳過
OpenLineage provider 不可訪問。
批處理型別不受支援。
inject_parent_job_info 和 inject_transport_info 都設定為 False。
此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。
- 如果滿足以下條件,則不會注入父作業資訊
存在任何以 spark.openlineage.parent 為字首的屬性。
inject_parent_job_info 為 False。
- 如果滿足以下條件,則不會注入傳輸資訊
存在任何以 spark.openlineage.transport 為字首的屬性。
inject_transport_info 為 False。
- 引數
batch: 原始 Dataproc 批處理定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。
- 返回
注入 OpenLineage 屬性後的修改版批處理定義(如適用)。
- airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_workflow_template(template, context, inject_parent_job_info, inject_transport_info)[原始碼]¶
將 OpenLineage 屬性注入到工作流模板中的所有 Spark 作業中。
- 此函式不會刪除現有配置或以任何方式修改作業定義,
只會新增必需的 OpenLineage 屬性,如果它們尚未存在。
- 如果滿足以下任一條件,將跳過每個作業的整個屬性注入過程
OpenLineage provider 不可訪問。
作業型別不受支援。
inject_parent_job_info 和 inject_transport_info 都設定為 False。
此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。
- 如果滿足以下條件,則不會注入父作業資訊
存在任何以 spark.openlineage.parent 為字首的屬性。
inject_parent_job_info 為 False。
- 如果滿足以下條件,則不會注入傳輸資訊
存在任何以 spark.openlineage.transport 為字首的屬性。
inject_transport_info 為 False。
- 引數
template: 原始 Dataproc 工作流模板定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。
- 返回
注入 OpenLineage 屬性後的修改版工作流模板定義(如適用)。