airflow.providers.microsoft.azure.hooks.asb

屬性

MessageCallback

BaseAzureServiceBusHook

BaseAzureServiceBusHook 類,用於建立會話並使用連線字串建立連線。

AdminClientHook

與 ServiceBusAdministrationClient 互動。

MessageHook

與 ServiceBusClient 互動。

模組內容

airflow.providers.microsoft.azure.hooks.asb.MessageCallback[source]
class airflow.providers.microsoft.azure.hooks.asb.BaseAzureServiceBusHook(azure_service_bus_conn_id=default_conn_name)[source]

基類: airflow.hooks.base.BaseHook

BaseAzureServiceBusHook 類,用於建立會話並使用連線字串建立連線。

引數::

azure_service_bus_conn_id (str) – 引用 Azure Service Bus 連線

conn_name_attr = 'azure_service_bus_conn_id'[source]
default_conn_name = 'azure_service_bus_default'[source]
conn_type = 'azure_service_bus'[source]
hook_name = 'Azure Service Bus'[source]
classmethod get_connection_form_widgets()[source]

返回要新增到連線表單的連線小部件。

classmethod get_ui_field_behaviour()[source]

返回自定義欄位行為。

conn_id = 'azure_service_bus_default'[source]
abstract get_conn()[source]

返回 hook 的連線。

class airflow.providers.microsoft.azure.hooks.asb.AdminClientHook(azure_service_bus_conn_id=default_conn_name)[source]

基類: BaseAzureServiceBusHook

與 ServiceBusAdministrationClient 互動。

這可以建立、更新、列出和刪除 Service Bus 名稱空間的資源。此 hook 使用從基類繼承的相同 Azure Service Bus 客戶端連線。

get_conn()[source]

建立 ServiceBusAdministrationClient 例項。

這使用連線詳細資訊中的連線字串。

create_queue(queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True, enable_batched_operations=True)[source]

透過連線到 Service Bus 管理客戶端建立佇列,並返回 QueueProperties。

引數::
  • queue_name (str) – 佇列的名稱或具有名稱的 QueueProperties。

  • max_delivery_count (int) – 最大傳遞計數。訊息在此傳遞次數後會自動進入死信佇列。預設值為 10。

  • dead_lettering_on_message_expiration (bool) – 一個值,指示此訂閱在訊息到期時是否支援死信。

  • enable_batched_operations (bool) – 指示是否啟用伺服器端批處理操作的值。

delete_queue(queue_name)[source]

透過 service bus 名稱空間中的 queue_name 刪除佇列。

引數::

queue_name (str) – 佇列的名稱或具有名稱的 QueueProperties。

create_topic(topic_name, azure_service_bus_conn_id='azure_service_bus_default', default_message_time_to_live=None, max_size_in_megabytes=None, requires_duplicate_detection=None, duplicate_detection_history_time_window=None, enable_batched_operations=None, size_in_bytes=None, filtering_messages_before_publishing=None, authorization_rules=None, support_ordering=None, auto_delete_on_idle=None, enable_partitioning=None, enable_express=None, user_metadata=None, max_message_size_in_kilobytes=None)[source]

透過連線到 Service Bus 管理客戶端建立主題。

引數::
  • topic_name (str) – 主題名稱。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息生存時間跨度值。這是訊息傳送到 Service Bus 後,訊息到期前的持續時間。當訊息本身未設定 TimeToLive 時,這是使用的預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • max_size_in_megabytes (int | None) – 主題的最大大小(以兆位元組為單位),即為主題分配的記憶體大小。

  • requires_duplicate_detection (bool | None) – 一個值,指示此主題是否需要重複檢測。

  • duplicate_detection_history_time_window (datetime.timedelta | str | None) – ISO 8601 時間跨度結構,定義重複檢測歷史記錄的持續時間。預設值為 10 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

  • size_in_bytes (int | None) – 主題的大小,以位元組為單位。

  • filtering_messages_before_publishing (bool | None) – 在釋出前過濾訊息。

  • authorization_rules (list[azure.servicebus.management.AuthorizationRule] | None) – 資源的授權規則列表。

  • support_ordering (bool | None) – 一個值,指示主題是否支援排序。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,主題在此間隔後會自動刪除。最小持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • enable_partitioning (bool | None) – 一個值,指示主題是否跨多個訊息代理進行分割槽。

  • enable_express (bool | None) – 一個值,指示是否啟用了 Express 實體。Express 佇列在將訊息寫入持久儲存之前,會暫時將其儲存在記憶體中。

  • user_metadata (str | None) – 與主題關聯的元資料。

  • max_message_size_in_kilobytes (int | None) – 佇列可以接受的訊息負載的最大大小(以千位元組為單位)。此功能僅在使用高階名稱空間和 Service Bus API 版本“2021-05”或更高版本時可用。允許的最小值為 1024,最大允許值為 102400。預設值為 1024。

create_subscription(topic_name, subscription_name, lock_duration=None, requires_session=None, default_message_time_to_live=None, dead_lettering_on_message_expiration=True, dead_lettering_on_filter_evaluation_exceptions=None, max_delivery_count=10, enable_batched_operations=True, forward_to=None, user_metadata=None, forward_dead_lettered_messages_to=None, auto_delete_on_idle=None, filter_rule=None, filter_rule_name=None)[source]

在主題上建立具有指定名稱的訂閱,並返回其 SubscriptionProperties。

可以提供可選的 filter_rule,根據訊息的屬性過濾訊息。特別是,可以使用關聯 ID 過濾器將回復與請求配對。

