TeradataOperator

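The purpose of TeradataOperator is to define tasks that interact with Teradata.

To execute arbitrary SQL in Teradata, use the TeradataOperator.

Using TeradataOperator for common database operations

Creating a Teradata database table

An example usage of the TeradataOperator is as follows: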

tests/system/teradata/example_teradata.py

create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
    CREATE TABLE Country (
        country_id INTEGER,
        name CHAR(25),
        continent CHAR(25)
    );
    """,
)

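You can also use an external file to execute the SQL commands. The external file must be at the same level as the DAG's .py file. This lets you maintain SQL queries separately from the code.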

tests/system/teradata/example_teradata.py

    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql file should look like this:

-- create Users table
CREATE TABLE Users, FALLBACK (
    username    varchar(50),
    description varchar(256)
);
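The idea of keeping the SQL in a file beside the DAG can be sketched without Airflow. The helper below, resolve_sql, is hypothetical and only illustrates the convention described above: a value ending in .sql is read from the DAG folder, and anything else is treated as inline SQL. It is not the way Airflow resolves template files internally.

```python
import tempfile
from pathlib import Path


def resolve_sql(sql: str, dag_folder: Path) -> str:
    """Return SQL text; treat values ending in '.sql' as files beside the DAG.

    Hypothetical helper for illustration only, not part of Airflow.
    """
    if sql.endswith(".sql"):
        return (dag_folder / sql).read_text(encoding="utf-8")
    return sql


# Simulate a dags/ folder that holds create_table.sql next to the DAG file.
with tempfile.TemporaryDirectory() as tmp:
    dag_folder = Path(tmp)
    (dag_folder / "create_table.sql").write_text(
        "CREATE TABLE Users, FALLBACK (\n"
        "    username    varchar(50),\n"
        "    description varchar(256)\n"
        ");\n",
        encoding="utf-8",
    )
    from_file = resolve_sql("create_table.sql", dag_folder)
    inline = resolve_sql("SELECT * FROM Country;", dag_folder)

print(from_file)
print(inline)
```

If the file sat in a different directory than the one being searched, read_text would raise FileNotFoundError, which is why the file's location relative to the DAG matters.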

Inserting data into a Teradata database table

We can then create a TeradataOperator task to populate the Users table.

tests/system/teradata/example_teradata.py

    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )

Fetching records from your Teradata database table

Fetching records from your Teradata database table can be as simple as:

tests/system/teradata/example_teradata.py

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )

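Passing parameters to TeradataOperator

TeradataOperator provides a parameters attribute, which makes it possible to inject values into your SQL dynamically at runtime. The example below takes a different route: it uses the params argument, whose entries are rendered into the query through Jinja templating.

To find the countries on the Asian continent: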

tests/system/teradata/example_teradata.py

    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
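To make the substitution concrete, here is a minimal sketch of what the rendered query looks like. Airflow uses Jinja for templating; the render_params function below is a hypothetical regex stand-in that handles only the {{ params.<name> }} form, so that the example runs with the standard library alone.

```python
import re


def render_params(template: str, params: dict) -> str:
    """Replace each '{{ params.<name> }}' with the matching value from params.

    A regex stand-in for illustration; Airflow itself uses Jinja templating.
    """

    def substitute(match: re.Match) -> str:
        return str(params[match.group(1)])

    return re.sub(r"\{\{\s*params\.(\w+)\s*\}\}", substitute, template)


sql = "SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
rendered = render_params(sql, {"column": "continent", "value": "Asia"})
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';
```

Because the values are pasted into the SQL as plain text, this style suits values that come from the DAG author rather than from untrusted input.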

Dropping a Teradata database table

We can then create a TeradataOperator task to drop the Users table.

tests/system/teradata/example_teradata.py

    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
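The table lifecycle walked through so far (create, populate, query, drop) can be tried locally before pointing a DAG at a real system. The sketch below uses an in-memory SQLite database purely as a stand-in for Teradata; Teradata-specific options such as FALLBACK are left out because SQLite does not accept them.

```python
import sqlite3

# In-memory SQLite database used purely as a local stand-in for Teradata.
conn = sqlite3.connect(":memory:")

# Create the table (without the Teradata-only FALLBACK option).
conn.execute("CREATE TABLE Users (username varchar(50), description varchar(256))")

# Populate it with the same four rows as the populate_table task.
conn.executemany(
    "INSERT INTO Users (username, description) VALUES (?, ?)",
    [
        ("Danny", "Musician"),
        ("Simone", "Chef"),
        ("Lily", "Florist"),
        ("Tim", "Pet shop owner"),
    ],
)

# Fetch the records, then drop the table again.
rows = conn.execute("SELECT * FROM Users ORDER BY username").fetchall()
conn.execute("DROP TABLE Users")
conn.close()

for row in rows:
    print(row)
```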

The complete Teradata Operator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_teradata.py



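import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")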
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
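The parenthesised chain at the end of the DAG sets the execution order with the >> operator. As a rough sketch of why such a chain reads left to right, the hypothetical Task class below implements __rshift__ so that a >> b records b as downstream of a and returns b. It is a simplified model for illustration, not Airflow's operator class.

```python
class Task:
    """Minimal stand-in for an operator; records its downstream tasks."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b registers b downstream of a and returns b,
        # so a >> b >> c links a to b and then b to c.
        self.downstream.append(other)
        return other


def linear_order(first: Task) -> list:
    """Follow single downstream links from the first task to the last."""
    order, current = [first.task_id], first
    while current.downstream:
        current = current.downstream[0]
        order.append(current.task_id)
    return order


create_table = Task("create_table")
populate_table = Task("populate_table")
get_all_countries = Task("get_all_countries")

create_table >> populate_table >> get_all_countries

order = linear_order(create_table)
print(order)  # ['create_table', 'populate_table', 'get_all_countries']
```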

TeradataStoredProcedureOperator

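The purpose of TeradataStoredProcedureOperator is to define tasks that execute Teradata stored procedures.

Executing a stored procedure in a Teradata database

To execute a stored procedure in Teradata, use the TeradataStoredProcedureOperator.

Assume a stored procedure exists in the database that looks like this: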

REPLACE PROCEDURE TEST_PROCEDURE (
    IN val_in INTEGER,
    INOUT val_in_out INTEGER,
    OUT val_out INTEGER,
    OUT value_str_out varchar(100)
)
    BEGIN
        set val_out = val_in * 2;
        set val_in_out = val_in_out * 4;
        set value_str_out = 'string output';
    END;
/

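This stored procedure takes an integer parameter val_in as input. It uses an INOUT parameter val_in_out, which serves as both input and output. In addition, it returns an integer parameter val_out and a string parameter value_str_out.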
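To check the arithmetic, the body of the procedure can be mirrored in plain Python. The function below, model_test_procedure, is a hypothetical model of the three SET statements only; it says nothing about the format in which the operator hands results back.

```python
def model_test_procedure(val_in: int, val_in_out: int):
    """Mirror the SET statements of TEST_PROCEDURE in plain Python."""
    val_out = val_in * 2              # set val_out = val_in * 2;
    val_in_out = val_in_out * 4       # set val_in_out = val_in_out * 4;
    value_str_out = "string output"   # set value_str_out = 'string output';
    return val_in_out, val_out, value_str_out


# Same inputs as the examples in this section: val_in=3, val_in_out=1.
result = model_test_procedure(3, 1)
print(result)  # (4, 6, 'string output')
```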

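This stored procedure can be invoked with TeradataStoredProcedureOperator in several ways.

One approach is to pass the parameters positionally as a list, with the output parameters specified as Python data types: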

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )

Alternatively, the parameters can be passed positionally as a list, with the output parameters specified as placeholders:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )

Another approach is to pass the parameters positionally as a dictionary:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
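The three parameter styles above describe the same call: two input values followed by two output slots. The sketch below makes that explicit with a hypothetical helper, split_parameters, which treats a Python type or the string "?" as an output slot. It is an illustration of the convention only and is not how the provider processes its arguments.

```python
def split_parameters(parameters):
    """Separate concrete input values from output slots.

    Hypothetical helper: a Python type or the string "?" marks an output slot.
    Dictionary values are taken in insertion order.
    """
    if isinstance(parameters, dict):
        values = list(parameters.values())
    else:
        values = list(parameters)
    inputs = [v for v in values if not (isinstance(v, type) or v == "?")]
    output_count = len(values) - len(inputs)
    return inputs, output_count


as_types = split_parameters([3, 1, int, str])
as_placeholders = split_parameters([3, 1, "?", "?"])
as_dict = split_parameters(
    {"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str}
)

print(as_types, as_placeholders, as_dict)
```

All three calls yield the inputs [3, 1] and two output slots.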

Assume a stored procedure exists in the database that looks like this:

REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
   BEGIN
       -- Assign current timestamp to the OUT parameter
       SET out_timestamp = CURRENT_TIMESTAMP;
   END;
 /

This stored procedure yields a single timestamp parameter, out_timestamp, and can be invoked with TeradataStoredProcedureOperator, with the parameters passed positionally as a list:

tests/system/teradata/example_teradata_call_sp.py

    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )

Assume a stored procedure exists in the database that looks like this:

REPLACE PROCEDURE
TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
  BEGIN
    DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
    DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
    open cur1 ;
    open cur2 ;
    set val_out = val_in * 2;
  END;
/

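This stored procedure takes an integer parameter val_in as input and yields an integer parameter val_out. It also opens two cursors, each returning the output of a SELECT query. A procedure of this kind can be invoked with TeradataStoredProcedureOperator, with the parameters passed positionally as a list. The snippet below creates a comparable procedure, examplestoredproc, through TeradataOperator; the complete DAG calls it in the opr_sp_param_dr task.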

tests/system/teradata/example_teradata_call_sp.py

    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
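The order of the two SET statements in examplestoredproc matters, because p3 is computed from the already updated p2. The hypothetical function below models just that arithmetic in Python, using the inputs from the opr_sp_param_dr task (p1=3, p2=2); the two result-set cursors are not modelled.

```python
def model_examplestoredproc(p1: int, p2: int):
    """Mirror the SET statements of examplestoredproc (cursors omitted)."""
    p2 = p1 + p2   # set p2 = p1 + p2 ;
    p3 = p1 * p2   # set p3 = p1 * p2 ;  uses the updated p2
    return p2, p3


outputs = model_examplestoredproc(3, 2)
print(outputs)  # (5, 15)
```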

The complete TeradataStoredProcedureOperator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_teradata_call_sp.py

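from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)

CONN_ID = "teradata_sp_call"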
DAG_ID = "example_teradata_call_sp"

with DAG(
    dag_id=DAG_ID,
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
    schedule="@once",
    start_date=datetime(2023, 1, 1),
) as dag:
    create_sp_in_inout = TeradataOperator(
        task_id="create_sp_in_inout",
        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
                    IN val_in INTEGER,
                    INOUT val_in_out INTEGER,
                    OUT val_out INTEGER,
                    OUT value_str_out varchar(100)
                )
                BEGIN
                    set val_out = val_in * 2;
                    set val_in_out = val_in_out * 4;
                    set value_str_out = 'string output';
                END;
            """,
    )
    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )
    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )
    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
    create_sp_timestamp = TeradataOperator(
        task_id="create_sp_timestamp",
        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
                    BEGIN
                        -- Assign current timestamp to the OUT parameter
                        SET out_timestamp = CURRENT_TIMESTAMP;
                    END;
                 """,
    )
    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )
    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
    opr_sp_param_dr = TeradataStoredProcedureOperator(
        task_id="opr_sp_param_dr",
        procedure="examplestoredproc",
        parameters=[3, 2, int],
    )
    drop_sp = TeradataOperator(
        task_id="drop_sp",
        sql=r"drop procedure examplestoredproc;",
    )
    drop_sp_test = TeradataOperator(
        task_id="drop_sp_test",
        sql=r"drop procedure TEST_PROCEDURE;",
    )
    drop_sp_timestamp = TeradataOperator(
        task_id="drop_sp_timestamp",
        sql=r"drop procedure GetTimestampOutParameter;",
    )
    (
        create_sp_in_inout
        >> opr_sp_types
        >> opr_sp_dict
        >> opr_sp_place_holder
        >> create_sp_param_dr
        >> opr_sp_param_dr
        >> drop_sp
        >> drop_sp_test
        >> create_sp_timestamp
        >> opr_sp_timestamp
        >> drop_sp_timestamp
    )
