airflow.providers.microsoft.azure.hooks.asb¶
屬性¶
類¶
BaseAzureServiceBusHook 類,用於建立會話並使用連線字串建立連線。 |
|
與 ServiceBusAdministrationClient 互動。 |
|
與 ServiceBusClient 互動。 |
模組內容¶
- class airflow.providers.microsoft.azure.hooks.asb.BaseAzureServiceBusHook(azure_service_bus_conn_id=default_conn_name)[source]¶
基類:
airflow.hooks.base.BaseHookBaseAzureServiceBusHook 類,用於建立會話並使用連線字串建立連線。
- 引數::
azure_service_bus_conn_id (str) – 引用 Azure Service Bus 連線。
- class airflow.providers.microsoft.azure.hooks.asb.AdminClientHook(azure_service_bus_conn_id=default_conn_name)[source]¶
-
與 ServiceBusAdministrationClient 互動。
這可以建立、更新、列出和刪除 Service Bus 名稱空間的資源。此 hook 使用從基類繼承的相同 Azure Service Bus 客戶端連線。
- create_queue(queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True, enable_batched_operations=True)[source]¶
透過連線到 Service Bus 管理客戶端建立佇列,並返回 QueueProperties。
- delete_queue(queue_name)[source]¶
透過 service bus 名稱空間中的 queue_name 刪除佇列。
- 引數::
queue_name (str) – 佇列的名稱或具有名稱的 QueueProperties。
- create_topic(topic_name, azure_service_bus_conn_id='azure_service_bus_default', default_message_time_to_live=None, max_size_in_megabytes=None, requires_duplicate_detection=None, duplicate_detection_history_time_window=None, enable_batched_operations=None, size_in_bytes=None, filtering_messages_before_publishing=None, authorization_rules=None, support_ordering=None, auto_delete_on_idle=None, enable_partitioning=None, enable_express=None, user_metadata=None, max_message_size_in_kilobytes=None)[source]¶
透過連線到 Service Bus 管理客戶端建立主題。
- 引數::
topic_name (str) – 主題名稱。
default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息生存時間跨度值。這是訊息傳送到 Service Bus 後,訊息到期前的持續時間。當訊息本身未設定 TimeToLive 時,這是使用的預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
max_size_in_megabytes (int | None) – 主題的最大大小(以兆位元組為單位),即為主題分配的記憶體大小。
requires_duplicate_detection (bool | None) – 一個值,指示此主題是否需要重複檢測。
duplicate_detection_history_time_window (datetime.timedelta | str | None) – ISO 8601 時間跨度結構,定義重複檢測歷史記錄的持續時間。預設值為 10 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。
size_in_bytes (int | None) – 主題的大小,以位元組為單位。
filtering_messages_before_publishing (bool | None) – 在釋出前過濾訊息。
authorization_rules (list[azure.servicebus.management.AuthorizationRule] | None) – 資源的授權規則列表。
support_ordering (bool | None) – 一個值,指示主題是否支援排序。
auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,主題在此間隔後會自動刪除。最小持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
enable_partitioning (bool | None) – 一個值,指示主題是否跨多個訊息代理進行分割槽。
enable_express (bool | None) – 一個值,指示是否啟用了 Express 實體。Express 佇列在將訊息寫入持久儲存之前,會暫時將其儲存在記憶體中。
user_metadata (str | None) – 與主題關聯的元資料。
max_message_size_in_kilobytes (int | None) – 佇列可以接受的訊息負載的最大大小(以千位元組為單位)。此功能僅在使用高階名稱空間和 Service Bus API 版本“2021-05”或更高版本時可用。允許的最小值為 1024,最大允許值為 102400。預設值為 1024。
- create_subscription(topic_name, subscription_name, lock_duration=None, requires_session=None, default_message_time_to_live=None, dead_lettering_on_message_expiration=True, dead_lettering_on_filter_evaluation_exceptions=None, max_delivery_count=10, enable_batched_operations=True, forward_to=None, user_metadata=None, forward_dead_lettered_messages_to=None, auto_delete_on_idle=None, filter_rule=None, filter_rule_name=None)[source]¶
在主題上建立具有指定名稱的訂閱,並返回其 SubscriptionProperties。
可以提供可選的 filter_rule,根據訊息的屬性過濾訊息。特別是,可以使用關聯 ID 過濾器將回復與請求配對。
- 引數::
topic_name (str) – 將擁有待建立訂閱的主題。
subscription_name (str) – 需要建立的訂閱名稱
lock_duration (datetime.timedelta | str | None) – ISO 8601 窺視鎖(peek-lock)的時間跨度持續時間;即訊息被鎖定以供其他接收者使用的時間量。LockDuration 的最大值為 5 分鐘;預設值為 1 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
requires_session (bool | None) – 一個值,指示佇列是否支援會話概念。
default_message_time_to_live (datetime.timedelta | str | None) – ISO 8601 預設訊息生存時間跨度值。這是訊息傳送到 Service Bus 後,訊息到期前的持續時間。當訊息本身未設定 TimeToLive 時,這是使用的預設值。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
dead_lettering_on_message_expiration (bool | None) – 一個值,指示此訂閱在訊息到期時是否支援死信。
dead_lettering_on_filter_evaluation_exceptions (bool | None) – 一個值,指示此訂閱在訊息到期時是否支援死信。
max_delivery_count (int | None) – 最大傳遞計數。訊息在此傳遞次數後會自動進入死信佇列。預設值為 10。
enable_batched_operations (bool | None) – 指示是否啟用伺服器端批處理操作的值。
forward_to (str | None) – 接收實體名稱,傳送到此訂閱的所有訊息都將轉發到該實體。
user_metadata (str | None) – 與訂閱關聯的元資料。最大字元數為 1024。
forward_dead_lettered_messages_to (str | None) – 接收實體名稱,傳送到此訂閱的所有訊息都將轉發到該實體。
auto_delete_on_idle (datetime.timedelta | str | None) – ISO 8601 時間跨度空閒間隔,訂閱在此間隔後會自動刪除。最小持續時間為 5 分鐘。接受 ~datetime.timedelta 型別或 ISO 8601 持續時間格式(如“PT300S”)字串作為輸入值。
filter_rule (azure.servicebus.management.CorrelationRuleFilter | azure.servicebus.management.SqlRuleFilter | None) – 應用於訊息的可選關聯或 SQL 規則過濾器。
filter_rule_name (str | None) – 應用於訂閱的規則過濾器的可選規則名稱
azure_service_bus_conn_id – 引用 Azure Service Bus 連線。
- class airflow.providers.microsoft.azure.hooks.asb.MessageHook(azure_service_bus_conn_id=default_conn_name)[source]¶
-
與 ServiceBusClient 互動。
這作為獲取 ServiceBusSender 和 ServiceBusReceiver 的高階介面。
- send_message(queue_name, messages, batch_message_flag=False, message_id=None, reply_to=None, message_headers=None)[source]¶
使用 ServiceBusClient Send 將訊息傳送到一個或多個 Service Bus 佇列。
透過使用
batch_message_flag,它啟用並以批次訊息傳送訊息。- 引數::
queue_name (str) – 佇列的名稱或具有名稱的 QueueProperties。
batch_message_flag (bool) – 布林標誌,如果訊息需要作為批次訊息傳送,則可以設定為 True。
message_id (str | None) – 設定在傳送到佇列的訊息上的訊息 ID。請注意,message_id 只能在傳送單條訊息時設定。
reply_to (str | None) – 需要傳送到佇列的回覆目標。
message_headers (dict[str | bytes, int | float | bytes | bool | str | uuid.UUID] | None) – 要新增到 Azure Service Bus 訊息的 application_properties 欄位的頭部資訊。
- receive_message(queue_name, context, max_message_count=1, max_wait_time=None, message_callback=None)[source]¶
在指定的佇列名稱中一次接收一批訊息。