airflow.providers.amazon.aws.hooks.emr

EmrHook

與 Amazon Elastic MapReduce 服務 (EMR) 互動。

EmrServerlessHook

與 Amazon EMR Serverless 互動。

EmrContainerHook

與 Amazon EMR Containers (Amazon EMR on EKS) 互動。

函式

is_connection_being_updated_exception(exception)

模組內容

class airflow.providers.amazon.aws.hooks.emr.EmrHook(emr_conn_id=default_conn_name, *args, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 Amazon Elastic MapReduce 服務 (EMR) 互動。

圍繞 boto3.client("emr") 提供厚包裝器。

引數:

emr_conn_id (str | None) – Amazon Elastic MapReduce 連線。此屬性僅在使用 airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow() 時需要。

可以指定其他引數(例如 aws_conn_id),並將其傳遞給底層的 AwsBaseHook。

另請參閱

AwsBaseHook

conn_name_attr = 'emr_conn_id'[source]
default_conn_name = 'emr_default'[source]
conn_type = 'emr'[source]
hook_name = 'Amazon Elastic MapReduce'[source]
emr_conn_id = 'emr_default'[source]
get_cluster_id_by_name(emr_cluster_name, cluster_states)[source]

使用給定名稱和(可選)狀態獲取 EMR 叢集 ID;僅在找到單個 ID 時返回。

引數:
  • emr_cluster_name (str) – 要查詢的叢集名稱

  • cluster_states (list[str]) – 要查詢的叢集狀態

返回:

EMR 叢集的 ID

返回型別:

str | None

create_job_flow(job_flow_overrides)[source]

建立並開始執行新的叢集(作業流)。

此方法使用 EmrHook.emr_conn_id 來接收初始 Amazon EMR 叢集配置。如果 EmrHook.emr_conn_id 為空或連線不存在,則使用空的初始配置。

引數:

job_flow_overrides (dict[str, Any]) – 用於覆蓋初始 Amazon EMR 配置叢集中的引數。生成的配置將用於 EMR.Client.run_job_flow()

add_job_flow_steps(job_flow_id, steps=None, wait_for_completion=False, waiter_delay=None, waiter_max_attempts=None, execution_role_arn=None)[source]

向正在執行的叢集新增新步驟。

引數:
  • job_flow_id (str) – 要新增步驟的作業流 ID

  • steps (list[dict] | str | None) – 作業流要執行的步驟列表

  • wait_for_completion (bool) – 如果為 True,等待步驟完成。預設為 False

  • waiter_delay (int | None) – 兩次嘗試之間等待的時間(秒)。預設為 5

  • waiter_max_attempts (int | None) – 最大嘗試次數。預設為 100

  • execution_role_arn (str | None) – 叢集上步驟的執行時角色的 ARN。

test_connection()[source]

返回 Amazon Elastic MapReduce 連線測試的失敗狀態(不可測試)。

我們需要覆蓋此方法,因為此 Hook 基於 AwsGenericHook,否則它將嘗試使用預設的 boto3 憑證策略測試與 AWS STS 的連線。

classmethod get_ui_field_behaviour()[source]

返回 Amazon Elastic MapReduce 連線的自定義 UI 欄位行為。

class airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook(*args, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 Amazon EMR Serverless 互動。

圍繞 boto3.client("emr-serverless") 提供薄包裝器。

可以指定其他引數(例如 aws_conn_id),並將其傳遞給底層的 AwsBaseHook。

JOB_INTERMEDIATE_STATES[source]
JOB_FAILURE_STATES[source]
JOB_SUCCESS_STATES[source]
JOB_TERMINAL_STATES[source]
APPLICATION_INTERMEDIATE_STATES[source]
APPLICATION_FAILURE_STATES[source]
APPLICATION_SUCCESS_STATES[source]
cancel_running_jobs(application_id, waiter_config=None, wait_for_completion=True)[source]

取消處於中間狀態的作業,並返回取消的作業數量。

如果 wait_for_completion 為 True,則方法將等待所有作業取消後再返回。

注意:如果在操作進行中觸發了新作業,則可能會超時並返回錯誤。

airflow.providers.amazon.aws.hooks.emr.is_connection_being_updated_exception(exception)[source]
class airflow.providers.amazon.aws.hooks.emr.EmrContainerHook(*args, virtual_cluster_id=None, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 Amazon EMR Containers (Amazon EMR on EKS) 互動。

圍繞 boto3.client("emr-containers") 提供厚包裝器。

引數:

virtual_cluster_id (str | None) – EMR on EKS 虛擬叢集的叢集 ID

可以指定其他引數(例如 aws_conn_id),並將其傳遞給底層的 AwsBaseHook。

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
TERMINAL_STATES = ('COMPLETED', 'FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
virtual_cluster_id = None[source]
create_emr_on_eks_cluster(virtual_cluster_name, eks_cluster_name, eks_namespace, tags=None)[source]
submit_job(name, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, tags=None, retry_max_attempts=None)[source]

向 EMR Containers API 提交作業並返回作業 ID。

作業執行是指您提交到 Amazon EMR on EKS 的工作單元,例如 Spark jar、PySpark 指令碼或 SparkSQL 查詢。

引數:
  • name (str) – 作業執行的名稱。

  • execution_role_arn (str) – 與作業執行關聯的 IAM 角色 ARN。

  • release_label (str) – 用於作業執行的 Amazon EMR 版本標籤。

  • job_driver (dict) – 作業配置詳情,例如 Spark 作業引數。

  • configuration_overrides (dict | None) – 作業執行的配置覆蓋,特別是應用程式配置或監控配置。

  • client_request_token (str | None) – 作業執行請求的客戶端冪等令牌。如果您想指定唯一 ID 以防止啟動兩個作業,請使用此引數。

  • tags (dict | None) – 分配給作業執行的標籤。

  • retry_max_attempts (int | None) – 作業驅動程式的最大嘗試次數。

返回:

作業執行請求的 ID。

返回型別:

str

get_job_failure_reason(job_id)[source]

獲取作業失敗的原因(例如錯誤訊息)。返回 None 或原因字串。

引數:

job_id (str) – 作業執行請求的 ID。

check_query_status(job_id)[source]

獲取已提交作業執行的狀態。返回 None 或有效的查詢狀態之一。

引數:

job_id (str) – 作業執行請求的 ID。

poll_query_status(job_id, poll_interval=30, max_polling_attempts=None)[source]

輪詢已提交作業執行的狀態,直到查詢狀態達到最終狀態;返回最終狀態。

引數:
  • job_id (str) – 作業執行請求的 ID。

  • poll_interval (int) – 檢查 EMR 上的查詢狀態兩次呼叫之間等待的時間(秒)

  • max_polling_attempts (int | None) – 函式退出前輪詢查詢狀態的次數

stop_query(job_id)[source]

取消已提交的作業執行。

引數:

job_id (str) – 要取消的作業執行 ID。

此條目有幫助嗎?