airflow.providers.amazon.aws.operators.athena

AthenaOperator

提交 Trino/Presto 查詢到 Amazon Athena 的 Operator。

模組內容

airflow.providers.amazon.aws.operators.athena.AthenaOperator(*, query, database, output_location=None, client_request_token=None, workgroup='primary', query_execution_context=None, result_configuration=None, sleep_time=30, max_polling_attempts=None, log_query=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), catalog='AwsDataCatalog', **kwargs)[source]

基類: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.athena.AthenaHook]

提交 Trino/Presto 查詢到 Amazon Athena 的 Operator。

注意

如果在任務執行時被殺死,它會取消已啟動的 athena 查詢,除非在可延遲模式下執行。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:在 Amazon Athena 中執行查詢

引數::
  • query (str) – 要在 Amazon Athena 上執行的 Trino/Presto 查詢。(templated)

  • database (str) – 要選擇的資料庫。(templated)

  • catalog (str) – 要選擇的 Catalog。(templated)

  • output_location (str | None) – 用於寫入查詢結果的 S3 路徑。(templated) 要執行查詢,您必須透過以下方式之一指定查詢結果位置:使用此設定(客戶端)針對單個查詢,或在工作組中使用 WorkGroupConfiguration。如果兩者都沒有設定,Athena 將發出錯誤,指示未提供輸出位置。

  • client_request_token (str | None) – 使用者建立的唯一令牌,用於避免多次執行同一個查詢。

  • workgroup (str) – 執行查詢的 Athena 工作組。(templated)

  • query_execution_context (dict[str, str] | None) – 需要執行查詢的上下文。

  • result_configuration (dict[str, Any] | None) – 包含儲存結果路徑和加密相關配置的字典。

  • sleep_time (int) – 兩次連續檢查 Athena 查詢狀態呼叫之間的等待時間(秒)。

  • max_polling_attempts (int | None) – 函式退出前輪詢查詢狀態的次數。要限制任務執行時間,請使用 execution_timeout。

  • log_query (bool) – 執行 athena 查詢及其他執行引數時是否記錄日誌。預設為 True

  • aws_conn_id – 用於 AWS 憑證的 Airflow 連線。如果此引數為 None 或為空,則使用預設的 boto3 行為。如果在分散式模式下執行 Airflow 並且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(且必須在每個 worker 節點上維護)。

  • region_name – AWS region_name。如果未指定,則使用預設的 boto3 行為。

  • verify – 是否驗證 SSL 證書。請參閱: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • botocore_config – botocore 客戶端的配置字典(鍵值對)。請參閱: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html

aws_hook_class[source]
ui_color = '#44b5e2'[source]
template_fields: collections.abc.Sequence[str][source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
template_fields_renderers[source]
query[source]
database[source]
output_location = None[source]
client_request_token = None[source]
workgroup = 'primary'[source]
query_execution_context[source]
result_configuration[source]
sleep_time = 30[source]
max_polling_attempts = 999999[source]
query_execution_id: str | None = None[source]
log_query: bool = True[source]
deferrable = True[source]
catalog: str = 'AwsDataCatalog'[source]
execute(context)[source]

在 Amazon Athena 上執行 Trino/Presto 查詢。

execute_complete(context, event=None)[source]
on_kill()[source]

取消已提交的 Amazon Athena 查詢。

get_openlineage_facets_on_complete(_)[source]

透過解析 SQL 查詢並使用 Athena API 豐富資料來檢索 OpenLineage 資料。

除了 CTAS 查詢,查詢和計算結果還儲存在 S3 位置。因此,此位置會附加額外的輸出。我們不使用儲存結果的完整路徑(使用者字首 + 某個 UUID),而是僅使用使用者提供的路徑建立資料集。這應該有助於在不同程序中更容易地匹配此資料集。

get_openlineage_dataset(database, table)[source]

此條目有幫助嗎?