2025 年 Airflow Summit 即將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

指標配置

Airflow 可以配置為將指標傳送到 StatsDOpenTelemetry

設定 - StatsD

要使用 StatsD,您必須先安裝所需的軟體包

pip install 'apache-airflow[statsd]'

然後將以下行新增到您的配置檔案中,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果您想使用自定義 StatsD 客戶端而不是 Airflow 提供的預設客戶端,則必須將以下鍵及其自定義 StatsD 客戶端的模組路徑新增到配置檔案中。此模組必須在您的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱模組管理

設定 - OpenTelemetry

要使用 OpenTelemetry,您必須先安裝所需的軟體包

pip install 'apache-airflow[otel]'

將以下行新增到您的配置檔案中,例如 airflow.cfg

[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

啟用 Https

要與 OpenTelemetry collector 建立 HTTPS 連線,您需要在 OpenTelemetry collector 的 config.yml 檔案中配置 SSL 證書和金鑰。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允許/阻止列表

如果您想避免傳送所有可用指標,您可以配置允許列表或阻止列表,以便只發送或阻止以列表元素開頭的指標

[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery

重新命名指標

如果您想將指標重定向到不同的名稱,您可以在 [metrics] 部分配置 stat_name_handler 選項。它應該指向一個函式,該函式驗證統計名稱,必要時應用更改,並返回轉換後的統計名稱。該函式可能如下所示

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他配置選項

注意

有關指標配置選項的詳細列表,請參閱配置參考文件 - [metrics]

指標描述

計數器

名稱

描述

<job_name>_start

已啟動的 <job_name> 作業數量,例如 SchedulerJob, LocalTaskJob

<job_name>_end

已結束的 <job_name> 作業數量,例如 SchedulerJob, LocalTaskJob

<job_name>_heartbeat_failure

<job_name> 作業心跳失敗次數,例如 SchedulerJob, LocalTaskJob

local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>

LocalTaskJob 在執行 DAG <dag_id> 的任務 <task_id> 時,以 <return_code> 終止的次數。

local_task_job.task_exit

LocalTaskJob 在執行 DAG <dag_id> 的任務 <task_id> 時,以 <return_code> 終止的次數。帶有 job_id、dag_id、task_id 和 return_code 標籤的指標。

operator_failures_<operator_name>

Operator <operator_name> 失敗次數

operator_failures

Operator <operator_name> 失敗次數。帶有 operator_name 標籤的指標。

operator_successes_<operator_name>

Operator <operator_name> 成功次數

operator_successes

Operator <operator_name> 成功次數。帶有 operator_name 標籤的指標。

ti_failures

總體任務例項失敗次數。帶有 dag_id 和 task_id 標籤的指標。

ti_successes

總體任務例項成功次數。帶有 dag_id 和 task_id 標籤的指標。

previously_succeeded

先前成功完成的任務例項數量。帶有 dag_id 和 task_id 標籤的指標。

task_instances_without_heartbeats_killed

沒有心跳的任務例項被終止。帶有 dag_id 和 task_id 標籤的指標。

scheduler_heartbeat

排程器心跳

dag_processor_heartbeat

獨立 DAG 處理器心跳

dag_processing.processes

當前正在執行的 DAG 解析程序的相對數量(即自從上次傳送指標以來,程序完成時,此增量為負)。帶有 file_path 和 action 標籤的指標。

dag_processing.processor_timeouts

由於耗時過長而被終止的檔案處理器數量。帶有 file_path 標籤的指標。

dag_processing.other_callback_count

接收到的非 SLA 回撥數量

dag_processing.file_path_queue_update_count

掃描檔案系統並將所有現有 dag 加入佇列的次數

dag_file_processor_timeouts

(已棄用) 與 dag_processing.processor_timeouts 行為相同

dag_processing.manager_stalls

DagFileProcessorManager 停滯次數

dag_file_refresh_error

載入任何 DAG 檔案失敗次數

scheduler.tasks.killed_externally

外部終止的任務數量。帶有 dag_id 和 task_id 標籤的指標。

scheduler.orphaned_tasks.cleared

排程器清除的孤立任務數量

scheduler.orphaned_tasks.adopted

排程器接管的孤立任務數量

scheduler.critical_section_busy

排程器程序嘗試鎖定關鍵部分(將任務傳送到執行器所需)並發現已被其他程序鎖定的次數。

ti.start.<dag_id>.<task_id>

給定 dag 中已啟動的任務數量。類似於 <job_name>_start 但用於任務

ti.start

給定 dag 中已啟動的任務數量。類似於 <job_name>_start 但用於任務。帶有 dag_id 和 task_id 標籤的指標。

ti.finish.<dag_id>.<task_id>.<state>

給定 dag 中已完成的任務數量。類似於 <job_name>_end 但用於任務

ti.finish

給定 dag 中已完成的任務數量。類似於 <job_name>_end 但用於任務。帶有 dag_id 和 task_id 標籤的指標。

dag.callback_exceptions

DAG 回撥丟擲的異常數量。發生這種情況時,表示 DAG 回撥不起作用。帶有 dag_id 標籤的指標

celery.task_timeout_error

將任務釋出到 Celery Broker 時丟擲的 AirflowTaskTimeout 錯誤數量。

celery.execute_command.failure

Celery 任務的非零退出碼數量。

task_removed_from_dag.<dag_id>

給定 dag 中已刪除的任務數量(即任務不再存在於 DAG 中)。

task_removed_from_dag

給定 dag 中已刪除的任務數量(即任務不再存在於 DAG 中)。帶有 dag_id 和 run_type 標籤的指標。

task_restored_to_dag.<dag_id>

給定 dag 中已恢復的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務例項已新增到 DAG 檔案中)

task_restored_to_dag.<dag_id>

給定 dag 中已恢復的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務例項已新增到 DAG 檔案中)。帶有 dag_id 和 run_type 標籤的指標。

