將日誌寫入 Elasticsearch¶
可以將 Airflow 配置為從 Elasticsearch 讀取任務日誌,並可選擇將日誌以標準格式或 JSON 格式寫入 stdout。然後可以使用 fluentd、logstash 或其他工具收集這些日誌並將其轉發到 Elasticsearch 叢集。
Airflow 還支援直接將日誌寫入 Elasticsearch,無需 filebeat 和 logstash 等額外軟體。要啟用此功能,請在 airflow.cfg 中將 write_to_es 和 json_format 設定為 True,將 write_stdout 設定為 False。請注意,如果在日誌記錄部分同時將 write_to_es 和 delete_local_logs 設定為 true,則 Airflow 在成功將任務日誌寫入 ElasticSearch 後會刪除本地的任務日誌副本。
您可以選擇將所有來自 worker 的任務日誌輸出到最高父級程序,而不是標準檔案位置。這在 Kubernetes 等容器環境中提供了一些額外的靈活性,因為容器的 stdout 已經記錄到了宿主節點上。然後可以使用日誌傳輸工具將其轉發到 Elasticsearch。要使用此功能,請在 airflow.cfg 中設定 write_stdout 選項。您還可以選擇使用 json_format 選項將日誌輸出為 JSON 格式。Airflow 使用標準的 Python 日誌記錄模組,JSON 欄位直接從 LogRecord 物件中提取。要使用此功能,請在 airflow.cfg 中設定 json_fields 選項。將您希望為日誌收集的欄位新增到逗號分隔的字串中。這些欄位來自 logging 模組中的 LogRecord 物件。不同屬性的文件可以在此處找到。
首先,要使用此 handler,airflow.cfg 必須按如下方式配置
[logging]
remote_logging = True
[elasticsearch]
host = <host>:<port>
要將任務日誌以 JSON 格式輸出到 stdout,可以使用以下配置
[logging]
remote_logging = True
[elasticsearch]
host = <host>:<port>
write_stdout = True
json_format = True
要將任務日誌輸出到 ElasticSearch,可以使用以下配置:(如果您不想保留任務日誌的本地副本,請將 delete_local_logs 設定為 true)
[logging]
remote_logging = True
delete_local_logs = False
[elasticsearch]
host = <host>:<port>
write_stdout = False
json_format = True
write_to_es = True
target_index = [name of the index to store logs]
透過 TLS 將日誌寫入 Elasticsearch¶
要向 ElasticSearch 新增自定義配置(例如,開啟 ssl_verify、新增自定義自簽名證書等),請在您的 airflow.cfg 中使用 elasticsearch_configs 設定
請注意,當您啟用將日誌寫入 ElasticSearch 時,這些配置也適用
[logging]
remote_logging = True
[elasticsearch_configs]
verify_certs=True
ca_certs=/path/to/CA_certs
此外,在 elasticsearch_configs 部分,您可以傳遞 Elasticsearch Python 客戶端支援的任何引數。這些引數將直接傳遞給 elasticsearch.Elasticsearch(**kwargs) 客戶端。例如
[elasticsearch_configs]
http_compress = True
ca_certs = /root/ca.pem
api_key = "SOMEAPIKEY"
verify_certs = True
Elasticsearch 外部連結¶
使用者可以配置 Airflow 來顯示指向 Elasticsearch 日誌檢視系統(例如 Kibana)的連結。
要啟用此功能,必須按以下示例配置 airflow.cfg。請注意 URL 中必需的 {log_id},構建外部連結時,Airflow 會將此引數替換為用於寫入日誌的相同 log_id_template(參見 將日誌寫入 Elasticsearch)。
[elasticsearch]
# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
# Code will construct log_id using the log_id template from the argument above.
# NOTE: scheme will default to https if one is not provided
frontend = <host_port>/{log_id}
[elasticsearch] log_id_template 的更改¶
如果您需要更改 [elasticsearch] log_id_template,Airflow 2.3.0+ 能夠跟蹤舊值,以便您現有的任務執行日誌仍然可以獲取。一旦您使用了 Airflow 2.3.0+,通常情況下,您可以隨意更改 log_id_template,Airflow 將會跟蹤這些更改。
然而,當您升級到 2.3.0+ 時,Airflow 可能無法正確儲存您之前的 log_id_template。如果升級後發現任務日誌無法訪問,請嘗試在 log_template 表中新增一行 id=0,其中包含您之前的 log_id_template`. 例如,如果您在 2.2.5 中使用了預設設定
INSERT INTO log_template (id, filename, elasticsearch_id, created_at) VALUES (0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());