airflow.providers.amazon.aws.operators.batch

AWS Batch 服務。

BatchOperator

在 AWS Batch 上執行作業。

BatchCreateComputeEnvironmentOperator

建立 AWS Batch 計算環境。

模組內容

class airflow.providers.amazon.aws.operators.batch.BatchOperator(*, job_name, job_definition, job_queue, container_overrides=None, array_properties=None, ecs_properties_override=None, eks_properties_override=None, node_overrides=None, share_identifier=None, scheduling_priority_override=None, parameters=None, retry_strategy=None, job_id=None, waiters=None, max_retries=4200, status_retries=None, tags=None, wait_for_completion=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=30, awslogs_enabled=False, awslogs_fetch_interval=timedelta(seconds=30), submit_job_timeout=None, **kwargs)[source]

基類: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

在 AWS Batch 上執行作業。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:提交新的 AWS Batch 作業

引數:
  • job_name (str) – 將在 AWS Batch 上執行的作業名稱(模板化)

  • job_definition (str) – AWS Batch 上的作業定義名稱

  • job_queue (str) – AWS Batch 上的佇列名稱

  • container_overrides (dict | None) – boto3 的 containerOverrides 引數(模板化)

  • ecs_properties_override (dict | None) – boto3 的 ecsPropertiesOverride 引數(模板化)

  • eks_properties_override (dict | None) – boto3 的 eksPropertiesOverride 引數(模板化)

  • node_overrides (dict | None) – boto3 的 nodeOverrides 引數(模板化)

  • share_identifier (str | None) – 作業的共享識別符號。如果作業佇列沒有排程策略,請勿指定此引數。

  • scheduling_priority_override (int | None) – 作業的排程優先順序。排程優先順序較高的作業將在排程優先順序較低的作業之前排程。這將覆蓋作業定義中的任何排程優先順序。

  • array_properties (dict | None) – boto3 的 arrayProperties 引數

  • parameters (dict | None) – boto3 的 parameters 引數(模板化)

  • job_id (str | None) – 作業 ID,通常在 submit_job 操作獲取 AWS Batch 定義的 jobId 之前未知(None)

  • waiters (Any | None) – 一個 BatchWaiters 物件(參見下面的說明);如果為 None,則使用 max_retries 和 status_retries 進行輪詢。

  • max_retries (int) – 指數回退重試,4200 = 48 小時;只有在 waiters 為 None 時才使用輪詢。

  • status_retries (int | None) – 獲取作業狀態的 HTTP 重試次數,10 次;只有在 waiters 為 None 時才使用輪詢。

  • aws_conn_id – 用於 AWS 憑證的 Airflow 連線。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果以分散式方式執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上維護此配置)。

  • region_name – AWS region_name。如果未指定,則使用預設的 boto3 行為。

  • verify – 是否驗證 SSL 證書。參見:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • tags (dict | None) – 應用於 AWS Batch 作業提交的標籤集合;如果為 None,則不提交任何標籤。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • awslogs_enabled (bool) – 指定是否列印來自 CloudWatch 的日誌,預設為 False。如果是陣列作業,將只打印第一個任務的日誌。

  • awslogs_fetch_interval (datetime.timedelta) – 獲取 CloudWatch 日誌的間隔,30 秒。

  • poll_interval (int) – (僅限可延遲模式)兩次輪詢之間等待的時間間隔(秒)。

  • submit_job_timeout (int | None) – 提交的批處理作業的執行超時時間(秒)。

注意

任何自定義等待器必須為這些呼叫返回一個等待器:.. code-block:: python

waiter = waiters.get_waiter(“JobExists”) waiter = waiters.get_waiter(“JobRunning”) waiter = waiters.get_waiter(“JobComplete”)

aws_hook_class[source]
ui_color = '#c3dae0'[source]
arn: str | None = None[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
job_id = None[source]
job_name[source]
job_definition[source]
job_queue[source]
container_overrides = None[source]
ecs_properties_override = None[source]
eks_properties_override = None[source]
node_overrides = None[source]
share_identifier = None[source]
scheduling_priority_override = None[source]
array_properties = None[source]
parameters[source]
retry_strategy = None[source]
waiters = None[source]
tags[source]
wait_for_completion = True[source]
deferrable = True[source]
poll_interval = 30[source]
awslogs_enabled = False[source]
awslogs_fetch_interval[source]
submit_job_timeout = None[source]
max_retries = 4200[source]
status_retries = None[source]
execute(context)[source]

提交併監控 AWS Batch 作業。

引發異常:

AirflowException

execute_complete(context, event=None)[source]
on_kill()[source]

當任務例項被殺死時,覆蓋此方法以清理子程序。

在運算子中使用 threading、subprocess 或 multiprocessing 模組的任何地方都需要清理,否則會留下殭屍程序。

submit_job(context)[source]

提交 AWS Batch 作業。

引發異常:

AirflowException

monitor_job(context)[source]

監控 AWS Batch 作業。

如果使用 execution_timeout 建立任務,這可能會引發異常或 AirflowTaskTimeout。

class airflow.providers.amazon.aws.operators.batch.BatchCreateComputeEnvironmentOperator(compute_environment_name, environment_type, state, compute_resources, unmanaged_v_cpus=None, service_role=None, tags=None, poll_interval=30, max_retries=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook]

建立 AWS Batch 計算環境。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:建立 AWS Batch 計算環境

引數:
  • compute_environment_name (str) – AWS Batch 計算環境的名稱(模板化)。

  • environment_type (str) – 計算環境的型別。

  • state (str) – 計算環境的狀態。

  • compute_resources (dict) – 關於計算環境管理的資源的詳細資訊(模板化)。更多詳情:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_compute_environment

  • unmanaged_v_cpus (int | None) – 非託管計算環境的最大 vCPU 數量。僅當 type 引數設定為 UNMANAGED 時支援此引數。

  • service_role (str | None) – 允許 Batch 代表您呼叫其他 AWS 服務的 IAM 角色(模板化)。

  • tags (dict | None) – 應用於計算環境的標籤,幫助您分類和組織資源。

  • poll_interval (int) – 兩次輪詢環境狀態之間等待的時間間隔(秒)。僅在 deferrable 為 True 時有用。

  • max_retries (int | None) – 輪詢環境狀態的次數。僅在 deferrable 為 True 時有用。

  • aws_conn_id – 用於 AWS 憑證的 Airflow 連線。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果以分散式方式執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上維護此配置)。

  • region_name – AWS region_name。如果未指定,則使用預設的 boto3 行為。

  • verify – 是否驗證 SSL 證書。參見:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • deferrable (bool) – 如果為 True,運算子將非同步等待環境建立完成。此模式需要安裝 aiobotocore 模組。(預設為 False)

aws_hook_class[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
compute_environment_name[source]
environment_type[source]
state[source]
unmanaged_v_cpus = None[source]
compute_resources[source]
service_role = None[source]
tags[source]
poll_interval = 30[source]
max_retries = 120[source]
deferrable = True[source]
execute(context)[source]

建立一個 AWS Batch 計算環境。

execute_complete(context, event=None)[source]

此條目有幫助嗎?