airflow.providers.amazon.aws.hooks.batch_waiters¶
AWS Batch 服務等待器。
另請參閱
類¶
一個用於管理 AWS Batch 服務等待器的工具類。 |
模組內容¶
- class airflow.providers.amazon.aws.hooks.batch_waiters.BatchWaitersHook(*args, waiter_config=None, **kwargs)[source]¶
基類:
airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook一個用於管理 AWS Batch 服務等待器的工具類。
import random from airflow.providers.amazon.aws.operators.batch_waiters import BatchWaiters # to inspect default waiters waiters = BatchWaiters() config = waiters.default_config # type: Dict waiter_names = waiters.list_waiters() # -> ["JobComplete", "JobExists", "JobRunning"] # The default_config is a useful stepping stone to creating custom waiters, e.g. custom_config = waiters.default_config # this is a deepcopy # modify custom_config['waiters'] as necessary and get a new instance: waiters = BatchWaiters(waiter_config=custom_config) waiters.waiter_config # check the custom configuration (this is a deepcopy) waiters.list_waiters() # names of custom waiters # During the init for BatchWaiters, the waiter_config is used to build a waiter_model; # and note that this only occurs during the class init, to avoid any accidental mutations # of waiter_config leaking into the waiter_model. waiters.waiter_model # -> botocore.waiter.WaiterModel object # The waiter_model is combined with the waiters.client to get a specific waiter # and the details of the config on that waiter can be further modified without any # accidental impact on the generation of new waiters from the defined waiter_model, e.g. waiters.get_waiter("JobExists").config.delay # -> 5 waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object waiter.config.delay = 10 waiters.get_waiter("JobExists").config.delay # -> 5 as defined by waiter_model # To use a specific waiter, update the config and call the `wait()` method for jobId, e.g. waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object waiter.config.delay = random.uniform(1, 10) # seconds waiter.config.max_attempts = 10 waiter.wait(jobs=[jobId])
另請參閱
- 引數:
waiter_config (dict | None) – AWS Batch 服務的自定義等待器配置
aws_conn_id – AWS 憑證 / 區域名稱的連線 ID。如果為 None,將使用 boto3 憑證策略 (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html)。
region_name – 在 AWS 客戶端中使用的區域名稱。覆蓋連線中(如果提供)的 AWS 區域。
- property waiter_config: dict[source]¶
此例項的不可變等待器配置;此屬性返回一個
deepcopy。在 BatchWaiters 初始化期間,waiter_config 用於構建 waiter_model,這僅在類初始化期間發生,以避免 waiter_config 的任何意外修改洩露到 waiter_model 中。
- 返回:
AWS Batch 服務的等待器配置
- 返回型別:
- property waiter_model: botocore.waiter.WaiterModel[source]¶
用於在 AWS Batch 服務上生成等待器的已配置等待器模型。
- 返回:
AWS Batch 服務的等待器模型
- 返回型別:
botocore.waiter.WaiterModel
- get_waiter(waiter_name, parameters=None, config_overrides=None, deferrable=False, client=None)[source]¶
使用已配置的
.waiter_model獲取 AWS Batch 服務等待器。.waiter_model與.client結合使用以獲取特定的等待器,並且可以修改該等待器的屬性,而不會對從.waiter_model生成新等待器產生任何意外影響,例如:waiters.get_waiter("JobExists").config.delay # -> 5 waiter = waiters.get_waiter("JobExists") # a new waiter object waiter.config.delay = 10 waiters.get_waiter("JobExists").config.delay # -> 5 as defined by waiter_model
要使用特定的等待器,請更新配置並呼叫 jobId 的 wait() 方法,例如:
import random waiter = waiters.get_waiter("JobExists") # a new waiter object waiter.config.delay = random.uniform(1, 10) # seconds waiter.config.max_attempts = 10 waiter.wait(jobs=[jobId])
- 引數:
waiter_name (str) – 等待器的名稱。該名稱應與等待器模型檔案中的鍵名稱(包括大小寫)匹配(通常是 CamelCasing);請參閱
.list_waiters。parameters (dict[str, str] | None) – 未使用,僅用於匹配 base_aws 中的方法簽名
config_overrides (dict[str, Any] | None) – 未使用,僅用於匹配 base_aws 中的方法簽名
deferrable (bool) – 未使用,僅用於匹配 base_aws 中的方法簽名
client – 未使用,僅用於匹配 base_aws 中的方法簽名
- 返回:
指定 AWS Batch 服務的等待器物件
- 返回型別:
botocore.waiter.Waiter
- wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]¶
等待 Batch 作業完成。
這假設
.waiter_model使用.default_config的某種變體進行配置,以便它可以生成具有以下名稱的等待器:“JobExists”、“JobRunning”和“JobComplete”。- 引數:
job_id (str) – Batch 作業 ID
get_batch_log_fetcher (Callable[[str], airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher | None] | None) – 一個方法,返回 AwsTaskLogFetcher 型別的 batch_log_fetcher,或者在 CloudWatch 日誌流尚未建立時返回 None。
- 丟擲:
AirflowException
注意
此方法會給
delay新增一個小的隨機抖動(+/- 2 秒,>= 1 秒)。使用隨機間隔有助於在許多併發任務請求作業描述時避免 AWS API 限制。它還將
max_attempts修改為使用sys.maxsize,這使得 Airflow 能夠管理等待超時。