Airflow Summit 2025 即將於 10 月 07 日至 09 日舉行。立即註冊獲取早鳥票!

任務日誌

Airflow 以一種允許您在 Airflow UI 中單獨檢視每個任務日誌的方式寫入任務日誌。核心 Airflow 提供了一個介面 FileTaskHandler,用於將任務日誌寫入檔案,幷包含一個在任務執行時從 worker 提供日誌的機制。Apache Airflow 社群還發布了許多服務的提供者 (提供者),其中一些提供程式擴充套件了 Apache Airflow 的日誌記錄能力。您可以在寫入日誌中檢視所有這些提供者。

使用 S3、GCS、WASB、HDFS 或 OSS 遠端日誌服務時,可以將本地日誌檔案上傳到遠端位置後刪除,透過設定配置項來實現

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

配置日誌

對於預設處理器 FileTaskHandler,您可以使用 base_log_folderairflow.cfg 中指定日誌檔案存放的目錄。預設情況下,日誌存放在 AIRFLOW_HOME 目錄中。

注意

有關配置設定的更多資訊,請參閱設定配置選項

任務日誌檔案的命名遵循預設模式

  • 對於普通任務:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 對於動態對映的任務:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

這些模式可以透過log_filename_template進行調整。

此外,您可以提供一個遠端位置來儲存當前日誌和備份。

從程式碼寫入任務日誌

Airflow 使用標準的 Python logging 框架來寫入日誌,並且在任務執行期間,根 logger 配置為寫入任務日誌。

大多數 operator 會自動將日誌寫入任務日誌。這是因為它們有一個 log logger,您可以使用它來寫入任務日誌。這個 logger 由所有 operator 繼承的 LoggingMixin 建立和配置。此外,由於根 logger 的處理,任何將日誌傳播到根的標準 logger(使用預設設定)也會寫入任務日誌。

因此,如果您想從自己的自定義程式碼向任務日誌寫入日誌,可以執行以下任何操作

  • 使用 BaseOperator 中的 self.log logger 寫入日誌

  • 使用標準 print 語句列印到 stdout(不推薦,但在某些情況下可能有用)

  • 使用標準的 logger 方法,即使用 Python 模組名建立一個 logger,然後使用它向任務日誌寫入日誌

這是 logger 在 Python 程式碼中直接使用的常用方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日誌行分組

在版本 2.9.0 中新增。

與 CI 流水線一樣,Airflow 日誌也可能非常大且難以閱讀。因此,有時對日誌區域進行分組並提供文字區域摺疊以隱藏不相關內容會很有用。因此,Airflow 實現了一種相容的日誌訊息分組方式,類似於 GithubAzure DevOps,以便可以摺疊文字區域。所實現的方案相容,因此在 CI 中生成輸出的工具可以直接在 Airflow 中利用相同的體驗。

透過新增具有起始和結束位置的日誌標記,如下所示的日誌訊息可以分組

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

在 web UI 中顯示日誌時,日誌的顯示將會被精簡

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果您點選日誌文字標籤,將顯示詳細的日誌行。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日誌交錯

Airflow 的遠端任務日誌處理器大致可分為兩類:流式處理器(如 ElasticSearch、AWS Cloudwatch 和 GCP 操作日誌,即原先的 stackdriver)和 Blob 儲存處理器(如 S3、GCS、WASB)。

對於 Blob 儲存處理器,根據任務的狀態,日誌可能位於許多不同的位置和多個不同的檔案中。因此,我們需要檢查所有位置並將找到的內容交錯。要做到這一點,我們需要能夠解析每一行的時間戳。如果您正在使用自定義格式化程式,您可能需要在 Airflow 設定 [logging] interleave_timestamp_parser 處提供一個可呼叫名稱來覆蓋預設解析器。

對於流式處理器,無論任務階段或執行位置如何,所有日誌訊息都可以使用相同的識別符號傳送到日誌服務,因此一般來說無需檢查多個源並進行交錯。

故障排除

如果您想檢查當前設定的是哪個任務處理器,可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 輸出已被截斷,僅顯示與日誌配置相關的部分。您還可以執行 airflow config list 來檢查日誌配置選項是否具有有效值。

高階配置

您可以配置高階功能——包括新增您自己的自定義任務日誌處理器(以及所有 Airflow 元件的日誌處理器),併為每個 operator、hook 和任務建立自定義日誌處理器。

從 Worker 和 Triggerer 提供日誌

大多數任務處理器在任務完成後傳送日誌。為了即時檢視日誌,Airflow 在以下情況下啟動 HTTP 伺服器來提供日誌

  • 如果使用 LocalExecutor,則在 airflow scheduler 執行時提供。

  • 如果使用 CeleryExecutor,則在 airflow worker 執行時提供。

在 triggerer 中,除非服務以選項 --skip-serve-logs 啟動,否則會提供日誌。

伺服器執行在 [logging] 部分的 worker_log_server_port 選項指定的埠上,triggerer 則執行在 triggerer_log_server_port 選項指定的埠上。預設值分別為 8793 和 8794。Webserver 和 worker 之間的通訊使用 [webserver] 部分的 secret_key 選項指定的金鑰進行簽名。您必須確保金鑰匹配,以便通訊順利進行。

我們使用 Gunicorn 作為 WSGI 伺服器。其配置選項可以透過 GUNICORN_CMD_ARGS 環境變數覆蓋。詳情請參閱 Gunicorn 設定

實現自定義檔案任務處理器

注意

這是一個高階主題,大多數使用者應該可以直接使用寫入日誌中的現有處理器。

在我們的提供者中,我們為所有主要的雲提供商提供了豐富的選項。但如果您需要使用不同的服務實現日誌記錄,並決定實現自定義 FileTaskHandler,則需要注意一些設定,特別是在 trigger 日誌記錄的上下文中。

Trigger 要求改變日誌設定的方式。與任務不同,許多 trigger 在同一程序中執行,並且由於 trigger 執行在 asyncio 中,我們必須注意不要透過日誌處理器引入阻塞呼叫。並且由於處理器的行為各異(一些寫入檔案,一些上傳到 Blob 儲存,一些在訊息到達時透過網路傳送訊息,一些線上程中執行此操作),我們需要有一種方法讓 triggerer 知道如何使用它們。

為了實現這一點,我們有一些可以在處理器例項或類上設定的屬性。這些引數不遵循繼承,因為 FileTaskHandler 的子類在相關特性上可能與其不同。這些引數描述如下

  • trigger_should_wrap:控制此處理器是否應由 TriggerHandlerWrapper 包裝。當處理器的每個例項都建立一個檔案處理器並向其寫入所有訊息時,這是必要的。

  • trigger_should_queue:控制 triggerer 是否應在事件迴圈和處理器之間放置一個 QueueListener,以確保處理器中的阻塞 IO 不會中斷事件迴圈。

  • trigger_send_end_marker:控制當 trigger 完成時是否應向 logger 傳送 END 訊號。它用於告訴包裝器關閉並移除剛完成的 trigger 特定的個體檔案處理器。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 不是 True,我們通常假定處理器不支援 trigger。但在這種情況下,如果處理器的 trigger_supported 設定為 True,那麼我們仍然會在 triggerer 啟動時將處理器移動到根,以便它將處理 trigger 訊息。本質上,這對於“原生”支援 trigger 的處理器應該為 true。例如 StackdriverTaskHandler 就是這樣的例子。

此條目有幫助嗎?