airflow.providers.elasticsearch.log.es_task_handler¶
屬性¶
類¶
ElasticsearchTaskHandler 是一個從 Elasticsearch 讀取日誌的 python 日誌處理程式。 |
函式¶
|
從 obj 獲取專案,如果未找到則返回 default。 |
模組內容¶
- class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, write_to_es=False, target_index='airflow-logs', host_field='host', offset_field='offset', host='https://:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[source]¶
基類:
airflow.utils.log.file_task_handler.FileTaskHandler,airflow.utils.log.logging_mixin.ExternalLoggingMixin,airflow.utils.log.logging_mixin.LoggingMixinElasticsearchTaskHandler 是一個從 Elasticsearch 讀取日誌的 python 日誌處理程式。
請注意,Airflow 預設不處理將日誌索引到 Elasticsearch 中。相反,Airflow 將日誌重新整理到本地檔案中。需要額外的軟體設定才能將日誌索引到 Elasticsearch 中,例如使用 Filebeat 和 Logstash。
可以將 Airflow 配置為支援直接將日誌寫入 Elasticsearch。要啟用此功能,請將 json_format 和 write_to_es 設定為 True。
為了高效地查詢和排序 Elasticsearch 結果,此處理程式假定每條日誌訊息都有一個由 ti 主鍵組成的 log_id 欄位:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息根據 offset 進行排序,offset 是一個唯一整數,表示日誌訊息的順序。此處的時間戳不可靠,因為多條日誌訊息可能具有相同的時間戳。
- formatter: logging.Formatter[source]¶
- set_context(ti, *, identifier=None)[source]¶
向 airflow 任務處理程式提供 task_instance 上下文。
- 引數:
ti (airflow.models.taskinstance.TaskInstance) – 任務例項物件
identifier (str | None) – 如果設定,則標識從與任務例項相關的異常場景中中繼日誌的 Airflow 元件
- close()[source]¶
清理處理程式使用的所有資源。
此版本會從內部處理程式對映 _handlers 中移除處理程式,該對映用於按名稱查詢處理程式。子類應確保在覆蓋的 close() 方法中呼叫此方法。
- get_external_log_url(task_instance, try_number)[source]¶
為外部日誌收集服務建立地址。
- 引數:
task_instance (airflow.models.taskinstance.TaskInstance) – 任務例項物件
try_number (int) – 要從中讀取日誌的任務例項嘗試次數(try_number)。
- 返回:
外部日誌收集服務的 URL
- 返回型別: