將日誌寫入 Elasticsearch

可以將 Airflow 配置為從 Elasticsearch 讀取任務日誌,並選擇以標準或 JSON 格式將日誌寫入 stdout。之後可以使用 fluentd、logstash 或其他工具收集這些日誌並將其轉發到 Elasticsearch 叢集。

您可以選擇將工作節點中的所有任務日誌輸出到最高父級進程,而不是標準文件位置。這在 Kubernetes 等容器環境中提供了一些額外的靈活性,其中容器 stdout 已經被記錄到主機節點。然後可以使用日誌傳輸工具將它們轉發到 Elasticsearch。若要使用此功能,請在 airflow.cfg 中設定 write_stdout 選項。您也可以選擇使用 json_format 選項以 JSON 格式輸出日誌。Airflow 使用標準的 Python 日誌記錄模組,並且直接從 LogRecord 物件中提取 JSON 欄位。若要使用此功能,請在 airflow.cfg 中設定 json_fields 選項。將欄位添加到您要為日誌收集的逗號分隔字串中。這些欄位來自 logging 模組中的 LogRecord 物件。有關不同屬性的文件,請參見此處

首先,若要使用處理程序,必須如下所示配置 airflow.cfg

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch]
host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout =
json_fields =

若要將任務日誌以 JSON 格式輸出到 stdout,可以使用以下配置

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch]
host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout = True
json_format = True
json_fields = asctime, filename, lineno, levelname, message

透過 TLS 將日誌寫入 Elasticsearch

若要將自訂配置添加到 ElasticSearch(例如,開啟 ssl_verify、添加自訂的自簽署憑證等),請在您的 airflow.cfg 中使用 elasticsearch_configs 設定

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch_configs]
use_ssl=True
verify_certs=True
ca_certs=/path/to/CA_certs

這篇文章有幫助嗎?