task_instance_created_<operator_name>

給定 Operator 建立的任務例項數量

task_instance_created

給定 Operator 建立的任務例項數量。帶有 dag_id 和 run_type 標籤的指標。

triggerer_heartbeat

觸發器心跳

triggers.blocked_main_thread

阻塞主執行緒的觸發器數量(可能是由於不是完全非同步)

triggers.failed

在觸發事件之前出錯的觸發器數量

triggers.succeeded

至少觸發了一個事件的觸發器數量

asset.updates

已更新資產數量

asset.orphaned

標記為孤立資產的數量,因為它們不再在 DAG 排程引數或任務輸出中被引用

asset.triggered_dagruns

由資產更新觸發的 DAG 執行數量

計量器

名稱

描述

dagbag_size

排程器根據其配置執行掃描時找到的 dag 數量

dag_processing.import_errors

嘗試解析 DAG 檔案時發生的錯誤數量

dag_processing.total_parse_time

掃描和匯入 dag_processing.file_path_queue_size 個 DAG 檔案所花費的秒數

dag_processing.file_path_queue_size

下次掃描要考慮的 DAG 檔案數量

dag_processing.last_run.seconds_ago.<dag_file>

自上次處理 <dag_file> 以來的秒數

dag_processing.last_num_of_db_queries.<dag_file>

解析每個 <dag_file> 期間對 Airflow 資料庫的查詢次數

scheduler.tasks.starving

由於資源池中沒有可用槽位而無法排程的任務數量

scheduler.tasks.executable

根據資源池限制、DAG 併發、執行器狀態和優先順序,已準備好執行(設定為 queued)的任務數量。

executor.open_slots.<executor_class_name>

特定執行器上的可用槽位數量。僅在配置了多個執行器時發出。

executor.open_slots

執行器上的可用槽位數量

executor.queued_tasks.<executor_class_name>

特定執行器上的排隊任務數量。僅在配置了多個執行器時發出。

executor.queued_tasks

執行器上的排隊任務數量

executor.running_tasks.<executor_class_name>

特定執行器上正在執行的任務數量。僅在配置了多個執行器時發出。

executor.running_tasks

執行器上正在執行的任務數量

pool.open_slots.<pool_name>

資源池中的可用槽位數量

pool.open_slots

資源池中的可用槽位數量。帶有 pool_name 標籤的指標。

pool.queued_slots.<pool_name>

資源池中的排隊槽位數量

pool.queued_slots

資源池中的排隊槽位數量。帶有 pool_name 標籤的指標。

pool.running_slots.<pool_name>

資源池中正在執行的槽位數量

pool.running_slots

資源池中正在執行的槽位數量。帶有 pool_name 標籤的指標。

pool.deferred_slots.<pool_name>

資源池中已延遲的槽位數量

pool.deferred_slots

資源池中已延遲的槽位數量。帶有 pool_name 標籤的指標。

