airflow.providers.amazon.aws.hooks.sagemaker

LogState

一種列舉風格的類,包含 CloudWatch 日誌流的所有可能狀態。

Position

SageMakerHook

與 Amazon SageMaker 互動。

函式

argmin(arr, f)

給定可呼叫物件 f,在 arr 中找到使 f(arr[i]) 最小化的索引。

secondary_training_status_changed(...)

檢查訓練作業的二級狀態訊息是否已更改。

secondary_training_status_message(job_description, ...)

格式化包含開始時間和訓練作業二級狀態訊息的字串。

模組內容

class airflow.providers.amazon.aws.hooks.sagemaker.LogState[source]

一種列舉風格的類,包含 CloudWatch 日誌流的所有可能狀態。

https://sagemaker.readthedocs.io/en/stable/session.html#sagemaker.session.LogState

STARTING = 1[source]
WAIT_IN_PROGRESS = 2[source]
TAILING = 3[source]
JOB_COMPLETE = 4[source]
COMPLETE = 5[source]
class airflow.providers.amazon.aws.hooks.sagemaker.Position[source]

基類: tuple

timestamp[source]
skip[source]
airflow.providers.amazon.aws.hooks.sagemaker.argmin(arr, f)[source]

給定可呼叫物件 f,在 arr 中找到使 f(arr[i]) 最小化的索引。

如果 arr 為空,則返回 None。

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_changed(current_job_description, prev_job_description)[source]

檢查訓練作業的二級狀態訊息是否已更改。

引數:
  • current_job_description (dict) – 當前作業描述,來自 DescribeTrainingJob 呼叫返回。

  • prev_job_description (dict) – 之前的作業描述,來自 DescribeTrainingJob 呼叫返回。

返回:

訓練作業的二級狀態訊息是否已更改。

返回型別:

bool

airflow.providers.amazon.aws.hooks.sagemaker.secondary_training_status_message(job_description, prev_description)[source]

格式化包含開始時間和訓練作業二級狀態訊息的字串。

引數:
  • job_description (dict[str, list[Any]]) – DescribeTrainingJob 呼叫返回的響應

  • prev_description (dict | None) – DescribeTrainingJob 呼叫返回的之前的作業描述

返回:

要列印的作業狀態字串。

返回型別:

str

class airflow.providers.amazon.aws.hooks.sagemaker.SageMakerHook(*args, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 Amazon SageMaker 互動。

提供對 boto3.client("sagemaker") 的厚封裝。

可以指定其他引數(例如 aws_conn_id),這些引數會傳遞給底層的 AwsBaseHook。

non_terminal_states[source]
endpoint_non_terminal_states[source]
pipeline_non_terminal_states[source]
processing_job_non_terminal_states[source]
failed_states[source]
processing_job_failed_states[source]
training_failed_states[source]
s3_hook[source]
logs_hook[source]
tar_and_s3_upload(path, key, bucket)[source]

將本地檔案或目錄打包成 tar 檔案並上傳到 S3。

引數:
  • path (str) – 本地檔案或目錄

  • key (str) – S3 鍵

  • bucket (str) – S3 儲存桶

configure_s3_resources(config)[source]

從配置中提取 S3 操作並執行它們。

引數:

config (dict) – SageMaker 操作的配置

check_s3_url(s3url)[source]

檢查 S3 URL 是否存在。

引數:

s3url (str) – S3 URL

check_training_config(training_config)[source]

檢查訓練配置是否有效。

引數:

training_config (dict) – 訓練配置

check_tuning_config(tuning_config)[source]

檢查調優配置是否有效。

引數:

tuning_config (dict) – 調優配置

multi_stream_iter(log_group, streams, positions=None)[source]

遍歷可用事件。

從單個日誌組中的一組日誌流中提取事件,並將來自每個流的事件交織在一起,以便按時間戳順序生成。

引數:
  • log_group (str) – 日誌組的名稱。

  • streams (list) – 日誌流名稱列表。流在此列表中的位置即為流編號。

  • positions – 一個由 (timestamp, skip) 對組成的列表,表示從每個流讀取的最後一條記錄。

返回:

一個 (流編號, CloudWatch 日誌事件) 元組。

返回型別:

collections.abc.Generator

create_training_job(config, wait_for_completion=True, print_log=True, check_interval=30, max_ingestion_time=None)[source]

啟動一個模型訓練作業。

訓練完成後,Amazon SageMaker 會將生成的模型製品儲存到您指定的 Amazon S3 位置。

引數:
  • config (dict) – 訓練配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

訓練作業建立的響應

create_tuning_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

啟動一個超引數調優作業。

超引數調優作業透過使用您選擇的演算法和在指定範圍內超引數值,在您的資料集上執行許多訓練作業來找到模型的最佳版本。然後,它會選擇能夠產生最佳效能模型的超引數值(以您選擇的目標指標衡量)。

引數:
  • config (dict) – 調優配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

調優作業建立的響應

create_transform_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

啟動一個轉換作業。

轉換作業使用訓練好的模型在資料集上進行推理,並將結果儲存到您指定的 Amazon S3 位置。

引數:
  • config (dict) – 轉換作業配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

轉換作業建立的響應

create_processing_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

使用 Amazon SageMaker Processing 分析資料和評估模型。

透過 Processing,您可以在 SageMaker 上使用簡化的託管體驗來執行資料處理工作負載,例如特徵工程、資料驗證、模型評估和模型解釋。

引數:
  • config (dict) – 處理作業配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

轉換作業建立的響應

create_model(config)[source]

在 Amazon SageMaker 中建立模型。

在請求中,您命名模型並描述一個主容器。對於主容器,您指定包含推理程式碼、製品(來自之前的訓練)以及部署模型進行預測時推理程式碼使用的自定義環境變數對映的 Docker 映象。

引數:

config (dict) – 模型配置

返回:

模型建立的響應

create_endpoint_config(config)[source]

建立用於部署模型的端點配置。

在配置中,您確定要部署的一個或多個模型(使用 CreateModel API 建立)以及您希望 Amazon SageMaker 預置的資源。

引數:

config (dict) – 端點配置

返回:

端點配置建立的響應

create_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

從配置建立端點。

建立無伺服器端點時,SageMaker 會為您預置和管理計算資源。然後,您可以向端點發送推理請求並接收模型預測作為響應。SageMaker 會根據請求流量需要擴充套件或縮減計算資源。

引數:
  • config (dict) – 端點配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

端點建立的響應

update_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source]

