airflow.providers.elasticsearch.log.es_task_handler

屬性

EsLogMsgType

LOG_LINE_DEFAULTS

USE_PER_RUN_LOG_ID

VALID_ES_CONFIG_KEYS

ElasticsearchTaskHandler

ElasticsearchTaskHandler 是一個從 Elasticsearch 讀取日誌的 python 日誌處理程式。

函式

get_es_kwargs_from_config()

getattr_nested(obj, item, default)

從 obj 獲取專案,如果未找到則返回 default。

模組內容

airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[source]
airflow.providers.elasticsearch.log.es_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.elasticsearch.log.es_task_handler.USE_PER_RUN_LOG_ID = True[source]
airflow.providers.elasticsearch.log.es_task_handler.VALID_ES_CONFIG_KEYS[source]
airflow.providers.elasticsearch.log.es_task_handler.get_es_kwargs_from_config()[source]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, write_to_es=False, target_index='airflow-logs', host_field='host', offset_field='offset', host='https://:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[source]

基類: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler 是一個從 Elasticsearch 讀取日誌的 python 日誌處理程式。

請注意,Airflow 預設不處理將日誌索引到 Elasticsearch 中。相反,Airflow 將日誌重新整理到本地檔案中。需要額外的軟體設定才能將日誌索引到 Elasticsearch 中,例如使用 Filebeat 和 Logstash。

可以將 Airflow 配置為支援直接將日誌寫入 Elasticsearch。要啟用此功能,請將 json_formatwrite_to_es 設定為 True

為了高效地查詢和排序 Elasticsearch 結果,此處理程式假定每條日誌訊息都有一個由 ti 主鍵組成的 log_id 欄位:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息根據 offset 進行排序,offset 是一個唯一整數,表示日誌訊息的順序。此處的時間戳不可靠,因為多條日誌訊息可能具有相同的時間戳。

引數:
  • base_log_folder (str) – 在本地儲存日誌的基礎資料夾

  • log_id_template – 日誌 ID 模板

  • host (str) – Elasticsearch 主機名

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Elasticsearch'[source]
trigger_should_wrap = True[source]
closed = False[source]
client[source]
frontend = 'localhost:5601'[source]
mark_end_on_close = True[source]
end_of_log_mark[source]
write_stdout[source]
json_format[source]
json_fields[source]
host_field = 'host'[source]
offset_field = 'offset'[source]
index_patterns[source]
index_patterns_callable[source]
context_set = False[source]
write_to_es = False[source]
target_index = 'airflow-logs'[source]
delete_local_copy[source]
formatter: logging.Formatter[source]
handler: logging.FileHandler | logging.StreamHandler[source]
static format_url(host)[source]

格式化給定的主機字串,確保它以“http”開頭,並檢查它是否表示一個有效的 URL。

引數 host:

要格式化和檢查的主機字串。

emit(record)[source]

執行所有必要操作以實際記錄指定的日誌記錄。

此版本旨在由子類實現,因此會引發 NotImplementedError。

set_context(ti, *, identifier=None)[source]

向 airflow 任務處理程式提供 task_instance 上下文。

引數:
close()[source]

清理處理程式使用的所有資源。

此版本會從內部處理程式對映 _handlers 中移除處理程式,該對映用於按名稱查詢處理程式。子類應確保在覆蓋的 close() 方法中呼叫此方法。

property log_name: str[source]

日誌名稱。

get_external_log_url(task_instance, try_number)[source]

為外部日誌收集服務建立地址。

引數:
返回:

外部日誌收集服務的 URL

返回型別:

str

我們是否支援外部連結。

airflow.providers.elasticsearch.log.es_task_handler.getattr_nested(obj, item, default)[source]

從 obj 獲取專案,如果未找到則返回 default。

例如,呼叫 getattr_nested(a, 'b.c', "NA") 將返回 a.b.c 如果存在該值,否則返回“NA”。

此條目是否有幫助?