SQLExecuteQueryOperator 連線到 Snowflake

使用 SQLExecuteQueryOperatorSnowflake 資料庫中執行 SQL 命令。

警告

之前,`SnowflakeOperator` 用於執行此類操作。但目前 `SnowflakeOperator` 已棄用,並將在提供程式的未來版本中移除。請儘快考慮切換到 `SQLExecuteQueryOperator`。

使用運算子

使用 conn_id 引數連線到您的 Snowflake 例項,連線元資料結構如下所示

Snowflake Airflow 連線元資料

引數

輸入

登入名: string

Snowflake 使用者名稱

密碼: string

Snowflake 使用者的密碼

Schema: string

預設設定執行 SQL 操作的 schema

Extra: dictionary

warehouse, account, database, region, role, authenticator

SQLExecuteQueryOperator 連線到 Snowflake 的使用示例如下所示

tests/system/snowflake/example_snowflake.py

snowflake_op_sql_str = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_str", sql=CREATE_TABLE_SQL_STRING
)

snowflake_op_with_params = SQLExecuteQueryOperator(
    task_id="snowflake_op_with_params",
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
)

snowflake_op_sql_list = SQLExecuteQueryOperator(task_id="snowflake_op_sql_list", sql=SQL_LIST)

snowflake_op_sql_multiple_stmts = SQLExecuteQueryOperator(
    task_id="snowflake_op_sql_multiple_stmts",
    sql=SQL_MULTIPLE_STMTS,
    split_statements=True,
)

snowflake_op_template_file = SQLExecuteQueryOperator(
    task_id="snowflake_op_template_file",
    sql="example_snowflake_snowflake_op_template_file.sql",
)

注意

可以傳遞給運算子的引數將優先於 Airflow 連線元資料中已給定的引數(例如 schemaroledatabase 等)。

SnowflakeSqlApiOperator

使用 SnowflakeSqlApiHookSnowflake 資料庫中執行 SQL 命令。

您還可以透過將 deferrable 引數設定為 True 來以可推遲模式執行此運算子。這將確保任務從 Airflow worker slot 中推遲,並在觸發器上進行任務狀態輪詢。

使用運算子

使用 snowflake_conn_id 引數連線到您的 Snowflake 例項,連線元資料結構如下所示

Snowflake Airflow 連線元資料

引數

輸入

登入名: string

Snowflake 使用者名稱。如果使用 OAuth 連線,這是 client_id

密碼: string

Snowflake 使用者的密碼。如果使用 OAuth,這是 client_secret

Schema: string

預設設定執行 SQL 操作的 schema

Extra: dictionary

warehouse, account, database, region, role, authenticator, refresh_token。如果使用 OAuth,必須指定 refresh_token此處獲取

SnowflakeSqlApiHook 的使用示例如下所示

tests/system/snowflake/example_snowflake.py

snowflake_sql_api_op_sql_multiple_stmt = SnowflakeSqlApiOperator(
    task_id="snowflake_op_sql_multiple_stmt",
    sql=SQL_MULTIPLE_STMTS,
    statement_count=len(SQL_LIST),
)

注意

可以傳遞給運算子的引數將優先於 Airflow 連線元資料中已給定的引數(例如 schemaroledatabase 等)。

此條目有幫助嗎?