airflow.providers.google.cloud.openlineage.utils

屬性

log

BIGQUERY_NAMESPACE

BIGQUERY_URI

WILDCARD

函式

merge_column_lineage_facets(facets)

將多個列血緣關係 Facet 合併為一個統一的 Facet。

extract_ds_name_from_gcs_path(path)

從給定路徑中提取和處理資料集名稱。

get_facets_from_bq_table(table)

從 BigQuery 表物件獲取 Facet。

get_namespace_name_from_source_uris(source_uris)

get_identity_column_lineage_facet(dest_field_names, ...)

獲取身份轉換的列血緣關係 Facet。

get_from_nullable_chain(source, chain)

從巢狀物件結構中獲取物件,其中不保證巢狀結構中的所有鍵都存在。

inject_openlineage_properties_into_dataproc_job(job, ...)

將 OpenLineage 屬性注入到 Spark 作業定義中。

inject_openlineage_properties_into_dataproc_batch(...)

將 OpenLineage 屬性注入到 Dataproc 批處理定義中。

inject_openlineage_properties_into_dataproc_workflow_template(...)

將 OpenLineage 屬性注入到工作流模板中的所有 Spark 作業中。

模組內容

airflow.providers.google.cloud.openlineage.utils.log[原始碼]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_NAMESPACE = 'bigquery'[原始碼]
airflow.providers.google.cloud.openlineage.utils.BIGQUERY_URI = 'bigquery'[原始碼]
airflow.providers.google.cloud.openlineage.utils.WILDCARD = '*'[原始碼]
airflow.providers.google.cloud.openlineage.utils.merge_column_lineage_facets(facets)[原始碼]

將多個列血緣關係 Facet 合併為一個統一的 Facet。

具體來說,它會彙總所有提供的 Facet 中每個欄位的輸入欄位和轉換。

引數

facets: 要合併的列血緣關係 Facet。

返回

一個新的列血緣關係 Facet,包含所有欄位、它們各自的輸入欄位和轉換。

注意
  • 輸入欄位透過其 (namespace, name, field) 元組唯一標識。

  • 如果多個 Facet 包含具有相同輸入欄位的同一欄位,則這些輸入欄位將被合併而不會重複。

  • 與輸入欄位關聯的轉換也會被合併。如果 InputField 類版本不支援轉換,則會省略它們。

  • 轉換合併依賴於欄位名和輸入欄位元組的複合鍵來跟蹤和整合轉換。

示例

案例 1: 具有相同輸入欄位的兩個 Facet ` >>> facet1 = ColumnLineageDatasetFacet( ...     fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> facet2 = ColumnLineageDatasetFacet( ...     fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields [InputField("namespace1", "dataset1", "field1")] `

案例 2: 具有相同輸入欄位但轉換不同的兩個 Facet ` >>> facet1 = ColumnLineageDatasetFacet( ...     fields={ ...         "columnA": Fields( ...             inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t1"])] ...         ) ...     } ... ) >>> facet2 = ColumnLineageDatasetFacet( ...     fields={ ...         "columnA": Fields( ...             inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t2"])] ...         ) ...     } ... ) >>> merged = merge_column_lineage_facets([facet1, facet2]) >>> merged.fields["columnA"].inputFields[0].transformations ["t1", "t2"] `

airflow.providers.google.cloud.openlineage.utils.extract_ds_name_from_gcs_path(path)[原始碼]

從給定路徑中提取和處理資料集名稱。

引數

path: 要處理的路徑,例如 GCS 檔案路徑。

返回

處理後的資料集名稱。

示例
>>> extract_ds_name_from_gcs_path("/dir/file.*")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/pre_")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/file.txt")
'dir/file.txt'
>>> extract_ds_name_from_gcs_path("/dir/file.")
'dir'
>>> extract_ds_name_from_gcs_path("/dir/")
'dir'
>>> extract_ds_name_from_gcs_path("")
'/'
>>> extract_ds_name_from_gcs_path("/")
'/'
>>> extract_ds_name_from_gcs_path(".")
'/'
airflow.providers.google.cloud.openlineage.utils.get_facets_from_bq_table(table)[原始碼]

從 BigQuery 表物件獲取 Facet。

airflow.providers.google.cloud.openlineage.utils.get_namespace_name_from_source_uris(source_uris)[原始碼]
airflow.providers.google.cloud.openlineage.utils.get_identity_column_lineage_facet(dest_field_names, input_datasets)[原始碼]

獲取身份轉換的列血緣關係 Facet。

此函式生成一個簡單的列血緣關係 Facet,其中每個目標列包含來自所有具有該列的輸入資料集的同名源列。該血緣關係假設沒有應用任何轉換,這意味著列在源資料集和目標資料集之間保留其身份。

引數

dest_field_names: 一個目標列名列表,需要確定其血緣關係。 input_datasets: 具有 schema Facet 的輸入資料集列表。