部署請求中的配置並切換到使用新端點。

使用之前的 EndpointConfig 為端點預置的資源將被刪除(不會丟失可用性)。

引數:
  • config (dict) – 端點配置

  • wait_for_completion (bool) – 如果程式應持續執行直到作業完成

  • check_interval (int) – Operator 檢查任何 SageMaker 作業狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

返回:

端點更新的響應

describe_training_job(name)[source]

獲取與名稱關聯的訓練作業資訊。

引數:

name (str) – 訓練作業的名稱

返回:

包含所有訓練作業資訊的字典

describe_training_job_with_log(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source]

獲取關聯的訓練作業資訊並列印 CloudWatch 日誌。

describe_tuning_job(name)[source]

獲取與名稱關聯的調優作業資訊。

引數:

name (str) – 調優作業的名稱

返回:

一個字典包含所有微調作業資訊

返回型別:

字典

describe_model(name)[source]

獲取與名稱關聯的 SageMaker 模型資訊。

引數:

name (str) – SageMaker 模型的名稱

返回:

一個字典包含所有模型資訊

返回型別:

字典

describe_transform_job(name)[source]

獲取與名稱關聯的轉換作業資訊。

引數:

name (str) – 轉換作業的名稱

返回:

一個字典包含所有轉換作業資訊

返回型別:

字典

describe_processing_job(name)[source]

獲取與名稱關聯的處理作業資訊。

引數:

name (str) – 處理作業的名稱

返回:

一個字典包含所有處理作業資訊

返回型別:

字典

describe_endpoint_config(name)[source]

獲取與名稱關聯的終端節點配置資訊。

引數:

name (str) – 終端節點配置的名稱

返回:

一個字典包含所有終端節點配置資訊

返回型別:

字典

describe_endpoint(name)[source]

獲取終端節點的描述。

引數:

name (str) – 終端節點的名稱

返回:

一個字典包含所有終端節點資訊

返回型別:

字典

check_status(job_name, key, describe_function, check_interval, max_ingestion_time=None, non_terminal_states=None)[source]

檢查 SageMaker 資源的狀態。

引數:
  • job_name (str) – 要檢查狀態的資源的名稱,可以是作業,例如也可以是管道。

  • key (str) – 響應字典中指向狀態的鍵

  • describe_function (Callable) – 用於檢索狀態的函式

  • args – 函式的引數

  • check_interval (int) – 運算子檢查任何 SageMaker 資源狀態的時間間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 資源都將失敗。將其設定為 None 表示對任何 SageMaker 資源都沒有超時限制。

  • non_terminal_states (set | None) – 非終止狀態的集合

