airflow.providers.amazon.aws.transfers.s3_to_sql

S3ToSqlOperator

將資料從 S3 載入到 SQL 資料庫中。

模組內容

class airflow.providers.amazon.aws.transfers.s3_to_sql.S3ToSqlOperator(*, s3_key, s3_bucket, table, parser, column_list=None, commit_every=1000, schema=None, sql_conn_id='sql_default', sql_hook_params=None, aws_conn_id='aws_default', **kwargs)[source]

Bases: airflow.models.BaseOperator

將資料從 S3 載入到 SQL 資料庫中。

你需要提供一個解析器函式,該函式以檔名作為輸入並返回一個行的可迭代物件。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: Amazon S3 到 SQL 傳輸運算子

引數
  • schema (str | None) – SQL 資料庫中特定 schema 的引用

  • table (str) – SQL 資料庫中特定 table 的引用

  • s3_bucket (str) – 特定 S3 儲存桶的引用

  • s3_key (str) – 特定 S3 key 的引用

  • sql_conn_id (str) – 特定 SQL 資料庫的引用。必須是 DBApiHook 型別。

  • sql_hook_params (dict | None) – 要傳遞給底層 hook 的額外配置引數。應與所需的 hook 建構函式引數匹配。

  • aws_conn_id (str | None) – 特定 S3 / AWS 連線的引用

  • column_list (list[str] | None) – 用於 insert SQL 的列名列表。

  • commit_every (int) – 單次事務中插入的最大行數。設定為 0 表示在一次事務中插入所有行。

  • parser (Callable[[str], collections.abc.Iterable[collections.abc.Iterable]]) –

    一個解析器函式,以檔案路徑作為輸入並返回一個可迭代物件。例如,要使用按行生成行的 CSV 解析器,請傳遞以下函式

    def parse_csv(filepath):
        import csv
    
        with open(filepath, newline="") as file:
            yield from csv.reader(file)
    

template_fields: collections.abc.Sequence[str] = ('s3_bucket', 's3_key', 'schema', 'table', 'column_list', 'sql_conn_id')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#f4a460'[source]
s3_bucket[source]
s3_key[source]
table[source]
schema = None[source]
aws_conn_id = 'aws_default'[source]
sql_conn_id = 'sql_default'[source]
column_list = None[source]
commit_every = 1000[source]
parser[source]
sql_hook_params = None[source]
execute(context)[source]

建立運算子時派生。

context 是用於渲染 jinja 模板的同一個字典。

有關更多 context 資訊,請參閱 get_template_context。

property db_hook[source]

此條目有幫助嗎?