airflow.providers.amazon.aws.hooks.emr¶
類¶
與 Amazon Elastic MapReduce 服務 (EMR) 互動。 |
|
與 Amazon EMR Serverless 互動。 |
|
與 Amazon EMR Containers (Amazon EMR on EKS) 互動。 |
函式¶
|
模組內容¶
- class airflow.providers.amazon.aws.hooks.emr.EmrHook(emr_conn_id=default_conn_name, *args, **kwargs)[source]¶
基類:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook與 Amazon Elastic MapReduce 服務 (EMR) 互動。
圍繞
boto3.client("emr")提供厚包裝器。- 引數:
emr_conn_id (str | None) – Amazon Elastic MapReduce 連線。此屬性僅在使用
airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow()時需要。
可以指定其他引數(例如
aws_conn_id),並將其傳遞給底層的 AwsBaseHook。另請參閱
- get_cluster_id_by_name(emr_cluster_name, cluster_states)[source]¶
使用給定名稱和(可選)狀態獲取 EMR 叢集 ID;僅在找到單個 ID 時返回。
- create_job_flow(job_flow_overrides)[source]¶
建立並開始執行新的叢集(作業流)。
此方法使用
EmrHook.emr_conn_id來接收初始 Amazon EMR 叢集配置。如果EmrHook.emr_conn_id為空或連線不存在,則使用空的初始配置。- 引數:
job_flow_overrides (dict[str, Any]) – 用於覆蓋初始 Amazon EMR 配置叢集中的引數。生成的配置將用於
EMR.Client.run_job_flow()。
- add_job_flow_steps(job_flow_id, steps=None, wait_for_completion=False, waiter_delay=None, waiter_max_attempts=None, execution_role_arn=None)[source]¶
向正在執行的叢集新增新步驟。
- test_connection()[source]¶
返回 Amazon Elastic MapReduce 連線測試的失敗狀態(不可測試)。
我們需要覆蓋此方法,因為此 Hook 基於
AwsGenericHook,否則它將嘗試使用預設的 boto3 憑證策略測試與 AWS STS 的連線。
- class airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook(*args, **kwargs)[source]¶
基類:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook與 Amazon EMR Serverless 互動。
圍繞
boto3.client("emr-serverless")提供薄包裝器。可以指定其他引數(例如
aws_conn_id),並將其傳遞給底層的 AwsBaseHook。
- class airflow.providers.amazon.aws.hooks.emr.EmrContainerHook(*args, virtual_cluster_id=None, **kwargs)[source]¶
基類:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook與 Amazon EMR Containers (Amazon EMR on EKS) 互動。
圍繞
boto3.client("emr-containers")提供厚包裝器。- 引數:
virtual_cluster_id (str | None) – EMR on EKS 虛擬叢集的叢集 ID
可以指定其他引數(例如
aws_conn_id),並將其傳遞給底層的 AwsBaseHook。- create_emr_on_eks_cluster(virtual_cluster_name, eks_cluster_name, eks_namespace, tags=None)[source]¶
- submit_job(name, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, tags=None, retry_max_attempts=None)[source]¶
向 EMR Containers API 提交作業並返回作業 ID。
作業執行是指您提交到 Amazon EMR on EKS 的工作單元,例如 Spark jar、PySpark 指令碼或 SparkSQL 查詢。
- 引數:
name (str) – 作業執行的名稱。
execution_role_arn (str) – 與作業執行關聯的 IAM 角色 ARN。
release_label (str) – 用於作業執行的 Amazon EMR 版本標籤。
job_driver (dict) – 作業配置詳情,例如 Spark 作業引數。
configuration_overrides (dict | None) – 作業執行的配置覆蓋,特別是應用程式配置或監控配置。
client_request_token (str | None) – 作業執行請求的客戶端冪等令牌。如果您想指定唯一 ID 以防止啟動兩個作業,請使用此引數。
tags (dict | None) – 分配給作業執行的標籤。
retry_max_attempts (int | None) – 作業驅動程式的最大嘗試次數。
- 返回:
作業執行請求的 ID。
- 返回型別:
- get_job_failure_reason(job_id)[source]¶
獲取作業失敗的原因(例如錯誤訊息)。返回 None 或原因字串。
- 引數:
job_id (str) – 作業執行請求的 ID。