airflow.providers.opensearch.log.os_task_handler¶
屬性¶
類¶
OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。 |
函式¶
|
從 obj 獲取 item,如果未找到則返回 default。 |
模組內容¶
- airflow.providers.opensearch.log.os_task_handler.getattr_nested(obj, item, default)[source]¶
從 obj 獲取 item,如果未找到則返回 default。
例如,呼叫
getattr_nested(a, 'b.c', "NA")將返回a.b.c(如果該值存在),否則返回 “NA”。
- class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]¶
基類:
airflow.utils.log.file_task_handler.FileTaskHandler,airflow.utils.log.logging_mixin.ExternalLoggingMixin,airflow.utils.log.logging_mixin.LoggingMixinOpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。
與 ElasticsearchTaskHandler 類似,Airflow 本身不處理日誌的索引。日誌被重新整理到本地檔案,可能需要額外的軟體(例如 Filebeat、Logstash)將日誌傳送到 OpenSearch。此處理程式隨後啟用從 OpenSearch 獲取和顯示日誌的功能。
為了高效地查詢和排序 Elasticsearch 結果,此處理程式假定每條日誌訊息都包含一個欄位 log_id,由任務例項 (ti) 的主鍵組成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息根據 offset 進行排序,offset 是一個唯一的整數,表示日誌訊息的順序。此處的時間戳不可靠,因為多條日誌訊息可能具有相同的時間戳。
- 引數:
base_log_folder (str) – 在本地儲存日誌的基資料夾。
end_of_log_mark (str) – 表示日誌結束的標記字串。
write_stdout (bool) – 是否也將日誌寫入標準輸出。
json_format (bool) – 是否將日誌格式化為 JSON。
json_fields (str) – 要包含在 JSON 日誌輸出中的欄位的逗號分隔列表。
host (str) – OpenSearch 主機名。
port (int) – OpenSearch 埠。
username (str) – 用於 OpenSearch 認證的使用者名稱。
password (str) – 用於 OpenSearch 認證的密碼。
host_field (str) – 日誌中用於表示主機的欄位名(預設為 “host”)。
offset_field (str) – 日誌偏移量的欄位名(預設為 “offset”)。
index_patterns (str) – 用於儲存日誌的索引模式或模板。
index_patterns_callable (str) – 根據上下文動態生成索引模式的可呼叫物件。
os_kwargs (dict | None | Literal['default_os_kwargs']) – 其他 OpenSearch 客戶端選項。可以設定為 “default_os_kwargs” 以從 Airflow 的設定載入預設配置。
- formatter: logging.Formatter[source]¶
- set_context(ti, *, identifier=None)[source]¶
為 airflow 任務處理器提供 task_instance 上下文。
- 引數:
ti (airflow.models.taskinstance.TaskInstance) – task instance 物件
identifier (str | None) – 如果設定,標識中繼與任務例項相關的異常情況日誌的 Airflow 元件