Amazon Redshift Data¶
Amazon Redshift 管理資料倉庫的所有設定、運營和擴充套件工作:包括配置容量、監控和備份叢集,以及對 Amazon Redshift 引擎應用補丁和升級。您可以專注於使用您的資料來為您的業務和客戶獲取新的見解。
先決條件任務¶
要使用這些 Operator,您必須做一些事情
透過 pip 安裝 API 庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 的安裝
設定連線.
通用引數¶
- aws_conn_id
引用 Amazon Web Services 連線 ID。如果此引數設定為
None,則使用預設的 boto3 行為,不查詢連線。否則,使用連線中儲存的憑據。預設值:aws_default- region_name
AWS 區域名稱。如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 region_name。否則,使用指定的值而不是連線中的值。預設值:None- verify
是否驗證 SSL 證書。
False- 不驗證 SSL 證書。path/to/cert/bundle.pem - 要使用的 CA 證書包的檔名。如果您想使用與 botocore 使用的不同的 CA 證書包,可以指定此引數。
如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 verify。否則,使用指定的值而不是連線中的值。預設值:None- botocore_config
提供的字典用於構建 botocore.config.Config。此配置可用於配置 避免限流異常、超時等。
示例,有關引數的更多詳細資訊,請參閱 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 config_kwargs。否則,使用指定的值而不是連線中的值。預設值:None注意
指定一個空字典
{}將覆蓋 botocore.config.Config 的連線配置。
Operator¶
在 Amazon Redshift 叢集上執行語句¶
使用 RedshiftDataOperator 在 Amazon Redshift 叢集上執行語句。
這與 RedshiftSQLOperator 的區別在於,它允許使用者透過 AWS API 查詢和檢索資料,避免了 Postgres 連線的必要性。
tests/system/amazon/aws/example_redshift.py
create_table_redshift_data = RedshiftDataOperator(
task_id="create_table_redshift_data",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TABLE IF NOT EXISTS fruit (
fruit_id INTEGER,
name VARCHAR NOT NULL,
color VARCHAR NOT NULL
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
)
執行多個語句時重用會話¶
在上游任務中指定 session_keep_alive_seconds 引數。在下游任務中,從 XCom 中獲取會話 ID 並將其傳遞給 session_id 引數。這在處理臨時表時非常有用。
tests/system/amazon/aws/example_redshift.py
create_tmp_table_data_api = RedshiftDataOperator(
task_id="create_tmp_table_data_api",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TEMPORARY TABLE tmp_people (
id INTEGER,
first_name VARCHAR(100),
age INTEGER
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_keep_alive_seconds=600,
)
insert_data_reuse_session = RedshiftDataOperator(
task_id="insert_data_reuse_session",
sql=[
"INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
"INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
"INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)