airflow.providers.amazon.aws.hooks.batch_client

一個用於 AWS Batch 服務的客戶端。

BatchProtocol

一個結構化協議,用於 boto3.client('batch') -> botocore.client.Batch

BatchClientHook

與 AWS Batch 互動。

模組內容

class airflow.providers.amazon.aws.hooks.batch_client.BatchProtocol[source]

基類: Protocol

一個結構化協議,用於 boto3.client('batch') -> botocore.client.Batch

這用於 BatchClient.client() 的型別提示;它只包含所需客戶端方法的一個子集。

describe_jobs(jobs)[source]

從 AWS Batch 獲取作業描述。

引數:

jobs (list[str]) – 要描述的 JobId 列表

返回:

描述作業的 API 響應

返回型別:

dict

get_waiter(waiterName)[source]

獲取 AWS Batch 服務等待器。

引數:

waiterName (str) – 等待器的名稱。名稱應與等待器模型檔案中鍵名稱(包括大小寫)匹配(通常為 CamelCasing)。

返回:

指定名稱的 AWS Batch 服務的等待器物件

返回型別:

botocore.waiter.Waiter

注意

AWS Batch 可能沒有任何等待器(直到 botocore PR-1307 釋出)。

import boto3

boto3.client("batch").waiter_names == []
submit_job(jobName, jobQueue, jobDefinition, arrayProperties, parameters, containerOverrides, ecsPropertiesOverride, eksPropertiesOverride, tags)[source]

提交一個 Batch 作業。

引數:
  • jobName (str) – AWS Batch 作業的名稱

  • jobQueue (str) – AWS Batch 上的佇列名稱

  • jobDefinition (str) – AWS Batch 上的作業定義名稱

  • arrayProperties (dict) – boto3 將接收的相同引數

  • parameters (dict) – boto3 將接收的相同引數

  • containerOverrides (dict) – boto3 將接收的相同引數

  • ecsPropertiesOverride (dict) – boto3 將接收的相同引數

  • eksPropertiesOverride (dict) – boto3 將接收的相同引數

  • tags (dict) – boto3 將接收的相同引數

返回:

一個 API 響應

返回型別:

dict

terminate_job(jobId, reason)[source]

終止一個 Batch 作業。

引數:
  • jobId (str) – 要終止的作業 ID

  • reason (str) – 終止作業 ID 的原因

返回:

一個 API 響應

返回型別:

dict

create_compute_environment(**kwargs)[source]

建立一個 AWS Batch 計算環境。

引數:

kwargs – boto3 create_compute_environment 的引數

class airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook(*args, max_retries=None, status_retries=None, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 AWS Batch 互動。

提供 `boto3.client("batch")` 的厚封裝。

引數:
  • max_retries (int | None) – 指數退避重試,4200 = 48 小時;僅當 waiters 為 None 時使用輪詢

  • status_retries (int | None) – 獲取作業狀態的 HTTP 重試次數,10 次;僅當 waiters 為 None 時使用輪詢

注意

一些方法使用預設的隨機延遲來檢查或輪詢作業狀態,例如 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX) 使用隨機間隔有助於在許多併發任務請求作業描述時避免 AWS API 限制。

要修改在使用隨機延遲檢查 Batch 作業狀態時允許的抖動範圍的全域性預設值,請修改這些預設值,例如:.. code-block

BatchClient.DEFAULT_DELAY_MIN = 0
BatchClient.DEFAULT_DELAY_MAX = 5

使用顯式延遲值時,會將 1 秒的隨機抖動應用於延遲(例如,延遲 0 秒將是 random.uniform(0, 1) 的延遲)。通常建議將隨機抖動新增到 API 請求中。為此提供了一個方便的方法,例如,要獲得 10 秒 +/- 5 秒的隨機延遲:delay = BatchClient.add_jitter(10, width=5, minima=0)

可以指定其他引數(例如 aws_conn_id),這些引數將傳遞給底層的 AwsBaseHook。

MAX_RETRIES = 4200[source]
STATUS_RETRIES = 10[source]
DEFAULT_DELAY_MIN = 1[source]
DEFAULT_DELAY_MAX = 10[source]
FAILURE_STATE = 'FAILED'[source]
SUCCESS_STATE = 'SUCCEEDED'[source]
RUNNING_STATE = 'RUNNING'[source]
INTERMEDIATE_STATES[source]
COMPUTE_ENVIRONMENT_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
COMPUTE_ENVIRONMENT_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
JOB_QUEUE_TERMINAL_STATUS = ('VALID', 'DELETED')[source]
JOB_QUEUE_INTERMEDIATE_STATUS = ('CREATING', 'UPDATING', 'DELETING')[source]
max_retries = 4200[source]
status_retries = 10[source]
property client: BatchProtocol | botocore.client.BaseClient[source]

一個用於 Batch 服務的 AWS API 客戶端。

返回:

針對 .region_name 的 boto3 'batch' 客戶端

返回型別:

BatchProtocol | botocore.client.BaseClient

terminate_job(job_id, reason)[source]

終止一個 Batch 作業。

引數:
  • job_id (str) – 要終止的作業 ID

  • reason (str) – 終止作業 ID 的原因

返回:

一個 API 響應

返回型別:

dict

check_job_success(job_id)[source]

檢查 Batch 作業的最終狀態。

如果作業狀態為“SUCCEEDED”,則返回 True,否則丟擲 AirflowException。

引數:

job_id (str) – Batch 作業 ID

丟擲:

AirflowException

wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]

等待 Batch 作業完成。

引數:
  • job_id (str) – Batch 作業 ID

  • delay (int | float | None) – 輪詢作業狀態前的延遲

:param get_batch_log_fetcher : 返回 batch_log_fetcher 的方法

丟擲:

AirflowException

poll_for_job_running(job_id, delay=None)[source]

輪詢作業執行狀態。

指示作業正在執行或已完成的狀態有:“RUNNING”|“SUCCEEDED”|“FAILED”。

因此,此方法將等待的狀態轉換包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”|“SUCCEEDED”|“FAILED”。

包含已完成狀態選項是為了處理狀態變化過快,導致輪詢無法檢測到從 STARTING 快速變為 RUNNING 然後完成(通常是失敗)的情況。

引數:
  • job_id (str) – Batch 作業 ID

  • delay (int | float | None) – 輪詢作業狀態前的延遲

丟擲:

AirflowException

poll_for_job_complete(job_id, delay=None)[source]

輪詢作業完成狀態。

指示作業完成的狀態有:“SUCCEEDED”|“FAILED”。

因此,此方法將等待的狀態轉換包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”>“SUCCEEDED”|“FAILED”。

引數:
  • job_id (str) – Batch 作業 ID

  • delay (int | float | None) – 輪詢作業狀態前的延遲

丟擲:

AirflowException

poll_job_status(job_id, match_status)[source]

使用指數退避策略(帶有 max_retries)輪詢作業狀態。

引數:
  • job_id (str) – Batch 作業 ID

  • match_status (list[str]) – 要匹配的作業狀態列表;Batch 作業狀態包括:“SUBMITTED”|“PENDING”|“RUNNABLE”|“STARTING”|“RUNNING”|“SUCCEEDED”|“FAILED”。

丟擲:

AirflowException

get_job_description(job_id)[source]

獲取作業描述(使用 status_retries)。

引數:

job_id (str) – Batch 作業 ID

返回:

一個描述作業的 API 響應

丟擲:

AirflowException

返回型別:

dict

static parse_job_description(job_id, response)[source]

解析作業描述以提取 job_id 的描述。

引數:
  • job_id (str) – Batch 作業 ID

  • response (dict) – 描述作業的 API 響應

返回:

描述 job_id 的 API 響應

丟擲:

AirflowException

返回型別:

dict

get_job_awslogs_info(job_id)[source]
get_job_all_awslogs_info(job_id)[source]

解析作業描述以提取 AWS CloudWatch 資訊。

引數:

job_id (str) – AWS Batch 作業 ID

static add_jitter(delay, width=1, minima=0)[source]

使用 delay +/- width 實現隨機抖動。

在狀態輪詢中新增抖動有助於在使用 Airflow 任務高併發監控 Batch 作業時避免 AWS Batch API 限制。

引數:
  • delay (int | float) – 暫停的秒數;delay 假定為正數

  • width (int | float) – 隨機抖動的 delay +/- width;width 假定為正數

  • minima (int | float) – 允許的最小延遲;minima 假定為非負數

返回:

uniform(delay - width, delay + width) 抖動,且為非負數

返回型別:

float

static delay(delay=None)[source]

暫停執行 delay 秒。

引數:

delay (int | float | None) – 使用 time.sleep(delay) 暫停執行的延遲;延遲將應用小的 1 秒抖動。

注意

此方法使用預設的隨機延遲,例如 random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);使用隨機間隔有助於在許多併發任務請求作業描述時避免 AWS API 限制。

static exponential_delay(tries)[source]

應用帶有隨機抖動的指數退避延遲。

最大間隔為 10 分鐘(隨機抖動範圍在 3 到 10 分鐘之間)。這用於 poll_for_job_status() 方法。

行為示例

def exp(tries):
    max_interval = 600.0  # 10 minutes in seconds
    delay = 1 + pow(tries * 0.6, 2)
    delay = min(max_interval, delay)
    print(delay / 3, delay)


for tries in range(10):
    exp(tries)

#  0.33  1.0
#  0.45  1.35
#  0.81  2.44
#  1.41  4.23
#  2.25  6.76
#  3.33 10.00
#  4.65 13.95
#  6.21 18.64
#  8.01 24.04
# 10.05 30.15
引數:

tries (int) – 嘗試次數

此條目是否有幫助?