Airflow 2025 峰會將於 10 月 07 日至 09 日舉行。立即註冊享受早鳥票優惠!

叢集策略

如果你想在叢集級別檢查或修改 DAG 或任務,叢集策略可以實現這一點。它們有三個主要目的:

  • 檢查 DAG / 任務是否符合特定標準

  • 為 DAG / 任務設定預設引數

  • 執行自定義路由邏輯

叢集策略主要有三種類型:

  • dag_policy:接收一個名為 dagDAG 引數。在從 DagBag DagBag 載入 DAG 時執行。

  • task_policy:接收一個名為 taskBaseOperator 引數。該策略在載入時從 DagBag 解析任務建立任務時執行。這意味著整個任務定義可以在任務策略中修改。它與在 DagRun 中執行的特定任務無關。定義的 task_policy 將應用於將來執行的所有任務例項。

  • task_instance_mutation_hook:接收一個名為 task_instanceTaskInstance 引數。`task_instance_mutation_hook` 不應用於任務本身,而是應用於與特定 DagRun 相關的任務例項。它在“工作器”中執行,而不是在 DAG 檔案處理器中執行,恰好在任務例項執行之前。該策略僅應用於該任務當前執行的執行(即例項)。

DAG 和任務叢集策略可以丟擲 AirflowClusterPolicyViolation 異常,以指示傳遞給它們的 DAG/任務不合規,不應被載入。

當有意跳過某個 DAG 時,它們也可以丟擲 AirflowClusterPolicySkipDag 異常。與 AirflowClusterPolicyViolation 不同,此異常不會顯示在 Airflow Web UI 上(在內部,它不會記錄在元資料庫的 import_error 表中)。

叢集策略設定的任何額外屬性優先於你在 DAG 檔案中定義的屬性;例如,如果你在 DAG 檔案中的任務上設定了 sla,然後叢集策略也設定了 sla,則叢集策略的值將優先。

如何定義策略函式

有兩種配置叢集策略的方法:

  1. 在 Python 搜尋路徑中的某個位置建立一個 airflow_local_settings.py 檔案($AIRFLOW_HOME 下的 config/ 資料夾是一個不錯的“預設”位置),然後將與上述一個或多個叢集策略名稱(例如 dag_policy)匹配的可呼叫物件新增到檔案中。

有關如何配置本地設定的詳細資訊,請參閱 配置本地設定

  1. 透過在自定義模組中使用 Pluggy 介面的 setuptools entrypoint

    在 2.6 版本中新增。

    此方法更高階,適用於已經熟悉 Python 打包的人員。

    首先在模組中建立你的策略函式

    from airflow.policies import hookimpl
    
    
    @hookimpl
    def task_policy(task) -> None:
        # Mutate task in place
        # ...
        print(f"Hello from {__file__}")
    

    然後將 entrypoint 新增到你的專案規範中。例如,使用 pyproject.tomlsetuptools

    [build-system]
    requires = ["setuptools", "wheel"]
    build-backend = "setuptools.build_meta"
    
    [project]
    name = "my-airflow-plugin"
    version = "0.0.1"
    # ...
    
    dependencies = ["apache-airflow>=2.6"]
    [project.entry-points.'airflow.policy']
    _ = 'my_airflow_plugin.policies'
    

    entrypoint 組必須是 airflow.policy,名稱將被忽略。值應該是使用 @hookimpl 標記裝飾的模組(或類)。

    完成這些步驟並將你的分發包安裝到你的 Airflow 環境後,各種 Airflow 元件將呼叫策略函式。(具體的呼叫順序未定義,因此如果你有多個外掛,請不要依賴任何特定的呼叫順序)。

需要注意的一點(無論使用哪種方式定義策略函式)是引數名稱必須與以下文件中所示的完全匹配。

可用策略函式

airflow.policies.task_policy(task)[source]

允許在任務載入到 DagBag 後對其進行修改。

它允許管理員重新配置某些任務的引數。或者,你可以丟擲 AirflowClusterPolicyViolation 異常來阻止 DAG 執行。

以下是一些展示其用途的示例:

  • 你可以為使用 SparkOperator 的任務強制使用特定的佇列(例如 spark 佇列),以確保這些任務被分配到正確的工作器。

  • 你可以強制執行任務超時策略,確保沒有任務執行超過 48 小時。

引數:

task (airflow.models.baseoperator.BaseOperator) – 要修改的任務

airflow.policies.dag_policy(dag)[source]

允許在 DAG 載入到 DagBag 後對其進行修改。

它允許管理員重新配置某些 DAG 的引數。或者,你可以丟擲 AirflowClusterPolicyViolation 異常來阻止 DAG 執行。

以下是一些展示其用途的示例:

  • 你可以為 DAG 強制指定預設使用者

  • 檢查每個 DAG 是否配置了標籤

引數:

dag (airflow.models.dag.DAG) – 要修改的 DAG

airflow.policies.task_instance_mutation_hook(task_instance)[source]

允許在任務例項被 Airflow 排程器排隊之前對其進行修改。

例如,這可以用於在重試期間修改任務例項。

引數:

task_instance (airflow.models.taskinstance.TaskInstance) – 要修改的任務例項

airflow.policies.pod_mutation_hook(pod)[source]

在排程 Pod 之前對其進行修改。

此設定允許在將 kubernetes.client.models.V1Pod 物件傳遞給 Kubernetes 客戶端進行排程之前對其進行修改。

例如,這可用於向 KubernetesExecutor 或 KubernetesPodOperator 啟動的每個工作器 Pod 新增 sidecar 或 init 容器。

airflow.policies.get_airflow_context_vars(context)[source]

將 airflow 上下文變數注入預設的 airflow 上下文變數中。

此設定允許獲取 airflow 上下文變數,它們是鍵值對。然後它們被注入到預設的 airflow 上下文變數中,最終在執行任務時作為環境變數可用。`dag_id`, `task_id`, `logical_date`, `dag_run_id`, `try_number` 是保留鍵。

引數:

context – 感興趣的任務例項的上下文。

示例

DAG 策略

此策略檢查每個 DAG 是否至少定義了一個標籤

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )


注意

為避免迴圈匯入,如果在叢集策略的型別註解中使用了 DAG,請確保從 airflow.models 匯入,而不是從 airflow 匯入。

注意

DAG 策略在 DAG 完全載入後應用,因此覆蓋 default_args 引數無效。如果你想覆蓋預設的 operator 設定,請改用任務策略。

任務策略

以下是強制對每個任務應用最大超時策略的示例

class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)


你也可以實現保護機制來防止常見錯誤,而不僅僅是作為技術安全控制。例如,不允許執行沒有 Airflow 所有者的任務。

def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")

    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )


如果你有多個要應用的檢查,最佳實踐是將這些規則整理在一個單獨的 Python 模組中,並擁有一個執行多個此類自定義檢查並聚合各種錯誤訊息的單一策略/任務修改 hook,以便在 UI(和資料庫中的 import_errors 表)中報告單個 AirflowClusterPolicyViolation

例如,你的 airflow_local_settings.py 可以遵循此模式

TASK_RULES: list[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)


有關如何配置本地設定的詳細資訊,請參閱 配置本地設定

任務例項修改

以下是將正在進行第二次(或更多次)重試的任務重新路由到不同佇列的示例

def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"


請注意,由於優先順序權重是使用權重規則動態確定的,因此你無法在修改 hook 中更改任務例項的 priority_weight

此條目有幫助嗎?