SQL 運算子

這些運算子對 SQL 資料庫執行各種查詢,包括列級和表級資料質量檢查。

執行 SQL 查詢

使用 SQLExecuteQueryOperator 對不同的資料庫執行 SQL 查詢。該運算子的引數包括:

  • sql - 單個字串、字串列表或指向要執行的模板檔案的字串;

  • autocommit (可選) 如果為 True,則每條命令自動提交(預設:False);

  • parameters (可選) 用於渲染 SQL 查詢的引數。

  • handler (可選) 將應用於遊標的函式。如果為 None,將不返回結果(預設:fetch_all_handler)。

  • split_statements (可選) 是否將單個 SQL 字串拆分成語句並分別執行(預設:False)。

  • return_last (可選) 取決於 split_statements,如果為 True,此引數用於返回最後一條語句的結果或所有拆分語句的結果(預設:True)。

下面的示例展示瞭如何例項化 SQLExecuteQueryOperator 任務。

tests/system/common/sql/example_sql_execute_query.py

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)

檢查 SQL 表列

使用 SQLColumnCheckOperator 對給定表的列執行資料質量檢查。除了連線 ID 和表之外,還必須提供一個 `column_mapping`,它描述了列與要執行的測試之間的關係。一個示例的 `column_mapping` 是一個由三個巢狀字典組成的集合,如下所示:

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

其中 `col_name` 是要執行檢查的列的名稱,其字典中的每個條目都是一個檢查。有效的檢查包括:

  • null_check: 檢查列中的 NULL 值數量

  • distinct_check: 檢查列中不同值的計數

  • unique_check: 將列中不同值的數量與行數進行比較

  • min: 檢查列中的最小值

  • max: 檢查列中的最大值

檢查字典中的每個條目可以是檢查成功的條件、容差或分割槽子句。成功的條件包括:

  • 大於

  • 大於等於

  • 小於

  • 小於等於

  • 等於

指定條件時,`equal_to` 與其他條件不相容。可以在同一檢查中同時指定下限和上限條件。容差是指結果可能超出邊界但仍被視為成功的百分比。

分割槽子句可以在運算子級別作為引數給出,它將對所有檢查進行分割槽;也可以在 `column_mapping` 中的列級別給出,它將對該列的所有檢查進行分割槽;或者在列的檢查級別給出,它僅對該檢查進行分割槽。

如果不使用提供的連線中的資料庫,也可以指定資料庫。

`accept_none` 引數,預設為 true,會將查詢返回的 None 值轉換為 0,從而允許空表返回有效的整數。

下面的示例演示瞭如何例項化 SQLColumnCheckOperator 任務。

tests/system/common/sql/example_sql_column_table_check.py

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)

檢查 SQL 表值

使用 SQLTableCheckOperator 對給定表執行資料質量檢查。除了連線 ID 和表之外,還必須提供一個 `checks` 字典,它描述了表與要執行的測試之間的關係。一個示例的 `checks` 引數是由兩個巢狀字典組成的集合,如下所示:

checks = (
    {
        "row_count_check": {
            "check_statement": "COUNT(*) = 1000",
        },
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            "partition_clause": "col_a IS NOT NULL",
        },
    },
)

第一組鍵是檢查名稱,在運算子構建的模板化查詢中引用。檢查名稱下的字典鍵必須包含 `check_statement`,其值是一個解析為布林值的 SQL 語句(這可以是任何在 airflow.operators.sql.parse_boolean 中解析為布林值的字串或整數)。另一個可能提供的鍵是 `partition_clause`,它是一個檢查級別的語句,將使用 WHERE 子句對該檢查的表中的資料進行分割槽。此語句與引數 `partition_clause` 相容,後者會過濾所有檢查。

下面的示例演示瞭如何例項化 SQLTableCheckOperator 任務。

tests/system/common/sql/example_sql_column_table_check.py

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)

根據閾值檢查值

使用 SQLThresholdCheckOperator 將特定 SQL 查詢結果與定義的最小和最大閾值進行比較。這兩個閾值既可以是數值,也可以是解析為數值的另一個 SQL 查詢。此運算子需要一個連線 ID 以及要執行的 SQL 查詢,並且允許可選地指定資料庫,如果應覆蓋 `connection_id` 中的資料庫。引數包括:- sql - 要執行的 SQL 查詢,作為模板化字串。- min_threshold - 檢查的最小閾值。可以是數值或模板化 SQL 查詢。- max_threshold - 檢查的最大閾值。可以是數值或模板化 SQL 查詢。- conn_id (可選) - 用於連線到資料庫的連線 ID。- database (可選) - 資料庫名稱,將覆蓋連線中的資料庫。

下面的示例演示瞭如何例項化 SQLThresholdCheckOperator 任務。

tests/system/common/sql/example_sql_threshold_check.py

threshhold_check = SQLThresholdCheckOperator(
    task_id="threshhold_check",
    conn_id="sales_db",
    sql="SELECT count(distinct(customer_id)) FROM sales;",
    min_threshold=1,
    max_threshold=1000,
)

如果查詢返回的值在閾值範圍內,則任務透過。否則,任務失敗。

此條目是否有用?