airflow.providers.common.sql.operators.sql¶
屬性¶
類¶
這是用於通用 SQL Operator 的基類,用於獲取 DB Hook。 |
|
在特定資料庫中執行 SQL 程式碼。 |
|
執行 column_checks 字典中一個或多個模板化的檢查。 |
|
執行 checks 字典中提供的一個或多個檢查。 |
|
對資料庫執行檢查。 |
|
使用 SQL 程式碼執行簡單的值檢查。 |
|
檢查作為 SQL 表示式給出的指標是否在前 days_back 天的指標的容差範圍內。 |
|
使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。 |
|
允許 DAG 根據 SQL 查詢結果進行“分支”或遵循指定路徑。 |
函式¶
|
模組內容¶
- airflow.providers.common.sql.operators.sql.parse_boolean[source]¶
- Sphinx-autoapi-skip:
重要提示!!!保留此程式碼以相容已釋出的 google provider 8.4.0 版本。
遺憾的是,此 provider 使用 _get_failed_checks 和 parse_boolean 作為匯入,我們應該保留這些方法以避免 8.4.0 版本出現故障。
- class airflow.providers.common.sql.operators.sql.BaseSQLOperator(*, conn_id=None, database=None, hook_params=None, retry_on_failure=True, **kwargs)[source]¶
基類:
airflow.models.BaseOperator這是用於通用 SQL Operator 的基類,用於獲取 DB Hook。
提供的方法是 .get_db_hook()。預設行為將嘗試根據連線型別檢索 DB hook。您可以透過覆蓋 .get_db_hook() 方法來自定義行為。
- 引數:
conn_id (str | None) – 對特定資料庫的引用
- template_fields: collections.abc.Sequence[str] = ('conn_id', 'database', 'hook_params')[source]¶
- classmethod get_hook(conn_id, hook_params=None)[source]¶
返回此連線 ID 的預設 hook。
- 引數:
- 返回:
此連線的預設 hook
- 返回型別:
- class airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator(*, sql, autocommit=False, parameters=None, handler=fetch_all_handler, output_processor=None, conn_id=None, database=None, split_statements=None, return_last=True, show_return_value_in_logs=False, requires_result_fetch=False, **kwargs)[source]¶
基類:
BaseSQLOperator在特定資料庫中執行 SQL 程式碼。
在實現特定 Operator 時,您還可以在 hook 中實現 _process_output 方法,以對您的 DB Hook 返回的值執行額外的處理。例如,您可以將從語句的遊標中檢索到的描述與返回值連線起來,或者將 Operator 的輸出儲存到檔案中。
- 引數:
sql (str | list[str]) – 要執行的 SQL 程式碼或指向模板檔案的字串(模板化)。檔案必須具有“.sql”副檔名。
autocommit (bool) – (可選)如果為 True,則每個命令都會自動提交(預設值:False)。
parameters (collections.abc.Mapping | collections.abc.Iterable | None) – (可選)用於渲染 SQL 查詢的引數。
handler (Callable[[Any], list[tuple] | None]) – (可選)將應用於遊標的函式(預設值:fetch_all_handler)。
output_processor (Callable[[list[Any], list[collections.abc.Sequence[collections.abc.Sequence] | None]], list[Any] | tuple[list[collections.abc.Sequence[collections.abc.Sequence] | None], list]] | None) – (可選)將應用於結果的函式(預設值:default_output_processor)。
split_statements (bool | None) – (可選)是否將單個 SQL 字串拆分為多個語句。預設情況下,遵循已配置 hook 的
run方法中的預設值。conn_id (str | None) – 用於連線到資料庫的連線 ID
database (str | None) – 資料庫名稱,將覆蓋連線中定義的名稱
return_last (bool) – (可選)僅返回最後一條語句的結果(預設值:True)。
show_return_value_in_logs (bool) – (可選)如果為 true,則 operator 輸出將列印到任務日誌中。請謹慎使用。不建議將大型資料集轉儲到日誌中(預設值:False)。
requires_result_fetch (bool) – (可選)如果為 True,則確保在執行完成之前獲取查詢結果。如果 do_xcom_push 為 True,則結果會自動獲取,使此引數變得冗餘(預設值:False)。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:執行 SQL 查詢
- template_fields: collections.abc.Sequence[str] = ('sql', 'parameters', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql', '.json')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator(*, table, column_mapping, partition_clause=None, conn_id=None, database=None, accept_none=True, **kwargs)[source]¶
基類:
BaseSQLOperator執行 column_checks 字典中一個或多個模板化的檢查。
檢查是根據 column_mapping 指定的每列執行的。
每個檢查可以採用以下一個或多個選項
equal_to: 等於的精確值,不能與其他比較選項一起使用greater_than: 結果應嚴格大於的值less_than: 結果應嚴格小於的值geq_to: 結果應大於或等於的值leq_to: 結果應小於或等於的值tolerance: 結果可能與期望值相差的百分比partition_clause: 傳遞到 WHERE 語句中用於分割槽資料的額外子句
- 引數:
table (str) – 要執行檢查的表
column_mapping (dict[str, dict[str, Any]]) –
列及其相關檢查的字典,例如
{ "col_name": { "null_check": { "equal_to": 0, "partition_clause": "foreign_key IS NOT NULL", }, "min": { "greater_than": 5, "leq_to": 10, "tolerance": 0.2, }, "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01}, } }
partition_clause (str | None) –
新增到 operator 構建的查詢的 WHERE 子句中的部分 SQL 語句,用於為要執行的檢查建立 partition_clauses,例如
"date = '1970-01-01'"conn_id (str | None) – 用於連線到資料庫的連線 ID
database (str | None) – 資料庫名稱,將覆蓋連線中定義的名稱
accept_none (bool) – 是否接受查詢返回的 None 值。如果為 true,則將 None 轉換為 0。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:檢查 SQL 表列
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql', 'conn_id', 'database', 'hook_params')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLTableCheckOperator(*, table, checks, partition_clause=None, conn_id=None, database=None, **kwargs)[source]¶
基類:
BaseSQLOperator執行 checks 字典中提供的一個或多個檢查。
檢查應該編寫為返回布林結果。
- 引數:
table (str) – 要執行檢查的表
checks (dict[str, dict[str, Any]]) –
檢查字典,其中檢查名稱後跟一個字典,該字典至少包含一個檢查語句,以及可選的分割槽子句,例如:
{ "row_count_check": {"check_statement": "COUNT(*) = 1000"}, "column_sum_check": {"check_statement": "col_a + col_b < col_c"}, "third_check": {"check_statement": "MIN(col) = 1", "partition_clause": "col IS NOT NULL"}, }
partition_clause (str | None) –
新增到 operator 構建的查詢的 WHERE 子句中的部分 SQL 語句,用於為要執行的檢查建立 partition_clauses,例如
"date = '1970-01-01'"conn_id (str | None) – 用於連線到資料庫的連線 ID
database (str | None) – 資料庫名稱,將覆蓋連線中定義的名稱
另請參閱
有關如何使用此運算子的更多資訊,請查閱指南:檢查 SQL 表值
- template_fields: collections.abc.Sequence[str] = ('table', 'partition_clause', 'sql', 'conn_id', 'database', 'hook_params')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLCheckOperator(*, sql, conn_id=None, database=None, parameters=None, **kwargs)[source]¶
基類:
BaseSQLOperator對資料庫執行檢查。
SQLCheckOperator 需要一個返回單行的 SQL 查詢。第一行的每個值都使用 Python 的
bool型別轉換進行評估。如果任何值返回False,則檢查失敗並報錯。如果返回一個 Python 字典,並且字典中的任何值為False,則檢查失敗並報錯。請注意,Python 的布林型別轉換會將以下值評估為
FalseFalse0空字串 (
"")空列表 (
[])空字典或集合 (
{})值為
False的字典 ({'DUPLICATE_ID_CHECK': False})
例如,給定一個像
SELECT COUNT(*) FROM foo這樣的查詢,它僅在計數== 0時失敗。你可以構建更復雜的查詢,例如檢查表是否與上游源表具有相同的行數,或者今天分割槽的計數是否大於昨天分割槽的計數,或者一組指標是否小於 7 天平均值的 3 個標準差。此運算子可用作管道中的資料質量檢查。根據它在 DAG 中的位置,你可以選擇阻止關鍵路徑,防止釋出可疑資料,或者將其放在側面,接收電子郵件警報而不中斷 DAG 的進展。
- 引數:
sql (str) – 要執行的 SQL。(模板化)
conn_id (str | None) – 用於連線資料庫的連線 ID。
database (str | None) – 資料庫名稱,將覆蓋連線中定義的名稱
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可選) 用於渲染 SQL 查詢的引數。
- template_fields: collections.abc.Sequence[str] = ('sql', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLValueCheckOperator(*, sql, pass_value, tolerance=None, conn_id=None, database=None, **kwargs)[source]¶
基類:
BaseSQLOperator使用 SQL 程式碼執行簡單的值檢查。
- 引數:
- template_fields: collections.abc.Sequence[str] = ('sql', 'pass_value', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, ratio_formula='max_over_min', ignore_zero=True, conn_id=None, database=None, **kwargs)[source]¶
基類:
BaseSQLOperator檢查作為 SQL 表示式給出的指標是否在前 days_back 天的指標的容差範圍內。
- 引數:
table (str) – 表名
conn_id (str | None) – 用於連線資料庫的連線 ID。
database (str | None) – 資料庫名稱,它將覆蓋連線中定義的名稱
days_back (SupportsAbs[int]) –
ds和我們希望檢查的ds之間的天數。預設為 7 天date_filter_column (str | None) – 用於過濾日期的列名。預設為 ‘ds’
ratio_formula (str | None) –
用於計算兩個指標之間比率的公式。假設
cur是今天的指標,ref是今天減去days_back天的指標。預設值:‘max_over_min’max_over_min: 計算 max(cur, ref) / min(cur, ref)relative_diff: 計算 abs(cur-ref) / ref
ignore_zero (bool) – 是否應忽略零值指標
- template_fields: collections.abc.Sequence[str] = ('sql1', 'sql2', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.SQLThresholdCheckOperator(*, sql, min_threshold, max_threshold, conn_id=None, database=None, **kwargs)[source]¶
基類:
BaseSQLOperator使用 SQL 程式碼對最小值閾值和最大值閾值執行值檢查。
閾值可以是數值,也可以是返回數值的 SQL 語句。
- 引數:
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 對照閾值檢查值
- template_fields: collections.abc.Sequence[str] = ('sql', 'min_threshold', 'max_threshold', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.hql', '.sql')[source]¶
- class airflow.providers.common.sql.operators.sql.BranchSQLOperator(*, sql, follow_task_ids_if_true, follow_task_ids_if_false, conn_id='default_conn_id', database=None, parameters=None, **kwargs)[source]¶
Bases:
BaseSQLOperator,airflow.models.SkipMixin允許 DAG 基於 SQL 查詢結果進行“分支”或遵循指定的路徑。
- 引數:
sql (str) – 要執行的 SQL 程式碼,應返回 true 或 false (模板化) 模板引用透過以 '.sql' 結尾的字串識別。預期的 SQL 查詢返回布林值 (True/False)、整數 (0 = False, 其他 = 1) 或字串 (true/y/yes/1/on/false/n/no/0/off)。
follow_task_ids_if_true (list[str]) – 如果查詢返回 true,要遵循的任務 ID 或任務 ID 列表
follow_task_ids_if_false (list[str]) – 如果查詢返回 false,要遵循的任務 ID 或任務 ID 列表
conn_id (str) – 用於連線到資料庫的連線 ID。
database (str | None) – 資料庫名稱,將覆蓋連線中定義的名稱
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – (可選) 用於渲染 SQL 查詢的引數。
- template_fields: collections.abc.Sequence[str] = ('sql', 'conn_id', 'database', 'hook_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