SQL 運算子¶
這些運算子對 SQL 資料庫執行各種查詢,包括列級和表級資料質量檢查。
執行 SQL 查詢¶
使用 SQLExecuteQueryOperator 對不同的資料庫執行 SQL 查詢。該運算子的引數包括:
sql- 單個字串、字串列表或指向要執行的模板檔案的字串;autocommit(可選) 如果為 True,則每條命令自動提交(預設:False);parameters(可選) 用於渲染 SQL 查詢的引數。handler(可選) 將應用於遊標的函式。如果為None,將不返回結果(預設:fetch_all_handler)。split_statements(可選) 是否將單個 SQL 字串拆分成語句並分別執行(預設:False)。return_last(可選) 取決於split_statements,如果為True,此引數用於返回最後一條語句的結果或所有拆分語句的結果(預設:True)。
下面的示例展示瞭如何例項化 SQLExecuteQueryOperator 任務。
tests/system/common/sql/example_sql_execute_query.py
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
split_statements=True,
return_last=False,
)
檢查 SQL 表列¶
使用 SQLColumnCheckOperator 對給定表的列執行資料質量檢查。除了連線 ID 和表之外,還必須提供一個 `column_mapping`,它描述了列與要執行的測試之間的關係。一個示例的 `column_mapping` 是一個由三個巢狀字典組成的集合,如下所示:
column_mapping = {
"col_name": {
"null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
"min": {
"greater_than": 5,
"leq_to": 10,
"tolerance": 0.2,
},
"max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
}
}
其中 `col_name` 是要執行檢查的列的名稱,其字典中的每個條目都是一個檢查。有效的檢查包括:
null_check: 檢查列中的 NULL 值數量
distinct_check: 檢查列中不同值的計數
unique_check: 將列中不同值的數量與行數進行比較
min: 檢查列中的最小值
max: 檢查列中的最大值
檢查字典中的每個條目可以是檢查成功的條件、容差或分割槽子句。成功的條件包括:
大於
大於等於
小於
小於等於
等於
指定條件時,`equal_to` 與其他條件不相容。可以在同一檢查中同時指定下限和上限條件。容差是指結果可能超出邊界但仍被視為成功的百分比。
分割槽子句可以在運算子級別作為引數給出,它將對所有檢查進行分割槽;也可以在 `column_mapping` 中的列級別給出,它將對該列的所有檢查進行分割槽;或者在列的檢查級別給出,它僅對該檢查進行分割槽。
如果不使用提供的連線中的資料庫,也可以指定資料庫。
`accept_none` 引數,預設為 true,會將查詢返回的 None 值轉換為 0,從而允許空表返回有效的整數。
下面的示例演示瞭如何例項化 SQLColumnCheckOperator 任務。
tests/system/common/sql/example_sql_column_table_check.py
column_check = SQLColumnCheckOperator(
task_id="column_check",
table=AIRFLOW_DB_METADATA_TABLE,
column_mapping={
"id": {
"null_check": {
"equal_to": 0,
"tolerance": 0,
},
"distinct_check": {
"equal_to": 1,
},
}
},
)
檢查 SQL 表值¶
使用 SQLTableCheckOperator 對給定表執行資料質量檢查。除了連線 ID 和表之外,還必須提供一個 `checks` 字典,它描述了表與要執行的測試之間的關係。一個示例的 `checks` 引數是由兩個巢狀字典組成的集合,如下所示:
checks = (
{
"row_count_check": {
"check_statement": "COUNT(*) = 1000",
},
"column_sum_check": {
"check_statement": "col_a + col_b < col_c",
"partition_clause": "col_a IS NOT NULL",
},
},
)
第一組鍵是檢查名稱,在運算子構建的模板化查詢中引用。檢查名稱下的字典鍵必須包含 `check_statement`,其值是一個解析為布林值的 SQL 語句(這可以是任何在 airflow.operators.sql.parse_boolean 中解析為布林值的字串或整數)。另一個可能提供的鍵是 `partition_clause`,它是一個檢查級別的語句,將使用 WHERE 子句對該檢查的表中的資料進行分割槽。此語句與引數 `partition_clause` 相容,後者會過濾所有檢查。
下面的示例演示瞭如何例項化 SQLTableCheckOperator 任務。
tests/system/common/sql/example_sql_column_table_check.py
row_count_check = SQLTableCheckOperator(
task_id="row_count_check",
table=AIRFLOW_DB_METADATA_TABLE,
checks={
"row_count_check": {
"check_statement": "COUNT(*) = 1",
}
},
)
根據閾值檢查值¶
使用 SQLThresholdCheckOperator 將特定 SQL 查詢結果與定義的最小和最大閾值進行比較。這兩個閾值既可以是數值,也可以是解析為數值的另一個 SQL 查詢。此運算子需要一個連線 ID 以及要執行的 SQL 查詢,並且允許可選地指定資料庫,如果應覆蓋 `connection_id` 中的資料庫。引數包括:- sql - 要執行的 SQL 查詢,作為模板化字串。- min_threshold - 檢查的最小閾值。可以是數值或模板化 SQL 查詢。- max_threshold - 檢查的最大閾值。可以是數值或模板化 SQL 查詢。- conn_id (可選) - 用於連線到資料庫的連線 ID。- database (可選) - 資料庫名稱,將覆蓋連線中的資料庫。
下面的示例演示瞭如何例項化 SQLThresholdCheckOperator 任務。
tests/system/common/sql/example_sql_threshold_check.py
threshhold_check = SQLThresholdCheckOperator(
task_id="threshhold_check",
conn_id="sales_db",
sql="SELECT count(distinct(customer_id)) FROM sales;",
min_threshold=1,
max_threshold=1000,
)
如果查詢返回的值在閾值範圍內,則任務透過。否則,任務失敗。