返回
一個字典,包含一個鍵 columnLineage,對映到一個 ColumnLineageDatasetFacet

如果無法確定列血緣關係,則返回一個空字典 - 詳見下方“注意”部分。

注意
  • 如果任何輸入資料集缺少 schema Facet,函式會立即返回一個空字典。

  • 如果源資料集 schema 中的任何欄位不在目標表中,函式將返回一個空字典。目標表可以包含額外的欄位,但所有源列都應存在於目標表中。

  • 如果沒有任何目標列可以與輸入資料集列匹配,則返回一個空字典。

  • 目標表中不存在於輸入資料集中的額外列在血緣關係 Facet 中會被忽略和跳過,因為它們無法追溯到源列。

  • 此函式假設沒有應用任何轉換,這意味著列在源資料集和目標資料集之間保留其身份。

airflow.providers.google.cloud.openlineage.utils.get_from_nullable_chain(source, chain)[原始碼]

從巢狀物件結構中獲取物件,其中不保證巢狀結構中的所有鍵都存在。

旨在取代一系列 dict.get() 語句。

示例用法

if (
    not job._properties.get("statistics")
    or not job._properties.get("statistics").get("query")
    or not job._properties.get("statistics").get("query").get("referencedTables")
):
    return None
result = job._properties.get("statistics").get("query").get("referencedTables")

變為

result = get_from_nullable_chain(properties, ["statistics", "query", "queryPlan"])
if not result:
    return None
airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_job(job, context, inject_parent_job_info, inject_transport_info)[原始碼]

將 OpenLineage 屬性注入到 Spark 作業定義中。

此函式不會刪除現有配置或以任何方式修改作業定義,

只會新增必需的 OpenLineage 屬性,如果它們尚未存在。

如果滿足以下任一條件,整個屬性注入過程將被跳過
  • OpenLineage provider 不可訪問。

  • 作業型別不受支援。

  • inject_parent_job_infoinject_transport_info 都設定為 False。

此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。

如果滿足以下條件,則不會注入父作業資訊
  • 存在任何以 spark.openlineage.parent 為字首的屬性。

  • inject_parent_job_info 為 False。

如果滿足以下條件,則不會注入傳輸資訊
  • 存在任何以 spark.openlineage.transport 為字首的屬性。

  • inject_transport_info 為 False。

引數

job: 原始 Dataproc 作業定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。

返回

注入 OpenLineage 屬性後的修改版作業定義(如適用)。

airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_batch(batch, context, inject_parent_job_info, inject_transport_info)[原始碼]

將 OpenLineage 屬性注入到 Dataproc 批處理定義中。

此函式不會刪除現有配置或以任何方式修改批處理定義,

只會新增必需的 OpenLineage 屬性,如果它們尚未存在。

如果滿足以下任一條件,整個屬性注入過程將被跳過
  • OpenLineage provider 不可訪問。

  • 批處理型別不受支援。

  • inject_parent_job_infoinject_transport_info 都設定為 False。

此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。

如果滿足以下條件,則不會注入父作業資訊
  • 存在任何以 spark.openlineage.parent 為字首的屬性。

  • inject_parent_job_info 為 False。

如果滿足以下條件,則不會注入傳輸資訊
  • 存在任何以 spark.openlineage.transport 為字首的屬性。

  • inject_transport_info 為 False。

引數

batch: 原始 Dataproc 批處理定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。

返回

注入 OpenLineage 屬性後的修改版批處理定義(如適用)。

airflow.providers.google.cloud.openlineage.utils.inject_openlineage_properties_into_dataproc_workflow_template(template, context, inject_parent_job_info, inject_transport_info)[原始碼]

將 OpenLineage 屬性注入到工作流模板中的所有 Spark 作業中。

此函式不會刪除現有配置或以任何方式修改作業定義,

只會新增必需的 OpenLineage 屬性,如果它們尚未存在。

如果滿足以下任一條件,將跳過每個作業的整個屬性注入過程
  • OpenLineage provider 不可訪問。

  • 作業型別不受支援。

  • inject_parent_job_infoinject_transport_info 都設定為 False。

此外,如果相關的 OpenLineage 屬性已經存在,則不會注入特定資訊。

如果滿足以下條件,則不會注入父作業資訊
  • 存在任何以 spark.openlineage.parent 為字首的屬性。

  • inject_parent_job_info 為 False。

如果滿足以下條件,則不會注入傳輸資訊
  • 存在任何以 spark.openlineage.transport 為字首的屬性。

  • inject_transport_info 為 False。

引數

template: 原始 Dataproc 工作流模板定義。 context: 作業執行所在的 Airflow 上下文。 inject_parent_job_info: 指示是否注入父作業資訊的標誌。 inject_transport_info: 指示是否注入傳輸資訊的標誌。

返回

注入 OpenLineage 屬性後的修改版工作流模板定義(如適用)。

此條目是否有幫助?