airflow.providers.microsoft.azure.operators.asb

屬性

MessageCallback

AzureServiceBusCreateQueueOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 佇列。

AzureServiceBusSendMessageOperator

向 Service Bus 佇列傳送訊息或批次訊息。

AzureServiceBusReceiveMessageOperator

在指定的佇列名稱中一次接收一批訊息。

AzureServiceBusDeleteQueueOperator

刪除 Azure Service Bus 名稱空間中的佇列。

AzureServiceBusTopicCreateOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 主題。

AzureServiceBusSubscriptionCreateOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 主題訂閱。

AzureServiceBusUpdateSubscriptionOperator

在 Service Bus 名稱空間下更新 Azure ServiceBus 主題訂閱。

ASBReceiveSubscriptionMessageOperator

從特定主題下的 Service Bus 訂閱接收批次訊息。

AzureServiceBusSubscriptionDeleteOperator

刪除 Azure ServiceBus 名稱空間中的主題訂閱。

AzureServiceBusTopicDeleteOperator

刪除 Azure Service Bus 名稱空間中的主題。

模組內容

airflow.providers.microsoft.azure.operators.asb.MessageCallback[source]
class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusCreateQueueOperator(*, queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True, enable_batched_operations=True, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基類: airflow.models.BaseOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 佇列。

另請參閱

有關如何使用此 Operator 的更多資訊,請檢視指南: 建立 Azure Service Bus 佇列

引數:
  • queue_name (str) – 佇列的名稱。應是唯一的。

  • max_delivery_count (int) – 最大投遞次數。訊息在此次數投遞後將自動進入死信佇列。預設值為 10。

  • dead_lettering_on_message_expiration (bool) – 一個值,指示此訂閱在訊息過期時是否支援死信。

  • enable_batched_operations (bool) – 一個值,指示是否啟用伺服器端批處理操作。

  • azure_service_bus_conn_id (str) – 對 Azure Service Bus 連線 的引用。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
max_delivery_count = 10[source]
dead_lettering_on_message_expiration = True[source]
enable_batched_operations = True[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

透過連線到 hook 中的 Service Bus Admin 客戶端,在 Azure Service Bus 名稱空間中建立佇列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSendMessageOperator(*, queue_name, message, batch=False, azure_service_bus_conn_id='azure_service_bus_default', message_id=None, reply_to=None, message_headers=None, **kwargs)[source]

基類: airflow.models.BaseOperator

向 Service Bus 佇列傳送訊息或批次訊息。

另請參閱

有關如何使用此 Operator 的更多資訊,請檢視指南: 向 Azure Service Bus 佇列傳送訊息

引數:
  • queue_name (str) – 佇列的名稱。應是唯一的。

  • message (str | list[str]) – 需要傳送到佇列的訊息。可以是字串或字串列表。

  • batch (bool) – 布林標誌,預設為 False,如果訊息需要作為批處理訊息傳送,則可設定為 True。

  • azure_service_bus_conn_id (str) – 對 :ref: Azure Service Bus 連線<howto/connection:azure_service_bus> 的引用。

  • message_id (str | None) – 要設定在傳送到佇列的訊息上的訊息 ID。請注意,message_id 只能在傳送單個訊息時設定。

  • reply_to (str | None) – 接收者應回覆的佇列或主題名稱。回覆將傳送到佇列還是主題應在帶外確定。

  • message_headers (dict[str | bytes, int | float | bytes | bool | str | uuid.UUID] | None) – 要新增到 Azure Service Bus 訊息 application_properties 欄位的標頭。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
batch = False[source]
message[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
message_id = None[source]
reply_to = None[source]
message_headers = None[source]
execute(context)[source]

將訊息傳送到 Service Bus 名稱空間中的特定佇列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusReceiveMessageOperator(*, queue_name, azure_service_bus_conn_id='azure_service_bus_default', max_message_count=10, max_wait_time=5, message_callback=None, **kwargs)[source]

基類: airflow.models.BaseOperator

在指定的佇列名稱中一次接收一批訊息。

另請參閱

有關如何使用此 Operator 的更多資訊,請檢視指南: 接收 Azure Service Bus 佇列訊息

引數:
  • queue_name (str) – 佇列名稱的名稱或帶有名稱的 QueueProperties。

  • max_message_count (int) – 批處理中的最大訊息數。

  • max_wait_time (float) – 等待第一條訊息到達的最大時間(以秒為單位)。

  • azure_service_bus_conn_id (str) – 對 :ref: Azure Service Bus 連線 <howto/connection:azure_service_bus> 的引用。

  • message_callback (MessageCallback | None) – 用於處理每條訊息的可選回撥。如果未提供,則訊息將被記錄並完成。如果提供,並且丟擲異常,訊息將被放棄以供將來重新投遞。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
max_message_count = 10[source]
max_wait_time = 5[source]
message_callback = None[source]
execute(context)[source]

透過連線到 Service Bus 客戶端,在 Service Bus 名稱空間中的特定佇列中接收訊息。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusDeleteQueueOperator(*, queue_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基類: airflow.models.BaseOperator

刪除 Azure Service Bus 名稱空間中的佇列。

另請參閱

有關如何使用此 Operator 的更多資訊,請檢視指南: 刪除 Azure Service Bus 佇列

引數:
  • queue_name (str) – Service Bus 名稱空間中的佇列名稱。

  • azure_service_bus_conn_id (str) – 對 :ref: Azure Service Bus 連線 <howto/connection:azure_service_bus> 的引用。

template_fields: collections.abc.Sequence[str] = ('queue_name',)[source]
ui_color = '#e4f0e8'[source]
queue_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端,刪除 Service Bus 名稱空間中的佇列。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator(*, topic_name, azure_service_bus_conn_id='azure_service_bus_default', default_message_time_to_live=None, max_size_in_megabytes=None, requires_duplicate_detection=None, duplicate_detection_history_time_window=None, enable_batched_operations=None, size_in_bytes=None, filtering_messages_before_publishing=None, authorization_rules=None, support_ordering=None, auto_delete_on_idle=None, enable_partitioning=None, enable_express=None, user_metadata=None, max_message_size_in_kilobytes=None, **kwargs)[source]

基類: airflow.models.BaseOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 主題。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 建立 Azure Service Bus 主題

引數:
  • topic_name (str) – 主題的名稱。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息存活時間值。這是訊息過期前的持續時間,從訊息傳送到 Service Bus 時開始計算。當訊息本身未設定 TimeToLive 時,使用此預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • max_size_in_megabytes (int | None) – 主題的最大大小(以兆位元組為單位),這是為主題分配的記憶體大小。

  • requires_duplicate_detection (bool | None) – 一個值,指示此主題是否需要重複檢測。

  • duplicate_detection_history_time_window (datetime.timedelta | str | None) – 定義重複檢測歷史記錄持續時間的 ISO 8601 時間跨度結構。預設值為 10 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

  • size_in_bytes (int | None) – 主題的大小,以位元組為單位。

  • filtering_messages_before_publishing (bool | None) – 在釋出訊息之前對其進行篩選。

  • authorization_rules (list[azure.servicebus.management.AuthorizationRule] | None) – 資源的授權規則列表。

  • support_ordering (bool | None) – 一個值,指示主題是否支援排序。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,之後主題將自動刪除。最短持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • enable_partitioning (bool | None) – 一個值,指示主題是否將跨多個訊息代理進行分割槽。

  • enable_express (bool | None) – 一個值,指示是否啟用 Express 實體。快速佇列在將訊息寫入持久儲存之前會將其暫時儲存在記憶體中。

  • user_metadata (str | None) – 與主題關聯的元資料。

  • max_message_size_in_kilobytes (int | None) – 佇列可接受的訊息負載的最大大小(以千位元組為單位)。此功能僅在使用高階名稱空間和 Service Bus API 版本“2021-05”或更高版本時可用。允許的最小值為 1024,允許的最大值為 102400。預設值為 1024。

template_fields: collections.abc.Sequence[str] = ('topic_name',)[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
default_message_time_to_live = None[source]
max_size_in_megabytes = None[source]
requires_duplicate_detection = None[source]
duplicate_detection_history_time_window = None[source]
enable_batched_operations = None[source]
size_in_bytes = None[source]
filtering_messages_before_publishing = None[source]
authorization_rules = None[source]
support_ordering = None[source]
auto_delete_on_idle = None[source]
enable_partitioning = None[source]
enable_express = None[source]
user_metadata = None[source]
max_message_size_in_kilobytes = None[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端,在 Service Bus 名稱空間中建立主題。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionCreateOperator(*, topic_name, subscription_name, azure_service_bus_conn_id='azure_service_bus_default', lock_duration=None, requires_session=None, default_message_time_to_live=None, dead_lettering_on_message_expiration=True, dead_lettering_on_filter_evaluation_exceptions=None, max_delivery_count=10, enable_batched_operations=True, forward_to=None, user_metadata=None, forward_dead_lettered_messages_to=None, auto_delete_on_idle=None, filter_rule=None, filter_rule_name=None, **kwargs)[source]

基類: airflow.models.BaseOperator

在 Service Bus 名稱空間下建立一個 Azure Service Bus 主題訂閱。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 建立 Azure Service Bus 訂閱

引數:
  • topic_name (str) – 將擁有待建立訂閱的主題。

  • subscription_name (str) – 待建立的訂閱名稱

  • lock_duration (datetime.timedelta | str | None) – ISO 8601 探查鎖定(peek-lock)的時間跨度持續時間;即,訊息被鎖定以便其他接收方無法接收的時間量。LockDuration 的最大值為 5 分鐘;預設值為 1 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • requires_session (bool | None) – 一個值,指示訂閱是否支援會話概念。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息存活時間值。這是訊息過期前的持續時間,從訊息傳送到 Service Bus 時開始計算。當訊息本身未設定 TimeToLive 時,使用此預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • dead_lettering_on_message_expiration (bool | None) – 一個值,指示當訊息過期時,此訂閱是否支援死信佇列。

  • dead_lettering_on_filter_evaluation_exceptions (bool | None) – 一個值,指示在篩選器評估異常時此訂閱是否支援死信佇列。

  • max_delivery_count (int | None) – 最大投遞次數。訊息在此次數投遞後會自動進入死信佇列。預設值為 10。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

  • forward_to (str | None) – 所有傳送到訂閱的訊息將被轉發到的接收實體名稱。

  • user_metadata (str | None) – 與訂閱關聯的元資料。最大字元數為 1024。

  • forward_dead_lettered_messages_to (str | None) – 訂閱中的所有死信訊息將被轉發到的接收實體名稱。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,之後訂閱將自動刪除。最短持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)的字串輸入值。

  • filter_rule (azure.servicebus.management.CorrelationRuleFilter | azure.servicebus.management.SqlRuleFilter | None) – 可選的關聯規則或 SQL 規則篩選器,應用於訊息。

  • filter_rule_name (str | None) – 應用規則篩選器到訂閱時使用的可選規則名稱

  • azure_service_bus_conn_id (str) – 對 Azure Service Bus 連線 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
lock_duration = None[source]
requires_session = None[source]
default_message_time_to_live = None[source]
dl_on_message_expiration = True[source]
dl_on_filter_evaluation_exceptions = None[source]
max_delivery_count = 10[source]
enable_batched_operations = True[source]
forward_to = None[source]
user_metadata = None[source]
forward_dead_lettered_messages_to = None[source]
auto_delete_on_idle = None[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
filter_rule = None[source]
filter_rule_name = None[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端,在 Service Bus 名稱空間中建立訂閱。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusUpdateSubscriptionOperator(*, topic_name, subscription_name, max_delivery_count=None, dead_lettering_on_message_expiration=None, enable_batched_operations=None, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基類: airflow.models.BaseOperator

在 Service Bus 名稱空間下更新 Azure ServiceBus 主題訂閱。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 更新 Azure Service Bus 訂閱

引數:
  • topic_name (str) – 將擁有待建立訂閱的主題。

  • subscription_name (str) – 需要更新的訂閱名稱。

  • max_delivery_count (int | None) – 最大投遞次數。訊息在此次數投遞後會自動進入死信佇列。預設值為 10。

  • dead_lettering_on_message_expiration (bool | None) – 一個值,指示當訊息過期時,此訂閱是否支援死信佇列。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

  • azure_service_bus_conn_id (str) – 對 Azure Service Bus 連線 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
max_delivery_count = None[source]
dl_on_message_expiration = None[source]
enable_batched_operations = None[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端更新訂閱屬性。

class airflow.providers.microsoft.azure.operators.asb.ASBReceiveSubscriptionMessageOperator(*, topic_name, subscription_name, max_message_count=1, max_wait_time=5, azure_service_bus_conn_id='azure_service_bus_default', message_callback=None, **kwargs)[source]

基類: airflow.models.BaseOperator

從特定主題下的 Service Bus 訂閱接收批次訊息。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:接收 Azure Service Bus 訂閱訊息

引數:
  • subscription_name (str) – 將擁有主題中規則的訂閱名稱

  • topic_name (str) – 將擁有訂閱規則的主題。

  • max_message_count (int | None) – 批處理中訊息的最大數量。實際返回數量將取決於 prefetch_count 和傳入流速率。設定為 None 將完全依賴於預取配置。預設值為 1。

  • max_wait_time (float | None) – 等待第一條訊息到達的最大秒數。如果沒有訊息到達且未指定超時,則此呼叫將不會返回,直到連線關閉。如果指定了超時但在此期間沒有訊息到達,將返回一個空列表。

  • azure_service_bus_conn_id (str) – 對 Azure Service Bus 連線 的引用。

  • message_callback (MessageCallback | None) – 用於處理每條訊息的可選回撥。如果未提供,則訊息將被記錄並完成。如果提供,並且丟擲異常,訊息將被放棄以供將來重新投遞。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
max_message_count = 1[source]
max_wait_time = 5[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
message_callback = None[source]
execute(context)[source]

透過連線到 Service Bus 客戶端,在 Service Bus 名稱空間中的特定佇列中接收訊息。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionDeleteOperator(*, topic_name, subscription_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基類: airflow.models.BaseOperator

刪除 Azure ServiceBus 名稱空間中的主題訂閱。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:刪除 Azure Service Bus 訂閱

引數:
  • topic_name (str) – 將擁有待建立訂閱的主題。

  • subscription_name (str) – 待建立的訂閱名稱

  • azure_service_bus_conn_id (str) – 對 Azure Service Bus 連線 的引用。

template_fields: collections.abc.Sequence[str] = ('topic_name', 'subscription_name')[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
subscription_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端,刪除 Service Bus 名稱空間中的主題訂閱。

class airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator(*, topic_name, azure_service_bus_conn_id='azure_service_bus_default', **kwargs)[source]

基類: airflow.models.BaseOperator

刪除 Azure Service Bus 名稱空間中的主題。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:刪除 Azure Service Bus 主題

引數:
template_fields: collections.abc.Sequence[str] = ('topic_name',)[source]
ui_color = '#e4f0e8'[source]
topic_name[source]
azure_service_bus_conn_id = 'azure_service_bus_default'[source]
execute(context)[source]

透過連線到 Service Bus Admin 客戶端,刪除 Service Bus 名稱空間中的主題。

此條目有幫助嗎?