airflow.providers.opensearch.log.os_task_handler

屬性

OsLogMsgType

USE_PER_RUN_LOG_ID

LOG_LINE_DEFAULTS

OpensearchTaskHandler

OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。

函式

getattr_nested(obj, item, default)

從 obj 獲取 item,如果未找到則返回 default。

get_os_kwargs_from_config()

模組內容

airflow.providers.opensearch.log.os_task_handler.OsLogMsgType[source]
airflow.providers.opensearch.log.os_task_handler.USE_PER_RUN_LOG_ID = True[source]
airflow.providers.opensearch.log.os_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.opensearch.log.os_task_handler.getattr_nested(obj, item, default)[source]

從 obj 獲取 item,如果未找到則返回 default。

例如,呼叫 getattr_nested(a, 'b.c', "NA") 將返回 a.b.c(如果該值存在),否則返回 “NA”。

airflow.providers.opensearch.log.os_task_handler.get_os_kwargs_from_config()[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]

基類: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。

與 ElasticsearchTaskHandler 類似,Airflow 本身不處理日誌的索引。日誌被重新整理到本地檔案,可能需要額外的軟體(例如 Filebeat、Logstash)將日誌傳送到 OpenSearch。此處理程式隨後啟用從 OpenSearch 獲取和顯示日誌的功能。

為了高效地查詢和排序 Elasticsearch 結果,此處理程式假定每條日誌訊息都包含一個欄位 log_id,由任務例項 (ti) 的主鍵組成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息根據 offset 進行排序,offset 是一個唯一的整數,表示日誌訊息的順序。此處的時間戳不可靠,因為多條日誌訊息可能具有相同的時間戳。

引數
  • base_log_folder (str) – 在本地儲存日誌的基資料夾。

  • end_of_log_mark (str) – 表示日誌結束的標記字串。

  • write_stdout (bool) – 是否也將日誌寫入標準輸出。

  • json_format (bool) – 是否將日誌格式化為 JSON。

  • json_fields (str) – 要包含在 JSON 日誌輸出中的欄位的逗號分隔列表。

  • host (str) – OpenSearch 主機名。

  • port (int) – OpenSearch 埠。

  • username (str) – 用於 OpenSearch 認證的使用者名稱。

  • password (str) – 用於 OpenSearch 認證的密碼。

  • host_field (str) – 日誌中用於表示主機的欄位名(預設為 “host”)。

  • offset_field (str) – 日誌偏移量的欄位名(預設為 “offset”)。

  • index_patterns (str) – 用於儲存日誌的索引模式或模板。

  • index_patterns_callable (str) – 根據上下文動態生成索引模式的可呼叫物件。

  • os_kwargs (dict | None | Literal['default_os_kwargs']) – 其他 OpenSearch 客戶端選項。可以設定為 “default_os_kwargs” 以從 Airflow 的設定載入預設配置。

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Opensearch'[source]
trigger_should_wrap = True[source]
closed = False[source]
mark_end_on_close = True[source]
end_of_log_mark[source]
write_stdout[source]
json_format[source]
json_fields[source]
host_field = 'host'[source]
offset_field = 'offset'[source]
index_patterns[source]
index_patterns_callable[source]
context_set = False[source]
client[source]
formatter: logging.Formatter[source]
handler: logging.FileHandler | logging.StreamHandler[source]
set_context(ti, *, identifier=None)[source]

為 airflow 任務處理器提供 task_instance 上下文。

引數
emit(record)[source]

執行所需操作以實際記錄指定的日誌記錄。

此版本旨在由子類實現,因此會引發 NotImplementedError。

close()[source]

整理處理器使用的所有資源。

此版本將處理器從用於按名稱查詢處理器的內部處理器對映 _handlers 中移除。子類應確保從重寫的 close() 方法中呼叫此方法。

此條目有幫助嗎?