AzureBlobStorageToTeradataOperator¶
AzureBlobStorageToTeradataOperator 的目的是定義將 CSV、JSON 和 Parquet 格式資料從 Azure Blob Storage 傳輸到 Teradata 表的任務。使用 AzureBlobStorageToTeradataOperator 將資料從 Azure Blob Storage 傳輸到 Teradata。此運算元利用 Teradata READ_NOS 功能將 CSV、JSON 和 Parquet 格式的資料從 Azure Blob Storage 匯入到 Teradata 中。此運算元直接從物件儲存訪問資料,並使用 READ_NOS 和 CREATE TABLE AS 功能以及以下 SQL 語句在資料庫中生成永久表。
CREATE MULTISET TABLE multiset_table_name AS (
SELECT *
FROM (
LOCATION='YOUR-OBJECT-STORE-URI'
AUTHORIZATION=authorization_object
) AS d
) WITH DATA;
它支援從公共和私有物件儲存載入資料。對於私有物件儲存,可以透過 Teradata Authorization 資料庫物件或在 Airflow 中透過 Azure Blob Storage 連線定義的物件儲存登入和物件儲存金鑰授予對物件儲存的訪問許可權。相反,對於從公共物件儲存傳輸資料,則不需要授權或訪問憑據。
Teradata Authorization 資料庫物件訪問型別可與
AzureBlobStorageToTeradataOperator 的 teradata_authorization_name 引數一起使用。物件儲存登入和物件儲存金鑰訪問型別可與
AzureBlobStorageToTeradataOperator 的 azure_conn_id 引數一起使用。
注意
如果兩種訪問型別都已定義,則 Teradata Authorization 資料庫物件優先。
將資料從公共 Azure Blob Storage 傳輸到 Teradata¶
以下是從公共 Azure Blob Storage 將 CSV 資料格式傳輸到 teradata 表的 AzureBlobStorageToTeradataOperator 使用示例
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
transfer_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_csv",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
public_bucket=True,
teradata_table="example_blob_teradata_csv",
teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
使用 Azure Blob Storage 連線將資料從私有 Azure Blob Storage 傳輸到 Teradata¶
以下是使用 Azure 憑據(定義為 Azure Blob Storage 連線)將 CSV 資料格式從私有 Azure Blob Storage 傳輸到 teradata 表的 AzureBlobStorageToTeradataOperator 使用示例
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_key_data_blob_to_teradata_csv",
blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
teradata_table="example_blob_teradata_csv",
azure_conn_id="wasb_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
將 CSV 格式資料從 Azure Blob Storage 傳輸到 Teradata¶
以下是從 Azure Blob Storage 將 CSV 資料格式傳輸到 teradata 表的 AzureBlobStorageToTeradataOperator 使用示例
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
transfer_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_csv",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
public_bucket=True,
teradata_table="example_blob_teradata_csv",
teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
將 JSON 格式資料從 Azure Blob Storage 傳輸到 Teradata¶
以下是從 Azure Blob Storage 將 JSON 資料格式傳輸到 teradata 表的 AzureBlobStorageToTeradataOperator 使用示例
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
transfer_data_json = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_json",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
teradata_table="example_blob_teradata_json",
public_bucket=True,
teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
將 PARQUET 格式資料從 Azure Blob Storage 傳輸到 Teradata¶
以下是從 Azure Blob Storage 將 PARQUET 資料格式傳輸到 teradata 表的 AzureBlobStorageToTeradataOperator 使用示例
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
transfer_data_parquet = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_parquet",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
teradata_table="example_blob_teradata_parquet",
public_bucket=True,
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
完整的 AzureBlobStorageToTeradataOperator 運算元 DAG¶
當我們把所有東西放在一起時,我們的 DAG 應該如下所示
tests/system/teradata/example_azure_blob_to_teradata_transfer.py
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
) as dag:
transfer_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_csv",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
public_bucket=True,
teradata_table="example_blob_teradata_csv",
teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
read_data_table_csv = TeradataOperator(
task_id="read_data_table_csv",
sql="SELECT count(1) from example_blob_teradata_csv;",
)
drop_table_csv = TeradataOperator(
task_id="drop_table_csv",
sql="DROP TABLE example_blob_teradata_csv;",
)
transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_key_data_blob_to_teradata_csv",
blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
teradata_table="example_blob_teradata_csv",
azure_conn_id="wasb_default",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
read_key_data_table_csv = TeradataOperator(
task_id="read_key_data_table_csv",
conn_id=CONN_ID,
sql="SELECT count(1) from example_blob_teradata_csv;",
)
drop_key_table_csv = TeradataOperator(
task_id="drop_key_table_csv",
conn_id=CONN_ID,
sql="DROP TABLE example_blob_teradata_csv;",
)
create_azure_authorization = TeradataOperator(
task_id="create_azure_authorization",
conn_id=CONN_ID,
sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
)
transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_auth_data_blob_to_teradata_csv",
blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
teradata_table="example_blob_teradata_csv",
teradata_authorization_name="azure_authorization",
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
read_auth_data_table_csv = TeradataOperator(
task_id="read_auth_data_table_csv",
conn_id=CONN_ID,
sql="SELECT count(1) from example_blob_teradata_csv;",
)
drop_auth_table_csv = TeradataOperator(
task_id="drop_auth_table_csv",
conn_id=CONN_ID,
sql="DROP TABLE example_blob_teradata_csv;",
)
drop_auth = TeradataOperator(
task_id="drop_auth",
conn_id=CONN_ID,
sql="DROP AUTHORIZATION azure_authorization;",
)
transfer_data_json = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_json",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
teradata_table="example_blob_teradata_json",
public_bucket=True,
teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
read_data_table_json = TeradataOperator(
task_id="read_data_table_json",
sql="SELECT count(1) from example_blob_teradata_json;",
)
drop_table_json = TeradataOperator(
task_id="drop_table_json",
sql="DROP TABLE example_blob_teradata_json;",
)
transfer_data_parquet = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_parquet",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
teradata_table="example_blob_teradata_parquet",
public_bucket=True,
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
read_data_table_parquet = TeradataOperator(
task_id="read_data_table_parquet",
sql="SELECT count(1) from example_blob_teradata_parquet;",
)
drop_table_parquet = TeradataOperator(
task_id="drop_table_parquet",
sql="DROP TABLE example_blob_teradata_parquet;",
)
(
transfer_data_csv
>> transfer_data_json
>> transfer_data_parquet
>> read_data_table_csv
>> read_data_table_json
>> read_data_table_parquet
>> drop_table_csv
>> drop_table_json
>> drop_table_parquet
>> transfer_key_data_csv
>> read_key_data_table_csv
>> drop_key_table_csv
>> create_azure_authorization
>> transfer_auth_data_csv
>> read_auth_data_table_csv
>> drop_auth_table_csv
>> drop_auth
)