airflow.providers.amazon.aws.transfers.dynamodb_to_s3

JSONEncoder

自定義 JSON 編碼器實現。

DynamoDBToS3Operator

將記錄從 DynamoDB 表複製到 S3。

模組內容

class airflow.providers.amazon.aws.transfers.dynamodb_to_s3.JSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

繼承自: json.JSONEncoder

自定義 JSON 編碼器實現。

default(obj)[source]

將 Decimal 物件轉換為 JSON 可序列化格式。

class airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator(*, dynamodb_table_name, s3_bucket_name, file_size=1000, dynamodb_scan_kwargs=None, s3_key_prefix='', process_func=_convert_item_to_json_bytes, point_in_time_export=False, export_time=None, export_format='DYNAMODB_JSON', export_table_to_point_in_time_kwargs=None, check_interval=30, max_attempts=60, **kwargs)[source]

繼承自: airflow.providers.amazon.aws.transfers.base.AwsToAwsBaseOperator

將記錄從 DynamoDB 表複製到 S3。

它掃描一個 DynamoDB 表並將接收到的記錄寫入本地檔案系統上的檔案。當檔案大小超過使用者指定的檔案大小限制時,它將檔案重新整理到 S3。

使用者還可以使用 dynamodb_scan_kwargs 指定過濾條件,以僅複製滿足條件的記錄。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:Amazon DynamoDB 到 Amazon S3 傳輸運算子

引數:
  • dynamodb_table_name (str) – 要從中複製資料的 Dynamodb 表

  • s3_bucket_name (str) – 要複製資料到的 S3 儲存桶

  • file_size (int) – 如果檔案大小 >= file_size,則將檔案重新整理到 S3

  • dynamodb_scan_kwargs (dict[str, Any] | None) – 傳遞給 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> 的 kwargs

  • s3_key_prefix (str) – S3 物件鍵的字首

  • process_func (Callable[[dict[str, Any]], bytes]) – 如何將 DynamoDB 條目轉換為位元組。預設情況下,我們轉儲 JSON

  • point_in_time_export (bool) – 布林值,指示運算子使用 ‘scan’ 還是 ‘point in time export’(時間點匯出)

  • export_time (datetime.datetime | None) – 過去的時間點,用於匯出表資料,以 Unix 紀元開始的秒數計算。表匯出將是該時間點表狀態的快照。

  • export_format (str) – 匯出資料的格式。ExportFormat 的有效值為 DYNAMODB_JSON 或 ION。

  • export_table_to_point_in_time_kwargs (dict | None) – 傳遞給 boto3 export_table_to_point_in_time 函式呼叫的額外引數。例如,ExportTypeIncrementalExportSpecification

  • check_interval (int) – 兩次嘗試之間等待的時間間隔(秒)。僅在提供了 export_time 時有效。

  • max_attempts (int) – 最大嘗試次數。僅在提供了 export_time 時有效。

template_fields: collections.abc.Sequence[str] = ('source_aws_conn_id', 'dest_aws_conn_id', 'dynamodb_table_name', 's3_bucket_name', 'file_size',...[source]
template_fields_renderers[source]
file_size = 1000[source]
process_func[source]
dynamodb_table_name[source]
dynamodb_scan_kwargs = None[source]
s3_bucket_name[source]
s3_key_prefix = ''[source]
point_in_time_export = False[source]
export_time = None[source]
export_format = 'DYNAMODB_JSON'[source]
export_table_to_point_in_time_kwargs[source]
check_interval = 30[source]
max_attempts = 60[source]
屬性 hook[source]

建立 DynamoDBHook。

execute(context)[source]

在建立運算子時派生。

Context 是渲染 jinja 模板時使用的相同字典。

參考 get_template_context 獲取更多上下文資訊。

此條目有幫助嗎?