返回:

資源完成後 describe 呼叫的響應

返回型別:

字典

check_training_status_with_log(job_name, non_terminal_states, failed_states, wait_for_completion, check_interval, max_ingestion_time=None)[source]

顯示給定訓練作業的日誌。

可選地跟蹤它們,直到作業完成。

引數:
  • job_name (str) – 要檢查狀態和顯示日誌的訓練作業的名稱

  • non_terminal_states (set) – 非終止狀態的集合

  • failed_states (set) – 失敗狀態的集合

  • wait_for_completion (bool) – 是否持續查詢新的日誌條目,直到作業完成

  • check_interval (int) – 輪詢新日誌條目和作業完成之間的間隔(秒)

  • max_ingestion_time (int | None) – 最大攝取時間(秒)。任何執行時間超過此值的 SageMaker 作業將失敗。將其設定為 None 表示對任何 SageMaker 作業都沒有超時限制。

list_training_jobs(name_contains=None, max_results=None, **kwargs)[source]

呼叫 boto3 的 list_training_jobs

訓練作業名稱和最大結果數可透過引數配置。其他引數不可透過引數配置,應透過 kwargs 提供。請注意,boto3 期望這些引數採用駝峰命名法(CamelCase),例如

list_training_jobs(name_contains="myjob", StatusEquals="Failed")
引數:
  • name_contains (str | None) – (可選)要匹配的部分名稱

  • max_results (int | None) – (可選)要返回的最大結果數。None 表示返回無限結果

  • kwargs – (可選)傳遞給 boto3 的 list_training_jobs 方法的 kwargs 引數

返回:

list_training_jobs 請求的結果

返回型別:

list[dict]

list_transform_jobs(name_contains=None, max_results=None, **kwargs)[source]

呼叫 boto3 的 list_transform_jobs

轉換作業名稱和最大結果數可透過引數配置。其他引數不可透過引數配置,應透過 kwargs 提供。請注意,boto3 期望這些引數採用駝峰命名法(CamelCase),例如

list_transform_jobs(name_contains="myjob", StatusEquals="Failed")
引數:
  • name_contains (str | None) – (可選)要匹配的部分名稱。

  • max_results (int | None) – (可選)要返回的最大結果數。None 表示返回無限結果。

  • kwargs – (可選)傳遞給 boto3 的 list_transform_jobs 方法的 kwargs 引數。

返回:

list_transform_jobs 請求的結果。

返回型別:

list[dict]

list_processing_jobs(**kwargs)[source]

呼叫 boto3 的 list_processing_jobs

所有引數都應透過 kwargs 提供。請注意,boto3 期望這些引數採用駝峰命名法(CamelCase),例如

list_processing_jobs(NameContains="myjob", StatusEquals="Failed")
引數:

kwargs – (可選)傳遞給 boto3 的 list_training_jobs 方法的 kwargs 引數

返回:

list_processing_jobs 請求的結果

返回型別:

list[dict]

count_processing_jobs_by_name(processing_job_name, job_name_suffix=None, throttle_retry_delay=2, retries=3)[source]

獲取找到的具有指定名稱字首的處理作業數量。

引數:
  • processing_job_name (str) – 要查詢的字首。

  • job_name_suffix (str | None) – 可選的字尾,可附加用於消除現有作業名稱的重複。

  • throttle_retry_delay (int) – 遇到 ThrottlingException 時等待的秒數。

  • retries (int) – 最大重試次數。

返回:

以指定字首開頭的處理作業數量。

返回型別:

int

delete_model(model_name)[source]

刪除 SageMaker 模型。

引數:

model_name (str) – 模型的名稱

describe_pipeline_exec(pipeline_exec_arn, verbose=False)[source]

獲取有關 SageMaker 管道執行的資訊。

引數:
  • pipeline_exec_arn (str) – 管道執行的 ARN

  • verbose (bool) – 是否記錄管道執行中步驟狀態的詳細資訊

start_pipeline(pipeline_name, display_name='airflow-triggered-execution', pipeline_params=None)[source]

啟動 SageMaker 管道的新執行。

引數:
  • pipeline_name (str) – 要啟動的管道的名稱(這不是 ARN)。

  • display_name (str) – 此管道執行將在 UI 中顯示的名稱。無需唯一。

  • pipeline_params (dict | None) – 管道的可選引數。提供的所有引數都需要已存在於管道定義中。

