訊息觸發器

等待佇列中的訊息

使用 MessageQueueTrigger 來等待佇列中的訊息。觸發器的引數是

  • queue - 佇列識別符號

可以根據佇列提供者提供附加引數。需要提供相關的預設連線 ID,例如,連線到 AWS SQS 中的佇列時,連線 ID 應為 aws_default

下面是一個示例,說明如何配置 Airflow DAG 以由 Amazon SQS 中的訊息觸發。

tests/system/common/messaging/example_message_queue_trigger.py

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")

# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])

with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")

工作原理

1. 訊息佇列觸發器: MessageQueueTrigger 監聽來自外部佇列(例如,AWS SQS、Kafka 或其他訊息系統)的訊息。

2. 資產和監聽器: Asset 抽象外部實體,在此示例中即 SQS 佇列。AssetWatcher 將觸發器與一個名稱關聯。此名稱可幫助您識別哪個觸發器與哪個資產關聯。

3. 事件驅動的 DAG: DAG 不會按照固定的計劃執行,而是在資產接收到更新(例如,佇列中的新訊息)時執行。

此條目是否有幫助?