airflow.providers.amazon.aws.log.cloudwatch_task_handler

CloudWatchRemoteLogIO

配置有類名的日誌記錄器的便捷超類。

CloudwatchTaskHandler

CloudwatchTaskHandler 是一個 Python 日誌處理器,用於處理和讀取任務例項日誌。

函式

json_serialize_legacy(value)

複製舊版 watchtower 行為的 JSON 序列化器。

json_serialize(value)

複製當前 watchtower 行為的 JSON 序列化器。

模組內容

airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy(value)[source]

複製舊版 watchtower 行為的 JSON 序列化器。

繼承自 watchtower@2.0.1 的 JSON 序列化函式,將 datetime 物件序列化為 ISO 格式,將所有其他非 JSON 可序列化物件序列化為 null

引數:

value (Any) – 要序列化的物件

返回值:

如果 value 是 datetime 例項,則返回其字串表示形式,否則返回 None

返回型別:

str | None

airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize(value)[source]

複製當前 watchtower 行為的 JSON 序列化器。

這為使用者提供了一個可訪問的匯入路徑:airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize

引數:

value (Any) – 要序列化的物件

返回值:

value 的字串表示形式

返回型別:

str | None

class airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudWatchRemoteLogIO(context=None)[source]

基類: airflow.utils.log.logging_mixin.LoggingMixin

配置有類名的日誌記錄器的便捷超類。

base_log_folder: pathlib.Path[source]
remote_base: str = ''[source]
delete_local_copy: bool = True[source]
log_group_arn: str[source]
log_stream_name: str = ''[source]
log_group: str[source]
region_name: str[source]
property hook[source]

返回 AwsLogsHook。

property handler: watchtower.CloudWatchLogHandler[source]
property processors: tuple[structlog.typing.Processor, Ellipsis][source]
close()[source]
upload(path, ti)[source]
read(relative_path, ti)[source]
get_cloudwatch_logs(stream_name, task_instance)[source]

返回給定日誌流中的所有日誌。

引數:
  • stream_name (str) – 要獲取所有日誌的 Cloudwatch 日誌流名稱

  • task_instance (airflow.sdk.types.RuntimeTaskInstanceProtocol) – 要獲取日誌的任務例項

返回值:

給定日誌流中所有日誌的字串形式

class airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler(base_log_folder, log_group_arn, **kwargs)[source]

基類: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.LoggingMixin

CloudwatchTaskHandler 是一個 Python 日誌處理器,用於處理和讀取任務例項日誌。

它擴充套件了 Airflow FileTaskHandler,並上傳日誌到 Cloudwatch 並從 Cloudwatch 讀取日誌。

引數:
  • base_log_folder (str) – 本地儲存日誌的基礎資料夾

  • log_group_arn (str) – 用於遠端日誌儲存的 Cloudwatch 日誌組的 ARN,格式為 arn:aws:logs:{region name}:{account id}:log-group:{group name}

trigger_should_wrap = True[source]
handler = None[source]
log_group[source]
region_name[source]
closed = False[source]
io[source]
property hook[source]

返回 AwsLogsHook。

set_context(ti, *, identifier=None)[source]

為 Airflow 任務處理器提供任務例項上下文。

通常返回 None。但如果屬性 maintain_propagate 已設定為 propagate,則返回 sentinel MAINTAIN_PROPAGATE。這會覆蓋預設行為,即在每次呼叫 set_context 時將 propagate 設定為 False。在撰寫本文時,此功能僅用於單元測試。

引數:
  • ti (airflow.models.taskinstance.TaskInstance) – 任務例項物件

  • identifier (str | None) – 如果設定,則向日志文件名新增字尾。當從任務或觸發器執行以外的上下文將異常訊息中繼到任務日誌時使用。

close()[source]

關閉負責將本地日誌檔案上傳到 Cloudwatch 的處理器。

此條目是否有幫助?