airflow.providers.amazon.aws.hooks.batch_waiters

AWS Batch 服務等待器。

BatchWaitersHook

一個用於管理 AWS Batch 服務等待器的工具類。

模組內容

class airflow.providers.amazon.aws.hooks.batch_waiters.BatchWaitersHook(*args, waiter_config=None, **kwargs)[source]

基類:airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook

一個用於管理 AWS Batch 服務等待器的工具類。

import random
from airflow.providers.amazon.aws.operators.batch_waiters import BatchWaiters

# to inspect default waiters
waiters = BatchWaiters()
config = waiters.default_config  # type: Dict
waiter_names = waiters.list_waiters()  # -> ["JobComplete", "JobExists", "JobRunning"]

# The default_config is a useful stepping stone to creating custom waiters, e.g.
custom_config = waiters.default_config  # this is a deepcopy
# modify custom_config['waiters'] as necessary and get a new instance:
waiters = BatchWaiters(waiter_config=custom_config)
waiters.waiter_config  # check the custom configuration (this is a deepcopy)
waiters.list_waiters()  # names of custom waiters

# During the init for BatchWaiters, the waiter_config is used to build a waiter_model;
# and note that this only occurs during the class init, to avoid any accidental mutations
# of waiter_config leaking into the waiter_model.
waiters.waiter_model  # -> botocore.waiter.WaiterModel object

# The waiter_model is combined with the waiters.client to get a specific waiter
# and the details of the config on that waiter can be further modified without any
# accidental impact on the generation of new waiters from the defined waiter_model, e.g.
waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

# To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])
引數:
property default_config: dict[source]

一個不可變的預設等待器配置。

返回:

AWS Batch 服務的等待器配置

返回型別:

dict

property waiter_config: dict[source]

此例項的不可變等待器配置;此屬性返回一個 deepcopy

在 BatchWaiters 初始化期間,waiter_config 用於構建 waiter_model,這僅在類初始化期間發生,以避免 waiter_config 的任何意外修改洩露到 waiter_model 中。

返回:

AWS Batch 服務的等待器配置

返回型別:

dict

property waiter_model: botocore.waiter.WaiterModel[source]

用於在 AWS Batch 服務上生成等待器的已配置等待器模型。

返回:

AWS Batch 服務的等待器模型

返回型別:

botocore.waiter.WaiterModel

get_waiter(waiter_name, parameters=None, config_overrides=None, deferrable=False, client=None)[source]

使用已配置的 .waiter_model 獲取 AWS Batch 服務等待器。

.waiter_model.client 結合使用以獲取特定的等待器,並且可以修改該等待器的屬性,而不會對從 .waiter_model 生成新等待器產生任何意外影響,例如:

waiters.get_waiter("JobExists").config.delay  # -> 5
waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = 10
waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model

要使用特定的等待器,請更新配置並呼叫 jobId 的 wait() 方法,例如:

import random

waiter = waiters.get_waiter("JobExists")  # a new waiter object
waiter.config.delay = random.uniform(1, 10)  # seconds
waiter.config.max_attempts = 10
waiter.wait(jobs=[jobId])
引數:
  • waiter_name (str) – 等待器的名稱。該名稱應與等待器模型檔案中的鍵名稱(包括大小寫)匹配(通常是 CamelCasing);請參閱 .list_waiters

  • parameters (dict[str, str] | None) – 未使用,僅用於匹配 base_aws 中的方法簽名

  • config_overrides (dict[str, Any] | None) – 未使用,僅用於匹配 base_aws 中的方法簽名

  • deferrable (bool) – 未使用,僅用於匹配 base_aws 中的方法簽名

  • client – 未使用,僅用於匹配 base_aws 中的方法簽名

返回:

指定 AWS Batch 服務的等待器物件

返回型別:

botocore.waiter.Waiter

list_waiters()[source]

列出 AWS Batch 服務的等待器配置中的等待器。

返回:

AWS Batch 服務的等待器名稱

返回型別:

list[str]

wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]

等待 Batch 作業完成。

這假設 .waiter_model 使用 .default_config 的某種變體進行配置,以便它可以生成具有以下名稱的等待器:“JobExists”、“JobRunning”和“JobComplete”。

引數:
丟擲:

AirflowException

注意

此方法會給 delay 新增一個小的隨機抖動(+/- 2 秒,>= 1 秒)。使用隨機間隔有助於在許多併發任務請求作業描述時避免 AWS API 限制。

它還將 max_attempts 修改為使用 sys.maxsize,這使得 Airflow 能夠管理等待超時。

此條目有幫助嗎?