Connecting to Trino with SQLExecuteQueryOperator
Use SQLExecuteQueryOperator to execute SQL commands in a Trino query engine.
Warning
TrinoOperator is deprecated in favor of SQLExecuteQueryOperator. If you are still using TrinoOperator, migrate as soon as possible.
Using the Operator
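Use the conn_id argument of SQLExecuteQueryOperator to connect to your Trino instance (the deprecated TrinoOperator called this argument trino_conn_id).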
The following example shows SQLExecuteQueryOperator used against Trino:
tests/system/trino/example_trino.py
from datetime import datetime

from airflow import models
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# SCHEMA, TABLE, TABLE1 and TABLE2 are string constants defined earlier
# in the example file; they are not shown in this excerpt.

with models.DAG(
    dag_id="example_trino",
    schedule="@once",  # Override to match your needs
    start_date=datetime(2025, 2, 24),
    catchup=False,
    tags=["example"],
) as dag:
    # Create the schema that will hold the example tables.
    trino_create_schema = SQLExecuteQueryOperator(
        task_id="trino_create_schema",
        sql=f" CREATE SCHEMA IF NOT EXISTS {SCHEMA} WITH (location = 's3://irisbkt/cities/') ",
        handler=list,
    )
    trino_create_table = SQLExecuteQueryOperator(
        task_id="trino_create_table",
        sql=f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}( cityid bigint, cityname varchar) ",
        handler=list,
    )
    trino_insert = SQLExecuteQueryOperator(
        task_id="trino_insert",
        sql=f" INSERT INTO {SCHEMA}.{TABLE} VALUES (1, 'San Francisco') ",
        handler=list,
        requires_result_fetch=True,
    )
    # Several statements passed as a list.
    trino_multiple_queries = SQLExecuteQueryOperator(
        task_id="trino_multiple_queries",
        sql=[
            f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE1}(cityid bigint,cityname varchar) ",
            f" INSERT INTO {SCHEMA}.{TABLE1} VALUES (2, 'San Jose') ",
            f" CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE2}(cityid bigint,cityname varchar) ",
            f" INSERT INTO {SCHEMA}.{TABLE2} VALUES (3, 'San Diego') ",
        ],
        handler=list,
        requires_result_fetch=True,
    )
    # Jinja-templated query: values come from `params` at render time.
    trino_templated_query = SQLExecuteQueryOperator(
        task_id="trino_templated_query",
        sql="SELECT * FROM {{ params.SCHEMA }}.{{ params.TABLE }}",
        handler=list,
        params={"SCHEMA": SCHEMA, "TABLE": TABLE1},
    )
    # Parameterized query: the value for "?" is supplied via `parameters`.
    trino_parameterized_query = SQLExecuteQueryOperator(
        task_id="trino_parameterized_query",
        sql=f" SELECT * FROM {SCHEMA}.{TABLE2} WHERE cityname = ?",
        parameters=("San Diego",),
        handler=list,
    )

    (
        trino_create_schema
        >> trino_create_table
        >> trino_insert
        >> trino_multiple_queries
        >> trino_templated_query
        >> trino_parameterized_query
    )
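The trino_parameterized_query task above passes its value through parameters instead of interpolating it into the SQL text. The sketch below illustrates the same "?" placeholder style in isolation. It is only an illustration under stated assumptions: Python's built-in sqlite3 module stands in for Trino because it accepts the same positional "?" placeholders, and the table name, rows, and the find_city helper are hypothetical.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for Trino here,
# purely so the example runs without any external service.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE city2 (cityid INTEGER, cityname TEXT)")
conn.executemany(
    "INSERT INTO city2 VALUES (?, ?)",
    [(3, "San Diego"), (4, "O'Fallon")],
)


def find_city(name):
    """Hypothetical helper: look up rows by city name with a bound parameter."""
    # The value is sent separately from the SQL text, so a quote character
    # inside the value cannot alter the structure of the statement.
    cur = conn.execute(
        "SELECT cityid, cityname FROM city2 WHERE cityname = ?",
        (name,),
    )
    return cur.fetchall()


rows = find_city("San Diego")
tricky = find_city("O'Fallon")  # contains a single quote, still safe
```

Binding values this way is generally preferable to building the statement with an f-string whenever the value comes from outside the DAG file.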
Note
This operator can execute any syntactically valid Trino query, and multiple queries can be passed either as a list or as a single string.
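To make the list and string forms concrete, the sketch below normalizes either form into a list of individual statements. It is not Airflow's implementation: as_statement_list is a hypothetical helper, and the split on ";" is deliberately naive (it would break on semicolons inside string literals). Whether a single string is actually split before execution may depend on operator settings such as split_statements, which this page does not describe.

```python
def as_statement_list(sql):
    """Hypothetical helper: turn a str or a list of str into a list of statements."""
    if isinstance(sql, str):
        # Naive split: assumes ";" never appears inside a string literal.
        parts = sql.split(";")
    else:
        parts = list(sql)
    # Drop surrounding whitespace and empty fragments (e.g. after a trailing ";").
    return [p.strip() for p in parts if p.strip()]


from_list = as_statement_list(
    [
        "CREATE TABLE IF NOT EXISTS s.t1 (cityid bigint, cityname varchar)",
        "INSERT INTO s.t1 VALUES (2, 'San Jose')",
    ]
)
from_string = as_statement_list(
    "CREATE TABLE IF NOT EXISTS s.t1 (cityid bigint, cityname varchar); "
    "INSERT INTO s.t1 VALUES (2, 'San Jose');"
)
```

Both calls yield the same two statements, which is why the list form is often the clearer choice: each statement is explicit and no splitting is needed.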