airflow.providers.amazon.aws.sensors.emr

EmrBaseSensor

包含 EMR 通用感測器行為。

EmrServerlessJobSensor

輪詢作業執行的狀態,直到其達到終止狀態;如果作業執行失敗則標記為失敗。

EmrServerlessApplicationSensor

輪詢應用程式的狀態,直到其達到終止狀態;如果應用程式失敗則標記為失敗。

EmrContainerSensor

輪詢作業執行的狀態,直到其達到終止狀態;如果作業執行失敗則標記為失敗。

EmrNotebookExecutionSensor

輪詢 EMR Notebook,直到其達到任何目標狀態;失敗時引發 AirflowException。

EmrJobFlowSensor

輪詢 EMR JobFlow 叢集,直到其達到任何目標狀態;失敗時引發 AirflowException。

EmrStepSensor

輪詢步驟的狀態,直到其達到任何目標狀態;失敗時引發 AirflowException。

模組內容

class airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor(*, aws_conn_id='aws_default', **kwargs)[source]

基類: airflow.sensors.base.BaseSensorOperator

包含 EMR 通用感測器行為。

子類應實現以下方法
  • get_emr_response()

  • state_from_response()

  • failure_message_from_response()

子類應設定 target_statesfailed_states 欄位。

引數::

aws_conn_id (str | None) – 用於 AWS 憑據的 Airflow 連線。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上進行維護)。

ui_color = '#66c3ff'[source]
aws_conn_id = 'aws_default'[source]
target_states: collections.abc.Iterable[str] = [][source]
failed_states: collections.abc.Iterable[str] = [][source]
property hook: airflow.providers.amazon.aws.hooks.emr.EmrHook[source]
poke(context)[source]

派生此類時重寫。

abstract get_emr_response(context)[source]

使用 boto3 進行 API 呼叫並獲取響應。

返回::

響應

返回型別::

dict[str, Any]

static state_from_response(response)[source]
抽象方法::

從 boto3 響應中獲取狀態。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

狀態

返回型別::

str

static failure_message_from_response(response)[source]
抽象方法::

從 boto3 響應中獲取狀態。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor(*, application_id, job_run_id, target_states=frozenset(EmrServerlessHook.JOB_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

基類: airflow.sensors.base.BaseSensorOperator

輪詢作業執行的狀態,直到其達到終止狀態;如果作業執行失敗則標記為失敗。

另請參閱

有關如何使用此感測器的更多資訊,請檢視指南: 等待 EMR Serverless 作業狀態

引數::
  • application_id (str) – 要檢查狀態的 application_id

  • job_run_id (str) – 要檢查狀態的 job_run_id

  • target_states (set | frozenset) – 一組需要等待的狀態,預設為 ‘SUCCESS’

  • aws_conn_id (str | None) – 要使用的 aws 連線,預設為 ‘aws_default’。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上進行維護)。

template_fields: collections.abc.Sequence[str] = ('application_id', 'job_run_id')[source]
aws_conn_id = 'aws_default'[source]
target_states[source]
application_id[source]
job_run_id[source]
poke(context)[source]

派生此類時重寫。

property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]

建立並返回一個 EmrServerlessHook。

static failure_message_from_response(response)[source]

從響應字典中獲取失敗訊息。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor(*, application_id, target_states=frozenset(EmrServerlessHook.APPLICATION_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

基類: airflow.sensors.base.BaseSensorOperator

輪詢應用程式的狀態,直到其達到終止狀態;如果應用程式失敗則標記為失敗。

另請參閱

有關如何使用此感測器的更多資訊,請檢視指南: 等待 EMR Serverless 應用程式狀態

引數::
  • application_id (str) – 要檢查狀態的 application_id

  • target_states (set | frozenset) – 一組需要等待的狀態,預設為 {‘CREATED’, ‘STARTED’}

  • aws_conn_id (str | None) – 要使用的 aws 連線,預設為 ‘aws_default’。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上進行維護)。

template_fields: collections.abc.Sequence[str] = ('application_id',)[source]
aws_conn_id = 'aws_default'[source]
target_states[source]
application_id[source]
poke(context)[source]

派生此類時重寫。

property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]

建立並返回一個 EmrServerlessHook。

static failure_message_from_response(response)[source]

從響應字典中獲取失敗訊息。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor(*, virtual_cluster_id, job_id, max_retries=None, aws_conn_id='aws_default', poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: airflow.sensors.base.BaseSensorOperator

輪詢作業執行的狀態,直到其達到終止狀態;如果作業執行失敗則標記為失敗。

另請參閱

有關如何使用此感測器的更多資訊,請檢視指南: 等待 Amazon EMR 虛擬叢集作業

引數::
  • job_id (str) – 要檢查狀態的 job_id

  • max_retries (int | None) – 輪詢查詢狀態直到返回當前狀態的次數,預設為 None

  • aws_conn_id (str | None) – 要使用的 aws 連線,預設為 ‘aws_default’。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果在分散式模式下執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個工作節點上進行維護)。

  • poll_interval (int) – 兩次連續呼叫之間等待檢查 athena 查詢狀態的秒數,預設為 10

  • deferrable (bool) – 在可延遲模式下執行感測器。

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
template_fields: collections.abc.Sequence[str] = ('virtual_cluster_id', 'job_id')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#66c3ff'[source]
aws_conn_id = 'aws_default'[source]
virtual_cluster_id[source]
job_id[source]
poll_interval = 10[source]
max_retries = None[source]
deferrable = True[source]
property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]
poke(context)[source]