pool.scheduled_slots.<pool_name>

資源池中已排程的槽位數量

pool.scheduled_slots

資源池中已排程的槽位數量。帶有 pool_name 標籤的指標。

pool.starving_tasks.<pool_name>

資源池中飢餓任務的數量

pool.starving_tasks

資源池中飢餓任務的數量。帶有 pool_name 標籤的指標。

task.cpu_usage.<dag_id>.<task_id>

任務使用的 CPU 百分比

task.mem_usage.<dag_id>.<task_id>

任務使用的記憶體百分比

triggers.running.<hostname>

正在為某個觸發器執行的觸發器數量(由 hostname 描述)

triggers.running

正在為某個觸發器執行的觸發器數量(由 hostname 描述)。帶有 hostname 標籤的指標。

triggerer.capacity_left.<hostname>

觸發器執行觸發器的剩餘容量(由 hostname 描述)

triggerer.capacity_left

觸發器執行觸發器的剩餘容量(由 hostname 描述)。帶有 hostname 標籤的指標。

ti.running.<queue>.<dag_id>.<task_id>

給定 dag 中正在執行的任務數量。由於 ti.start 和 ti.finish 可能不同步,此指標顯示所有正在執行的 ti。

ti.running

給定 dag 中正在執行的任務數量。由於 ti.start 和 ti.finish 可能不同步,此指標顯示所有正在執行的 ti。帶有 queue、dag_id 和 task_id 標籤的指標。

計時器

名稱

描述

dagrun.dependency-check.<dag_id>

檢查 DAG 依賴項所花費的毫秒數

dagrun.dependency-check

檢查 DAG 依賴項所花費的毫秒數。帶有 dag_id 標籤的指標。

dag.<dag_id>.<task_id>.duration

執行任務所花費的毫秒數

task.duration

執行任務所花費的毫秒數。帶有 dag_id 和 task-id 標籤的指標。

dag.<dag_id>.<task_id>.scheduled_duration

任務在 Scheduled 狀態花費的毫秒數,然後進入 Queued 狀態

task.scheduled_duration

任務在 Scheduled 狀態花費的毫秒數,然後進入 Queued 狀態。帶有 dag_id 和 task_id 標籤的指標。

dag.<dag_id>.<task_id>.queued_duration

任務在 Queued 狀態花費的毫秒數,然後進入 Running 狀態

task.queued_duration

任務在 Queued 狀態花費的毫秒數,然後進入 Running 狀態。帶有 dag_id 和 task_id 標籤的指標。

dag_processing.last_duration.<dag_file>

載入給定 DAG 檔案所花費的毫秒數

dag_processing.last_duration

載入給定 DAG 檔案所花費的毫秒數。帶有 file_name 標籤的指標。

dagrun.duration.success.<dag_id>

DagRun 達到成功狀態所花費的毫秒數

dagrun.duration.success

DagRun 達到成功狀態所花費的毫秒數。帶有 dag_id 和 run_type 標籤的指標。

dagrun.duration.failed.<dag_id>

DagRun 達到失敗狀態所花費的毫秒數

dagrun.duration.failed

DagRun 達到失敗狀態所花費的毫秒數。帶有 dag_id 和 run_type 標籤的指標。

dagrun.schedule_delay.<dag_id>

計劃的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數

dagrun.schedule_delay

計劃的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數。帶有 dag_id 標籤的指標。

scheduler.critical_section_duration

排程器迴圈關鍵部分花費的毫秒數 – 每次只有一個排程器可以進入此迴圈

scheduler.critical_section_query_duration

執行關鍵部分任務例項查詢花費的毫秒數

scheduler.scheduler_loop_duration

執行一次排程器迴圈花費的毫秒數

dagrun.<dag_id>.first_task_scheduling_delay

第一個任務 start_date 與 dagrun 預期開始時間之間的毫秒數

dagrun.first_task_scheduling_delay

第一個任務 start_date 與 dagrun 預期開始時間之間的毫秒數。帶有 dag_id 和 run_type 標籤的指標。

collect_db_dags

從資料庫中獲取所有序列化 Dag 所花費的毫秒數

kubernetes_executor.clear_not_launched_queued_tasks.duration

在 Kubernetes Executor 中清除未啟動的排隊任務所花費的毫秒數

kubernetes_executor.adopt_task_instances.duration

在 Kubernetes Executor 中接管任務例項所花費的毫秒數

此條目有幫助嗎?