AzureBlobStorageToTeradataOperator

AzureBlobStorageToTeradataOperator defines a task that transfers CSV, JSON, or Parquet data from Azure Blob Storage into a Teradata table. The operator uses the Teradata READ_NOS feature to read the data directly from the object store, and it creates a permanent table in the database with CREATE TABLE AS, using the following SQL statement:

CREATE MULTISET TABLE multiset_table_name AS (
  SELECT *
  FROM (
    LOCATION='YOUR-OBJECT-STORE-URI'
    AUTHORIZATION=authorization_object
  ) AS d
) WITH DATA;
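To make the shape of that statement concrete, here is a minimal sketch that assembles it from a table name, a location, and an optional authorization object. The helper `build_read_nos_ctas` is hypothetical and only mirrors the SQL above; it is not the operator's implementation, and it assumes the table and authorization names are trusted identifiers.

```python
from typing import Optional


def build_read_nos_ctas(table: str, location: str, authorization: Optional[str] = None) -> str:
    """Build a CREATE TABLE AS statement that reads from an object store.

    Hypothetical helper for illustration; it mirrors the SQL shown above.
    `table` and `authorization` are interpolated as-is, so they must be
    trusted identifiers.
    """
    # Double any single quotes so the URI is a valid SQL string literal.
    quoted_location = location.replace("'", "''")
    lines = [
        f"CREATE MULTISET TABLE {table} AS (",
        "  SELECT *",
        "  FROM (",
        f"    LOCATION='{quoted_location}'",
    ]
    # The AUTHORIZATION clause is only needed for private object stores.
    if authorization:
        lines.append(f"    AUTHORIZATION={authorization}")
    lines += ["  ) AS d", ") WITH DATA;"]
    return "\n".join(lines)


private_sql = build_read_nos_ctas(
    "example_blob_teradata_csv",
    "/az/airflowteradata.blob.core.windows.net/csvdata/",
    authorization="azure_authorization",
)
public_sql = build_read_nos_ctas(
    "example_blob_teradata_csv",
    "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
)
print(private_sql)
```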

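The operator supports loading data from both public and private object stores. For a private object store, access can be granted either through a Teradata Authorization database object or through the object store login and key defined in an Azure Blob Storage connection in Airflow. Transferring data from a public object store, by contrast, requires no authorization or access credentials.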

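  • The Teradata Authorization database object access type can be used with the teradata_authorization_name parameter of AzureBlobStorageToTeradataOperator.

  • The object store access key ID and access key secret access type can be used with the azure_conn_id parameter of AzureBlobStorageToTeradataOperator.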

https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges

Note

If both access types are defined, the Teradata Authorization database object takes precedence.
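The access rules above can be summarized as a small decision function. This is a sketch of the documented behavior only: `choose_access` is a hypothetical helper whose parameter names mirror the operator arguments, and it is not the provider's source code. It assumes that a public bucket skips credentials entirely, as described above.

```python
from typing import Optional, Tuple


def choose_access(
    public_bucket: bool,
    teradata_authorization_name: Optional[str],
    azure_conn_id: Optional[str],
) -> Tuple[str, Optional[str]]:
    """Return (access_type, detail) following the rules described above.

    Hypothetical helper; it sketches the documented precedence and does not
    reproduce the operator's implementation.
    """
    if public_bucket:
        # Public object stores need no authorization or access credentials.
        return ("public", None)
    if teradata_authorization_name:
        # The Teradata Authorization object wins when both are defined.
        return ("authorization_object", teradata_authorization_name)
    # Otherwise use the login and key stored in the Airflow connection.
    return ("connection", azure_conn_id)


print(choose_access(False, "azure_authorization", "wasb_default"))
```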

Transferring data from public Azure Blob Storage to Teradata

The following is an example of using AzureBlobStorageToTeradataOperator to transfer CSV data from public Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
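The blob_source_key values in these examples follow the Teradata NOS location format for Azure, `/az/<account>.blob.core.windows.net/<container>/<prefix>`. As a sketch, the hypothetical helper below splits such a key into its parts, which can help when checking which storage account and container a task reads from. It is illustrative only; as far as this page shows, the operator simply uses the key as the LOCATION value in the SQL statement above.

```python
from typing import NamedTuple


class NosAzureLocation(NamedTuple):
    account: str
    container: str
    prefix: str


def parse_blob_source_key(key: str) -> NosAzureLocation:
    """Split a '/az/<account>.blob.core.windows.net/<container>/<prefix>' key.

    Hypothetical helper for illustration; it is not part of the provider.
    """
    suffix = ".blob.core.windows.net"
    # Keep any trailing slash: it marks the prefix as a "folder".
    parts = key.lstrip("/").split("/", 3)
    if len(parts) < 3 or parts[0] != "az" or not parts[1].endswith(suffix):
        raise ValueError(f"not an /az/ NOS location: {key!r}")
    account = parts[1][: -len(suffix)]
    prefix = parts[3] if len(parts) == 4 else ""
    return NosAzureLocation(account, parts[2], prefix)


loc = parse_blob_source_key(
    "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/"
)
print(loc)
```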

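Transferring data from private Azure Blob Storage to Teradata with an Azure Blob Storage connection

The following is an example of using AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, with the Azure credentials defined as an Azure Blob Storage connection (azure_conn_id):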

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
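One way to provide the wasb_default connection is through an environment variable. The sketch below assumes Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention with a JSON value, and the `wasb` connection type, with the storage account name in `login` and the account key in `password`. The helper name `wasb_connection_env` is hypothetical; check the Azure provider's connection documentation for the fields your authentication method needs.

```python
import json


def wasb_connection_env(conn_id: str, account_name: str, account_key: str) -> dict:
    """Return an environment mapping that defines an Airflow connection.

    Hypothetical helper; assumes Airflow's JSON format for AIRFLOW_CONN_*
    environment variables and the 'wasb' connection type.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {"conn_type": "wasb", "login": account_name, "password": account_key}
    )
    return {name: value}


# The key is a placeholder; never commit real account keys.
env = wasb_connection_env("wasb_default", "airflowteradata", "<account-key>")
print(list(env))
```

The resulting variable would need to be set in the environment of the Airflow components that run the task.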

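Transferring data from private Azure Blob Storage to Teradata with a Teradata Authorization object

A Teradata authorization database object controls who can access an external object store. The authorization object must already exist in the Teradata database before it can be used to transfer data from Azure Blob Storage to Teradata. See Authentication for External Object Stores in Teradata.

The following is an example of using AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, using an Authorization database object defined in Teradata: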

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
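The example DAG creates this authorization object with a CREATE AUTHORIZATION statement whose USER is the storage account name and whose PASSWORD is the account secret key, and removes it afterwards with DROP AUTHORIZATION. The hypothetical helper below builds both statements in that same shape, doubling single quotes so the values stay valid SQL string literals; the authorization name is assumed to be a trusted identifier. In the DAG itself, the secrets come from Airflow Variables through Jinja templating rather than from Python code.

```python
def sql_string(value: str) -> str:
    """Quote a value as a SQL string literal by doubling single quotes."""
    return "'" + value.replace("'", "''") + "'"


def authorization_ddl(name: str, account_name: str, account_key: str):
    """Return (create_sql, drop_sql) for a Teradata authorization object.

    Hypothetical helper mirroring the statements in the example DAG;
    `name` is interpolated as-is and must be a trusted identifier.
    """
    create_sql = (
        f"CREATE AUTHORIZATION {name} "
        f"USER {sql_string(account_name)} PASSWORD {sql_string(account_key)}"
    )
    drop_sql = f"DROP AUTHORIZATION {name};"
    return create_sql, drop_sql


create_sql, drop_sql = authorization_ddl("azure_authorization", "airflowteradata", "k'ey")
print(create_sql)
```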

Transferring CSV data from Azure Blob Storage to Teradata

The following is an example of using AzureBlobStorageToTeradataOperator to transfer CSV data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring JSON data from Azure Blob Storage to Teradata

The following is an example of using AzureBlobStorageToTeradataOperator to transfer JSON data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring Parquet data from Azure Blob Storage to Teradata

The following is an example of using AzureBlobStorageToTeradataOperator to transfer Parquet data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
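The CSV, JSON, and Parquet examples differ only in the source prefix and the target table; none of them passes a format argument. As a sketch, the task arguments for a public bucket can therefore be generated from a small table of prefixes. The names `BASE`, `FORMATS`, and `transfer_kwargs` are hypothetical; the values are copied from the examples above.

```python
BASE = "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs"

# (format, path under the container) pairs taken from the examples above.
FORMATS = [
    ("csv", "CSVDATA/09380000/2018/06/"),
    ("json", "JSONDATA/09380000/2018/06/"),
    ("parquet", "PARQUETDATA/09394500/2018/06/"),
]


def transfer_kwargs(fmt: str, path: str) -> dict:
    """Build keyword arguments for one public-bucket transfer task."""
    return {
        "task_id": f"transfer_data_blob_to_teradata_{fmt}",
        "blob_source_key": f"{BASE}/{path}",
        "public_bucket": True,
        "teradata_table": f"example_blob_teradata_{fmt}",
        "teradata_conn_id": "teradata_default",
        "trigger_rule": "all_done",
    }


all_kwargs = [transfer_kwargs(fmt, path) for fmt, path in FORMATS]
# Inside a DAG, each dict could be passed as
# AzureBlobStorageToTeradataOperator(**kwargs).
for kwargs in all_kwargs:
    print(kwargs["task_id"], kwargs["blob_source_key"])
```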

Complete AzureBlobStorageToTeradataOperator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py


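import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
# Teradata connection shared by all tasks in this DAG.
CONN_ID = "teradata_default"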

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    create_azure_authorization = TeradataOperator(
        task_id="create_azure_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
    )
    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION azure_authorization;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )

    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_azure_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )
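The three CSV variants all write to the same table, example_blob_teradata_csv, so the linear chain drops that table before the next transfer recreates it (this assumes, as is usual for CREATE TABLE, that creating a table that already exists fails). The transfer tasks also use trigger_rule="all_done", so in Airflow they run once their upstream tasks finish, whether or not those tasks succeeded. The sketch below checks the table lifecycle of the chain: each table must be created before it is read or dropped, and must not be created while it still exists. The names `ORDER`, `ACTIONS`, and `check_table_lifecycle` are hypothetical; the task order and table names are copied from the DAG above.

```python
from typing import Dict, List, Optional, Tuple

CSV, JSON, PARQUET = (f"example_blob_teradata_{f}" for f in ("csv", "json", "parquet"))

# Task order copied from the dependency chain above.
ORDER = [
    "transfer_data_csv", "transfer_data_json", "transfer_data_parquet",
    "read_data_table_csv", "read_data_table_json", "read_data_table_parquet",
    "drop_table_csv", "drop_table_json", "drop_table_parquet",
    "transfer_key_data_csv", "read_key_data_table_csv", "drop_key_table_csv",
    "create_azure_authorization", "transfer_auth_data_csv",
    "read_auth_data_table_csv", "drop_auth_table_csv", "drop_auth",
]

# (action, table) for every task that touches a table; others are skipped.
ACTIONS: Dict[str, Tuple[str, str]] = {
    "transfer_data_csv": ("create", CSV), "transfer_data_json": ("create", JSON),
    "transfer_data_parquet": ("create", PARQUET),
    "read_data_table_csv": ("read", CSV), "read_data_table_json": ("read", JSON),
    "read_data_table_parquet": ("read", PARQUET),
    "drop_table_csv": ("drop", CSV), "drop_table_json": ("drop", JSON),
    "drop_table_parquet": ("drop", PARQUET),
    "transfer_key_data_csv": ("create", CSV), "read_key_data_table_csv": ("read", CSV),
    "drop_key_table_csv": ("drop", CSV),
    "transfer_auth_data_csv": ("create", CSV), "read_auth_data_table_csv": ("read", CSV),
    "drop_auth_table_csv": ("drop", CSV),
}


def check_table_lifecycle(order: List[str], actions: Dict[str, Tuple[str, str]]) -> Optional[str]:
    """Return None if the order is valid, else a message for the first problem."""
    existing = set()
    for task in order:
        if task not in actions:
            continue
        action, table = actions[task]
        if action == "create":
            if table in existing:
                return f"{task}: {table} already exists"
            existing.add(table)
        elif table not in existing:
            return f"{task}: {table} does not exist"
        elif action == "drop":
            existing.discard(table)
    return None


print(check_table_lifecycle(ORDER, ACTIONS))
```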
