Amazon Redshift Data

Amazon Redshift 管理資料倉庫的所有設定、運營和擴充套件工作:包括配置容量、監控和備份叢集,以及對 Amazon Redshift 引擎應用補丁和升級。您可以專注於使用您的資料來為您的業務和客戶獲取新的見解。

先決條件任務

要使用這些 Operator,您必須做一些事情

通用引數

aws_conn_id

引用 Amazon Web Services 連線 ID。如果此引數設定為 None,則使用預設的 boto3 行為,不查詢連線。否則,使用連線中儲存的憑據。預設值:aws_default

region_name

AWS 區域名稱。如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 region_name。否則,使用指定的值而不是連線中的值。預設值:None

verify

是否驗證 SSL 證書。

  • False - 不驗證 SSL 證書。

  • path/to/cert/bundle.pem - 要使用的 CA 證書包的檔名。如果您想使用與 botocore 使用的不同的 CA 證書包,可以指定此引數。

如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 verify。否則,使用指定的值而不是連線中的值。預設值:None

botocore_config

提供的字典用於構建 botocore.config.Config。此配置可用於配置 避免限流異常、超時等。

示例,有關引數的更多詳細資訊,請參閱 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 config_kwargs。否則,使用指定的值而不是連線中的值。預設值:None

注意

指定一個空字典 {} 將覆蓋 botocore.config.Config 的連線配置。

Operator

在 Amazon Redshift 叢集上執行語句

使用 RedshiftDataOperator 在 Amazon Redshift 叢集上執行語句。

這與 RedshiftSQLOperator 的區別在於,它允許使用者透過 AWS API 查詢和檢索資料,避免了 Postgres 連線的必要性。

tests/system/amazon/aws/example_redshift.py

create_table_redshift_data = RedshiftDataOperator(
    task_id="create_table_redshift_data",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TABLE IF NOT EXISTS fruit (
        fruit_id INTEGER,
        name VARCHAR NOT NULL,
        color VARCHAR NOT NULL
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
)

執行多個語句時重用會話

在上游任務中指定 session_keep_alive_seconds 引數。在下游任務中,從 XCom 中獲取會話 ID 並將其傳遞給 session_id 引數。這在處理臨時表時非常有用。

tests/system/amazon/aws/example_redshift.py

create_tmp_table_data_api = RedshiftDataOperator(
    task_id="create_tmp_table_data_api",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TEMPORARY TABLE tmp_people (
        id INTEGER,
        first_name VARCHAR(100),
        age INTEGER
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_keep_alive_seconds=600,
)

insert_data_reuse_session = RedshiftDataOperator(
    task_id="insert_data_reuse_session",
    sql=[
        "INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
        "INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
        "INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)

參考

此條目有幫助嗎?