派生此類時重寫。

execute(context)[source]

建立運算元時派生。

執行任務的主要方法。Context 與渲染 jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrNotebookExecutionSensor(notebook_execution_id, target_states=None, failed_states=None, **kwargs)[source]

基礎類: EmrBaseSensor

輪詢 EMR Notebook,直到其達到任何目標狀態;失敗時引發 AirflowException。

另請參閱

有關如何使用此 Sensor 的更多資訊,請參閱指南: 等待 EMR Notebook 執行狀態

引數::

notebook_execution_id (str) – 要檢查(poked)的 Notebook 執行的唯一 ID。

目標狀態:

Sensor 將等待執行達到的狀態。預設目標狀態為 FINISHED

失敗狀態:

如果執行達到任何失敗狀態,則 Sensor 將失敗。預設失敗狀態為 FAILED

template_fields: collections.abc.Sequence[str] = ('notebook_execution_id',)[source]
FAILURE_STATES[source]
COMPLETED_STATES[source]
notebook_execution_id[source]
target_states[source]
failed_states[source]
get_emr_response(context)[source]

使用 boto3 進行 API 呼叫並獲取響應。

返回::

響應

返回型別::

dict[str, Any]

static state_from_response(response)[source]

使用 boto3 呼叫 API 並獲取叢集級別的詳細資訊。

返回::

響應

返回型別::

str

static failure_message_from_response(response)[source]

從響應字典中獲取失敗訊息。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor(*, job_flow_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基礎類: EmrBaseSensor

輪詢 EMR JobFlow 叢集,直到其達到任何目標狀態;失敗時引發 AirflowException。

在預設目標狀態下,Sensor 會等待叢集終止。當 target_states 設定為 ['RUNNING', 'WAITING'] 時,Sensor 會等待作業流準備就緒(在 'STARTING' 和 'BOOTSTRAPPING' 狀態之後)。

另請參閱

有關如何使用此 Sensor 的更多資訊,請參閱指南: 等待 Amazon EMR 作業流狀態

引數::
  • job_flow_id (str) – 要檢查其狀態的作業流 ID

  • target_states (collections.abc.Iterable[str] | None) – 目標狀態,Sensor 會等待作業流達到這些狀態中的任何一個。在可推遲模式下,它將執行直到達到終端狀態。

  • failed_states (collections.abc.Iterable[str] | None) – 失敗狀態,當作業流達到這些狀態中的任何一個時,Sensor 將失敗。

  • max_attempts (int) – 失敗前的最大嘗試次數

  • deferrable (bool) – 在可延遲模式下執行感測器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
job_flow_id[source]
target_states = ['TERMINATED'][source]
failed_states = ['TERMINATED_WITH_ERRORS'][source]
max_attempts = 60[source]
deferrable = True[source]
get_emr_response(context)[source]

使用 boto3 呼叫 API 並獲取叢集級別的詳細資訊。

返回::

響應

返回型別::

dict[str, Any]

static state_from_response(response)[source]

從響應字典中獲取狀態。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

叢集當前狀態

返回型別::

str

static failure_message_from_response(response)[source]

從響應字典中獲取失敗訊息。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

execute(context)[source]

建立運算元時派生。

執行任務的主要方法。Context 與渲染 jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrStepSensor(*, job_flow_id, step_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基礎類: EmrBaseSensor

輪詢步驟的狀態,直到其達到任何目標狀態;失敗時引發 AirflowException。

在預設目標狀態下,Sensor 會等待步驟完成。

另請參閱

有關如何使用此 Sensor 的更多資訊,請參閱指南: 等待 Amazon EMR 步驟狀態

引數::
  • job_flow_id (str) – 包含要檢查狀態的步驟的作業流 ID

  • step_id (str) – 要檢查其狀態的步驟 ID

  • target_states (collections.abc.Iterable[str] | None) – 目標狀態,Sensor 會等待步驟達到這些狀態中的任何一個。在可推遲 Sensor 的情況下,它將等待直到達到終端狀態。

  • failed_states (collections.abc.Iterable[str] | None) – 失敗狀態,當步驟達到這些狀態中的任何一個時,Sensor 將失敗。

  • max_attempts (int) – 失敗前的最大嘗試次數

  • deferrable (bool) – 在可延遲模式下執行感測器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'step_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
job_flow_id[source]
step_id[source]
target_states = ['COMPLETED'][source]
failed_states = ['CANCELLED', 'FAILED', 'INTERRUPTED'][source]
max_attempts = 60[source]
deferrable = True[source]
get_emr_response(context)[source]

使用 boto3 呼叫 API 並獲取有關叢集步驟的詳細資訊。

返回::

響應

返回型別::

dict[str, Any]

static state_from_response(response)[source]

從響應字典中獲取狀態。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

叢集步驟的執行狀態

返回型別::

str

static failure_message_from_response(response)[source]

從響應字典中獲取失敗訊息。

引數::

response (dict[str, Any]) – 來自 AWS API 的響應

返回::

失敗訊息

返回型別::

str | None

execute(context)[source]

建立運算元時派生。

執行任務的主要方法。Context 與渲染 jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event=None)[source]

本條目有幫助嗎?