Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊以獲得早鳥票!

高階日誌配置

並非所有配置選項都可以在 airflow.cfg 檔案中獲得。配置檔案描述瞭如何為任務配置日誌記錄,因為任務生成的日誌預設不僅會記錄到單獨的檔案中,還必須可以透過 Web 伺服器訪問。

預設情況下,標準的 Airflow 元件日誌寫入到 $AIRFLOW_HOME/logs 目錄,但你也可以透過覆蓋 Python 日誌記錄器配置來自定義和按需配置它,這可以透過提供自定義日誌配置物件來完成。你還可以為特定的 operators 和 tasks 建立並使用日誌配置。

某些配置選項要求覆蓋日誌配置類。你可以透過複製 Airflow 的預設配置並修改它以滿足你的需求來做到這一點。

可以在 airflow_local_settings.py 模板檔案 中看到預設配置,並且可以在其中看到使用的日誌記錄器和處理程式。

參見 配置本地設定 以瞭解如何配置本地設定的詳細資訊。

除了可以透過 airflow.cfg 配置的自定義日誌記錄器和處理程式外,Airflow 中的日誌記錄方法遵循常見的 Python 日誌記錄約定,即 Python 物件將日誌記錄到遵循命名約定的日誌記錄器中,該約定為 <package>.<module_name>

你可以在 Python 日誌記錄文件 中閱讀更多關於標準 Python 日誌記錄類(日誌記錄器、處理程式、格式化程式)的資訊。

建立自定義日誌類

可以透過 airflow.cfg 檔案中的 logging_config_class 選項來配置你的日誌類。此配置應指定匯入路徑,該路徑指向與 logging.config.dictConfig() 相容的配置。如果你的檔案位於標準的匯入位置,則應設定 PYTHONPATH 環境變數。

按照以下步驟啟用自定義日誌配置類

  1. 首先將環境變數設定為已知目錄,例如 ~/airflow/

    export PYTHONPATH=~/airflow/
    
  2. 建立一個目錄用於儲存配置檔案,例如 ~/airflow/config

  3. 建立一個名為 ~/airflow/config/log_config.py 的檔案,內容如下

    from copy import deepcopy
    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
    
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    
  4. 在檔案末尾,新增程式碼來修改預設的字典配置。

  5. 更新 $AIRFLOW_HOME/airflow.cfg 檔案以包含以下內容

    [logging]
    logging_config_class = log_config.LOGGING_CONFIG
    

如果你計劃在啟用遠端日誌記錄的同時僅擴充套件/更新配置,你也可以將 logging_config_class 與遠端日誌記錄一起使用。此時,深度複製的字典將包含為你生成的遠端日誌配置,並且你的修改將在遠端日誌配置新增後應用。

[logging]
remote_logging = True
logging_config_class = log_config.LOGGING_CONFIG
  1. 重啟應用程式。

參見 模組管理 以瞭解 Python 和 Airflow 如何管理模組的詳細資訊。

注意

你可以覆蓋處理元件標準日誌和“任務”日誌的方式。

Operators、Hooks 和 Tasks 的自定義日誌記錄器

你可以建立自定義日誌處理程式並將其應用於特定的 Operators、Hooks 和 tasks。預設情況下,Operators 和 Hooks 的日誌記錄器是 airflow.task 日誌記錄器的子級:它們分別遵循命名約定 airflow.task.operators.<package>.<module_name>airflow.task.hooks.<package>.<module_name>。在 建立自定義日誌類 後,你可以為它們分配特定的日誌記錄器。

SQLExecuteQueryOperatorHttpHook 的自定義日誌記錄示例

from copy import deepcopy
from pydantic.utils import deep_update
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "loggers": {
            "airflow.task.operators.airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator": {
                "handlers": ["task"],
                "level": "DEBUG",
                "propagate": True,
            },
            "airflow.task.hooks.airflow.providers.http.hooks.http.HttpHook": {
                "handlers": ["task"],
                "level": "WARNING",
                "propagate": False,
            },
        }
    },
)

你還可以使用 logger_name 屬性為 Dag 的任務設定自定義名稱。如果多個任務使用相同的 Operator,但你希望為其中一些任務停用日誌記錄,這會很有用。

自定義日誌記錄器名稱示例

# In your Dag file
SQLExecuteQueryOperator(..., logger_name="sql.big_query")

# In your custom `log_config.py`
LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "loggers": {
            "airflow.task.operators.sql.big_query": {
                "handlers": ["task"],
                "level": "WARNING",
                "propagate": True,
            },
        }
    },
)

如果你想限制任務的日誌大小,可以新增 handlers.task.max_bytes 引數。

限制任務大小的示例

from copy import deepcopy
from pydantic.utils import deep_update
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "handlers": {
            "task": {"max_bytes": 104857600, "backup_count": 1}  # 100MB and keep 1 history rotate log.
        }
    },
)

本條目有幫助嗎?