返回:

啟動的管道執行的 ARN。

返回型別:

str

stop_pipeline(pipeline_exec_arn, fail_if_not_running=False)[source]

停止 SageMaker 管道執行。

引數:
  • pipeline_exec_arn (str) – 管道執行的 Amazon Resource Name (ARN)。它是管道本身的 ARN 加上“/execution/”和一個 id。

  • fail_if_not_running (bool) – 如果在我們嘗試停止管道時,該管道在呼叫傳送時未處於“Executing”(執行中)狀態(這意味著管道已經處於停止中或已停止),此方法將引發異常。請注意,將其設定為 True 將在管道在停止前已成功完成時引發錯誤。

返回:

操作後管道執行的狀態。是 ‘Executing’|’Stopping’|’Stopped’|’Failed’|’Succeeded’ 中的一個。

返回型別:

str

create_model_package_group(package_group_name, package_group_desc='')[source]

如果模型包組尚不存在,則建立它。

引數:
  • package_group_name (str) – 如果模型包組尚不存在,要建立的模型包組的名稱。

  • package_group_desc (str) – 模型包組的描述,如果它將被建立(可選)。

返回:

如果建立了模型包組,則為 True;如果它已存在,則為 False。

返回型別:

bool

create_auto_ml_job(job_name, s3_input, target_attribute, s3_output, role_arn, compressed_input=False, time_limit=None, autodeploy_endpoint_name=None, extras=None, wait_for_completion=True, check_interval=30)[source]

建立一個 AutoML 作業來預測給定列。

學習輸入基於透過 S3 提供的資料,輸出寫入指定的 S3 位置。

引數:
  • job_name (str) – 要建立的作業名稱,在賬戶內必須唯一。

  • s3_input (str) – 用於獲取資料的 S3 位置(資料夾或檔案)。預設情況下,它期望帶標題行的 csv。

  • target_attribute (str) – 包含要預測的值的列的名稱。

  • s3_output (str) – 寫入模型工件的 S3 資料夾。必須少於或等於 128 個字元。

  • role_arn (str) – 與 S3 互動時使用的 ARN 或 IAM 角色。必須對輸入資料夾有讀訪問許可權,對輸出資料夾有寫訪問許可權。

  • compressed_input (bool) – 如果輸入是 gzipped 格式,則設定為 True。

  • time_limit (int | None) – 用於訓練模型的最長時間(秒)。

  • autodeploy_endpoint_name (str | None) – 如果指定,最佳模型將部署到具有該名稱的終端節點。否則不進行部署。

  • extras (dict | None) – 使用此字典設定此函式引數未提供的任何可變輸入引數,用於作業建立。格式說明見:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_auto_ml_job

  • wait_for_completion (bool) – 是否等待作業完成後再返回。預設為 True。

  • check_interval (int) – 等待完成時兩次狀態檢查之間的間隔(秒)。

返回:

僅當等待完成時,返回一個詳細說明最佳模型的字典。其結構與以下連結中“BestCandidate”鍵的結構相同:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_auto_ml_job

返回型別:

dict | None

static count_billable_seconds(training_start_time, training_end_time, instance_count)[source]
async describe_training_job_async(job_name)[source]

返回與名稱關聯的訓練作業資訊。

引數:

job_name (str) – 訓練作業的名稱

async describe_training_job_with_log_async(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source]

返回與 job_name 關聯的訓練作業資訊並列印 CloudWatch 日誌。

引數:
  • job_name (str) – 要檢查狀態的作業名稱

  • positions (dict[str, Any]) – 一個由 (timestamp, skip) 對組成的列表,表示從每個流讀取的最後一條記錄。

  • stream_names (list[str]) – 日誌流名稱的列表。流在此列表中的位置是流編號。

  • instance_count (int) – 作業最初建立的例項數量

  • state (int) – 日誌狀態

  • last_description (dict[str, Any]) – 訓練作業的最新描述

  • last_describe_job_call (float) – 上次呼叫作業的時間

async get_multi_stream(log_group, streams, positions)[source]

迭代處理可用的事件,並將來自每個流的事件交錯排列,以便它們按時間戳順序產生。

引數:
  • log_group (str) – 日誌組的名稱。

  • streams (list[str]) – 日誌流名稱的列表。流在此列表中的位置是流編號。

  • positions (dict[str, Any]) – 一個由 (timestamp, skip) 對組成的列表,表示從每個流讀取的最後一條記錄。

此條目是否有幫助?