airflow.providers.amazon.aws.operators.ecs

EcsBaseOperator

這是所有彈性容器服務 (Elastic Container Service) 運算元的基類。

EcsCreateClusterOperator

建立一個 AWS ECS 叢集。

EcsDeleteClusterOperator

刪除一個 AWS ECS 叢集。

EcsDeregisterTaskDefinitionOperator

在 AWS ECS 上登出一個任務定義。

EcsRegisterTaskDefinitionOperator

在 AWS ECS 上註冊一個任務定義。

EcsRunTaskOperator

在 AWS ECS (彈性容器服務) 上執行一個任務。

模組內容

class airflow.providers.amazon.aws.operators.ecs.EcsBaseOperator(*, aws_conn_id='aws_default', region_name=None, verify=None, botocore_config=None, region=NOTSET, **kwargs)[source]

基類: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.ecs.EcsHook]

這是所有彈性容器服務 (Elastic Container Service) 運算元的基類。

aws_hook_class[source]
property client: boto3.client[source]

建立並返回 EcsHook 的客戶端。

abstract execute(context)[source]

必須在子類中重寫。

class airflow.providers.amazon.aws.operators.ecs.EcsCreateClusterOperator(*, cluster_name, create_cluster_kwargs=None, wait_for_completion=True, waiter_delay=15, waiter_max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: EcsBaseOperator

建立一個 AWS ECS 叢集。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:建立一個 AWS ECS 叢集

引數:
  • cluster_name (str) – 您的叢集名稱。如果您不為叢集指定名稱,則將建立一個名為 default 的叢集。

  • create_cluster_kwargs (dict | None) – 叢集建立的額外引數。

  • wait_for_completion (bool) – 如果為 True,則等待叢集建立完成。(預設值: True)

  • waiter_delay (int) – 兩次嘗試之間等待的時間間隔(秒),如果未設定,則使用預設的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大嘗試次數,如果未設定,則使用預設的等待者 (waiter) 值。

  • deferrable (bool) – 如果為 True,運算元將非同步等待作業完成。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值: False)

template_fields: collections.abc.Sequence[str][source]
cluster_name[source]
create_cluster_kwargs[source]
wait_for_completion = True[source]
waiter_delay = 15[source]
waiter_max_attempts = 60[source]
deferrable = True[source]
execute(context)[source]

必須在子類中重寫。

class airflow.providers.amazon.aws.operators.ecs.EcsDeleteClusterOperator(*, cluster_name, wait_for_completion=True, waiter_delay=15, waiter_max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: EcsBaseOperator

刪除一個 AWS ECS 叢集。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:刪除一個 AWS ECS 叢集

引數:
  • cluster_name (str) – 要刪除的叢集的簡稱或完整的 Amazon Resource Name (ARN)。

  • wait_for_completion (bool) – 如果為 True,則等待叢集建立完成。(預設值: True)

  • waiter_delay (int) – 兩次嘗試之間等待的時間間隔(秒),如果未設定,則使用預設的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大嘗試次數,如果未設定,則使用預設的等待者 (waiter) 值。

  • deferrable (bool) – 如果為 True,運算元將非同步等待作業完成。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值: False)

template_fields: collections.abc.Sequence[str] = ('cluster_name', 'wait_for_completion', 'deferrable')[source]
cluster_name[source]
wait_for_completion = True[source]
waiter_delay = 15[source]
waiter_max_attempts = 60[source]
deferrable = True[source]
execute(context)[source]

必須在子類中重寫。

class airflow.providers.amazon.aws.operators.ecs.EcsDeregisterTaskDefinitionOperator(*, task_definition, **kwargs)[source]

基類: EcsBaseOperator

在 AWS ECS 上登出一個任務定義。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:登出一個任務定義

引數:

task_definition (str) – 要登出的任務定義的族和修訂版本 (family:revision) 或完整的 Amazon Resource Name (ARN)。如果您使用族名稱,則必須指定修訂版本。

template_fields: collections.abc.Sequence[str] = ('task_definition',)[source]
task_definition[source]
execute(context)[source]

必須在子類中重寫。

class airflow.providers.amazon.aws.operators.ecs.EcsRegisterTaskDefinitionOperator(*, family, container_definitions, register_task_kwargs=None, **kwargs)[source]

基類: EcsBaseOperator

在 AWS ECS 上註冊一個任務定義。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:註冊一個任務定義

引數:
  • family (str) – 要建立的任務定義的族名稱。

  • container_definitions (list[dict]) – 一個包含容器定義的 JSON 格式列表,用於描述構成您的任務的不同容器。

  • register_task_kwargs (dict | None) – 註冊任務定義的額外引數。

template_fields: collections.abc.Sequence[str] = ('family', 'container_definitions', 'register_task_kwargs')[source]
family[source]
container_definitions[source]
register_task_kwargs[source]
execute(context)[source]

必須在子類中重寫。

class airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator(*, task_definition, cluster, overrides, launch_type='EC2', capacity_provider_strategy=None, volume_configurations=None, group=None, placement_constraints=None, placement_strategy=None, platform_version=None, network_configuration=None, tags=None, awslogs_group=None, awslogs_region=None, awslogs_stream_prefix=None, awslogs_fetch_interval=timedelta(seconds=30), container_name=None, propagate_tags=None, quota_retry=None, reattach=False, number_logs_exception=10, wait_for_completion=True, waiter_delay=6, waiter_max_attempts=1000000, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: EcsBaseOperator

在 AWS ECS (彈性容器服務) 上執行一個任務。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:執行一個任務

引數:
  • task_definition (str) – 彈性容器服務上的任務定義名稱

  • cluster (str) – 彈性容器服務上的叢集名稱

  • overrides (dict) – boto3 將接收的相同引數(可模板化):https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task

  • aws_conn_id – AWS 憑據/區域名稱的連線 ID。如果為 None,將使用 boto3 憑據策略 (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html)。

  • region – 要在 AWS Hook 中使用的區域名稱。覆蓋連線中的區域(如果提供)

  • launch_type (str) – 執行任務的啟動型別(‘EC2’、‘EXTERNAL’ 或 ‘FARGATE’)

  • capacity_provider_strategy (list | None) – 用於任務的容量提供程式策略。指定 capacity_provider_strategy 時,將省略 launch_type 引數。如果未指定 capacity_provider_strategy 或 launch_type,則使用叢集的預設容量提供程式策略。

  • volume_configurations (list | None) – 使用容量提供程式時使用的卷配置。卷名稱必須與任務定義中的名稱匹配。您可以配置大小、卷型別、IOPS、吞吐量等設定,詳情請參閱 (https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskManagedEBSVolumeConfiguration.html)

  • group (str | None) – 與任務關聯的任務組名稱

  • placement_constraints (list | None) – 用於任務的放置約束物件陣列

  • placement_strategy (list | None) – 用於任務的放置策略物件陣列

  • platform_version (str | None) – 任務執行所在的平臺版本

  • network_configuration (dict | None) – 任務的網路配置

  • tags (dict | None) – 一個標籤字典,格式為 {‘tagKey’: ‘tagValue’}。

  • awslogs_group (str | None) – 儲存 ECS 容器日誌的 CloudWatch 日誌組。只有在作業完成後希望日誌顯示在 Airflow UI 中時才需要此引數。

  • awslogs_region (str | None) – 儲存 CloudWatch 日誌的區域。如果為 None,則與 region 引數相同。如果該引數也為 None,則使用基於連線設定的預設 AWS 區域。

  • awslogs_stream_prefix (str | None) – 用於 CloudWatch 日誌的流字首。這應與任務定義的日誌配置中指定的字首匹配。只有在作業完成後希望日誌顯示在 Airflow UI 中時才需要此引數。

  • awslogs_fetch_interval (datetime.timedelta) – ECS 任務日誌抓取器在每次抓取 CloudWatch 日誌之間應等待的時間間隔。如果 deferrable 設定為 True,則忽略此引數,而改用 waiter_delay。

  • container_name (str | None) – 要從中抓取日誌的容器名稱。如果未設定,則使用第一個容器。

  • quota_retry (dict | None) – 配置是否以及如何重試啟動新的 ECS 任務,以處理瞬時錯誤。

  • reattach (bool) – 如果設定為 True,將檢查任務例項先前啟動的任務是否已在執行。如果已執行,運算元將附加到該任務而不是啟動一個新任務。這是為了避免在任務執行時 Airflow 和 ECS 之間的連線斷開時(例如當 Airflow worker 重啟時)重新啟動一個新任務。

  • number_logs_exception (int) – 如果 ECS 任務停止,在 AirflowException 中返回的 CloudWatch 日誌的最後行數(用於接收包含 ECS 中執行程式碼失敗日誌的 Airflow 警報)。

  • wait_for_completion (bool) – 如果為 True,則等待叢集建立完成。(預設值: True)

  • waiter_delay (int) – 兩次嘗試之間等待的時間間隔(秒),如果未設定,則使用預設的等待者 (waiter) 值。

  • waiter_max_attempts (int) – 最大嘗試次數,如果未設定,則使用預設的等待者 (waiter) 值。

  • deferrable (bool) – 如果為 True,運算元將非同步等待作業完成。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值: False)

  • do_xcom_push – 如果為 True,運算元將把 ECS 任務 ARN 推送到 XCom,鍵為 ‘ecs_task_arn’。此外,如果抓取了日誌,最後一條日誌訊息將推送到 XCom,鍵為 ‘return_value’。(預設值: False)

ui_color = '#f0ede4'[source]
template_fields: collections.abc.Sequence[str] = ('task_definition', 'cluster', 'overrides', 'launch_type', 'capacity_provider_strategy',...[source]
template_fields_renderers[source]
task_definition[source]
cluster[source]
overrides[source]
launch_type = 'EC2'[source]
capacity_provider_strategy = None[source]
volume_configurations = None[source]
group = None[source]
placement_constraints = None[source]
placement_strategy = None[source]
platform_version = None[source]
network_configuration = None[source]
tags = None[source]
awslogs_group = None[source]
awslogs_stream_prefix = None[source]
awslogs_region = None[source]
awslogs_fetch_interval[source]
propagate_tags = None[source]
reattach = False[source]
number_logs_exception = 10[source]
arn: str | None = None[source]
container_name: str | None = None[source]
retry_args = None[source]
task_log_fetcher: airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher | None = None[source]
wait_for_completion = True[source]
waiter_delay = 6[source]
waiter_max_attempts = 1000000[source]
deferrable = True[source]
execute(context)[source]

必須在子類中重寫。

execute_complete(context, event=None)[source]
on_kill()[source]

覆蓋此方法以在任務例項被終止時清理子程序。

在操作器中使用 threading、subprocess 或 multiprocessing 模組的任何部分都需要清理,否則會留下殭屍程序。

此條目有幫助嗎?