airflow.providers.amazon.aws.operators.emr¶
類¶
將步驟新增到現有 EMR job_flow 的運算子。 |
|
啟動 EMR notebook 執行的運算子。 |
|
停止正在執行的 EMR notebook 執行的運算子。 |
|
建立 EKS 上 EMR 虛擬叢集的運算子。 |
|
將作業提交到 EKS 上 EMR 虛擬叢集的運算子。 |
|
建立一個 EMR JobFlow,從 EMR 連線讀取配置。 |
|
修改現有 EMR 叢集的運算子。 |
|
終止 EMR JobFlows 的運算子。 |
|
建立無伺服器 EMR Application 的運算子。 |
|
啟動無伺服器 EMR job 的運算子。 |
|
停止無伺服器 EMR application 的運算子。 |
|
刪除無伺服器 EMR application 的運算子。 |
模組內容¶
- class airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator(*, job_flow_id=None, job_flow_name=None, cluster_states=None, aws_conn_id='aws_default', steps=None, wait_for_completion=False, waiter_delay=30, waiter_max_attempts=60, execution_role_arn=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator將步驟新增到現有 EMR job_flow 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 將步驟新增到 EMR job flow
- 引數:
job_flow_id (str | None) – 要新增步驟的 JobFlow 的 ID。(已模板化)
job_flow_name (str | None) – 要新增步驟的 JobFlow 名稱。用作傳遞 job_flow_id 的替代方法。將在引數 cluster_states 中指定的狀態之一中搜索具有匹配名稱的 JobFlow ID。必須僅存在一個這樣的叢集,否則將失敗。(已模板化)
cluster_states (list[str] | None) – 透過 job_flow_name 搜尋 JobFlow ID 時可接受的叢集狀態。(已模板化)
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
steps (list[dict] | str | None) – boto3 風格的步驟或對要新增到 jobflow 的步驟檔案(必須是“.json”)的引用。(已模板化)
wait_for_completion (bool) – 如果為 True,運算子將等待所有步驟完成。
execution_role_arn (str | None) – 叢集上步驟的執行時角色的 ARN。
do_xcom_push – 如果為 True,job_flow_id 將透過鍵 job_flow_id 推送到 XCom。
wait_for_completion – 是否等待作業執行完成。(預設值:True)
deferrable (bool) – 如果為 True,運算子將非同步等待作業完成。這意味著等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'job_flow_name', 'cluster_states', 'steps', 'execution_role_arn')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStartNotebookExecutionOperator(editor_id, relative_path, cluster_id, service_role, notebook_execution_name=None, notebook_params=None, notebook_instance_security_group_id=None, master_instance_security_group_id=None, tags=None, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基類:
airflow.models.BaseOperator啟動 EMR notebook 執行的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 啟動 EMR notebook 執行
- 引數:
editor_id (str) – 用於 notebook 執行的 EMR notebook 的唯一識別符號。
relative_path (str) – 本次執行的 notebook 檔案的路徑和檔名,相對於為 EMR notebook 指定的路徑。
cluster_id (str) – notebook 所連線的 EMR 叢集的唯一識別符號。
service_role (str) – 用於 notebook 執行的 Amazon EMR 服務角色(EMR 角色)的 IAM 角色名稱或 ARN。
notebook_execution_name (str | None) – notebook 執行的可選名稱。
notebook_params (str | None) – 在執行時以 JSON 格式傳遞給 EMR notebook 執行的輸入引數。
notebook_instance_security_group_id (str | None) – 與本次 notebook 執行的 EMR notebook 關聯的 Amazon EC2 安全組的唯一識別符號。
master_instance_security_group_id (str | None) – 與本次 notebook 執行的 EMR 叢集主例項關聯的 EC2 安全組的可選唯一 ID。
tags (list | None) – 與 notebook 執行關聯的可選鍵值對列表。
waiter_max_attempts (int | None) – 失敗前的最大嘗試次數。
waiter_delay (int | None) – 輪詢 notebook 狀態之間的秒數。
- template_fields: collections.abc.Sequence[str] = ('editor_id', 'cluster_id', 'relative_path', 'service_role', 'notebook_execution_name',...[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStopNotebookExecutionOperator(notebook_execution_id, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基類:
airflow.models.BaseOperator停止正在執行的 EMR notebook 執行的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 停止 EMR notebook 執行
- 引數:
notebook_execution_id (str) – notebook 執行的唯一識別符號。
wait_for_completion (bool) – 如果為 True,運算子將等待 notebook 進入 STOPPED 或 FINISHED 狀態。預設為 False。
aws_conn_id (str | None) – 要使用的 AWS 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
waiter_max_attempts (int | None) – 失敗前的最大嘗試次數。
waiter_delay (int | None) – 輪詢 notebook 狀態之間的秒數。
- template_fields: collections.abc.Sequence[str] = ('notebook_execution_id', 'waiter_delay', 'waiter_max_attempts')[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrEksCreateClusterOperator(*, virtual_cluster_name, eks_cluster_name, eks_namespace, virtual_cluster_id='', aws_conn_id='aws_default', tags=None, **kwargs)[source]¶
基類:
airflow.models.BaseOperator建立 EKS 上 EMR 虛擬叢集的運算子。
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:建立 Amazon EMR EKS 虛擬叢集
- 引數:
- template_fields: collections.abc.Sequence[str] = ('virtual_cluster_name', 'eks_cluster_name', 'eks_namespace')[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]¶
建立並返回 EmrContainerHook。
- class airflow.providers.amazon.aws.operators.emr.EmrContainerOperator(*, name, virtual_cluster_id, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, aws_conn_id='aws_default', wait_for_completion=True, poll_interval=30, tags=None, max_polling_attempts=None, job_retry_max_attempts=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator將作業提交到 EKS 上 EMR 虛擬叢集的運算子。
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:向 Amazon EMR 虛擬叢集提交作業
- 引數:
name (str) – 作業執行的名稱。
virtual_cluster_id (str) – EMR on EKS 虛擬叢集 ID。
execution_role_arn (str) – 與作業執行關聯的 IAM 角色 ARN。
release_label (str) – 用於作業執行的 Amazon EMR 發行版版本。
job_driver (dict) – 作業配置詳細資訊,例如 Spark 作業引數。
configuration_overrides (dict | None) – 作業執行的配置覆蓋,特別是應用程式配置或監控配置。
client_request_token (str | None) – 作業執行請求的客戶端冪等性令牌。如果想指定唯一的 ID 以防止同時啟動兩個作業,請使用此引數。如果未提供令牌,則會為您生成一個 UUIDv4 令牌。
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。
wait_for_completion (bool) – 是否在 Operator 中等待作業完成。
poll_interval (int) – 兩次連續呼叫以檢查 EMR 上的查詢狀態之間等待的時間(以秒為單位)。
max_polling_attempts (int | None) – 等待作業執行完成的最大嘗試次數。預設為 None,這將一直輪詢直到作業狀態不是 pending、submitted 或 running。
job_retry_max_attempts (int | None) – EMR 作業失敗時最大重試次數。預設為 None,表示停用重試。
tags (dict | None) – 分配給作業執行的標籤。預設為 None
deferrable (bool) – 以可延遲模式執行 Operator。
- template_fields: collections.abc.Sequence[str] = ('name', 'virtual_cluster_id', 'execution_role_arn', 'release_label', 'job_driver',...[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]¶
建立並返回 EmrContainerHook。
- class airflow.providers.amazon.aws.operators.emr.EmrCreateJobFlowOperator(*, aws_conn_id='aws_default', emr_conn_id='emr_default', job_flow_overrides=None, region_name=None, wait_for_completion=None, wait_policy=None, waiter_max_attempts=None, waiter_delay=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator建立一個 EMR JobFlow,從 EMR 連線讀取配置。
可以傳遞一個 JobFlow 覆蓋字典,該字典會覆蓋連線中的配置。
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:建立 EMR 作業流
- 引數:
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果以分散式方式執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上維護此配置)
emr_conn_id (str | None) – Amazon Elastic MapReduce 連線。用於接收初始 Amazon EMR 叢集配置:
boto3.client('emr').run_job_flow請求體。如果此引數為 None 或為空,或者連線不存在,則使用空的初始配置。job_flow_overrides (str | dict[str, Any] | None) – boto3 風格的引數或對引數檔案的引用(必須是 '.json'),用於覆蓋特定的
emr_conn_id額外引數。(templated)region_name (str | None) – 傳遞給 EmrHook 的區域名稱。
wait_for_completion (bool | None) – 已棄用 - 請改用 wait_policy。任務建立後立即完成 (False) 還是等待作業流完成 (True)(預設值:None)
wait_policy (airflow.providers.amazon.aws.utils.waiter.WaitPolicy | None) – 任務建立後是否立即完成 (None) 或: - 等待作業流完成 (WaitPolicy.WAIT_FOR_COMPLETION) - 等待作業流完成和叢集終止 (WaitPolicy.WAIT_FOR_STEPS_COMPLETION)(預設值:None)
waiter_max_attempts (int | None) – 失敗前的最大嘗試次數。
waiter_delay (int | None) – 輪詢 notebook 狀態之間的秒數。
deferrable (bool) – 如果為 True,則 Operator 將非同步等待爬取完成。這意味著將等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_overrides', 'waiter_delay', 'waiter_max_attempts')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator(*, cluster_id, step_concurrency_level, aws_conn_id='aws_default', **kwargs)[source]¶
基類:
airflow.models.BaseOperator修改現有 EMR 叢集的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 修改 Amazon EMR 容器
- 引數:
- template_fields: collections.abc.Sequence[str] = ('cluster_id', 'step_concurrency_level')[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator(*, job_flow_id, aws_conn_id='aws_default', waiter_delay=60, waiter_max_attempts=20, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator終止 EMR JobFlows 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 終止 EMR 作業流
- 引數:
job_flow_id (str) – 要終止的 JobFlow ID。(templated)
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
waiter_delay (int) – 兩次連續檢查 JobFlow 狀態之間等待的時間(秒)
waiter_max_attempts (int) – 輪詢 JobFlow 狀態的最大嘗試次數。
deferrable (bool) – 如果為 True,則 Operator 將非同步等待爬取完成。這意味著將等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id',)[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator(release_label, job_type, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator建立無伺服器 EMR Application 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 建立 EMR Serverless 應用程式
- 引數:
release_label (str) – 與應用程式關聯的 EMR 發行版本。
job_type (str) – 要啟動的應用程式型別,例如 Spark 或 Hive。
wait_for_completion (bool) – 如果為 true,則在返回之前等待應用程式啟動。預設為 True。如果設定為 False,則 `waiter_max_attempts` 和 `waiter_delay` 僅在等待應用程式處於 `CREATED` 狀態時應用。
client_request_token (str) – 要建立的應用程式的客戶端冪等令牌。其值對於每個請求必須是唯一的。
config (dict | None) – boto API create_application 呼叫可選的任意引數字典。
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 輪詢應用程式狀態之間的秒數。
deferrable (bool) – 如果為 True,運算子將非同步等待應用程式建立完成。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False,但可以透過在配置檔案中將 default_deferrable 設定為 True 來覆蓋)
- Waiter_max_attempts:
等待器應輪詢應用程式檢查狀態的次數。如果未設定,等待器將使用其預設值。
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
建立並返回 EmrServerlessHook。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator(application_id, execution_role_arn, job_driver, configuration_overrides=None, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', name=None, waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), enable_application_ui_links=False, **kwargs)[source]¶
基類:
airflow.models.BaseOperator啟動無伺服器 EMR job 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 啟動 EMR Serverless 作業
- 引數:
application_id (str) – 要啟動的 EMR Serverless 應用程式的 ID。
execution_role_arn (str) – 執行操作的角色 ARN。
job_driver (dict) – 作業執行在其上的驅動程式。
configuration_overrides (dict | None) – 用於覆蓋現有配置的配置規範。
client_request_token (str) – 要建立的應用程式的客戶端冪等令牌。其值對於每個請求必須是唯一的。
config (dict | None) – boto API start_job_run 呼叫可選的任意引數字典。
wait_for_completion (bool) – 如果為 True,則在返回之前等待作業啟動。預設為 True。如果設定為 False,則 `waiter_countdown` 和 `waiter_check_interval_seconds` 僅在等待應用程式處於 `STARTED` 狀態時應用。
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
name (str | None) – EMR Serverless 作業的名稱。如果未提供,將分配一個預設名稱。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 輪詢作業執行狀態之間的秒數。
deferrable (bool) – 如果為 True,運算子將非同步等待爬蟲完成。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False,但可以透過在配置檔案中將 default_deferrable 設定為 True 來覆蓋)
enable_application_ui_links (bool) – 如果為 True,運算子將生成指向 EMR Serverless 應用程式 UI 的一次性連結。生成的連結將允許任何有權訪問 DAG 的使用者檢視 Spark 或 Tez UI 或 Spark 標準輸出日誌。預設為 False。
- Waiter_max_attempts:
等待器應輪詢應用程式檢查狀態的次數。如果未設定,等待器將使用其預設值。
- template_fields: collections.abc.Sequence[str] = ('application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides',...[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
建立並返回 EmrServerlessHook。
- execute(context, event=None)[source]¶
建立運算子時派生。
上下文與渲染 Jinja 模板時使用的字典相同。
有關更多上下文,請參閱 get_template_context。
- is_monitoring_in_job_override(config_key, job_override)[source]¶
檢查是否為作業啟用了監控。
注意:這與應用程式預設設定不相容:https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/default-configs.html
這用於確定應顯示哪些額外連結。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStopApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.models.BaseOperator停止無伺服器 EMR application 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南:開啟應用程式 UI
- 引數:
application_id (str) – 要停止的 EMR Serverless 應用程式的 ID。
wait_for_completion (bool) – 如果為 true,則在返回之前等待應用程式停止。預設為 True
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
force_stop (bool) – 如果設定為 True,則該應用程式中所有未處於終止狀態的作業都將被取消。否則,嘗試停止具有正在執行作業的應用程式將返回錯誤。如果您希望等待作業正常完成,請使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensorwaiter_delay (int | airflow.utils.types.ArgNotSet) – 輪詢應用程式狀態之間的秒數。預設為 60 秒。
deferrable (bool) – 如果為 True,運算子將非同步等待應用程式停止。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False,但可以透過在配置檔案中將 default_deferrable 設定為 True 來覆蓋)
- Waiter_max_attempts:
等待器應輪詢應用程式檢查狀態的次數。預設為 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
建立並返回 EmrServerlessHook。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
繼承自:
EmrServerlessStopApplicationOperator刪除無伺服器 EMR application 的運算子。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南:刪除 EMR Serverless 應用程式
- 引數:
application_id (str) – 要刪除的 EMR Serverless 應用程式的 ID。
wait_for_completion (bool) – 如果為 true,則在返回之前等待應用程式被刪除。預設為 True。請注意,此運算子將始終先等待應用程式停止。
aws_conn_id (str | None) – 用於 AWS 憑證的 Airflow 連線。如果為 None 或空,則使用 boto3 預設行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或空,則將使用 boto3 預設配置(並且必須在每個工作節點上維護)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 輪詢應用程式狀態之間的秒數。預設為 60 秒。
deferrable (bool) – 如果為 True,運算子將非同步等待應用程式被刪除。這意味著需要等待完成。此模式需要安裝 aiobotocore 模組。(預設值:False,但可以透過在配置檔案中將 default_deferrable 設定為 True 來覆蓋)
force_stop (bool) – 如果設定為 True,則該應用程式中所有未處於終止狀態的作業都將被取消。否則,嘗試刪除具有正在執行作業的應用程式將返回錯誤。如果您希望等待作業正常完成,請使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor
- Waiter_max_attempts:
等待器應輪詢應用程式檢查狀態的次數。預設為 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