airflow.providers.openlineage.sqlparser

屬性

log

DEFAULT_NAMESPACE

DEFAULT_INFORMATION_SCHEMA_COLUMNS

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME

GetTableSchemasParams

get_table_schemas 引數。

DatabaseInfo

包含處理 SQL 語句解析結果所需的資料庫特定資訊。

SQLParser

openlineage-sql 的介面。

函式

default_normalize_name_method(name)

from_table_meta(table_meta, database, namespace, ...)

get_openlineage_facets_with_sql(hook, sql, conn_id, ...)

模組內容

airflow.providers.openlineage.sqlparser.log[source]
airflow.providers.openlineage.sqlparser.DEFAULT_NAMESPACE = 'default'[source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_COLUMNS = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = 'information_schema.columns'[source]
airflow.providers.openlineage.sqlparser.default_normalize_name_method(name)[source]
class airflow.providers.openlineage.sqlparser.GetTableSchemasParams[source]

基類: TypedDict

get_table_schemas 引數。

normalize_name: Callable[[str], str][source]
is_cross_db: bool[source]
information_schema_columns: list[str][source]
information_schema_table: str[source]
use_flat_cross_db_query: bool[source]
is_uppercase_names: bool[source]
database: str | None[source]
class airflow.providers.openlineage.sqlparser.DatabaseInfo[source]

包含處理 SQL 語句解析結果所需的資料庫特定資訊。

引數:
  • scheme – OpenLineage 名稱空間中 URI 的 Scheme 部分。

  • authority – OpenLineage 名稱空間中 URI 的 Authority 部分。在大多數情況下,它應該返回 Airflow 連線的 {host}:{port} 部分。參閱: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

  • database – 優先於解析的資料庫名稱。

  • information_schema_columns – 資訊模式表中的列名稱列表。

  • information_schema_table_name – 資訊模式表名稱。

  • use_flat_cross_db_query

    指定是應使用單個“全域性”資訊模式表進行跨資料庫查詢(例如,在 Redshift 中),還是應分別查詢多個基於資料庫的“本地”資訊模式表。

    如果為 True,則假設存在一個單一、通用的資訊模式表(例如,在 Redshift 中,SVV_REDSHIFT_COLUMNS 檢視)[https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_REDSHIFT_COLUMNS.html]。在此模式下,我們僅直接查詢 information_schema_table_name。根據 is_information_schema_cross_db 引數,您還可以在 WHERE 子句中按資料庫名稱進行過濾。

    如果為 False,則將每個資料庫視為擁有包含該資料庫元資料的自己的本地資訊模式表。因此,可能會為每個資料庫生成一個查詢,然後進行合併(通常透過 UNION ALL)。對於不維護所有元資料的單一全域性檢視或需要按資料庫查詢的方言,此方法是必需的。根據 is_information_schema_cross_db 引數,查詢可以在識別符號和過濾器中包含或省略資料庫資訊。

    請參閱 is_information_schema_cross_db,它也會影響最終查詢的構建方式。

  • is_information_schema_cross_db

    指定是否應跟蹤資料庫資訊並將其包含在從 information_schema_table 檢索模式資訊的查詢中。簡而言之,這決定了查詢是否能夠跨越多個數據庫。

    如果為 True,則在適用時包含資料庫識別符號,從而允許從多個數據庫檢索元資料。例如,在 Snowflake 或 MS SQL(其中每個資料庫都被視為頂級名稱空間)中,您可能會有如下查詢:

    ` SELECT ... FROM db1.information_schema.columns WHERE ... UNION ALL SELECT ... FROM db2.information_schema.columns WHERE ... `

    在 Redshift 中,將其設定為 True 並結合 use_flat_cross_db_query=True 允許向查詢新增資料庫過濾器,例如

    ` SELECT ... FROM SVV_REDSHIFT_COLUMNS WHERE SVV_REDSHIFT_COLUMNS.database == db1  # 當為 False AND 時跳過此項 SVV_REDSHIFT_COLUMNS.schema == schema1 AND SVV_REDSHIFT_COLUMNS.table IN (table1, table2) OR ... `

    但是,某些資料庫(例如 PostgreSQL)不允許真正的跨資料庫查詢。在此類方言中,啟用跨資料庫支援可能會導致錯誤或不必要。請始終查閱您的方言文件或測試示例查詢,以確認是否支援跨資料庫查詢。

    如果為 False,則忽略資料庫限定符,有效地將查詢限制到單個數據庫(或使資料庫級別限定符成為可選)。這對於不支援跨資料庫操作或僅提供兩級名稱空間(模式 + 表)而不是三級名稱空間(資料庫 + 模式 + 表)的資料庫通常更安全。例如,某些 MySQL 或 PostgreSQL 上下文可能根本不需要或不允許跨資料庫查詢。

    請參閱 use_flat_cross_db_query,它也會影響最終查詢的構建方式。

  • is_uppercase_names – 指定資料庫是否僅接受大寫名稱(例如 Snowflake)。

  • normalize_name_method – 用於規範化資料庫、模式和表名稱的方法。預設為 name.lower()

scheme: str[source]
authority: str | None = None[source]
database: str | None = None[source]
information_schema_columns: list[str] = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
information_schema_table_name: str = 'information_schema.columns'[source]
use_flat_cross_db_query: bool = False[source]
is_information_schema_cross_db: bool = False[source]
is_uppercase_names: bool = False[source]
normalize_name_method: Callable[[str], str][source]
airflow.providers.openlineage.sqlparser.from_table_meta(table_meta, database, namespace, is_uppercase)[source]
class airflow.providers.openlineage.sqlparser.SQLParser(dialect=None, default_schema=None)[source]

基類: airflow.utils.log.logging_mixin.LoggingMixin

openlineage-sql 的介面。

引數:
  • dialect (str | None) – 資料庫特定的方言

  • default_schema (str | None) – 應用於每個未解析模式的表的模式

dialect = None[source]
default_schema = None[source]
parse(sql)[source]

解析單條或多條 SQL 語句。

parse_table_schemas(hook, inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None, sqlalchemy_engine=None)[source]

解析輸入表和輸出表的模式。

get_metadata_from_parser(inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None)[source]
attach_column_lineage(datasets, database, parse_result)[source]

將列沿襲方面附加到資料集列表中。

請注意,目前每個資料集設定了相同的列沿襲資訊。這將在 OpenLineage SQL Parser 改進後有所變化。

generate_openlineage_metadata_from_sql(sql, hook, database_info, database=None, sqlalchemy_engine=None, use_connection=True)[source]

解析 SQL 語句並生成 OpenLineage 元資料。

生成的 OpenLineage 元資料包含

  • 已解析模式的輸入表

  • 已解析模式的輸出表

  • 執行方面 (run facets)

  • 作業方面 (job facets)。

引數:
static create_namespace(database_info)[source]
classmethod normalize_sql(sql)[source]

確保返回以分號分隔的 SQL 語句。

classmethod split_sql_string(sql)[source]

將 SQL 字串拆分為語句列表。

嘗試使用 DbApiHook.split_sql_string 如果可用。否則,使用相同的邏輯。

create_information_schema_query(tables, normalize_name, is_cross_db, information_schema_columns, information_schema_table, is_uppercase_names, use_flat_cross_db_query, database=None, sqlalchemy_engine=None)[source]

建立 SELECT 語句以查詢資訊模式表。

airflow.providers.openlineage.sqlparser.get_openlineage_facets_with_sql(hook, sql, conn_id, database)[source]

此條目有幫助嗎?