引數::
  • topic_name (str) – 將擁有待建立訂閱的主題。

  • subscription_name (str) – 需要建立的訂閱名稱

  • lock_duration (datetime.timedelta | str | None) – ISO 8601 窺視鎖(peek-lock)的時間跨度持續時間;即訊息被鎖定以供其他接收者使用的時間量。LockDuration 的最大值為 5 分鐘;預設值為 1 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • requires_session (bool | None) – 一個值,指示佇列是否支援會話概念。

  • default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息生存時間跨度值。這是訊息傳送到 Service Bus 後,訊息到期前的持續時間。當訊息本身未設定 TimeToLive 時,這是使用的預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • dead_lettering_on_message_expiration (bool | None) – 一個值,指示此訂閱在訊息到期時是否支援死信。

  • dead_lettering_on_filter_evaluation_exceptions (bool | None) – 一個值,指示此訂閱在訊息到期時是否支援死信。

  • max_delivery_count (int | None) – 最大傳遞計數。訊息在此傳遞次數後會自動進入死信佇列。預設值為 10。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

  • forward_to (str | None) – 接收實體名稱,傳送到此訂閱的所有訊息都將轉發到該實體。

  • user_metadata (str | None) – 與訂閱關聯的元資料。最大字元數為 1024。

  • forward_dead_lettered_messages_to (str | None) – 接收實體名稱,傳送到此訂閱的所有訊息都將轉發到該實體。

  • auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,訂閱在此間隔後會自動刪除。最小持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。

  • filter_rule (azure.servicebus.management.CorrelationRuleFilter | azure.servicebus.management.SqlRuleFilter | None) – 應用於訊息的可選關聯或 SQL 規則過濾器。

  • filter_rule_name (str | None) – 應用於訂閱的規則過濾器的可選規則名稱

  • azure_service_bus_conn_id – 引用 Azure Service Bus 連線

update_subscription(topic_name, subscription_name, max_delivery_count=None, dead_lettering_on_message_expiration=None, enable_batched_operations=None)[source]

更新 ServiceBus 名稱空間下的 Azure ServiceBus 主題訂閱。

引數::
  • topic_name (str) – 將擁有待建立訂閱的主題。

  • subscription_name (str) – 需要建立的訂閱名稱。

  • max_delivery_count (int | None) – 最大傳遞計數。訊息在此傳遞次數後會自動進入死信佇列。預設值為 10。

  • dead_lettering_on_message_expiration (bool | None) – 一個值,指示此訂閱在訊息到期時是否支援死信。

  • enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。

delete_subscription(subscription_name, topic_name)[source]

刪除 ServiceBus 名稱空間下的主題訂閱實體。

引數::
  • subscription_name (str) – 將在主題中擁有規則的訂閱名稱

  • topic_name (str) – 將擁有訂閱規則的主題。

class airflow.providers.microsoft.azure.hooks.asb.MessageHook(azure_service_bus_conn_id=default_conn_name)[source]

基類: BaseAzureServiceBusHook

與 ServiceBusClient 互動。

這作為獲取 ServiceBusSender 和 ServiceBusReceiver 的高階介面。

get_conn()[source]

透過使用連線詳細資訊中的連線字串建立並返回 ServiceBusClient。

send_message(queue_name, messages, batch_message_flag=False, message_id=None, reply_to=None, message_headers=None)[source]

使用 ServiceBusClient Send 將訊息傳送到一個或多個 Service Bus 佇列。

透過使用 batch_message_flag,它啟用並以批次訊息傳送訊息。

引數::
  • queue_name (str) – 佇列的名稱或具有名稱的 QueueProperties。

  • messages (str | list[str]) – 需要傳送到佇列的訊息。可以是字串或字串列表。

  • batch_message_flag (bool) – 布林標誌,如果訊息需要作為批次訊息傳送,則可以設定為 True。

  • message_id (str | None) – 設定在傳送到佇列的訊息上的訊息 ID。請注意,message_id 只能在傳送單條訊息時設定。

  • reply_to (str | None) – 需要傳送到佇列的回覆目標。

  • message_headers (dict[str | bytes, int | float | bytes | bool | str | uuid.UUID] | None) – 要新增到 Azure Service Bus 訊息的 application_properties 欄位的頭部資訊。

static send_list_messages(sender, messages, message_creator)[source]
static send_batch_message(sender, messages, message_creator)[source]
receive_message(queue_name, context, max_message_count=1, max_wait_time=None, message_callback=None)[source]

在指定的佇列名稱中一次接收一批訊息。

引數::
  • queue_name (str) – 佇列的名稱或包含名稱的 QueueProperties 物件。

  • max_message_count (int | None) – 批次中訊息的最大數量。

  • max_wait_time (float | None) – 等待第一條訊息到達的最大時間(秒)。

  • message_callback (MessageCallback | None) – 用於處理每條訊息的可選回撥函式。如果未提供,訊息將被記錄並完成處理。如果提供且丟擲異常,訊息將被放棄以便將來重新投遞。

receive_subscription_message(topic_name, subscription_name, context, max_message_count, max_wait_time, message_callback=None)[source]

一次接收一批訂閱訊息。

如果您希望同時處理多條訊息,或者作為一次呼叫執行即時(ad-hoc)接收,此方法是最佳的。

引數::
  • subscription_name (str) – 將在主題中擁有規則的訂閱名稱

  • topic_name (str) – 將擁有訂閱規則的主題。

  • max_message_count (int | None) – 批次中訊息的最大數量。實際返回數量取決於 prefetch_count 和傳入流速率。設定為 None 將完全取決於預取(prefetch)配置。預設值為 1。

  • max_wait_time (float | None) – 等待第一條訊息到達的最大時間(秒)。如果沒有訊息到達且未指定超時,此呼叫將不會返回,直到連線關閉。如果指定了超時,並且在超時期限內沒有訊息到達,將返回一個空列表。

此條目有幫助嗎?