TeradataOperator¶
TeradataOperator 的目的是定義與 Teradata 互動的任務。
要在 Teradata 中執行任意 SQL,請使用 TeradataOperator。
使用 TeradataOperator 進行常見的資料庫操作¶
建立一個 Teradata 資料庫表¶
TeradataOperator 的一個使用示例如下:
tests/system/teradata/example_teradata.py
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
您也可以使用外部檔案來執行 SQL 命令。外部檔案必須與 DAG 的 .py 檔案位於同一目錄層級。這樣您可以輕鬆地將 SQL 查詢與程式碼分開維護。
tests/system/teradata/example_teradata.py
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
您的 dags/create_table.sql 檔案應該如下所示:
-- create Users table
CREATE TABLE Users, FALLBACK (
username varchar(50),
description varchar(256)
);
向 Teradata 資料庫表中插入資料¶
然後我們可以建立一個 TeradataOperator 任務來填充 Users 表。
tests/system/teradata/example_teradata.py
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
從您的 Teradata 資料庫表中獲取記錄¶
從您的 Teradata 資料庫表中獲取記錄可以很簡單,如下所示:
tests/system/teradata/example_teradata.py
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"SELECT * FROM Country;",
)
向 TeradataOperator 傳遞引數¶
TeradataOperator 提供了 parameters 屬性,可在執行時將值動態注入到您的 SQL 請求中;下面的示例則是透過 params 搭配 Jinja 模板,在 SQL 被渲染時填入欄位名稱與值。
要查詢亞洲大陸的國家:
tests/system/teradata/example_teradata.py
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
params={"column": "continent", "value": "Asia"},
)
刪除一個 Teradata 資料庫表¶
然後我們可以建立一個 TeradataOperator 任務來刪除 Users 表。
tests/system/teradata/example_teradata.py
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"DROP TABLE Users;",
dag=dag,
)
完整的 Teradata Operator DAG¶
當我們把所有內容放在一起時,我們的 DAG 應該如下所示:
tests/system/teradata/example_teradata.py
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
) as dag:
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"SELECT * FROM Country;",
)
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
params={"column": "continent", "value": "Asia"},
)
drop_country_table = TeradataOperator(
task_id="drop_country_table",
sql=r"DROP TABLE Country;",
dag=dag,
)
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"DROP TABLE Users;",
dag=dag,
)
create_schema = TeradataOperator(
task_id="create_schema",
sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
)
create_table_with_schema = TeradataOperator(
task_id="create_table_with_schema",
sql=r"""
CREATE TABLE schema_table (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
schema="airflow_temp",
)
drop_schema_table = TeradataOperator(
task_id="drop_schema_table",
sql=r"DROP TABLE schema_table;",
dag=dag,
schema="airflow_temp",
)
drop_schema = TeradataOperator(
task_id="drop_schema",
sql=r"DROP DATABASE airflow_temp;",
dag=dag,
)
(
create_table
>> create_table_from_external_file
>> populate_table
>> get_all_countries
>> get_countries_from_continent
>> drop_country_table
>> drop_users_table
>> create_schema
>> create_table_with_schema
>> drop_schema_table
>> drop_schema
)
TeradataStoredProcedureOperator¶
TeradataStoredProcedureOperator 的目的是定義涉及執行 Teradata 儲存過程的任務。
在 Teradata 資料庫中執行儲存過程¶
要在 Teradata 中執行儲存過程,請使用 TeradataStoredProcedureOperator。
假設資料庫中存在一個儲存過程,它看起來像這樣:
REPLACE PROCEDURE TEST_PROCEDURE ( IN val_in INTEGER, INOUT val_in_out INTEGER, OUT val_out INTEGER, OUT value_str_out varchar(100) ) BEGIN set val_out = val_in * 2; set val_in_out = val_in_out * 4; set value_str_out = 'string output'; END; /
這個儲存過程接受一個整數引數 val_in 作為輸入。它使用一個 inout 引數 val_in_out,該引數同時用作輸入和輸出。此外,它返回一個整數引數 val_out 和一個字串引數 value_str_out。
可以使用 TeradataStoredProcedureOperator 以各種方式呼叫此儲存過程。
一種方法是將引數按位置作為列表傳遞,其中輸出引數指定為 Python 資料型別:
tests/system/teradata/example_teradata_call_sp.py
opr_sp_types = TeradataStoredProcedureOperator(
task_id="opr_sp_types",
procedure="TEST_PROCEDURE",
parameters=[3, 1, int, str],
)
或者,引數可以按位置作為列表傳遞,其中輸出引數被指定為佔位符:
tests/system/teradata/example_teradata_call_sp.py
opr_sp_place_holder = TeradataStoredProcedureOperator(
task_id="opr_sp_place_holder",
procedure="TEST_PROCEDURE",
parameters=[3, 1, "?", "?"],
)
另一種方法是以字典的形式按位置傳遞引數:
tests/system/teradata/example_teradata_call_sp.py
opr_sp_dict = TeradataStoredProcedureOperator(
task_id="opr_sp_dict",
procedure="TEST_PROCEDURE",
parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
假設資料庫中存在一個儲存過程,它看起來像這樣:
REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
BEGIN
    -- Assign current timestamp to the OUT parameter
    SET out_timestamp = CURRENT_TIMESTAMP;
END;
/
此儲存過程產生單一的時間戳輸出引數 out_timestamp,並且可以透過 TeradataStoredProcedureOperator 呼叫,引數按位置作為列表傳遞:
tests/system/teradata/example_teradata_call_sp.py
opr_sp_timestamp = TeradataStoredProcedureOperator(
task_id="opr_sp_timestamp",
procedure="GetTimestampOutParameter",
parameters=["?"],
)
假設資料庫中存在一個儲存過程,它看起來像這樣:
REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) BEGIN DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ; DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ; open cur1 ; open cur2 ; set val_out = val_in * 2; END; /
此儲存過程接受一個整數引數 val_in 作為輸入,並產生一個整數引數 val_out。此外,它還生成兩個游標,分別代表兩個 SELECT 查詢的輸出。此儲存過程可以使用 TeradataStoredProcedureOperator 呼叫,引數按位置作為列表傳遞:
tests/system/teradata/example_teradata_call_sp.py
create_sp_param_dr = TeradataOperator(
task_id="create_sp_param_dr",
sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
dynamic result sets 2
begin
declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
open cur1 ;
open cur2 ;
set p2 = p1 + p2 ;
set p3 = p1 * p2 ;
end ;
""",
)
完整的 TeradataStoredProcedureOperator DAG¶
當我們把所有內容放在一起時,我們的 DAG 應該如下所示:
tests/system/teradata/example_teradata_call_sp.py
CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"
with DAG(
dag_id=DAG_ID,
max_active_runs=1,
max_active_tasks=3,
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
schedule="@once",
start_date=datetime(2023, 1, 1),
) as dag:
create_sp_in_inout = TeradataOperator(
task_id="create_sp_in_inout",
sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
IN val_in INTEGER,
INOUT val_in_out INTEGER,
OUT val_out INTEGER,
OUT value_str_out varchar(100)
)
BEGIN
set val_out = val_in * 2;
set val_in_out = val_in_out * 4;
set value_str_out = 'string output';
END;
""",
)
opr_sp_types = TeradataStoredProcedureOperator(
task_id="opr_sp_types",
procedure="TEST_PROCEDURE",
parameters=[3, 1, int, str],
)
opr_sp_place_holder = TeradataStoredProcedureOperator(
task_id="opr_sp_place_holder",
procedure="TEST_PROCEDURE",
parameters=[3, 1, "?", "?"],
)
opr_sp_dict = TeradataStoredProcedureOperator(
task_id="opr_sp_dict",
procedure="TEST_PROCEDURE",
parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
create_sp_timestamp = TeradataOperator(
task_id="create_sp_timestamp",
sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
BEGIN
-- Assign current timestamp to the OUT parameter
SET out_timestamp = CURRENT_TIMESTAMP;
END;
""",
)
opr_sp_timestamp = TeradataStoredProcedureOperator(
task_id="opr_sp_timestamp",
procedure="GetTimestampOutParameter",
parameters=["?"],
)
create_sp_param_dr = TeradataOperator(
task_id="create_sp_param_dr",
sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
dynamic result sets 2
begin
declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
open cur1 ;
open cur2 ;
set p2 = p1 + p2 ;
set p3 = p1 * p2 ;
end ;
""",
)
opr_sp_param_dr = TeradataStoredProcedureOperator(
task_id="opr_sp_param_dr",
procedure="examplestoredproc",
parameters=[3, 2, int],
)
drop_sp = TeradataOperator(
task_id="drop_sp",
sql=r"drop procedure examplestoredproc;",
)
drop_sp_test = TeradataOperator(
task_id="drop_sp_test",
sql=r"drop procedure TEST_PROCEDURE;",
)
drop_sp_timestamp = TeradataOperator(
task_id="drop_sp_timestamp",
sql=r"drop procedure GetTimestampOutParameter;",
)
(
create_sp_in_inout
>> opr_sp_types
>> opr_sp_dict
>> opr_sp_place_holder
>> create_sp_param_dr
>> opr_sp_param_dr
>> drop_sp
>> drop_sp_test
>> create_sp_timestamp
>> opr_sp_timestamp
>> drop_sp_timestamp
)