Airflow Summit 2025 即將於 10 月 07-09 日召開。立即註冊搶購早鳥票!

建立通知器

您還可以設定 template_fields 屬性來指定哪些屬性應被渲染為模板。

以下是建立 Notifier 類的一個示例

from airflow.sdk import BaseNotifier
from my_provider import send_message


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message):
        self.message = message

    def notify(self, context):
        # Send notification here, below is an example
        title = f"Task {context['task_instance'].task_id} failed"
        send_message(title, self.message)

使用通知器

以下是使用上述通知器的一個示例

from datetime import datetime

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
    dag_id="example_notifier",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
):
    task = BashOperator(
        task_id="example_task",
        bash_command="exit 1",
        on_success_callback=MyNotifier(message="Task Succeeded!"),
    )

有關社群管理的通知器列表,請參閱 通知

此條目有幫助嗎?